yohhoyの日記

技術的メモをしていきたい日記

遅延機構付きデータキュー

スレッドセーフ・遅延機構付き・上限なし・データキューのC++実装*1Java標準ライブラリ java.util.concurrent.DelayQueue に相当。

通常のデータキューと異なり、キューへの要素追加時に “有効時刻” を指定する。該当要素はその有効時刻を過ぎるまではキュー内に滞留し、キューからの取り出し操作の対象とはならない(遅延機構)。

enqueue操作
有効時刻を指定した要素値をキューに格納する。キュー容量は上限無しのため、enqueue操作は常に成功する。
dequeue操作
現在時刻より小さく、かつ最古の有効時刻を持つ要素値をキューから取り出す。有効な要素が存在しない場合、dequeue操作はブロッキングする。
close操作
データ終端を通知する。close操作以後のenqueue操作は失敗する。キューが空になった後のdequeue操作は無効値を返す。
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <utility>
#include <vector>

struct closed_queue : std::exception {};

template <typename T, typename Clock = std::chrono::system_clock>
class delay_queue {
public:
  using value_type = T;
  using time_point = typename Clock::time_point;

  delay_queue() = default;
  explicit delay_queue(std::size_t initial_capacity)
    { q_.reserve(initial_capacity); }
  ~delay_queue() = default;

  delay_queue(const delay_queue&) = delete;
  delay_queue& operator=(const delay_queue&) = delete;

  void enqueue(value_type v, time_point tp)
  {
    std::lock_guard<decltype(mtx_)> lk(mtx_);
    if (closed_)
      throw closed_queue{};
    q_.emplace_back(std::move(v), tp);
    // descending sort on time_point
    std::sort(begin(q_), end(q_),
              [](auto&& a, auto&& b) { return a.second > b.second; });
    cv_.notify_one();
  }

  value_type dequeue()
  {
    std::unique_lock<decltype(mtx_)> lk(mtx_);
    auto now = Clock::now();
    // wait condition: (empty && closed) || (!empty && back.tp <= now)
    while (!(q_.empty() && closed_) && !(!q_.empty() && q_.back().second <= now)) {
      if (q_.empty())
        cv_.wait(lk);
      else
        cv_.wait_until(lk, q_.back().second);
      now = Clock::now();
    }
    if (q_.empty() && closed_)
      return {};  // invalid value
    value_type ret = std::move(q_.back().first);
    q_.pop_back();
    if (q_.empty() && closed_)
      cv_.notify_all();
    return ret;
  }

  void close()
  {
    std::lock_guard<decltype(mtx_)> lk(mtx_);
    closed_ = true;  
    cv_.notify_all();
  }

private:
  std::vector<std::pair<value_type, time_point>> q_;
  bool closed_ = false;
  std::mutex mtx_;
  std::condition_variable cv_;
};

利用サンプルコード:

template <typename T>
void dump(const T& v, std::chrono::system_clock::time_point epoch)
{
  using namespace std::chrono;
  auto elapsed = duration_cast<milliseconds>(system_clock::now() - epoch).count() / 1000.;
  std::cout << elapsed << ":" << v << std::endl;
}

int main()
{
  auto base = std::chrono::system_clock::now();
  constexpr std::size_t capacity = 5;
  delay_queue<std::unique_ptr<std::string>> q{capacity};

  auto f = std::async(std::launch::async, [&q, base] {
    using namespace std::chrono_literals;
    q.enqueue(std::make_unique<std::string>("two"),   base + 2s);
    q.enqueue(std::make_unique<std::string>("three"), base + 3s);
    q.enqueue(std::make_unique<std::string>("one"),   base + 1s);
    q.close();
  });

  dump("start", base);
  dump(*q.dequeue(), base);  // "one"
  dump(*q.dequeue(), base);  // "two"
  dump(*q.dequeue(), base);  // "three"
  assert(q.dequeue() == nullptr);  // end of data
}

関連URL