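Accessing and Using Kafka (2.2.1) with Kerberos + LDAP + Sentry

Because Kafka is integrated with Kerberos, clients must pass Kerberos authentication before they can use the cluster.
There are two ways to authenticate:
1. With a configuration file
2. With a keytab file
We use the first approach here.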
First, create two files in the directory /usr/local/kafka_client: client.properties and jaas.conf.
Write the following into client.properties:

    security.protocol=SASL_PLAINTEXT
    sasl.kerberos.service.name=kafka
    group.id=testgroup

Write the following into jaas.conf:

    KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useTicketCache=true
      renewTicket=true
      serviceName="kafka";
    };

Then run the following commands in the shell to set the environment variable (this only applies to the current shell session):

    [root@cdh-datanode03 kafka_client]# export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka_client/jaas.conf"
    [root@cdh-datanode03 kafka_client]# echo $KAFKA_OPTS

Finally, run kinit to log in as a Kerberos user.
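For a client written in Java rather than the command-line tools, the same settings can be assembled in code. The sketch below is a minimal illustration, not something taken from the original setup: the class name SaslClientProps and its build method are hypothetical, and it assumes the Kafka client property sasl.jaas.config is used to carry the JAAS entry inline instead of pointing KAFKA_OPTS at a jaas.conf file. It only builds and prints the properties; it does not connect to a broker.

```java
import java.util.Properties;

// Hypothetical helper (not from the original article): builds the same
// settings as client.properties, plus an inline JAAS entry.
class SaslClientProps {

    // Assumption: the ticket-cache login from jaas.conf, written as a
    // single-line value for the Kafka client property "sasl.jaas.config".
    static final String JAAS_ENTRY =
            "com.sun.security.auth.module.Krb5LoginModule required "
          + "useTicketCache=true renewTicket=true serviceName=\"kafka\";";

    static Properties build(String groupId) {
        Properties props = new Properties();
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.kerberos.service.name", "kafka");
        props.setProperty("group.id", groupId);
        props.setProperty("sasl.jaas.config", JAAS_ENTRY);
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting key=value pairs for inspection.
        Properties props = build("testgroup");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting Properties object could be passed to a KafkaProducer or KafkaConsumer constructor. A valid ticket from kinit is still required, because the entry relies on the ticket cache.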
1. Create a Kafka topic

    [root@cdh-datanode03 kafka_client]# kafka-topics --create --zookeeper cdh-master01:2181 --replication-factor 1 --partitions 1 --topic testTopic

2. List topics

    [root@cdh-datanode03 kafka_client]# kafka-topics --zookeeper cdh-master01:2181 --list

3. Delete a topic

    [root@cdh-datanode03 kafka_client]# kafka-topics --delete --zookeeper cdh-master01:2181 --topic testTopic

4. Produce data to a topic (requires privileges)

    [root@cdh-datanode03 kafka_client]# kafka-console-producer --broker-list cdh-datanode03:9092,cdh-datanode04:9092 --topic testTopic --producer.config /usr/local/kafka_client/client.properties

5. Consume data from a topic (requires privileges)

    [root@cdh-datanode03 kafka_client]# kafka-console-consumer --topic testTopic --from-beginning --bootstrap-server cdh-datanode03:9092,cdh-datanode04:9092,cdh-datanode05:9092 --consumer.config /usr/local/kafka_client/client.properties

At this point, producing fails with the following error, which means the current user is not authorized to write to testTopic. We need to grant privileges to the user we logged in as with kinit.

    ERROR internals.ErrorLoggingCallback: Error when sending message to topic testTopic with key: null, value: 3 bytes with error: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [testTopic]

We take the user fayson as an example. It belongs to the group user (run "id <username>" to see which groups a user belongs to).
1. First, create a Kafka principal.
2. Grant the user group permission to write to testTopic. Note that you must be logged in to Kerberos as the admin user kafka to do this.

    [root@cdh-datanode03 kafka_client]# klist
    Ticket cache: FILE:/tmp/krb5cc_0
    Default principal: kafka@GREE.IO

    Valid starting       Expires              Service principal
    09/11/2019 20:47:25  09/12/2019 20:47:25  krbtgt/GREE.IO@GREE.IO
            renew until 09/18/2019 20:47:25

3. Create a role

    [root@cdh-datanode03 kafka_client]# kafka-sentry -cr -r kafka_role

4. Grant kafka_role write access to testTopic

    [root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "Topic=testTopic->action=write"
    [root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "Topic=testTopic->action=describe"

5. Add the role to the user group

    [root@cdh-datanode03 kafka_client]# kafka-sentry -arg -r kafka_role -g user

6. Log in as fayson (enter the password when prompted)

    [root@cdh-datanode03 kafka_client]# kinit fayson

After that, writing to testTopic as this user no longer fails with an authorization error.
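The grant steps above follow a fixed pattern: create a role, grant it one privilege per action, then attach it to a group. As a rough sketch of that pattern, the helper below prints the kafka-sentry command lines for a given role, group, and topic so they can be reviewed before an administrator runs them. The class SentryGrantCommands and its methods are hypothetical names introduced for illustration. It only formats strings in the short privilege form used in this article and does not call Sentry.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Sentry or the original article): formats
// the kafka-sentry commands used above for a given role, group and topic.
class SentryGrantCommands {

    // Privilege string in the short form used in this article,
    // e.g. "Topic=testTopic->action=write".
    static String privilege(String resourceType, String name, String action) {
        return resourceType + "=" + name + "->action=" + action;
    }

    // Commands for a producer: create role, grant write + describe, add to group.
    static List<String> producerGrants(String role, String group, String topic) {
        List<String> cmds = new ArrayList<>();
        cmds.add("kafka-sentry -cr -r " + role);
        for (String action : new String[] {"write", "describe"}) {
            cmds.add("kafka-sentry -gpr -r " + role
                    + " -p \"" + privilege("Topic", topic, action) + "\"");
        }
        cmds.add("kafka-sentry -arg -r " + role + " -g " + group);
        return cmds;
    }

    public static void main(String[] args) {
        producerGrants("kafka_role", "user", "testTopic").forEach(System.out::println);
    }
}
```

Generating the commands as text keeps the kafka user in control of what is actually executed, which seems preferable for privilege changes.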
To consume, fayson also needs read access to testTopic, so kafka_role must be granted read privileges as well.
1. Building on the steps above, grant the kafka_role role read access to testTopic and to the consumer group.
2. Run the following commands as the kafka user:

    [root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "CONSUMERGROUP=testgroup->action=read"
    [root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "CONSUMERGROUP=testgroup->action=describe"
    [root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "Topic=testTopic->action=read"

To access Kafka from a Java client, create consumer.properties, producer.properties, and jaas.conf, and also bring in the cluster's krb5.conf.
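One thing worth checking when preparing consumer.properties is that its group.id names a consumer group the role was actually granted privileges on; a mismatch would be expected to end in an authorization error on the consumer side. The sketch below is a minimal illustration with hypothetical names (ConsumerPrivilegeCheck, missing). It assumes a consumer needs the three privileges granted in the commands above, and it compares privilege strings exactly, which is simpler than whatever matching Sentry itself performs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not from the original article): lists the privileges a
// consumer needs, in the short form used here, that are absent from a grant list.
class ConsumerPrivilegeCheck {

    static List<String> missing(String groupId, String topic, List<String> granted) {
        // Assumption: these three privileges are what a consumer requires.
        List<String> required = Arrays.asList(
                "CONSUMERGROUP=" + groupId + "->action=read",
                "CONSUMERGROUP=" + groupId + "->action=describe",
                "Topic=" + topic + "->action=read");
        List<String> result = new ArrayList<>(required);
        result.removeAll(granted); // exact string comparison only
        return result;
    }

    public static void main(String[] args) {
        List<String> granted = Arrays.asList(
                "CONSUMERGROUP=testgroup->action=read",
                "CONSUMERGROUP=testgroup->action=describe",
                "Topic=testTopic->action=read");
        // The group that was granted: nothing is missing.
        System.out.println(missing("testgroup", "testTopic", granted));
        // A different group.id: both consumer-group privileges are missing.
        System.out.println(missing("testgroup1", "testTopic", granted));
    }
}
```

Note that the consumer.properties shown in this article uses group.id=testgroup1 while the grants were made on testgroup, so either the grants or the group.id would need to be aligned.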
producer.properties contents:

    bootstrap.servers=cdh-datanode03:9092,cdh-datanode04:9092,cdh-datanode05:9092
    # Serializer class (implements the Serializer interface) that tells Kafka how to serialize the key
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    # Tells Kafka how to serialize the value
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
    acks=1
    # Client settings for accessing Kerberos-enabled Kafka
    security.protocol=SASL_PLAINTEXT
    sasl.kerberos.service.name=kafka

consumer.properties contents:

    bootstrap.servers=cdh-datanode04:9092
    group.id=testgroup1
    enable.auto.commit=true
    session.timeout.ms=30000
    auto.offset.reset=earliest
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    security.protocol=SASL_PLAINTEXT
    sasl.kerberos.service.name=kafka

jaas.conf contents:

    KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      doNotPrompt=true
      useKeyTab=true
      storeKey=true
      renewTicket=true
      keyTab="D:/cdh/kafka/src/main/kerberos/gree1.keytab"
      principal="gree1@GREE.IO";
    };

    Client {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      storeKey=true
      keyTab="D:/cdh/kafka/src/main/kerberos/gree1.keytab"
      principal="gree1@GREE.IO";
    };

1. pom.xml

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.2.1-cdh6.3.0</version>
    </dependency>

2. Producer

    package producer;

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    import java.io.IOException;
    import java.util.Properties;

    class MyProducer {
        private static final MyProducer Instance = new MyProducer();

        private MyProducer() {
        }

        public static MyProducer getInstance() {
            return Instance;
        }

        public int messageNo = 1;

        /**
         * Returns a Kafka producer instance.
         *
         * @return a producer configured from /producer.properties
         */
        public KafkaProducer<String, String> Produce() {
            // Point the JVM at the JAAS and Kerberos configuration files
            System.setProperty("java.security.auth.login.config", "D:\\cdh\\kafka\\src\\main\\kerberos\\jaas.conf");
            System.setProperty("java.security.krb5.conf", "D:\\cdh\\kafka\\src\\main\\kerberos\\krb5.conf");
            Properties props = new Properties();
            try {
                props.load(this.getClass().getResourceAsStream("/producer.properties"));
            } catch (IOException e) {
                e.printStackTrace();
            }
            return new KafkaProducer<>(props);
        }
    }

    public class ProducerStarter implements Runnable {
        private int threadIndex;

        public ProducerStarter(int threadIndex) {
            this.threadIndex = threadIndex;
        }

        /**
         * Produces data.
         */
        public void run() {
            MyProducer pro = MyProducer.getInstance();
            KafkaProducer<String, String> prod = pro.Produce();
            String topic = "testTopic";
            int i = 0;
            while (true) {
                final int index = i++;
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                prod.send(new ProducerRecord<String, String>(topic, String.valueOf(index), String.valueOf(i)),
                        new Callback() {
                            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                                if (e != null) {
                                    // recordMetadata may be null when the send fails, so stop here
                                    e.printStackTrace();
                                    return;
                                }
                                System.out.println("message send to partition " + recordMetadata.partition()
                                        + /*value*/ ": hello world " + index);
                            }
                        });
                prod.flush();
                // Sleep for 2 seconds
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * Starts the producer threads (the loop below starts one; raise the bound for more).
         *
         * @param args unused
         */
        public static void main(String[] args) {
            for (int i = 0; i < 1; i++) {
                System.out.println("Starting thread: " + i);
                Thread thread = new Thread(new ProducerStarter(i));
                thread.start();
            }
        }
    }

3. Consumer

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import java.time.Duration;
    import java.util.Arrays;

    public class ConsumerStarter {
        public static void main(String[] args) throws InterruptedException {
            KafkaConsumer<String, String> consumer = Consumer.getInstance().Consume();
            consumer.subscribe(Arrays.asList("testTopic"));
            // Consume and print the results
            while (true) {
                // poll(long) is deprecated in kafka-clients 2.x; use the Duration overload
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value= %s%n", record.offset(), record.key(), record.value());
                }
                Thread.sleep(1000);
            }
        }
    }

    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import java.io.IOException;
    import java.util.Properties;

    /**
     * Created by 260212 on 2018/4/12.
     * Author: JackLee - 赵化臣
     * Description:
     */
    class Consumer {
        private static final Consumer Instance = new Consumer();

        private Consumer() {
        }

        public static Consumer getInstance() {
            return Instance;
        }

        /**
         * Returns a Kafka consumer.
         * The kafka-clients version must be higher than 0.9.0.1, otherwise the fetched records are null.
         *
         * @return a consumer configured from /consumer.properties
         */
        public KafkaConsumer<String, String> Consume() {
            // Point the JVM at the JAAS and Kerberos configuration files
            System.setProperty("java.security.auth.login.config", "D:\\cdh\\kafka\\src\\main\\kerberos\\jaas.conf");
            System.setProperty("java.security.krb5.conf", "D:\\cdh\\kafka\\src\\main\\kerberos\\krb5.conf");
            Properties props = new Properties();
            try {
                props.load(this.getClass().getResourceAsStream("/consumer.properties"));
            } catch (IOException e) {
                e.printStackTrace();
            }
            return new KafkaConsumer<String, String>(props);
        }
    }

Reposted from: https://www.cnblogs.com/HarSenZhao/p/11508687.html
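Both MyProducer and Consumer hard-code the same two Windows paths when setting the JAAS and krb5 system properties. One possible way to avoid the duplication, sketched below, is a small helper that derives both paths from a single directory. The class KerberosEnv, its configure method, and the fallback directory src/main/kerberos are assumptions made for illustration and do not appear in the original code.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper (not from the original article): sets the two JVM
// system properties used by MyProducer and Consumer from one directory.
class KerberosEnv {

    static void configure(Path kerberosDir) {
        System.setProperty("java.security.auth.login.config",
                kerberosDir.resolve("jaas.conf").toString());
        System.setProperty("java.security.krb5.conf",
                kerberosDir.resolve("krb5.conf").toString());
    }

    public static void main(String[] args) {
        // Assumption: the directory comes from the first argument, falling
        // back to a relative path chosen only for illustration.
        Path dir = Paths.get(args.length > 0 ? args[0] : "src/main/kerberos");
        configure(dir);
        System.out.println(System.getProperty("java.security.auth.login.config"));
        System.out.println(System.getProperty("java.security.krb5.conf"));
    }
}
```

Calling KerberosEnv.configure(...) once before creating the producer or consumer would replace the System.setProperty calls in Produce() and Consume(). The keyTab path inside jaas.conf is unaffected and still has to point at the keytab file.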