您好, 欢迎来到 !    登录 | 注册 | | 设为首页 | 收藏本站

kafka Avro消息反序列化器,可用于多个主题

kafka Avro消息反序列化器,可用于多个主题

在配置中使用以下行:

props.put( KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());
props.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName() );

代替:

props.put( SpecificAvroWithSchemaDeserializer.AVRO_VALUE_RECORD_TYPE, mysql.researchdb.institutes.Value.class );
props.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroWithSchemaDeserializer.class );

最终代码为:

package com.moglix.netsuite.kafka;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

/**
 * Spring configuration that wires a batch Kafka listener container whose record values are
 * decoded by Confluent's {@link KafkaAvroDeserializer}.
 *
 * <p>All connection and tuning values are injected from application properties
 * ({@code kafka.bootstrap-servers}, {@code kafka.schema-registry-url},
 * {@code kafka.reviews.start-offset}, {@code kafka.reviews.max-poll-records}).
 */
@EnableKafka
@Configuration
public class ReviewsConsumerConfig
{

    /** Kafka broker list, read from {@code kafka.bootstrap-servers}. */
    @Value( "${kafka.bootstrap-servers}" )
    private String bootstrapServers;

    /** Schema registry endpoint, read from {@code kafka.schema-registry-url}. */
    @Value( "${kafka.schema-registry-url}" )
    private String schemaRegistryUrl;

    /** Value passed to the consumer's auto-offset-reset setting, read from {@code kafka.reviews.start-offset}. */
    @Value( "${kafka.reviews.start-offset}" )
    private String orderStartOffset;

    /** Value passed to the consumer's max-poll-records setting, read from {@code kafka.reviews.max-poll-records}. */
    @Value( "${kafka.reviews.max-poll-records}" )
    private Integer maxPollRecords;

    /**
     * Creates the listener container factory used by {@code @KafkaListener} methods.
     *
     * <p>The factory is configured for batch listeners, manual acknowledgment and
     * synchronous offset commits.
     *
     * @param <T> value type delivered to listeners
     * @return the configured container factory
     */
    @Bean
    public <T> ConcurrentKafkaListenerContainerFactory<String, T> kafkaListenerContainerFactory()
    {
        ConcurrentKafkaListenerContainerFactory<String, T> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory( consumerFactory1() );
        factory.setBatchListener( true );
        factory.getContainerProperties().setAckMode( ContainerProperties.AckMode.MANUAL );
        factory.getContainerProperties().setSyncCommits( true );
        return factory;
    }

    /**
     * Creates the consumer factory backed by {@link #consumerConfigs1()}.
     *
     * @param <T> value type produced by the value deserializer
     * @return a consumer factory built from the consumer property map
     */
    @Bean
    public <T> ConsumerFactory<String, T> consumerFactory1()
    {
        return new DefaultKafkaConsumerFactory<>( consumerConfigs1() );
    }

    /**
     * Builds the Kafka consumer property map.
     *
     * <p>Keys are deserialized as strings and values with {@link KafkaAvroDeserializer};
     * auto-commit is disabled because the container acknowledges manually.
     *
     * @return a mutable map of consumer properties
     */
    @Bean
    public Map<String, Object> consumerConfigs1()
    {
        Map<String, Object> props = new HashMap<>();
        props.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers );
        props.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class );
        props.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName() );
        props.put( ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, orderStartOffset );

        props.put( AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl );
        props.put( KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true );
        // Per the original author, this subject-name strategy is the key setting that lets
        // one deserializer configuration serve several topics.
        props.put( KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName() );
        // Replaced approach, kept for reference (pinned the value type to a single record class):
        // props.put( SpecificAvroWithSchemaDeserializer.AVRO_VALUE_RECORD_TYPE, mysql.researchdb.institutes.Value.class );

        props.put( ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords );
        props.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false );
        return props;
    }

}
其他 2022/1/1 18:23:13 有444人围观

撰写回答


你尚未登录,登录后可以

和开发者交流问题的细节

关注并接收问题和回答的更新提醒

参与内容的编辑和改进,让解决方法与时俱进

请先登录

推荐问题


联系我
置顶