惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

Security Latest
Security Latest
让小产品的独立变现更简单 - ezindie.com
让小产品的独立变现更简单 - ezindie.com
Stack Overflow Blog
Stack Overflow Blog
WordPress大学
WordPress大学
N
Netflix TechBlog - Medium
GbyAI
GbyAI
云风的 BLOG
云风的 BLOG
Threat Intelligence Blog | Flashpoint
Threat Intelligence Blog | Flashpoint
宝玉的分享
宝玉的分享
博客园 - 【当耐特】
C
Cyber Attacks, Cyber Crime and Cyber Security
雷峰网
雷峰网
奇客Solidot–传递最新科技情报
奇客Solidot–传递最新科技情报
T
Threat Research - Cisco Blogs
NISL@THU
NISL@THU
Spread Privacy
Spread Privacy
P
Proofpoint News Feed
J
Java Code Geeks
freeCodeCamp Programming Tutorials: Python, JavaScript, Git & More
MyScale Blog
MyScale Blog
T
Tor Project blog
P
Proofpoint News Feed
C
CERT Recently Published Vulnerability Notes
P
Privacy & Cybersecurity Law Blog
MongoDB | Blog
MongoDB | Blog
Simon Willison's Weblog
Simon Willison's Weblog
C
Cybersecurity and Infrastructure Security Agency CISA
L
LINUX DO - 热门话题
小众软件
小众软件
G
GRAHAM CLULEY
P
Privacy International News Feed
AWS News Blog
AWS News Blog
Know Your Adversary
Know Your Adversary
P
Palo Alto Networks Blog
人人都是产品经理
人人都是产品经理
S
Schneier on Security
Scott Helme
Scott Helme
OSCHINA 社区最新新闻
OSCHINA 社区最新新闻
B
Blog RSS Feed
T
The Exploit Database - CXSecurity.com
Recent Announcements
Recent Announcements
E
Exploit-DB.com RSS Feed
C
CXSECURITY Database RSS Feed - CXSecurity.com
U
Unit 42
The Register - Security
The Register - Security
S
Securelist
Martin Fowler
Martin Fowler
Project Zero
Project Zero
大猫的无限游戏
大猫的无限游戏
Cisco Talos Blog
Cisco Talos Blog

博客园 - boydfd

为了随时随地控制 AI Agent,我做了一个 Web Terminal 彻底告别OpenClaw使用焦虑:我给他装上了“透视眼”和“批量克隆模组 万字长文详解Text-to-SQL LLM应用落地实施手册 LLM生成代码后,如何一键合并到源代码中(FastApply技术研究) 为了改一行代码,我花了10多天时间,让性能提升了40多倍---Pascal架构GPU在vllm下的模型推理优化 喝个水也要整智能家居? 大语言模型中一个调皮的EOS token 如何定制一个智能洒水装置(养狗/养花人士请进) 如何用智能地教狗狗上厕所 端到端智能音箱 再看Lambda架构 如何写一个好的测试 - boydfd - 博客园 用Python和Pandas以及爬虫技术统计历史天气 坚持连续背单词一年是什么体验 - boydfd - 博客园 看直播到底能得到什么 - boydfd - 博客园 Item 27: 明白什么时候选择重载,什么时候选择universal引用 - boydfd 对于大学4年的反思(续),记我的ThoughtWorks面试 - boydfd - 博客园 Item 26: 避免对universal引用进行重载 - boydfd Item 25: 对右值引用使用std::move,对universal引用则使用std::forward - boydfd
Kafka源码研究--Comsumer获取partition下标
boydfd · 2019-10-27 · via 博客园 - boydfd

背景

由于项目上Flink在设置parallel多于1的情况下,job没法正确地获取watermark,所以周末来研究一下一部分,大概已经锁定了原因:
虽然我们的topic只设置了1的partition,但是Kafka的Comsumer还是起了好几个subtask去读索引是2、3的partition,然后这几个subtask的watermark一直不更新,导致我们job整体的watermark一直是Long.MIN_VALUE。现在需要去了解一下subtask获取partition的流程,等上班的时候debug一遍应该就可以知道原因。

翻源码的过程

通过log找到分配partition的大概位置

find partition

从图中可以看到,在org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase这个类中可以找到一些关键信息。

跟踪源码

log partition

往上翻翻,看有没有有用信息

all partitions

