Combining the emissions of multiple Observables together using RxJava Zip operator in a Spring Boot Micro service

Introduction

Well, today we are going to discuss an advanced RxJava operator called Zip. Let’s first take a look at the zip operator at a glance and then move on to a real world example that leverages the capabilities of the operator. In fact, zip is a very powerful and useful operator provided in the RxJava library.

Prerequisites: Good knowledge of Java8, Functional Programming, RxJava, Spring Boot, Microservices.
User Level: Advanced

Since this is an advanced article, I am not going to walk you through the basics of RxJava or Spring Boot microservices. If you need to get some basic understanding of RxJava, I thoroughly recommend you to go through one of my previous blog posts [1] [2] or any other good introductory article about RxJava. If you need to learn more about Spring Boot microservices please go through some of the spring tutorials which are really comprehensive and impressive.

RxJava Zip Operator

Combines the emissions of multiple Observables together via a specified function and emits a single item for each combination based on the results of this function.


The Zip method returns an Observable that applies a function of your choosing to the combination of items emitted, in sequence, by two (or more) other Observables, with the results of this function becoming the items emitted by the returned Observable. It applies this function in strict sequence, so the first item emitted by the new Observable will be the result of the function applied to the first item emitted by Observable #1 and the first item emitted by Observable #2; the second item emitted by the new zip-Observable will be the result of the function applied to the second item emitted by Observable #1 and the second item emitted by Observable #2; and so forth. It will only emit as many items as the number of items emitted by the source Observable that emits the fewest items. [3]

Concrete example

Now let's move on to the scenario that we are going to consider today. Check out the illustration below which depicts the big picture of the system under consideration.


   
                                                              Figure 1: The Big Picture

Consider an eCommerce system such as Amazon where people are purchasing things by placing orders online and getting them delivered at their door steps.

Suppose we have a simple REST API which returns us product information associated with our order. There is another REST API which computes shipping cost and returns all the necessary shipping information pertaining to this order. Our micro service is going to call both of these downstream services asynchronously and merge those information before returning Order details to the end user. The product API returns the price of the item while the shipping API returns the shipping cost associated with it. Finally we sum them up to calculate the total price of the item and return it along with the other information about this order back to the user.

The response of the Product API is given below.
The response of the Shipping API looks something like this.
Upon the submission of a get request to the Order API which we are planning to expose to the clients, you may get the following response.
GET: http://localhost:8080/api/order
All the necessary steps to run the sample microservice application is given in the readme file distributed with the project itself [4], hence I am not gonna discuss them again here. Let’s now walk through the code and dive into the implementation details.
Here you may see the use of RxJava Zip operator. Notice that we are calling two REST endpoints asynchronously, first the product service to get the product details associated with this order and then the Shipping Service to get all the shipping data associated with this order. I have used mock endpoints for the demonstration purpose. Finally we want to combine these two emissions.  Here we are preparing the call to getProductDetails method. Then we are preparing the call to getShippingInformation method. Finally  we zip these two information as the information arrives in each Observable stream. We are combining the sources of data as pairs and create a wrapper object called Order details here. Notice that the constructor reference passed in as an argument to the Zip operator does the magic.

Finally let’s come back to the OrderResource class where we hook them up to each other using a subscription. I have given the code below. Here we merely create a subscriber to consume data. When the data is available in the stream, the Observable will notify the subscribers so that they can consume the data.

Exercise

Carefully check out the places in your code and other use cases which can be substituted with the RxJava Zip operator. When you refactor your code using RxJava, substitute them with Zip operator and see how much complexity it can reduce in your code. Another point to consider is that the performance you gain by calling all these IO bound tasks asynchronously which is well worth the payoff.

Conclusion

Well, we have covered a lot of ground in RxJava transformations and operator landscape today. I introduced an advanced RxJava operator called Zip which will be really helpful to you. I have explained it with a reference implementation and a sample use case while sharing the code with you [4].

References

[1] http://ravindraranwala.blogspot.com/2016/12/introducing-java-reactive-extentions-in.html
[2] http://ravindraranwala.blogspot.com/2017/01/calling-exterenal-nosql-database.html
[3] http://reactivex.io/documentation/operators/zip.html
[4] https://github.com/ravindraranwala/SpringBootRxJava

Comments

Popular posts from this blog

Introducing Java Reactive Extentions in to a SpringBoot Micro Service

Optimal binary search trees

Edit distance