Skip to content

Knowing your RxJava Operator Toolbelt 5: Utility Operators

Check out the rest of the series:
Knowing your RxJava Operator Toolbelt 1: Creating Observables
Knowing your RxJava Operator Toolbelt 2: Transforming Observables
Knowing your RxJava Operator Toolbelt 3: Combining Observables
Knowing your RxJava Operator Toolbelt 4: Filtering Observables

RxJava is Java (and Kotlin’s) form of Reactive Programming, and thus Android’s too. It bestows the ultimate power in async operations through the use of observables. If you’re new, check out my Hardly Comprehensive Introduction.

RxJava is a powerful tool that only gets better with how well you know it. It’s got a high skill cap, and how close you get to it is determined by how well you know your operators.

In this 5th and final installment in the series,  we’ll talk about Utility operators, basically the operators that are just good at helping out with a few stuff but can’t be placed in any category. These are often good in just about any use case and can be used to just further optimise your observable flows.

We’re mixing in some of the useful boolean and mathematical operators here as well, since I decided many of them are just common sense to any seasoned programmer. They don’t warrant their own sections.

Delay

Observable.just(1, 2, 3)
        .delay(5, TimeUnit.SECONDS)
        .subscribe()

This only delays the observable before it starts emitting. It doesn’t delay the timing between emissions, only the time before the observable starts emitting. For example, this observable is subscribed to, 5 seconds pass, then 1, 2, and 3 are emitted immediately after the other.

ObserveOn

Observable.just(1, 2, 3) // UI THREAD
        .map { i -> i + 1 } // UI THREAD
        .observeOn(Schedulers.io()) // BACKGROUND THREAD
        .map { i -> i * 2 } // BACKGROUND THREAD
        .subscribe() // BACKGROUND THREAD

One of the most commonly used operators, and for good reasons. This allows you to specify the thread for which your other operators run from.

I could write a whole article on threading, but as a starter for example, you can choose to observe on the background thread (Schedulers.io) if you have an observable with beefy map functions and you don’t want to freeze the UI which runs on the main thread.

SubscribeOn

Observable.just(1, 2, 3) // BACKGROUND THREAD
        .map { i -> i + 1 } // BACKGROUND THREAD
        .subscribeOn(Schedulers.io()) // BACKGROUND THREAD
        .map { i -> i * 2 } // BACKGROUND THREAD
        .subscribe() // BACKGROUND THREAD

This allows you to specify the thread for which your other operators run from… as well!

Okay the difference between this and ObserveOn is the operators they affect. ObserveOn will only affect the operators that come after it, while SubscribeOn will affect the whole observable.

DoOperators

 

 

 

 

They’re kinda just there when you need them. If you wanna do something on, well, pretty much any stage of the observable sequence.

DefaultIfEmpty

Observable.empty<Int>()
        .defaultIfEmpty(5)
        .subscribe()

As the name says, if the source observable is empty, this’ll make it emit a default value.

Timeout

listService.getListByID()
        .timeout(5, TimeUnit.SECONDS)
        .subscribe { list, throwable ->
            if (throwable != null) { throwable.printStackTrace() }
        }

If a period of time passes where no emissions have been released, issue an error. Don’t forget to handle the error.

TimeInterval

// This always emits 2 lmao
Observable.interval(2, TimeUnit.SECONDS)
        .timeInterval(TimeUnit.SECONDS)
        .subscribe()

This intercepts the observable to instead emit the time taken in between emissions instead of the emissions themselves.

TimeStamp

Observable.interval(2, TimeUnit.SECONDS)
        .timestamp(TimeUnit.SECONDS)
        .subscribe {
            Log.d(LOG_TAG, "Value: ${it.value()}, Time: ${it.time()}")
        }

This changes the data type of the emissions to Timed<DataType> in order to attach timestamps to them. These timestamps take the form of the current time in the system and not something like the number of seconds from the start of the observable, aww hell no.

Materialize/Dematerialize

Observable.just(1, 2, 3, 4, 5)
        .materialize()
        .subscribe { Log.d(LOG_TAG, "Notification completed?: ${it.isOnComplete}, Value: ${it.value}") }

Materializing an observable changes the data type of its emissions into Notification<DateType>. The Notification objects allows you to check whether the emission method is OnComplete, OnNext, or OnError. Of course, you can still get the emitted value.

Dematerialize just reverses the effect for if you ever have an observable that’s already been materialized.

Boolean Operators

There’s quite a few here that can come in handy, but most of their names speak of themselves. These include containssequenceEqual, skipWhile/Until, and takeWhile/Until. The noteworthy ones here are:

All

Observable.just(1, 3, 6, 8, 10)
        .all { it > 5 } // In this case, 1 and 3 will be discarded
        .subscribe()

This discards any values that don’t meet the specified conditions.

Amb

val slowerObservable = Observable.just(1, 3, 5).delay(5, TimeUnit.SECONDS)
val fasterObservable = Observable.just(2, 4, 6)

Observable.ambArray(slowerObservable, fasterObservable)
        .subscribe() // Only the faster observable gets used

This waits for one of the observables to start emitting, then discards the other one. In other words, only the faster observable gets to emit.

Mathematical Operators

These do require an extra dependency:

implementation 'com.github.akarnokd:rxjava2-extensions:0.20.0'

But just like the Boolean Operators, these ones mostly speak for themselves with their names. These include average, sumcountmin and max. The single noteworthy operator here is:

Reduce

Observable.just(1, 2, 3, 4, 5)
        .reduce { t1: Int, t2: Int -> t1 * t2 } // Performs 1 * 2 * 3 * 4 * 5
        .subscribe() // Results in 120

This reduces all the emissions into a single value by sequentially applying a function to them. The result of the function applied on the first two emissions will be fed back with the 3rd emission, and so on until the emissions have been completely reduced.

Tags: