KafKa入门

mac2022-06-30  234

一、Kafaka 介绍 

Apache Kafka 是分布式发布-订阅消息系统。 它最初由 LinkedIn 公司开发, 之后成为 Apache 项目的一部分。 Kafka 是一种快速、 可扩展的、 设计内在就是 分布式的, 分区的和可复制的提交日志服务 Kafka 是一个消息系统, 原本开发自 LinkedIn, 用作 LinkedIn 的活动流( activity stream) 和运营数据处理管道( pipeline) 的基础。 后贡献给 apache 基金会, 成为 apache 的一个顶级项目。 

 

补充:

1、Kafka是一个分布式发布-订阅消息系统。分布式,意味着Kafka可以在集群中运行。2、消息的发布者,称为生产者(Producers),负责发布消息。发布消息之前,需要发布确定一个主题(topic)。3、broker,消息的缓存代理,相当于消息的服务器。生产者发布的消息缓存在broker中。4、消息的订阅者,称为消费者(Consumers),负责订阅消费消息。它向broker中获取订阅的消息进行消费。5、Kafka集群的运行需要Zookeeper来协调服务,所以在运行Kafka集群之前,必须保证Zookeeper集群能正常运行。6、在企业的大数据开发中,经常用Flume+Kafka+Spark Streaming(Storm)或者Kafka+Spark Streaming+Redis的方式进行开发。NoSQL:not only SQL(不仅仅是SQL),常用NoSQL数据库:HBase、ES(Elastic Search)、Redis、MongDB

 

二、安装Kafka步骤

        1、安转zookeeper

        2、安装kafaka

        3、下载解压安装包kafka_2.10-0.9.0.1.tgz

        4、vi config/server.properties 

        5、broker.id=0            host.name=liuiwei3           zookeeper.connect=liuwei3:2181,liuwei1:2181,liuwei4:2181

        6、将kafka复制到其他节点 scp -r  kafka_2.10-0.9.0.1 hadoop@liuwei2:$PWD                          scp -r  kafka_2.10-0.9.0.1 hadoop@liuwei4:$PWD 

        7、在其他两个salve节点分别修改 

              broker.id=1            host.name=liuiwei2           zookeeper.connect=liuwei3:2181,liuwei1:2181,liuwei4:2181

         8、启动kafka

             在三台机器上分别启动              ./bin/kafka-server-start.sh -daemon config/server.properties 

         9、检验kafka是否安装成功

             在master节点(liuwei3)操作:创建一个名为 test 的主题 

                bin/kafka-topics.sh --create --zookeeper liuwei3:2181,liuwei2:2181,liuwei4:2181 --replication-factor 1 --partitions 1 --topic test

               在一个终端上启动一个生产者  ./bin/kafka-console-producer.sh --broker-list liuwei3:9092,liuwei2:9092,liuwei4:9092 --topic test

              然后再键盘输入信息:hello   

              在另一个终端中启动消费者(liuwei2,liuwei4),进入交互客户端,命令如下: ./bin/kafka-console-consumer.sh --zookeeper liuwei2:2181 --topic test --from-beginning

             会在屏幕上显示相同的信息 hello

三、Kafka+Spark Streaming

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 object KafkaDemo {    def main(args: Array[String]): Unit = {      val conf= new  SparkConf().setAppName( "KafkaDemo" )                .setMaster( "local[2]" )      val ssc= new  StreamingContext(conf,Seconds( 5 ))      //创建主题集合      val topicSet=Map(( "tgtest2" -> 1 ))        /**        * 第一个参数:StreamingContext对象        * 第二个参数:Zookeeper集群,注意端口:2181        * 第三个参数:消费者Consumer所在小组        * 第四个参数:运行的主题        */      //从Kafka中获取数据      val lines=KafkaUtils.createStream(ssc,      "tgmaster:2181,tgslave:2181" , "DefaultConsumerGroup" ,topicSet)      val result=lines.flatMap(x=>{        x._2.split( " " )      }).map(word=>(word, 1 ))        .reduceByKey(_+_)        result.print()      ssc.start()      ssc.awaitTermination()    } }

  

                        

转载于:https://www.cnblogs.com/kms1989/p/6700985.html

相关资源:Kafka入门书籍.pdf

最新回复(0)