kafka

mac2025-02-25  6

Kafka

一 .概述1. 背景2. 概念3. 架构和原理4. 使用场景5. 特性 二、Kafka搭建部署1. 安装的前期准备2. kafka的搭建 三、Kafka的核心概念1. message 消息2. producer 生产者2.1 原理2.2 同步发送 vs 异步发送2.3 acks2.4 实现一个简单的Kafka生产者一般步骤2.5 案例 一 :一个简单生产者的实现2.6 案例二: 生产者(同步/异步发送数据) 3. broker 代理节点,代理缓存3.1 broker的策略 4. topic5. partition5.1 Topic & Partition 6. Replication副本7. Consumer消费者8. Zookeeper9. offset[偏移量]10. Kafka拓扑结构11. 消息的组织12 replica、leader、follower 四. Kafka文件存储机制1.segment 文件2. 复制原理与同步方式3. In-Sync Replicas 副本同步队列4. HighWatermark5. Kafka的复制机制6. acks (数据可靠性和持久性保证)7. leader 选举8. Consumer的策略9. 消费的 confirm10. Kafka减少消息丢失配置 五. 设计原理1、持久化(Persistence)2、生产者(producer)3、消费者(Consumer)4、常量时间存储能力5、消费分发机制6、复制 Replication7. Kafka 与 Zookeeper 五. kafka常用命令1、创建topic2、创建生产者3、创建消费者4、查询 topic 的所有记录数5、查询 topic 的消费者情况6.IDEA 配置 六 .案例1. 一个简单生产者的实现2. 生产者(同步/异步发送数据)

一 .概述

1. 背景

分布式集群,分布式数据库等等分布式的架构产生大量的数据:分析 分析热点新闻,今日头条,热点词汇,用户行为的分析----数据本身的时效性 需求:立刻获取分析结果 磁盘,访问磁盘 数据库,访问磁盘、内存、存储数据、浪费空间 内存,kafka,时效性(数据默认存储时间:一周)

2. 概念

数据的采集,实时的反馈信息,能够支撑大量的数据,高容错 由Linkedin公司研发 分布式 发布和订阅 海量日志采集系统 工作模式:发布和订阅 ,将采集的数据存储到内部的消息队列中,在有效的时间内不会失效 特点: 高吞吐量:每秒处理数十万条消息,和分布式kafka规模有关(broker的个数) 缓存TB级别数据量 支持分区,支持压缩... ... 分布式:易扩展,高容错 由producer、broker、consumer组成 ,三者的个数不定 生产者:flume/java程序/service 消费者:storm/hbase/hdfs 代理节点:broker , 缓存kafka中传递的数据到broker节点上 发布和订阅消息: 生产者生产带有主题的消息,存储在broker 消费者可以注册关注某一个主题的数据 发布消息时可以指定副本数量,使用zookeeper实现数据的切换 客户端消费的状态由自己本身维护 支持离线和在线实时分析的场景(kafka、flume) 注意:kafka数据缓存在/tmp/kafkafile/ 磁盘中 有序的存储,有序的读取,速度快

3. 架构和原理

包括三个角色: producer: 可以是flume,java程序,socket,service 通过push向broker中生产数据 broker: 缓冲数据 集群的吞吐量,broker的个数 向zk中进行注册 zk: broker和其他的元信息,协调broker和broker的通信,broker的位置的协同 consumer: 通过pull从broker中消费数据 多个comsumer可以组成一个Group 向zk中进行注册 消费者是通过zk知晓其关注主题的数据的状态 数据从productor发送到broker中间缓存和分发数据,注册到系统中的consumer

4. 使用场景

实时监控 日志收集,持久化存储 消息系统:高吞吐 用户行为分析和追踪 … …

5. 特性

通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能;

高吞吐量。即使是非常普通的硬件kafka也可以支持每秒数十万的消息;

支持通过kafka服务器和消费机集群来分区消息;

支持Hadoop并行数据加载;

kafka的设计目的是提供一个消息发布、订阅解决方案,它可以处理消费者规模的网站中的所有动作流数据。

备注:O(1)常数阶,是最低的时空复杂度,耗时/耗空间与输入数据大小无关,无论输入数据增大多少倍,耗时/耗空间都不变。 哈希算法就是典型的O(1)时间复杂度,无论数据规模多大,都可以在一次计算后找到目标(不考虑冲突的情况下)

二、Kafka搭建部署

1. 安装的前期准备

Hadoop集群启动成功 Zookeeper集群启动成功

2. kafka的搭建

在master,s201,s203搭建kafka分布式系统

1. 下载 kafka_2.10-0.8.1.1.tgz 2. 拷贝 3. 解压 tar -zxvf sxxxxxx.tar.gz 4. 配置 : kafka/config/server.properties # The id of the broker. This must be set to a unique integer for each broker. broker.id=唯一的整数 # Hostname the broker will bind to. If not set, the server will bind to all interfaces host.name=master ############################# Zookeeper ############################# # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zookeeper.connect=master:2181,s201:2181,s203:2181 # 真正删除主题 delete.topic.enable=true 5. 分发软件 将软件分发到各节点(s201、s203) rsynch2.sh kafka 修改: 环境变量 broker.id=唯一的整数 host.name=currHostName 6. 启动kafka master: kafka/bin/kafka-server-start.sh -daemon kafka/config/server.properties s201: kafka/bin/kafka-server-start.sh -daemon kafka/config/server.properties s203: kafka/bin/kafka-server-start.sh -daemon kafka/config/server.properties jps 查看启动进程 zk 查看zk为kafka开辟的节点 7. 验证: 创建主题 kafka-topics.sh --create --zookeeper master:2181,s201:2181,s203:2181 //配置kafka的zk参数,和配置文件中的一致 --replication-factor 3 //配置kafka的副本个数,默认1 --partitions 1 //配置主题的分区个数,默认1 --topic houseprice //指定主题的名称 使用生产者生成数据 kafka-console-producer.sh --broker-list s201:9092 //9092是broker server的服务器端口,允许只写一个主机地址 --topic houseprice 编写需要生产的数据: asdasdasd zhangsan beijing houseprice is 11231321 消费者消费数据 s201:kafka-console-consumer.sh --zookeeper master:2181,s201:2181,s203:2181 //zookeeper的集群地址,允许写一个 --topic houseprice --from-beginning s203:kafka-console-consumer.sh --zookeeper master:2181 //zookeeper的集群地址,允许写一个 --topic houseprice

三、Kafka的核心概念

