Spring Boot整合Kafka

mac2025-03-02  4



将向Kafka topic发布消息的程序成为producers;












这里演示在Windows下Kafka安装与使用。Kafka下载地址:http://kafka.apache.org/downloads,选择二进制文件下载(Binary downloads),然后解压即可。







1 bin\windows\zookeeper-server-start.bat config\zookeeper.properties



1 bin/zookeeper-server-start.sh -daemon config/zookeeper.properties




1 bin\windows\kafka-server-start.bat config\server.properties



1 bin/kafka-server-start.sh config/server.properties





1 bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test




1 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test



1 bin\windows\kafka-topics.bat --list --zookeeper localhost:2181


可看到目前只包含一个我们刚创建的test Topic。


1 bin/kafka-topics.sh --list --zookeeper localhost:2181


查看test Topic的具体信息:

1 bin\windows\kafka-topics.bat --describe --zookeeper localhost:2181 --topic test



1 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test




1 bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test


9092为生产者的默认端口号。这里启动了生产者,准备往test Topic里发送数据。


1 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test




1 bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning




1 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning




Spring Boot整合Kafaka

上面简单介绍了Kafka的使用,下面我们开始在Spring Boot里使用Kafka。

新建一个Spring Boot项目,版本为2.1.3.RELEASE,并引入如下依赖:

1 2 3 4 5 6 7 8 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>




1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Configuration public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Bean public ProducerFactory<String, String> producerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configProps.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }


首先我们配置了一个producerFactory,方法里配置了Kafka Producer实例的策略。bootstrapServers为Kafka生产者的地址,我们在配置文件application.yml里配置它:

1 2 3 spring: kafka: bootstrap-servers: localhost:9092







1 2 3 4 5 6 7 8 9 10 11 @RestController public class SendMessageController { @Autowired private KafkaTemplate<String, String> kafkaTemplate; @GetMapping("send/{message}") public void send(@PathVariable String message) { this.kafkaTemplate.send("test", message); } }




1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @RestController public class SendMessageController { private Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired private KafkaTemplate<String, String> kafkaTemplate; @GetMapping("send/{message}") public void send(@PathVariable String message) { ListenableFuture<SendResult<String, String>> future = this.kafkaTemplate.send("test", message); future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onSuccess(SendResult<String, String> result) { logger.info("成功发送消息:{},offset=[{}]", message, result.getRecordMetadata().offset()); } @Override public void onFailure(Throwable ex) { logger.error("消息:{} 发送失败,原因:{}", message, ex.getMessage()); } }); } }





1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @EnableKafka @Configuration public class KafkaConsumerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.consumer.group-id}") private String consumerGroupId; @Value("${spring.kafka.consumer.auto-offset-reset}") private String autoOffsetReset; @Bean public ConsumerFactory<String, String> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put( ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId); props.put( ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); props.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory<>(props); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }



1 2 3 4 5 spring: kafka: consumer: group-id: test-consumer auto-offset-reset: latest







在KafkaConsumerConfig中我们配置了ConsumerFactory和KafkaListenerContainerFactory。当这两个Bean成功注册到Spring IOC容器中后,我们便可以使用@KafkaListener注解来监听消息了。




1 2 3 4 5 6 7 8 9 10 @Component public class KafkaMessageListener { private Logger logger = LoggerFactory.getLogger(this.getClass()); @KafkaListener(topics = "test", groupId = "test-consumer") public void listen(String message) { logger.info("接收消息: {}", message); } }




启动Spring Boot项目,启动过程中,控制台会输出Kafka的配置,启动好后,访问http://localhost:8080/send/hello,mrbird,控制台输出如下:




1 @KafkaListener(topics = "topic1, topic2")



1 2 3 4 5 @KafkaListener(topics = "test", groupId = "test-consumer") public void listen(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) { logger.info("接收消息: {},partition:{}", message, partition); }




因为我们没有进行分区,所以test Topic只有一个区,下标为0。


1 2 3 4 5 6 7 8 9 @KafkaListener(groupId = "test-consumer", topicPartitions = @TopicPartition(topic = "test", partitionOffsets = { @PartitionOffset(partition = "0", initialOffset = "0") })) public void listen(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) { logger.info("接收消息: {},partition:{}", message, partition); }



1 2 @KafkaListener(groupId = "test-consumer", topicPartitions = @TopicPartition(topic = "test", partitions = { "0", "1" }))



1 2 3 4 5 6 7 8 9 10 11 @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); // ------- 过滤配置 -------- factory.setRecordFilterStrategy( r -> r.value().contains("fuck") ); return factory; }


