Apache Kafka

Introduction
2001-01-01 · via Apache Kafka

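The code examples below implement a WordCount application that is elastic, highly scalable, fault-tolerant, stateful, and ready to run in production at large scale. The same program is shown three times: with Java 8 lambda expressions, with Java 7-style anonymous classes, and in Scala.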

// Java 8+ variant, using lambda expressions.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

import java.util.Arrays;
import java.util.Properties;

public class WordCountApplication {

    public static void main(final String[] args) throws Exception {
        // Application id, broker address, and default String serdes for keys and values.
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        KStreamBuilder builder = new KStreamBuilder();
        // Read text lines from the input topic.
        KStream<String, String> textLines = builder.stream("TextLinesTopic");
        KTable<String, Long> wordCounts = textLines
            // Split each line into lowercase words on runs of non-word characters.
            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
            // Re-key each record by the word itself.
            .groupBy((key, word) -> word)
            // Count occurrences per word, backed by a state store named "Counts".
            .count("Counts");
        // Write the running counts to the output topic.
        wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic");

        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();
    }

}
            


// Java 7 variant, using anonymous classes instead of lambdas.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueMapper;

import java.util.Arrays;
import java.util.Properties;

public class WordCountApplication {

    public static void main(final String[] args) throws Exception {
        // Application id, broker address, and default String serdes for keys and values.
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        KStreamBuilder builder = new KStreamBuilder();
        // Read text lines from the input topic.
        KStream<String, String> textLines = builder.stream("TextLinesTopic");
        KTable<String, Long> wordCounts = textLines
            // Split each line into lowercase words on runs of non-word characters.
            .flatMapValues(new ValueMapper<String, Iterable<String>>() {
                @Override
                public Iterable<String> apply(String textLine) {
                    return Arrays.asList(textLine.toLowerCase().split("\\W+"));
                }
            })
            // Re-key each record by the word itself.
            .groupBy(new KeyValueMapper<String, String, String>() {
                @Override
                public String apply(String key, String word) {
                    return word;
                }
            })
            // Count occurrences per word, backed by a state store named "Counts".
            .count("Counts");
        // Write the running counts to the output topic.
        wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic");

        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();
    }

}
            


// Scala variant.
import java.lang.Long
import java.util.Properties
import java.util.concurrent.TimeUnit

import org.apache.kafka.common.serialization._
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable}

import scala.collection.JavaConverters.asJavaIterableConverter

object WordCountApplication {

    def main(args: Array[String]): Unit = {
        // Application id, broker address, and default String serdes for keys and values.
        val config: Properties = {
            val p = new Properties()
            p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
            p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
            p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
            p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
            p
        }

        val builder: KStreamBuilder = new KStreamBuilder()
        // Read text lines from the input topic.
        val textLines: KStream[String, String] = builder.stream("TextLinesTopic")
        val wordCounts: KTable[String, Long] = textLines
            // Split each line into lowercase words on runs of non-word characters.
            .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
            // Re-key each record by the word itself.
            .groupBy((_, word) => word)
            // Count occurrences per word, backed by a state store named "Counts".
            .count("Counts")
        // Write the running counts to the output topic.
        wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic")

        val streams: KafkaStreams = new KafkaStreams(builder, config)
        streams.start()

        // Close the streams instance on JVM shutdown, waiting up to 10 seconds.
        Runtime.getRuntime.addShutdownHook(new Thread(() => {
            streams.close(10, TimeUnit.SECONDS)
        }))
    }

}
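
To see what the flatMapValues, groupBy, and count steps compute without a running broker, the per-record logic can be sketched in plain Java. This is only an illustration under assumptions: the class name WordCountSketch and its methods tokenize and countWords are hypothetical and are not part of the Kafka Streams API, and an in-memory map stands in for the "Counts" state store. It shows the counting logic only, not the elasticity or fault tolerance that Kafka Streams provides.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone sketch; not part of Kafka Streams.
final class WordCountSketch {

    // Same tokenization as the flatMapValues step:
    // lowercase the line, then split on runs of non-word characters.
    public static List<String> tokenize(String textLine) {
        return Arrays.asList(textLine.toLowerCase().split("\\W+"));
    }

    // Mirrors groupBy(word) followed by count(): a running count per word.
    // The map plays the role of the state store in this sketch.
    public static Map<String, Long> countWords(List<String> textLines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : textLines) {
            for (String word : tokenize(line)) {
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("Hello Kafka Streams", "hello, kafka!");
        // prints {hello=2, kafka=2, streams=1}
        System.out.println(countWords(lines));
    }
}
```

One detail the sketch makes visible: a line that starts with punctuation, such as "...hello", yields an empty string as its first token, because String.split keeps a leading empty substring. With the same split expression, the WordCount application would count that empty string as a word; filtering out empty tokens before grouping would avoid this if it matters for your data.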

Rabobank is one of the three largest banks in the Netherlands. Its digital nervous system, the Business Event Bus, is powered by Apache Kafka and Kafka Streams.

As the leading online fashion retailer in Europe, Zalando uses Kafka as an ESB (Enterprise Service Bus), which helps us transition from a monolithic to a microservices architecture. Using Kafka to process event streams enables our technical team to do near-real-time business intelligence.