1. message 消息

Flume中传递数据的单位:Event Storm中传递数据的单位:Tuple Kafka中传递数据的单位:Message

数据存储和通信的基本单位 每个生产者可以向一个Topic发布一些message 如果消费者订阅相关Topic的数据,数据发布后,新添加的message被广播给消费者

2. producer 生产者

向broker发送消息 通过zk定位到所有的broker(只需要向一个broker中生产数据)

发送数据的方式: 同步和异步:producer.properties 批发送: 一批一批的发送,减少网络连接的开销 数据源 数据源可以由多种类型的角色来扮演:flume,service,web app ,font project... ... 向broker发送消息,只需要指定一个broker(zk自动定位到其他的broker) 发送数据的方式:同步和异步

2.1 原理

Producers直接发送消息到broker上的leader partition。Kafka集群中的每个broker都可以响应producer的请求,并返回topic的元信息(元信息包括:哪些机器是存活的,topic的leader在哪,哪些leader partition可以直接被访问等)。

Producer客户端自己控制着消息被推送到哪些partition。实现的方式可以是随机分配、负载均衡、或者指定一些分区算法。Kafka提供了接口供用户实现自定义的partition,用户可以为每个消息指定一个partition Key,通过这个key来实现一些hash分区算法。

以Batch的方式推送数据可以极大的提高处理效率,Kafka Producer 可以将消息在内存中累计到一定数量后作为一个batch发送请求。Batch的数量大小可以通过参数控制,参数值可以设置为累计的消息数量(如500条)、累计的时间间隔(如100ms)或者累计的数据大小(64KB)。通过增加batch的大小,可以减少网络请求和磁盘IO的次数,具体的参数设置需要在效率和时效性方面做一个权衡。

2.2 同步发送 vs 异步发送

同步发送 vs 异步发送 所谓异步发送,就是指客户端有个本地缓冲区,消息先存放到本地缓冲区,然后由后台线程来发送。 在异步发送下,有以下几个参数需要配置: (1)队列的最大长度 buffer.memory //缺省为33554432, 即32M (2)队列满了,客户端是阻塞,还是抛异常出来(缺省是true) block.on.buffer.full //true: 阻塞消息;false:抛异常 (3)发送的时候,可以批量发送的数据量 batch.size //缺省16384字节,即16K (4)最长等多长时间,批量发送 linger.ms //缺省是0 很显然,异步发送可以提高发送的性能,但一旦客户端挂了,就可能丢数据。 其他消息系统RabbitMQ、ActiveMQ,都强调可靠性,不允许非ACK的发送,也没有异步发送模式。 Kafka提供了这个灵活性,允许使用者在性能与可靠性之间做权衡。 (5)消息的最大长度 max.request.size //缺省是1048576,即1M 这个参数会影响batch的大小,如果单个消息的大小 > batch的最大值(16k),那么batch会相应的增大;

2.3 acks

Producers可以异步、并行向Kafka发送消息。通常producer在发送完消息之后会得到一个future响应,返回offset值或发送过程中遇到的错误。其中有个重要参数acks,该参数决定了producer要求leader partition 收到确认的副本个数:

acks设置为0, 表示producer不会等待broker的响应。 所以producer无法知道消息是否发送成功,这样有可能导致数据丢失。 但同时会得到最大的系统吞吐量; acks设置为1(缺省值), 表示producer会在leader partition收到消息时得到broker的一个确认, 这样会有更好的可靠性,客户端会等待直到broker确认收到消息; acks设置为-1, producer会在所有备份的partition收到消息时得到broker的确认, 这个设置可以得到最高的可靠性保证;

Kafka 消息由一个定长的header和变长的字节数组组成。因为Kafka消息支持字节数组,也就使得Kafka可以支持任何用户自定义的序列号格式或者其它已有的格式如Apache Avro、protobuf等。Kafka没有限定单个消息的大小,通常消息大小在1 ~ 10 kB 之间。

发布消息时,kafka client先构造一条消息,将消息加入到消息集set中(kafka支持批量发布,可以往消息集合中添加多条消息,一次性发布),send消息时,producer需指定消息所属的topic。

2.4 实现一个简单的Kafka生产者一般步骤

创建Properties对象,设置相应的配置。以下3个配置是必须指定的: bootstrap.servers:broker的地址清单,不需要包含所有的broker地址,生产者会从给定的broker里查找到其他的broker信息。一般至少要提供两个broker的信息,防止单点故障; key.serializer,用于序列化消息Key的类,必需设置即使key为null; value.serializer,用于序列化消息实际数据的类;

根据Properties对象实例化一个KafkaProducer对象;

实例化ProducerRecord对象,每条消息对应一个ProducerRecord对象;

调用KafkaProducer发送消息的方法将ProducerRecord发送到Kafka对应的节点。Kafka提供了两个发送消息的方法: send(ProdecuerRecord<String, String> record) send(ProdecuerRecord<String, String> record, Callback callback) 生产者发送消息分为两个阶段: 第一阶段:将消息发送到消息缓冲区; 第二阶段:Sender线程负责将缓冲区的消息发送到broker,执行真正的IO操作; Kafka Producer默认是异步发送消息,会将消息缓存到消息缓存区中,当消息在缓存区中累计到一定的数量后最为一个RecordBatch再发送。 第一阶段执行完成后就返回一个Future对象,根据对Future对象处理方式的不同,KafkaProducer支持两种发送方式: 异步方式:两个Send方法都返回一个Future对象,即只负责将消息发送到消息缓冲区,并不等待Sender线程处理结果。若希望了解异步方式消息发送成功与否,可以在回调函数中进行相应处理,当消息被Sender线程处理后会回调Callback; 同步方式:通过调用send方法返回的Future对象的get()方法以阻塞方式获取执行结果,即等待Sender线程处理的最终结果;

关闭KafkaProducer,释放连接资源。

2.5 案例 一 :一个简单生产者的实现