setRecordFilterStrategy接收RecordFilterStrategy<K, V>,他是一个函数式接口:

1 2 3 public interface RecordFilterStrategy<K, V> { boolean filter(ConsumerRecord<K, V> var1); }












1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class Message implements Serializable { private static final long serialVersionUID = 6678420965611108427L; private String from; private String message; public Message() { } public Message(String from, String message) { this.from = from; this.message = message; } @Override public String toString() { return "Message{" + "from='" + from + '\'' + ", message='" + message + '\'' + '}'; } // get set 略 }



1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Configuration public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Bean public ProducerFactory<String, Message> producerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configProps.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, Message> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }


我们将value序列化策略指定为了Kafka提供的JsonSerializer,并且kafkaTemplate返回类型为KafkaTemplate<String, Message>。



1 2 3 4 5 6 7 @Autowired private KafkaTemplate<String, Message> kafkaTemplate; @GetMapping("send/{message}") public void sendMessage(@PathVariable String message) { this.kafkaTemplate.send("test", new Message("mrbird", message)); }




1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 @EnableKafka @Configuration public class KafkaConsumerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.consumer.group-id}") private String consumerGroupId; @Value("${spring.kafka.consumer.auto-offset-reset}") private String autoOffsetReset; @Bean public ConsumerFactory<String, Message> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put( ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId); props.put( ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); return new DefaultKafkaConsumerFactory<>( props, new StringDeserializer(), new JsonDeserializer<>(Message.class)); } @Bean public ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, Message> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }




1 2 3 4 @KafkaListener(topics = "test", groupId = "test-consumer") public void listen(Message message) { logger.info("接收消息: {}", message); }




