On the heels of the recently announced Spring Cloud Stream Elmhurst.RELEASE, we are pleased to present another blog installment dedicated to Spring Cloud Stream’s native integration with the Apache Kafka Streams library. Let’s review the new improvements.
The Spring Cloud Stream framework enables application developers to write event-driven applications that use the strong foundations of Spring Boot and Spring Integration. The underpinning of all of this is the binder implementation, which is responsible for communication between the application and the message broker. These binders are MessageChannel-based implementations.
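For contrast, here is a minimal sketch of such a MessageChannel-based consumer using the framework's out-of-the-box Sink binding (the payload type and the logging are illustrative):

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

// A MessageChannel-based consumer: each message arrives individually
// on the binder-managed "input" channel.
@EnableBinding(Sink.class)
public class LoggingConsumer {

    @StreamListener(Sink.INPUT)
    public void handle(String payload) {
        System.out.println("Received: " + payload);
    }
}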
While the contracts established by Spring Cloud Stream are maintained from a programming model perspective, the Kafka Streams binder does not use MessageChannel as the target type. The binder implementation natively interacts with Kafka Streams “types” - KStream or KTable. Applications can directly use the Kafka Streams primitives and leverage Spring Cloud Stream and the Spring ecosystem without any compromise.
Note: The Kafka Streams binder is not a replacement for using the library itself.
A quick way to generate a project with the necessary components for a Spring Cloud Stream Kafka Streams application is through the Spring Initializr (start.spring.io).
Here is a simple word-count application written in Spring Cloud Stream and Kafka Streams.
@EnableBinding(KafkaStreamsProcessor.class)
public static class WordCountProcessorApplication {

    @StreamListener("input")
    @SendTo("output")
    public KStream<?, WordCount> process(KStream<Object, String> input) {
        return input
                // Split each line of text into individual words
                .flatMapValues(
                        value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, value) -> new KeyValue<>(value, value))
                .groupByKey()
                // Count occurrences of each word over 5-second windows,
                // materialized in the "wordcounts" state store
                .windowedBy(TimeWindows.of(5000))
                .count(Materialized.as("wordcounts"))
                .toStream()
                .map((key, value) ->
                        new KeyValue<>(null, new WordCount(key.key(), value)));
    }
}
The @EnableBinding annotation with KafkaStreamsProcessor conveys to the framework that it should perform binding on Kafka Streams targets. You can have your own interfaces with multiple “input” and “output” bindings, too, as sketched below.
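For illustration, such a custom interface might look like the following sketch (the interface and binding names are hypothetical, not part of the framework):

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;

// A hypothetical binding interface with one Kafka Streams input and one output
interface MyKafkaStreamsProcessor {

    @Input("words")
    KStream<?, ?> words();

    @Output("counts")
    KStream<?, ?> counts();
}

You would then pass this interface to @EnableBinding in place of KafkaStreamsProcessor.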
@StreamListener instructs the framework to allow the application to consume events as a KStream from the topic that is bound to the "input" target.
process() is a handler that receives events from the KStream containing textual data. The business logic counts the occurrences of each word and stores the total count over a time window (5 seconds in this case) in a state store. The resulting KStream contains the word and its corresponding count in that time window.
Here is a complete version of this example.
Josh Long (@starbuxman) has put together a screencast that goes into much detail about the various features of the Kafka Streams binding support.
Developers familiar with Spring Cloud Stream (e.g., @EnableBinding and @StreamListener) can extend it to build stateful applications by using the Kafka Streams API.
Developers can leverage the framework’s content-type conversion for inbound and outbound messages or switch to the native SerDes provided by Kafka.
Developers can port existing Kafka Streams workloads into standalone cloud-native applications and orchestrate them as coherent data pipelines using Spring Cloud Data Flow.
An application runs as-is - no lock-in with any cloud platform vendor.
The Kafka Streams binder provides the following features:

- Interoperability between Kafka Streams and the Kafka binder’s MessageChannel bindings
- Multiple Kafka Streams types (such as KStream and KTable) as handler arguments
- Content-type conversion for inbound and outbound streams
- Property toggles to switch between framework and native Kafka SerDes for inbound and outbound message conversion
- Error handling support
- Dead Letter Queue (DLQ) support for records that fail deserialization
- Branching support
- Interactive query support
The Kafka Streams binder lets you send to multiple output topics through the Branching API in Kafka Streams. Here is the outline of such a method:
@StreamListener("input")
@SendTo({"output1","output2","output3"})
public KStream<String, String>[] process(KStream<Object, String> input) {
...
}
Notice that the return type of the method is KStream[]. See this example for more details on how this works.
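As an illustrative sketch of such a method body, the following uses the Kafka Streams branch API; the language-detecting predicates are purely hypothetical:

@StreamListener("input")
@SendTo({"output1", "output2", "output3"})
public KStream<String, String>[] process(KStream<Object, String> input) {

    // Hypothetical predicates: each record is routed to the first branch whose
    // predicate matches; records matching no predicate are dropped.
    Predicate<String, String> isEnglish = (key, value) -> value.contains("english");
    Predicate<String, String> isFrench  = (key, value) -> value.contains("french");
    Predicate<String, String> isSpanish = (key, value) -> value.contains("spanish");

    return input
            .map((key, value) -> new KeyValue<>(value, value))
            .branch(isEnglish, isFrench, isSpanish);
}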
The Kafka Streams binder also lets you bind to multiple inputs of KStream and KTable target types, as the following example shows:
@StreamListener
public void process(@Input("input") KStream<String, PlayEvent> playEvents,
@Input("inputX") KTable<Long, Song> songTable) {
...
}
Notice the use of multiple inputs on the method argument list. Here you can see two @Input annotations - one for KStream and another for KTable.
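One natural way to combine the two inputs is a stream-table join. The following fragment is a sketch only; the getSongId() accessor on PlayEvent is assumed for illustration:

// Re-key the play events by song id so the keys line up with the KTable,
// then enrich each play with the song details via a stream-table join.
KStream<Long, Song> songPlays = playEvents
        .map((key, play) -> KeyValue.pair(play.getSongId(), play))
        .join(songTable, (play, song) -> song);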
Here is a working version of this example.
Similar to MessageChannel-based binder implementations, the Kafka Streams binder also supports content-type conversion on the incoming and outgoing streams. Any other type of data serialization is entirely handled by Kafka Streams itself. The framework-provided content-type conversion on the edges can be disabled; instead, you can delegate the responsibility entirely to Kafka, using the SerDe facilities provided by Kafka Streams.
When relying on the Kafka Streams binder for content-type conversion, it is applied only to the “value” (that is, the payload) of the message. The “keys” are always converted by Kafka SerDes.
Please refer to the documentation for detailed information about how content-type negotiation and serialization are addressed in the Kafka Streams binder.
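For example, with the framework conversion disabled, you can hand explicit SerDes to the Kafka Streams operations themselves. The following fragment is a sketch that assumes a KStream<String, String> input; the store name is illustrative:

// Supply explicit SerDes instead of relying on the framework's
// content-type conversion; "counts-store" is an illustrative store name.
input
    .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.Long()));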
The Kafka Streams library has built-in support for handling deserialization exceptions (KIP-161). In addition to the native deserialization error-handling support, the Kafka Streams binder also provides support for routing errored payloads to a DLQ. See this documentation section for details.
Here is a sample that demonstrates DLQ facilities in the Kafka Streams binder.
Kafka Streams lets you query state stores interactively from applications, which can be used to gain insights into ongoing streaming data. The Kafka Streams binder API exposes a class called QueryableStoreRegistry. You can access it as a Spring bean in your application by injecting it (for example, by autowiring), as the following example shows:
@Autowired
QueryableStoreRegistry queryableStoreRegistry;

ReadOnlyKeyValueStore<Object, Object> keyValueStore =
        queryableStoreRegistry.getQueryableStoreType("my-store",
                QueryableStoreTypes.keyValueStore());
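A common pattern is to expose such a store over HTTP. Here is a minimal sketch; the endpoint path, store name, and controller class are illustrative:

import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class StoreQueryController {

    @Autowired
    private QueryableStoreRegistry queryableStoreRegistry;

    // Look up the value associated with a key in the "my-store" state store;
    // the endpoint path and store name are illustrative.
    @GetMapping("/store/{key}")
    public Object lookup(@PathVariable String key) {
        ReadOnlyKeyValueStore<Object, Object> store =
                queryableStoreRegistry.getQueryableStoreType("my-store",
                        QueryableStoreTypes.keyValueStore());
        return store.get(key);
    }
}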
Here are basic and advanced examples demonstrating the interactive query capabilities through the binder.
If the application use case requires both the MessageChannel-based Kafka binder and the Kafka Streams binder, you can use both in the same application. In that case, you can have multiple StreamListener methods or a combination of source and sink/processor type methods. The following example shows how multiple StreamListener methods can target different types of bindings:
@StreamListener("binding2")
@SendTo("output")
public KStream<?, WordCount> process(KStream<Object, String> input) {
}
@StreamListener("binding1")
public void sink(String input) {
}
interface MultipleProcessor {
String BINDING_1 = "binding1";
String BINDING_2 = "binding2";
String OUTPUT = "output";
@Input(BINDING_1)
SubscribableChannel binding1();
@Input(BINDING_2)
KStream<?, ?> binding2();
@Output(OUTPUT)
KStream<?, ?> output();
}
In this example, the first method is a Kafka Streams processor and the second is a regular MessageChannel-based consumer. Although you can have multiple methods with differing target types (MessageChannel vs. Kafka Streams types), it is not possible to mix the two within a single method.
In this article, we saw the higher-level constructs and usage samples exposed through the Spring Cloud Stream Kafka Streams binder. In addition to allowing the use of Spring Cloud Stream’s MessageChannel-based binders, this binder implementation lets us develop, test, and produce stateful applications consistently.
Check out the project page and the documentation. As always, we welcome feedback and contributions, so please reach out to us on GitHub, Stack Overflow, and Gitter.