关键源码,附上注释

	public void open(Configuration configuration) throws Exception {
		// determine the offset commit mode
		this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
				getIsAutoCommitEnabled(),
				enableCommitOnCheckpoints,
				((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());

		// create the partition discoverer
		this.partitionDiscoverer = createPartitionDiscoverer(
				topicsDescriptor,
				getRuntimeContext().getIndexOfThisSubtask(),
				getRuntimeContext().getNumberOfParallelSubtasks());
		this.partitionDiscoverer.open();

		subscribedPartitionsToStartOffsets = new HashMap<>();
        // 重点函数,这个函数或获取到subtask的所有partition。
		final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
		if (restoredState != null) {
            ...
		} else {
			// use the partition discoverer to fetch the initial seed partitions,
			// and set their initial offsets depending on the startup mode.
			// for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
			// for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily determined
			// when the partition is actually read.
			switch (startupMode) {
                ...
				default:
					for (KafkaTopicPartition seedPartition : allPartitions) {
						subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
					}
			}

			if (!subscribedPartitionsToStartOffsets.isEmpty()) {
				switch (startupMode) {
                    ...
					case GROUP_OFFSETS:
						LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
							getRuntimeContext().getIndexOfThisSubtask(),
							subscribedPartitionsToStartOffsets.size(),
							subscribedPartitionsToStartOffsets.keySet());
				}
			} else {
				LOG.info("Consumer subtask {} initially has no partitions to read from.",
					getRuntimeContext().getIndexOfThisSubtask());
			}
		}

	public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
		if (!closed && !wakeup) {
			try {
				List<KafkaTopicPartition> newDiscoveredPartitions;

				// (1) get all possible partitions, based on whether we are subscribed to fixed topics or a topic pattern
				if (topicsDescriptor.isFixedTopics()) {
                    // 对于没有使用通配符的topic,直接获取topic的所有partition
					newDiscoveredPartitions = getAllPartitionsForTopics(topicsDescriptor.getFixedTopics());
				} else {
                    // 对于使用了通配符的topic, 先找到所有topic,再一一match
					List<String> matchedTopics = getAllTopics();

					// retain topics that match the pattern
					Iterator<String> iter = matchedTopics.iterator();
					while (iter.hasNext()) {
						if (!topicsDescriptor.isMatchingTopic(iter.next())) {
							iter.remove();
						}
					}

					if (matchedTopics.size() != 0) {
						// get partitions only for matched topics
						newDiscoveredPartitions = getAllPartitionsForTopics(matchedTopics);
					} else {
						newDiscoveredPartitions = null;
					}
				}

				// (2) eliminate partition that are old partitions or should not be subscribed by this subtask
				if (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) {
					throw new RuntimeException("Unable to retrieve any partitions with KafkaTopicsDescriptor: " + topicsDescriptor);
				} else {
					Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
					KafkaTopicPartition nextPartition;
					while (iter.hasNext()) {
						nextPartition = iter.next();
                        // 只保留符合要求的partition,这就是我们要找的函数
						if (!setAndCheckDiscoveredPartition(nextPartition)) {
							iter.remove();
						}
					}
				}

				return newDiscoveredPartitions;
			}...
		}...
    }
    
    public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {
		if (isUndiscoveredPartition(partition)) {
			discoveredPartitions.add(partition);

            // 在这
			return KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask;
		}

		return false;
	}


    public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
        // 先算出此topic的hash(partition.getTopic().hashCode() * 31),这里不知道为什么不直接用hash,还要再*31,然后取正数(& 0x7FFFFFFF),最后获取到此topic的起始位置。
		int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

		// here, the assumption is that the id of Kafka partitions are always ascending
		// starting from 0, and therefore can be used directly as the offset clockwise from the start index
        // 计算当前的partition应该属于哪个subtask。例如:一共有20个subtask,算出来的起始位置是5,partition是5,那么最后就是
        // (5 + 5) % 20 = 10, 这个partition应该分给10号subtask。
		return (startIndex + partition.getPartition()) % numParallelSubtasks;
	}

思考

某topic的每个partition会分给哪个subtask其实是确定的

topic名字是确定的 -> topic的hashCode是确定的 && subtask的数量是确定的 -> startIndex是确定的 -> 某partition会分给哪个subtask其实是确定的

为什么要算startIndex

大概是为了平均分配不同的topic,如果topic很多,每个topic都只从0开始,那么subtask 0,1,2之类的靠前subtask就需要读大量的partition。