惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

Google DeepMind News
Google DeepMind News
Martin Fowler
Martin Fowler
T
Threatpost
云风的 BLOG
云风的 BLOG
博客园 - 司徒正美
C
CERT Recently Published Vulnerability Notes
V
Vulnerabilities – Threatpost
Help Net Security
Help Net Security
Project Zero
Project Zero
博客园 - 聂微东
博客园_首页
T
Tor Project blog
Threat Intelligence Blog | Flashpoint
Threat Intelligence Blog | Flashpoint
V
Visual Studio Blog
人人都是产品经理
人人都是产品经理
The Register - Security
The Register - Security
Latest news
Latest news
K
Kaspersky official blog
L
LINUX DO - 热门话题
P
Proofpoint News Feed
钛媒体:引领未来商业与生活新知
钛媒体:引领未来商业与生活新知
美团技术团队
C
Cyber Attacks, Cyber Crime and Cyber Security
A
Arctic Wolf
aimingoo的专栏
aimingoo的专栏
J
Java Code Geeks
F
Full Disclosure
Recent Announcements
Recent Announcements
SecWiki News
SecWiki News
C
Cybersecurity and Infrastructure Security Agency CISA
F
Fortinet All Blogs
The Hacker News
The Hacker News
Apple Machine Learning Research
Apple Machine Learning Research
NISL@THU
NISL@THU
The GitHub Blog
The GitHub Blog
量子位
Hugging Face - Blog
Hugging Face - Blog
cs.CL updates on arXiv.org
cs.CL updates on arXiv.org
P
Palo Alto Networks Blog
T
Troy Hunt's Blog
O
OpenAI News
T
Threat Research - Cisco Blogs
博客园 - Franky
Hacker News - Newest:
Hacker News - Newest: "LLM"
A
About on SuperTechFans
C
Check Point Blog
Hacker News: Ask HN
Hacker News: Ask HN
AWS News Blog
AWS News Blog
freeCodeCamp Programming Tutorials: Python, JavaScript, Git & More
T
Tenable Blog

浮华生

Elasticsearch 检索性能优化 - 浮华生 舆情监控系统综述 - 浮华生 2024 半年度总结 - 浮华生 2023 年终总结 - 浮华生 异地机器组网方案 - 浮华生 Kubernetes 部署 Elasticsearch 和 Kibana - 浮华生 2022 年终总结 - 浮华生 RabbitMQ connection channel 的关系 - 浮华生 RabbitMQ 和 Kafka 应用原理简单对比 - 浮华生 阿里云 OpenSearch 介绍 - 浮华生 Golang Array 和 Slice 区别 - 浮华生 Mac OS 下打造 golang nvim 编程环境之基础配置 - 浮华生 电商搜索技术总结 - 浮华生 电商搜索业务总结 - 浮华生 2021 年终总结 - 浮华生 Cypress 实践总结 - 浮华生 年终总结 - 浮华生 关于我 - 浮华生 使用 cucumber 进行行为驱动开发(BDD) - 浮华生 微服务应用集成 SpringCloud 步骤 - 浮华生 电商搜索数据同步方案 - 浮华生 通过一道数值转换题重温计算机补码 - 浮华生 macOS 系统推荐的一些软件 - 浮华生 DevOps 实施规划(持续更新) - 浮华生 rabbitmq 如何提高可靠性并保证消费端幂等 - 浮华生 AMQ Model总结 - 浮华生 结对编程 - 浮华生 RSocket 介绍 - 浮华生 面向对象的理解 - 浮华生 企业平台技术框架 - 浮华生 对创业的思考 - 浮华生 知难行易 - 浮华生 一年工作经验总结 - 浮华生 我与领域驱动之缘 - 浮华生 TDD 中使用的工具 - 浮华生 tf-idf 算法 - 浮华生 gitlab 添加代码规范检测 - 浮华生 query 改写 - 浮华生 阶段性总结 - 浮华生 操作系统作用 - 浮华生 - 浮华生 hamming-distance - 浮华生 ElasticSearch API 基本操作 - 浮华生 elasticsearch 集群容错 - 浮华生 ElasticSearch 基础概念 - 浮华生 技术选型怎么做 - 浮华生 条件概率、全概率与贝叶斯公式 - 浮华生 年终总结 - 浮华生 迁移到 ubuntu18 的问题及配置 - 浮华生 总结 - 浮华生 使用 psi-probe 监控 Tomcat - 浮华生 Tomcat 远程 Debug - 浮华生 jstack 死循环和死锁定位 - 浮华生 jmap & mat 内存溢出 - 浮华生 JVM 常用参数查看 - 浮华生 周总结(8.13-8.19) - 浮华生 周总结(7.30-8.4) - 浮华生 使用移位运算符 - 浮华生 master 公式 - 浮华生 VMware 12 NAT网络下配置 ubuntu 16.04 LTS 系统静态 IP - 浮华生 关于进制的计算 - 浮华生 项目总结 第三篇 - 浮华生 项目总结 第二篇 - 浮华生 editor.md 富文本编辑器的使用 - 浮华生 项目总结 第一篇 - 浮华生 2017至今总结 - 浮华生 谈谈微服务 - 浮华生 单例模式 - 浮华生 tor 使用 - 浮华生 归档 - 浮华生 搜索 - 浮华生 搜索 && 推荐 - 浮华生
Kafka Java 客户端 Producer 原理分析 - 浮华生
浮华生 · 2022-06-06 · via 浮华生

