可変長配列データベースのまとめ

かなり間が空きましたが、可変長配列データベースのまとめです。今まで個別に説明してきたものをまとめただけでなく幾つか修正しています。一つバグがあったので修正した他は、以下の点を改良しています。

  • データベースの生成とオープンのメソッドを分離
  • データベースのロックのメソッドを追加
  • stringの登録追加のメソッドを追加

以下コードをそのまま一つのヘッダーファイルにすれば利用できます。

#ifndef ARRAYDATABASE
#define ARRAYDATABASE

#include <fcntl.h>
#include <errno.h>
#include <sys/mman.h>
#include <sys/shm.h>
#include <assert.h>

#include <iostream>
#include <exception>

class Exception : public std::exception {
 public:
  Exception(const std::string &msg)
    {
      message = msg;
    }

  virtual ~Exception() throw() {}

  const char *what() {
    return message.c_str();
  }

  std::string message;
};

class Mutex {
 public:
  Mutex(){};
  ~Mutex(){};
  void lock();
  void unlock();
  void wait();
  void signal();
  void broadcast();
  void create();
  void destroy();
 protected:
  pthread_mutex_t       mutex;
  pthread_cond_t        condition;
  pthread_mutexattr_t   mutex_attr;
};

inline void Mutex::create() {
  int stat = 0;
  if (pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED) != 0) {
    throw Exception(strerror(stat));
  }
  if ((stat = pthread_mutex_init(&mutex, &mutex_attr)) != 0) {
    throw Exception(strerror(stat));
  }
  if ((stat = pthread_cond_init(&condition, NULL)) != 0) {
    pthread_mutex_destroy(&mutex);
    throw Exception(strerror(stat));
  }
}

inline void Mutex::destroy() {
  pthread_mutex_destroy(&mutex);
  pthread_cond_destroy(&condition);
}

inline void
Mutex::lock() {
  int stat;
  if ((stat = pthread_mutex_lock(&mutex)) != 0) {
    throw Exception(strerror(stat));
  }
}

inline void
Mutex::unlock() {
  int stat;
  if ((stat = pthread_mutex_unlock(&mutex)) != 0) {
    throw Exception(strerror(stat));
  }
}

inline void
Mutex::signal() {
  int stat;
  if ((stat = pthread_cond_signal(&condition)) != 0) {
    throw Exception(strerror(stat));
  }
}

inline void
Mutex::wait() {
  int stat;
  if ((stat = pthread_cond_wait(&condition, &mutex)) != 0) {
    throw Exception(strerror(stat));
  }
}

inline void
Mutex::broadcast() {
  int stat;
  if ((stat = pthread_cond_broadcast(&condition)) != 0) {
    throw Exception(strerror(stat));
  }
}

template <class T> class Array {
 public:
  class Header {
  public:
    void initialize() {
      headerSize        = 256;
      pageSize          = sysconf(_SC_PAGE_SIZE);
      elementSize       = sizeof(T);
      arraySize         = 0;
      validArraySize    = 0;
      size              = headerSize;
    }

    size_t updateSize(size_t arraysize) {
      size = ((headerSize + arraysize * elementSize - 1) / pageSize + 1) *
        pageSize;
      arraySize = (size - headerSize) / elementSize;
      return size;
    }
    Mutex &getMutex() { return mutex; }

    size_t headerSize;
    size_t pageSize;            // page size
    size_t elementSize;         // size of an element
    size_t arraySize;           // number of elements
    size_t validArraySize;      // number of valid elements
    size_t size;                // total size
    Mutex  mutex;               // mutex
  };

 Array():fd(-1) {}

 Array(const std::string &f):fd(-1) {
    initialize(f);
  }

  ~Array() {
    if (fd != -1) {
      msync(header, initialSize, MS_SYNC);
      munmap(header, initialSize);
      close();
    }
  }

  void initialize(const std::string &f) {
    file = f;
    if ((fd = open(file.c_str(), O_RDWR, 0666)) == -1) {
      throw Exception(strerror(errno));
    }
    struct stat fs;
    fstat(fd, &fs);
    map(fs.st_size);
    reservedSize = 0;
  }

  void map(size_t sz) {
    if (fd == -1) {
      throw Exception("Not ready");
    }
    initialSize = sz;
    header = (Header*)mmap(0, sz, PROT_READ|PROT_WRITE,MAP_SHARED,
                           fd, 0);
    if ((long long)header == -1){
      close();
      throw Exception(strerror(errno));
    }
    array = (T*)((unsigned char*)header + header->headerSize);
    if (sz != header->size) {
      close();
      throw Exception("Sizes are inconsistency");
    }
  }

  void resize(size_t arraysize) {
    resizeCapacity(arraysize);
    if (arraysize != header->validArraySize) {
      header->validArraySize = arraysize;
    }
  }

  T &at(size_t idx) {
    if (fd == -1) {
      throw Exception("Not ready");
    }
    if (idx >= header->validArraySize) {
      throw Exception("Invalid element");
    }
    return array[idx];
  }

  T &set(size_t idx, T& v) {
    if (idx >= header->arraySize) {
      resizeCapacity(idx + 1);
    }
    array[idx] = v;
    if (idx >= header->validArraySize) {
      header->validArraySize = idx + 1;
    }
    T &data = array[idx];
    return data;
  }

  size_t reserved(size_t rs) {
    return reservedSize += rs;
  }

  size_t getReservedSize() {
    return reservedSize;
  }

  void *getReservedVariable(size_t offset) {
    return (void*)((unsigned char*)(Array<T>::array) - offset);
  }

  size_t getSize() { return header->validArraySize; }

  Header &getHeader() { return *header; }
  Mutex &getMutex() { return header->mutex; }

  static void
    create(const std::string &file) {
    int fd = -1;
    if ((fd = open(file.c_str(), O_RDWR|O_CREAT, 0666)) == -1) {
      throw Exception(strerror(errno));
    }
    Header h;
    h.initialize();
    if (write(fd, &h, sizeof(Header)) == -1) {
      close(fd);
      throw Exception(strerror(errno));
    }
    unsigned char c = 0;
    for (size_t i = 0; i < h.headerSize - sizeof(Header); i++) {
      if (write(fd, &c, sizeof(c)) == -1) {
        close(fd);
        throw Exception(strerror(errno));
      }
    }
    close(fd);
    fd = -1;
    Array<T> array(file);
    array.header->getMutex().create();
  }

 protected:
  void remap(size_t sz) {
    msync(header, initialSize, MS_SYNC);
    munmap(header, initialSize);
    map(sz);
  }

  void close() {
    close(fd);
  }

  static void close(int fd) {
    if (fd != -1) {
      ::close(fd);
    }
    fd = -1;
  }

  void resizeCapacity(size_t arraysize) {
    if (fd == -1) {
      throw Exception("Not ready");
    }

    if (initialSize != header->size) {
      remap(header->size);
    }

    Header h;
    h = *header;
    size_t s = h.size;
    h.updateSize(arraysize);
    if (h.size == s) {
      return;
    }

    if (fd != -1) {
      msync(header, header->size, MS_SYNC);
      munmap(header, header->size);
      close();
    }
    if ((fd = open(file.c_str(), O_RDWR, 0666)) == -1) {
      throw Exception(strerror(errno));
    }
    if (ftruncate(fd, h.size) == -1) {
      close();
      throw Exception(strerror(errno));
    }
    if (write(fd, &h, sizeof(Header)) == -1) {
      close();
      throw Exception(strerror(errno));
    }
    map(h.size);
    if (arraysize < header->validArraySize) {
      header->validArraySize = arraysize;
    }
  }


 protected:
  Header        *header;
  int           fd;
  T             *array;
  std::string   file;
  size_t        initialSize;
  size_t        reservedSize;

};

