Push-based GenStage
Johanna Larsson · 2019-10-19 · via jola.dev

GenStage is a pull-based system in which consumers pull events from producers, and most of the documentation describes it that way. But it can also be used as a push-based one. The idea is straightforward: consumers register their demand with the producer. If the producer has buffered events, it satisfies the demand immediately; otherwise, it keeps track of how much demand it has received. When new events are pushed into the producer, it passes them on to the consumers and decreases the demand stored in its state. GenStage automatically handles distributing the events to the waiting consumers.

This blog post is mostly educational, but the pattern can be applied effectively to all kinds of use cases.

Let’s take a look at a stripped-down example. We’ll set up a producer that accepts messages from any other process, like a regular GenServer, and distributes them to the consumers. We’ll avoid worrying about min_demand and max_demand and limit consumers to asking for a single event at a time. This is perfectly reasonable where the work is slow and processing time is irregular, e.g. when it involves making requests to network resources. GenStage comes with a built-in buffer: if a producer produces more events than the consumers can handle, they’re automatically buffered, with a maximum of 10_000 items by default. We’re not going to rely on it, though; we’ll do the buffering ourselves in the producer. This opens up some interesting improvements we can build into the producer, including backpressure. That’s a future post though.

First, we define a module using GenStage with start_link and init. The latter returns {:producer, state}, where the state is a tuple of an empty queue and an initial demand of 0. If you haven't seen :queue before, take a look at my walkthrough or the Erlang documentation. In short, it allows us to create an efficient buffer for situations where the producer gets pushed more events than the consumers can handle.
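As a quick refresher, here is a minimal sketch of the three :queue calls the producer relies on. The QueueDemo module name is made up for illustration and is not part of the original post.

```elixir
defmodule QueueDemo do
  # Hypothetical demo module: shows FIFO behaviour of Erlang's :queue.
  def run do
    queue = :queue.new()
    queue = :queue.in(:a, queue)
    queue = :queue.in(:b, queue)

    # :queue.out/1 returns the oldest item wrapped in {:value, item},
    # together with the remaining queue.
    {{:value, first}, queue} = :queue.out(queue)
    {{:value, second}, queue} = :queue.out(queue)

    # Once the queue is empty, :queue.out/1 returns :empty instead.
    {status, _queue} = :queue.out(queue)

    {first, second, status}
  end
end

IO.inspect(QueueDemo.run())
# prints {:a, :b, :empty}
```

Items come out in the order they went in, which is what lets the producer hand the oldest buffered event to the next consumer that asks.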

    defmodule Producer do
      use GenStage

      def start_link(_args) do
        GenStage.start_link(__MODULE__, [], name: __MODULE__)
      end

      def init(_args) do
        {:producer, {:queue.new(), 0}}
      end

Next, we’ll define the required handle_demand callback. This is called whenever consumers register demand. Note that we can ignore the value of the incoming demand, since it’s always 1. We attempt to take an item out of the queue and return it. If there are no items in the queue, we instead record the demand by storing demand + 1. For educational purposes, the incoming demand is also printed.

      def handle_demand(incoming, {queue, demand}) do
        IO.inspect(incoming, label: "demand")

        case :queue.out(queue) do
          # An event is buffered: emit it right away.
          {{:value, event}, queue} ->
            {:noreply, [event], {queue, demand}}

          # Nothing is buffered: remember the unmet demand.
          {:empty, queue} ->
            {:noreply, [], {queue, demand + 1}}
        end
      end

Next, we’ll need a way of pushing items into the producer. GenStage is built on top of GenServer and supports its callbacks, including handle_cast. We need to handle two different cases: the consumers are busy, or consumers are waiting for events. The first clause matches when the stored demand is 0 and simply stores the incoming event. In the second clause, we know that there is stored demand and that we’ve received a single item, so we put it in the queue, take the oldest item out of the queue, and return it while decreasing the stored demand by 1.

      def handle_cast({:enqueue, event}, {queue, 0}) do
        queue = :queue.in(event, queue)
        {:noreply, [], {queue, 0}}
      end

      def handle_cast({:enqueue, event}, {queue, demand}) do
        queue = :queue.in(event, queue)
        {{:value, event}, queue} = :queue.out(queue)
        {:noreply, [event], {queue, demand - 1}}
      end

Finally, we expose a public API for ease of use.

      def enqueue(event) do
        GenServer.cast(__MODULE__, {:enqueue, event})
      end
    end

The first part is done! Next up is the consumer. It takes an ID on initialization because we want to start a few and keep track of which one did what. max_demand is set to 1 to ensure that the consumer asks for one item at a time. In the handle_events callback we pretend to work for somewhere between 500 and 1500 ms, which helps illustrate how work is distributed across the consumers. Each consumer prints its own ID as well as the event.

    defmodule Consumer do
      use GenStage

      def start_link(id) do
        GenStage.start_link(__MODULE__, id)
      end

      def init(id) do
        {:consumer, id, subscribe_to: [{Producer, max_demand: 1}]}
      end

      def handle_events([event], _from, id) do
        # inspect/1 keeps the quotes around string events in the output.
        IO.puts("#{id}: received #{inspect(event)}")
        # Simulate slow, irregular work.
        Process.sleep(500 + :rand.uniform(1000))
        IO.puts("#{id}: finished #{inspect(event)}")
        {:noreply, [], id}
      end
    end

You can either start them up manually in IEx or add them to your application supervision tree. If you’re starting them in IEx, it looks like this:

    iex(1)> Producer.start_link(:ok)
    {:ok, #PID<0.189.0>}
    iex(2)> Consumer.start_link(1)
    {:ok, #PID<0.191.0>}
    demand: 1
    iex(3)> Consumer.start_link(2)
    demand: 1
    {:ok, #PID<0.193.0>}
    iex(4)> Consumer.start_link(3)
    demand: 1
    {:ok, #PID<0.195.0>}

You can see the consumers immediately subscribing to the producer and registering their demand. Now you’re free to play around with it!

    iex(5)> Producer.enqueue("hello")
    :ok
    1: received "hello"
    1: finished "hello"
    demand: 1
    iex(7)> for i <- 1..5, do: Producer.enqueue("message #{i}")
    3: received "message 1"
    1: received "message 2"
    2: received "message 3"
    [:ok, :ok, :ok, :ok, :ok]
    3: finished "message 1"
    demand: 1
    3: received "message 4"
    2: finished "message 3"
    demand: 1
    2: received "message 5"
    3: finished "message 4"
    demand: 1
    1: finished "message 2"
    demand: 1
    2: finished "message 5"
    demand: 1
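If you would rather not start the stages by hand, the supervision tree route could look roughly like the sketch below. It assumes the Producer and Consumer modules from this post are compiled and that your GenStage version defines child_spec/1 through `use GenStage`. The child ids and the choice of strategy are illustrative assumptions, not something the original setup prescribes.

```elixir
# Sketch only: relies on the Producer and Consumer modules defined earlier.
children = [
  # Started first so the consumers can subscribe to it by name.
  Producer,
  # Each consumer needs a unique child id (these ids are made up).
  Supervisor.child_spec({Consumer, 1}, id: :consumer_1),
  Supervisor.child_spec({Consumer, 2}, id: :consumer_2),
  Supervisor.child_spec({Consumer, 3}, id: :consumer_3)
]

# :rest_for_one restarts the consumers whenever the producer is restarted,
# so they subscribe again to the new producer process.
{:ok, _supervisor} = Supervisor.start_link(children, strategy: :rest_for_one)
```

In a real application the same children list would typically live in the start/2 callback of your application module.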

Moving forward

Now that you have this working, here are a few different things you can try extending it with:

  1. Update enqueue/1 to take lists of events instead of single items, and update the producer and consumer to handle demand properly, including setting min_demand and max_demand.
  2. Keep track of the queue buffer in the producer and shed events if it grows beyond a given size.
  3. Add a ProducerConsumer middle step.
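As a possible starting point for the first two ideas, here is a minimal sketch of a bounded buffer that accepts lists of events, sheds the oldest ones when full, and serves demand larger than 1. BoundedBuffer and its functions are hypothetical helpers, not part of GenStage or of the producer above, and dropping the oldest events is just one possible shedding policy.

```elixir
defmodule BoundedBuffer do
  # Hypothetical helper. The state is {queue, size, max_size}; the size is
  # tracked explicitly because :queue.len/1 has to walk the whole queue.

  def new(max_size) when is_integer(max_size) and max_size > 0 do
    {:queue.new(), 0, max_size}
  end

  # Add a list of events, shedding the oldest ones once the buffer is full.
  def push(buffer, events) when is_list(events) do
    Enum.reduce(events, buffer, fn event, {queue, size, max} ->
      queue = :queue.in(event, queue)

      if size < max do
        {queue, size + 1, max}
      else
        # Buffer is full: drop the oldest event to make room.
        {_dropped, queue} = :queue.out(queue)
        {queue, size, max}
      end
    end)
  end

  # Take up to `demand` events.
  # Returns {events, unmet_demand, buffer}.
  def take({queue, size, max}, demand) when is_integer(demand) and demand >= 0 do
    count = min(demand, size)
    {taken, rest} = :queue.split(count, queue)
    {:queue.to_list(taken), demand - count, {rest, size - count, max}}
  end
end

buffer = BoundedBuffer.new(3) |> BoundedBuffer.push([1, 2, 3, 4, 5])
{events, unmet, _buffer} = BoundedBuffer.take(buffer, 2)
IO.inspect({events, unmet})
# prints {[3, 4], 0}
```

In a producer, handle_demand could call take/2 with the incoming demand plus any stored demand, emit the returned events, and keep the unmet demand in state. The enqueue path would call push/2 followed by take/2 with the stored demand.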

Conclusion

I hope this has been educational or useful! GenStage is a super interesting piece of software, but the concepts involved take a while to wrap your head around, and I wanted to help build on the available knowledge.

Written by Johanna Larsson. Thoughts on this post? Find me on Bluesky at @jola.dev or why not give it a vote on Bubbles.