您好, 欢迎来到 !    登录 | 注册 | | 设为首页 | 收藏本站

带Spring Boot的简单嵌入式Kafka测试示例

带Spring Boot的简单嵌入式Kafka测试示例

嵌入式Kafka测试适用于以下配置,

测试课注释

@EnableKafka
@SpringBootTest(classes = {KafkaController.class}) // Specify @KafkaListener class if its not the same class, or not loaded with test config
@EmbeddedKafka(
    partitions = 1, 
    controlledShutdown = false,
    brokerProperties = {
        "listeners=PLAINTEXT://localhost:3333", 
        "port=3333"
})
public class KafkaConsumerTest {
    @Autowired
    KafkaEmbedded kafkaEmbeded;

    @Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

注释之前的设置方法

@Before
public void setUp() throws Exception {
  for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
    ContainerTestUtils.waitForAssignment(messageListenerContainer, 
    kafkaEmbeded.getPartitionsPerTopic());
  }
}

注意:我不是@ClassRule用来创建嵌入式Kafka而是自动装配@Autowired embeddedKafka

@Test
public void testReceive() throws Exception {
     kafkaTemplate.send(topic, data);
}

希望这可以帮助!

编辑:测试配置类标记@TestConfiguration

@TestConfiguration
public class TestConfig {

@Bean
public ProducerFactory<String, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaEmbedded));
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
    kafkaTemplate.setDefaultTopic(topic);
    return kafkaTemplate;
}

现在@Test方法自动连接KafkaTemplate并用于发送消息

kafkaTemplate.send(topic, data);

用上面的行更新了答案代码

Java 2022/1/1 18:23:06 有473人围观

撰写回答


你尚未登录,登录后可以

和开发者交流问题的细节

关注并接收问题和回答的更新提醒

参与内容的编辑和改进,让解决方法与时俱进

请先登录

推荐问题


联系我
置顶