kafak ProducerConfigs配置和consumer含义

mac2024-05-20  34

Producer Configs 配置

属性描述类型默认值bootstrap.servers用于建立与kafka集群的连接,这个list仅仅影响用于初始化的hosts,来发现全部的servers。 格式:host1:port1,host2:port2,…,数量尽量不止一个,以防其中一个down了list acksServer完成 producer request 前需要确认的数量。acks=0时,producer不会等待确认,直接添加到socket等待发送;acks=1时,等待leader写到local log就行;acks=all或acks=-1时,等待isr中所有副本确认 (注意:确认都是 broker 接收到消息放入内存就直接返回确认,不是需要等待数据写入磁盘后才返回确认,这也是kafka快的原因)string1buffer.memoryProducer可以用来缓存数据的内存大小。该值实际为RecordAccumulator类中的BufferPool,即Producer所管理的最大内存。 如果数据产生速度大于向broker发送的速度,producer会阻塞max.block.ms,超时则抛出异常long33554432compression.typeProducer用于压缩数据的压缩类型,取值:none, gzip, snappy, or lz4stringnonebatch.sizeProducer可以将发往同一个Partition的数据做成一个Produce Request发送请求,即Batch批处理,以减少请求次数,该值即为每次批处理的大小。 另外每个Request请求包含多个Batch,每个Batch对应一个Partition,且一个Request发送的目的Broker均为这些partition的leader副本。 若将该值设为0,则不会进行批处理int16384linger.msProducer默认会把两次发送时间间隔内收集到的所有Requests进行一次聚合然后再发送,以此提高吞吐量,而linger.ms则更进一步,这个参数为每次发送增加一些delay,以此来聚合更多的Message。 官网解释翻译:producer会将request传输之间到达的所有records聚合到一个批请求。通常这个值发生在欠负载情况下,record到达速度快于发送。但是在某些场景下,client即使在正常负载下也期望减少请求数量。这个设置就是如此,通过人工添加少量时延,而不是立马发送一个record,producer会等待所给的时延,以让其他records发送出去,这样就会被聚合在一起。这个类似于TCP的Nagle算法。该设置给了batch的时延上限:当我们获得一个partition的batch.size大小的records,就会立即发送出去,而不管该设置;但是如果对于这个partition没有累积到足够的record,会linger指定的时间等待更多的records出现。该设置的默认值为0(无时延)。例如,设置linger.ms=5,会减少request发送的数量,但是在无负载下会增加5ms的发送时延。long0max.request.size请求的最大字节数。这也是对最大消息大小的有效限制。注意:server具有自己对消息大小的限制,这些大小和这个设置不同。此项设置将会限制producer每次批量发送请求的数目,以防发出巨量的请求。int1048576receive.buffer.bytesTCP的接收缓存 SO_RCVBUF 空间大小,用于读取数据int32768request.timeout.msclient等待请求响应的最大时间,如果在这个时间内没有收到响应,客户端将重发请求,超过重试次数发送失败int30000send.buffer.bytesTCP的发送缓存 SO_SNDBUF 空间大小,用于发送数据int131072timeout.ms指定server等待来自followers的确认的最大时间,根据acks的设置,超时则返回errorint30000max.in.flight.requests.per.connection在block前一个connection上允许最大未确认的requests数量。 当设为1时,即是消息保证有序模式,注意:这里的消息保证有序是指对于单个Partition的消息有顺序,因此若要保证全局消息有序,可以只使用一个Partition,当然也会降低性能int5metadata.fetch.timeout.ms在第一次将数据发送到某topic时,需先fetch该topic的metadata,得知哪些服务器持有该topic的partition,该值为最长获取metadata时间long60000reconnect.backoff.ms连接失败时,当我们重新连接时的等待时间long50retry.backoff.ms在重试发送失败的request前的等待时间,防止若目的Broker完全挂掉的情况下Producer一直陷入死循环发送,折中的方法long100

其余参数(注:以下均为默认值)

生产者配置参数释义

1.bootstrap.servers 指定Kafka集群所需的broker地址清单,默认 ""

2.metadata.max.age.ms 强制刷新元数据时间,毫秒,默认300000,5分钟

3.batch.size 指定ProducerBatch内存区域的大小,默认16kb

4.acks 指定分区中必须有多少个副本收到这条消息,才算消息发送成功,默认值1,字符串类型

5.linger.ms 指定ProducerBatch在延迟多少毫秒后再发送,但如果在延迟的这段时间内batch的大小已经到了batch.size设置的大小,那么消息会被立即发送,不会再等待,默认值0

6.client.id 用户设定,用于跟踪记录消息,默认 ""

7.send.buffer.bytes Socket发送缓冲区大小,默认128kb,-1将使用操作系统的设置

8.receive.buffer.bytes Socket接收缓冲区大小,默认32kb,-1将使用操作系统的设置

9.max.request.size 限制生产者客户端发送消息的最大值,默认1MB

10.reconnect.backoff.ms 连接失败后,尝试连接Kafka的时间间隔,默认50ms

11.reconnect.backoff.max.ms 尝试连接到Kafka,生产者客户端等待的最大时间,默认1000ms

