惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

S
Security @ Cisco Blogs
爱范儿
爱范儿
雷峰网
雷峰网
博客园 - 三生石上(FineUI控件)
人人都是产品经理
人人都是产品经理
Hugging Face - Blog
Hugging Face - Blog
WordPress大学
WordPress大学
F
Full Disclosure
博客园 - 聂微东
GbyAI
GbyAI
Blog — PlanetScale
Blog — PlanetScale
I
InfoQ
Cyber Security Advisories - MS-ISAC
Cyber Security Advisories - MS-ISAC
V
Visual Studio Blog
B
Blog
C
Check Point Blog
奇客Solidot–传递最新科技情报
奇客Solidot–传递最新科技情报
T
The Blog of Author Tim Ferriss
小众软件
小众软件
G
Google Developers Blog
H
Hackread – Cybersecurity News, Data Breaches, AI and More
D
Docker
钛媒体:引领未来商业与生活新知
钛媒体:引领未来商业与生活新知
美团技术团队
Martin Fowler
Martin Fowler
Microsoft Security Blog
Microsoft Security Blog
宝玉的分享
宝玉的分享
量子位
MongoDB | Blog
MongoDB | Blog
Microsoft Azure Blog
Microsoft Azure Blog
月光博客
月光博客
D
DataBreaches.Net
博客园 - 【当耐特】
博客园_首页
H
Help Net Security
IT之家
IT之家
OSCHINA 社区最新新闻
OSCHINA 社区最新新闻
Vercel News
Vercel News
大猫的无限游戏
大猫的无限游戏
博客园 - 司徒正美
A
About on SuperTechFans
U
Unit 42
J
Java Code Geeks
The Cloudflare Blog
Stack Overflow Blog
Stack Overflow Blog
让小产品的独立变现更简单 - ezindie.com
让小产品的独立变现更简单 - ezindie.com
freeCodeCamp Programming Tutorials: Python, JavaScript, Git & More
Y
Y Combinator Blog
Jina AI
Jina AI
腾讯CDC

Apache Kafka

Important configuration properties for Kafka broker Important configuration properties for the high-level consumer Kafka Configuration API Design API Design API Design API Design API Design API Design API Design Basic Kafka Operations Basic Kafka Operations Basic Kafka Operations Basic Kafka Operations Basic Kafka Operations Basic Kafka Operations Basic Kafka Operations Basic Kafka Operations Basic Kafka Operations Basic Kafka Operations Basic Kafka Operations Basic Kafka Operations Basic Kafka Operations Basic Kafka Operations Basic Kafka Operations Basic Kafka Operations Basic Kafka Operations Basic Kafka Operations Basic Kafka Operations Basic Kafka Operations Basic Kafka Operations Basic Kafka Operations Basic Kafka Operations Basic Kafka Operations Basic Kafka Operations Basic Kafka Operations Basic Kafka Operations Basic Kafka Operations Basic Kafka Operations Basic Kafka Operations Basic Kafka Operations Basic Kafka Operations Broker Configs Broker Configs Broker Configs Broker Configs Broker Configs Broker Configs Broker Configs Broker Configs Broker Configs Broker Configs Broker Configs Broker Configs Broker Configs Broker Configs Broker Configs Broker Configs Broker Configs Broker Configs Broker Configs Broker Configs Broker Configs Broker Configs Broker Configs Broker Configs Broker Configs Broker Configs Broker Configs Broker Configs Broker Configs Broker Configs Broker Configs Broker Configs Broker Configs Datacenters Design Design Design Design Design Design Design Design Design Design Design Design Design Design Design Design Design Design Design Design Design Design Design Design
Introduction
2001-01-01 · via Apache Kafka

You are viewing documentation for an older version (2.5) of Kafka. For up-to-date documentation, see the latest version.

Kafka Streams

The easiest way to write mission-critical real-time applications and microservices

Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters. It combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka’s server-side cluster technology.

TOUR OF THE STREAMS API

1Intro to Streams

2Creating a Streams Application

3Transforming Data Pt. 1

4Transforming Data Pt. 11


Why you’ll love using Kafka Streams!

  • Elastic, highly scalable, fault-tolerant
  • Deploy to containers, VMs, bare metal, cloud
  • Equally viable for small, medium, & large use cases
  • Fully integrated with Kafka security
  • Write standard Java and Scala applications
  • Exactly-once processing semantics
  • No separate processing cluster required
  • Develop on Mac, Linux, Windows

Write your first app


Kafka Streams use cases

The New York Times uses Apache Kafka and the Kafka Streams API to store and distribute, in real-time, published content to the various applications and systems that make it available to the readers.

As the leading online fashion retailer in Europe, Zalando uses Kafka as an ESB (Enterprise Service Bus), which helps us in transitioning from a monolithic to a micro services architecture. Using Kafka for processing  event streams enables our technical team to do near-real time business intelligence.

LINE uses Apache Kafka as a central datahub for our services to communicate to one another. Hundreds of billions of messages are produced daily and are used to execute various business logic, threat detection, search indexing and data analysis. LINE leverages Kafka Streams to reliably transform and filter topics enabling sub topics consumers can efficiently consume, meanwhile retaining easy maintainability thanks to its sophisticated yet minimal code base.

Rabobank is one of the 3 largest banks in the Netherlands. Its digital nervous system, the Business Event Bus, is powered by Apache Kafka. It is used by an increasing amount of financial processes and services, one which is Rabo Alerts. This service alerts customers in real-time upon financial events and is built using Kafka Streams.

We use Apache Kafka (and Kafka Streams) to collect and ingest all of our game service logs (including analytics, server, or access logs). Apache Kafka has been one of the core components of our data pipeline from early 2015.

