点对点模型:
A发送的消息只能被B接收,其他任何系统都不能读取A发送的消息;
发布/订阅模型:
多个发布者可以向相同的主题发送消息,订阅者也可能存在多个,它们都能接收到相同主题的消息。
kafka同时支持以上两种消息模型。
缓冲上下游瞬时突发流量,使其平滑。特别是对于那种发送能力很强的上游系统,如果没有消息引擎保护,下游系统可能会由于没有消息引擎的保护被压垮导致全链路服务“雪崩”。消息引擎能够有效的对抗上有的冲击,真正做到将上游的“峰”填到“谷”,避免流量的震荡。 消息引擎另一大好处是发送方与接收方的松耦合,简化应用的开发和系统间不必要的交互。
第一层是主题层:每个主题配置M个分区,每个分区又可以配置N个副本;
第二层是分区层:每个分区的N个副本只能有一个充当领导者(Leader)的角色,并对外提供服务,其他的N-1个副本只是追随者副本(Follower),只提供数据冗余用;第三层是消息层:分区中包含若干个消息,每条消息的位移从0开始,一次递增;客户端程序只能与分区的领导者副本(Leader)进行交互。
消息:record 消息引擎处理的主要对象;
主题:Topic 承载消息的逻辑容器,实际应用中多用来区分不同的业务;分区:Partition 有序不变的消息序列,每个主题可以有多个分区;消息位移:Offset 表示分区中每条消息的位置,单调递增且不变的值;副本:Replica 一条消息被拷贝到多个地方进行数据冗余,副本分为Leader和Follower生产者:Producer 向主题发布新消息的应用程序;消费者:Consumer 从主题订阅新消息的应用程序;消费者位移:Consumer Offset 表征消费者消费进度,每个消费者都有自己的消费者位移;消费者组:Consumer Group 多个消费者实例组成的一个组,同时消费多个分区以实现高吞吐;重平衡:Rebalance 消费者组内某个消费者实例挂掉后,其它消费者实例自动重新分配订阅主题分区的过程。Rebanlance是kafka实现消费者端实现高可用的重要手段。
Kakfa Broker集群受Zookeeper管理。
所有的Kafka Broker节点一起去Zookeeper上注册一个临时节点,因为只有一个Kafka Broker会注册成功,其他的都会失败,所以这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。(这个过程叫Controller在ZooKeeper注册Watch)。
这个Controller会监听其他的Kafka Broker的所有信息,如果这个kafka broker controller宕机了,在zookeeper上面的那个临时节点就会消失,此时所有的kafka broker又会一起去Zookeeper上注册一个临时节点,因为只有一个Kafka Broker会注册成功,其他的都会失败,所以这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。
例如:一旦有一个broker宕机了,这个kafka broker controller会读取该宕机broker上所有的partition在zookeeper上的状态,并选取ISR列表中的一个replica作为partition leader(如果ISR列表中的replica全挂,选一个幸存的replica作为leader; 如果该partition的所有的replica都宕机了,则将新的leader设置为-1,等待恢复,等待ISR中的任一个Replica“活”过来,并且选它作为Leader;或选择第一个“活”过来的Replica(不一定是ISR中的)作为Leader),这个broker宕机的事情,kafka controller也会通知zookeeper,zookeeper就会通知其他的kafka broker。
kafka使用消息日志(Log)保存数据,一个日志就是磁盘上一个只能追加写(append only)消息的物理文件。追加写避免了缓慢的随机访问I/O操作改为性能较好的顺序I/O写操作,这也是kafka实现高吞吐量的一个重要手段。
不停的向一个日志里面写消息,最终会耗尽磁盘空间,因此必须定期的删除消息回收磁盘空间。怎么删除呢?
通过日志段机制(Log segment),在kafka的底层,一个日志又进一步细分为多个日志段,消息被追加写到当前最新的日志段中,当写满一个日志段后,kafka会自动切分出一个新的日志段,并将老的日志段封存起来。kafka在后台还有定时任务定期的检查老的日志段是否能够被删除,从而实现磁盘回收的目的。
第一步查找segment file 上述图2为例,其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.第二个文件 00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1.同样,第三个文件00000000000000737337.index的起始偏移量为737338=737337 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset 二分查找文件列表,就可以快速定位到具体文件。
当offset=368776时定位到00000000000000368769.index|log
第二步通过segment file查找message通过第一步定位到segment file,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和 00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log顺序查找直到 offset=368776为止
JVM参数:
操作系统参数:
主题-分区-消息三次结构:
分区的主要原因是:提供负载均衡的能力,提高系统的伸缩性(Scalability),不同的分区放到不同的Broker节点上,数据的读写也是针对分区这个粒度进行的,这样每个节点的机器都能独立的执行各自分区的读写请求处理,同时还可以通过添加新的节点来提升系统的吞吐量。
在MongoDB/Elasticksearch/HBase等系统也有分区的思想,Shard,Region
kafka允许为每条消息定义消息键,简称key。它可以是一个有着明确业务含义的字段,也可以用来表征消息元数据。一旦消息被定义了Key就可以保证同一个Key的所有消息都进入相同的分区里面,由于单个分区的消息处理是有序的,因此这个策略被称为消息键保序策略。
压缩的目的:时间换空间,带来更少的磁盘占用以及更少的网络I/O传输。
压缩可能发生在两个地方:生产者端和Broker端。
生产者配置:compression.type 参数表示启用指定类型的压缩算法。
大部分情况下Broker接收到Producer的消息只是原封不动的将其保存而不会对其进行任何修改。
特殊情况:
Broker端指定了和Producer端不同的压缩算法。例如:Producer端使用了GZIP但Broker指定了Snappy,那么Broker端就需要先把GZIP格式的消息解压再压缩成Snappy格式。这种情况会导致CPU使用率飙升。Broker端发生了消息格式转换。有压缩就有解压缩,通常情况下生产者发送压缩消息到Broker后,Broker只是原封不动的保存起来,当消费者请求这部分数据的时候Broker依然原样发送出去,当消息到达Consumer端后,由Consumer自行解压缩还原成之前的消息。
Consumer怎么知道要使用哪种解压缩算法?
kafka会把启用了哪种压缩算法封装进消息集合中,当Consumer读取到消息集合时自然知道用哪种解压算法。
Producer端压缩,Broker保持,Consumer解压缩。
生产者CPU资源紧张不建议开启压缩,带宽紧张建议开启压缩。
kafka只对已提交的消息做有限度的持久化保证。
Producer应用向kafka发送消息,最后发现kafka并没有保存。
kafka Producer是异步发送消息的,如果调用了producer.send(msg)这个API,通常会立即返回,但此时我们并不能认为消息已经成功发送完成。 producer.send(msg)属于典型的“fire and forget”,因此如果出现消息丢失我们是无法知晓的,因此这个方式不太靠谱。
使用这个方式哪些情况会出现消息丢失? 网络抖动,消息压根就没有发送的Broker端,消息不合格被Broker拒绝接收等。
使用带有回调通知的发送API,producer.send(msg, callback),callback能准确的告诉你消息是否发送成功,当出现发送失败的情况我们可以做相应的处理。比如那些瞬时错误可以通过重试,消息不合格造成的可以通过调整消息格式后再发送。
Consumer端丢失数据主要体现在Consumer要消费的消息不见了。Consumer使用位移的概念来表示这个Consumer当前消费到的Topic分区的位置
如果先更新位移再消费消息,那么当消费失败时这部分消息就丢失了。因此需要维持先消费消息再更新位移,虽然会出现消息重复处理但是不会丢消息。
另一种比较隐蔽的消息丢失场景:Consumer程序从kafka获取消息后开启多个线程处理消息,而Consumer程序自动的向前更新位移,当某个线程在处理过程中失败了,消息没有被正常处理,但是位移已经被更新,此时该消息实际上是丢失了。
如果是多线程异步处理消息,Consumer程序不要开启自动提交位移,而是要应用程序手动提交位移。
kafka无消息丢失的配置:
不要使用producer.send(msg),而要使用producer.send(msg,calback);设置acks=all。acks是Producer的一个参数,代表对已提交消息的定义,设置成all表示所有的副本Broker都要接收到消息该消息才算提交成功,这是最高等级的“已提交”定义;设置retries为一个较大的值。也是Producer的参数,自动重试;设置unclean.leader.election.enable=false。这是Broker端的参数,控制哪些Broker有资格竞选分区的Leader.如果一个Broker落后原先的Leader太多,那么它一旦成为新的Leader,必然会造成消息丢失。故一般都要把改参数设置成false,避免该情况发生;设置replication.factor >= 3。Broker端参数,把消息多保存几份。毕竟防止消息丢失的主要机制是数据冗余。设置min.insyc.replicas > 1。Broker端参数,表示消息至少被写入多少个副本才算“已提交”。默认值为1.确保replication.factor > min.insyc.replicas。如果两者相等,那么只要有个副本挂机,整个分区就无法正常工作了,改善消息的持久性,防止数据丢失的前提是不降低可用性。推荐 replication.factor = min.insyc.replicas + 1。确保消息消费完再提交。Consumer端有个参数enable.auto.commit,最好设置成false,并采用手动提价位移的方式。Rabalance本质是一种协议,规定了一个Consumer Group下的所有Consumer如何达成一致来分配订阅的Topic的每个分区。
ex:一个包含100个分区的Topic分配给一个包含20个Cobsumer实例的消费者组,正常情况下每个Consumer会被分配5个分区
ex:加入Consumer实例C
当kafka集群中的第一个Consumer程序启动时,kafka会自动创建位移主题;
将Consumer的位移数据作为普通的kafka消息提交到__consumer_offsets中,__consumer_offsets的主要作用是保存消费者的位移信息;
位移主题是个普通的kafka主题,但它的消息格式由kafka自定义,用户不能修改,一旦写入的消息不满足格式,kafka内部无法解析就会造成Broker崩溃;
消息格式:类似K,V,其中K包含Group id/topic/分区号分区数由ofssets.topic.num.partitions参数控制,默认为50,副本数由offsets.topic.replication.factor参数控制,默认为3;如果不满意kafka自动创建位移主题的默认值(比如分区数太多),可以在集群中没有任何consumer启动的情况下通过kafka API手动创建位移主题。
kafka使用Compact策略来删除位移主题中的过去消息:
图中位移为0、2和3的消息的Key都是K1,Compact之后分区只需要保存最新发送的位移为3的消息。
kafka提供了专门的后台线程(Log Cleaner)定期的检查待Compact的主题,看是否存在满足条件的可删除数据。如果生产环境位移主题无限膨胀占用了过多的磁盘空间,那么有可能是因为Log Cleaner线程挂了导致的。
转载于:https://www.cnblogs.com/llgogo/p/11289390.html