























SASL(Simple Authentication and Security Layer)用于给 Kafka 添加用户身份认证;配合 ACL 即可实现按用户的权限管理。
/home/admin/kafka/bitnami/kafka/config/kraft/server.properties

# Single-node KRaft broker with SASL_PLAINTEXT + SCRAM-SHA-512 and ACLs.
# One property per line: in a .properties file a line starting with '#' is
# a comment in its entirety, so the settings must not share a line with it.

# KRaft core settings: this node is both broker and controller.
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@localhost:9093

# Listeners
listeners=SASL_PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093
advertised.listeners=SASL_PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093

# Listener name -> security protocol mapping
listener.security.protocol.map=CONTROLLER:PLAINTEXT,SASL_SSL:SASL_SSL,SASL_PLAINTEXT:SASL_PLAINTEXT

# Controller listener name
controller.listener.names=CONTROLLER

# Inter-broker communication
inter.broker.listener.name=SASL_PLAINTEXT

# SASL settings
sasl.enabled.mechanisms=SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512

# SCRAM-SHA-512 login for the broker. The key prefix is the lower-cased name
# of the listener it applies to; the only SASL listener configured above is
# SASL_PLAINTEXT, so the prefix is "sasl_plaintext" (a "sasl_ssl" prefix would
# match no listener in this file).
listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="admin" \
  password="adminauth123";

######## ACL / account restriction settings
# NOTE(review): User:ANONYMOUS is presumably listed because the CONTROLLER
# listener is PLAINTEXT (unauthenticated); it also makes every unauthenticated
# connection a super user - confirm this is acceptable.
super.users=User:admin;User:ANONYMOUS
# Authorizer class used for ACLs in KRaft mode
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer

# Log directory and topic defaults
log.dirs=/tmp/kraft-combined-logs
num.partitions=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
server.properties

# KRaft broker on host kafkanode1 with SASL_PLAINTEXT + SCRAM-SHA-512 and ACLs.
# One property per line: in a .properties file a line starting with '#' is
# a comment in its entirety, so the settings must not share a line with it.

# KRaft core settings: this node is both broker and controller.
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@kafkanode1:9093

# Listeners
listeners=SASL_PLAINTEXT://kafkanode1:9092,CONTROLLER://kafkanode1:9093
advertised.listeners=SASL_PLAINTEXT://kafkanode1:9092,CONTROLLER://kafkanode1:9093

# Listener name -> security protocol mapping
listener.security.protocol.map=CONTROLLER:PLAINTEXT,SASL_SSL:SASL_SSL,SASL_PLAINTEXT:SASL_PLAINTEXT

# Controller listener name
controller.listener.names=CONTROLLER

# Inter-broker communication
inter.broker.listener.name=SASL_PLAINTEXT

# SASL settings
sasl.enabled.mechanisms=SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512

# SCRAM-SHA-512 login for the broker. The key prefix is the lower-cased name
# of the listener it applies to; the only SASL listener configured above is
# SASL_PLAINTEXT, so the prefix is "sasl_plaintext" (a "sasl_ssl" prefix would
# match no listener in this file).
listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="admin" \
  password="adminauth123";

######## ACL / account restriction settings
# NOTE(review): User:ANONYMOUS is presumably listed because the CONTROLLER
# listener is PLAINTEXT (unauthenticated); it also makes every unauthenticated
# connection a super user - confirm this is acceptable.
super.users=User:admin;User:ANONYMOUS
# Authorizer class used for ACLs in KRaft mode
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer

# Log directory and topic defaults
log.dirs=/tmp/kraft-combined-logs
num.partitions=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
2-server.properties
# Generate a cluster ID and format the KRaft storage directory with it.
# The ID is captured instead of being copy-pasted from a previous run; it is
# printed so the same value can be reused when formatting other nodes of the
# same cluster.
cluster_id="$(bin/kafka-storage.sh random-uuid)"
echo "cluster id: ${cluster_id}"