template <class T> class Stack : public Array<T>  {
 public:
  Stack(const std::string &f):Array<T>(f) {}
  Stack() {}

  void initialize(const std::string &f) {
    Array<T>::initialize(f);
  }

  void push(T &t) {
    set(Array<T>::getSize(), t);
  }
  T front() {
    if (Array<T>::getSize() <= 0) {
      throw Exception("No data");
    }
    return Array<T>::at(Array<T>::getSize() - 1);
  }
  void pop() {
    Array<T>::resize(Array<T>::getSize() - 1);
  }
};

template <class T> class Pool  {
 public:
  Pool() {}
  Pool(const std::string &f) {
    initialize(f);
  }

  void
    initialize(const std::string &f) {
    std::string str = f;
    str.append("_p");
    pool.initialize(str);
    reservedOffset = pool.reserved(sizeof(size_t));
    str = f;
    str.append("_i");
    invalidObjects.initialize(str);
  }

  static void
    create(const std::string &f) {
    std::string str = f;
    str.append("_p");
    Array<T>::create(str);
    str = f;
    str.append("_i");
    Stack<size_t>::create(str);
  }

  size_t insert() {
    size_t idx;
    try {
      idx = invalidObjects.front();
      invalidObjects.pop();
    } catch(...) {
      getPoolSize()++;
      idx = getPoolSize();
      if (idx >= pool.getSize()) {
        pool.resize(idx + 1);
      }
    }
    return idx;
  }

  size_t insert(T &t) {
    size_t idx = insert();
    pool.set(idx - 1, t);
    return idx;
  }

  void remove(size_t idx) {
    invalidObjects.push(idx);
  }

  void set(size_t idx, T &t) {
    pool.set(idx - 1, t);
  }

  T &at(size_t idx) {
    return pool.at(idx - 1);
  }
 protected:
  Array<T> pool;
  Stack<size_t> invalidObjects;
  size_t reservedOffset;
  size_t &getPoolSize() {
    return *(size_t*)pool.getReservedVariable(reservedOffset);
  }

};

class Codec {
 public:
  Codec(){}
  static size_t encode(size_t v, unsigned char *buf) {
    size_t mask = 0xffffffffffffff80;
    size_t cnt;
    for (cnt = 0; cnt < sizeof(size_t); cnt++) {
      if ((mask & v) == 0) {
        break;
      }
      mask <<= 7;
    }

    if (cnt == sizeof(size_t)) {
      valcpy(&buf[1], v, sizeof(size_t));
      buf[0] = 0xff;
    } else {
      valcpy(buf, v, cnt + 1);
      buf[0] |= (0xff << (sizeof(size_t) - cnt));
    }

    size_t validSize = cnt + 1;
    return validSize;
  }
  static size_t decode(unsigned char *buf, size_t &size) {
    size_t cnt;
    unsigned char mask = 0x80;
    unsigned char head = buf[0];
    for (cnt = 0; cnt < sizeof(size_t); cnt++) {
      if ((mask & head) == 0) {
        break;
      }
      mask >>= 1;
    }

    size_t v = 0;
    if (cnt > sizeof(size_t)) {
      valcpy(v, &buf[1], sizeof(size_t));
      size = 9;
    } else {
      valcpy(v, buf, cnt + 1);
      *(((unsigned char*)&v) + cnt) &= (0x7f >> cnt);
      size = cnt + 1;
    }
    return v;
  }

