惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

阮一峰的网络日志
阮一峰的网络日志
D
Darknet – Hacking Tools, Hacker News & Cyber Security
S
Schneier on Security
The Last Watchdog
The Last Watchdog
Cyberwarzone
Cyberwarzone
S
Securelist
Threat Intelligence Blog | Flashpoint
Threat Intelligence Blog | Flashpoint
C
Cyber Attacks, Cyber Crime and Cyber Security
L
Lohrmann on Cybersecurity
钛媒体:引领未来商业与生活新知
钛媒体:引领未来商业与生活新知
博客园 - 司徒正美
The Cloudflare Blog
V
V2EX
博客园_首页
博客园 - 聂微东
Vercel News
Vercel News
人人都是产品经理
人人都是产品经理
G
GRAHAM CLULEY
T
Tenable Blog
Last Week in AI
Last Week in AI
Y
Y Combinator Blog
L
LINUX DO - 最新话题
cs.CL updates on arXiv.org
cs.CL updates on arXiv.org
SecWiki News
SecWiki News
博客园 - 三生石上(FineUI控件)
S
Secure Thoughts
N
News | PayPal Newsroom
T
The Blog of Author Tim Ferriss
The GitHub Blog
The GitHub Blog
T
Troy Hunt's Blog
博客园 - 【当耐特】
Forbes - Security
Forbes - Security
H
Hacker News: Front Page
A
About on SuperTechFans
B
Blog RSS Feed
Engineering at Meta
Engineering at Meta
MongoDB | Blog
MongoDB | Blog
CTFtime.org: upcoming CTF events
CTFtime.org: upcoming CTF events
freeCodeCamp Programming Tutorials: Python, JavaScript, Git & More
罗磊的独立博客
D
DataBreaches.Net
P
Privacy & Cybersecurity Law Blog
Schneier on Security
Schneier on Security
Application and Cybersecurity Blog
Application and Cybersecurity Blog
Google DeepMind News
Google DeepMind News
奇客Solidot–传递最新科技情报
奇客Solidot–传递最新科技情报
OSCHINA 社区最新新闻
OSCHINA 社区最新新闻
Jina AI
Jina AI
D
Docker
P
Proofpoint News Feed

Jia Yue Hua

完美转发不完美 闲散颂 adapting c++20 ranges algorithms for most metaprogramming Reproducible github Developer Environments 在其它线程周期回调函数 在当前线程周期回调函数 for_each cpo和tag_invoke transparent,为关联容器增加查找成员
使用mpmcpipeline和jthread实现软流水
Jia Yue Hua · 2023-05-22 · via Jia Yue Hua

多个Jthread线程,每个线程完成mpmcpipeline中一个stage的处理,实现软件流水。第一个stage收到sentinal value时,所有work线程都退出。

#include <gtest/gtest.h>
#include <folly/MPMCPipeline.h>
#include <vector>
#include <thread>
#include <boost/mp11.hpp>
#include <fmt/format.h>
#include <folly/Function.h>
#include <type_traits>
#include <boost/hana.hpp>
namespace mp11 = boost::mp11;
namespace hana = boost::hana;

template <class T>
struct getNumber {
  constexpr int operator()(int n) const noexcept { return n; }
};
template <class T>
constexpr auto getnumber = getNumber<T>{};
 //n queuesize, T...: sizeof...(T) ie (N+1) queues' type
template <int n, typename... T>
  requires(sizeof...(T) >= 2)
class SoftPipeline
{

  //N+1 queues' sentinal
  boost::hana::tuple<T...> sentinals_;
  //N stage piepeline
  folly::MPMCPipeline<T...> pipeline_;
  //N stage,so N threads
  std::vector<std::jthread> threads_;
  //every stage work function
  template <int N>
  struct StageCall {
    SoftPipeline* softpipeline_;

    using Input  = mp11::mp_at_c<mp11::mp_list<T...>, N>;
    using Output = mp11::mp_at_c<mp11::mp_list<T...>, N + 1>;
    using Func   = folly::Function<Output(const Input&)>;
    Func func_;
    //if receive sentinal, break
    void operator()()
    {
      for (;;)
      {
      Input val;
      auto  ticket = softpipeline_->pipeline_.template blockingReadStage<N>(val);
      softpipeline_->pipeline_.template blockingWriteStage<N>(ticket, func_(val));
      if (val == hana::at(softpipeline_->sentinals_, hana::size_t<N>{}) )
        break;
      }
    }
  };

 public:
   using HeadType= mp11::mp_front<mp11::mp_list<T...>>;
   using TailType= mp11::mp_back<mp11::mp_list<T...>>;

  template <class... U>
    requires((sizeof...(U) == sizeof...(T) - 1) && !std::is_same_v<std::remove_cv_t< mp11::mp_at_c<mp11::mp_list<U...>,0>   >, SoftPipeline>)
  SoftPipeline(const HeadType& sentinalvalue, U&&... arg) : pipeline_(getnumber<T>(n)...)
  {
  //make sentinals tuple
    auto val   = hana::make_tuple(sentinalvalue);
    auto f     = []<class Seq, class Func>(Seq s, Func func) { return hana::append(s, func(hana::back(s))
    ); };
    auto funcs = hana::make_tuple(arg...);
    sentinals_ = hana::fold(funcs, val, f);
   
   //initional N threads
    [this] <std::size_t... I, class... F>(std::index_sequence<I...>, F&&... f) {
      (..., this->threads_.emplace_back(std::jthread(StageCall<I>{this, std::forward<F>(f)})));
    }(std::index_sequence_for<U...>{}, std::forward<U>(arg)...);
  }
  void blockingWrite(const HeadType &h)
  {
    pipeline_.blockingWrite(h);
  }
  TailType blockingRead()
  {
    TailType val;
    pipeline_.blockingRead(val);
    return val;
  }
};

TEST(mpmcpipeline, basic) {
  SoftPipeline<2,int,int,int> pipeline(3,[](int n) { return n + 1; },[](int n) { return n + 2; });
  pipeline.blockingWrite(1);
  auto n =pipeline.blockingRead();
  fmt::print("n:{}\n",n);
  pipeline.blockingWrite(3);
  n =pipeline.blockingRead();
  fmt::print("n:{}\n",n);
}

value category 和noexcept规范作为练习留给读者。

Posted 2023-05-22