# --add-scram registers the SCRAM-SHA-512 credential for user "admin" at
# format time.
bin/kafka-storage.sh format \
  -t "${cluster_id}" \
  -c /home/admin/kafka/bitnami/kafka/config/kraft/server.properties \
  --add-scram 'SCRAM-SHA-512=[name=admin,iterations=8192,password=adminauth123]'

// Broker-side JAAS login context: SCRAM login module with the admin credentials.
KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="adminauth123";
};
kafka_server_jaas.conf
# Hand the JAAS file to the broker JVM through KAFKA_OPTS, then start the
# broker with the KRaft server.properties.
KAFKA_OPTS="-Djava.security.auth.login.config=/home/admin/kafka/bitnami/kafka/kafka_server_jaas.conf"
export KAFKA_OPTS
bin/kafka-server-start.sh \
  /home/admin/kafka/bitnami/kafka/config/kraft/server.properties

创建admin用户验证文件
/home/admin/kafka/bitnami/kafka/config/admin-user-jaas

# Client settings for the "admin" user: SCRAM-SHA-512 over SASL_PLAINTEXT.
# Each property is on its own line; on a single line everything after the
# first '=' would be read as the value of security.protocol.
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="adminauth123";
admin-user-jaas
使用admin用户权限创建普通用户并且给普通用户授权
创建用户名密码
# Register the SCRAM-SHA-512 credential for user "ghca", connecting with the
# admin client settings in admin-user-jaas.
bin/kafka-configs.sh \
  --bootstrap-server localhost:9092 \
  --command-config /home/admin/kafka/bitnami/kafka/config/admin-user-jaas \
  --entity-type users --entity-name ghca \
  --alter --add-config 'SCRAM-SHA-512=[password=ghca123]'
给普通用户指定topic和消费组
# Add the --consumer convenience ACLs for User:ghca on topic "testtopic",
# for any consumer group ('*').
bin/kafka-acls.sh \
  --bootstrap-server localhost:9092 \
  --command-config /home/admin/kafka/bitnami/kafka/config/admin-user-jaas \
  --add --allow-principal User:ghca \
  --consumer --topic testtopic --group '*'

设置普通用户可以创建topic
# Allow User:ghca the Create operation on the cluster resource.
bin/kafka-acls.sh \
  --bootstrap-server localhost:9092 \
  --command-config /home/admin/kafka/bitnami/kafka/config/admin-user-jaas \
  --add \
  --allow-principal User:ghca \
  --operation Create \
  --cluster
创建普通用户鉴权文件

# Client settings for the regular user "ghca": SCRAM-SHA-512 over SASL_PLAINTEXT.
# Each property is on its own line; on a single line everything after the
# first '=' would be read as the value of security.protocol.
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="ghca" password="ghca123";
command_config
1.创建topic
# Create topic "test2" (1 partition, replication factor 1) using the client
# settings stored in command_config.
./bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --command-config /home/admin/kafka/bitnami/kafka/config/command_config \
  --create \
  --topic test2 \
  --partitions 1 \
  --replication-factor 1
2.设置普通用户权限
# Producer side: allow User:ghca to Write and Describe topic "test2".
echo "为ghca用户配置生产者权限..."
bin/kafka-acls.sh \
  --bootstrap-server localhost:9092 \
  --command-config /home/admin/kafka/bitnami/kafka/config/admin-user-jaas \
  --add \
  --allow-principal User:ghca \
  --operation Write \
  --operation Describe \
  --topic test2

# Consumer side: allow User:ghca to Read and Describe topic "test2" and any
# consumer group ('*').
echo "为ghca用户配置消费者权限..."
bin/kafka-acls.sh \
  --bootstrap-server localhost:9092 \
  --command-config /home/admin/kafka/bitnami/kafka/config/admin-user-jaas \
  --add \
  --allow-principal User:ghca \
  --operation Read \
  --operation Describe \
  --topic test2 \
  --group '*'
