Apache Kafka

Introduction
2001-01-01 · via Apache Kafka

You are viewing documentation for an older version (1.0) of Kafka. For up-to-date documentation, see the latest version.

Kafka Streams

The easiest way to write mission-critical real-time applications and microservices

Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters. It combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka’s server-side cluster technology.

TOUR OF THE STREAMS API

  1. Intro to Streams
  2. Creating a Streams Application
  3. Transforming Data Pt. 1
  4. Transforming Data Pt. 2


Why you’ll love using Kafka Streams!

  • Elastic, highly scalable, fault-tolerant
  • Deploy to containers, VMs, bare metal, cloud
  • Equally viable for small, medium, & large use cases
  • Fully integrated with Kafka security
  • Write standard Java applications
  • Exactly-once processing semantics
  • No separate processing cluster required
  • Develop on Mac, Linux, Windows
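
As a rough illustration of the "exactly-once processing semantics" item above, the sketch below shows how that guarantee is typically switched on through configuration. It is a minimal sketch, not a complete application: it uses plain string keys so that it runs without the Kafka libraries on the classpath. The key `processing.guarantee` is the string behind `StreamsConfig.PROCESSING_GUARANTEE_CONFIG`, and `exactly_once` is the value that replaces the default `at_least_once`. The class name `ExactlyOnceConfigSketch` and the helper `exactlyOnceConfig` are hypothetical names chosen for this example, and the setting assumes brokers on version 0.11.0 or newer.

```java
import java.util.Properties;

public class ExactlyOnceConfigSketch {

    // Hypothetical helper: builds Streams settings with plain string keys,
    // so the sketch needs no Kafka classes to compile or run.
    static Properties exactlyOnceConfig(String applicationId, String bootstrapServers) {
        Properties config = new Properties();
        config.put("application.id", applicationId);
        config.put("bootstrap.servers", bootstrapServers);
        // The default is "at_least_once"; "exactly_once" opts in to exactly-once processing.
        config.put("processing.guarantee", "exactly_once");
        return config;
    }

    public static void main(String[] args) {
        Properties config = exactlyOnceConfig("wordcount-application", "kafka-broker1:9092");
        System.out.println(config.getProperty("processing.guarantee"));
    }
}
```

In a real application the same `Properties` object would be passed to the `KafkaStreams` constructor, as in the WordCount example further down this page.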

Write your first app


Streams API use cases

The New York Times uses Apache Kafka and the Kafka Streams API to store and distribute published content, in real time, to the various applications and systems that make it available to readers.

As the leading online fashion retailer in Europe, Zalando uses Kafka as an ESB (Enterprise Service Bus), which helps us in transitioning from a monolithic to a microservices architecture. Using Kafka for processing event streams enables our technical team to do near-real-time business intelligence.

LINE uses Apache Kafka as a central datahub for our services to communicate with one another. Hundreds of billions of messages are produced daily and are used to execute various business logic, threat detection, search indexing and data analysis. LINE leverages Kafka Streams to reliably transform and filter topics into sub-topics that consumers can consume efficiently, while remaining easy to maintain thanks to its sophisticated yet minimal code base.

Pinterest uses Apache Kafka and the Kafka Streams API at large scale to power the real-time, predictive budgeting system of their advertising infrastructure. With Kafka Streams, spend predictions are more accurate than ever.

Rabobank is one of the three largest banks in the Netherlands. Its digital nervous system, the Business Event Bus, is powered by Apache Kafka. It is used by an increasing number of financial processes and services, one of which is Rabo Alerts. This service alerts customers in real time upon financial events and is built using Kafka Streams.

Trivago is a global hotel search platform. We are focused on reshaping the way travelers search for and compare hotels, while enabling hotel advertisers to grow their businesses by providing access to a broad audience of travelers via our websites and apps. As of 2017, we offer access to approximately 1.8 million hotels and other accommodations in over 190 countries. We use Kafka, Kafka Connect, and Kafka Streams to enable our developers to access data freely in the company. Kafka Streams powers parts of our analytics pipeline and delivers endless options to explore and operate on the data sources we have at hand.

Hello Kafka Streams

The code example below implements a WordCount application that is elastic, highly scalable, fault-tolerant, stateful, and ready to run in production at large scale.

Java 8+

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Properties;

public class WordCountApplication {

    public static void main(final String[] args) throws Exception {
        // Application identity, broker address, and default (de)serializers.
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // Read lines of text from the input topic.
        KStream<String, String> textLines = builder.stream("TextLinesTopic");
        KTable<String, Long> wordCounts = textLines
            // Split each line into lowercase words.
            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
            // Re-key each record by the word itself.
            .groupBy((key, word) -> word)
            // Count occurrences per word, backed by a state store named "counts-store".
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
        // Write the running counts to the output topic.
        wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();
    }

}


Java 7

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Properties;

public class WordCountApplication {

    public static void main(final String[] args) throws Exception {
        // Application identity, broker address, and default (de)serializers.
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // Read lines of text from the input topic.
        KStream<String, String> textLines = builder.stream("TextLinesTopic");
        KTable<String, Long> wordCounts = textLines
            // Split each line into lowercase words.
            .flatMapValues(new ValueMapper<String, Iterable<String>>() {
                @Override
                public Iterable<String> apply(String textLine) {
                    return Arrays.asList(textLine.toLowerCase().split("\\W+"));
                }
            })
            // Re-key each record by the word itself.
            .groupBy(new KeyValueMapper<String, String, String>() {
                @Override
                public String apply(String key, String word) {
                    return word;
                }
            })
            // Count occurrences per word, backed by a state store named "counts-store".
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));

        // Write the running counts to the output topic.
        wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();
    }

}


Scala

import java.lang.Long
import java.util.Properties
import java.util.concurrent.TimeUnit

import org.apache.kafka.common.serialization._
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream.{KStream, KTable, Materialized, Produced}
import org.apache.kafka.streams.state.KeyValueStore

import scala.collection.JavaConverters.asJavaIterableConverter

object WordCountApplication {

    def main(args: Array[String]): Unit = {
        // Application identity, broker address, and default (de)serializers.
        val config: Properties = {
            val p = new Properties()
            p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
            p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
            p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
            p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
            p
        }

        val builder: StreamsBuilder = new StreamsBuilder()
        // Read lines of text from the input topic.
        val textLines: KStream[String, String] = builder.stream("TextLinesTopic")
        // Split into lowercase words, re-key by word, and count per word.
        val wordCounts: KTable[String, Long] = textLines
            .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
            .groupBy((_, word) => word)
            .count(Materialized.as("counts-store").asInstanceOf[Materialized[String, Long, KeyValueStore[Bytes, Array[Byte]]]])
        // Write the running counts to the output topic.
        wordCounts.toStream().to("WordsWithCountsTopic", Produced.`with`(Serdes.String(), Serdes.Long()))

        val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
        streams.start()

        // Close the Streams instance cleanly when the JVM shuts down.
        Runtime.getRuntime.addShutdownHook(new Thread(() => {
            streams.close(10, TimeUnit.SECONDS)
        }))
    }

}
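
To make it easier to see what the WordCount topology computes, the sketch below applies the same tokenize, group, and count steps to a small in-memory list using only the JDK. It is an illustration under stated assumptions, not part of the Kafka Streams API: the class `WordCountSketch` and the method `countWords` are hypothetical names, the input is a finite batch and not an unbounded stream, and empty tokens are skipped, whereas the topology above would count them as a word.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WordCountSketch {

    // Hypothetical helper (not Kafka API): counts words across a finite batch of lines.
    static Map<String, Long> countWords(List<String> textLines) {
        Map<String, Long> counts = new TreeMap<>();
        for (String textLine : textLines) {
            // Same tokenization as the flatMapValues step: lowercase, split on non-word characters.
            for (String word : textLine.toLowerCase().split("\\W+")) {
                if (word.isEmpty()) {
                    continue; // assumption: skip empty tokens produced by leading punctuation
                }
                // Stand-in for groupBy(word) followed by count(): one running total per word.
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("Hello Kafka Streams", "hello, streams!");
        System.out.println(countWords(lines)); // prints {hello=2, kafka=1, streams=2}
    }
}
```

The Streams version differs in that it never finishes: each new line arriving on the input topic updates the counts held in the state store, and the updated count for each affected word is emitted to the output topic.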