python操作kafka

mac2025-07-30  2

一、发送json数据

生产者

# -*- coding: UTF-8 -*-
# Producer: publish a JSON-encoded dict to topic 'test-01'.
import json

from kafka import KafkaProducer

# value_serializer converts every value handed to send() into UTF-8 JSON bytes.
producer = KafkaProducer(
    bootstrap_servers='bigdata-test02:6667',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)

producer.send('test-01', {'www': 'aa'})

# send() is asynchronous; flush() blocks until the buffered record is sent.
producer.flush()
# Release the producer's network connections and background thread.
producer.close()

 

消费者

# -*- coding: UTF-8 -*-
# Consumer for the JSON example: print every record read from 'test-01'.
from kafka import KafkaConsumer

consumer = KafkaConsumer('test-01', bootstrap_servers='bigdata-test02:6667')

# Iterating the consumer blocks and yields records as they arrive.
# msg.value is the raw bytes the producer wrote; to receive the decoded dict,
# pass value_deserializer=lambda m: json.loads(m.decode('utf-8')) to
# KafkaConsumer (and import json).
for msg in consumer:
    print(msg)

 

二、发送string类型数据

生产者

# -*- coding: UTF-8 -*-
# Producer: publish a plain UTF-8 string to topic 'test'.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='bigdata-test02:6667')

text = "wwww"
# No value_serializer is configured, so the value must already be bytes.
payload = text.encode('utf-8')
producer.send('test', payload)

# Wait until the buffered record has actually been sent.
producer.flush()

 

消费者

# Consumer for the string example: print every record read from 'test'.
from kafka import KafkaConsumer

consumer = KafkaConsumer('test', bootstrap_servers='bigdata-test02:6667')

# msg.value holds raw bytes; use msg.value.decode('utf-8') to get the text.
for msg in consumer:
    print(msg)

 

查看消息的topic、partition、offset、key和value等信息

# Print topic, partition, offset, key and value of each record in 'test-kafka'.
from kafka import KafkaConsumer

# auto_offset_reset='earliest': when there is no committed offset to resume
# from, start at the oldest record still retained instead of the log end.
consumer = KafkaConsumer('test-kafka', auto_offset_reset='earliest',
                         bootstrap_servers='bigdata-test02:6667')

for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

 

三、消费者(读取目前最早可读的消息)

# Read topic 'test' starting from the oldest message the broker still retains.
from kafka import KafkaConsumer

BROKERS = ['172.21.10.136:9092']

# With no committed offset to resume from, 'earliest' begins at the first
# available offset rather than waiting for new messages.
consumer = KafkaConsumer('test',
                         bootstrap_servers=BROKERS,
                         auto_offset_reset='earliest')

for record in consumer:
    fields = (record.topic, record.partition, record.offset,
              record.key, record.value)
    print("%s:%d:%d: key=%s value=%s" % fields)

                                         

auto_offset_reset：偏移量重置策略，仅在消费者没有已提交的偏移量（或已提交的偏移量超出可用范围）时生效。earliest表示从最早的可用消息开始读取，latest表示只读取之后新到达的消息，默认为latest。

源码中保留了旧取值的（已废弃）别名映射：{'smallest': 'earliest', 'largest': 'latest'}，即smallest等同于earliest，largest等同于latest。

 

四、手动设置偏移量

# Manually position a consumer at a specific offset of one partition.
from kafka import KafkaConsumer

from kafka.structs import TopicPartition

consumer = KafkaConsumer(bootstrap_servers='bigdata-test02:6667')

# seek() only works on partitions currently assigned to this consumer.
# Subscribing by topic name assigns partitions lazily (on the first poll),
# so assign the target partition explicitly before seeking.
partition = TopicPartition(topic='oplog_default', partition=1)
consumer.assign([partition])

print(consumer.partitions_for_topic("oplog_default"))  # partition ids of topic 'oplog_default'
print(consumer.topics())  # all topics visible to this consumer
print(consumer.subscription())  # topic subscription (not set when partitions are assigned manually)
print(consumer.assignment())  # partitions assigned to this consumer
print(consumer.beginning_offsets(consumer.assignment()))  # earliest available offset per assigned partition

# Start consuming partition 1 of 'oplog_default' at offset 3.
consumer.seek(partition, 3)

for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

 

五、订阅多个主题

# Subscribe a single consumer to several topics.
from kafka import KafkaConsumer

from kafka.structs import TopicPartition

consumer = KafkaConsumer(bootstrap_servers='bigdata-test02:6667')

consumer.subscribe(topics=('oplog_default', 'kafkap1'))  # topics to consume

print(consumer.topics())

# Latest (log-end) offset of partition 1 of 'oplog_default'. end_offsets()
# asks the broker directly; position() would fail at this point because
# subscribe() does not assign partitions until the consumer first polls.
tp = TopicPartition(topic='oplog_default', partition=1)
print(consumer.end_offsets([tp])[tp])

for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

最新回复(0)