3.启动生产者
# Start a console producer on topic "test2" with the client settings in
# command_config. Uses --bootstrap-server (the deprecated --broker-list
# spelling is avoided), matching the other commands in these notes.
bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic test2 \
  --producer.config /home/admin/kafka/bitnami/kafka/config/command_config

4.启动消费者
# Start a console consumer on topic "test2" from the earliest offset, with the
# client settings in command_config.
bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --consumer.config /home/admin/kafka/bitnami/kafka/config/command_config \
  --topic test2 \
  --from-beginning

1.生成证书文件


#!/usr/bin/env bash
#
# Non-interactive generator for one Kafka CA trust store plus one keystore.
#
# Creates, relative to the current directory:
#   truststore/ca-key                - CA private key
#   truststore/kafka.truststore.jks  - trust store holding the CA certificate
#   keystore/kafka.keystore.jks      - key pair with its CA-signed certificate
#
# Optional environment overrides (defaults keep the original behaviour):
#   KAFKA_SSL_CN    certificate Common Name  (default: kafkanode1)
#   KAFKA_SSL_PASS  store and key password   (default: 123456)
#
# NOTE(review): the password is handed to keytool on the command line, so it
# is visible in the process list while the script runs.

set -eu

readonly KEYSTORE_FILENAME="kafka.keystore.jks"
readonly VALIDITY_IN_DAYS=3650
readonly DEFAULT_TRUSTSTORE_FILENAME="kafka.truststore.jks"
readonly TRUSTSTORE_WORKING_DIRECTORY="truststore"
readonly KEYSTORE_WORKING_DIRECTORY="keystore"
readonly CA_CERT_FILE="ca-cert"
readonly KEYSTORE_SIGN_REQUEST="cert-file"
readonly KEYSTORE_SIGN_REQUEST_SRL="ca-cert.srl"
readonly KEYSTORE_SIGNED_CERT="cert-signed"

readonly COUNTRY="bj"
readonly STATE="bj"
readonly OU="bj"
readonly CN="${KAFKA_SSL_CN:-kafkanode1}"
readonly LOCATION="bj"
readonly PASS="${KAFKA_SSL_PASS:-123456}"

#######################################
# Report that a path left over from a previous run is in the way, then exit.
# Arguments: $1 - the conflicting path
# Outputs:   two-line message on stderr
# Returns:   never returns; exits with status 1
#######################################
file_exists_and_exit() {
  echo "'$1' cannot exist. Move or delete it before" >&2
  echo "re-running this script." >&2
  exit 1
}

# Refuse to overwrite outputs or intermediate files of an earlier run.
for leftover in \
  "$KEYSTORE_WORKING_DIRECTORY" \
  "$CA_CERT_FILE" \
  "$KEYSTORE_SIGN_REQUEST" \
  "$KEYSTORE_SIGN_REQUEST_SRL" \
  "$KEYSTORE_SIGNED_CERT"; do
  if [ -e "$leftover" ]; then
    file_exists_and_exit "$leftover"
  fi
done

echo "Welcome to the Kafka SSL keystore and trust store generator script."

if [ -e "$TRUSTSTORE_WORKING_DIRECTORY" ]; then
  file_exists_and_exit "$TRUSTSTORE_WORKING_DIRECTORY"
fi

mkdir "$TRUSTSTORE_WORKING_DIRECTORY"
echo
echo "OK, we'll generate a trust store and associated private key."
echo
echo "First, the private key."
echo

# Self-signed CA certificate and an unencrypted (-nodes) CA private key.
openssl req -new -x509 -keyout "$TRUSTSTORE_WORKING_DIRECTORY/ca-key" \
  -out "$TRUSTSTORE_WORKING_DIRECTORY/ca-cert" -days "$VALIDITY_IN_DAYS" -nodes \
  -subj "/C=$COUNTRY/ST=$STATE/L=$LOCATION/O=$OU/CN=$CN"

trust_store_private_key_file="$TRUSTSTORE_WORKING_DIRECTORY/ca-key"

