スレッドプール

簡単なスレッドプログラミングは今まで書いてきたことで十分できると思います。ただ、この方法では処理の単位でスレッドを生成することになります。プロセスの生成に比べればスレッドの生成は格段に軽いのですが、同様の処理を大量に実行するといった場合にはスレッドを生成する数が多くなるので生成のオーバヘッドが無視できないものとなります。そこであらかじめスレッドを生成しておいてそのスレッドを使い回すというのがスレッドプールの考えです。これでちょっと本格的なマルチスレッドプログラミングが簡単にできます。

#include    <iostream>
#include    <deque>
#include    <vector>

template <class JOB, class SHARED_DATA>
class ThreadPool {
public:
  class JobQueue : public deque<JOB>, public Mutex {
  public:
    JobQueue() {}
    ~JobQueue() {
      limit = -1; // no limit
    }
    void wait(JobQueue &q) {
      q.wait();
    }
    bool isEmpty() {
      return deque<JOB>::size() == 0;
    }
    bool isFull() {
      if (limit >= 0) {
        return deque<JOB>::size() >= limit;
      } else {
        return false;
      }
    }
    int limit;
  };

  class InJobQueue : public JobQueue {
  public:
    InJobQueue() {
      isTerminate = false;
      isPushing = false;
      size = 0;
    }

    void push(JOB &data) {
      JobQueue::lock();
      if (!isPushing) {
        isPushing = true;
        size = 0;
      }
      size++;
      deque<JOB>::push_back(data);
      JobQueue::unlock();
      JobQueue::signal();
    }

    bool pop(JOB &j) {
      JobQueue::lock();
      while (JobQueue::isEmpty()) {
        if (isTerminate) {
          JobQueue::unlock();
          return false;
        }
        JobQueue::Mutex::wait();
      }
      j = deque<JOB>::front();
      deque<JOB>::pop_front();
      JobQueue::unlock();
      return true;
    }

    void terminate() {
      JobQueue::lock();
      if (isPushing || !JobQueue::isEmpty()) {
        JobQueue::unlock();
        throw Exception("Invalid processing sequence");
      }
      isTerminate = true;
      JobQueue::unlock();
      JobQueue::broadcast();
    }

    bool        isTerminate;
    bool        isPushing;
    size_t      size;
  };

  class OutJobQueue : public JobQueue {
  public:
    void push(JOB &j) {
      JobQueue::lock();
      deque<JOB>::push_back(j);
      if (JobQueue::isFull()) {
        JobQueue::unlock();
        JobQueue::signal();
      }
      JobQueue::unlock();
      return;
    }

    void pop() { JobQueue::pop_front(); }

    void wait() {
      JobQueue::Mutex::wait();
      JobQueue::unlock();
    }

  };

  class PooledThread : public Thread {
  public:
    PooledThread(){}
    void run() {
      JOB::initializeThread(*threadPool);
      for (;;) {
        JOB job;
        if (!threadPool->inJobQueue.pop(job)) {
          break;
        }
        job.execute(*threadPool);
        threadPool->outJobQueue.push(job);
      }
      JOB::finalizeThread(*threadPool);
    }

    ThreadPool *threadPool;
  };

  ThreadPool(size_t s, SHARED_DATA sd):sharedData(sd){
    for (size_t i = 0; i < s; i++) {
      PooledThread *pt = new PooledThread;
      pt->threadPool = this;
      pt->start();
      threads.push_back(pt);
    }
  }

  ~ThreadPool() {
    terminate();
  }

  SHARED_DATA getSharedData() {
    return sharedData;
  }

  void push(JOB &j) {
    if (!inJobQueue.isPushing) {
      outJobQueue.lock();
      outJobQueue.clear();
    }
    inJobQueue.push(j);
  }

  bool pop(JOB &j) {
    if (outJobQueue.size() == 0) {
      return false;
    }
    j = outJobQueue.front();
    outJobQueue.pop();
    return true;
  }

  void wait() {
    inJobQueue.isPushing = false;
    outJobQueue.limit = inJobQueue.size;
    inJobQueue.size = 0;
    outJobQueue.OutJobQueue::wait();
  }