 private:
  static void
    valcpy(unsigned char *buf, size_t &v, int s) {
    unsigned char *vptr = (unsigned char*)&v + s - 1;
    for (int i = 0; i < s; i++) {
      *buf++ = *vptr--;
    }
  }

  static void
    valcpy(size_t &v, unsigned char *buf, int s) {
    v = 0;
    unsigned char *vptr = (unsigned char*)&v + s - 1;
    for (int i = 0; i < s; i++) {
      *vptr-- = *buf++;
    }
  }

};

class VariableArray {
 public:
  static const size_t headSize = 128;
  static const size_t bodySize = 256;

  // Format:|size(encoded)|body index(encoded)|data...|
  class Head {
  public:
    Head() {
      *record = 0;
    }
    unsigned char record[headSize];
  };
  // Format:|next index(encoded)|data...|
  class Body {
  public:
    unsigned char record[bodySize];
  };

  VariableArray(const std::string &f) {
    initialize(f);
  }

  void initialize(const std::string &f) {
    std::string str = f;
    str.append("_h");
    head.initialize(str);
    str = f;
    str.append("_t");
    body.initialize(str);
  }

  static void
    create(const std::string &f) {
    std::string str = f;
    str.append("_h");
    Array<Head>::create(str);
    str = f;
    str.append("_t");
    Pool<Body>::create(str);
  }

  void set(size_t idx, std::vector<unsigned char> &val) {
    set(idx, &val[0], val.size());
  }

  void set(size_t idx, const string &val) {
    set(idx, (unsigned char*)val.c_str(), val.size() + 1);
  }

  void set(size_t idx, unsigned char *val, size_t size) {
    erase(idx);
    if (idx >= head.getSize()) {
      head.resize(idx + 1);
    }
    unsigned char *ptr = (unsigned char*)&head.at(idx);
    int sizeSize = Codec::encode(size, ptr);
    size_t bodyIndexSize = 1;
    size_t totalSize = size + sizeSize + bodyIndexSize;
    ptr += sizeSize;
    size_t bodyIndex;
    int len;
    if (totalSize > headSize) {
      bodyIndex = body.insert();
      bodyIndexSize = Codec::encode(bodyIndex, ptr);
      len = headSize - sizeSize - bodyIndexSize;
    } else {
      bodyIndex = 0;
      //bodyIndexSize = Codec::encode(bodyIndex, ptr);
      bodyIndexSize = 1;
      len = size;
    }
    ptr += bodyIndexSize;
    memcpy(ptr, val, len);
    while (bodyIndex != 0) {
      size -= len;
      val += len;
      bodyIndexSize = 1;
      totalSize = size + bodyIndexSize;
      if (totalSize > bodySize) {
        size_t ti = body.insert();
        ptr = (unsigned char*)&body.at(bodyIndex);
        bodyIndex = ti;
        bodyIndexSize = Codec::encode(bodyIndex, ptr);
        len = bodySize - bodyIndexSize;
      } else {
        ptr = (unsigned char*)&body.at(bodyIndex);
        bodyIndex = 0;
        *ptr = 0;
        len = size;
      }
      ptr += bodyIndexSize;
      memcpy(ptr, val, len);

    }
  }

  void
  get(size_t idx, string &str) {
    size_t strsize = 0;
    char *val = (char*)get(idx, strsize);
    if (val[strsize] != 0) {
      throw Exception("Not string!");
    }
    if (strsize == 0) {
      str = "";
    } else {
      str.assign(val, strsize - 1);
    }
    return;
  }

  unsigned char *get(size_t idx, size_t &size) {
    unsigned char *ptr = (unsigned char*)&head.at(idx);

    size_t sizeSize;
    size = Codec::decode(ptr, sizeSize);
    if (size == 0) {
      throw Exception("The specified element is empty");
    }
    size_t restSize = size;
    unsigned char *buf = new unsigned char[size];
    unsigned char *bufptr = buf;
    ptr += sizeSize;
    size_t bodyIndexSize;
    size_t bodyIndex = Codec::decode(ptr, bodyIndexSize);
    ptr += bodyIndexSize;
    size_t len = headSize - sizeSize - bodyIndexSize;
    len = restSize > len ? len : restSize;
    memcpy(bufptr, ptr, len);
    while (bodyIndex != 0) {
      restSize -= len;
      assert(restSize > 0);
      bufptr += len;
      ptr = (unsigned char*)&body.at(bodyIndex);
      bodyIndex = Codec::decode(ptr, bodyIndexSize);
      ptr += bodyIndexSize;
      len = bodySize - bodyIndexSize;
      len = restSize > len ? len : restSize;;
      memcpy(bufptr, ptr, len);
    }

    return buf;
  }

