RxJava 2 Backpressure in practice

If someone ranked RxJava features from the least understood to the most obscure, Backpressure would probably sit near the top. Most of us know it exists, but only a few have actually worked with it. It doesn't come up often in the Android world, but if it isn't handled properly, it can freeze or even crash our application.

When I was exploring this topic, what I found was a good deal of theory, but a dramatic lack of real code examples that would help me visualize it better.

This article is meant to fill this gap by presenting different backpressure strategies with simple code examples.

Brief theory

First, let's briefly define what Backpressure really is and when it can happen.

In simple words:

Backpressure occurs when data is produced faster than it can be consumed.

To visualize it better, let’s look at the following analogy:

Imagine we are constantly filling a funnel with some liquid. If the liquid is poured in faster than it can escape, the cone will work as a buffer for a while. Eventually, once the cone is full, the liquid will overflow.

Depending on our attitude towards it, we can:

  1. increase the buffer by taking a funnel with a bigger cone part, if the liquid is too precious,
  2. let it pour, if we don’t mind the wasted liquid,
  3. tune the emitter to produce more slowly, if possible.

Real-life

Imagine we develop a very popular live streaming application in which people leave reactions in real-time (just like on Facebook). Our app needs to display them on the UI thread, so if there are thousands of them at the same time, Backpressure will quickly pop up.

Coding

In the next sections, we'll see how these three scenarios affect the app's behavior, using RxJava's Flowable class and its different backpressure strategies.

For the sake of simplicity, events are created by hand and values are only shown in a TextView and in a ProgressBar.

Increasing the buffer

This approach may seem reasonable. If we cannot consume the data as fast as it's produced, we may put it in some kind of cache for later use. Modern devices should have enough memory for that. To achieve this, we use BackpressureStrategy.BUFFER. With this strategy, the buffer expands every time it needs more memory. Its initial size is 128 items. You can access it with:

System.getProperty("rx2.buffer-size") // read the current value (defaults to "128")
System.setProperty("rx2.buffer-size", newValue) // override the initial buffer size

Remember to do it before you use any RxJava class. Let’s see what happens after executing this piece of code:

Flowable.create<Int>({ emitter ->
    repeat(10_000_000) { count ->
        emitter.onNext(count)
    }
    emitter.onComplete() // signal that the emission has finished
}, BackpressureStrategy.BUFFER)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { count ->
        text_view.text = count.toString()
        progress_bar.progress =
            ((count / 10_000_000f) * 100).toInt()
    }

The app periodically freezes. When it happens, we can see similar logs in the Logcat:

Background concurrent copying GC freed 3102720(61MB) AllocSpace objects, 0(0B) LOS objects, 2% free, 226MB/232MB, paused 17us total 456.470ms

It means that the system is trying to allocate more memory for our app while the Garbage Collector is freeing previously allocated memory at the same time. These operations are so demanding that the system can no longer render the UI properly. Once enough memory is available to cache more emissions, the app unfreezes until the next allocation is required.

It’s like removing the funnel that is too small and replacing it with a bigger one - it takes some time and stops the entire system.

This behavior is a side effect of the buffer's ability to grow. If there are too many events or the device's memory is very low, we may eventually encounter an OutOfMemoryError.
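The strategy's effect can be sketched in plain Kotlin, without RxJava. In this hypothetical model, an unbounded queue stands in for the growing buffer: a fast producer is never refused, and whatever a slow consumer doesn't drain stays in memory as a growing backlog.

```kotlin
import java.util.concurrent.LinkedBlockingQueue

// Simplified model of the BUFFER strategy: an unbounded queue caches
// every value the slow consumer has not processed yet.
fun simulateBuffer(emissions: Int, consumed: Int): Pair<Int, Int> {
    val buffer = LinkedBlockingQueue<Int>() // grows without limit
    repeat(emissions) { buffer.offer(it) }  // fast producer: nothing is refused
    var last = -1
    repeat(consumed) { last = buffer.poll() } // slow consumer drains only part of it
    return buffer.size to last // backlog still held in memory, last value seen
}
```

With 10 000 emissions and only 100 consumed, 9 900 items remain buffered; scaled up to millions of events, that backlog is exactly what triggers the GC churn shown above.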

Letting it pour

Next, we’ll look into two other strategies: DROP and LATEST. They are quite similar, but a few things are different.

Dropping values

If the consumer is busy processing the data, this strategy simply drops every value that couldn't be processed in the meantime. Let's execute the previous code, replacing BackpressureStrategy.BUFFER with BackpressureStrategy.DROP.

At first glance it looks fine. The emission seems quite smooth. Unfortunately, we also see that the TextView doesn't show the last values and the ProgressBar isn't fully filled.

Processing the data takes much longer than the interval between emissions, as it includes both switching from an I/O thread to the main thread and updating the widgets. As the strategy dictates, values emitted during this time are lost, which is why the screen isn't updated at the end of the emission. The last values are emitted while the previous data is still being processed, so the strategy drops them. When the consumer is finally ready to receive more data, there is nothing left - the emission has already finished.

To illustrate this situation better, I created an additional counter, which was incremented on every widget update in the subscribe() method. After the emission was over, it showed a value of 391 887 (it may vary between runs). It means that the widgets weren't updated for over 9.6 million items!
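DROP's behavior can be modeled with the same plain-Kotlin approach, this time with a bounded queue: while the queue is full, offer() returns false and the value is discarded forever. This is a deliberate simplification (the real operator drops values when the downstream has no outstanding requests, not when a queue is full), but it reproduces the key symptom: the values that survive are the earliest ones, and the last values never arrive.

```kotlin
import java.util.concurrent.ArrayBlockingQueue

// Simplified model of the DROP strategy: while the consumer's bounded
// queue is full, offer() fails and the value is lost forever.
fun simulateDrop(emissions: Int, capacity: Int): Pair<Int, Int> {
    val queue = ArrayBlockingQueue<Int>(capacity)
    var dropped = 0
    repeat(emissions) { if (!queue.offer(it)) dropped++ } // overflow is discarded
    val delivered = mutableListOf<Int>()
    while (queue.isNotEmpty()) delivered.add(queue.poll()) // consumer catches up too late
    return delivered.size to dropped
}
```

For 1 000 emissions into a queue of 128, only the first 128 values (0..127) are ever delivered and the remaining 872 are gone, mirroring how the TextView never received the final values.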

Keeping the most recent value

LATEST is a twin brother of DROP. It also drops unconsumed values, but it always caches the last one. It can be thought of as BUFFER with a constant size of one item. Thanks to that, whenever the consumer asks for a new value, it receives the most recent one.

The first notable difference here is that the TextView and ProgressBar receive the last value. The second one is the value of the additional counter I introduced in the DROP subsection. Here it showed 1 319 707 widget updates (again, may vary across runs). Almost a million more than DROP. That’s because the caching mechanism naturally slows the emission.
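The "buffer of one" idea can be sketched as a single-slot cache: every new emission overwrites the slot, so whenever the consumer finally reads it, the value it gets is the latest one. This is a hypothetical model, not RxJava's actual implementation, but it captures why LATEST always delivers the final value.

```kotlin
import java.util.concurrent.atomic.AtomicReference

// Simplified model of the LATEST strategy: a single-slot cache that
// each new emission overwrites; the consumer always reads the most
// recent value instead of missing it entirely.
fun simulateLatest(emissions: Int): Int? {
    val slot = AtomicReference<Int?>(null)
    repeat(emissions) { slot.set(it) } // every value overwrites the previous one
    return slot.getAndSet(null)        // the consumer finally reads: gets the last value
}
```

After 10 000 emissions the slot holds 9 999, whereas the DROP model above would have discarded it, which is exactly why the TextView and ProgressBar end on the final value here.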

Tuning the emitter

There’s yet another approach, one that cannot simply be selected via the BackpressureStrategy enum but is built into some operators, like Flowable.range, Flowable.fromIterable and Flowable.fromArray. The documentation says that these operators signal values on demand (i.e., when requested). It means that the emission is paused until the consumer is ready to receive new values, so there’s no need for caching or dropping. Let’s see what happens after executing this code:

Flowable.range(0, 10_000_000)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { count ->
        text_view.text = count.toString()
        progress_bar.progress =
            ((count / 10_000_000f) * 100).toInt()
    }

The execution is much slower than before because the emitter is perfectly synced with the consumer. The additional counter we used before seems to confirm that: it shows exactly 10 000 000 updates, so not even a single value was lost.
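The pull-based idea can be illustrated with an iterator, a rough stand-in for what Flowable.range does internally: the producer computes the next value only when the consumer asks for it, so nothing accumulates and nothing is dropped.

```kotlin
// Simplified model of on-demand emission: the producer is a lazy
// iterator that yields the next value only when the consumer requests
// it, so no caching or dropping is needed.
fun consumeOnDemand(count: Int): Pair<Int, Int> {
    val producer = (0 until count).iterator() // lazy source, like Flowable.range
    var updates = 0
    var last = -1
    while (producer.hasNext()) { // each hasNext/next pair is one "request"
        last = producer.next()
        updates++
    }
    return updates to last // every value processed, nothing lost
}
```

Consuming 10 000 values this way yields exactly 10 000 updates ending at 9 999, the same lossless behavior the counter confirmed for Flowable.range.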

Conclusion

We’ve seen how each strategy handles the immense data emission and what the user experience is:

  1. BUFFER doesn’t cope well with such a number of events. Moreover, we were emitting only plain integers; if we used more memory-consuming objects instead, its performance would probably be even worse.
  2. DROP and LATEST make the emission smoother but can waste a lot of data. They are both useful when users don’t have to see every single update (most smartphones refresh the screen at 60 Hz, so it isn’t possible to display more than 60 values per second anyway). LATEST is usually the safer choice, as it always delivers the final result.
  3. By using the on-demand emission built into some operators, we can generate a flow in which no items are lost. This would be helpful, for example, if we gathered analytics and required the data to be reliable. Naturally, not every data source has this mechanism implemented. Fortunately, we can write a custom operator implementing the so-called reactive pull to achieve similar behavior. However, that is a story for a different article.