[root@localhost kafka]# vim config/server.properties
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
###找到上面这一行(#listeners=PLAINTEXT://:9092),并用下方的全部配置替换它###
#listeners=SASL_PLAINTEXT://127.0.0.1:9092
listeners=SASL_PLAINTEXT://你的ip:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
在kafka的config文件夹下新建kafka_server_jaas.conf文件,文件内容如下:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="123456"
user_admin="123456"
user_cxy="654321";
};
在kafka的config文件夹下新建kafka_client_jaas.conf文件,文件内容如下:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="cxy"
password="654321";
};
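注:kafka_client_jaas.conf中的用户名和密码,必须与上方kafka_server_jaas.conf里通过 user_用户名="密码" 定义的某个账号一致(例如 user_cxy="654321" 对应 username="cxy"、password="654321")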
在bin/kafka-server-start.sh文件添加如下代码:
if [ "x$KAFKA_OPTS" = "x" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka-2.12/config/kafka_server_jaas.conf"
fi
注:必须在 exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@" 之前添加
在kafka-console-producer.sh 和 kafka-console-consumer.sh文件添加如下代码:
if [ "x$KAFKA_OPTS" = "x" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka-2.12/config/kafka_client_jaas.conf"
fi
注:必须在 exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleXXX "$@" 之前添加(ConsoleXXX 即 ConsoleProducer 或 ConsoleConsumer)
#新建名为topicTest的主题,要求分区数1,副本数1
bin/kafka-topics.sh --create --topic topicTest --partitions 1 --replication-factor 1 --zookeeper localhost:2181
#查看topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
#开启消费者
bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.1:9092 --topic topicTest --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN --from-beginning
#开启生产者
bin/kafka-console-producer.sh --broker-list 192.168.0.1:9092 --topic topicTest --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
在springboot的yml文件添加如下配置信息:
#注:以下kafka节点需放在yml文件的 spring: 节点之下(即 spring.kafka.*),并保持如下缩进层级
kafka:
  bootstrap-servers: 你的ip:9092 #此处必须与server.properties的ip一致
  #生产者配置
  producer:
    #每个批次的大小上限(单位:字节,不是消息条数)
    batch-size: 16
    #缓存容量
    buffer-memory: 33554432
    #设置大于0的值,则客户端会将发送失败的记录重新发送
    retries: 0
    #指定消息key和消息体的编解码方式 UTF-8
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer
    #kafka加密配置
    properties:
      sasl.mechanism: PLAIN
      security.protocol: SASL_PLAINTEXT
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username='cxy' password='654321';
  #消费者配置
  consumer:
    group-id: test
    enable-auto-commit: true
    auto-commit-interval: 1000
    #消费者使用反序列化器(deserializer),而不是序列化器
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    auto-offset-reset: latest
    max-poll-records: 100
    #kafka加密配置
    properties:
      sasl.mechanism: PLAIN
      security.protocol: SASL_PLAINTEXT
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username='cxy' password='654321';
因篇幅问题不能全部显示,请点此查看更多更全内容