1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 spring.kafka.admin.client-id= # ID to pass to the server when making requests. Used for server-side logging. spring.kafka.admin.fail-fast=false # Whether to fail fast if the broker is not available on startup. spring.kafka.admin.properties.*= # Additional admin-specific properties used to configure the client. spring.kafka.admin.ssl.key-password= # Password of the private key in the key store file. spring.kafka.admin.ssl.key-store-location= # Location of the key store file. spring.kafka.admin.ssl.key-store-password= # Store password for the key store file. spring.kafka.admin.ssl.key-store-type= # Type of the key store. spring.kafka.admin.ssl.protocol= # SSL protocol to use. spring.kafka.admin.ssl.trust-store-location= # Location of the trust store file. spring.kafka.admin.ssl.trust-store-password= # Store password for the trust store file. spring.kafka.admin.ssl.trust-store-type= # Type of the trust store. spring.kafka.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Applies to all components unless overridden. spring.kafka.client-id= # ID to pass to the server when making requests. Used for server-side logging. spring.kafka.consumer.auto-commit-interval= # Frequency with which the consumer offsets are auto-committed to Kafka if 'enable.auto.commit' is set to true. spring.kafka.consumer.auto-offset-reset= # What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server. spring.kafka.consumer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for consumers. spring.kafka.consumer.client-id= # ID to pass to the server when making requests. Used for server-side logging. spring.kafka.consumer.enable-auto-commit= # Whether the consumer's offset is periodically committed in the background. spring.kafka.consumer.fetch-max-wait= # Maximum amount of time the server blocks before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by "fetch-min-size". spring.kafka.consumer.fetch-min-size= # Minimum amount of data the server should return for a fetch request. spring.kafka.consumer.group-id= # Unique string that identifies the consumer group to which this consumer belongs. spring.kafka.consumer.heartbeat-interval= # Expected time between heartbeats to the consumer coordinator. spring.kafka.consumer.key-deserializer= # Deserializer class for keys. spring.kafka.consumer.max-poll-records= # Maximum number of records returned in a single call to poll(). spring.kafka.consumer.properties.*= # Additional consumer-specific properties used to configure the client. spring.kafka.consumer.ssl.key-password= # Password of the private key in the key store file. spring.kafka.consumer.ssl.key-store-location= # Location of the key store file. spring.kafka.consumer.ssl.key-store-password= # Store password for the key store file. spring.kafka.consumer.ssl.key-store-type= # Type of the key store. spring.kafka.consumer.ssl.protocol= # SSL protocol to use. spring.kafka.consumer.ssl.trust-store-location= # Location of the trust store file. spring.kafka.consumer.ssl.trust-store-password= # Store password for the trust store file. spring.kafka.consumer.ssl.trust-store-type= # Type of the trust store. spring.kafka.consumer.value-deserializer= # Deserializer class for values. spring.kafka.jaas.control-flag=required # Control flag for login configuration. spring.kafka.jaas.enabled=false # Whether to enable JAAS configuration. spring.kafka.jaas.login-module=com.sun.security.auth.module.Krb5LoginModule # Login module. spring.kafka.jaas.options= # Additional JAAS options. spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME". spring.kafka.listener.ack-mode= # Listener AckMode. See the spring-kafka documentation. spring.kafka.listener.ack-time= # Time between offset commits when ackMode is "TIME" or "COUNT_TIME". spring.kafka.listener.client-id= # Prefix for the listener's consumer client.id property. spring.kafka.listener.concurrency= # Number of threads to run in the listener containers. spring.kafka.listener.idle-event-interval= # Time between publishing idle consumer events (no data received). spring.kafka.listener.log-container-config= # Whether to log the container configuration during initialization (INFO level). spring.kafka.listener.monitor-interval= # Time between checks for non-responsive consumers. If a duration suffix is not specified, seconds will be used. spring.kafka.listener.no-poll-threshold= # Multiplier applied to "pollTimeout" to determine if a consumer is non-responsive. spring.kafka.listener.poll-timeout= # Timeout to use when polling the consumer. spring.kafka.listener.type=single # Listener type. spring.kafka.producer.acks= # Number of acknowledgments the producer requires the leader to have received before considering a request complete. spring.kafka.producer.batch-size= # Default batch size. spring.kafka.producer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for producers. spring.kafka.producer.buffer-memory= # Total memory size the producer can use to buffer records waiting to be sent to the server. spring.kafka.producer.client-id= # ID to pass to the server when making requests. Used for server-side logging. spring.kafka.producer.compression-type= # Compression type for all data generated by the producer. spring.kafka.producer.key-serializer= # Serializer class for keys. spring.kafka.producer.properties.*= # Additional producer-specific properties used to configure the client. spring.kafka.producer.retries= # When greater than zero, enables retrying of failed sends. spring.kafka.producer.ssl.key-password= # Password of the private key in the key store file. spring.kafka.producer.ssl.key-store-location= # Location of the key store file. spring.kafka.producer.ssl.key-store-password= # Store password for the key store file. spring.kafka.producer.ssl.key-store-type= # Type of the key store. spring.kafka.producer.ssl.protocol= # SSL protocol to use. spring.kafka.producer.ssl.trust-store-location= # Location of the trust store file. spring.kafka.producer.ssl.trust-store-password= # Store password for the trust store file. spring.kafka.producer.ssl.trust-store-type= # Type of the trust store. spring.kafka.producer.transaction-id-prefix= # When non empty, enables transaction support for producer. spring.kafka.producer.value-serializer= # Serializer class for values. spring.kafka.properties.*= # Additional properties, common to producers and consumers, used to configure the client. spring.kafka.ssl.key-password= # Password of the private key in the key store file. spring.kafka.ssl.key-store-location= # Location of the key store file. spring.kafka.ssl.key-store-password= # Store password for the key store file. spring.kafka.ssl.key-store-type= # Type of the key store. spring.kafka.ssl.protocol= # SSL protocol to use. spring.kafka.ssl.trust-store-location= # Location of the trust store file. spring.kafka.ssl.trust-store-password= # Store password for the trust store file. spring.kafka.ssl.trust-store-type= # Type of the trust store. spring.kafka.streams.application-id= # Kafka streams application.id property; default spring.application.name. spring.kafka.streams.auto-startup=true # Whether or not to auto-start the streams factory bean. spring.kafka.streams.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for streams. spring.kafka.streams.cache-max-size-buffering= # Maximum memory size to be used for buffering across all threads. spring.kafka.streams.client-id= # ID to pass to the server when making requests. Used for server-side logging. spring.kafka.streams.properties.*= # Additional Kafka properties used to configure the streams. spring.kafka.streams.replication-factor= # The replication factor for change log topics and repartition topics created by the stream processing application. spring.kafka.streams.ssl.key-password= # Password of the private key in the key store file. spring.kafka.streams.ssl.key-store-location= # Location of the key store file. spring.kafka.streams.ssl.key-store-password= # Store password for the key store file. spring.kafka.streams.ssl.key-store-type= # Type of the key store. spring.kafka.streams.ssl.protocol= # SSL protocol to use. spring.kafka.streams.ssl.trust-store-location= # Location of the trust store file. spring.kafka.streams.ssl.trust-store-password= # Store password for the trust store file. spring.kafka.streams.ssl.trust-store-type= # Type of the trust store. spring.kafka.streams.state-dir= # Directory location for the state store. spring.kafka.template.default-topic= # Default topic to which messages are sent.