We are pleased to announce the release of the first milestone of Reactor Kafka 1.0.0.
Reactor Kafka is a reactive API for Apache Kafka based on Project Reactor. The Reactor Kafka API enables messages to be published to and consumed from Kafka topics using functional APIs with non-blocking back-pressure and very low overhead. Applications using Reactor can therefore use Kafka as a message bus or streaming platform and integrate it with other systems to provide an end-to-end reactive pipeline.
The value proposition for Reactor Kafka is the efficient use of resources in applications with multiple external interactions where Kafka is one of the external systems. End-to-end reactive pipelines benefit from non-blocking back-pressure and efficient use of threads, so a large number of concurrent requests can be processed efficiently. The optimizations provided by Project Reactor enable development of reactive applications with very low overhead and predictable capacity planning to deliver low-latency, high-throughput pipelines.
To get started and run sample reactive Kafka producers and consumers, follow the instructions in the Getting Started section of the Reference Guide.
The Reactor Kafka API is based on the Apache Kafka Producer/Consumer API and consists of two main classes:
Sender, for publishing messages to Kafka topics
Receiver, for consuming messages from Kafka topics
The full functionality of the underlying Kafka Producer and Consumer is available through these reactive interfaces.
Sender<Integer, String> sender =
    Sender.create(SenderOptions.create(producerProps)); (1)
Flux<SenderRecord<Integer, String, Integer>> outboundFlux = (2)
    Flux.range(1, 10)
        .map(i -> SenderRecord.create(producerRecord(topic, i), i));
sender.send(outboundFlux, false) (3)
      .doOnNext(r -> log.debug("Message #{} result: {}",
                               r.correlationMetadata(), r.recordMetadata())) (4)
      .subscribe(); (5)
(1) Create a Sender
(2) Flux of outbound messages to send to Kafka
(3) Reactive send
(4) Log the result of every send
(5) Subscribe to start flow of messages to Kafka
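The snippet above passes a producerProps map to SenderOptions.create without showing how it is built. The following is a minimal sketch of one way to build it, using standard Apache Kafka producer configuration keys written as plain strings so that it compiles without the Kafka client on the classpath. The class name ProducerPropsSketch, the broker address localhost:9092, and the client id are assumptions for illustration and do not come from the release itself.

```java
import java.util.HashMap;
import java.util.Map;

public class ProducerPropsSketch {

    // Builds a configuration map of the kind passed to SenderOptions.create(...).
    // Keys are standard Kafka producer settings; values are illustrative.
    static Map<String, Object> producerProps(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("client.id", "sample-producer"); // hypothetical client id
        props.put("acks", "all");                  // wait for all in-sync replicas
        // Serializers match the Sender<Integer, String> key and value types.
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Assumes a broker listening locally; adjust for a real cluster.
        Map<String, Object> producerProps = producerProps("localhost:9092");
        producerProps.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The serializer classes are chosen to line up with the Integer keys and String values of the Sender in the example; other key or value types would need different serializers.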
ReceiverOptions<Integer, String> receiverOptions = (1)
    ReceiverOptions.<Integer, String>create(consumerProps)
                   .subscription(Collections.singleton(topic));
Receiver.create(receiverOptions) (2)
        .receive() (3)
        .subscribe(r -> {
            log.info("Received message {} ", r.record()); (4)
            r.offset().acknowledge(); (5)
        });
(1) Create ReceiverOptions and configure subscription to Kafka topic
(2) Create Receiver
(3) Reactive receive
(4) Log every incoming message
(5) Acknowledge after processing message so that offset may be committed
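The consumer example likewise relies on a consumerProps map that is not shown. A minimal sketch of such a map follows, again using standard Apache Kafka consumer configuration keys as plain strings. The class name ConsumerPropsSketch, the broker address, the group id, and the choice of "earliest" for auto.offset.reset are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerPropsSketch {

    // Builds a configuration map of the kind passed to ReceiverOptions.create(...).
    // Keys are standard Kafka consumer settings; values are illustrative.
    static Map<String, Object> consumerProps(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Start from the oldest available record when the group has no committed offset.
        props.put("auto.offset.reset", "earliest");
        // Deserializers match the ReceiverOptions<Integer, String> key and value types.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Assumes a local broker and a hypothetical consumer group name.
        Map<String, Object> consumerProps = consumerProps("localhost:9092", "sample-group");
        consumerProps.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The group id determines which committed offsets the consumer resumes from, so the acknowledgements in step (5) only carry over between runs that use the same group id.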
Reactor Kafka source and samples are available on GitHub.
For more information and additional resources, see the Reactor Kafka Reference Guide and Javadocs.