How to test a ConsumerAwareRebalanceListener?

ghz 1years ago ⋅ 5191 views

Question

I developed a @KafkaListener that is also marked with the ConsumerAwareRebalanceListener interface, using Spring Boot 2.0.6. I implemented the onPartitionsAssigned method, in which I rewind the offset of a fixed amount of time, let's say 60 seconds.

So far so good.

How can I test the above use case using the tools that Spring Kafka gives me? I supposed I need to start a Kafka broker (i.e., an EmbeddedKafka), then stopping the listener and then rebooting it again, to test that it read again the messages arrived in the last 60 seconds.

Can somebody help me? I googled a little, but I didn't find anything. Thanks a lot.


Answer

The @KafkaListener has an:

/**
 * The unique identifier of the container managing for this endpoint.
 * <p>If none is specified an auto-generated one is provided.
 * @return the {@code id} for the container managing for this endpoint.
 * @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String)
 */
String id() default "";

attribute, so you can get an access to its MessageListenerContainer via mentioned KafkaListenerEndpointRegistry, which you can simply @Autowired into the test class based on Spring Testing Framework. Then, you can really stop() and start() that MessageListenerContainer in your test method.

Also pay attention how @KafkaListener has an autoStartup() attribute also.