之前写过一篇spark消费kafka问题记录(cdh5.16.2),主要记录了安装spark2后,spark的问题,和本地可以执行,而一提集群缺包的问题,这篇主要记录下启用kerberos后的一些问题
版本信息如下
集群启用kerberos后,想用命令行测下kafka
kafka-topics --create --zookeeper ip:port --replication-factor 1 --partitions 1 --topic test
创建成功,虽然没有kinit
然后命令行模拟生产消费
kafka-console-producer --broker-list ip:port --topic test
kafka-console-consumer --bootstrap-server ip:port --topic test
生产者那刚输了个字符串然后回车,都报下面的错
WARN clients.NetworkClient: Bootstrap broker host:9092 (id: -3 rack: null) disconnected
WARN clients.NetworkClient: Bootstrap broker host:9092 (id: -1 rack: null) disconnected
WARN clients.NetworkClient: Bootstrap broker host:9092 (id: -2 rack: null) disconnected
然后网上搜,解决办法是这样的
7180 kafka搜security.inter.broker.protocol,默认是第二个,改为SASL_PLAINTEXT
然后创建两个文件client.properties、jaas.conf
添加环境变量:
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/cdh/kafka/jaas.conf"
然后记得kinit下,哪个用户都可以,反正没acl,不kinit会报错,报错信息如下
javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user
Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user
然后加参数,继续执行,不报错了
kafka-console-producer --broker-list ip:port --topic test --producer.config /home/cdh/kafka/client.properties
kafka-console-consumer --bootstrap-server ip:port --topic test --consumer.config /home/cdh/kafka/client.properties
然后再说spark消费kafka,代码中需添加如下:
def main(args: Array[String]): Unit = { //System.setProperty("java.security.krb5.conf", "C:\\ProgramData\\MIT\\Kerberos5\\krb5.ini") //System.setProperty("java.security.auth.login.config", "C:\\temp\\jass.conf") System.setProperty("java.security.krb5.conf", "/etc/krb5.conf") System.setProperty("java.security.auth.login.config", "/home/cdh/kakfa/jaas.conf") ..... }因为我在本地测试,所以注释掉的前两行是本地测试用的,然后kafka的参数也需要添加
val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "ip:port", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "zlq_test", "auto.offset.reset" -> "earliest", "security.protocol" -> "SASL_PLAINTEXT",//需要添加 "sasl.kerberos.service.name" -> "kafka",//需要添加 "enable.auto.commit" -> (false: java.lang.Boolean) )记得本地测试或者提交集群的时候,记得kinit,本地的登陆下,get Ticket
win版kerberos的安装可以看我后面写的博客