嵌入式Kafka测试适用于以下配置,
测试课注释
@EnableKafka
@SpringBootTest(classes = {KafkaController.class}) // Specify @KafkaListener class if its not the same class, or not loaded with test config
@EmbeddedKafka(
partitions = 1,
controlledShutdown = false,
brokerProperties = {
"listeners=PLAINTEXT://localhost:3333",
"port=3333"
})
public class KafkaConsumerTest {
@Autowired
KafkaEmbedded kafkaEmbeded;
@Autowired
KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
注释之前的设置方法
@Before
public void setUp() throws Exception {
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
ContainerTestUtils.waitForAssignment(messageListenerContainer,
kafkaEmbeded.getPartitionsPerTopic());
}
}
注意:我不是@ClassRule
用来创建嵌入式Kafka而是自动装配@Autowired embeddedKafka
@Test
public void testReceive() throws Exception {
kafkaTemplate.send(topic, data);
}
希望这可以帮助!
编辑:测试配置类标记为 @TestConfiguration
@TestConfiguration
public class TestConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaEmbedded));
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
kafkaTemplate.setDefaultTopic(topic);
return kafkaTemplate;
}
现在@Test
方法将自动连接KafkaTemplate并用于发送消息
kafkaTemplate.send(topic, data);
用上面的行更新了答案代码块