环境
操作系统:windows10
Java版本:JDK8
Zookeeper版本:3.8.0
Kafka版本:2.12_2.8.2
集群搭建
因没有多余的服务器,所以集群环境搭建属于本地伪集群。
zookeeper
下载地址:https://zookeeper.apache.org/releases.html
将下载解压后的zk复制3份,分别为zk1,zk2,zk3
修改zk1的zoo.cfg文件,仅展示修改或增加的内容
# 数据路径
dataDir=/tmp/zookeeper1
# 端口
clientPort=2181
# 集群配置,server.id=ip:集群通信端口:选举端口
server.1=0.0.0.0:2880:2881
server.2=127.0.0.1:2882:2883
server.3=127.0.0.1:2884:2885
在/tmp/zookeep1的文件夹下创建名为myid的文件,并给该文件填充内容:1,表示该zk在节点中的id为1,需与zoo.cfg中的server.id中的id对应
修改zk1的zoo.cfg文件,仅展示修改或增加的内容
# 数据路径
dataDir=/tmp/zookeeper2
# 端口
clientPort=2182
# 集群配置,server.id=ip:集群通信端口:选举端口
server.1=127.0.0.1:2880:2881
server.2=0.0.0.0:2882:2883
server.3=127.0.0.1:2884:2885
在/tmp/zookeep1的文件夹下创建名为myid的文件,并给该文件填充内容:2
其余节点依次类推。
都配置完之后,进入对应的zk目录执行脚本
bin\zkServer.cmd
即可依次将zk集群的节点启动
kafka
下载地址:https://kafka.apache.org/downloads
讲下载解压后的kafka复制为3份,kafka1,kafka2,kafka3
修改kafka1的config目录下的server.properties
# 集群中的id
broker.id=1
# 监听的ip端口
listeners = PLAINTEXT://localhost:9092
#数据路径
log.dirs=/tmp/kafka-logs1
# kafka的连接
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
修改kafka2的config目录下的server.properties
# 集群中的id
broker.id=2
# 监听的ip端口
listeners = PLAINTEXT://localhost:9093
#数据路径
log.dirs=/tmp/kafka-logs2
# kafka的连接
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
其余节点依次类推
都配置完之后,进入对应的kafka目录执行脚本
\bin\windows\kafka-server-start.bat config\server.properties
全部启动之后,进入任意一个zk的cli,执行命令
ls /brokers/ids
如显示结果为[1,2,3]
则说明kafka集群搭建成功
SASL配置
zookeeper
导入kafka相关的包
将kafka/lib目录下的部分jar包复制到zookeeper的lib目录下
kafka-clients-2.8.2.jar
lz4-java-1.7.1.jar
slf4j-api-1.7.30.jar
slf4j-log4j12-1.7.30.jar
snappy-java-1.1.8.1.jar
增加config配置
在zoo.cfg中增加配置
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
编写JAAS文件
在zookeeper的conf目录下增加zk_server_jaas.conf,这个文件定义需要链接到Zookeeper服务器的用户名和密码。JAAS配置节默认为Server。
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-2019"
user_kafka="kafka-2019"
user_producer="prod-2019";
};
这个文件中定义了两个用户,一个是kafka,一个是producer,这些用user_配置出来的用户都可以提供给生产者程序和消费者程序认证使用。还有两个属性,username和password,其中username是配置Zookeeper节点之间内部认证的用户名,password是对应的密码。
修改zk启动文件
在zkEnv.cmd
中添加配置,路径按照实际路径写
set SERVER_JVMFLAGS= -Djava.security.auth.login.config=D:\young\cluster\apache-zookeeper-3.8.0-bin1\conf\zk_server_jaas.conf
在zkServer.cmd
中修改
将
call %JAVA% "-Dzookeeper.log.dir=%ZOO_LOG_DIR%" "-Dzookeeper.log.file=%ZOO_LOG_FILE%" "-XX:+HeapDumpOnOutOfMemoryError" "-XX:OnOutOfMemoryError=cmd /c taskkill /pid %%%%p /t /f" -cp "%CLASSPATH%" %ZOOMAIN% "%ZOOCFG%" %*
修改为
call %JAVA% %SERVER_JVMFLAGS% "-Dzookeeper.log.dir=%ZOO_LOG_DIR%" "-Dzookeeper.log.file=%ZOO_LOG_FILE%" "-XX:+HeapDumpOnOutOfMemoryError" "-XX:OnOutOfMemoryError=cmd /c taskkill /pid %%%%p /t /f" -cp "%CLASSPATH%" %ZOOMAIN% "%ZOOCFG%" %*
所有zk都进行上述操作,然后依次启动即可,如果启动异常,根据异常进行排查
kafka
编写JAAS文件
在kafka的config目录下,创建kafka_server_jaas.conf文件
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-2019"
user_admin="admin-2019"
user_producer="prod-2019"
user_consumer="cons-2019";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="kafka-2019";
};
KafkaServer配置的是kafka的账号和密码,Client配置节主要配置了broker到Zookeeper的链接用户名密码,这里要和前面zookeeper配置中的zk_server_jaas.conf中user_kafka的账号和密码相同。
修改server.properties
listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://127.0.0.1:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
修改启动参数
修改kafka-server-start.bat文件
在set KAFKA_HEAP_OPTS后面加上-Djava.security.auth.login.config=D:\young\cluster\kafka_2.12-2.8.2-1\config\kafka_server_jaas.conf
路径按照实际路径来写
IF NOT ERRORLEVEL 1 (
rem 32-bit OS
set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M -Djava.security.auth.login.config=D:\young\cluster\kafka_2.12-2.8.2-1\config\kafka_server_jaas.conf
) ELSE (
rem 64-bit OS
set KAFKA_HEAP_OPTS=-Xmx1G -Xms1G -Djava.security.auth.login.config=D:\young\cluster\kafka_2.12-2.8.2-1\config\kafka_server_jaas.conf
)
配置其他节点
其他节点均按此配置来修改,主要注意端口
SpringBoot配置SASL
在spring的配置文件中增加
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-2019";