# Reactive Programming with Reactor 3

Reactor
134K views

## Request

### Description

Remember this diagram?

There's one aspect to it that we didn't cover: the volume control. In Reactive Streams terms this is called backpressure. It is a feedback mechanism that allows a Subscriber to signal to its Publisher how much data it is prepared to process, limiting the rate at which the Publisher produces data.

This control of the demand is done at the Subscription level: a Subscription is created for each subscribe() call and it can be manipulated to either cancel() the flow of data or tune demand with request(long).

Making a request(Long.MAX_VALUE) means an unbounded demand, so the Publisher will emit data at its fastest pace.

### Practice

The demand can be tuned in the StepVerifier as well, by using the relevant parameter to create and withVirtualTime for the initial request, then chaining in thenRequest(long) in your expectations for further requests.

In this first example, create a StepVerifier that produces an initial unbounded demand and verifies 4 values to be received, before completion. This is equivalent to the way you've been using StepVerifier so far.

Request All

Next we will request values one by one: for that you need an initial request, but also a second single request after you've received and asserted the first element.

Without more request, the source will never complete unless you cancel it. This can be done instead of the terminal expectations by using .thenCancel().

Request One By One

### A note on debugging

How to check that the previous sequence was requested one by one, and that a cancellation happened?

It's important to be able to debug reactive APIs, so in the next example we will make use of the log operator to know exactly what happens in term of signals and events.

Use the repository to get a Flux of all users, then apply a log to it. Observe in the console below how the underlying test requests it, and the other events like subscribe, onNext...

Request Log

If you want to perform custom actions without really modifying the elements in the sequence, you can use the "side effect" methods that start with doOn.

For example, if you want to print "Starting:" upon subscription, use doOnSubscribe.

Each doOn method takes a relevant callback representing the custom action for the corresponding event.

Note that you should not block or invoke operations with latency in these callbacks (which is also true of other operator callbacks like map): it's more for quick operations.

Custom Operations

Go ahead an modify the first two methods in this exercise in order to get some insight into their sequences using log and doOnXXX.

This playground was created on Tech.io, our hands-on, knowledge-sharing platform for developers.
package io.pivotal.literx;
//generic imports to help with simpler IDEs (ie tech.io)
import java.util.*;
import java.util.function.*;
import java.time.*;
import io.pivotal.literx.domain.User;
import io.pivotal.literx.repository.ReactiveRepository;
import io.pivotal.literx.repository.ReactiveUserRepository;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
/**
* Learn how to control the demand.
*
* @author Sebastien Deleuze
*/
public class Part06Request {
ReactiveRepository<User> repository = new ReactiveUserRepository();
//========================================================================================
// TODO Create a StepVerifier that initially requests all values and expect 4 values to be received
StepVerifier requestAllExpectFour(Flux<User> flux) {
return null;
}
//========================================================================================
// TODO Create a StepVerifier that initially requests 1 value and expects User.SKYLER then requests another value and expects User.JESSE.
StepVerifier requestOneExpectSkylerThenRequestOneExpectJesse(Flux<User> flux) {
return null;
}
//========================================================================================
// TODO Return a Flux with all users stored in the repository that prints automatically logs for all Reactive Streams signals
Flux<User> fluxWithLog() {
return null;
}
//========================================================================================
// TODO Return a Flux with all users stored in the repository that prints "Starring:" on subscribe, "firstname lastname" for all values and "The end!" on complete
Flux<User> fluxWithDoOnPrintln() {
return null;
}
}
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
Join the CodinGame community on Discord to chat about puzzle contributions, challenges, streams, blog articles - all that good stuff!
Online Participants