  void terminate() {
    if (threads.size() == 0) {
      return;
    }
    inJobQueue.terminate();
    for (size_t i = 0; i < threads.size(); i++) {
      threads[i]->join();
      delete threads[i];
    }
    threads.clear();
  }

protected:
  InJobQueue            inJobQueue;
  OutJobQueue           outJobQueue;

  SHARED_DATA           sharedData;
  vector<PooledThread*> threads;

};

いつものようにここまでの動作は気にしなくて結構です。では、スレッドプールの使い方です。

スレッドプールを使う時にはもうスレッドを意識する必要はありません。ジョブとしてスレッドで実行される作業を捉えます。スレッドプールは入力用のジョブキュー(InJobQueue)と出力用のジョブキュー(InJobQueue)の二つのジョブキューをもっています。InJobQueueにジョブを入れる(push)とスレッドプールのスレッドがInJobQueueから未処理のジョブを取り出し処理を実行してOutJobQueueに入れます。処理されたジョブをOutJobQueueから取り出し(pop)て処理の結果を取得します。

以下がサンプルです。

class MyJobSharedData {
public:
  MyJobSharedData() {
  }
  int value;
};
class MyJob;
typedef ThreadPool<MyJob, MyJobSharedData&> MyThreadPool;

class MyJob {
public:
  MyJob() {}
  static void initializeThread(MyThreadPool &tp) {
    cout << "initialize" << endl;
  }
  static void finalizeThread(MyThreadPool &tp) {
    cout << "finalize" << endl;
  }
  void execute(MyThreadPool &tp) {
    *data *= tp.getSharedData().value;
  };
  int *data;
};

int
main()
{
  static int threadPoolSize = 4;
  MyJobSharedData sd;
  sd.value = 10;
  MyThreadPool threadPool(threadPoolSize, sd);

  vector<int> data;
  for (int i = 1; i <= 10; i++) {
    data.push_back(i);
  }

  for (vector<int>::iterator i = data.begin(); i != data.end(); i++) {
    MyJob inJob;
    inJob.data = &(*i);
    threadPool.push(inJob);
  }

  threadPool.wait();

  MyJob outJob;
  while (threadPool.pop(outJob)) {
    cout << "job=" << *outJob.data << endl;
  }

  for (vector<int>::iterator i = data.begin(); i != data.end(); i++) {
    cout << "vector=" << *i << endl;
  }
}

この例では10個の配列の値に共有データの値を掛け合わせるという簡単なものです。以下のMyJob::execute()がスレッドでの処理となり、ここでは単に共有データの値を掛け合わせているだけです。

  void execute(MyThreadPool &tp) {
    *data *= tp.getSharedData().value;
  };

以下の行でスレッド数を指定しています。

  static int threadPoolSize = 4;

以下の行でInJobQueueにジョブを投入します。

  threadPool.push(inJob);

以下の行でジョブに配列の要素を指定しています。配列の要素である必要はないのですが、出力順は入力順にならずにランダムになるので入力順を維持するために配列の要素を与えています。

    inJob.data = &(*i);

以下の行ですべてのジョブが終了するのを待ちます。

  threadPool.wait();

以下の行でOutJobQueueから処理結果のジョブを取得します。

  while (threadPool.pop(outJob)) {

以下が実行結果です。

initialize
initialize
initialize
initialize
job=40
job=50
job=60
job=70
job=80
job=90
job=100
job=10
job=20
job=30
vector=10
vector=20
vector=30
vector=40
vector=50
vector=60
vector=70
vector=80
vector=90
vector=100
finalize
finalize
finalize
finalize

スレッド処理をするときに場合によってはスレッドの中で初期化や終了処理をしなければならないこともありますので、一応initialize()とfinalize()というメソッドを用意してみました。そのダミー出力が最初と最後の「initialize」と「finalize」です。

「job=」取得したJobの順序ですが、投入順序になっていません。Jobの順序はスレッドの終了順になるので、必ずしもジョブの投入順にはなりません。ただし、配列を表示していところでは正しい順序になっていることがわかります。

前回:ロック - 排他処理