import java.util.Properties import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} // 最简单的消息发送方式 object KafkaProducer1 { def main(args: Array[String]): Unit = { // brokers的地址,可以写多个。至少写2个避免单点故障 val brokers = "node1:9092, node2:9092" val topic = "mykafka" // Properties类实现了Map接口,本质上是一种简单的Map容器 val kafkaProps = new Properties() // 要往Kafka写入消息,首先要创建一个生产者对象,并设置一些属性。Kafka生产者有3个必选的属性 // bootstrap.servers,指定broker的地址清单。不需要包含所有的broker地址,生产者会从给定的broker里查找其他broker的信息,建议至少提供2个broker的信息 // key.serializer 将指定的值序列化。必选设置,即使key为空。 value.serializer 将value的值序列化 kafkaProps.put("bootstrap.servers", brokers) kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") // 使用字符串常量更好,避免输入错误 // kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092, node2:9092") // kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") // kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") // KafkaProducer是一个泛型,必须指定key、value的类型 val producer = new KafkaProducer[String, String](kafkaProps) // topic, key, value(可以不指定类型) val record = new ProducerRecord[String, String](topic, "", "my test 中文测试") producer.send(record) producer.close() } }

2.6 案例二: 生产者(同步/异步发送数据)

import java.util.{Properties, UUID} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata} import org.apache.log4j.{Level, Logger} object KafkaProducer2{ def main(args: Array[String]): Unit = { Logger.getLogger("org").setLevel(Level.WARN) val brokers = "node1:9092, node2:9092, node3:9092" val mytopic = "mykafka1" val kafkaProps = new Properties() kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers) kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") // 上面的参数必须指定,以下为可选参数 kafkaProps.put(ProducerConfig.ACKS_CONFIG, "1") kafkaProps.put(ProducerConfig.RETRIES_CONFIG, "3") // 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。16K kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384") // 默认情况下,只要有可用的线程,生产者就会把消息发送出去,就算批次里只有一个消息。 // KafkaProducer 会在批次填满或 linger.ms 达到上限时把批次发送出去。 kafkaProps.put(ProducerConfig.LINGER_MS_CONFIG, "100") // 设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。32M kafkaProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432") val producer = new KafkaProducer[String, String](kafkaProps) // 缺省异步发送 for (i <- 1 to 100) { val record = new ProducerRecord(topic, i.toString, s"$i") producer.send(record) } // 同步发送 for (i <- 1 to 88) { val record = new ProducerRecord[String, String](mytopic, s"$i, ${UUID.randomUUID}") // 程序阻塞,直到该条消息发送成功返回元数据信息或者报错 val metadata: RecordMetadata = producer.send(record).get println(s"(topic, partition, offset) : (${metadata.topic()}, ${metadata.partition()}, ${metadata.offset})") } producer.close() } } // 检查topic中的数据 // kafka-run-class.sh kafka.tools.GetOffsetShell --topic mykafka1 --broker-list node1:9092

3. broker 代理节点,代理缓存

Kafka集群包含多台服务器,一台Kafka服务器就是一个Broker,一个集群由多个broker组成,一个broker可以有多个topic。broker承担着中间缓存和分发的作用,broker将producer发送的数据分发到注册consumer中;每个broker通常运行在一台物理机上,是kafka server的一个实例,所有broker实例组成kafka的服务器集群;每个broker会分配一个唯一的broker id,broker通过zookeeper集群来管理。每个broker都会注册到zookeeper上,有某个broker挂了,或者新的broker加入,zookeeper都会收到通知;在0.9.0以后,producer/consumer不依赖zookeeper来获取集群的配置信息,而是通过任意一个broker来获取整个集群的配置信息; producer和consumer之间交互的桥梁 从producer接受数据,push到broker 将消息pull给订阅的consumer 读写操作都是顺序进行的,效率高,吞吐量 一个节点上的数据:(使用replication-factor指定当前数据副本个数) dog-0 : 第一个分区 (使用partition指定分区个数) 00000000.log : 由多个有序片段组成,每个片段内部由多个有序的message组成 00000000.index : 维护顺序读写的索引 从数据的最末端进行数据的操作:read / append 操作 是producer和consumer之间数据交互的桥梁 存储消息数据,缓存 producer使用push向broker中生产数据 consumer使用pull从broker中消费数据 对broker进行操作:顺序读写,效率高,吞吐量大 topic主题的相关信息-----存储在broker节点上-----/tmp/kafka-logs/ topicName-0文件夹 00000000000.log //顺序操作 00000000000.index 对broker中的数据进行操作:read和append ,在数据的末端进行操作 数据存储在某个节点上的broker中:topic-partition-segment-message ,有序

3.1 broker的策略

消息的顺序问题 在某些业务场景下,需要消息的顺序不能乱:发送顺序和消费顺序要严格一致。而在kafka中,同一个topic,被分成了多个partition,这多个partition之间是互相独立的。 之所以要分成多个partition,是为了提高并发度,多个partition并行的进行发送/消费,但这却没有办法保证消息的顺序问题。 一个解决办法是,一个topic只用一个partition,但这样很显然限制了灵活性。 还有一个办法就是,所有发送的消息,用同一个key,这样同样的key会落在一个partition里面。

消息的刷盘机制 操作系统本身是有page cache的。即使我们用无缓冲的io,消息也不会立即落到磁盘上,而是在操作系统的page cache里面。操作系统会控制page cache里面的内容,什么时候写回到磁盘; 用户可以指定每条消息都调用一次写磁盘的操作,但这样会降低系统性能,也增大了磁盘IO;也可以让操作系统去控制存盘。

4. topic

topic 主题,每一个分布到kafk集群中的消息都具有特定的类别 在物理结构上,topic发开存储 ,指定多个分区,创建多个分区文件夹 在逻辑结构上,一个整体 每个topic也就是自定义的一个队列,producer往队列中放消息,consumer从队列中取消息,topic之间相互独立

Kafka的消息通过topic进行分类;topic类似于数据库的表,或者文件系统中的文件夹; 一个topic认为一类消息,每条发送的消息都必须从属于某一类别; 一个topic由若干个分区组成;无法保证在整个topic范围内消息的顺序

消息的类型,类别 区分数据类型,分别存储消息数据

物理上不同的topic的数据,分开存储

一个主题的数据,可以设置多个分区:提供当前主题上数据操作的效率,提高吞吐量,利用 并发操作 的优势

student-0 00000000000.log msg1 msg2 msg3 msg4 msg5 ---------------------------------------------- student-0 文件夹 00000000000.log msg1 msg2 student-1 00000000000.log msg3 student-2 00000000000.log msg4 msg5

分区的个数:推荐设置的数值? 分区的个数建议大于broker的个数 保证每个broker节点上都能均分某一个主体的分区

5. partition

