Enter Project Reactor
Project Reactor gives us two fundamental reactive publishers we can use to model our producer/consumer systems: Mono and Flux.
Straight from their docs, we see that:
Mono is an object [that] represents a single-value-or-empty (0…1) result, while Flux is an object [that] represents a reactive sequence of 0…N items.
These publishers enable the composition of many reactive programming patterns, such as the Reactive Data Pipeline and the Pub/Sub pattern; the latter can cross machine boundaries via a messaging queue system (like Apache Kafka), adding resiliency and fault tolerance to the overall application.
Taking a deeper look at these publishers, we can see that they provide a lot of methods to manipulate our data and compose our own operations.
We will examine the most common ones for the Mono and the Flux publisher below, to get a basic understanding of what they do, how they can be used to achieve our results, and what the most common patterns for combining them are.
One key factor to keep in mind is that these publishers are generic and typed, so we can rely on the compiler to verify that we won’t violate the types of the data we are going to operate on.
In practice what this means is that, like with the List collection, we need to provide a type when declaring a variable of type Mono/Flux, like:
Mono<String> or Flux<List<Double>>.
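As a quick illustration, a minimal sketch of declaring typed publishers might look like the following (assuming reactor-core on the classpath; the values are invented for the example):

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class TypedPublishers {
    public static void main(String[] args) {
        // A Mono emitting at most one String
        Mono<String> greeting = Mono.just("hello");

        // A Flux emitting a sequence of List<Double> elements
        Flux<List<Double>> series = Flux.just(List.of(1.0, 2.0), List.of(3.0));

        // The compiler rejects type violations, e.g.:
        // Mono<String> wrong = Mono.just(42); // does not compile

        greeting.subscribe(System.out::println);
        series.subscribe(System.out::println);
    }
}
```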
In the following paragraphs, we are going to have a brief look at how our reactive collections work to understand how we can leverage them to create our own pipelines of operations.
Remember that Reactive Programming is about composition of lower-level functions into higher-level pipelines that implement our business logic, so we need the tools to create our lower-level functions and to combine them.
Mono and Flux
Mono and Flux are the fundamental reactive collections that Project Reactor offers to the developer so they can be used as building blocks for any kind of pipeline that may be needed.
The way they work is quite similar, so we are going to discuss only one in depth and go into detail about the other implementation where they differ significantly.
Specifically, we’re going to have a look at the Mono<T> reactive collection to understand how it works, how it emits elements, how we can create one, subscribe to one and understand the basic operations that can be performed on it.
Luckily for us, in the official documentation (link), there’s a very instructive drawing of how a Mono operates, so we will have a look at that right now, before diving into its operators and functionality:
Even with the accompanying text, there’s still a lot to unpack in this image, so let’s go over it right now.
The solid arrow line at the top of the image, flowing from left to right, represents time. As we said before, every reactive collection is drained as we consume the events it emits, and this is why.
Once an event has been emitted at a given time t, that element is lost, because it is not saved by the Mono/Flux, and since time cannot go backwards, there is no way to replay that exact element so that it is emitted again.
What can happen, in the case of a Flux, is that a copy of that same element is emitted, but it’s important to note that this is indeed a copy, a duplicate, of the original element.
Once an element has been emitted, it cannot be replayed except by duplicating it.
The top green circle contains an element, in this case an Integer, that is emitted after some time has elapsed; specifically, it’s emitted at time t0.
What happens is that at time t0 this value “1” is being returned from the Mono and all its subscribers will receive this value, so they can operate on it.
Also, from the value of the event that is being emitted, we can reasonably guess the type of the Mono to be something like Mono<Integer>.
The solid vertical line indicates a completion signal, which is a key concept for any reactive collection.
Basically any collection can emit either an event/value or a signal.
Events are the data we are interested in, like the “1” in this example.
Signals are special events that carry a specific meaning related to the collection, such as “I am finished emitting events” and “I encountered an error while emitting events”.
These signals are important and thus have names, respectively onComplete and onError, and their names are self-explanatory: the Mono/Flux will emit an onComplete signal when it’s finished and will not emit any more events, and it will emit an onError signal when, for any reason, it wants to signal that there has been an error during the emission of one or more events.
So, the vertical line is our onComplete signal in this drawing.
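These signals can be observed directly: Reactor’s subscribe method accepts separate callbacks for values, for the onError signal and for the onComplete signal. A small sketch (assuming reactor-core on the classpath; the log messages are invented):

```java
import reactor.core.publisher.Mono;

public class Signals {
    public static void main(String[] args) {
        // A Mono that emits one value, then completes
        Mono.just(1).subscribe(
                value -> System.out.println("event: " + value),   // the data we care about
                error -> System.out.println("onError: " + error), // error signal
                () -> System.out.println("onComplete"));          // completion signal

        // A failing Mono triggers the error callback instead
        Mono.error(new IllegalStateException("boom")).subscribe(
                value -> System.out.println("event: " + value),
                error -> System.out.println("onError: " + error.getMessage()));
    }
}
```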
The white bounded box in the middle of the drawing represents a generic operator for the Mono, which is nothing more than one of the predefined methods we can use to operate on the data contained in the Mono.
For example, this generic box labeled “operator” could be a flatMap operator, or a filter operator, or a map operator, and so on.
Basically, anything that’s exposed as a public method on the Mono class is a valid substitute for this white box.
The idea is that this operator represents some kind of transformation/operation that we perform on the data contained inside the reactive collection itself.
This is important because there’s no point in creating collections of data if we cannot process them and operate on the values contained inside them.
The bottom arrow line is the new Mono that is returned as result of the operation performed on the “source” Mono, represented by the top arrow line.
Always keep in mind the pipeline core idea: We want to be able to chain our low-level operations (white boxes) to achieve high-level results.
This means that, in order to be able to chain our operations, they need to return a reactive collection, sort of like a fluent interface, with the difference that instead of returning this, the operators return a newly formed reactive collection, where some logic has been applied to its starting elements.
Thinking in these terms means that every operation we perform on a reactive collection does not break the promise of reactivity, i.e. it still returns a reactive collection, so we can keep adding operators to this pipeline.
This is a key point to understand: if we were to break the “reactive promise” by using an operator that returns not a reactive collection but a plain old POJO (for example a List, a Map, an Integer), we would lose the ability to combine our low-level functions into a reactive pipeline, breaking the fundamental principle of reactive programming, and this is not something we want to do.
The key takeaway here is that we always use operators that fulfill the reactive promise and always return a reactive collection.
The bottom green circle is the value (contained in the resulting Mono) that is returned from the operator (the white box) when applied to the emitted value from the source Mono, the top arrow line.
In this illustration, it’s a pure coincidence that the result value is the same as the input value; in general, it will be different.
Note how the second, resulting, Mono contains a red cross.
That indicates an onError signal, which is used to notify all the subscribers of this Mono that something went wrong and they might need to handle an error, because no value will be emitted.
Subscribers of this resulting Mono that emitted an onError signal can decide to:
- Ignore the error signal and just do nothing
- Handle the error signal by either returning a default value or trying again to execute the failing operation
- Throw an error themselves, signaling that they cannot operate on that Mono because their source just threw an error; they re-throw this error to their own subscribers until it “bubbles up” to the caller, which can either handle it or let it propagate to the ultimate caller, the JVM
This freedom of choice when it comes to error handling triggered by an onError signal is born out of the principle that errors are a special kind of data, not an exceptional occurrence that needs a separate mechanism to be handled.
In practice, it means that it’s possible to build a pipeline where errors flow from one Mono to another until a subscriber decides to handle them and, for example, return a default value or perform some additional logic based on the type of error. The pipeline downstream of this particular subscriber does not need to concern itself with what happened before it, because the errors have been handled and transformed into data that can be processed.
There’s always the case where an error cannot be handled and it does not make sense to return a default value or perform additional logic; then the onError signal simply propagates through the chain of Monos and subscribers until it is either handled or it reaches the JVM, at which point it is logged and the pipeline stops at the point from which the error propagated.
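These choices map onto concrete Reactor operators. A brief sketch (assuming reactor-core on the classpath; onErrorReturn supplies a default value, onErrorResume switches to a fallback pipeline, and not handling the error lets it propagate to the caller):

```java
import reactor.core.publisher.Mono;

public class ErrorHandling {
    // A Mono that only emits an onError signal
    static Mono<Integer> failing() {
        return Mono.error(new IllegalStateException("boom"));
    }

    public static void main(String[] args) {
        // Handle by returning a default value
        Integer fallback = failing().onErrorReturn(-1).block();
        System.out.println(fallback);

        // Handle by switching to another pipeline based on the error
        Integer resumed = failing()
                .onErrorResume(e -> Mono.just(0))
                .block();
        System.out.println(resumed);

        // Not handling it: the error propagates to the caller of block()
        try {
            failing().block();
        } catch (RuntimeException e) {
            System.out.println("propagated: " + e.getMessage());
        }
    }
}
```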
Now that we have the diagram explanation out of the way, let’s have a look at a practical example of Mono in a typical web application context.
Keep in mind that these examples are not production ready because we didn’t go through some important concepts yet, so while we will keep them real (they will work if applied) they won’t be the most optimal display of the reactive patterns we discussed.
Don’t be distracted by unknown syntax/operators you will see, the focus of the examples is the composition of lower-level functions to obtain higher-level abstractions through the use of standard Mono operators.
Example: Check if a user has the admin permission to perform an action on a dashboard
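Since the walkthrough below refers to the code step by step, here is a minimal, type-correct sketch of such a pipeline (assuming reactor-core on the classpath; UserEntity, UserModel and the helpers getCurrentLoggedUser, convertEntityToModel and isUserAdmin are stubs invented for illustration, and the operator ordering shown is one arrangement that makes the types line up):

```java
import reactor.core.publisher.Mono;

public class AdminCheckExample {
    // Illustrative stand-ins for the entity/model types discussed in the text
    record UserEntity(String username, boolean admin) {}
    record UserModel(String username, boolean admin) {}

    static UserModel convertEntityToModel(UserEntity entity) {
        return new UserModel(entity.username(), entity.admin());
    }

    static boolean isUserAdmin(UserModel user) {
        return user.admin();
    }

    // Stand-in for whatever service provides the current user reactively
    static Mono<UserEntity> getCurrentLoggedUser() {
        return Mono.just(new UserEntity("alice", true));
    }

    static Mono<Boolean> canPerformAdminAction() {
        Mono<UserEntity> currentUser = getCurrentLoggedUser();
        return currentUser
                .map(AdminCheckExample::convertEntityToModel)       // UserEntity -> UserModel
                .filter(AdminCheckExample::isUserAdmin)             // empty Mono if not admin
                .map(user -> true)                                  // admin case: ignore the value, emit true
                .switchIfEmpty(Mono.defer(() -> Mono.just(false))); // empty case: default to false
    }

    public static void main(String[] args) {
        System.out.println(canPerformAdminAction().block());
    }
}
```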
Let’s examine the example to see what our operators are, how they relate to the diagram above, and what pipeline we constructed.
The code shown above (albeit a bit contrived by design) is a real-world example of how the problem of determining if a given user is admin can be tackled in a reactive application context.
This is a typical use case and it allows us to briefly go over some of the fundamental operators for a Mono, and also presents us a good opportunity to discuss how we composed our pipeline starting from lower-level functions.
The first line shows that we declare and instantiate a Mono collection that contains 0 or 1 objects of type UserEntity, which is a normal POJO that contains some DB specific annotations, maybe @Id or @Timestamp.
What makes this UserEntity POJO reactive is that we wrap it in a Mono, and that will take care of everything needed so we can create our own reactive pipeline.
As is typical in real world code, we don’t have a static value to use to initialize our own Mono like we did in the early examples, but we receive this value from a function, and that’s all we need to care about.
How the function getCurrentLoggedUser obtained this Mono is irrelevant to us; we only care that we receive it.
So, once we have it, the next step is combining some lower-level functions into our own pipeline.
In order to do that, we need to have an idea on how our pipeline should operate, and in this example the workflow is this:
- Given a UserEntity
- Convert it into a UserModel that contains the business logic representation of our User (and we always operate on this representation that is abstracted from the underlying entity and thus from the DB that’s used to store said entity)
- Given a UserModel
- Apply the function “isUserAdmin”, which acts as a Predicate over a UserModel (a Predicate, introduced in Java 8, is a function that takes an object and returns a boolean indicating whether it meets one or more custom conditions)
- Based on the return value of the “isUserAdmin” either
- Return Mono<Boolean> containing the value true to signal that the user can perform this action in the positive case of the isUserAdmin function
- Return Mono<Boolean> containing the value false in case the isUserAdmin function returns false
It is important to remember that we need to return a Mono<Boolean> and not a simple Boolean POJO otherwise we would break the reactive promise that we made to our caller, which might want to use our return value to perform some additional logic, such as logging an unauthorized attempt in the case we return a Mono<Boolean> containing a false value.
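As a side note, a Predicate in plain Java looks like this (the UserModel-style record and the values are invented for illustration):

```java
import java.util.function.Predicate;

public class PredicateExample {
    // Hypothetical business-logic representation of a user
    record UserModel(String name, boolean admin) {}

    public static void main(String[] args) {
        // A Predicate<UserModel> answers true/false for a given UserModel
        Predicate<UserModel> isUserAdmin = UserModel::admin;

        System.out.println(isUserAdmin.test(new UserModel("alice", true)));
        System.out.println(isUserAdmin.test(new UserModel("bob", false)));
    }
}
```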
Given the logical, high-level pipeline we defined above, let’s analyze how it has been implemented.
The first thing to notice is the structure of the code: it is written (and later executed) as a cascading series of steps, each one with a specific name that takes a number of parameters.
Each step follows the other until we are done with our pipeline and simply assign the result of the last step to a variable of the appropriate Mono type.
We then return this value to our caller, assuming the case we are inside a function with return type Mono<Boolean>.
Looking at each step, we can see that:
- We use a “source” Mono as the starting point of our pipeline (this would be the top arrow in the diagram). This source Mono is called currentUser.
- The first step of the pipeline is an operator called map. To relate this to the diagram, this is the generic white box, only this time it is not generic, because it has a name, and that name is map.
What does this operator do? It simply applies a function (that we, the developers, pass as its argument) to the events emitted from the Mono, and returns the resulting Mono, transformed by the function we passed.
In practice, if we were working in the “classical OOP style” of programming, “map” would be like a simple “for loop” iterating over all the elements in a List (which, in the case of the Mono, contains just one datum), applying a function to each element, and then appending the result of this function to a new List.
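To make the analogy concrete, here is that “classical” version sketched with plain Java collections (the names and values are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ClassicalMap {
    // The "classical OOP" equivalent of map: loop, apply, collect into a new List
    static <T, R> List<R> map(List<T> source, Function<T, R> fn) {
        List<R> result = new ArrayList<>();
        for (T element : source) {
            result.add(fn.apply(element));
        }
        return result;
    }

    public static void main(String[] args) {
        // For a Mono, the list would contain a single datum
        List<Integer> lengths = map(List.of("admin"), String::length);
        System.out.println(lengths);
    }
}
```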
What we pass to the map operator is called a method reference (link), a very convenient way of avoiding code duplication by simply passing a method of a given class as if it were a variable, so it can be used inside any operator.
Even if we don’t know exactly how the “convertEntityToModel” method works, we can make an educated guess that it takes a parameter of type UserEntity as input and returns a UserModel object.
Given how common the use case is for a method that takes one input and returns one output of a different type, Java 8 introduced a functional interface called Function to codify this behavior.
Checking its definition (link), we can clearly see that it fits our description, as it takes an input of type T and returns an output of type R, exactly as our methods defined above.
In fact, checking the exact signature for the map operator we can see that it is:
public final <R> Mono<R> map(Function<? super T,? extends R> mapper)
Where “mapper” is the name of the function that takes an input of type T, the same generic type as the Mono on which map is defined, and returns an output of type R; the map operator then wraps this output in the resulting Mono of type R.
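A quick stdlib illustration of both ideas, Function<T,R> and method references (the parse method here is a hypothetical stand-in for something like convertEntityToModel):

```java
import java.util.function.Function;

public class FunctionExample {
    // A hypothetical one-input, one-output method, analogous to convertEntityToModel
    static Integer parse(String s) {
        return Integer.valueOf(s);
    }

    public static void main(String[] args) {
        // The method reference FunctionExample::parse fits Function<String, Integer>:
        // input of type T (String), output of type R (Integer)
        Function<String, Integer> mapper = FunctionExample::parse;
        System.out.println(mapper.apply("42"));
    }
}
```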
- Now that we have a UserModel object, we can apply a function (Predicate link) called “isUserAdmin” that will return true if the given user is an admin or false if it’s not.
This boolean value returned from the “isUserAdmin” function is used inside the next operator that is called filter which has a very specific behaviour.
So, we know that if the function “isUserAdmin” returns true, the operators we define downstream of this point in the pipeline will receive the value of this Mono, in this case our Mono<UserModel>. If “isUserAdmin” returns false, the filter operator will not emit any value and will simply complete, which is the equivalent of passing down an empty Mono: as we know, after a Mono completes there’s nothing else it can do; it didn’t emit the only event that it could, so this is an empty Mono.
Knowing this, we need to shape our pipeline in a way that accounts for the difference between the case where we receive the Mono<UserModel> and the case where we receive an empty Mono.
Keep this distinction between the two Monos we might receive in mind, as it will come in handy in just a moment.
- Now we are downstream of the filter operator which, as we extensively discussed before, can return either an empty Mono or a Mono containing a UserModel POJO.
We need to handle the case where the Mono is empty, so we “return early” and don’t need to carry this check down the pipeline any longer, as we can handle it right now.
For the time being, ignore the “Mono.defer” invocation on the basis that it’s a standard construct with the switchIfEmpty operator, and just focus on what’s inside there: a lambda that takes no parameters, hence the empty parentheses, and just returns false.
Why is this? Because we know from our own analysis that if we receive an empty Mono, it means that the method “isUserAdmin” returned false, and thus the filter operator transformed this false value into an empty Mono, which we caught with the appropriate operator, switchIfEmpty.
It’s curious to note that the word switch is very on-point, because we are literally switching our flow into another pipeline, diverting it from the “normal” pipeline that expects a valued Mono, i.e. a Mono that contains at least one element.
If we did not switch, the next step would never get executed and we would not know what to do when the filter operator returns an empty Mono; we would simply propagate it to the caller, which might not expect an empty Mono, and this might cause issues down the line.
- Given that we already handled the case where the filter operator returns an empty Mono, we are only missing the “positive” case, where we receive the Mono<UserModel> containing the POJO for the current user, because the “isUserAdmin” function returned true.
In this case, there is no logic to perform or anything to signal, so we simply ignore this parameter and just return true.
This is because the info that we need to ascertain if the user is permitted to perform the operation is implicit in the fact that the function “isUserAdmin” returned true, so we can simply “re-wrap” this value and return it.
- Now that we have built the pipeline (but not executed it), we can assign it to a variable of type Mono<Boolean> (as that’s the return value of the final step and thus of the whole pipeline) and return this variable to the caller; at that point we are not involved anymore, and what happens to this variable that represents our pipeline is not our responsibility.
It might be that this variable is subscribed to and thus the pipeline gets executed or it might be the case that this variable is itself used as an intermediate step into another pipeline.
In any case, none of that is our concern because we performed all the steps required in this function and it’s up to our caller to decide what to do with this result now.
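This “built but not executed” property can be observed directly: assembling a pipeline runs nothing, and the work happens only when someone subscribes. A small sketch (assuming reactor-core on the classpath; the side-effect flag is just a way to observe execution):

```java
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.core.publisher.Mono;

public class LazyPipeline {
    public static void main(String[] args) {
        AtomicBoolean executed = new AtomicBoolean(false);

        // Building the pipeline runs nothing yet
        Mono<Boolean> pipeline = Mono.just("user")
                .map(name -> {
                    executed.set(true); // side effect so we can see when the step runs
                    return true;
                });
        System.out.println("after assembly: " + executed.get());

        // Subscribing (here via block) actually executes it
        pipeline.block();
        System.out.println("after subscribe: " + executed.get());
    }
}
```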
This pipeline that we just implemented is made up of smaller, lower-level operators like map, filter, switchIfEmpty which are common for all the Mono/Flux reactive collections.
What this means is that it is the code that we pass to these operators and the way and order we combine them that makes our program achieve higher-level results starting from lower-level components.
Basically, we managed to define the structure of our pipeline (map something into something else, filter for a given property and act differently in case the filtering operation is successful or not) and customize it to provide our own “building blocks”.
We are not mapping into any random object: we are mapping from a UserEntity into a UserModel through our own specialized method “convertEntityToModel”. And we are not filtering on any random property: we are doing it on a specific boolean called “isUserAdmin” inside the UserModel.
As you can see, the building blocks are quite low level: simple mapping methods and functions that check whether a boolean is set or not. But the result is the answer to the question “Is this user allowed to perform a given action?”, which is significantly higher level than the components of the pipeline.
This is the spirit and goal of Reactive Programming.
Define low-level, easy-to-test components that can be combined to obtain high-level functionality that carries a reduced mental context to be understood and debugged.
Consider our own example, where we have a simple pipeline and we find out that the check fails when it should be successful.
Given how we structured the pipeline, there are only a limited number of places where the data might be wrong:
- During the retrieval of the current user
- In the “isUserAdmin” method
- In the branch that checks if the receiving Mono is empty
There is no other place where this data gets accessed/retrieved or mutated, and we know that the components of our pipeline are simple enough that we can inspect them very easily and check if they perform according to their specifications, maybe helped by our test suite to verify if some external resource changed, for example.
This is a significant advantage that will become clear in the next sections and articles, because as our pipelines become more complex and involved, both in the type of operators and their number, being able to rule out our own steps as the source of the error is a time-saver.
Being able to pinpoint where a certain variable is accessed and switched upon can make the difference between deciding to investigate the program itself for data corruption (for example) and going straight to the external resource, maybe a DB or a K/V store, to verify if the data there is correct.
Once a pipeline has been defined, its structure stays the same, and once the structure has been verified to be working, the only variable parts are:
- The source Mono/Flux on which this pipeline operates on
- The code that is executed by each operator at every step in the pipeline
Furthermore, if we did not change our code and we spot an issue that was not there before, and this issue is covered by a test case (because it belongs to one or more steps defined in the pipeline), we can also skip the check of the code inside each operator and go straight to checking whether the source Mono/Flux contains the correct values, because that’s the only place the error can originate from, in the shape of non-conformant data.
Bonus point: in the next article, we will rewrite and greatly simplify the example above to show how, in this case, the same result can be achieved with fewer operators.
This is a common occurrence when starting out: a pipeline that has been described logically is implemented by following that initial “direction” closely, which ensures that it works and is easily understandable; but as time goes on, new operators are discovered and new patterns of combining them emerge, and the same pipeline can be rewritten and simplified to use fewer operators, perhaps even slashing some of the complexity of the original code.
Oftentimes the same high-level task can be achieved in more than one way, each differing in how we structure our pipeline.