ironSource powers the growth of the world's top games, using Apache Kafka as the backbone infrastructure for the async messaging of millions of events per second that run through their industry-leading game growth platform. In addition ironSource uses the Kafka Streams API to handle multiple real-time use cases, such as budget management, monitoring and alerting.

Kpow is an enterprise-grade toolkit that provides a rich, data-oriented UI and secure API for Apache Kafka. It's designed to give engineers deep visibility and control over their Kafka clusters, Schema Registry, Kafka Connect, and Kafka Streams applications. Offering multiple deployment options for cloud or on-premise environments, Kpow works with any Kafka distribution and integrates with enterprise security systems for authentication and role-based access.

La Redoute, the digital platform for families, uses Kafka as a central nervous system to decouple its application through business events. It enables a decentralized, event-driven architecture bringing near-real-time data reporting, analytics and emerging AI-pipelines combining Kafka Connect, Kafka Streams and KSQL.

Nuuly, a clothing rental subscription from the Urban Outfitters family of brands, uses Kafka as a central nervous system to integrate our front-end customer experience with real-time inventory management and operations at our distribution center. Nuuly relies on Kafka Streams and Kafka Connect, coupled with data science and machine learning to provide in-the-moment business intelligence and to tailor a personalized rental experience to our customers.

Recursion uses Kafka Streams to power its data pipeline for its drug discovery efforts. Kafka is used to to coordinate various services across the company. For more information about the use case see this Kafka Summit talk.

At Schrödinger, Kafka powers our physics-based computational platform by feeding data into our predictive modeling, data analytics, and collaboration services thus enabling rapid exploration of chemical space.

More specifically, Kafka is used as a distributed high speed event bus while Kafka Connect and Kafka Streams are the basic components of our streaming Change Data Capture framework used by LiveDesign, our enterprise informatics solution.

Currently, Schrödinger processes billions of molecules per week and our Kafka-powered data pipeline enables us to scale our architecture easily and push this even further.

Hello Kafka Streams

The code example below implements a WordCount application that is elastic, highly scalable, fault-tolerant, stateful, and ready to run in production at large scale

import org.apache.kafka.common.serialization.Serdes;
                   import org.apache.kafka.common.utils.Bytes;
                   import org.apache.kafka.streams.KafkaStreams;
                   import org.apache.kafka.streams.StreamsBuilder;
                   import org.apache.kafka.streams.StreamsConfig;
                   import org.apache.kafka.streams.kstream.KStream;
                   import org.apache.kafka.streams.kstream.KTable;
                   import org.apache.kafka.streams.kstream.Materialized;
                   import org.apache.kafka.streams.kstream.Produced;
                   import org.apache.kafka.streams.state.KeyValueStore;

                   import java.util.Arrays;
                   import java.util.Properties;

                   public class WordCountApplication {

                       public static void main(final String[] args) throws Exception {
                           Properties props = new Properties();
                           props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
                           props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
                           props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
                           props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

                           StreamsBuilder builder = new StreamsBuilder();
                           KStream<String, String> textLines = builder.stream("TextLinesTopic");
                           KTable<String, Long> wordCounts = textLines
                               .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\W+")))
                               .groupBy((key, word) -> word)
                               .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
                           wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));

                           KafkaStreams streams = new KafkaStreams(builder.build(), props);
                           streams.start();
                       }

                   }


                   import org.apache.kafka.common.serialization.Serdes;
                   import org.apache.kafka.common.utils.Bytes;
                   import org.apache.kafka.streams.KafkaStreams;
                   import org.apache.kafka.streams.StreamsBuilder;
                   import org.apache.kafka.streams.StreamsConfig;
                   import org.apache.kafka.streams.kstream.KStream;
                   import org.apache.kafka.streams.kstream.KTable;
                   import org.apache.kafka.streams.kstream.ValueMapper;
                   import org.apache.kafka.streams.kstream.KeyValueMapper;
                   import org.apache.kafka.streams.kstream.Materialized;
                   import org.apache.kafka.streams.kstream.Produced;
                   import org.apache.kafka.streams.state.KeyValueStore;

                   import java.util.Arrays;
                   import java.util.Properties;

                   public class WordCountApplication {

                       public static void main(final String[] args) throws Exception {
                           Properties props = new Properties();
                           props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
                           props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
                           props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
                           props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

                           StreamsBuilder builder = new StreamsBuilder();
                           KStream<String, String> textLines = builder.stream("TextLinesTopic");
                           KTable<String, Long> wordCounts = textLines
                               .flatMapValues(new ValueMapper<String, Iterable<String>>() {
                                   @Override
                                   public Iterable<String> apply(String textLine) {
                                       return Arrays.asList(textLine.toLowerCase().split("\W+"));
                                   }
                               })
                               .groupBy(new KeyValueMapper<String, String, String>() {
                                   @Override
                                   public String apply(String key, String word) {
                                       return word;
                                   }
                               })
                               .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));


                           wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));

                           KafkaStreams streams = new KafkaStreams(builder.build(), props);
                           streams.start();
                       }

                   }
import java.util.Properties
import java.util.concurrent.TimeUnit

import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

object WordCountApplication extends App {
  import Serdes._

  val props: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
    p
  }

  val builder: StreamsBuilder = new StreamsBuilder
  val textLines: KStream[String, String] = builder.stream[String, String]("TextLinesTopic")
  val wordCounts: KTable[String, Long] = textLines
    .flatMapValues(textLine => textLine.toLowerCase.split("\W+"))
    .groupBy((_, word) => word)
    .count()(Materialized.as("counts-store"))
  wordCounts.toStream.to("WordsWithCountsTopic")

  val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
  streams.start()

  sys.ShutdownHookThread {
     streams.close(10, TimeUnit.SECONDS)
  }
}