Get ahead
VMware offers training and certification to turbo-charge your progress.
Learn moreThis blog post is the second in a series of posts that aim at providing a deeper look into Reactor's more advanced concepts and inner workings.
It is derived from my Flight of the Flux
talk, which content I found to be more adapted to a blog post format.
I'll update the table below with links when the other posts are published, but here is the planned content:
If you're missing an introduction to Reactive Streams and the basic concepts of Reactor, head out to the site's learning section and the reference guide.
Without further ado, let's jump in:
Switching from an imperative, blocking paradigm to a reactive, non-blocking one brings benefits but also comes with some caveats. One of these is the debugging experience. Why is that?
Primarily because you've learned to rely on the good old stacktrace, but suddenly this invaluable tool becomes far less valuable due to the asynchronous aspect of reactive programming. This is not specific to reactive programming though: as soon as you introduce asynchronous code, you create a boundary in the program between the code that schedules and the code that asynchronously executes.
Let's take an example with an ExecutorService
and a Future
(no Reactor code here):
private static void imperative() throws ExecutionException, InterruptedException {
final ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
int seconds = LocalTime.now().getSecond();
List<Integer> source;
if (seconds % 2 == 0) {
source = IntStream.range(1, 11).boxed().collect(Collectors.toList());
}
else if (seconds % 3 == 0) {
source = IntStream.range(0, 4).boxed().collect(Collectors.toList());
}
else {
source = Arrays.asList(1, 2, 3, 4);
}
executor.submit(() -> source.get(5)) //line 76
.get();
}
This example is a bit contrived, but let's imagine that we have these two out of three path in the code that can lead to the asynchronous task throwing an IndexOutOfBoundsException
... How helpful would the stacktrace be?
java.util.concurrent.ExecutionException: java.lang.ArrayIndexOutOfBoundsException: Index 5 out of bounds for length 4
at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
at Scratch.imperative(Scratch.java:77)
at Scratch.main(Scratch.java:50)
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 5 out of bounds for length 4
at java.base/java.util.Arrays$ArrayList.get(Arrays.java:4351)
at Scratch.lambda$imperative$0(Scratch.java:76)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
We see that:
Future
's get()
method threw an ExecutionException
IndexOutOfBoundsException
submit(() -> source.get(5))
lambda line 76FutureTask
, from something called a ThreadPoolExecutor
, itself running in a Thread
...submit()
).Not terribly useful :-(
If we look for a Reactor equivalent to the above code, we can come up with this:
private static void reactive() {
int seconds = LocalTime.now().getSecond();
Mono<Integer> source;
if (seconds % 2 == 0) {
source = Flux.range(1, 10)
.elementAt(5);
}
else if (seconds % 3 == 0) {
source = Flux.range(0, 4)
.elementAt(5);
}
else {
source = Flux.just(1, 2, 3, 4)
.elementAt(5);
}
source.subscribeOn(Schedulers.parallel())
.block(); //line 97
}
Which triggers the following stacktrace:
java.lang.IndexOutOfBoundsException
at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onComplete(MonoElementAt.java:153)
at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:176)
at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:96)
at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.request(MonoElementAt.java:92)
at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.trySchedule(MonoSubscribeOn.java:186)
at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.onSubscribe(MonoSubscribeOn.java:131)
at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onSubscribe(MonoElementAt.java:107)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
at reactor.core.publisher.MonoElementAt.subscribe(MonoElementAt.java:59)
at reactor.core.publisher.Mono.subscribe(Mono.java:3711)
at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:123)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:93)
at reactor.core.publisher.Mono.block(Mono.java:1495)
at Scratch.reactive(Scratch.java:97)
at Scratch.main(Scratch.java:51)
ArrayIndexOutOfBoundsException
again, hinting at a source that was too short for the MonoElementAt
operatoronComplete
, itself triggered by request
... and a bunch of other steps in reactor.core.publisher
range
(FluxRange.subscribe
), elementAt
and subscribeOn
...Thread
of a ThreadPoolExecutor
Worse, even if we did get rid of subscribeOn
we'd still wouldn't discover which of the two possible error paths was triggered:
private static void reactiveNoSubscribeOn() {
int seconds = LocalTime.now().getSecond();
Mono<Integer> source;
if (seconds % 2 == 0) {
source = Flux.range(1, 10)
.elementAt(5);
}
else if (seconds % 3 == 0) {
source = Flux.range(0, 4)
.elementAt(5);
}
else {
source = Flux.just(1, 2, 3, 4)
.elementAt(5);
}
source.block(); //line 116
}
Gives the stacktrace:
java.lang.IndexOutOfBoundsException
at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onComplete(MonoElementAt.java:153)
at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:176)
at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:96)
at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.request(MonoElementAt.java:92)
at reactor.core.publisher.BlockingSingleSubscriber.onSubscribe(BlockingSingleSubscriber.java:49)
at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onSubscribe(MonoElementAt.java:107)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
at reactor.core.publisher.MonoElementAt.subscribe(MonoElementAt.java:59)
at reactor.core.publisher.Mono.block(Mono.java:1494)
at Scratch.reactiveNoSubscribeOn(Scratch.java:116)
at Scratch.main(Scratch.java:52)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:93)
at reactor.core.publisher.Mono.block(Mono.java:1495)
... 2 more
That is because, as we saw previously, there is an additional "boundary" in code between assembly and subscription. The trail only goes back to the point of subscription (here the block()
) :-(
So using stacktraces for analysis and debugging purposes is harder in an asynchronous world, and even a bit harder in Reactor (because it is asynchronous and has a lazy-by-default approach with assembly vs subscription). But fortunately there are tools in the library to try and alleviate that fact.
log
Remember when you sprinkled your imperative code with print
statements? It might not be as cool as firing up the step debugger, but sometimes it is the quick and dirty solution you need.
In Reactor, you have the log()
operator:
onNext
, onComplete
, onError
(and even onSubscribe
, cancel
and request
!)Logger
as wellIn short, log
is the quick and dirty solution to get an easy bird eye's view of what is going on at one step of your sequence. Use it liberally during development, with the possibility of specifying a "name" to each log
call to differentiate them.
Using log(String)
can be diverted to get a hint at which source causes the error:
private static void log() {
int seconds = LocalTime.now().getSecond();
Mono<Integer> source;
if (seconds % 2 == 0) {
source = Flux.range(1, 10)
.elementAt(5)
.log("source A");
}
else if (seconds % 3 == 0) {
source = Flux.range(0, 4)
.elementAt(5)
.log("source B");
}
else {
source = Flux.just(1, 2, 3, 4)
.elementAt(5)
.log("source C");
}
source.block(); //line 138
}
The stacktrace itself isn't much more interesting (apart from mentioning the MonoLogFuseable
class, but the log itself contains this interesting tidbit:
17:01:23.711 [main] INFO source C - | onSubscribe([Fuseable] MonoElementAt.ElementAtSubscriber)
17:01:23.716 [main] INFO source C - | request(unbounded)
17:01:23.717 [main] ERROR source C - | onError(java.lang.IndexOutOfBoundsException)
17:01:23.721 [main] ERROR source C -
java.lang.IndexOutOfBoundsException: null
At least we get our hardcoded source C
label...
Another approach that is available in Reactor is to try and get back the assembly information in the runtime stacktraces.
This can be done by activating the so-called "debug mode" via the Hooks
class:
Hooks.onOperatorDebug();
What does it do? It makes each operator instantiation (aka assembly) capture a stacktrace and keep it for later.
If an onError
reaches one operator, it will attach that assembly stacktrace to the onError
's Throwable
(as a suppressed Exception
). As a result, when you see the stacktrace you'll get a more complete picture of both the runtime AND the assembly.
With debug mode on, in our earlier example we would be able to see which assembly path was taken and which source was actually processed:
private static void hook() {
Hooks.onOperatorDebug();
try {
int seconds = LocalTime.now().getSecond();
Mono<Integer> source;
if (seconds % 2 == 0) {
source = Flux.range(1, 10)
.elementAt(5); //line 149
}
else if (seconds % 3 == 0) {
source = Flux.range(0, 4)
.elementAt(5); //line 153
}
else {
source = Flux.just(1, 2, 3, 4)
.elementAt(5); //line 157
}
source.block(); //line 160
}
finally {
Hooks.resetOnOperatorDebug();
}
}
Which produces the following stacktrace:
java.lang.IndexOutOfBoundsException
at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onComplete(MonoElementAt.java:153)
(...)
at reactor.core.publisher.Mono.block(Mono.java:1494)
at Scratch.hook(Scratch.java:160)
at Scratch.main(Scratch.java:54)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoElementAt] :
reactor.core.publisher.Flux.elementAt(Flux.java:4367)
Scratch.hook(Scratch.java:157)
Error has been observed by the following operator(s):
|_ Flux.elementAt ⇢ Scratch.hook(Scratch.java:157)
Notice the last line? Yay :-D
checkpoint
One drawback of using Hooks.onOperatorDebug()
is that it does the assembly stacktrace capture for every single operator used in the application. Filling a single stacktrace is a costly operation, so it goes without saying that this can have an heavy impact on performance. As a result, this is only recommended in a development setting.
Fortunately, you can bring the cost down a little if you identify parts of your codebase that are prone to that sort of source ambiguity.
By using the checkpoint()
operator, it is possible to activate the assembly trace capture only at that specific point in the codebase. You can even do entirely without the filling of a stacktrace if you give the checkpoint a unique and meaningful name using checkpoint(String)
:
private static void checkpoint() {
int seconds = LocalTime.now().getSecond();
Mono<Integer> source;
if (seconds % 2 == 0) {
source = Flux.range(1, 10)
.elementAt(5)
.checkpoint("source range(1,10)");
}
else if (seconds % 3 == 0) {
source = Flux.range(0, 4)
.elementAt(5)
.checkpoint("source range(0,4)");
}
else {
source = Flux.just(1, 2, 3, 4)
.elementAt(5)
.checkpoint("source just(1,2,3,4)");
}
source.block(); //line 186
}
This produces the following stacktrace:
java.lang.IndexOutOfBoundsException
at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onComplete(MonoElementAt.java:153)
at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:176)
at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:96)
at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.request(MonoElementAt.java:92)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.request(FluxOnAssembly.java:438)
at reactor.core.publisher.BlockingSingleSubscriber.onSubscribe(BlockingSingleSubscriber.java:49)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onSubscribe(FluxOnAssembly.java:422)
at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onSubscribe(MonoElementAt.java:107)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
at reactor.core.publisher.MonoElementAt.subscribe(MonoElementAt.java:59)
at reactor.core.publisher.MonoOnAssembly.subscribe(MonoOnAssembly.java:61)
at reactor.core.publisher.Mono.block(Mono.java:1494)
at Scratch.checkpoint(Scratch.java:186)
at Scratch.main(Scratch.java:55)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly site of producer [reactor.core.publisher.MonoElementAt] is identified by light checkpoint [source just(1,2,3,4)].
Notice the is identified by light checkpoint [source just(1,2,3,4)].
, which gives us our culprit (because we used a meaningful description for the checkpoint).
In this article, we've learned that stacktraces can be less useful in asynchronous programming. This effect is further compounded by the lazy way Reactor let you build reactive sequences.
We've looked at the worst cases that can be encountered and at several ways this problem can be lessened.
The whole code can be found in a gist here.
In the next instalment, we'll learn about schedulers and how to hop from one thread to another.
In the meantime, happy reactive coding!