12.max.block.ms 控制生产者客户端send()方法和partitionsFor()方法的阻塞时间。当生产者的发送缓存区已满,或者没有可用元数据时,这些方法就会阻塞,默认60s

13.buffer.memory 生产者客户端中用于缓存消息的缓存区大小,默认32MB

14.retry.backoff.ms 消息发送失败重试时间间隔,默认100ms

15.compression.type 指定消息的压缩方式,默认不压缩

16.metrics.sample.window.ms 样本计算时间窗口,默认30000ms

17.metrics.num.samples 用于维护metrics的样本数量,默认2

18.metrics.log.level metrics日志记录级别,默认info

19.metric.reporters 类的列表,用于衡量指标,默认空list

20.max.in.flight.requests.per.connection 可以在一个connection中发送多个请求,叫作一个flight,这样可以减少开销,但是如果产生错误,可能会造成数据的发送顺序改变,默认5

21.retries 消息发送失败重试次数,默认0

22.key.serializer key的序列化方式

23.value.serializer value序列化类方式

24.connections.max.idle.ms 设置多久之后关闭空闲连接,默认540000ms

25.partitioner.class 分区类,实现Partitioner接口,可以自定义分区规则

26.request.timeout.ms 客户端将等待请求的响应的最大时间,如果在这个时间内没有收到响应,客户端将重发请求,超过重试次数将抛异常,默认30000ms

27.interceptor.classes 拦截器类,实现ProducerInterceptor接口,自定义拦截器

28.enable.idempotence true为开启幂等性

29.transaction.timeout.ms 事务超时时间,默认60000ms

30.transactional.id 设置事务id,必须唯一

消费者配置参数释义

1.group.id 消费者所属消费组的唯一标识

2.max.poll.records 一次拉取请求的最大消息数,默认500条

3.max.poll.interval.ms 指定拉取消息线程最长空闲时间,默认300000ms

4.session.timeout.ms 检测消费者是否失效的超时时间,默认10000ms

5.heartbeat.interval.ms 消费者心跳时间,默认3000ms

6.bootstrap.servers 连接集群broker地址

7.enable.auto.commit 是否开启自动提交消费位移的功能,默认true

8.auto.commit.interval.ms 自动提交消费位移的时间间隔,默认5000ms

9.partition.assignment.strategy 消费者的分区配置策略, 默认 RangeAssignor

10.auto.offset.reset 如果分区没有初始偏移量,或者当前偏移量服务器上不存在时,将使用的偏移量设置,earliest从头开始消费,latest从最近的开始消费,none抛出异常

11.fetch.min.bytes 消费者客户端一次请求从Kafka拉取消息的最小数据量,如果Kafka返回的数据量小于该值,会一直等待,直到满足这个配置大小,默认1b

12.fetch.max.bytes 消费者客户端一次请求从Kafka拉取消息的最大数据量,默认50MB

13.fetch.max.wait.ms 从Kafka拉取消息时,在不满足fetch.min.bytes条件时,等待的最大时间,默认500ms

14.metadata.max.age.ms 强制刷新元数据时间,毫秒,默认300000,5分钟

15.max.partition.fetch.bytes 设置从每个分区里返回给消费者的最大数据量,区别于fetch.max.bytes,默认1MB

16.send.buffer.bytes Socket发送缓冲区大小,默认128kb,-1将使用操作系统的设置

17.receive.buffer.bytes Socket发送缓冲区大小,默认64kb,-1将使用操作系统的设置

18.client.id 消费者客户端的id

19.reconnect.backoff.ms 连接失败后,尝试连接Kafka的时间间隔,默认50ms

20.reconnect.backoff.max.ms 尝试连接到Kafka,生产者客户端等待的最大时间,默认1000ms

21.retry.backoff.ms 消息发送失败重试时间间隔,默认100ms

22.metrics.sample.window.ms 样本计算时间窗口,默认30000ms

23.metrics.num.samples 用于维护metrics的样本数量,默认2

24.metrics.log.level metrics日志记录级别,默认info

25.metric.reporters 类的列表,用于衡量指标,默认空list

26.check.crcs 自动检查CRC32记录的消耗

27.key.deserializer key反序列化方式

28.value.deserializer value反序列化方式

29.connections.max.idle.ms 设置多久之后关闭空闲连接,默认540000ms

30.request.timeout.ms 客户端将等待请求的响应的最大时间,如果在这个时间内没有收到响应,客户端将重发请求,超过重试次数将抛异常,默认30000ms

31.default.api.timeout.ms 设置消费者api超时时间,默认60000ms

32.interceptor.classes 自定义拦截器

33.exclude.internal.topics 内部的主题:一consumer_offsets 和一transaction_state。该参数用来指定 Kafka 中的内部主题是否可以向消费者公开,默认值为 true。如果设置为 true,那么只能使用 subscribe(Collection)的方式而不能使用 subscribe(Pattern)的方式来订阅内部主题,设置为 false 则没有这个限制。

34.isolation.level 用来配置消费者的事务隔离级别。如果设置为“read committed”,那么消费者就会忽略事务未提交的消息,即只能消 费到 LSO (LastStableOffset)的位置,默认情况下为 “read_uncommitted”,即可以消 费到 HW (High Watermark)处的位置

最新回复(0)