一个broker可以有多个topic,一个topic可以设置多个partition(分区),每个Partition在物理上都对应一个文件夹,该文件夹存储这个Partition的所有消息文件和索引文件;一个partition对应多个Segment,每个Segment对应一个文件,Segment由一个个的不可变记录组成,该记录只会追加到Segment中,不会单独的修改或者删除;消息被追加到每个分区的尾部;分区可以分布在不同的服务器上(即一个主题可以横跨多个服务器,以此来提供比单个服务器更强大的性能),Kafka通过分区实现系统的伸缩性;Partition中的消息都会被分配为一个有序的ID(offset);

在物理上将一个Topic分成多个partition,一个partition对应一个文件夹, 每一个partition中存储当前分区的所有消息和索引文件

每一个partition由一些列有序的message组成,新的message被连续的追加到分区中

操作分区中的数据:顺序读写磁盘,效率非常高 提高kafka的吞吐量和实现数据的并发操作 一个分区对应磁盘上的一个文件夹 ,顺序读写,效率高 案例:三个节点,启动kafka,broker的数量3 bin/kafka-topics.sh --create --zookeeper master:2181 --replicaton-factor 3 --partition 5 --topic topicName 对当前主题的数据进行读写操作时:针对多个分区进行并发操作 kafka的topic,在每个机器上,是用文件存储的。而这些文件呢,会分目录。 partition就是文件的目录。比如一个topic叫abc,分了10个partion,则在机器的目录上,就是: abc_0 abc_1 abc_2 abc_3 … abc_9 每个目录里面,存放了一堆消息文件,消息是顺序append log方式存储的。

5.1 Topic & Partition

partition是实际物理上的概念,而topic是逻辑上的概念;

一个topic可以认为一类消息,每个topic将被分成多个partition,每个partition在存储层面是append log文件;

任何发布到此partition的消息都会被追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量); offset为一个long型的数字,它唯一标记一条消息。每条消息都被append到partition中,是顺序写磁盘,因此效率非常高。

每一条消息被发送到broker中,会根据partition规则选择被存储到哪一个partition; 如果partition规则设置的合理,所有消息可以均匀分布到不同的partition里,这样就实现了水平扩展。(如果一个topic对应一个文件,那这个文件所在的机器I/O将会成为这个topic的性能瓶颈,而partition解决了这个问题);

在创建topic时可以在$KAFKA_HOME/config/server.properties中指定这个partition的数量,也可以在topic创建之后去修改partition的数量; 在发送一条消息时,可以指定这个消息的key,producer根据这个key和partition机制来判断这个消息发送到哪个partition。partition机制可以通过指定producer的partition.class这一参数来指定,该class必须实现kafka.producer.Partitioner接口。

6. Replication副本

bin/kafka-topics.sh --create --zookeeper master:2181 --replicaton-factor 3 --partition 5 --topic topicName

创建主题时可以指定副本因子,副本的个数

提供数据的高可用,安全性,提高容错 一个节点-----宕机了-----broker无法使用----partition无法被访问----message无法读写----无法提供数据服务

副本个数的设置:建议设置成 3,不建议设置的太大,容易造成数据的过分冗余

分区的个数设置:建议设置的比broker的个数多,保证所有的分区被broker均匀的分配,实现负载均衡

副本数据的流向:

对一个主题进行读写操作时,用户访问的当前broker为replication 的leader,用户不需要关系其他副本的数据同步 此操作由zookeeper实现

当操作的数据量很大时,一个主题数据的不同分区是并发访问操作的,不同的分区在写操作时是并发的

7. Consumer消费者

从broker中pull消息数据 消费方式: 1. 队列模式消费 每一个消息只能被消费者队列中的一个消费者进行消费 消费者队列:c1 c2 c3 c4 c5 消息数据: msg1 msg2 msg3 .... .... 一个消息数据不能被多个消费者同时消费 2. 发布-订阅消费模式 消息数据以广播的形式交给所有的消费者 消费者队列: c1 c2 c3 c4 c5 (msg1、msg2、msg3) (msg1、msg2、msg3)(msg1、msg2、msg3)(msg1、msg2、msg3) ... .... 消息数据: msg1 msg2 msg3 .... .... Consumer Group[消费者群组] 消费者群组,是有若干个消费者组成的集体,每个consumer属于一个特定的consumer group, kafka采用将Consumer分组的方式实现一个主题(topic)的消息和广播(发给所有的consumer) 和单播(发给单个的consumer) 消费者组:更合理管理关注相同主题的消费者 g1 (c1 c2 c3) g2 (c4 c5) 消息数据: msg1 msg2 msg3 消费者组的消费方式: 组和组之间使用 发布-订阅消费模式,消息数据以广播的形式交给所有消费者组 组内的消费者的消费方式:队列模式消费,互斥消费方式

案例: producer ---- message consumer1 ---- consumerGroup1 consumer2 ---- consumerGroup1 consumer1 ---- consumerGroup1 -------message consumer2 ---- consumerGroup2 -------message 组内竞争互斥,组之间共享数据

同一Topic的一条消息只能被同一个Consumer Group内的一个Consumer消费,但多个Consumer Group可同时消费这一消息。

8. Zookeeper

协调:producer/consumer/broker 存储:consumer/broker/元数据/共享数据… …

副本数据同步:replication

均衡负载:节点间请求的均衡分配 在分区时:p > b , r=3

9. offset[偏移量]

在每个partition分区下的消息都是顺序保存的,kakfa使用一个唯一的标识来记录它们在该分区下的位置,这个位置标识被称为offset。每条消息在文件中的位置称为offset(偏移量),offset为一个long型的数字,它唯一标记一条消息。每条消息都被追加到partition中,是顺序写磁盘,因此效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证)。offset是顺序递增的,一旦确定下来之后就不能修改。Kafka会维护分区下的消息顺序,但是不会维护跨partition(分区)间的顺序(假如,向topic1分别发送三条消息1,2,3,如果1和3发送到了partition1中,2发送到了partition2中,那么kafka consumer在消费时,会按照1 然后 3的顺序消息,但是不保证 2 会在消费1之后在消费)知道了topic、partition和offset信息,就能唯一定位一条消息。所以说每条Kafka的消息本质上都是一个三元组(tuple):<topic, partition, offset>,称该元组为消息的元数据。

10. Kafka拓扑结构

Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。

Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息

一个典型的Kafka集群中包含: 若干Producer(可以是web前端产生的Page View,或者是服务器日志) 若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高) 若干Consumer Group 以及一个Zookeeper集群

11. 消息的组织

kafka集群中的消息,是通过Topic(主题)来进行组织的。 1、主题(Topic):一个主题类似新闻中的体育、娱乐、教育等分类概念,在实际工程中通常一个业务一个主题。

