您好, 欢迎来到 !    登录 | 注册 | | 设为首页 | 收藏本站

如何从Kafka用Python解码/反序列化Avro

如何从Kafka用Python解码/反序列化Avro

我以为Avro库只是读取Avro文件,但它实际上解决了解码Kafka消息的问题,如下所示:我首先导入库并将模式文件作为参数,然后创建一个函数以将消息解码为字典,可以在使用者循环中使用。

import io

from confluent_kafka import Consumer, KafkaError
from avro.io import DatumReader, BinaryDecoder
import avro.schema

schema = avro.schema.Parse(open("data_sources/EventRecord.avsc").read())
reader = DatumReader(schema)

def decode(msg_value):
    message_bytes = io.BytesIO(msg_value)
    decoder = BinaryDecoder(message_bytes)
    event_dict = reader.read(decoder)
    return event_dict

c = Consumer()
c.subscribe(topic)
running = True
while running:
    msg = c.poll()
    if not msg.error():
        msg_value = msg.value()
        event_dict = decode(msg_value)
        print(event_dict)
    elif msg.error().code() != KafkaError._PARTITION_EOF:
        print(msg.error())
        running = False
python 2022/1/1 18:42:32 有447人围观

撰写回答


你尚未登录,登录后可以

和开发者交流问题的细节

关注并接收问题和回答的更新提醒

参与内容的编辑和改进,让解决方法与时俱进

请先登录

推荐问题


联系我
置顶