Zookepp&Kafka采用SASL安全认证

young 811 2022-11-02

环境

操作系统:windows10

Java版本:JDK8

Zookeeper版本:3.8.0

Kafka版本:2.12_2.8.2

集群搭建

因没有多余的服务器,所以集群环境搭建属于本地伪集群。

zookeeper

下载地址:https://zookeeper.apache.org/releases.html

将下载解压后的zk复制3份,分别为zk1,zk2,zk3

修改zk1的zoo.cfg文件,仅展示修改或增加的内容

# 数据路径
dataDir=/tmp/zookeeper1
# 端口
clientPort=2181
# 集群配置,server.id=ip:集群通信端口:选举端口
server.1=0.0.0.0:2880:2881
server.2=127.0.0.1:2882:2883
server.3=127.0.0.1:2884:2885

在/tmp/zookeep1的文件夹下创建名为myid的文件,并给该文件填充内容:1,表示该zk在节点中的id为1,需与zoo.cfg中的server.id中的id对应

修改zk1的zoo.cfg文件,仅展示修改或增加的内容

# 数据路径
dataDir=/tmp/zookeeper2
# 端口
clientPort=2182
# 集群配置,server.id=ip:集群通信端口:选举端口
server.1=127.0.0.1:2880:2881
server.2=0.0.0.0:2882:2883
server.3=127.0.0.1:2884:2885

在/tmp/zookeep1的文件夹下创建名为myid的文件,并给该文件填充内容:2

其余节点依次类推。

都配置完之后,进入对应的zk目录执行脚本

bin\zkServer.cmd

即可依次将zk集群的节点启动

kafka

下载地址:https://kafka.apache.org/downloads

讲下载解压后的kafka复制为3份,kafka1,kafka2,kafka3

修改kafka1的config目录下的server.properties

# 集群中的id
broker.id=1
# 监听的ip端口
listeners = PLAINTEXT://localhost:9092
#数据路径
log.dirs=/tmp/kafka-logs1
# kafka的连接
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183

修改kafka2的config目录下的server.properties

# 集群中的id
broker.id=2
# 监听的ip端口
listeners = PLAINTEXT://localhost:9093
#数据路径
log.dirs=/tmp/kafka-logs2
# kafka的连接
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183

其余节点依次类推

都配置完之后,进入对应的kafka目录执行脚本

\bin\windows\kafka-server-start.bat config\server.properties

全部启动之后,进入任意一个zk的cli,执行命令

ls /brokers/ids

如显示结果为[1,2,3]则说明kafka集群搭建成功

SASL配置

zookeeper

导入kafka相关的包

将kafka/lib目录下的部分jar包复制到zookeeper的lib目录下

kafka-clients-2.8.2.jar
lz4-java-1.7.1.jar
slf4j-api-1.7.30.jar
slf4j-log4j12-1.7.30.jar
snappy-java-1.1.8.1.jar

增加config配置

在zoo.cfg中增加配置

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

编写JAAS文件

在zookeeper的conf目录下增加zk_server_jaas.conf,这个文件定义需要链接到Zookeeper服务器的用户名和密码。JAAS配置节默认为Server。

Server {
org.apache.kafka.common.security.plain.PlainLoginModule required 
    username="admin" 
    password="admin-2019" 
    user_kafka="kafka-2019" 
    user_producer="prod-2019";
};

这个文件中定义了两个用户,一个是kafka,一个是producer,这些用user_配置出来的用户都可以提供给生产者程序和消费者程序认证使用。还有两个属性,username和password,其中username是配置Zookeeper节点之间内部认证的用户名,password是对应的密码。

修改zk启动文件

zkEnv.cmd中添加配置,路径按照实际路径写

set SERVER_JVMFLAGS= -Djava.security.auth.login.config=D:\young\cluster\apache-zookeeper-3.8.0-bin1\conf\zk_server_jaas.conf

zkServer.cmd中修改

call %JAVA% "-Dzookeeper.log.dir=%ZOO_LOG_DIR%" "-Dzookeeper.log.file=%ZOO_LOG_FILE%" "-XX:+HeapDumpOnOutOfMemoryError" "-XX:OnOutOfMemoryError=cmd /c taskkill /pid %%%%p /t /f" -cp "%CLASSPATH%" %ZOOMAIN% "%ZOOCFG%" %*

修改为

call %JAVA%  %SERVER_JVMFLAGS% "-Dzookeeper.log.dir=%ZOO_LOG_DIR%" "-Dzookeeper.log.file=%ZOO_LOG_FILE%" "-XX:+HeapDumpOnOutOfMemoryError" "-XX:OnOutOfMemoryError=cmd /c taskkill /pid %%%%p /t /f" -cp "%CLASSPATH%" %ZOOMAIN% "%ZOOCFG%" %*

所有zk都进行上述操作,然后依次启动即可,如果启动异常,根据异常进行排查

kafka

编写JAAS文件

在kafka的config目录下,创建kafka_server_jaas.conf文件

KafkaServer {
      org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-2019"
    user_admin="admin-2019"
    user_producer="prod-2019"
    user_consumer="cons-2019";
};

Client {
  org.apache.kafka.common.security.plain.PlainLoginModule required
    username="kafka"
    password="kafka-2019";
};

KafkaServer配置的是kafka的账号和密码,Client配置节主要配置了broker到Zookeeper的链接用户名密码,这里要和前面zookeeper配置中的zk_server_jaas.conf中user_kafka的账号和密码相同。

修改server.properties

listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://127.0.0.1:9092
security.inter.broker.protocol=SASL_PLAINTEXT  
sasl.enabled.mechanisms=PLAIN  
sasl.mechanism.inter.broker.protocol=PLAIN  
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true

修改启动参数

修改kafka-server-start.bat文件

在set KAFKA_HEAP_OPTS后面加上-Djava.security.auth.login.config=D:\young\cluster\kafka_2.12-2.8.2-1\config\kafka_server_jaas.conf 路径按照实际路径来写

 IF NOT ERRORLEVEL 1 (
        rem 32-bit OS
        set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M -Djava.security.auth.login.config=D:\young\cluster\kafka_2.12-2.8.2-1\config\kafka_server_jaas.conf
    ) ELSE (
        rem 64-bit OS
        set KAFKA_HEAP_OPTS=-Xmx1G -Xms1G -Djava.security.auth.login.config=D:\young\cluster\kafka_2.12-2.8.2-1\config\kafka_server_jaas.conf
    )

配置其他节点

其他节点均按此配置来修改,主要注意端口

SpringBoot配置SASL

在spring的配置文件中增加

spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-2019";