  void erase(size_t idx) {
    if (idx >= head.getSize()) {
      return;
    }
    unsigned char *headPtr = (unsigned char*)&head.at(idx);
    unsigned char *ptr = headPtr;
    size_t sizeSize;
    size_t size = Codec::decode(ptr, sizeSize);
    if (size == 0) {
      // already cleared
      return;
    }
    ptr += sizeSize;
    size_t bodyIndexSize;
    size_t bodyIndex = Codec::decode(ptr, bodyIndexSize);
    headPtr[0] = 0;  // size <- 0
    headPtr[1] = 0;  // index <- 0 invalid index
    while (bodyIndex != 0) {
      ptr = (unsigned char*)&body.at(bodyIndex);
      size_t nextIndex = Codec::decode(ptr, bodyIndexSize);
      body.remove(bodyIndex);
      bodyIndex = nextIndex;
    }
    return;
  }
  size_t getSize() { return head.getSize(); }
  void lock() { getHeader().getMutex().lock(); }
  void unlock() { getHeader().getMutex().unlock(); }
  Array<Head>::Header &getHeader() { return head.getHeader(); }

 protected:
  Array<Head>   head;
  Pool<Body>    body;

};

#endif

使い方の例です。簡単ですね。

void
array_write()
{

  VariableArray::create("sample_db");

  VariableArray array("sample_db");

  const string str = "abcdefghijklmn";
  array.lock();
  try {
    for (int i = 0; i < 10; i++) {
      string val = str.substr(0, i + 1);
      cerr << "val=" << val << endl;
      array.set(i, val);
    }
  } catch (Exception &e) {
    array.unlock();
    std::cerr << e.what() << std::endl;
  }
  array.unlock();

}

void
array_read()
{
  VariableArray array("sample_db");

  array.lock();
  try {
    for (int i = 0; i < 10; i++) {
      string val;
      array.get(i, val);
      std::cerr << "array[" << i << "]=" << val << std::endl;
    }
  } catch (Exception &e) {
    array.unlock();
    std::cerr << e.what() << std::endl;
  }
  array.unlock();
}

int
main()
{
  array_write();
  array_read();
}

前:可変長配列データベース

スレッドプール

簡単なスレッドプログラミングは今まで書いてきたことで十分できると思います。ただ、この方法では処理の単位でスレッドを生成することになります。プロセスの生成に比べればスレッドの生成は格段に軽いのですが、同様の処理を大量に実行するといった場合にはスレッドを生成する数が多くなるので生成のオーバヘッドが無視できないものとなります。そこであらかじめスレッドを生成しておいてそのスレッドを使い回すというのがスレッドプールの考えです。これでちょっと本格的なマルチスレッドプログラミングが簡単にできます。

#include    <iostream>
#include    <deque>
#include    <vector>

template <class JOB, class SHARED_DATA>
class ThreadPool {
public:
  class JobQueue : public deque<JOB>, public Mutex {
  public:
    JobQueue() {}
    ~JobQueue() {
      limit = -1; // no limit
    }
    void wait(JobQueue &q) {
      q.wait();
    }
    bool isEmpty() {
      return deque<JOB>::size() == 0;
    }
    bool isFull() {
      if (limit >= 0) {
        return deque<JOB>::size() >= limit;
      } else {
        return false;
      }
    }
    int limit;
  };

  class InJobQueue : public JobQueue {
  public:
    InJobQueue() {
      isTerminate = false;
      isPushing = false;
      size = 0;
    }

    void push(JOB &data) {
      JobQueue::lock();
      if (!isPushing) {
        isPushing = true;
        size = 0;
      }
      size++;
      deque<JOB>::push_back(data);
      JobQueue::unlock();
      JobQueue::signal();
    }

    bool pop(JOB &j) {
      JobQueue::lock();
      while (JobQueue::isEmpty()) {
        if (isTerminate) {
          JobQueue::unlock();
          return false;
        }
        JobQueue::Mutex::wait();
      }
      j = deque<JOB>::front();
      deque<JOB>::pop_front();
      JobQueue::unlock();
      return true;
    }

    void terminate() {
      JobQueue::lock();
      if (isPushing || !JobQueue::isEmpty()) {
        JobQueue::unlock();
        throw Exception("Invalid processing sequence");
      }
      isTerminate = true;
      JobQueue::unlock();
      JobQueue::broadcast();
    }

    bool        isTerminate;
    bool        isPushing;
    size_t      size;
  };

  class OutJobQueue : public JobQueue {
  public:
    void push(JOB &j) {
      JobQueue::lock();
      deque<JOB>::push_back(j);
      if (JobQueue::isFull()) {
        JobQueue::unlock();
        JobQueue::signal();
      }
      JobQueue::unlock();
      return;
    }

    void pop() { JobQueue::pop_front(); }

    void wait() {
      JobQueue::Mutex::wait();
      JobQueue::unlock();
    }

  };

  class PooledThread : public Thread {
  public:
    PooledThread(){}
    void run() {
      JOB::initializeThread(*threadPool);
      for (;;) {
        JOB job;
        if (!threadPool->inJobQueue.pop(job)) {
          break;
        }
        job.execute(*threadPool);
        threadPool->outJobQueue.push(job);
      }
      JOB::finalizeThread(*threadPool);
    }

    ThreadPool *threadPool;
  };

  ThreadPool(size_t s, SHARED_DATA sd):sharedData(sd){
    for (size_t i = 0; i < s; i++) {
      PooledThread *pt = new PooledThread;
      pt->threadPool = this;
      pt->start();
      threads.push_back(pt);
    }
  }

