Writing a Streams Application
2001-01-01 · via Apache Kafka

You are viewing documentation for an older version (1.0) of Kafka. For up-to-date documentation, see the latest version.

Any Java application that makes use of the Kafka Streams library is considered a Kafka Streams application. The computational logic of a Kafka Streams application is defined as a processor topology, which is a graph of stream processors (nodes) and streams (edges).

You can define the processor topology with the Kafka Streams APIs:

  • Kafka Streams DSL: a high-level API that provides the most common data transformation operations, such as map, filter, join, and aggregations, out of the box. The DSL is the recommended starting point for developers new to Kafka Streams and should cover many use cases and stream processing needs.
  • Processor API: a low-level API that lets you add and connect processors and interact directly with state stores. The Processor API is more flexible than the DSL, at the cost of more manual work for the application developer (e.g., more lines of code).

Libraries and Maven artifacts

This section lists the Kafka Streams related libraries that are available for writing your Kafka Streams applications.

You can define dependencies on the following libraries for your Kafka Streams applications.

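| Group ID         | Artifact ID   | Version | Description                                                                    |
|------------------|---------------|---------|--------------------------------------------------------------------------------|
| org.apache.kafka | kafka-streams | 1.0.2   | (Required) Base library for Kafka Streams.                                     |
| org.apache.kafka | kafka-clients | 1.0.2   | (Required) Kafka client library. Contains built-in serializers/deserializers. |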

Tip

See the section Data Types and Serialization for more information about Serializers/Deserializers.

Example pom.xml snippet when using Maven:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>1.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.2</version>
</dependency>

Using Kafka Streams within your application code

You can call Kafka Streams from anywhere in your application code, but usually these calls are made within the main() method of your application, or some variant thereof. The basic elements of defining a processing topology within your application are described below.

First, you must create an instance of KafkaStreams.

  • The first argument of the KafkaStreams constructor is the topology to run: the result of StreamsBuilder#build() when using the DSL, or a Topology that you assembled yourself when using the Processor API.
  • The second argument is an instance of StreamsConfig, which defines the configuration for this specific topology.

Code example:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.  We will cover this in detail in the subsequent
// sections of this Developer Guide.

StreamsBuilder builder = ...;  // when using the DSL
Topology topology = builder.build();
//
// OR
//
Topology topology = ...; // when using the Processor API

// Use the configuration to tell your application where the Kafka cluster is,
// which Serializers/Deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;

KafkaStreams streams = new KafkaStreams(topology, config);
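
The `StreamsConfig config = ...;` placeholder above is typically filled in from a `java.util.Properties` object. The following is a minimal sketch of that step, not a complete application. The class name `StreamsSettings`, the application ID, and the broker address are hypothetical placeholders. To keep the sketch compilable without Kafka on the classpath, it uses the literal property names; with the library available you would normally use the corresponding `StreamsConfig` constants instead.

```java
import java.util.Properties;

// Illustrative helper; the class name and all values passed in are placeholders.
class StreamsSettings {

    // Builds the settings from which a StreamsConfig would be created.
    static Properties build(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        // Identifies the application; instances that share this ID share the work.
        props.put("application.id", applicationId);
        // Initial list of brokers used to reach the Kafka cluster.
        props.put("bootstrap.servers", bootstrapServers);
        // Default serdes for record keys and values (String on both sides here).
        props.put("default.key.serde",
                "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put("default.value.serde",
                "org.apache.kafka.common.serialization.Serdes$StringSerde");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("my-streams-app", "localhost:9092");
        // With kafka-streams on the classpath, the next step would be:
        //   StreamsConfig config = new StreamsConfig(props);
        //   KafkaStreams streams = new KafkaStreams(topology, config);
        props.stringPropertyNames().stream().sorted()
             .forEach(name -> System.out.println(name + "=" + props.getProperty(name)));
    }
}
```

Security settings and other options would be added to the same `Properties` object in the same way.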

At this point, internal structures are initialized, but processing has not started yet. You have to explicitly start the Kafka Streams threads by calling the KafkaStreams#start() method:

// Start the Kafka Streams threads
streams.start();

If there are other instances of this stream processing application running elsewhere (e.g., on another machine), Kafka Streams transparently re-assigns tasks from the existing instances to the new instance that you just started. For more information, see Stream Partitions and Tasks and Threading Model.

To catch any unexpected exceptions, you can set a java.lang.Thread.UncaughtExceptionHandler before you start the application. This handler is called whenever a stream thread is terminated by an unexpected exception:

// Java 8+, using lambda expressions
streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
  // here you should examine the throwable/exception and perform an appropriate action!
});


// Java 7
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
  public void uncaughtException(Thread thread, Throwable throwable) {
    // here you should examine the throwable/exception and perform an appropriate action!
  }
});
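
To make the handler contract concrete, here is a small sketch that uses a plain JDK thread as a stand-in for a stream thread, so it runs without a Kafka cluster. The class `HandlerDemo` and the method `runAndCapture` are hypothetical names for illustration only. The handler type is the same `Thread.UncaughtExceptionHandler` described above; what a real application does with the throwable (log it, alert, shut down) is up to you.

```java
import java.util.concurrent.atomic.AtomicReference;

// Illustrative only: a plain thread stands in for a Kafka Streams thread.
class HandlerDemo {

    // Runs the task on a new thread and returns the exception that
    // terminated it, or null if the task completed normally.
    static Throwable runAndCapture(Runnable task) {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread worker = new Thread(task, "stand-in-stream-thread");
        // The handler runs on the dying thread, just before it terminates.
        worker.setUncaughtExceptionHandler((thread, throwable) -> failure.set(throwable));
        worker.start();
        try {
            worker.join();  // the handler has finished once join() returns
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // preserve the interrupt status
        }
        return failure.get();
    }

    public static void main(String[] args) {
        Throwable t = runAndCapture(() -> { throw new IllegalStateException("boom"); });
        System.out.println("captured: " + t);
    }
}
```

Note that the handler only observes the failure; by the time it runs, the thread that threw the exception is already on its way out.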

To stop the application instance, call the KafkaStreams#close() method:

// Stop the Kafka Streams threads
streams.close();

To allow your application to shut down gracefully in response to SIGTERM, it is recommended that you add a shutdown hook that calls KafkaStreams#close.

  • Here is a shutdown hook example in Java 8+:

// Add shutdown hook to stop the Kafka Streams threads.
// You can optionally provide a timeout to close.
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

  • Here is a shutdown hook example in Java 7:

// Add shutdown hook to stop the Kafka Streams threads.
// You can optionally provide a timeout to close.
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
  @Override
  public void run() {
    streams.close();
  }
}));
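
The comments above mention an optional timeout. The sketch below shows one way a hook with a bounded close could be arranged, together with a latch that lets the main thread wait until shutdown has finished. It is an illustration under assumptions: `GracefulShutdown` and the `Stoppable` interface are hypothetical stand-ins so the code compiles without Kafka, and the 10-second timeout is an arbitrary choice. `Stoppable#close(long, TimeUnit)` is modeled on the timed `KafkaStreams#close` overload.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustrative only: Stoppable is a hypothetical stand-in for KafkaStreams.
class GracefulShutdown {

    interface Stoppable {
        void start();
        // Returns true if everything stopped within the timeout.
        boolean close(long timeout, TimeUnit unit);
    }

    // Builds a hook that closes with a bounded wait and then releases
    // whichever thread is blocked on the latch.
    static Thread shutdownHook(Stoppable streams, CountDownLatch done) {
        return new Thread(() -> {
            streams.close(10, TimeUnit.SECONDS);  // arbitrary timeout for the sketch
            done.countDown();
        }, "streams-shutdown-hook");
    }
}
```

In a main() method, the hook would be registered with Runtime.getRuntime().addShutdownHook(...) before calling start(), and the main thread would then block on the latch's await() until the hook has run.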

After an application is stopped, Kafka Streams will migrate any tasks that had been running in this instance to available remaining instances.