Reactive Programming with Reactor 3

Reactor

You can make RxJava 2 and Reactor 3 types interact without a single external library.

In the first two examples we will adapt from Flux to Flowable, which implements Publisher, and vice-versa.

This is straightforward as both libraries provide a factory method to do that conversion from any Publisher. The checker below runs the two opposite conversions in one go:

Flux to Flowable back to Flux
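As a rough sketch of one way to write these two conversions (not necessarily the exercise's reference solution), the factory methods mentioned above are `Flowable.fromPublisher` on the RxJava 2 side and `Flux.from` on the Reactor side. The class name `FluxFlowableSketch` and the use of `String` elements instead of the tutorial's `User` type are assumptions made to keep the example self-contained.

```java
import io.reactivex.Flowable;
import reactor.core.publisher.Flux;

// Hypothetical helper class, named for illustration only.
class FluxFlowableSketch {

    // Flux implements Publisher, so RxJava 2's generic factory accepts it directly.
    static <T> Flowable<T> fromFluxToFlowable(Flux<T> flux) {
        return Flowable.fromPublisher(flux);
    }

    // Flowable implements Publisher too, so Reactor's generic factory accepts it.
    static <T> Flux<T> fromFlowableToFlux(Flowable<T> flowable) {
        return Flux.from(flowable);
    }

    public static void main(String[] args) {
        Flux<String> source = Flux.just("a", "b", "c");
        // Round trip: Flux -> Flowable -> Flux
        Flux<String> roundTrip = fromFlowableToFlux(fromFluxToFlowable(source));
        System.out.println(roundTrip.collectList().block());
    }
}
```

Both directions preserve backpressure, since each side only sees a Reactive Streams Publisher.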

The next two examples are a little trickier: we need to adapt between Flux and Observable, but the latter doesn't implement Publisher.

In the first case, you can transform any publisher to Observable. In the second case, you have to first transform the Observable into a Flowable, which forces you to define a strategy to deal with backpressure (RxJava 2 Observable doesn't support backpressure).

Flux to Observable and back to Flux
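A minimal sketch of the two steps described above could look like the following. The choice of `BackpressureStrategy.BUFFER` is an assumption made for illustration; the other strategies (`DROP`, `LATEST`, `ERROR`, `MISSING`) may suit a given use case better. The class name `FluxObservableSketch` and the `Integer` elements are also illustrative.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Observable;
import reactor.core.publisher.Flux;

// Hypothetical helper class, named for illustration only.
class FluxObservableSketch {

    // Any Publisher (here a Flux) can be wrapped into an Observable.
    static <T> Observable<T> fromFluxToObservable(Flux<T> flux) {
        return Observable.fromPublisher(flux);
    }

    // Observable is not a Publisher, so go through Flowable first.
    // BUFFER is an assumed strategy: it queues items the subscriber has not requested yet.
    static <T> Flux<T> fromObservableToFlux(Observable<T> observable) {
        return Flux.from(observable.toFlowable(BackpressureStrategy.BUFFER));
    }

    public static void main(String[] args) {
        Flux<Integer> source = Flux.range(1, 3);
        // Round trip: Flux -> Observable -> Flowable -> Flux
        Flux<Integer> roundTrip = fromObservableToFlux(fromFluxToObservable(source));
        System.out.println(roundTrip.collectList().block());
    }
}
```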

Next, let's transform a Mono into an RxJava Single, and vice-versa. One way is to adapt the Mono to an Observable and then call the firstOrError method on it. For the other way around, you'll once again need to transform the Single into a Flowable first.

Mono to Single and back to Mono
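One possible way to write this pair of conversions is sketched below, following the route through Observable and `firstOrError` described above. `Single.fromPublisher` is another option on the RxJava 2 side. The class name `MonoSingleSketch` and the `String` payload are assumptions for illustration.

```java
import io.reactivex.Observable;
import io.reactivex.Single;
import reactor.core.publisher.Mono;

// Hypothetical helper class, named for illustration only.
class MonoSingleSketch {

    // Wrap the Mono as an Observable, then take its first element.
    // firstOrError() signals an error if the source completes empty.
    static <T> Single<T> fromMonoToSingle(Mono<T> mono) {
        return Observable.fromPublisher(mono).firstOrError();
    }

    // Single is not a Publisher, so expose it as a Flowable before handing it to Reactor.
    static <T> Mono<T> fromSingleToMono(Single<T> single) {
        return Mono.from(single.toFlowable());
    }

    public static void main(String[] args) {
        Mono<String> source = Mono.just("hello");
        // Round trip: Mono -> Single -> Flowable -> Mono
        Mono<String> roundTrip = fromSingleToMono(fromMonoToSingle(source));
        System.out.println(roundTrip.block());
    }
}
```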

Finally, you can easily transform a Mono to a Java 8 CompletableFuture and vice-versa. Notice how these conversion methods all begin with from (when converting an external type to a Reactor one) and to (when converting a Reactor type to an external one).

Mono to CompletableFuture and back to Mono
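The from/to naming described above can be sketched with `Mono#toFuture` and `Mono.fromFuture`. The class name `MonoFutureSketch` and the `String` payload are assumptions chosen to keep the example self-contained.

```java
import java.util.concurrent.CompletableFuture;
import reactor.core.publisher.Mono;

// Hypothetical helper class, named for illustration only.
class MonoFutureSketch {

    // "to" prefix: Reactor type -> external type.
    static <T> CompletableFuture<T> fromMonoToCompletableFuture(Mono<T> mono) {
        return mono.toFuture();
    }

    // "from" prefix: external type -> Reactor type.
    static <T> Mono<T> fromCompletableFutureToMono(CompletableFuture<T> future) {
        return Mono.fromFuture(future);
    }

    public static void main(String[] args) {
        Mono<String> source = Mono.just("done");
        // Round trip: Mono -> CompletableFuture -> Mono
        Mono<String> roundTrip =
                fromCompletableFutureToMono(fromMonoToCompletableFuture(source));
        System.out.println(roundTrip.block());
    }
}
```

Keep in mind that `toFuture()` subscribes to the Mono right away, so the conversion to a future is eager rather than lazy.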
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.pivotal.literx;
// generic imports to help with simpler IDEs (i.e. tech.io)
import java.util.*;
import java.util.function.*;
import java.time.*;
import java.util.concurrent.CompletableFuture;
import io.pivotal.literx.domain.User;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.Single;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* Learn how to adapt from/to RxJava 2 Observable/Single/Flowable and Java 8+ CompletableFuture.
*
* Mono and Flux already implement the Reactive Streams interfaces, so they are natively
* Reactive Streams compliant. There are also {@link Mono#from(Publisher)} and {@link Flux#from(Publisher)}
* factory methods.
*
* For RxJava 2, you should not use Reactor Adapter but only RxJava 2 and Reactor Core.
*
* @author Sebastien Deleuze
*/
public class Part09Adapt {
    //========================================================================================

    // TODO Adapt Flux to RxJava Flowable
    Flowable<User> fromFluxToFlowable(Flux<User> flux) {
        return null;
    }