一、发送json数据
生产者
# -*- coding: UTF-8 -*-
# Producer: publish one JSON-encoded message to the 'test-01' topic.
import json

from kafka import KafkaProducer

# value_serializer converts every outgoing value to UTF-8 encoded JSON bytes.
producer = KafkaProducer(
    bootstrap_servers='bigdata-test02:6667',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('test-01', {'www': 'aa'})
# send() only queues the record; flush() blocks until queued records are sent.
producer.flush()
消费者
# -*- coding: UTF-8 -*-
# Consumer: print every record received from the 'test-01' topic.
from kafka import KafkaConsumer

consumer = KafkaConsumer('test-01', bootstrap_servers='bigdata-test02:6667')
# No value_deserializer is configured, so the JSON payload is not decoded here.
for msg in consumer:
    print(msg)
二、发送string类型数据
生产者
# -*- coding: UTF-8 -*-
# Producer: publish a plain string to the 'test' topic as UTF-8 bytes.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='bigdata-test02:6667')
text = "wwww"
payload = text.encode('utf-8')  # no serializer configured, so encode by hand
producer.send('test', payload)
producer.flush()
消费者
# Consumer: print every record received from the 'test' topic.
from kafka import KafkaConsumer

consumer = KafkaConsumer('test', bootstrap_servers='bigdata-test02:6667')
for msg in consumer:
    print(msg)
查看topic相关信息
# Consumer: print topic, partition, offset, key and value of each record
# read from the 'test-kafka' topic.
from kafka import KafkaConsumer

consumer = KafkaConsumer('test-kafka',
                         auto_offset_reset='earliest',
                         bootstrap_servers='bigdata-test02:6667')
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))
三、消费者(读取目前最早可读的消息)
# Consumer: read the 'test' topic with auto_offset_reset='earliest' and print
# topic, partition, offset, key and value of each record.
from kafka import KafkaConsumer

consumer = KafkaConsumer('test',
                         auto_offset_reset='earliest',
                         bootstrap_servers=['172.21.10.136:9092'])
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))
auto_offset_reset:偏移量无效(例如出现 OffsetOutOfRange)时的重置策略;earliest 移到最早的可用消息,latest 移到最新的消息,默认为 latest。
源码中对已弃用旧写法的兼容映射:{'smallest': 'earliest', 'largest': 'latest'}(即 smallest 等价于 earliest,largest 等价于 latest)
四、手动设置偏移量
# Consumer: print metadata for the 'oplog_default' topic, move the read
# position of one partition to an explicit offset, then consume.
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer('oplog_default', bootstrap_servers='bigdata-test02:6667')
print(consumer.partitions_for_topic("oplog_default"))  # partition ids of the 'oplog_default' topic
print(consumer.topics())  # list of topics
print(consumer.subscription())  # topics this consumer is subscribed to
print(consumer.assignment())  # topic/partition pairs currently assigned to this consumer
print(consumer.beginning_offsets(consumer.assignment()))  # first available offset of each assigned partition
# Reset the position of partition 1 so that consumption of that partition
# continues from offset 3.
# NOTE(review): seek() presumably requires the partition to be assigned to
# this consumer already - confirm the assignment printed above is not empty.
consumer.seek(TopicPartition(topic='oplog_default', partition=1), 3)
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))
五、订阅多个主题
# Consumer: subscribe to several topics at once and print every record.
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(bootstrap_servers='bigdata-test02:6667')
consumer.subscribe(topics=('oplog_default', 'kafkap1'))  # topics to consume from
print(consumer.topics())
# position() reports the offset of the next record that will be fetched from
# the given partition (not the latest offset in the topic).
print(consumer.position(TopicPartition(topic=u'oplog_default', partition=1)))
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))