スレッドセーフ・遅延機構付き・上限なし・データキューのC++実装*1。Java標準ライブラリ java.util.concurrent.DelayQueue に相当。
通常のデータキューと異なり、キューへの要素追加時に “有効時刻” を指定する。該当要素はその有効時刻を過ぎるまではキュー内に滞留し、キューからの取り出し操作の対象とはならない(遅延機構)。
- enqueue操作
- 有効時刻を指定した要素値をキューに格納する。キュー容量は上限無しのため、enqueue操作は常に成功する。
- dequeue操作
- 現在時刻より小さく、かつ最古の有効時刻を持つ要素値をキューから取り出す。有効な要素が存在しない場合、dequeue操作はブロッキングする。
- close操作
- データ終端を通知する。close操作以後のenqueue操作は失敗する。キューが空になった後のdequeue操作は無効値を返す。
#include <chrono> #include <condition_variable> #include <mutex> #include <utility> #include <vector> struct closed_queue : std::exception {}; template <typename T, typename Clock = std::chrono::system_clock> class delay_queue { public: using value_type = T; using time_point = typename Clock::time_point; delay_queue() = default; explicit delay_queue(std::size_t initial_capacity) { q_.reserve(initial_capacity); } ~delay_queue() = default; delay_queue(const delay_queue&) = delete; delay_queue& operator=(const delay_queue&) = delete; void enqueue(value_type v, time_point tp) { std::lock_guard<decltype(mtx_)> lk(mtx_); if (closed_) throw closed_queue{}; q_.emplace_back(std::move(v), tp); // descending sort on time_point std::sort(begin(q_), end(q_), [](auto&& a, auto&& b) { return a.second > b.second; }); cv_.notify_one(); } value_type dequeue() { std::unique_lock<decltype(mtx_)> lk(mtx_); auto now = Clock::now(); // wait condition: (empty && closed) || (!empty && back.tp <= now) while (!(q_.empty() && closed_) && !(!q_.empty() && q_.back().second <= now)) { if (q_.empty()) cv_.wait(lk); else cv_.wait_until(lk, q_.back().second); now = Clock::now(); } if (q_.empty() && closed_) return {}; // invalid value value_type ret = std::move(q_.back().first); q_.pop_back(); if (q_.empty() && closed_) cv_.notify_all(); return ret; } void close() { std::lock_guard<decltype(mtx_)> lk(mtx_); closed_ = true; cv_.notify_all(); } private: std::vector<std::pair<value_type, time_point>> q_; bool closed_ = false; std::mutex mtx_; std::condition_variable cv_; };
利用サンプルコード:
template <typename T> void dump(const T& v, std::chrono::system_clock::time_point epoch) { using namespace std::chrono; auto elapsed = duration_cast<milliseconds>(system_clock::now() - epoch).count() / 1000.; std::cout << elapsed << ":" << v << std::endl; } int main() { auto base = std::chrono::system_clock::now(); constexpr std::size_t capacity = 5; delay_queue<std::unique_ptr<std::string>> q{capacity}; auto f = std::async(std::launch::async, [&q, base] { using namespace std::chrono_literals; q.enqueue(std::make_unique<std::string>("two"), base + 2s); q.enqueue(std::make_unique<std::string>("three"), base + 3s); q.enqueue(std::make_unique<std::string>("one"), base + 1s); q.close(); }); dump("start", base); dump(*q.dequeue(), base); // "one" dump(*q.dequeue(), base); // "two" dump(*q.dequeue(), base); // "three" assert(q.dequeue() == nullptr); // end of data }
関連URL