echo
echo "Two files were created:"
echo " - $TRUSTSTORE_WORKING_DIRECTORY/ca-key -- the private key used later to"
echo " sign certificates"
echo " - $TRUSTSTORE_WORKING_DIRECTORY/ca-cert -- the certificate that will be"
echo " stored in the trust store in a moment and serve as the certificate"
echo " authority (CA). Once this certificate has been stored in the trust"
echo " store, it will be deleted. It can be retrieved from the trust store via:"
echo " $ keytool -keystore <trust-store-file> -export -alias CARoot -rfc"

echo
echo "Now the trust store will be generated from the certificate."
echo

keytool -keystore "$TRUSTSTORE_WORKING_DIRECTORY/$DEFAULT_TRUSTSTORE_FILENAME" \
  -alias CARoot -import -file "$TRUSTSTORE_WORKING_DIRECTORY/ca-cert" \
  -noprompt -dname "C=$COUNTRY, ST=$STATE, L=$LOCATION, O=$OU, CN=$CN" \
  -keypass "$PASS" -storepass "$PASS"

trust_store_file="$TRUSTSTORE_WORKING_DIRECTORY/$DEFAULT_TRUSTSTORE_FILENAME"

echo
echo "$TRUSTSTORE_WORKING_DIRECTORY/$DEFAULT_TRUSTSTORE_FILENAME was created."

# don't need the cert because it's in the trust store.
rm -- "$TRUSTSTORE_WORKING_DIRECTORY/$CA_CERT_FILE"

echo
echo "Continuing with:"
echo " - trust store file: $trust_store_file"
echo " - trust store private key: $trust_store_private_key_file"

mkdir "$KEYSTORE_WORKING_DIRECTORY"

echo
echo "Now, a keystore will be generated. Each broker and logical client needs its own"
echo "keystore. This script will create only one keystore. Run this script multiple"
echo "times for multiple keystores."
echo
echo " NOTE: currently in Kafka, the Common Name (CN) does not need to be the FQDN of"
echo " this host. However, at some point, this may change. As such, make the CN"
echo " the FQDN. Some operating systems call the CN prompt 'first / last name'"

# To learn more about CNs and FQDNs, read:
# https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/X509ExtendedTrustManager.html

keytool -keystore "$KEYSTORE_WORKING_DIRECTORY/$KEYSTORE_FILENAME" \
  -alias localhost -validity "$VALIDITY_IN_DAYS" -genkey -keyalg RSA \
  -noprompt -dname "C=$COUNTRY, ST=$STATE, L=$LOCATION, O=$OU, CN=$CN" \
  -keypass "$PASS" -storepass "$PASS"

echo
echo "'$KEYSTORE_WORKING_DIRECTORY/$KEYSTORE_FILENAME' now contains a key pair and a"
echo "self-signed certificate. Again, this keystore can only be used for one broker or"
echo "one logical client. Other brokers or clients need to generate their own keystores."

echo
echo "Fetching the certificate from the trust store and storing in $CA_CERT_FILE."
echo

keytool -keystore "$trust_store_file" -export -alias CARoot -rfc \
  -file "$CA_CERT_FILE" -keypass "$PASS" -storepass "$PASS"

echo
echo "Now a certificate signing request will be made to the keystore."
echo
keytool -keystore "$KEYSTORE_WORKING_DIRECTORY/$KEYSTORE_FILENAME" -alias localhost \
  -certreq -file "$KEYSTORE_SIGN_REQUEST" -keypass "$PASS" -storepass "$PASS"

echo
echo "Now the trust store's private key (CA) will sign the keystore's certificate."
echo
openssl x509 -req -CA "$CA_CERT_FILE" -CAkey "$trust_store_private_key_file" \
  -in "$KEYSTORE_SIGN_REQUEST" -out "$KEYSTORE_SIGNED_CERT" \
  -days "$VALIDITY_IN_DAYS" -CAcreateserial
