Kafka(二、Kafka生产者 - 向kafka写入数据)

mac2024-03-24  28

Kakfa 生产者配置

由于配置太多,在此不一一列举,只显示使用率最高的几个配置,其他配置请参考官网。 http://kafka.apache.org/documentation/#producerconfigs

NAMEDESCRIPTIONTYPEDEFAULTVALID VALUESkey.serializerkey序列化,可以使用Kafka默认的序列化和自定义序列化器classvalue.serializervalue序列化,可以使用Kafka默认的序列化和自定义序列化器acks生产者需要leader确认请求完成之前接收的应答数。此配置控制了发送记录的持久性。允许以下设置:acks=0 如果设置为0,那么生产者将不等待任何消息确认。消息将立刻添加到socket缓冲区并考虑发送。在这种情况下不能保障消息被服务器接收到。并且重试机制不会生效(因为客户端不知道故障了没有)。每个消息返回的offset始终设置为-1。acks=1,这意味着leader写入消息到本地日志就立即响应,而不等待所有follower应答。在这种情况下,如果响应消息之后但follower还未复制之前leader立即故障,那么消息将会丢失。acks=all 这意味着leader将等待所有副本同步后应答消息。此配置保障消息不会丢失(只要至少有一个同步的副本)。这是最强壮的可用性保障。等价于acks=-1。string1[all, -1, 0, 1]bootstrap.servershost/port列表,用于初始化建立和Kafka集群的连接。列表格式为host1:port1,host2:port2,…,无需添加所有的集群地址,kafka会根据提供的地址发现其他的地址(你可以多提供几个,以防提供的服务器关闭)list“”non-null stringbuffer.memory生产者用来缓存等待发送到服务器的消息的内存总字节数。如果消息发送比可传递到服务器的快,生产者将阻塞max.block.ms之后,抛出异常。此设置应该大致的对应生产者将要使用的总内存,但不是硬约束,因为生产者所使用的所有内存都用于缓冲。一些额外的内存将用于压缩(如果启动压缩),以及用于保持发送中的请求。一个合理的内存大小。long33554432[0,…]compression.type生产者生成的所有数据的压缩类型。默认值为无(即不压缩)。有效值为none,gzip,snappy,lz4,或zstd。压缩是指全部数据的压缩,因此,批处理的效率也会影响压缩率(批处理越多意味着压缩越好)。stringnoneretries设置一个比零大的值,客户端如果发送失败则会重新发送。注意,这个重试功能和客户端在接到错误之后重新发送没什么不同。如果max.in.flight.requests.per.connection没有设置为1,有可能改变消息发送的顺序,因为如果2个批次发送到一个分区中,并第一个失败了并重试,但是第二个成功了,那么第二个批次将超过第一个。“retries”和“retries.backoff.ms”决定了重试机制,也就是如果一个请求失败了可以重试几次,每次重试的间隔是多少毫秒。这个大家适当设置几次重试的机会,给一定的重试间隔即可,比如给100ms的重试间隔。 int2147483647[0,…,2147483647]batch.size当多个消息要发送到相同分区的时,生产者尝试将消息批量打包在一起,以减少请求交互。这样有助于客户端和服务端的性能提升。该配置的默认批次大小(以字节为单位):不会打包大于此配置大小的消息。发送到broker的请求将包含多个批次,每个分区一个,用于发送数据。int16384[0,…]ssl.key.password密钥存储文件中私钥的密码。这对于客户端是可选的。passwordnullssl.keystore.location密钥存储文件的位置这对于客户端是可选的,可以用于客户端的双向身份验证。stringnullssl.keystore.password密钥存储文件的存储密码。这对于客户端是可选的,并且仅在配置了ssl.keystore.location时才需要。passwordnullssl.truststore.location信任存储和文件的位置。stringnullssl.truststore.password信任存储文件的密码。如果未设置密码,则仍然可以访问信任库,但是将禁用完整性检查。passwordnull

代码示例

这里没有使用最后几个ssl配置,内网默认都是网络安全的。

import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class ProducerDemo { private final KafkaProducer<String, String> producer; public final static String TOPIC = "test5"; private ProducerDemo() { Properties props = new Properties(); props.put("bootstrap.servers", "127.0.0.1:9092"); // 所有follower都响应了才认为消息提交成功,即"committed" props.put("acks", "all"); // retries = MAX 无限重试,直到你意识到出现了问题:) props.put("retries", 0); // producer将试图批处理消息记录,以减少请求次数.默认的批量处理消息字节数 props.put("batch.size", 16384); // batch.size当批量的数据大小达到设定值后,就会立即发送,不顾下面的linger.ms, // 延迟1ms发送,这项设置将通过增加小的延迟来完成--即,不是立即发送一条记录,producer将会等待给定的延迟时间以允许其他消息记录发送,这些消息记录可以批量处理 props.put("linger.ms", 1); // producer可以用来缓存数据的内存大小。 props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<>(props); } public void produce() { int messageNo = 1; final int COUNT = 5; while (messageNo < COUNT) { String key = String.valueOf(messageNo); String data = String.format("hello KafkaProducer message %s from hubo 06291018 ", key); try { // 返回值为Future,现在为不关心是否发送成功。还有同步发送,异步发送两种。 producer.send(new ProducerRecord<String, String>(TOPIC,",", data)); } catch (Exception e) { e.printStackTrace(); } messageNo++; } producer.close(); } public static void main(String[] args) { new ProducerDemo().produce(); } }

自定义分区器 首先自定义个分区器,一定要实现Partitioner接口。

import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.record.InvalidRecordException; import org.apache.kafka.common.utils.Utils; import java.util.List; import java.util.Map; public class CustomPartitioner implements Partitioner { String keyName; @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic); int numPartitions = partitionInfos.size(); if ((keyBytes == null) || (!(key instanceof String))) { throw new InvalidRecordException("We expect all messages to have customer name as key"); } if (((String) key).equalsIgnoreCase(keyName)) { return numPartitions; } return (Math.abs(Utils.murmur2(keyBytes))) % (numPartitions - 1); } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { keyName = (String) configs.get(""); } }

然后将自定义分区器添加到配置中:

//设置自定义分区 props.put("partitioner.class", "com.study.Kafka.common.CustomPartitioner");
最新回复(0)