2、分区(Partition):一个Topic中的消息数据按照多个分区组织,分区是kafka消息队列组织的最小单位,一个分区可以看作是一个FIFO( First Input First Output的缩写,先入先出队列)的队列。

kafka分区是提高kafka性能的关键所在,当发现Kafka集群性能不高时,常用手段就是增加Topic的分区; 分区里面的消息是按照从新到老的顺序进行组织,消费者从队列头订阅消息,生产者从队列尾添加消息。

12 replica、leader、follower

每个topic的partition的所有消息,都不只有1份,而是在多个broker上冗余存储,从而提高系统的可靠性。一个topic的所有partition称为一个replica;

在replica集合中,需要选出1个leader,剩下的是follower。类似于master/slave 的结构;

发送消息的时候,只会发送给leader,然后leader再把消息同步给followers(以pull的方式,followers去leader上pull,而不是leader push给followers)。

问题:leader收到消息之后,是直接返回给producer呢,还是等所有followers都写完消息之后,再返回?

关键点:replica、leader、follower都是逻辑概念,并且是相对 partition来讲的,而不是 topic。也就说,同一个topic的不同partition,对于的replica集合可以是不一样的。

四. Kafka文件存储机制

Kafka中消息是以topic进行分类的,生产者通过topic向Kafka broker发送消息,消费者通过topic读取数据。topic在物理层面又以partition为分组,一个topic可以分成若干个partition,partition还可以细分为segment,一个partition物理上由多个segment组成;

同一个topic下有多个不同的partition,每个partiton为一个目录,partition的名称规则为:topic名称+有序序号,第一个序号从0开始计,最大的序号为partition数量减1;

以segment为单位将partition细分。每个partition(目录)相当于一个巨型文件被平均分配到多个大小相等的segment(段)数据文件中(每个segment 文件中消息数量不一定相等)这种特性也方便old segment的删除,即方便已被消费的消息的清理,提高磁盘的利用率。每个partition只需要支持顺序读写就行,segment的文件生命周期由服务端配置参数(log.segment.bytes、log.roll.{ms,hours}等若干参数)决定。

1.segment 文件

segment文件由两部分组成,分别为 .index、.log 文件,为segment索引文件和数据文件。这两个文件的命令规则为: partition全局的第一个segment从0开始; 后续每个segment文件名为上一个segment文件最后一条消息的offset值; 数值大小为64位,20位数字字符长度,没有数字用0填充 log数据文件存储消息;

以 index 索引文件中的元数据[3, 348]为例,在.log数据文件表示第3个消息,即在全局partition中表示170410+3=170413个消息,该消息的物理偏移地址为348。

如何从partition中通过offset查找message? 例如:要读取offset=170418的消息,首先查找segment文件。 第一个文件为00000000000000000000.index; 第二个文件为00000000000000170410.index(起始偏移为170410+1=170411); 第三个文件为00000000000000239430.index(起始偏移为239430+1=239431);

offset=170418就落到了第二个文件之中。其他后续文件可以依次类推,以其实偏移量命名并排列这些文件,然后根据二分查找法就可以快速定位到具体文件位置。

其次根据00000000000000170410.index文件中的[8,1325]定位到00000000000000170410.log文件中的1325的位置进行读取。

要是读取offset=170418的消息,从00000000000000170410.log文件中的1325的位置进行读取。

怎么知道何时读完本条消息,否则就读到下一条消息的内容了。 这就需要知道到消息的物理结构了,消息都具有固定的物理结构,包括:offset(8 Bytes)、消息体的大小(4 Bytes)、crc32(4 Bytes)、magic(1 Byte)、attributes(1 Byte)、key length(4 Bytes)、key(K Bytes)、payload(N Bytes)等等字段,可以确定一条消息的大小,即读取到哪里截止。

2. 复制原理与同步方式

Kafka中topic的每个partition都有一个预写式的日志文件,虽然partition可以继续细分为若干个segment文件,但是对于上层应用来说可以将partition看成最小的存储单元(一个有多个segment文件拼接的“巨型”文件),每个partition都由一些列有序的、不可变的消息组成,这些消息被连续的追加到partition中。

LEO( Log End Offset ),表示每个partition的log最后一条Message的位置; HW( HighWatermark ),是指consumer能够看到的此partition的位置,这涉及到多副本的概念; 为了提高消息的可靠性,Kafka每个topic的partition有N (大于等于1)个副本,其中N是topic的复制因子(replica fator)的个数; Kafka通过多副本机制实现故障自动转移,当Kafka集群中一个broker失效情况下仍然保证服务可用; 在Kafka中发生复制时确保partition的日志能有序地写到其他节点上,N个replicas中,其中一个replica为leader,其他都为follower; leader处理partition的所有读写请求,与此同时,follower会被动定期地去复制leader上的数据; Kafka提供了数据复制算法保证,如果leader发生故障或挂掉,一个新leader被选举并被接受客户端的消息成功写入;

Kafka确保从同步副本列表中选举一个副本为leader,或者说follower追赶leader数据;

leader负责维护和跟踪ISR(In-Sync Replicas,副本同步队列)中所有follower滞后的状态;

当producer发送一条消息到broker后,leader写入消息并复制到所有follower。消息提交之后才被成功复制到所有的同步副本;

消息复制延迟受最慢的follower限制。如果follower“落后”太多或者失效,leader将会把它从ISR中删除。

3. In-Sync Replicas 副本同步队列

副本同步队列(ISR)的副本数对Kafka的吞吐率是有一定的影响,但极大的增强了可用性。副本数默认为1,即每个partition都有一个唯一的leader,为了确保消息的可靠性,通常应用中将其设置为大于1;

所有的副本(replicas)统称为Assigned Replicas,即AR。 AR = ISR + OSR

ISR是AR中的一个子集; ISR中包括:leader和follower;

由leader维护ISR列表,follower从leader同步数据有一些延迟(包括延迟时间、延迟条数两个维度, 在0.10.x以后仅支持延迟时间),任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。

4. HighWatermark

HW(HighWatermark)高水位,取一个partition对应的ISR中最小的LEO ( Log End Offset )作为HW,consumer最多只能消费到HW所在的位置;

另外每个replica都有HW,leader和follower各自负责更新自己的HW的状态。对于leader新写入的消息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replicas同步后更新HW,此时消息才能被consumer消费。

这样就保证了如果leader所在的broker失效,该消息仍然可以从新选举的leader中获取;

