Question
I developed a @KafkaListener
that is also marked with the
ConsumerAwareRebalanceListener
interface, using Spring Boot 2.0.6. I
implemented the onPartitionsAssigned
method, in which I rewind the offset of
a fixed amount of time, let's say 60 seconds.
So far so good.
How can I test the above use case using the tools that Spring Kafka gives me?
I supposed I need to start a Kafka broker (i.e., an EmbeddedKafka
), then
stopping the listener and then rebooting it again, to test that it read again
the messages arrived in the last 60 seconds.
Can somebody help me? I googled a little, but I didn't find anything. Thanks a lot.
Answer
The @KafkaListener
has an:
/**
* The unique identifier of the container managing for this endpoint.
* <p>If none is specified an auto-generated one is provided.
* @return the {@code id} for the container managing for this endpoint.
* @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String)
*/
String id() default "";
attribute, so you can get an access to its MessageListenerContainer
via
mentioned KafkaListenerEndpointRegistry
, which you can simply @Autowired
into the test class based on Spring Testing Framework. Then, you can really
stop()
and start()
that MessageListenerContainer
in your test method.
Also pay attention how @KafkaListener
has an autoStartup()
attribute also.