kafka发送消息之压测

mac2024-06-29  56

kafka简单压测

kafka循环发送消息,达到压测的目的

kafka循环发送消息,达到压测的目的

import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Properties; import java.util.UUID; public class KafkaProducerDemo { private final KafkaProducer<String, String> producer; public final static String TOPIC = "test"; private KafkaProducerDemo() { Properties props = new Properties(); //xxx服务器ip props.put("bootstrap.servers","127.0.0.1:9092"); props.put("zookeeper.connect","127.0.0.1:2181"); //所有follower都响应了才认为消息提交成功,即"committed" props.put("acks", "all"); //retries = MAX 无限重试,直到你意识到出现了问题:) props.put("retries", 0); //producer将试图批处理消息记录,以减少请求次数.默认的批量处理消息字节数 //batch.size当批量的数据大小达到设定值后,就会立即发送,不顾下面的linger.ms props.put("batch.size", 16384); //延迟1ms发送,这项设置将通过增加小的延迟来完成--即,不是立即发送一条记录,producer将会等待给定的延迟时间以允许其他消息记录发送,这些消息记录可以批量处理 props.put("linger.ms", 1); //producer可以用来缓存数据的内存大小。 props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<String, String>(props); } public void produce() throws InterruptedException { int messageNo = 0; final int COUNT = 1; while(messageNo < COUNT) { String data = "{\"test\":\"\test\", \"uuid\":\""+ UUID.randomUUID().toString()+"\"}"; System.out.println("data:"+data); try { producer.send(new ProducerRecord<String, String>(TOPIC, data)); } catch (Exception e) { e.printStackTrace(); } System.out.println("发送消息,第"+ ++messageNo +"条"); Thread.sleep(1000); } producer.close(); } static String dateFormat() { Date date = new Date(); SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); return simpleDateFormat.format(date); } public static void main(String[] args) { try { new KafkaProducerDemo().produce(); System.exit(0); } catch (InterruptedException e) { e.printStackTrace(); } } }
最新回复(0)