惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

L
LINUX DO - 热门话题
Stack Overflow Blog
Stack Overflow Blog
B
Blog
WordPress大学
WordPress大学
Project Zero
Project Zero
P
Palo Alto Networks Blog
阮一峰的网络日志
阮一峰的网络日志
博客园 - 司徒正美
有赞技术团队
有赞技术团队
S
SegmentFault 最新的问题
freeCodeCamp Programming Tutorials: Python, JavaScript, Git & More
小众软件
小众软件
T
Tailwind CSS Blog
Forbes - Security
Forbes - Security
F
Full Disclosure
SecWiki News
SecWiki News
钛媒体:引领未来商业与生活新知
钛媒体:引领未来商业与生活新知
Hacker News: Ask HN
Hacker News: Ask HN
C
Check Point Blog
Microsoft Security Blog
Microsoft Security Blog
Threat Intelligence Blog | Flashpoint
Threat Intelligence Blog | Flashpoint
F
Fortinet All Blogs
Cisco Talos Blog
Cisco Talos Blog
G
Google Developers Blog
J
Java Code Geeks
Google DeepMind News
Google DeepMind News
人人都是产品经理
人人都是产品经理
CTFtime.org: upcoming CTF events
CTFtime.org: upcoming CTF events
Recorded Future
Recorded Future
O
OpenAI News
Spread Privacy
Spread Privacy
MongoDB | Blog
MongoDB | Blog
H
Hackread – Cybersecurity News, Data Breaches, AI and More
C
Cybersecurity and Infrastructure Security Agency CISA
S
Securelist
V
Vulnerabilities – Threatpost
Y
Y Combinator Blog
IT之家
IT之家
U
Unit 42
腾讯CDC
S
Security Affairs
C
Cisco Blogs
Schneier on Security
Schneier on Security
The Last Watchdog
The Last Watchdog
B
Blog RSS Feed
宝玉的分享
宝玉的分享
cs.AI updates on arXiv.org
cs.AI updates on arXiv.org
S
Security @ Cisco Blogs
Cyberwarzone
Cyberwarzone
T
The Blog of Author Tim Ferriss

Jia Yue Hua

完美转发不完美 闲散颂 adapting c++20 ranges algorithms for most metaprogramming Reproducible github Developer Environments 在其它线程周期回调函数 在当前线程周期回调函数 for_each cpo和tag_invoke transparent,为关联容器增加查找成员
使用mpmcpipeline和jthread实现软流水
Jia Yue Hua · 2023-05-22 · via Jia Yue Hua

多个Jthread线程,每个线程完成mpmcpipeline中一个stage的处理,实现软件流水。第一个stage收到sentinal value时,所有work线程都退出。

#include <gtest/gtest.h>
#include <folly/MPMCPipeline.h>
#include <vector>
#include <thread>
#include <boost/mp11.hpp>
#include <fmt/format.h>
#include <folly/Function.h>
#include <type_traits>
#include <boost/hana.hpp>
namespace mp11 = boost::mp11;
namespace hana = boost::hana;

template <class T>
struct getNumber {
  constexpr int operator()(int n) const noexcept { return n; }
};
template <class T>
constexpr auto getnumber = getNumber<T>{};
 //n queuesize, T...: sizeof...(T) ie (N+1) queues' type
template <int n, typename... T>
  requires(sizeof...(T) >= 2)
class SoftPipeline
{

  //N+1 queues' sentinal
  boost::hana::tuple<T...> sentinals_;
  //N stage piepeline
  folly::MPMCPipeline<T...> pipeline_;
  //N stage,so N threads
  std::vector<std::jthread> threads_;
  //every stage work function
  template <int N>
  struct StageCall {
    SoftPipeline* softpipeline_;

    using Input  = mp11::mp_at_c<mp11::mp_list<T...>, N>;
    using Output = mp11::mp_at_c<mp11::mp_list<T...>, N + 1>;
    using Func   = folly::Function<Output(const Input&)>;
    Func func_;
    //if receive sentinal, break
    void operator()()
    {
      for (;;)
      {
      Input val;
      auto  ticket = softpipeline_->pipeline_.template blockingReadStage<N>(val);
      softpipeline_->pipeline_.template blockingWriteStage<N>(ticket, func_(val));
      if (val == hana::at(softpipeline_->sentinals_, hana::size_t<N>{}) )
        break;
      }
    }
  };

 public:
   using HeadType= mp11::mp_front<mp11::mp_list<T...>>;
   using TailType= mp11::mp_back<mp11::mp_list<T...>>;

  template <class... U>
    requires((sizeof...(U) == sizeof...(T) - 1) && !std::is_same_v<std::remove_cv_t< mp11::mp_at_c<mp11::mp_list<U...>,0>   >, SoftPipeline>)
  SoftPipeline(const HeadType& sentinalvalue, U&&... arg) : pipeline_(getnumber<T>(n)...)
  {
  //make sentinals tuple
    auto val   = hana::make_tuple(sentinalvalue);
    auto f     = []<class Seq, class Func>(Seq s, Func func) { return hana::append(s, func(hana::back(s))
    ); };
    auto funcs = hana::make_tuple(arg...);
    sentinals_ = hana::fold(funcs, val, f);
   
   //initional N threads
    [this] <std::size_t... I, class... F>(std::index_sequence<I...>, F&&... f) {
      (..., this->threads_.emplace_back(std::jthread(StageCall<I>{this, std::forward<F>(f)})));
    }(std::index_sequence_for<U...>{}, std::forward<U>(arg)...);
  }
  void blockingWrite(const HeadType &h)
  {
    pipeline_.blockingWrite(h);
  }
  TailType blockingRead()
  {
    TailType val;
    pipeline_.blockingRead(val);
    return val;
  }
};

TEST(mpmcpipeline, basic) {
  SoftPipeline<2,int,int,int> pipeline(3,[](int n) { return n + 1; },[](int n) { return n + 2; });
  pipeline.blockingWrite(1);
  auto n =pipeline.blockingRead();
  fmt::print("n:{}\n",n);
  pipeline.blockingWrite(3);
  n =pipeline.blockingRead();
  fmt::print("n:{}\n",n);
}

value category 和noexcept规范作为练习留给读者。

Posted 2023-05-22