Kafka 是我认为最值得深入研究的一个消息队列,它的官方文档写的非常详尽,从配置到使用,从设计到实现无不体现研发的技术功底。

Kafka 已经发展到 3.x 时代,增加了很多的功能,比如幂等、事务等,如今已经能够保证消息 100% 消费了。

阅读源码能够有两点好处:一个是能够在工作中对 Kafka 进行深度优化,二是能够学习到消息中间件的设计思路。

Kafka Producer Java 客户端如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class MyProducer {
    static String topic = "test";
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        /*
         0 生产者发送消息之后不需要等待任何服务端的响应。
         1 生产者发送消息之后,只要分区的 leader 副本成功写入消息,那么它就会收到来自服务端的成功响应,但是如果发生在 leader 选举阶段会造成消息丢失
        -1/all 生产者在消息发送之后,需要等待 ISR 中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。
         */
        props.put("acks", "all");
        // 重试次数
        props.put("retries", 0);
        // 指定 Buffer Pool 大小
        props.put("batch.size", 16384);
        // ProducerBatch 发送时间,如果 batch.size 未达到,但是 linger ms 时间达到,则会发送次 Batch 消息
        props.put("linger.ms", 10000); // 10s 发送
        //RecordAccumulator 缓存大小,默认值为 33554432B,32MB
        props.put("buffer.memory", 33554432);
        // 生成消息的速度大于发送的速度会造成生产者内存不足,要么抛异常,要么阻塞这个配置的毫秒数
        props.put("max.block.ms",60000);
        // 消息最大容量,默认 1MB
        props.put("max.request.size",1048576);

        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String,String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 10; i++) {
            Thread.sleep(1000);
            producer.send(new ProducerRecord<>(topic,
                    Integer.toString(i), Integer.toString(i)), (recordMetadata, exception) -> {
                System.out.printf("topic=%s,partition=%s,offset=%d,timestamp=%s \n",
                        recordMetadata.topic(),recordMetadata.partition(),
                        recordMetadata.offset(),recordMetadata.timestamp());
                    });
        }
        System.out.println("Message sent successfully");
        producer.close();
    }

从代码结构上分析,主要是两个部分:1. Producer 参数配置; 2. 发送消息。

Producer 参数部分是控制 Kafka 高效稳定发送消息的关键,常用的参数已经写到代码中。如果需要查看全部 Producer 参数,可以在代码开启 Debug 模式运行查看。


我们主要分析一下 Send 的原理

image-20220608223610937

Kafka 客户端主要由一个主线程,一个缓冲器,一个发送线程三个部分组成。

主线程做的操作是对消息进行生成、拦截器处理、序列化、分区器处理然后将格式化的消息发送到 RecordAccumulator 缓存中。

缓存的作用是将消息批量发送以减少 IO 次数,当一条消息会经过分区器路由到对应分区的缓存队列中,如果没有则会创建。

如果缓存大小达到了batch.size后会唤醒 sender 线程,或者 sender 线程等待时间到了linger.ms值后会检查缓存中待发送的数据。

sender 线程负责处理发送前的数据封装,将缓存中的分区消息转成 <Node,Request> 格式,这样就完成了应用逻辑到网络 I/O 层面的转换。

最后准备好发送的消息经过 Nio Selecor 发送到 Broker 集群中,然后 Broker 会进行响应 Response,然后 sender 线程决定重试或者删除缓存。

在 NetWorkClient 中维护了一个InFlightRequests数据结构主要是Map<NodeId,Deque<Request>>,用来管理请求和元数据,默认每个链接能够缓存 5 个 request,超过后将不会再向此连接发送请求,除非接收到 broker 的响应。当对应的 NodeId 的请求堆积了 5 个,此时这个连接可能出现负载问题无法快速处理问题,那再向这个NodeId 上发送请求肯定会加剧堆积的情况。

总结:Kafka Producer 在设计上利用缓存来提升吞吐量、利用两个线程区分不同的工作内容,整体设计非常的清晰和优雅。