# creates $KEYSTORE_SIGN_REQUEST_SRL which is never used or needed.

echo
echo "Now the CA will be imported into the keystore."
echo
keytool -keystore "$KEYSTORE_WORKING_DIRECTORY/$KEYSTORE_FILENAME" -alias CARoot \
  -import -file "$CA_CERT_FILE" -keypass "$PASS" -storepass "$PASS" -noprompt
# delete the trust store cert because it's stored in the trust store.
rm -- "$CA_CERT_FILE"

echo
echo "Now the keystore's signed certificate will be imported back into the keystore."
echo
keytool -keystore "$KEYSTORE_WORKING_DIRECTORY/$KEYSTORE_FILENAME" -alias localhost -import \
  -file "$KEYSTORE_SIGNED_CERT" -keypass "$PASS" -storepass "$PASS"

echo
echo "All done!"
echo
echo "Deleting intermediate files. They are:"
echo " - '$KEYSTORE_SIGN_REQUEST_SRL': CA serial number"
echo " - '$KEYSTORE_SIGN_REQUEST': the keystore's certificate signing request"
echo " (that was fulfilled)"
echo " - '$KEYSTORE_SIGNED_CERT': the keystore's certificate, signed by the CA, and stored back"
echo " into the keystore"

rm -- "$KEYSTORE_SIGN_REQUEST_SRL"
rm -- "$KEYSTORE_SIGN_REQUEST"
rm -- "$KEYSTORE_SIGNED_CERT"
kafka-generate-ssl-automatic.sh

2.修改kafka配置文件

# KRaft broker on host kafkanode1 exposing SASL_PLAINTEXT (9092) and
# SASL_SSL (9094), with SCRAM-SHA-512 and ACLs.
# One property per line: in a .properties file a line starting with '#' is
# a comment in its entirety, so the settings must not share a line with it.

# KRaft core settings: this node is both broker and controller.
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@kafkanode1:9093

# Listeners
listeners=SASL_PLAINTEXT://kafkanode1:9092,CONTROLLER://kafkanode1:9093,SASL_SSL://kafkanode1:9094
advertised.listeners=SASL_PLAINTEXT://kafkanode1:9092,CONTROLLER://kafkanode1:9093,SASL_SSL://kafkanode1:9094

# Listener name -> security protocol mapping
listener.security.protocol.map=CONTROLLER:PLAINTEXT,SASL_SSL:SASL_SSL,SASL_PLAINTEXT:SASL_PLAINTEXT

# Controller listener name
controller.listener.names=CONTROLLER

# Inter-broker communication goes over the TLS listener
inter.broker.listener.name=SASL_SSL

# SASL settings
sasl.enabled.mechanisms=SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512

# Empty value: TLS hostname verification is turned off.
ssl.endpoint.identification.algorithm=
# NOTE(review): the client property files in these notes configure only a
# trust store (no client keystore). Per KIP-684 the un-prefixed
# ssl.client.auth is presumably not applied to SASL_SSL listeners; use
# listener.name.sasl_ssl.ssl.client.auth if mutual TLS is intended - confirm.
ssl.client.auth=required

# Broker security settings
ssl.truststore.location=/home/admin/kafka/truststore/kafka.truststore.jks
ssl.truststore.password=123456
ssl.keystore.location=/home/admin/kafka/keystore/kafka.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456

# SCRAM-SHA-512 login for the SASL_SSL listener
listener.name.sasl_ssl.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="admin" \
  password="adminauth123";

######## ACL / account restriction settings
super.users=User:admin;User:ANONYMOUS
# Authorizer class used for ACLs in KRaft mode
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer

# Log directory and topic defaults
log.dirs=/tmp/kraft-combined-logs
num.partitions=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
server.properties

3. 重启kafka
4.创建admin管理员配置文件

# Client settings for the "admin" user: SCRAM-SHA-512 over SASL_SSL.
# Each property is on its own line; on a single line everything after the
# first '=' would be read as the value of security.protocol.
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="admin" \
  password="adminauth123";
