First, let's look at the Consumer class. Its fields are the consumer properties we can configure:
public static class Consumer {

    private final Ssl ssl = new Ssl();

    /**
     * Frequency with which the consumer offsets are auto-committed to Kafka if
     * 'enable.auto.commit' is set to true.
     * Only effective when auto-commit is enabled. Default: 5000 ms.
     */
    private Duration autoCommitInterval;

    /**
     * What to do when there is no initial offset in Kafka or if the current offset no
     * longer exists on the server.
     * Default: latest. earliest = read from the beginning of the partition;
     * latest = read from the end of the partition.
     */
    private String autoOffsetReset;

    /**
     * Comma-delimited list of host:port pairs to use for establishing the initial
     * connection to the Kafka cluster.
     */
    private List<String> bootstrapServers;

    /**
     * ID to pass to the server when making requests. Used for server-side logging.
     * Default: "".
     */
    private String clientId;

    /**
     * Whether the consumer's offset is periodically committed in the background.
     * Default: true.
     */
    private Boolean enableAutoCommit;

    /**
     * Maximum amount of time the server blocks before answering the fetch request if
     * there isn't sufficient data to immediately satisfy the requirement given by
     * "fetch.min.bytes".
     * Default: 500 ms, i.e. the response is returned after 500 ms even if the
     * minimum amount of data is not yet available.
     */
    private Duration fetchMaxWait;

    /**
     * Minimum amount of data, in bytes, the server should return for a fetch request.
     * Default: 1 byte.
     */
    private Integer fetchMinSize;

    /**
     * Unique string that identifies the consumer group to which this consumer
     * belongs.
     */
    private String groupId;

    /**
     * Expected time between heartbeats to the consumer coordinator.
     * Default: 3000 ms.
     */
    private Duration heartbeatInterval;

    /**
     * Deserializer class for keys.
     */
    private Class<?> keyDeserializer = StringDeserializer.class;

    /**
     * Deserializer class for values.
     */
    private Class<?> valueDeserializer = StringDeserializer.class;

    /**
     * Maximum number of records returned in a single call to poll().
     * Default: 500.
     */
    private Integer maxPollRecords;
}

The properties above are the ones we can configure in yml. The consumer actually has many more properties, whose values are auto-configured by Spring Boot, so we normally do not need to care about them; of course, we can still customize them when needed.
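One way to customize is to declare our own ConsumerFactory bean, which makes the auto-configured one (guarded by @ConditionalOnMissingBean) back off. The following is a minimal sketch, not taken from the article: it reuses KafkaProperties.buildConsumerProperties() and then overrides a property that is not exposed under spring.kafka.consumer. The overridden property and its value are only examples.

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class CustomConsumerConfig {

    @Bean
    public ConsumerFactory<Object, Object> kafkaConsumerFactory(KafkaProperties properties) {
        // Start from everything Spring Boot would have configured from yml...
        Map<String, Object> props = properties.buildConsumerProperties();
        // ...then override or add any property not exposed under spring.kafka.consumer
        // (property and value below are just an illustration).
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 2 * 1024 * 1024);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}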
The full list of auto-configured properties is shown below. In each define(...) call, the first argument is the property name and, where a default is supplied, the third argument is the default value:
static {
    CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
            .define(GROUP_ID_CONFIG, Type.STRING, "", Importance.HIGH, GROUP_ID_DOC)
            .define(SESSION_TIMEOUT_MS_CONFIG, Type.INT, 10000, Importance.HIGH, SESSION_TIMEOUT_MS_DOC)
            .define(HEARTBEAT_INTERVAL_MS_CONFIG, Type.INT, 3000, Importance.HIGH, HEARTBEAT_INTERVAL_MS_DOC)
            .define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, Type.LIST, Collections.singletonList(RangeAssignor.class), Importance.MEDIUM, PARTITION_ASSIGNMENT_STRATEGY_DOC)
            .define(METADATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), Importance.LOW, CommonClientConfigs.METADATA_MAX_AGE_DOC)
            .define(ENABLE_AUTO_COMMIT_CONFIG, Type.BOOLEAN, true, Importance.MEDIUM, ENABLE_AUTO_COMMIT_DOC)
            .define(AUTO_COMMIT_INTERVAL_MS_CONFIG, Type.INT, 5000, atLeast(0), Importance.LOW, AUTO_COMMIT_INTERVAL_MS_DOC)
            .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.LOW, CommonClientConfigs.CLIENT_ID_DOC)
            .define(MAX_PARTITION_FETCH_BYTES_CONFIG, Type.INT, DEFAULT_MAX_PARTITION_FETCH_BYTES, atLeast(0), Importance.HIGH, MAX_PARTITION_FETCH_BYTES_DOC)
            .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(-1), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC)
            .define(RECEIVE_BUFFER_CONFIG, Type.INT, 64 * 1024, atLeast(-1), Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC)
            .define(FETCH_MIN_BYTES_CONFIG, Type.INT, 1, atLeast(0), Importance.HIGH, FETCH_MIN_BYTES_DOC)
            .define(FETCH_MAX_BYTES_CONFIG, Type.INT, DEFAULT_FETCH_MAX_BYTES, atLeast(0), Importance.MEDIUM, FETCH_MAX_BYTES_DOC)
            .define(FETCH_MAX_WAIT_MS_CONFIG, Type.INT, 500, atLeast(0), Importance.LOW, FETCH_MAX_WAIT_MS_DOC)
            .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 50L, atLeast(0L), Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
            .define(RECONNECT_BACKOFF_MAX_MS_CONFIG, Type.LONG, 1000L, atLeast(0L), Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_DOC)
            .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
            .define(AUTO_OFFSET_RESET_CONFIG, Type.STRING, "latest", in("latest", "earliest", "none"), Importance.MEDIUM, AUTO_OFFSET_RESET_DOC)
            .define(CHECK_CRCS_CONFIG, Type.BOOLEAN, true, Importance.LOW, CHECK_CRCS_DOC)
            .define(METRICS_SAMPLE_WINDOW_MS_CONFIG, Type.LONG, 30000, atLeast(0), Importance.LOW, CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
            .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
            .define(METRICS_RECORDING_LEVEL_CONFIG, Type.STRING, Sensor.RecordingLevel.INFO.toString(), in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()), Importance.LOW, CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
            .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
            .define(KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_DESERIALIZER_CLASS_DOC)
            .define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_DESERIALIZER_CLASS_DOC)
            // chosen to be higher than the default of max.poll.interval.ms
            .define(REQUEST_TIMEOUT_MS_CONFIG, Type.INT, 305000, atLeast(0), Importance.MEDIUM, REQUEST_TIMEOUT_MS_DOC)
            /* default is set to be a bit lower than the server default (10 min),
               to avoid both client and server closing connection at same time */
            .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, Type.LONG, 9 * 60 * 1000, Importance.MEDIUM, CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
            .define(INTERCEPTOR_CLASSES_CONFIG, Type.LIST, null, Importance.LOW, INTERCEPTOR_CLASSES_DOC)
            .define(MAX_POLL_RECORDS_CONFIG, Type.INT, 500, atLeast(1), Importance.MEDIUM, MAX_POLL_RECORDS_DOC)
            .define(MAX_POLL_INTERVAL_MS_CONFIG, Type.INT, 300000, atLeast(1), Importance.MEDIUM, MAX_POLL_INTERVAL_MS_DOC)
            .define(EXCLUDE_INTERNAL_TOPICS_CONFIG, Type.BOOLEAN, DEFAULT_EXCLUDE_INTERNAL_TOPICS, Importance.MEDIUM, EXCLUDE_INTERNAL_TOPICS_DOC)
            .defineInternal(LEAVE_GROUP_ON_CLOSE_CONFIG, Type.BOOLEAN, true, Importance.LOW)
            .define(ISOLATION_LEVEL_CONFIG, Type.STRING, DEFAULT_ISOLATION_LEVEL, in(IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT), IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT)), Importance.MEDIUM, ISOLATION_LEVEL_DOC)
            // security support
            .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
            .withClientSslSupport()
            .withClientSaslSupport();
}
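To see these defaults in action, here is a minimal standalone sketch (not from the article; the broker address localhost:9092 is an assumption). It builds a plain KafkaConsumer with only the three required settings; every property that is not set explicitly falls back to the ConfigDef defaults above, e.g. max.poll.records=500 and auto.offset.reset=latest.

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerDefaultsDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // assumed broker address for the example
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Everything else (session.timeout.ms, fetch.max.wait.ms, max.poll.records, ...)
        // is filled in from the ConfigDef defaults shown above.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // the consumer is fully configured even though only three properties were set
        }
    }
}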
Now let's step into the KafkaAutoConfiguration class:

@Configuration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import(KafkaAnnotationDrivenConfiguration.class)
public class KafkaAutoConfiguration {

    private final KafkaProperties properties;

    // This method applies the consumer property configuration, including the default values
    @Bean
    @ConditionalOnMissingBean(ConsumerFactory.class)
    public ConsumerFactory<?, ?> kafkaConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(
                this.properties.buildConsumerProperties());
    }
}

@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaProperties {

    // Internally holds the Consumer class shown earlier
    private final Consumer consumer = new Consumer();

    public Map<String, Object> buildConsumerProperties() {
        Map<String, Object> properties = buildCommonProperties();
        // buildProperties
        properties.putAll(this.consumer.buildProperties());
        return properties;
    }

    public static class Consumer {

        public Map<String, Object> buildProperties() {
            Properties properties = new Properties();
            PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
            // The next statement reads as follows: if getAutoCommitInterval() returns a value,
            // i.e. we configured this property in the yaml file, then
            // put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, getAutoCommitInterval())
            // is performed on the map; if there is no value, nothing happens.
            map.from(this::getAutoCommitInterval).asInt(Duration::toMillis)
                    .to(properties.in(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
            map.from(this::getAutoOffsetReset)
                    .to(properties.in(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
            map.from(this::getBootstrapServers)
                    .to(properties.in(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
            map.from(this::getClientId)
                    .to(properties.in(ConsumerConfig.CLIENT_ID_CONFIG));
            map.from(this::getEnableAutoCommit)
                    .to(properties.in(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
            map.from(this::getFetchMaxWait).asInt(Duration::toMillis)
                    .to(properties.in(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG));
            map.from(this::getFetchMinSize)
                    .to(properties.in(ConsumerConfig.FETCH_MIN_BYTES_CONFIG));
            map.from(this::getGroupId).to(properties.in(ConsumerConfig.GROUP_ID_CONFIG));
            map.from(this::getHeartbeatInterval).asInt(Duration::toMillis)
                    .to(properties.in(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG));
            map.from(this::getKeyDeserializer)
                    .to(properties.in(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
            map.from(this::getValueDeserializer)
                    .to(properties.in(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
            map.from(this::getMaxPollRecords)
                    .to(properties.in(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
            return properties.with(this.ssl, this.properties);
        }
    }
}

public class ConsumerConfig extends AbstractConfig {
    static {
        CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
                // ... same as the static block above ...
                .withClientSslSupport()
                .withClientSaslSupport();
    }
}
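Before moving on to the test, here is a small standalone sketch of the PropertyMapper pattern used in buildProperties() above. It is a hypothetical example, not Spring Boot's actual code: one field is left unset and one is set, so only the set field ends up in the target map.

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.springframework.boot.context.properties.PropertyMapper;

public class PropertyMapperDemo {

    private Duration autoCommitInterval;          // unset -> nothing is mapped
    private String autoOffsetReset = "earliest";  // set   -> copied into the target map

    public Map<String, Object> build() {
        Map<String, Object> target = new HashMap<>();
        PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
        // Skipped, because getAutoCommitInterval() returns null
        map.from(this::getAutoCommitInterval).asInt(Duration::toMillis)
                .to(v -> target.put("auto.commit.interval.ms", v));
        // Applied, because getAutoOffsetReset() returns a non-null value
        map.from(this::getAutoOffsetReset)
                .to(v -> target.put("auto.offset.reset", v));
        return target; // contains only auto.offset.reset=earliest
    }

    private Duration getAutoCommitInterval() { return this.autoCommitInterval; }
    private String getAutoOffsetReset() { return this.autoOffsetReset; }
}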
Test code:

@Autowired
KafkaTemplate kafkaTemplate;

@Test
public void testQueryBill() {
    kafkaTemplate.send("", 1);
}

Next, observe the producer configuration inside kafkaTemplate.