Adding user authentication and authorization to Kafka
不懂123 · 2026-04-13 · via 博客园 - 不懂123

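Kafka 3.9 SCRAM deployment

     SASL adds user authentication to Kafka; the ACL settings further down then control what each authenticated user may do.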

     /home/admin/kafka/bitnami/kafka/config/kraft/server.properties

# KRaft core settings
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@localhost:9093

# Listeners
listeners=SASL_PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093
advertised.listeners=SASL_PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093

# Listener name to security protocol mapping
listener.security.protocol.map=CONTROLLER:PLAINTEXT,SASL_SSL:SASL_SSL,SASL_PLAINTEXT:SASL_PLAINTEXT

# Controller listener name
controller.listener.names=CONTROLLER

# Inter-broker communication
inter.broker.listener.name=SASL_PLAINTEXT


# SASL settings
sasl.enabled.mechanisms=SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512

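# SCRAM-SHA-512 credentials for the broker.
# The key prefix is the listener name in lowercase; this file defines only the SASL_PLAINTEXT listener.
listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
   username="admin" \
   password="adminauth123";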


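######## ACL settings
# User:ANONYMOUS is listed because the CONTROLLER listener is PLAINTEXT, so requests arriving on it have no authenticated principal.
super.users=User:admin;User:ANONYMOUS
# Authorizer class used for ACLs in KRaft mode
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer

# Log directory and topic defaults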
log.dirs=/tmp/kraft-combined-logs
num.partitions=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

server.properties

# KRaft core settings
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@kafkanode1:9093

# Listeners
listeners=SASL_PLAINTEXT://kafkanode1:9092,CONTROLLER://kafkanode1:9093
advertised.listeners=SASL_PLAINTEXT://kafkanode1:9092,CONTROLLER://kafkanode1:9093

# Listener name to security protocol mapping
listener.security.protocol.map=CONTROLLER:PLAINTEXT,SASL_SSL:SASL_SSL,SASL_PLAINTEXT:SASL_PLAINTEXT

# Controller listener name
controller.listener.names=CONTROLLER

# Inter-broker communication
inter.broker.listener.name=SASL_PLAINTEXT


# SASL settings
sasl.enabled.mechanisms=SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512

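# SCRAM-SHA-512 credentials for the broker.
# The key prefix is the listener name in lowercase; this file defines only the SASL_PLAINTEXT listener.
listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
   username="admin" \
   password="adminauth123";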


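######## ACL settings
# User:ANONYMOUS is listed because the CONTROLLER listener is PLAINTEXT, so requests arriving on it have no authenticated principal.
super.users=User:admin;User:ANONYMOUS
# Authorizer class used for ACLs in KRaft mode
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer

# Log directory and topic defaults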
log.dirs=/tmp/kraft-combined-logs
num.partitions=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

2-server.properties
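
A listener name that appears in `listeners` without a matching entry in `listener.security.protocol.map` stops the broker from starting, so it can be worth checking the file before launch. The following is only a minimal sketch of such a check and is not part of the original setup: `check_listener_map` is a hypothetical helper name, and the sample file is created purely for the demonstration.

```shell
#!/bin/sh
# check_listener_map FILE
# Hypothetical helper: reports whether every listener name used in
# `listeners` also has an entry in `listener.security.protocol.map`.
check_listener_map() {
  props=$1
  missing=0
  # The value is everything after the first '='.
  listeners=$(grep '^listeners=' "$props" | cut -d= -f2-)
  map=$(grep '^listener\.security\.protocol\.map=' "$props" | cut -d= -f2-)
  # Entries look like NAME://host:port and are comma-separated.
  for entry in $(printf '%s' "$listeners" | tr ',' ' '); do
    name=${entry%%://*}
    case ",$map," in
      *",$name:"*) echo "ok: $name" ;;
      *) echo "missing from protocol map: $name"; missing=1 ;;
    esac
  done
  return $missing
}

# Demonstration on a throwaway file holding the two relevant lines.
sample=$(mktemp)
cat > "$sample" <<'EOF'
listeners=SASL_PLAINTEXT://kafkanode1:9092,CONTROLLER://kafkanode1:9093
listener.security.protocol.map=CONTROLLER:PLAINTEXT,SASL_SSL:SASL_SSL,SASL_PLAINTEXT:SASL_PLAINTEXT
EOF
check_listener_map "$sample"
rm -f "$sample"
```

To check a real file, call `check_listener_map /home/admin/kafka/bitnami/kafka/config/kraft/server.properties`; the function returns a non-zero status if any listener is unmapped.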

     bin/kafka-storage.sh random-uuid

     bin/kafka-storage.sh format -t 0hDbylYqTXqJsW-Jhk1teA -c /home/admin/kafka/bitnami/kafka/config/kraft/server.properties --add-scram 'SCRAM-SHA-512=[name=admin,iterations=8192,password=adminauth123]'
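
The two commands above can be chained so that the generated cluster ID is passed straight to `format` instead of being pasted by hand. This is a rough sketch under assumptions: `KAFKA_HOME`, `ADMIN_PASSWORD` and the `scram_spec` helper are names introduced here for illustration, and the block skips the format step when no Kafka installation is found.

```shell
#!/bin/sh
# scram_spec USER PASSWORD
# Hypothetical helper: builds the value for --add-scram, using the same
# mechanism and iteration count as the command above.
scram_spec() {
  printf 'SCRAM-SHA-512=[name=%s,iterations=8192,password=%s]' "$1" "$2"
}

# Assumed install root, derived from the config path used in this post.
KAFKA_HOME=${KAFKA_HOME:-/home/admin/kafka/bitnami/kafka}
CONFIG="$KAFKA_HOME/config/kraft/server.properties"

if [ -x "$KAFKA_HOME/bin/kafka-storage.sh" ]; then
  # Generate the cluster ID once and reuse it for the format step.
  cluster_id=$("$KAFKA_HOME/bin/kafka-storage.sh" random-uuid)
  "$KAFKA_HOME/bin/kafka-storage.sh" format -t "$cluster_id" -c "$CONFIG" \
    --add-scram "$(scram_spec admin "${ADMIN_PASSWORD:?set ADMIN_PASSWORD first}")"
else
  echo "kafka-storage.sh not found under $KAFKA_HOME; skipping format"
fi
```

Reading the password from an environment variable keeps it out of the script itself, although it is still visible in the process list while the command runs.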

KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="adminauth123";
};

kafka_server_jaas.conf

      export KAFKA_OPTS="-Djava.security.auth.login.config=/home/admin/kafka/bitnami/kafka/kafka_server_jaas.conf"

      bin/kafka-server-start.sh /home/admin/kafka/bitnami/kafka/config/kraft/server.properties

       Create the admin user's client authentication file

       /home/admin/kafka/bitnami/kafka/config/admin-user-jaas

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="adminauth123";

admin-user-jaas
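
Because these client files hold the password in plaintext, it may be sensible to generate them with owner-only permissions. The following is a minimal sketch, assuming a POSIX shell; `write_client_props` and the `GHCA_PASSWORD` variable are hypothetical names, and a password containing a double quote would need escaping that this sketch does not attempt.

```shell
#!/bin/sh
# write_client_props USER PASSWORD FILE
# Hypothetical helper: writes a SASL_PLAINTEXT/SCRAM client properties file.
# The umask applies only inside the subshell, so a newly created file gets
# mode 600 without changing the caller's umask.
write_client_props() {
  (
    umask 077
    cat > "$3" <<EOF
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="$1" password="$2";
EOF
  )
}

# Example: generate a file for a user in a scratch directory.
workdir=$(mktemp -d)
write_client_props ghca "${GHCA_PASSWORD:-changeme}" "$workdir/command_config"
ls -l "$workdir/command_config"
```

The umask only affects files that do not exist yet; an existing file keeps its current mode, so remove or `chmod 600` it first.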

   Using the admin account, create a regular user and grant it permissions

     Create the user name and password

      bin/kafka-configs.sh --bootstrap-server localhost:9092  --alter --add-config 'SCRAM-SHA-512=[password=ghca123]' --entity-type users --entity-name ghca --command-config /home/admin/kafka/bitnami/kafka/config/admin-user-jaas

     Grant the regular user consumer access to a topic and to consumer groups

      bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --consumer --allow-principal User:ghca --topic testtopic --group '*' --command-config /home/admin/kafka/bitnami/kafka/config/admin-user-jaas

       Allow the regular user to create topics

       bin/kafka-acls.sh --bootstrap-server localhost:9092   --command-config /home/admin/kafka/bitnami/kafka/config/admin-user-jaas    --add --allow-principal User:ghca    --operation Create --cluster

Regular user operations

      Create the regular user's authentication file

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="ghca" password="ghca123";

command_config

  1. Create a topic

    ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test2 --partitions 1 --replication-factor 1 --command-config /home/admin/kafka/bitnami/kafka/config/command_config

  2. Set the regular user's permissions

    echo "Configuring producer permissions for user ghca..."

   bin/kafka-acls.sh --bootstrap-server localhost:9092 \
    --command-config /home/admin/kafka/bitnami/kafka/config/admin-user-jaas \
    --add --allow-principal User:ghca \
    --operation Write --operation Describe \
    --topic test2

    echo "Configuring consumer permissions for user ghca..."
    bin/kafka-acls.sh --bootstrap-server localhost:9092 \
      --command-config /home/admin/kafka/bitnami/kafka/config/admin-user-jaas \
      --add --allow-principal User:ghca \
      --operation Read --operation Describe \
      --topic test2 \
     --group '*'
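
When several users or topics need the same pair of grants, the two commands can be wrapped in a small function. This is a sketch only: `grant_rw`, `ACLS_CMD`, `BOOTSTRAP` and `ADMIN_CONFIG` are names introduced here, and by default the block performs a dry run that prints the commands rather than executing them.

```shell
#!/bin/sh
# Dry run by default: ACLS_CMD only prints the command line.
# Set ACLS_CMD=bin/kafka-acls.sh to actually apply the ACLs.
ACLS_CMD=${ACLS_CMD:-echo bin/kafka-acls.sh}
BOOTSTRAP=${BOOTSTRAP:-localhost:9092}
ADMIN_CONFIG=${ADMIN_CONFIG:-/home/admin/kafka/bitnami/kafka/config/admin-user-jaas}

# grant_rw USER TOPIC (hypothetical helper)
grant_rw() {
  # Producer side: Write + Describe on the topic.
  $ACLS_CMD --bootstrap-server "$BOOTSTRAP" --command-config "$ADMIN_CONFIG" \
    --add --allow-principal "User:$1" \
    --operation Write --operation Describe --topic "$2"
  # Consumer side: Read + Describe on the topic and on any consumer group.
  $ACLS_CMD --bootstrap-server "$BOOTSTRAP" --command-config "$ADMIN_CONFIG" \
    --add --allow-principal "User:$1" \
    --operation Read --operation Describe --topic "$2" --group '*'
}

grant_rw ghca test2
```

Printing first makes it easy to review exactly which principal and resources are affected before running the same call with `ACLS_CMD=bin/kafka-acls.sh`.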

  3. Start a producer

     bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test2 --producer.config /home/admin/kafka/bitnami/kafka/config/command_config

  4. Start a consumer

     bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test2 --consumer.config /home/admin/kafka/bitnami/kafka/config/command_config --from-beginning

Kafka 3.9 SSL encrypted data transport

     1. Generate the certificate files

#!/usr/bin/env bash

set -eu

KEYSTORE_FILENAME="kafka.keystore.jks"
VALIDITY_IN_DAYS=3650
DEFAULT_TRUSTSTORE_FILENAME="kafka.truststore.jks"
TRUSTSTORE_WORKING_DIRECTORY="truststore"
KEYSTORE_WORKING_DIRECTORY="keystore"
CA_CERT_FILE="ca-cert"
KEYSTORE_SIGN_REQUEST="cert-file"
KEYSTORE_SIGN_REQUEST_SRL="ca-cert.srl"
KEYSTORE_SIGNED_CERT="cert-signed"

COUNTRY="bj"
STATE="bj"
OU="bj"
CN="kafkanode1"
LOCATION="bj"
PASS="123456"

function file_exists_and_exit() {
  echo "'$1' cannot exist. Move or delete it before"
  echo "re-running this script."
  exit 1
}

if [ -e "$KEYSTORE_WORKING_DIRECTORY" ]; then
  file_exists_and_exit $KEYSTORE_WORKING_DIRECTORY
fi

if [ -e "$CA_CERT_FILE" ]; then
  file_exists_and_exit $CA_CERT_FILE
fi

if [ -e "$KEYSTORE_SIGN_REQUEST" ]; then
  file_exists_and_exit $KEYSTORE_SIGN_REQUEST
fi

if [ -e "$KEYSTORE_SIGN_REQUEST_SRL" ]; then
  file_exists_and_exit $KEYSTORE_SIGN_REQUEST_SRL
fi

if [ -e "$KEYSTORE_SIGNED_CERT" ]; then
  file_exists_and_exit $KEYSTORE_SIGNED_CERT
fi

echo "Welcome to the Kafka SSL keystore and trust store generator script."

trust_store_file=""
trust_store_private_key_file=""

  if [ -e "$TRUSTSTORE_WORKING_DIRECTORY" ]; then
    file_exists_and_exit $TRUSTSTORE_WORKING_DIRECTORY
  fi

  mkdir $TRUSTSTORE_WORKING_DIRECTORY
  echo
  echo "OK, we'll generate a trust store and associated private key."
  echo
  echo "First, the private key."
  echo

  openssl req -new -x509 -keyout $TRUSTSTORE_WORKING_DIRECTORY/ca-key \
    -out $TRUSTSTORE_WORKING_DIRECTORY/ca-cert -days $VALIDITY_IN_DAYS -nodes \
    -subj "/C=$COUNTRY/ST=$STATE/L=$LOCATION/O=$OU/CN=$CN"

  trust_store_private_key_file="$TRUSTSTORE_WORKING_DIRECTORY/ca-key"

  echo
  echo "Two files were created:"
  echo " - $TRUSTSTORE_WORKING_DIRECTORY/ca-key -- the private key used later to"
  echo "   sign certificates"
  echo " - $TRUSTSTORE_WORKING_DIRECTORY/ca-cert -- the certificate that will be"
  echo "   stored in the trust store in a moment and serve as the certificate"
  echo "   authority (CA). Once this certificate has been stored in the trust"
  echo "   store, it will be deleted. It can be retrieved from the trust store via:"
  echo "   $ keytool -keystore <trust-store-file> -export -alias CARoot -rfc"

  echo
  echo "Now the trust store will be generated from the certificate."
  echo

  keytool -keystore $TRUSTSTORE_WORKING_DIRECTORY/$DEFAULT_TRUSTSTORE_FILENAME \
    -alias CARoot -import -file $TRUSTSTORE_WORKING_DIRECTORY/ca-cert \
    -noprompt -dname "C=$COUNTRY, ST=$STATE, L=$LOCATION, O=$OU, CN=$CN" -keypass $PASS -storepass $PASS

  trust_store_file="$TRUSTSTORE_WORKING_DIRECTORY/$DEFAULT_TRUSTSTORE_FILENAME"

  echo
  echo "$TRUSTSTORE_WORKING_DIRECTORY/$DEFAULT_TRUSTSTORE_FILENAME was created."

  # don't need the cert because it's in the trust store.
  rm $TRUSTSTORE_WORKING_DIRECTORY/$CA_CERT_FILE

echo
echo "Continuing with:"
echo " - trust store file:        $trust_store_file"
echo " - trust store private key: $trust_store_private_key_file"

mkdir $KEYSTORE_WORKING_DIRECTORY

echo
echo "Now, a keystore will be generated. Each broker and logical client needs its own"
echo "keystore. This script will create only one keystore. Run this script multiple"
echo "times for multiple keystores."
echo
echo "     NOTE: currently in Kafka, the Common Name (CN) does not need to be the FQDN of"
echo "           this host. However, at some point, this may change. As such, make the CN"
echo "           the FQDN. Some operating systems call the CN prompt 'first / last name'"

# To learn more about CNs and FQDNs, read:
# https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/X509ExtendedTrustManager.html

keytool -keystore $KEYSTORE_WORKING_DIRECTORY/$KEYSTORE_FILENAME \
  -alias localhost -validity $VALIDITY_IN_DAYS -genkey -keyalg RSA \
   -noprompt -dname "C=$COUNTRY, ST=$STATE, L=$LOCATION, O=$OU, CN=$CN" -keypass $PASS -storepass $PASS

echo
echo "'$KEYSTORE_WORKING_DIRECTORY/$KEYSTORE_FILENAME' now contains a key pair and a"
echo "self-signed certificate. Again, this keystore can only be used for one broker or"
echo "one logical client. Other brokers or clients need to generate their own keystores."

echo
echo "Fetching the certificate from the trust store and storing in $CA_CERT_FILE."
echo

keytool -keystore $trust_store_file -export -alias CARoot -rfc -file $CA_CERT_FILE -keypass $PASS -storepass $PASS

echo
echo "Now a certificate signing request will be made to the keystore."
echo
keytool -keystore $KEYSTORE_WORKING_DIRECTORY/$KEYSTORE_FILENAME -alias localhost \
  -certreq -file $KEYSTORE_SIGN_REQUEST -keypass $PASS -storepass $PASS

echo
echo "Now the trust store's private key (CA) will sign the keystore's certificate."
echo
openssl x509 -req -CA $CA_CERT_FILE -CAkey $trust_store_private_key_file \
  -in $KEYSTORE_SIGN_REQUEST -out $KEYSTORE_SIGNED_CERT \
  -days $VALIDITY_IN_DAYS -CAcreateserial
# creates $KEYSTORE_SIGN_REQUEST_SRL which is never used or needed.

echo
echo "Now the CA will be imported into the keystore."
echo
keytool -keystore $KEYSTORE_WORKING_DIRECTORY/$KEYSTORE_FILENAME -alias CARoot \
  -import -file $CA_CERT_FILE -keypass $PASS -storepass $PASS -noprompt
rm $CA_CERT_FILE # delete the trust store cert because it's stored in the trust store.

echo
echo "Now the keystore's signed certificate will be imported back into the keystore."
echo
keytool -keystore $KEYSTORE_WORKING_DIRECTORY/$KEYSTORE_FILENAME -alias localhost -import \
  -file $KEYSTORE_SIGNED_CERT -keypass $PASS -storepass $PASS

echo
echo "All done!"
echo
echo "Deleting intermediate files. They are:"
echo " - '$KEYSTORE_SIGN_REQUEST_SRL': CA serial number"
echo " - '$KEYSTORE_SIGN_REQUEST': the keystore's certificate signing request"
echo "   (that was fulfilled)"
echo " - '$KEYSTORE_SIGNED_CERT': the keystore's certificate, signed by the CA, and stored back"
echo "    into the keystore"

  rm $KEYSTORE_SIGN_REQUEST_SRL
  rm $KEYSTORE_SIGN_REQUEST
  rm $KEYSTORE_SIGNED_CERT

kafka-generate-ssl-automatic.sh
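
After the script finishes, three files should remain: the trust store, the CA private key and the keystore. The following is a small sketch for confirming that; `check_ssl_outputs` is a hypothetical helper, and the file names are taken from the variables at the top of the script.

```shell
#!/bin/sh
# check_ssl_outputs [DIR]
# Hypothetical helper: confirms that the files the generator script leaves
# behind exist and are non-empty. DIR defaults to the current directory.
check_ssl_outputs() {
  dir=${1:-.}
  missing=0
  for f in truststore/kafka.truststore.jks truststore/ca-key keystore/kafka.keystore.jks; do
    if [ -s "$dir/$f" ]; then
      echo "found: $f"
    else
      echo "missing: $f"
      missing=1
    fi
  done
  return $missing
}

check_ssl_outputs . || echo "run kafka-generate-ssl-automatic.sh in this directory first"
```

To look inside a store, `keytool -list -v -keystore keystore/kafka.keystore.jks` prompts for the store password and prints the entries; the keystore should show both the `caroot` and `localhost` aliases.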

    2. Modify the Kafka configuration file

# KRaft core settings
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@kafkanode1:9093

# Listeners
listeners=SASL_PLAINTEXT://kafkanode1:9092,CONTROLLER://kafkanode1:9093,SASL_SSL://kafkanode1:9094
advertised.listeners=SASL_PLAINTEXT://kafkanode1:9092,CONTROLLER://kafkanode1:9093,SASL_SSL://kafkanode1:9094

# Listener name to security protocol mapping
listener.security.protocol.map=CONTROLLER:PLAINTEXT,SASL_SSL:SASL_SSL,SASL_PLAINTEXT:SASL_PLAINTEXT

# Controller listener name
controller.listener.names=CONTROLLER

# Inter-broker communication
inter.broker.listener.name=SASL_SSL


# SASL settings
sasl.enabled.mechanisms=SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512

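# An empty value disables hostname verification, so the certificate is not checked against the host name.
ssl.endpoint.identification.algorithm=
# For SASL_SSL listeners Kafka enforces client certificates only when this is set with a listener prefix
# (listener.name.sasl_ssl.ssl.client.auth), which is why the clients below work with just a truststore.
ssl.client.auth=required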

# Broker security settings
ssl.truststore.location=/home/admin/kafka/truststore/kafka.truststore.jks
ssl.truststore.password=123456
ssl.keystore.location=/home/admin/kafka/keystore/kafka.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456


# SCRAM-SHA-512 credentials for the broker on the SASL_SSL listener
listener.name.sasl_ssl.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
   username="admin" \
   password="adminauth123";

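######## ACL settings
# User:ANONYMOUS is listed because the CONTROLLER listener is PLAINTEXT, so requests arriving on it have no authenticated principal.
super.users=User:admin;User:ANONYMOUS
# Authorizer class used for ACLs in KRaft mode
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer

# Log directory and topic defaults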
log.dirs=/tmp/kraft-combined-logs
num.partitions=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

server.properties
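
A wrong keystore or truststore path is a common reason for the broker failing to come up after this change. The following is a minimal sketch of a pre-restart check, assuming the paths are set with the plain `ssl.truststore.location` and `ssl.keystore.location` keys as above; `check_store_paths` and the `CONFIG` variable are hypothetical names.

```shell
#!/bin/sh
# check_store_paths FILE
# Hypothetical helper: verifies that the truststore and keystore referenced in
# a properties file exist and are readable by the current user.
check_store_paths() {
  props=$1
  bad=0
  for key in ssl.truststore.location ssl.keystore.location; do
    path=$(grep "^$key=" "$props" | cut -d= -f2-)
    if [ -n "$path" ] && [ -r "$path" ]; then
      echo "ok: $key=$path"
    else
      echo "not readable: $key=$path"
      bad=1
    fi
  done
  return $bad
}

# Assumed location of the broker config used in this post.
CONFIG=${CONFIG:-/home/admin/kafka/bitnami/kafka/config/kraft/server.properties}
if [ -f "$CONFIG" ]; then
  check_store_paths "$CONFIG"
else
  echo "no config at $CONFIG; pass your own path to check_store_paths"
fi
```

Run it as the same OS user that starts Kafka, since readability depends on that user's permissions.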

  3. Restart Kafka

  4. Create the admin client configuration file

security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="admin" \
  password="adminauth123";
ssl.truststore.location=/home/admin/kafka/truststore/kafka.truststore.jks
ssl.truststore.password=123456

admin-client.properties

   5. Create a regular user

     bin/kafka-configs.sh --bootstrap-server kafkanode1:9094 \
      --command-config /home/admin/kafka/bitnami/kafka/config/admin-client.properties \
     --alter --add-config 'SCRAM-SHA-512=[password=ssluser123]' \
     --entity-type users --entity-name ssluser

   6. Create the regular user's configuration file

security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="ssluser" \
  password="ssluser123";

ssl.truststore.location=/home/admin/kafka/truststore/kafka.truststore.jks
ssl.truststore.password=123456

client-ssl.properties

   7. As admin, grant permissions to the regular user

      bin/kafka-acls.sh --bootstrap-server kafkanode1:9094 \
      --command-config /home/admin/kafka/bitnami/kafka/config/admin-client.properties \
      --add --allow-principal User:ssluser \
      --operation Create --cluster

    bin/kafka-acls.sh --bootstrap-server kafkanode1:9094 \
     --command-config /home/admin/kafka/bitnami/kafka/config/admin-client.properties \
     --add --allow-principal User:ssluser \
     --operation Read --operation Write --topic test3

   bin/kafka-acls.sh --bootstrap-server kafkanode1:9094 \
   --command-config /home/admin/kafka/bitnami/kafka/config/admin-client.properties \
   --add --allow-principal User:ssluser \
   --operation Read --operation Describe \
   --topic test3 \
   --group '*'

8. Create the topic and start a producer

       bin/kafka-topics.sh --bootstrap-server kafkanode1:9094 \
         --command-config /home/admin/kafka/bitnami/kafka/config/client-ssl.properties \
         --create --topic test3 --partitions 1 --replication-factor 1

     bin/kafka-console-producer.sh --bootstrap-server kafkanode1:9094 \
     --producer.config /home/admin/kafka/bitnami/kafka/config/client-ssl.properties \
      --topic test3

9. Start a consumer

      bin/kafka-console-consumer.sh --bootstrap-server kafkanode1:9094 \
      --consumer.config /home/admin/kafka/bitnami/kafka/config/client-ssl.properties \
      --topic test3 --from-beginning

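   When a client connects to port 9092 (SASL_PLAINTEXT), it only needs to supply a valid user name and password; traffic on that listener is still sent in plaintext.

   When a client connects to port 9094 (SASL_SSL), it must supply the Kafka user name and password and also a truststore, with its password, that contains the CA certificate; traffic on that listener is encrypted. Because inter.broker.listener.name is set to SASL_SSL, traffic between brokers is encrypted as well.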
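
One way to see the difference between the two ports is to attempt a TLS handshake against each. The following is a rough sketch, assuming `openssl` is installed on the client machine; `speaks_tls` and the `KAFKA_HOST` variable are hypothetical names, and the probe has no timeout of its own, so an unresponsive host may make it wait.

```shell
#!/bin/sh
# speaks_tls HOST:PORT
# Hypothetical helper: succeeds if the endpoint completes enough of a TLS
# handshake for openssl to print the server certificate.
speaks_tls() {
  echo | openssl s_client -connect "$1" 2>/dev/null | grep -q 'BEGIN CERTIFICATE'
}

# Only probe when a host is given, e.g. KAFKA_HOST=kafkanode1 sh probe.sh
if [ -n "${KAFKA_HOST:-}" ]; then
  for port in 9092 9094; do
    if speaks_tls "$KAFKA_HOST:$port"; then
      echo "$KAFKA_HOST:$port: TLS"
    else
      echo "$KAFKA_HOST:$port: no TLS (or unreachable)"
    fi
  done
fi
```

With the configuration in this post, the expectation is that only the 9094 listener answers the handshake.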