Apache Kafka

Introduction
2001-01-01 · via Apache Kafka

You are viewing documentation for an older version (1.1) of Kafka. For up-to-date documentation, see the latest version.

Kafka Streams

The easiest way to write mission-critical real-time applications and microservices

Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters. It combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka’s server-side cluster technology.

TOUR OF THE STREAMS API

1. Intro to Streams

2. Creating a Streams Application

3. Transforming Data Pt. 1

4. Transforming Data Pt. 2


Why you’ll love using Kafka Streams!

  • Elastic, highly scalable, fault-tolerant
  • Deploy to containers, VMs, bare metal, cloud
  • Equally viable for small, medium, & large use cases
  • Fully integrated with Kafka security
  • Write standard Java applications
  • Exactly-once processing semantics
  • No separate processing cluster required
  • Develop on Mac, Linux, Windows
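Several of these properties are switched on through ordinary client configuration. As a minimal sketch, assuming brokers at version 0.11 or later (which exactly-once processing requires), the snippet below builds a `Properties` object that requests the exactly-once guarantee. The class name `StreamsConfigSketch` and the helper `buildConfig` are hypothetical; the plain string keys are the values behind the `StreamsConfig` constants, used here so the sketch compiles without the Kafka library on the classpath.

```java
import java.util.Properties;

// Hypothetical helper class, for illustration only.
public class StreamsConfigSketch {

    // Builds a Kafka Streams configuration that requests exactly-once processing.
    static Properties buildConfig(String applicationId, String bootstrapServers) {
        Properties config = new Properties();
        // Same key as StreamsConfig.APPLICATION_ID_CONFIG
        config.put("application.id", applicationId);
        // Same key as StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
        config.put("bootstrap.servers", bootstrapServers);
        // Same key/value as StreamsConfig.PROCESSING_GUARANTEE_CONFIG / StreamsConfig.EXACTLY_ONCE;
        // the default is "at_least_once".
        config.put("processing.guarantee", "exactly_once");
        return config;
    }

    public static void main(String[] args) {
        Properties config = buildConfig("wordcount-application", "kafka-broker1:9092");
        config.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting `Properties` object would be passed to the `KafkaStreams` constructor in the same way as the `config` object in the WordCount example further down.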

Write your first app


Kafka Streams use cases

The New York Times uses Apache Kafka and the Kafka Streams API to store and distribute, in real-time, published content to the various applications and systems that make it available to the readers.

As the leading online fashion retailer in Europe, Zalando uses Kafka as an ESB (Enterprise Service Bus), which helps us in transitioning from a monolithic to a microservices architecture. Using Kafka for processing event streams enables our technical team to do near-real-time business intelligence.

LINE uses Apache Kafka as a central datahub for our services to communicate with one another. Hundreds of billions of messages are produced daily and are used to execute various business logic, threat detection, search indexing and data analysis. LINE leverages Kafka Streams to reliably transform and filter topics, producing sub-topics that consumers can efficiently consume, while retaining easy maintainability thanks to its sophisticated yet minimal code base.

Rabobank is one of the 3 largest banks in the Netherlands. Its digital nervous system, the Business Event Bus, is powered by Apache Kafka. It is used by an increasing number of financial processes and services, one of which is Rabo Alerts. This service alerts customers in real time upon financial events and is built using Kafka Streams.

We use Apache Kafka (and Kafka Streams) to collect and ingest all of our game service logs (including analytics, server, or access logs). Apache Kafka has been one of the core components of our data pipeline from early 2015.

ironSource powers the growth of the world's top games, using Apache Kafka as the backbone infrastructure for the async messaging of millions of events per second that run through their industry-leading game growth platform. In addition, ironSource uses the Kafka Streams API to handle multiple real-time use cases, such as budget management, monitoring and alerting.

Kpow is an enterprise-grade toolkit that provides a rich, data-oriented UI and secure API for Apache Kafka. It's designed to give engineers deep visibility and control over their Kafka clusters, Schema Registry, Kafka Connect, and Kafka Streams applications. Offering multiple deployment options for cloud or on-premise environments, Kpow works with any Kafka distribution and integrates with enterprise security systems for authentication and role-based access.

La Redoute, the digital platform for families, uses Kafka as a central nervous system to decouple its application through business events. It enables a decentralized, event-driven architecture bringing near-real-time data reporting, analytics and emerging AI-pipelines combining Kafka Connect, Kafka Streams and KSQL.

Nuuly, a clothing rental subscription from the Urban Outfitters family of brands, uses Kafka as a central nervous system to integrate our front-end customer experience with real-time inventory management and operations at our distribution center. Nuuly relies on Kafka Streams and Kafka Connect, coupled with data science and machine learning to provide in-the-moment business intelligence and to tailor a personalized rental experience to our customers.

Recursion uses Kafka Streams to power its data pipeline for its drug discovery efforts. Kafka is used to coordinate various services across the company. For more information about the use case, see this Kafka Summit talk.

At Schrödinger, Kafka powers our physics-based computational platform by feeding data into our predictive modeling, data analytics, and collaboration services, thus enabling rapid exploration of chemical space.

More specifically, Kafka is used as a distributed high-speed event bus, while Kafka Connect and Kafka Streams are the basic components of our streaming Change Data Capture framework used by LiveDesign, our enterprise informatics solution.

Currently, Schrödinger processes billions of molecules per week and our Kafka-powered data pipeline enables us to scale our architecture easily and push this even further.

Hello Kafka Streams

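The code example below implements a WordCount application that is elastic, highly scalable, fault-tolerant, stateful, and ready to run in production at large scale. It is shown three times: with Java 8+ lambda expressions, with Java 7 anonymous classes, and in Scala.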

// Java 8+ (lambda expressions)
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Properties;

public class WordCountApplication {

    public static void main(final String[] args) throws Exception {
        // Application id, broker address, and default String serdes for keys and values.
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Read lines of text from the input topic.
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream("TextLinesTopic");
        KTable<String, Long> wordCounts = textLines
            // Split each lowercased line into words on runs of non-word characters.
            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
            // Re-key each record by the word itself.
            .groupBy((key, word) -> word)
            // Count occurrences per word, backed by the "counts-store" state store.
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
        // Write the continuously updated counts to the output topic.
        wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();
    }

}


// Java 7 (anonymous classes)
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Properties;

public class WordCountApplication {

    public static void main(final String[] args) throws Exception {
        // Application id, broker address, and default String serdes for keys and values.
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Read lines of text from the input topic.
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream("TextLinesTopic");
        KTable<String, Long> wordCounts = textLines
            // Split each lowercased line into words on runs of non-word characters.
            .flatMapValues(new ValueMapper<String, Iterable<String>>() {
                @Override
                public Iterable<String> apply(String textLine) {
                    return Arrays.asList(textLine.toLowerCase().split("\\W+"));
                }
            })
            // Re-key each record by the word itself.
            .groupBy(new KeyValueMapper<String, String, String>() {
                @Override
                public String apply(String key, String word) {
                    return word;
                }
            })
            // Count occurrences per word, backed by the "counts-store" state store.
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));

        // Write the continuously updated counts to the output topic.
        wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();
    }

}


// Scala
import java.lang.Long
import java.util.Properties
import java.util.concurrent.TimeUnit

import org.apache.kafka.common.serialization._
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream.{KStream, KTable, Materialized, Produced}
import org.apache.kafka.streams.state.KeyValueStore

import scala.collection.JavaConverters.asJavaIterableConverter

object WordCountApplication {

    def main(args: Array[String]): Unit = {
        // Application id, broker address, and default String serdes for keys and values.
        val config: Properties = {
            val p = new Properties()
            p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
            p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
            p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
            p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
            p
        }

        // Read lines of text from the input topic.
        val builder: StreamsBuilder = new StreamsBuilder()
        val textLines: KStream[String, String] = builder.stream("TextLinesTopic")
        val wordCounts: KTable[String, Long] = textLines
            // Split each lowercased line into words on runs of non-word characters.
            .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
            // Re-key each record by the word itself.
            .groupBy((_, word) => word)
            // Count occurrences per word, backed by the "counts-store" state store.
            .count(Materialized.as("counts-store").asInstanceOf[Materialized[String, Long, KeyValueStore[Bytes, Array[Byte]]]])
        // Write the continuously updated counts to the output topic.
        wordCounts.toStream().to("WordsWithCountsTopic", Produced.`with`(Serdes.String(), Serdes.Long()))

        val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
        streams.start()

        // Close the application cleanly on JVM shutdown, waiting up to 10 seconds.
        Runtime.getRuntime.addShutdownHook(new Thread(() => {
            streams.close(10, TimeUnit.SECONDS)
        }))
    }

}
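
To make the per-record logic of the WordCount topology easier to follow, here is a rough plain-Java sketch of the same computation over an in-memory list of lines, with no Kafka involved. The class `WordCountSketch` and the method `countWords` are hypothetical names introduced for illustration. The sketch only mirrors the split, group and count steps; it does not model topics, state stores, or the continuous stream of updates that the real application emits.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Arrays;

// Hypothetical, Kafka-free illustration of the WordCount logic.
public class WordCountSketch {

    // Counts words across all lines, using the same lowercasing and
    // "\\W+" split as the flatMapValues step in the topology.
    static Map<String, Long> countWords(List<String> lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.toLowerCase().split("\\W+")) {
                // groupBy(word) followed by count(): one running total per word.
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = countWords(Arrays.asList("Hello Kafka", "hello streams"));
        counts.forEach((word, count) -> System.out.println(word + " -> " + count));
    }
}
```

As in the topology, a line that begins with a non-word character yields an empty first token, which is counted like any other word; a production application may want to filter such tokens out.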