、先配置对用户(SASL/PLAIN)

1,kafka安装目录下的config下的server.properties 复制了 一份在复制上面进行改动 server-sasl.properties 修改文件

添加下面的配置  ip和端口改成自己需要

listeners=SASL_PLAINTEXT://localhost:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN

2,config目录添加kafka_server_jaas.conf 文件 此文件是服务端 设置用户名 和密码

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="1234admin"
user_admin="1234admin"
user_write="123456"
user_read="123456"
};
其中 Kafka 定义了关键字KafkaServer字段用于指定服务端登录配置。该配置通过org.apache.
org.apache.kafka.common.security.plain.PlainLoginModule由指定采用PLAIN 机制, 定义了两个用户, 用户通
过usemame 和password 指定该代理与集群其他代理初始化连接的用户名和密码, 通过“ user_ "
为前缀后接用户名方式创建连接代理的用户名和密码,例如, user_read = "123456” 是指
用户名为read, 密码为123456

3,在config目录添加kafka_client_jaas.conf

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="1234admin";
};
4,在安装目录bin下kafka-server-start.sh文件 。复制出来一份 ,然后进行修改kafka-server-start-saal.sh 添加以下文件

if [ "x$KAFKA_OPTS"  ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config="安装目录"/config/kafka_server_jaas.conf"
fi
5,bin 下的 kafka-console-producer.sh 和 kafka-console-consumer.sh 文件也是 都各自复制一份 在复制上面进行修改

if [ "x$KAFKA_OPTS"  ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config="安装目录"/config/kafka_client_jaas.conf"
fi

kafka的 SASL/PLAIN 认证 就添加完成了,配置ACl

6,zookpeer认证,在zookeeper安装根目录的conf目录下,创建zk_server_jaas.conf文件

Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="1234admin"
user_admin="1234admin"
user_write="123456"
user_read="123456"
};
7,修改zookpeer的启动参数, 修改bin/zookeeper-server-start.sh, 在文件尾加上
export KAFKA_OPTS="-Djava.security.auth.login.config="安装目录"/config/zookeeper_jaas.conf"

8,kafka broker的认证配置config目录下, kafka_server_jaas.conf (第2步)添加

kafkaclient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="1234admin";
};
9,在server-sasl.properties(第1步)添加

super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
zookeeper.set.acl=true

#listeners,用于server真正bind
#advertised.listeners,用于开发给用户,如果没有设定,直接使用listeners

10, 添加zoo_jaas.conf配置

Server {
       org.apache.zookeeper.server.auth.DigestLoginModule required
       username="admin"
       password="1234admin”
       user_admin="1234admin";
};

zookeeper 服务zkEnv.sh中添加如下参数

export SERVER_JVMFLAGS="-Djava.security.auth.login.config=zookeeper安装目录/conf/zoo_jaas.conf $SERVER_JVMFLAGS"


11,启动server-sasl服务

bin/zookeeper-server-start-sasl.sh -daemon ./config/zoo.cfg

启动kafka

bin/kafka-server-start-saal.sh config/server-sasl.properties

11,config目录下创建zookeeper_client_jaas.conf

Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="1234admin";
};
12,修改kafka-topics.sh 添加配置

if [ "x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config="安装目录"/config/zookeeper_client_jaas.conf"
fi
创建topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka

修改bin/kafka-acls.sh 添加以下配置

if [ "x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config="安装目录"/config/zookeeper_client_jaas.conf"
fi
13,write read用户赋权

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:write --operation Write --topic kkk

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:read --operation Read --group test-group --topic kkk

查看所有权限


bin/kafka-acls.sh --list --authorizer-properties zookeeper.connect=localhost:2181

14,kafka client的认证配置
config/下创建kafka_write_jaas.conf

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="write"
password="123456";
};
15,修改bin/kafka-console-producer.sh 添加以下配置

if [ "x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config="安装目录"/config/kafka_write_jaas.conf"
fi
16,config/下创建producer.config

bootstrap.servers=localhost:9092
compression.type=none
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
producer启动测试

bin/kafka-console-producer.sh --broker-list localhoset:9092 --topic kkk --producer.config producer.config
config/下创建kafka_read_jaas.conf

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="read"
password="123456";
};
修改bin/kafka-console-consumer.sh

if [ "x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config="安装目录"/config/kafka_read_jaas.conf"
fi
config/下创建consumer.config

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
group.id=test-group
consumer启动测试

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kkk --from-beginning --consumer.config consumer.config

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