对于来自内部broKer的读取请求,没有HW的限制。

5. Kafka的复制机制

Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制;

同步复制要求所有能工作的follower都复制完,这条消息才会被commit,这种复制方式极大的影响了吞吐率;

而异步复制方式下,follower异步的从leader复制数据,数据只要被leader写入log就被认为已经commit,这种情况下如果follower都还没有复制完,落后于leader时,突然leader宕机,则会丢失数据;

Kafka这种使用ISR的方式则较好的均衡了确保数据不丢失以及吞吐率;

leader有单独的线程定期检测ISR中follower是否脱离ISR, 如果发现ISR变化,则会将新的ISR的信息返回到Zookeeper的相关节点中。

6. acks (数据可靠性和持久性保证)

发布消息时,kafka client先构造一条消息,将消息加入到消息集set中(kafka支持批量发布,可以往消息集合中添加多条消息,一次性发布),send消息时,producer需指定消息所属的topic。 当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别:

acks设置为0, 表示producer不会等待broker的响应。 所以producer无法知道消息是否发送成功,这样有可能导致数据丢失。 但同时会得到最大的系统吞吐量; 这意味着producer无需等待来自broker的确认而继续发送下一批消息。 这种情况下数 据传输效率最高,但是数据可靠性确是最低的。 acks设置为1(缺省值), 表示producer会在leader partition收到消息时得到broker的一个确认, 这样会有更好的可靠性,客户端会等待直到broker确认收到消息; 这意味着producer在ISR中的leader已成功收到的数据并得到确认后发送下一条message。 如果leader宕机了,则会丢失数据。 acks设置为-1, producer会在所有备份的partition收到消息时得到broker的确认, 这个设置可以得到最高的可靠性保证; producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。 但是这样也不能保证数据不丢失,比如当ISR中只有leader时 (ISR中的成员由于某些情况会增加也会减少,最少就只剩一个leader), 这样就变成了acks=1的情况。

如果要提高数据的可靠性,在设置request.required.acks=-1的同时,还需要min.insync.replicas这个参数(可以在broker或者topic层面进行设置)的配合,这样才能发挥最大的功效;

min.insync.replicas这个参数设定ISR中的最小副本数是多少,默认值为1,当且仅当request.required.acks参数设置为-1时,此参数才生效。如果ISR中的副本数少于min.insync.replicas配置的数量时,客户端会返回异常。

acks = 1(可能存在数据丢失) acks = -1 (可能存在数据重复)

acks=-1,数据发送到leader后 ,部分ISR的副本同步,leader此时挂掉。follower1、follower2都有可能变成新的leader,producer端会得到返回异常,producer端重新发送数据,数据可能会重复。 当然如果在leader crash的时候,follower2还没有同步到任何数据,而且follower2被选举为新的leader的话,消息不会重复。

7. leader 选举

一条消息只有被ISR中的所有follower都从leader复制过去才会被认为已提交。这样就避免了部分数据被写进了leader,还没来得及被任何follower复制就宕机了,而造成数据丢失。

对于producer而言,它可以选择是否等待消息commit,这可以通过acks来设置。这种机制确保了只要ISR中有一个或者以上的follower,一条被commit的消息就不会丢失。

一个重要的问题是leader宕机了,怎样在follower中选举出新的leader。因为follower可能落后很多或者直接crash了,所以必须确保选择“最新”的follower作为新的leader。一个基本的原则是:新的leader必须拥有原来的leader commit的所有消息。

这就需要做一个折中,如果leader在一条消息被commit前等待更多的follower确认,那么在它挂掉之后就有更多的follower可以成为新的leader,但这也会造成吞吐率的下降。

Kafka为每一个partition动态的维护了一个ISR,这个ISR里的所有replica都跟上了leader;

在ISR中至少有一个follower时,Kafka可以确保已经commit的数据不丢失,但如果某一个partition的所有replica都挂了,就无法保证数据不丢失了。此时有两种可行的方案:

等待ISR中任意一个replica“活”过来,并且选它作为leader选择第一个“活”过来的replica(并不一定是在ISR中)作为leader(缺省)

这需要在 数据的可靠性 与 系统的性能 中作出一个选择: 3. 如果一定要等待ISR中的replica“活”过来,那不可用的时间就可能会相对较长。而且如果ISR中所有的replica都无法“活”过来了,这个partition将永远不可用。 4. 选择第一个“活”过来的replica作为leader,而这个replica不是ISR中的replica,那即使它并不包含了所有已commit的消息,它也会成为leader而作为consumer的数据源。 5. 默认情况下,Kafka采用第二种策略,即unclean.leader.election.enable=true。也可以将此参数设置为false来启用第一种策略。

8. Consumer的策略

Push vs Pull kafka选择了以pull的方式将数据传递给consumer。pull的方式更灵活:消息发送频率应该如何,消息是否可以延迟然后batch发送,这些信息只有消费者自己最清楚;

把控制权交给消费者,消费者自己控制消费的速率,当消费者处理消息很慢时,它可以选择减缓消费速率;当处理消息很快时,它可以选择加快消费速率。而在push的方式下,要实现这种灵活的控制策略,就需要额外的协议,让消费者告诉broker,要减缓还是加快消费速率,这增加了实现的复杂性。

另外pull的方式下,消费者可以很容易的自适应控制消息是batch的发送,还是最低限度的减少延迟,每来1个就发送1个。

9. 消费的 confirm

在消费端,所有消息队列都要解决的一个问题就是“消费确认问题”:消费者拿到一个消息,然后处理这个消息的时候挂了,如果这个时候broker认为这个消息已经消费了,那这条消息就丢失了。

一个解决办法就是,消费者在消费完之后,再往broker发个confirm消息。broker收到confirm消息之后,再把消息删除。 要实现这个,broker就要维护每个消息的状态,已发送/已消费,很显然,这会增大broker的实现难度。同时,这还有另外一个问题,就是消费者消费完消息,发送confirm的时候,挂了。这个时候会出现重复消费的问题。

kafka没有直接解决这个问题,而是引入offset回退机制,变相解决了这个问题。在kafka里面,消息缺省会存放一个星期,才会被删除。并且在一个partion里面,消息是按序号递增的顺序存放的,因此消费者可以回退到某一个历史的offset,进行重新消费。

对于重复消费的问题,需要消费者去解决。

10. Kafka减少消息丢失配置

