Reactive Programming with Reactor 3
Blocking to Reactive
The big question is "How to deal with legacy, non reactive code?".
Say you have blocking code (eg. a JDBC connection to a database), and you want to integrate that into your reactive pipelines while avoiding too much of an impact on performance.
The best course of action is to isolate such intrinsically blocking parts of your code into
their own execution context via a
Scheduler, keeping the efficiency of the rest of the
pipeline high and only creating extra threads when absolutely needed.
In the JDBC example you could use the
fromIterable factory method. But how do you prevent
the call to block the rest of the pipeline?
subscribeOn method allow to isolate a sequence from the start on a provided
For example, the
Schedulers.boundedElastic() will create a pool of threads that grows on demand,
releasing threads that haven't been used in a while automatically. In order to avoid too many
threads due to abusing of this easy option, the
boundedElastic Scheduler places an upper limit
to the number of threads it can create and reuse (unlike the now deprecated
Use that trick to slowly read all users from the blocking
repository in the first exercise.
Note that you will need to wrap the call to the repository inside a
For slow subscribers (eg. saving to a database), you can isolate a smaller section of the
sequence with the
publishOn operator. Unlike
subscribeOn, it only affects the part of
the chain below it, switching it to a new
As an example, you can use
doOnNext to perform a
save on the
repository, but first
use the trick above to isolate that save into its own execution context. You can make it
more explicit that you're only interested in knowing if the save succeeded or failed by
then() operator at the end, which returns a