  ~ThreadPool() {
    terminate();
  }

  SHARED_DATA getSharedData() {
    return sharedData;
  }

  void push(JOB &j) {
    if (!inJobQueue.isPushing) {
      outJobQueue.lock();
      outJobQueue.clear();
    }
    inJobQueue.push(j);
  }

  bool pop(JOB &j) {
    if (outJobQueue.size() == 0) {
      return false;
    }
    j = outJobQueue.front();
    outJobQueue.pop();
    return true;
  }

  void wait() {
    inJobQueue.isPushing = false;
    outJobQueue.limit = inJobQueue.size;
    inJobQueue.size = 0;
    outJobQueue.OutJobQueue::wait();
  }

  void terminate() {
    if (threads.size() == 0) {
      return;
    }
    inJobQueue.terminate();
    for (size_t i = 0; i < threads.size(); i++) {
      threads[i]->join();
      delete threads[i];
    }
    threads.clear();
  }

protected:
  InJobQueue            inJobQueue;
  OutJobQueue           outJobQueue;

  SHARED_DATA           sharedData;
  vector<PooledThread*> threads;

};

いつものようにここまでの動作は気にしなくて結構です。では、スレッドプールの使い方です。

スレッドプールを使う時にはもうスレッドを意識する必要はありません。ジョブとしてスレッドで実行される作業を捉えます。スレッドプールは入力用のジョブキュー(InJobQueue)と出力用のジョブキュー(InJobQueue)の二つのジョブキューをもっています。InJobQueueにジョブを入れる(push)とスレッドプールのスレッドがInJobQueueから未処理のジョブを取り出し処理を実行してOutJobQueueに入れます。処理されたジョブをOutJobQueueから取り出し(pop)て処理の結果を取得します。

以下がサンプルです。

class MyJobSharedData {
public:
  MyJobSharedData() {
  }
  int value;
};
class MyJob;
typedef ThreadPool<MyJob, MyJobSharedData&> MyThreadPool;

class MyJob {
public:
  MyJob() {}
  static void initializeThread(MyThreadPool &tp) {
    cout << "initialize" << endl;
  }
  static void finalizeThread(MyThreadPool &tp) {
    cout << "finalize" << endl;
  }
  void execute(MyThreadPool &tp) {
    *data *= tp.getSharedData().value;
  };
  int *data;
};

int
main()
{
  static int threadPoolSize = 4;
  MyJobSharedData sd;
  sd.value = 10;
  MyThreadPool threadPool(threadPoolSize, sd);

  vector<int> data;
  for (int i = 1; i <= 10; i++) {
    data.push_back(i);
  }

  for (vector<int>::iterator i = data.begin(); i != data.end(); i++) {
    MyJob inJob;
    inJob.data = &(*i);
    threadPool.push(inJob);
  }

  threadPool.wait();

  MyJob outJob;
  while (threadPool.pop(outJob)) {
    cout << "job=" << *outJob.data << endl;
  }

  for (vector<int>::iterator i = data.begin(); i != data.end(); i++) {
    cout << "vector=" << *i << endl;
  }
}

この例では10個の配列の値に共有データの値を掛け合わせるという簡単なものです。以下のMyJob::execute()がスレッドでの処理となり、ここでは単に共有データの値を掛け合わせているだけです。

  void execute(MyThreadPool &tp) {
    *data *= tp.getSharedData().value;
  };

以下の行でスレッド数を指定しています。

  static int threadPoolSize = 4;

以下の行でInJobQueueにジョブを投入します。

  threadPool.push(inJob);

以下の行でジョブに配列の要素を指定しています。配列の要素である必要はないのですが、出力順は入力順にならずにランダムになるので入力順を維持するために配列の要素を与えています。

    inJob.data = &(*i);

以下の行ですべてのジョブが終了するのを待ちます。

  threadPool.wait();