Kafka会不会丢数据,通常不会,但某些情况下的确有可能会发生。 下面的参数配置可以在一定程度上减少数据的丢失,当然牺牲了系统的吞吐量。 acks = -1; // 所有follower都响应了才认为消息提交成功,即"committed" min.insync.replicas = 2;//消息至少要被写入到这么多副本才算成功,与acks配合使用 unclean.leader.election.enable = false 不允许非ISR中的副本被选举为leader,以避免数据丢失 replication.factor = 3; // 参考Hadoop及业界通用的三备份原则 使用KafkaProducer.send(record, callback) // 不使用send(record)方法,自定义回调逻辑处理消息发送失败 block.on.buffer.full = true; // 队列满了,客户端是阻塞。即producer将一直等待缓冲区直至其变为可用 replication.factor > min.insync.replicas 如果两者相等,当一个副本挂掉了分区也就没法正常工作了 通常设置:replication.factor = min.insync.replicas + 1 即可

五. 设计原理

1、持久化(Persistence)

Kafka是高度依赖文件系统和缓存的,Kafka对磁盘时append操作,磁盘检索的开支是较少的,同时为了减少磁盘写入的次数,broker会将消息暂时缓存起来,当消息的数量(offset)达到一定阀值时(可配置 offset可以设置为自动提交或者手动提交),再刷新到磁盘中,这样减少了磁盘的开销。

2、生产者(producer)

producer用于往broker中发送/生产消息,每一个broker中可以有多个topic,每个topic下面又会有多个partition,在负载均衡的情况,如果均衡的将消息发送到指定的partition中。 异步处理:将多条消息存储在buffer中,之后,批量的提交到broker中,从而提高了网络IO。采用异步发送机制时,如果producer异常或者失效,消息将会丢失。

3、消费者(Consumer)

Consumer通过链接broker,采用 pull 方式来拉取broker中的数据,consumer根据自己的消费能力,去消费信息。

为什么采用 pull 的方式而不是采用 push 呢,因为pull模式中: broker不需要感知有多少个consumer; 如果采用push模式一旦消息量级超过consumer的承受范围,会压垮consumer;

consumer每消费完一条信息之后,kafka会自动的提交offset;

也可以设置为手动提交,如当消费100000条消息之后,offset存储在一个list中,达到一定的消息量之后,提交这个offset list。

4、常量时间存储能力

一个实现了数据持久化的队列,提供简单的数据读和数据追加写到文件末尾。这种数据结构的最大优势是所有操作的算法复杂度都是O(1),磁盘读、写也不会互相阻塞。这使得Kafka具有了一个显著的性能优势。

在这里性能与数据量实现了完全得解耦。一台服务器现在可以轻松利用到一组廉价的低转速、大容量磁盘能够提供的各种优势,虽然这些磁盘只有可怜的寻道速度但仍然能够基于大块数据的读、写提供可以接受的性能。

拥有访问几乎无限的磁盘空间的能力,却不会有任何性能惩罚,这意味着用户可以基于Kafka实现一些在传统消息中间件中很少看到的特性。在传统消息中间件系统中往往会在消息一旦被获取后立即尝试删除该消息数据,而Kafka能够为消息数据保留一个相对来说很长的时间(如一周)。这一特性,可为消息消费端提供了大量的灵活性。

5、消费分发机制

At most once 消息可能会丢,但绝不会重复传输 At least once 消息绝不会丢,但可能会重复传输 Exactly once 每条消息肯定会被传输一次且仅传输一次;这是我们努力追求的;

当Producer向broker发送消息时,一旦这条消息被commit,因为有多个replication的存在,它就不会丢。但是如果Producer发送数据给broker后,遇到网络问题而造成通信中断,那Producer就无法判断该条消息是否已经commit。虽然Kafka无法确定网络故障期间发生了什么,但是Producer可以生成一种类似于主键的东西,发生故障时幂等性的重试多次,这样就做到了Exactly once。

Consumer在从broker读取消息后,可以选择commit,该操作会在Zookeeper中保存该Consumer在该Partition中读取的消息的offset。该Consumer下一次再读该Partition时会从下一条开始读取。如未commit,下一次读取的开始位置会跟上一次commit之后的开始位置相同。当然可以将Consumer设置为auto commit,即Consumer一旦读到数据立即自动commit。如果只讨论这一读取消息的过程,那Kafka是确保了Exactly once。但实际使用中应用程序并非在Consumer读取完数据就结束了,而是要进行进一步处理,而数据处理与commit的顺序在很大程度上决定了消息从broker和consumer的delivery guarantee semantic(交付传递保证语义)

读完消息先commit再处理消息。如果Consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这对应于At most once;

读完消息先处理再commit。如果在处理完消息之后commit之前Consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了。这对应于At least once。在很多使用场景下,消息都有一个主键,所以消息的处理往往具有幂等性,即多次处理这一条消息跟只处理一次是等效的,那就可以认为是Exactly once。(这种说法比较牵强,因为它不是Kafka本身提供的机制,主键本身也并不能完全保证操作的幂等性。而且实际上我们说 delivery guarantee 语义是讨论被处理多少次,而非处理结果怎样,因为处理方式多种多样,我们不应该把处理过程的特性——如是否幂等性,当成Kafka本身的Feature)

如果一定要做到Exactly once,就需要协调offset和实际操作的输出。经典的做法是引入两阶段提交。让offset和操作输入存在同一个地方,或者说放入同一个事务中。许多输出系统可能不支持两阶段提交。如 Consumer 拿到数据后可能把数据放到HDFS,如果把最新的offset和数据本身一起写到HDFS,那就可以保证数据的输出和offset的更新要么都完成,要么都不完成,间接实现Exactly once。

6、复制 Replication

kafka将每个partition数据复制到多个server上,任何一个partition有一个leader和多个follower(可以没有)。

Kafka自动维护leader和follower的失效转移。leader处理所有的read-write请求,follower需要和leader保持同步。

leader负责跟踪所有的follower状态,如果follower“落后”太多或者失效,leader将会把它从replicas同步列表中删除。当所有的follower都将一条消息保存成功,此消息才被认为是“committed”,此时consumer才能消费它。即使只有一个replicas实例存活,仍然可以保证消息的正常发送和接收,只要zookeeper集群存活即可。Producer端也可以通过参数“acks”来控制是否要等待消息返“committed”的响应。

