springboot + @KafkaListener 手动提交及消费能力优化

mac2022-06-30  34

转载 https://blog.csdn.net/asd5629626/article/details/82776450  https://blog.csdn.net/asd5629626/article/details/82746771

spring-boot 版本 1.5.12

依赖使用spring-kafka1.3.3(对应kafka-clients版本0.11.0.0,请使用于kafka版本对应版本的依赖)

<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.12.RELEASE</version> <relativePath/> </parent> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.3.3.RELEASE</version> </dependency>

1、自定义监听工厂  (resources目录下面kafka.properties文件中定义对应参数)

##============== kafka =====================kafka.consumer.bootstrap.servers = 192.168.11.133:9092kafka.consumer.session.timout.ms = 15000kafka.consumer.max.poll.interval.ms = 300000kafka.consumer.max.poll.records = 500kafka.consumer.heartbeat.interval.ms = 3000kafka.consumer.group.id = person-file-manage

#消费者并发启动个数(对应分区个数)每个listener方法

kafka.concurrency=10

@Configuration@EnableKafkapublic class KafkaConsumerConfig { @Value("${kafka.consumer.bootstrap.servers}") private String servers; @Value("${kafka.consumer.session.timout.ms}") private String sessionTimeout; @Value("${kafka.consumer.max.poll.interval.ms}") private String pollInterval; @Value("${kafka.consumer.max.poll.records}") private String pollRecords; @Value("${kafka.consumer.heartbeat.interval.ms}") private String heartbeatInterval; @Value("${kafka.consumer.group.id}") private String groupId; /** * 消费者基础配置 * * @return Map */ private Map<String, Object> consumerProps() { Map<String, Object> props = new HashMap<>(9); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, pollInterval); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, pollRecords); props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatInterval); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } /** * 自定义 ConcurrentKafkaListenerContainerFactory 初始化消费者 * * @return ConcurrentKafkaListenerContainerFactory */ @Bean("ackContainerFactory") public ConcurrentKafkaListenerContainerFactory ackContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps())); factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE); return factory; } /** * 将监听者注入到IOC中,也可以采用注解方式,此方式只是为了便于确定监听者的分布 * * @return MqSinkReceiver */ @Bean public MqSinkReceiver listener() { return new MqSinkReceiver(); }}

 

2、监听器

public class MqSinkReceiver { @Autowired private MqListener mqListener; private final LoggerUtilI logger = LoggerUtilI.getLogger(this.getClass().getName()); /** * 归档统计 * * @param consumerRecord 消息体 * @param ack Acknowledgment */ @KafkaListener(id = "clusterPersonfileConsumer", topics = {"personfile-new-clustering"}, containerFactory = "ackContainerFactory") public void inputPersonfileNewCluster(ConsumerRecord consumerRecord, Acknowledgment ack) { if (consumerRecord != null) { JSONObject jsonParam = JSONObject.parseObject(consumerRecord.value().toString()); logger.info("接收到数据平台的归档kafka消息" + jsonParam.toString()); try { mqListener.clusterStatistic(jsonParam); if (ack != null) { ack.acknowledge(); } } catch (BusinessException | ParseException e) { logger.error("归档统计异常:" + e); } } }}  

3、spring-boot容器即可

#消费者并发启动个数(对应分区个数)每个listener方法kafka.concurrency=10 将启动器的并发提高到和分区数一致

 

kafka 消费能力的提高

1、自动提交的实现

2、autoCommitIntervalMs 设置每次隔多久自动提交offset

3、kafka.max.poll.interval.ms 和 sessionTimeout

max.poll.interval.ms ,它表示最大的poll数据间隔,如果超过这个间隔没有发起pool请求,但heartbeat仍旧在发,就认为该consumer处于 livelock状态。就会将该consumer退出consumer group

之后就会触发导致reblance

·heartbeat.interval.ms

心跳间隔。心跳是在consumer与coordinator之间进行的。心跳是确定consumer存活,加入或者退出group的有效手段。

    这个值必须设置的小于session.timeout.ms,因为:

当Consumer由于某种原因不能发Heartbeat到coordinator时,并且时间超过session.timeout.ms时,就会认为该consumer已退出,它所订阅的partition会分配到同一group 内的其它的consumer上。

    通常设置的值要低于session.timeout.ms的1/3。

    默认值是:3000 (3s)

·session.timeout.ms

Consumer session 过期时间。这个值必须设置在broker configuration中的group.min.session.timeout.ms 与 group.max.session.timeout.ms之间。

其默认值是:10000 (10 s)

 

转载于:https://www.cnblogs.com/miracleYu/p/10244765.html

最新回复(0)