# Trust store containing the CA certificate used to verify the broker.
ssl.truststore.location=/home/admin/kafka/truststore/kafka.truststore.jks
ssl.truststore.password=123456
admin-client.properties
5.创建普通用户
# Register the SCRAM-SHA-512 credential for user "ssluser" through the
# SASL_SSL port (9094), using the admin client settings.
bin/kafka-configs.sh \
  --bootstrap-server kafkanode1:9094 \
  --command-config /home/admin/kafka/bitnami/kafka/config/admin-client.properties \
  --entity-type users \
  --entity-name ssluser \
  --alter \
  --add-config 'SCRAM-SHA-512=[password=ssluser123]'
6.创建普通用户配置文件

# Client settings for the regular user "ssluser": SCRAM-SHA-512 over SASL_SSL.
# Each property is on its own line; on a single line everything after the
# first '=' would be read as the value of security.protocol.
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="ssluser" \
  password="ssluser123";
# Trust store containing the CA certificate used to verify the broker.
ssl.truststore.location=/home/admin/kafka/truststore/kafka.truststore.jks
ssl.truststore.password=123456
client-ssl.properties
7.管理员给普通用户设置权限
# Allow User:ssluser the Create operation on the cluster resource.
# A space must precede the trailing backslash: without it the line
# continuation glues "kafkanode1:9094" to "--command-config" as one word.
bin/kafka-acls.sh --bootstrap-server kafkanode1:9094 \
  --command-config /home/admin/kafka/bitnami/kafka/config/admin-client.properties \
  --add --allow-principal User:ssluser \
  --operation Create --cluster

# Allow User:ssluser to Read and Write topic "test3".
bin/kafka-acls.sh --bootstrap-server kafkanode1:9094 \
  --command-config /home/admin/kafka/bitnami/kafka/config/admin-client.properties \
  --add --allow-principal User:ssluser \
  --operation Read --operation Write --topic test3

# Allow User:ssluser to Read and Describe topic "test3" and any consumer
# group ('*').
bin/kafka-acls.sh --bootstrap-server kafkanode1:9094 \
  --command-config /home/admin/kafka/bitnami/kafka/config/admin-client.properties \
  --add --allow-principal User:ssluser \
  --operation Read --operation Describe \
  --topic test3 \
  --group '*'
8.启动生产者
# Create topic "test3" (1 partition, replication factor 1) over the SASL_SSL
# port with the client settings in client-ssl.properties.
bin/kafka-topics.sh \
  --bootstrap-server kafkanode1:9094 \
  --command-config /home/admin/kafka/bitnami/kafka/config/client-ssl.properties \
  --create \
  --topic test3 \
  --partitions 1 \
  --replication-factor 1

# Start a console producer on "test3" with the same client settings.
bin/kafka-console-producer.sh \
  --bootstrap-server kafkanode1:9094 \
  --topic test3 \
  --producer.config /home/admin/kafka/bitnami/kafka/config/client-ssl.properties

9.启动消费者
# Start a console consumer on "test3" from the earliest offset over the
# SASL_SSL port, with the client settings in client-ssl.properties.
bin/kafka-console-consumer.sh \
  --bootstrap-server kafkanode1:9094 \
  --topic test3 \
  --from-beginning \
  --consumer.config /home/admin/kafka/bitnami/kafka/config/client-ssl.properties

当客户端连接 9092(SASL_PLAINTEXT)的时候,只要向 kafka 提供对应的用户名和密码即可,但客户端与 kafka 节点之间的数据交互还是明文传输。
当客户端连接 9094(SASL_SSL)的时候,不仅需要 kafka 用户名密码,还要提供 ssl 证书信任库(truststore)和密码,这样客户端与 kafka 节点之间的通信数据是以加密形式传输的。
此内容由惯性聚合(RSS阅读器)自动聚合整理,仅供阅读参考。 原文来自 — 版权归原作者所有。