当leader失效时,需在followers中选取出新的leader。只有处于“in-sync”状态的followers才有参选资格。可能此时follower落后于leader,因此需要选择一个"up-to-date"的follower。选择follower时需要兼顾一个问题就是新leader server上所已经承载的partition leader的个数。如果一个server上有过多的partition leader,意味着此server将承受着更多的IO压力。在选举新leader时需要考虑到"负载均衡"。follower需要能够维护和ZooKeeper之间一个有效的会话,否则也会被判定为”unalive”。

7. Kafka 与 Zookeeper

Kafka的使用依赖于Zookeeper,安装Kafka前必须先安装Zookeeper;

Kafka将元数据信息保存在zookeeper中,发送给Topic本身的数据是不会发到zookeeper上的;

kafka使用zookeeper来实现动态的集群扩展,不需要更改客户端(producer和consumer)的配置;

broker会在zookeeper注册并保持相关的元数据(topic,partition信息等)更新;

客户端会在zookeeper上注册相关的watcher。一旦zookeeper发生变化,客户端能及时感知并作出相应调整。这样就保证了添加或去除broker时,各broker间仍能自动实现负载均衡。

五. kafka常用命令

1、创建topic

# replication-factor表示该topic需要在不同的broker中保存几份 # replication-factor必须小于等于 broker 的个数

kafka-topics.sh --zookeeper node3:2181/kafka0.11 --create --topic mykafka1 --partitions 3 --replication-factor 2

# 列出全部的topic

kafka-topics.sh --zookeeper node3:2181/kafka0.11 --list

# 查看topic的状态

kafka-topics.sh --zookeeper node3:2181/kafka0.11 --describe --topic mykafka1

备注:--zookeeper node3:2181/kafka0.11 必须指明 ISR(In-Sync Replicas)副本同步队列

2、创建生产者

kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9093 --topic mykafka1 并输入一些信息:xxx yyy zzz

3、创建消费者

kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9093 --topic mykafka1 # 旧版本的命令(高版本中将被废弃) kafka-console-consumer.sh --zookeeper node1:2181,node2:2181,node3:2181/kafka0.11 --topic mykafka1

4、查询 topic 的所有记录数

kafka-run-class.sh kafka.tools.GetOffsetShell --topic mykafka1 --broker-list node1:9092,node2:9092,node3:9092

# 查询 topic 在分区上的记录数

kafka-run-class.sh kafka.tools.GetOffsetShell --topic mykafka1 --broker-list node1:9092,node2:9092,node3:9092 --partitions 0

5、查询 topic 的消费者情况

kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --describe --group 1

zkCli.sh -server node3:12181 # 查看zookeeper中的内容 问题:如何删除已经存在的topic kafka-topics.sh --zookeeper node3:12181/kafka0.11 --delete --topic mykafka1 命令执行后有如下返回信息: Topic mykafka1 is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true.

应该将 config/server.properties 文件这行(delete.topic.enable=true)最前面的注释去掉,即让这一行生效!

还可以参考这篇文章: http://blog.csdn.net/xbs1019/article/details/54917371

6.IDEA 配置

编写kafka生产者程序,需要调用kafka的API,对应的jar在$KAFKA_HOME/lib中 编写kafka消费者程序,需要调用Spark Streaming的API,对应的jar需要下载

使用maven管理jar,需要在pom.xml文件中添加: <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.11.0.2</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-8_2.11</artifactId> <version>2.2.0</version> </dependency> 如果未使用Maven,需要引入相应的jar (在$SPARK_HOME/jars中创建目录kafka, 将$KAFKA_HOME/lib中的jar拷贝过去; 下载spark-streaming-kafka-0-8_2.11-2.2.0.jar放在$SPARK_HOME/jars 中)

六 .案例

1. 一个简单生产者的实现

import java.util.Properties import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} // 最简单的消息发送方式 object KafkaProducer1 { def main(args: Array[String]): Unit = { // brokers的地址,可以写多个。至少写2个避免单点故障 val brokers = "node1:9092, node2:9092" val topic = "mykafka" // Properties类实现了Map接口,本质上是一种简单的Map容器 val kafkaProps = new Properties() // 要往Kafka写入消息,首先要创建一个生产者对象,并设置一些属性。Kafka生产者有3个必选的属性 // bootstrap.servers,指定broker的地址清单。不需要包含所有的broker地址,生产者会从给定的broker里查找其他broker的信息,建议至少提供2个broker的信息 // key.serializer 将指定的值序列化。必选设置,即使key为空。 value.serializer 将value的值序列化 kafkaProps.put("bootstrap.servers", brokers) kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") // 使用字符串常量更好,避免输入错误 // kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092, node2:9092") // kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") // kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") // KafkaProducer是一个泛型,必须指定key、value的类型 val producer = new KafkaProducer[String, String](kafkaProps) // topic, key, value(可以不指定类型) val record = new ProducerRecord[String, String](topic, "", "my test 中文测试") producer.send(record) producer.close() } }

2. 生产者(同步/异步发送数据)

import java.util.{Properties, UUID} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata} import org.apache.log4j.{Level, Logger} object KafkaProducer2{ def main(args: Array[String]): Unit = { Logger.getLogger("org").setLevel(Level.WARN) val brokers = "node1:9092, node2:9092, node3:9092" val mytopic = "mykafka1" val kafkaProps = new Properties() kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers) kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") // 上面的参数必须指定,以下为可选参数 kafkaProps.put(ProducerConfig.ACKS_CONFIG, "1") kafkaProps.put(ProducerConfig.RETRIES_CONFIG, "3") // 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。16K kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384") // 默认情况下,只要有可用的线程,生产者就会把消息发送出去,就算批次里只有一个消息。 // KafkaProducer 会在批次填满或 linger.ms 达到上限时把批次发送出去。 kafkaProps.put(ProducerConfig.LINGER_MS_CONFIG, "100") // 设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。32M kafkaProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432") val producer = new KafkaProducer[String, String](kafkaProps) // 缺省异步发送 for (i <- 1 to 100) { val record = new ProducerRecord(topic, i.toString, s"$i") producer.send(record) } // 同步发送 for (i <- 1 to 88) { val record = new ProducerRecord[String, String](mytopic, s"$i, ${UUID.randomUUID}") // 程序阻塞,直到该条消息发送成功返回元数据信息或者报错 val metadata: RecordMetadata = producer.send(record).get println(s"(topic, partition, offset) : (${metadata.topic()}, ${metadata.partition()}, ${metadata.offset})") } producer.close() } } // 检查topic中的数据 // kafka-run-class.sh kafka.tools.GetOffsetShell --topic mykafka1 --broker-list node1:9092

最新回复(0)