以下の行でOutJobQueueから処理結果のジョブを取得します。

  while (threadPool.pop(outJob)) {

以下が実行結果です。

initialize
initialize
initialize
initialize
job=40
job=50
job=60
job=70
job=80
job=90
job=100
job=10
job=20
job=30
vector=10
vector=20
vector=30
vector=40
vector=50
vector=60
vector=70
vector=80
vector=90
vector=100
finalize
finalize
finalize
finalize

スレッド処理をするときに場合によってはスレッドの中で初期化や終了処理をしなければならないこともありますので、一応initialize()とfinalize()というメソッドを用意してみました。そのダミー出力が最初と最後の「initialize」と「finalize」です。

「job=」取得したJobの順序ですが、投入順序になっていません。Jobの順序はスレッドの終了順になるので、必ずしもジョブの投入順にはなりません。ただし、配列を表示していところでは正しい順序になっていることがわかります。

前回:ロック - 排他処理

ロック - 排他処理

共通のデータを扱う複数のスレッドを起動する場合には排他処理のためのロックが必須となります。あるスレッドがロックをかけると他のスレッドがロックをかけようとしてもロックが解除されるまで待たされます。本当は「待たされる」だけでなくいろいろな挙動をさせることもできますが、複雑になるので「待たされる」という挙動のみをここでは扱います。

class Mutex {
public:
  Mutex();
  ~Mutex();
  void lock();
  void unlock();
  void wait();
  void signal();
  void broadcast();
protected:
  pthread_mutex_t       mutex;
  pthread_cond_t        condition;
};

Mutex::Mutex() {
  int stat;
  if ((stat = pthread_mutex_init(&mutex, NULL)) != 0) {
    throw Exception(strerror(stat));
  }
  if ((stat = pthread_cond_init(&condition, NULL)) != 0) {
    pthread_mutex_destroy(&mutex);
    throw Exception(strerror(stat));
  }
}
Mutex::~Mutex() {
  pthread_mutex_destroy(&mutex);
  pthread_cond_destroy(&condition);
}
void
Mutex::lock() {
  int stat;
  if ((stat = pthread_mutex_lock(&mutex)) != 0) {
    throw Exception(strerror(stat));
  }
}
void
Mutex::unlock() {
  int stat;
  if ((stat = pthread_mutex_unlock(&mutex)) != 0) {
    throw Exception(strerror(stat));
  }
}
void
Mutex::signal() {
  int stat;
  if ((stat = pthread_cond_signal(&condition)) != 0) {
    throw Exception(strerror(stat));
  }
}
void
Mutex::wait() {
  int stat;
  if ((stat = pthread_cond_wait(&condition, &mutex)) != 0) {
    throw Exception(strerror(stat));
  }
}
void
Mutex::broadcast() {
  int stat;
  if ((stat = pthread_cond_broadcast(&condition)) != 0) {
    throw Exception(strerror(stat));
  }
}

以上のようにpthreadを使って排他処理を記述しています。まぁ、pthreadにC++をかぶせているだけですね。

lock()、unlock()でロックをかけたり解除したりします。他のメソッドは次のスレッドプールの実装のために使うので、ここでは気にしないで下さい。

では使い方のサンプルですが、ロックがないとうまくいかない例です。

class MySimpleThread : public Thread {
public:
  MySimpleThread(int &c, Mutex &m):
    count(c), mutex(m) {}
  ~MySimpleThread() {}
  void run();
  int 
  Mutex 
};

void
MySimpleThread::run()
{
  for (int i = 0; i < 10; i++) {
    int c = count;
    usleep(1);
    count = c + 1;
  }
}

int
main()
{
  int count = 0;
  Mutex mutex;
  MySimpleThread thread1(count, mutex), thread2(count, mutex);
  thread1.start();
  thread2.start();

  thread1.join();
  thread2.join();
  cout << "count=" << count << endl;
}

二つのスレッドが共通のcountという変数をカウントアップしています。すれぞれ10回カウントアップしているので最終的には20になってほしいのですが、動作させると

count=12

といった具合に20未満の数字になると思います。しかも、動作させるたびに違う値になったりします。共有するデータを同時に更新することによる問題です。そこで次のようにスレッドの処理を変更します。データの更新中にロックをかけます。

void
MySimpleThread::run()
{
  for (int i = 0; i < 10; i++) {
    mutex.lock();
    int c = count;
    usleep(1);
    count = c + 1;
    mutex.unlock();
  }
}

これを実行すると

count=20

と、正しく表示されます。

ロックは実に便利な仕組みですが安易に使うとデッドロックを起こしたりして、実に奥が深い機能です。また、排他処理をすればするほど並列処理のメリットが減少するのでなるべく使わないで済むロジックを考えるのが重要です。

前回:C++簡単スレッドプログラミング

次回:スレッドプール

C++簡単スレッドプログラミング

Javaでのスレッドプログラミングの方法に慣れると、pthreadの関数によるマルチスレッドのプログラミングが使いづらく感じて仕方ありません。かといってスレッドプログラミングのツールを本格的に使うほどのこともないようなときに簡単にスレッドを利用できる仕組みを考えて見ました。

C++で簡単にJavaのようなスレッドプログラミングができます。

#include    <iostream>

using namespace std;

class Exception : public std::exception {
public:
  Exception(const std::string &msg)
  {
    message = msg;
  }
  virtual ~Exception() throw() {}
  const char *what() {
    return message.c_str();
  }
  std::string message;
};

class Thread {
public:
  Thread():thread(0){}
  virtual ~Thread(){}
  void start();
  void join();

protected:
  virtual void run() = 0;

private:
  static void* runThread(void *t) {
    if (t != 0) {
      ((Thread*)t)->run();
    }
    return t;
  }
  pthread_t             thread;
  pthread_attr_t        attr;
};

void
Thread::start()
{
  pthread_attr_init(&attr);
  int stat;
  if ((stat = pthread_create(&thread, &attr, Thread::runThread, this)) != 0) {
    throw Exception(strerror(stat));
  }
}
void
Thread::join()
{
  void *returnValue;
  int stat;
  if ((stat = pthread_join(thread, (void**)&returnValue)) != 0) {
    throw Exception(strerror(stat));
  }
  if (returnValue == 0) {
    throw Exception("Invalid thread");
  }
}

使う分には上記部分の動作を理解する必要はありません。Exceptionは簡易的に定義していますので、使い易いように変更した方が良いかもしれません。

使う時はJavaと同じように上記のThreadクラスを継承したクラスを定義して使います。以下は使い方のサンプルです。

class MySimpleThread : public Thread {
public:
  MySimpleThread() {}
  ~MySimpleThread() {}
  void run();
};

void
MySimpleThread::run()
{
  cerr << "thread start." << endl;
  sleep(1);
  cerr << "thread finish." << endl;
}

int
main()
{
  MySimpleThread thread;
  thread.start();
  cerr << "main." << endl;
  thread.join();
}

run()にスレッドでの処理を記述します。thread.start()でスレッドがスタートし、thread.join()でスレッドの終了を待ちます。これを実行すると

thread start.
main.
thread finish.

となります。main部分とスレッドが並列に動作していることが分かります。スレッドで利用する変数や定数などはMySimpleThreadで定義します。


リンク時には pthread のリンクを忘れないようにしてください。

複数のスレッドを生成しデータを共有するにはロックが必要になるかもしれませんが、ロックは次の機会にします。ロックがなくても結構使えると思います。

次:ロック - 排他処理

可変長配列データベース

では、可変長配列データベースを作成します。ここでの「可変長」とは配列の各要素のサイズが可変であることを意味します。もちろん要素数も可変です。前にも書いたように可変長データを固定長データに分割して固定長データーベースに保存します。分割した固定長データをたどるためのポインタは圧縮して持ちます。可変長配列データベースを1から作成するのは結構大変ですが、既に可変長配列を構成する部品は揃っているので、純粋な可変長配列データベースの実装部分は結構少ないです。

class VariableArray {
public:
  static const size_t headSize = 128;
  static const size_t bodySize = 256;

  // Format:|size(encoded)|body index(encoded)|data...|
  class Head {
  public:
    Head() {
      *record = 0;
    }
    unsigned char record[headSize];
  };
  // Format:|next index(encoded)|data...|
  class Body {
  public:
    unsigned char record[bodySize];
  };

  VariableArray(const std::string &f) {
    initialize(f);
  }

  void initialize(const std::string &f) {
    std::string str = f;
    str.append("_h");
    head.initialize(str);
    str = f;
    str.append("_t");
    body.initialize(str);
  }

  void set(size_t idx, std::vector<unsigned char> &val) {
    set(idx, &val[0], val.size());
  }

  void set(size_t idx, unsigned char *val, size_t size) {
    erase(idx);
    if (idx >= head.getSize()) {
      head.resize(idx + 1);
    }
    unsigned char *ptr = (unsigned char*)&head.at(idx);
    int sizeSize = Codec::encode(size, ptr);
    size_t bodyIndexSize = 1;
    size_t totalSize = size + sizeSize + bodyIndexSize;
    ptr += sizeSize;

    size_t bodyIndex;
    int len;
    if (totalSize > headSize) {
      bodyIndex = body.insert();
      bodyIndexSize = Codec::encode(bodyIndex, ptr);
      len = headSize - sizeSize - bodyIndexSize;
    } else {
      bodyIndex = 0;
      //bodyIndexSize = Codec::encode(bodyIndex, ptr);
      bodyIndexSize = 1;
      len = size;
    }

    ptr += bodyIndexSize;
    memcpy(ptr, val, len);
    while (bodyIndex != 0) {
      size -= len;
      val += len;

      bodyIndexSize = 1;
      totalSize = size + bodyIndexSize;
      if (totalSize > bodySize) {
        size_t ti = body.insert();
        ptr = (unsigned char*)&body.at(bodyIndex);
        bodyIndex = ti;
        bodyIndexSize = Codec::encode(bodyIndex, ptr);
        len = bodySize - bodyIndexSize;
      } else {
        ptr = (unsigned char*)&body.at(bodyIndex);
        bodyIndex = 0;
        *ptr = 0;
        len = size;
      }

      ptr += bodyIndexSize;
      memcpy(ptr, val, len);

    }
  }

  unsigned char *get(size_t idx, size_t &size) {
    unsigned char *ptr = (unsigned char*)&head.at(idx);

    size_t sizeSize;
    size = Codec::decode(ptr, sizeSize);
    if (size == 0) {
      throw Exception("The specified element is empty");
    }
    unsigned char *buf = new unsigned char[size];
    unsigned char *bufptr = buf;
    ptr += sizeSize;
    size_t bodyIndexSize;
    size_t bodyIndex = Codec::decode(ptr, bodyIndexSize);
    ptr += bodyIndexSize;
    int len = headSize - sizeSize - bodyIndexSize;
    len = size > len ? len : size;
    memcpy(bufptr, ptr, len);
    while (bodyIndex != 0) {
      size -= len;
      assert(size > 0);
      bufptr += len;
      ptr = (unsigned char*)&body.at(bodyIndex);
      bodyIndex = Codec::decode(ptr, bodyIndexSize);
      ptr += bodyIndexSize;
      len = bodySize - bodyIndexSize;
      len = size > len ? len : size;;
      memcpy(bufptr, ptr, len);
    }

    return buf;
  }

  void erase(size_t idx) {
    if (idx >= head.getSize()) {
      return;
    }
    unsigned char *headPtr  = (unsigned char*)&head.at(idx);
    unsigned char *ptr = headPtr;
    size_t sizeSize;
    size_t size = Codec::decode(ptr, sizeSize);
    if (size == 0) {
      // already cleared
      return;
    }
    ptr += sizeSize;
    size_t bodyIndexSize;
    size_t bodyIndex = Codec::decode(ptr, bodyIndexSize);
    headPtr[0] = 0;  // size <- 0
    headPtr[1] = 0;  // index <- 0 invalid index
    while (bodyIndex != 0) {
      ptr = (unsigned char*)&body.at(bodyIndex);
      size_t nextIndex = Codec::decode(ptr, bodyIndexSize);
      body.remove(bodyIndex);
      bodyIndex = nextIndex;
    }
    return;
  }

  Array<Head>   head;
  Pool<Body>    body;

};

ということで、ソースを書きなぐってきた感じですので、ちょっと使ってみたいと思っても、かなり使いにくい状態になっていますね。時間を見つけてライブラリにでもしないと。

前:ポインタのエンコード

次:可変長データベースのまとめ

ポインタのエンコード

さぁ、可変長配列データベースを作成しましょう、といいたいところですが、やはり、前準備が必要です。可変長データベースでは可変長データを複数の固定長データに分割して固定長データベースに管理するので、固定長データのリスト構造を生成します。そのために各固定長データに次の固定長データへのポインタを持つ必要があります。

たとえば8バイトの領域をポインタとして持っても良いのですが、データが少ない時には、ポインタの値も小さくなります。よほど大量にデータを保存しない限り通常8バイトのサイズは不要です。かといってサイズを小さくしてしまうと、データが増えた時に対応できなくなります。

そこで、ポインタの値の大きさに応じた可変長のデータ形式にポインタのエンコードを行うことにします。データの先頭に値の大きさを示す値を入れておくだけです。その値の大きさに応じて有効なデータ領域のサイズが可変となります。複雑なビット処理は速度低下につながるので値そのものに対してビット単位のシフトは行わずバイト単位のシフトのみするようにしています。

class Codec {
public:
  Codec(){}
  static size_t encode(size_t v, unsigned char *buf) {
    size_t mask = 0xffffffffffffff80;
    int cnt;
    for (cnt = 0; cnt < sizeof(size_t); cnt++) {
      if ((mask & v) == 0) {
        break;
      }
      mask <<= 7;
    }

    if (cnt == sizeof(size_t)) {
      valcpy(&buf[1], v, sizeof(size_t));
      buf[0] = 0xff;
    } else {
      valcpy(buf, v, cnt + 1);
      buf[0] |= (0xff << (sizeof(size_t) - cnt));
    }

    size_t validSize = cnt + 1;
    return validSize;
  }
  static size_t decode(unsigned char *buf, size_t &size) {
    int cnt;
    unsigned char mask = 0x80;
    unsigned char head = buf[0];
    for (cnt = 0; cnt < sizeof(size_t); cnt++) {
      if ((mask & head) == 0) {
        break;
      }
      mask >>= 1;
    }

    size_t v = 0;
    if (cnt > sizeof(size_t)) {
      valcpy(v, &buf[1], sizeof(size_t));
      size = 9;
    } else {
      valcpy(v, buf, cnt + 1);
      *(((unsigned char*)&v) + cnt) &= (0x7f >> cnt);
      size = cnt + 1;
    }
    return v;
  }

 private:
  static void
  valcpy(unsigned char *buf, size_t &v, int s) {
    unsigned char *vptr = (unsigned char*)&v + s - 1;
    for (int i = 0; i < s; i++) {
      *buf++ = *vptr--;
    }
  }

  static void
  valcpy(size_t &v, unsigned char *buf, int s) {
    v = 0;
    unsigned char *vptr = (unsigned char*)&v + s - 1;
    for (int i = 0; i < s; i++) {
      *vptr-- = *buf++;
    }
  }

};

前:固定長データベース

次:可変長データベース

固定長データベース

継承クラスとなる固定長クラスの永続化ができたので、これで固定長データベース(Pool)の実装が可能です。

固定長データを扱う場合に、追加するだけなら固定長配列データベースでもあまり問題はないのですが、削除しようとした途端、削除した配列要素の管理が必要になります。まぁ、削除した要素を二度と使わないという手もありますが、そうもいきません。

固定長データベースでは、データを挿入するとデータに割り振られたIDが返り、データを取得したり削除したりするときには、そのIDでデータを指定することとします。

データが削除されるとそのIDはスタック(削除スタック)にプッシュされ、データが挿入されると削除スタックからIDをポップして利用します。削除スタックに何もなければ、固定長配列データベースを拡張するだけです。

template <class T> class Pool  {
public:
  Pool() {}
  Pool(const std::string &f) {
    initialize(f);
  }

  void
  initialize(const std::string &f) {
    std::string str = f;
    str.append("_p");
    pool.initialize(str);
    reservedOffset = pool.reserved(sizeof(size_t));
    str = f;
    str.append("_i");
    invalidObjects.initialize(str);
  }

  size_t insert() {
    size_t idx;
    try {
      idx = invalidObjects.front();
      invalidObjects.pop();
    } catch(...) {
      getPoolSize()++;
      idx = getPoolSize();
      if (idx >= pool.getSize()) {
        pool.resize(idx + 1);
      }
    }
    return idx;
  }

  size_t insert(T &t) {
    size_t idx = insert();
    pool.set(idx - 1, t);
    return idx;
  }

  void remove(size_t idx) {
    invalidObjects.push(idx);
  }

  void set(size_t idx, T &t) {
    pool.set(idx - 1, t);
  }

  T &at(size_t idx) {
    return pool.at(idx - 1);
  }
protected:
  Array<T> pool;
  Stack<size_t> invalidObjects;
  size_t reservedOffset;
  size_t &getPoolSize() {
    return *(size_t*)pool.getReservedVariable(reservedOffset);
  }

};

前:固定長配列データベースの改良

次:ポインタのエンコード