Get ahead
VMware offers training and certification to turbo-charge your progress.
Learn morePart 1 - Programming Model Part 2 - Programming Model Continued Part 3 - Data deserialization and serialization Part 4 - Error Handling
In this blog post, we continue our discussion on the support for Kafka Streams in Spring Cloud Stream. We are going to elaborate on the ways in which you can customize a Kafka Streams application.
Kafka Streams binder uses the StreamsBuilderFactoryBean, provided by the Spring for Apache Kafka project, to build the StreamsBuilder object that is the foundation for a Kafka Streams application. This factory bean is a Spring lifecycle bean. Oftentimes, this factory bean must be customized before it is started, for various reasons. As described in the previous blog post on error handling, you need to customize the StreamsBuilderFactoryBean
if you want to register a production exception handler. Let’s say that you have this producer exception handler:
class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
if (exception instanceof RecordTooLargeException) {
return ProductionExceptionHandlerResponse.CONTINUE;
} else {
return ProductionExceptionHandlerResponse.FAIL;
}
}
}
You can register it directly by using configuration if you choose (using default.production.exception.handler
).
However, a more elegant approach, when using the binder, is to register this as part of the StreamsBuilderFactoryBean
customizer, as follows:
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
return fb -> {
fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
IgnoreRecordTooLargeHandler.class);
};
}
Note that, if you have multiple processors in the application, you can control which processor gets customization based on the application ID. For example, you can check on it this way:
return factoryBean -> {
if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
.equals("processor1-application-id")) {
Here is another example of setting a state listener:
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return sfb -> sfb.setStateListener((newState, oldState) -> {
//Do some action here!
});
}
The KafkaStreams object is at the heart of any Kafka Streams application. StreamsBuilderFactoryBean
is responsible for creating the topology and then creating the KafkaStreams
object. Before starting the KafkaStreams
object, StreamsBuilderFactoryBean gives an opportunity to customize this KafkaStreams
object. For example, if you want to set an application-wide handler for uncaught exceptions you can do the following:
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return factoryBean -> {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
};
}
Notice that we start with the customizer for StreamsBuilderFactoryBean. However, inside it, we use a separate KafkaStreamsCustomizer
.
In this blog post, we saw how the Kafka Streams binder in Spring Cloud Stream lets you customize the underlying StreamsBuilderFactoryBean
and the KafkaStreams
object.
Thank you for reading this far! Next, in the final blog post in this series, we will look at how the binder lets you deal with state stores and enabling interactive queries against them.