Check out the rest of the series:
Knowing your RxJava Operator Toolbelt 1: Creating Observables
Knowing your RxJava Operator Toolbelt 2: Transforming Observables
Knowing your RxJava Operator Toolbelt 3: Combining Observables
Knowing your RxJava Operator Toolbelt 4: Filtering Observables
RxJava is Java's (and Kotlin's) form of Reactive Programming, and thus Android's too. It bestows the ultimate power in async operations through the use of observables. If you're new, check out my Hardly Comprehensive Introduction.
RxJava is a powerful tool that only gets better with how well you know it. It’s got a high skill cap, and how close you get to it is determined by how well you know your operators.
In this 5th and final installment in the series, we'll talk about Utility operators: the operators that help out with a few things but don't fit into any other category. They're handy in just about any use case and can be used to further optimise your observable flows.
We’re mixing in some of the useful boolean and mathematical operators here as well, since I decided many of them are just common sense to any seasoned programmer. They don’t warrant their own sections.
Delay
Observable.just(1, 2, 3)
    .delay(5, TimeUnit.SECONDS)
    .subscribe()
This shifts every emission forward in time by the given delay. It doesn't change the timing between emissions, since each one is pushed back by the same amount. For example, this observable is subscribed to, 5 seconds pass, then 1, 2, and 3 are emitted immediately one after the other.
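To see that timing without actually waiting around, here's a minimal sketch using RxJava 2's TestScheduler as a virtual clock. The function name delayDemo is made up for illustration, and the imports assume the RxJava 2 package names (io.reactivex).

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns what the observer has seen after 4 and after 5 virtual seconds.
fun delayDemo(): Pair<List<Int>, List<Int>> {
    val scheduler = TestScheduler() // virtual clock, so nothing really sleeps

    val observer = Observable.just(1, 2, 3)
        .delay(5, TimeUnit.SECONDS, scheduler) // same operator, but timed by the test clock
        .test()

    scheduler.advanceTimeBy(4, TimeUnit.SECONDS)
    val afterFourSeconds = observer.values().toList() // snapshot: nothing has arrived yet

    scheduler.advanceTimeBy(1, TimeUnit.SECONDS)
    val afterFiveSeconds = observer.values().toList() // all three arrive together

    return afterFourSeconds to afterFiveSeconds
}
```

Passing the scheduler explicitly is only there to make the example deterministic. In app code you'd normally use the two-argument version shown above.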
ObserveOn
Observable.just(1, 2, 3) // UI THREAD
    .map { i -> i + 1 } // UI THREAD
    .observeOn(Schedulers.io()) // BACKGROUND THREAD
    .map { i -> i * 2 } // BACKGROUND THREAD
    .subscribe() // BACKGROUND THREAD
One of the most commonly used operators, and for good reason. This allows you to specify the thread that the operators after it run on.
I could write a whole article on threading, but as a starter example, you can choose to observe on a background thread (Schedulers.io) if you have an observable with beefy map functions and you don't want to freeze the UI, which runs on the main thread.
SubscribeOn
Observable.just(1, 2, 3) // BACKGROUND THREAD
    .map { i -> i + 1 } // BACKGROUND THREAD
    .subscribeOn(Schedulers.io()) // BACKGROUND THREAD
    .map { i -> i * 2 } // BACKGROUND THREAD
    .subscribe() // BACKGROUND THREAD
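This allows you to specify the thread that your operators run on… as well!
Okay, the difference between this and ObserveOn is the operators they affect. ObserveOn only affects the operators that come after it, while SubscribeOn decides which thread the source is subscribed on. That means it affects the whole observable no matter where you place it in the chain, at least until an ObserveOn further down switches threads.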
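Here's a small sketch that combines the two and records which thread each map ran on. The function name threadDemo is made up for illustration, and the imports assume RxJava 2.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers

// Hypothetical helper: returns (thread of the map above observeOn, thread of the map below it).
fun threadDemo(): Pair<String, String> =
    Observable.just(1)
        // Above observeOn: runs on the thread picked by subscribeOn.
        .map { Thread.currentThread().name }
        // Placement doesn't matter for subscribeOn, it still moves the source.
        .subscribeOn(Schedulers.io())
        // Everything below this line hops to a computation thread.
        .observeOn(Schedulers.computation())
        .map { upstream -> upstream to Thread.currentThread().name }
        // Blocks the calling thread until the first item arrives (fine for a demo, not for UI code).
        .blockingFirst()
```

If you print the pair, you should see two different scheduler thread names, neither of which is the thread you called threadDemo() from.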
DoOperators
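They're kinda just there when you need them. If you wanna do something as a side effect at, well, pretty much any stage of the observable sequence, there's an operator for it: doOnSubscribe, doOnNext, doOnError, doOnComplete, doOnDispose, and so on.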
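As a quick illustration, this sketch hooks into three stages and records the order in which they fire. The function name doOperatorsDemo is made up, and the imports assume RxJava 2.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: collects the side effects in the order they happen.
fun doOperatorsDemo(): List<String> {
    val events = mutableListOf<String>()

    Observable.just(1, 2)
        .doOnSubscribe { events += "subscribed" } // fires once, when the subscription starts
        .doOnNext { events += "next $it" }        // fires for every emission
        .doOnComplete { events += "completed" }   // fires once, when the source finishes
        .subscribe()

    // No schedulers are involved, so everything above has already run by this point.
    return events
}
```

In a real app these are mostly useful for logging or for showing and hiding a loading indicator, without touching the values themselves.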
DefaultIfEmpty
Observable.empty<Int>()
    .defaultIfEmpty(5)
    .subscribe()
As the name says, if the source observable completes without emitting anything, this'll make it emit a default value instead.
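A tiny sketch of both cases, an empty source and a non-empty one. The function name defaultIfEmptyDemo is made up, and the imports assume RxJava 2.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: returns (result for an empty source, result for a non-empty source).
fun defaultIfEmptyDemo(): Pair<List<Int>, List<Int>> {
    // The empty source completes without a value, so the default kicks in.
    val fromEmpty = Observable.empty<Int>()
        .defaultIfEmpty(5)
        .toList()
        .blockingGet()

    // The source has values of its own, so the default is ignored.
    val fromNonEmpty = Observable.just(1, 2)
        .defaultIfEmpty(5)
        .toList()
        .blockingGet()

    return fromEmpty to fromNonEmpty
}
```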
Timeout
listService.getListByID()
    .timeout(5, TimeUnit.SECONDS)
    .subscribe { list, throwable ->
        if (throwable != null) {
            throwable.printStackTrace()
        }
    }
If the given period of time passes without an emission, the observable terminates with a TimeoutException. Don't forget to handle the error.
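To watch that error arrive without a real network call, here's a sketch where Observable.never() stands in for a request that hangs forever. That stand-in, the TestScheduler, and the function name timeoutDemo are all assumptions for illustration, and the imports assume RxJava 2.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

// Hypothetical helper: returns true if the hung source ended with a TimeoutException.
fun timeoutDemo(): Boolean {
    val scheduler = TestScheduler() // virtual clock, so the demo doesn't really wait 5 seconds
    var error: Throwable? = null

    Observable.never<Int>() // stand-in for a call that never answers
        .timeout(5, TimeUnit.SECONDS, scheduler)
        .subscribe(
            { /* no values will ever arrive here */ },
            { throwable -> error = throwable } // this is the "handle the error" part
        )

    scheduler.advanceTimeBy(5, TimeUnit.SECONDS)
    return error is TimeoutException
}
```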
TimeInterval
// The interval here is always 2 lmao
Observable.interval(2, TimeUnit.SECONDS)
    .timeInterval(TimeUnit.SECONDS)
    .subscribe()
This wraps each emission in a Timed<DataType> that carries the time elapsed since the previous emission (or since subscription, for the first one) alongside the original value.
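A sketch of what comes out the other end, again on a TestScheduler so the intervals are exact. The function name timeIntervalDemo is made up, and the imports assume RxJava 2.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns (original value, seconds since the previous emission) pairs.
fun timeIntervalDemo(): List<Pair<Long, Long>> {
    val scheduler = TestScheduler()

    val observer = Observable.interval(2, TimeUnit.SECONDS, scheduler)
        .timeInterval(TimeUnit.SECONDS, scheduler) // measure with the same virtual clock
        .map { timed -> timed.value() to timed.time() }
        .test()

    // Six virtual seconds means ticks at 2s, 4s and 6s.
    scheduler.advanceTimeBy(6, TimeUnit.SECONDS)
    return observer.values().toList()
}
```

With real schedulers the measured interval can drift by a few milliseconds, which is why you'd usually measure in TimeUnit.MILLISECONDS if you care about precision.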
TimeStamp
Observable.interval(2, TimeUnit.SECONDS)
    .timestamp(TimeUnit.SECONDS)
    .subscribe { Log.d(LOG_TAG, "Value: ${it.value()}, Time: ${it.time()}") }
This changes the data type of the emissions to Timed<DataType> in order to attach timestamps to them. These timestamps take the form of the current time in the system and not something like the number of seconds from the start of the observable, aww hell no.
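If you want to convince yourself that the timestamp really is wall-clock time, a sketch like this brackets the emission with System.currentTimeMillis(). The function name timestampDemo is made up, and it assumes the default scheduler clock, which in RxJava 2 follows the system time.

```kotlin
import io.reactivex.Observable
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns (system time before, timestamp on the emission, system time after).
fun timestampDemo(): Triple<Long, Long, Long> {
    val before = System.currentTimeMillis()

    val stamped = Observable.just("hello")
        .timestamp(TimeUnit.MILLISECONDS) // Timed<String> carrying the wall-clock time
        .blockingFirst()

    val after = System.currentTimeMillis()
    return Triple(before, stamped.time(), after)
}
```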
Materialize/Dematerialize
Observable.just(1, 2, 3, 4, 5)
    .materialize()
    .subscribe { Log.d(LOG_TAG, "Notification completed?: ${it.isOnComplete}, Value: ${it.value}") }
Materializing an observable changes the data type of its emissions into Notification<DataType>. The Notification objects allow you to check whether the event is an OnNext, OnComplete, or OnError. Of course, you can still get the emitted value from an OnNext notification. Note that the terminal event becomes an emission too, so the observable above emits six notifications: five OnNext and one OnComplete.
Dematerialize just reverses the effect, for when you have an observable that's already been materialized.
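A short sketch that turns each Notification into a readable label, which makes the extra completion event easy to spot. The function name materializeDemo is made up, and the imports assume RxJava 2.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: describes every notification produced by materialize().
fun materializeDemo(): List<String> =
    Observable.just(1, 2, 3)
        .materialize()
        .map { notification ->
            when {
                notification.isOnNext -> "onNext(${notification.value})"
                notification.isOnError -> "onError"
                else -> "onComplete" // the terminal event shows up as a regular emission
            }
        }
        .toList()
        .blockingGet()
```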
Boolean Operators
There are quite a few here that can come in handy, but most of their names speak for themselves. These include contains, sequenceEqual, skipWhile/Until, and takeWhile/Until. The noteworthy ones here are:
All
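Observable.just(1, 3, 6, 8, 10)
    .all { it > 5 } // In this case, 1 and 3 fail the check, so the result is false
    .subscribe()
This emits a single Boolean telling you whether every value meets the specified condition. It doesn't discard anything; if you want to drop the values that fail the condition, that's what filter is for.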
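Since all and filter are easy to mix up, here's a sketch that runs both on the same source: all answers a yes/no question about the whole sequence, while filter is the one that drops values. The function name allDemo is made up, and the imports assume RxJava 2.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: returns (result of all, result of filter) for the same condition.
fun allDemo(): Pair<Boolean, List<Int>> {
    val source = Observable.just(1, 3, 6, 8, 10)

    // all returns a Single<Boolean>: is every value greater than 5?
    val allAboveFive = source.all { it > 5 }.blockingGet()

    // filter keeps the matching values and drops the rest.
    val onlyAboveFive = source.filter { it > 5 }.toList().blockingGet()

    return allAboveFive to onlyAboveFive
}
```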
Amb
val slowerObservable = Observable.just(1, 3, 5).delay(5, TimeUnit.SECONDS)
val fasterObservable = Observable.just(2, 4, 6)
Observable.ambArray(slowerObservable, fasterObservable)
    .subscribe() // Only the faster observable gets used
This waits for one of the observables to start emitting, then discards the other one. In other words, only the faster observable gets to emit.
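The same race can be run on a virtual clock to confirm that the slower observable never gets a word in, even after its delay has passed. The function name ambDemo and the TestScheduler are assumptions for illustration, and the imports assume RxJava 2.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns everything the amb'd observable emitted.
fun ambDemo(): List<Int> {
    val scheduler = TestScheduler()

    val slower = Observable.just(1, 3, 5).delay(5, TimeUnit.SECONDS, scheduler)
    val faster = Observable.just(2, 4, 6)

    val observer = Observable.ambArray(slower, faster).test()

    // Move well past the slower observable's delay; it was disposed once it lost the race.
    scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
    return observer.values().toList()
}
```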
Mathematical Operators
These do require an extra dependency:
implementation 'com.github.akarnokd:rxjava2-extensions:0.20.0'
But just like the Boolean Operators, these mostly speak for themselves with their names. These include average, sum, count, min and max. (count and reduce are part of core RxJava, so they work without the extra dependency.) The single noteworthy operator here is:
Reduce
Observable.just(1, 2, 3, 4, 5)
    .reduce { t1: Int, t2: Int -> t1 * t2 } // Performs 1 * 2 * 3 * 4 * 5
    .subscribe() // Results in 120
This reduces all the emissions into a single value by sequentially applying a function to them. The result of the function applied on the first two emissions will be fed back with the 3rd emission, and so on until the emissions have been completely reduced.
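Here's a sketch of the two flavours of reduce in RxJava 2: without a seed it returns a Maybe (an empty source has nothing to reduce), and with a seed it returns a Single that starts from the value you pass in. The function name reduceDemo and the seed of 100 are made up for illustration.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: returns (product without a seed, sum starting from a seed of 100).
fun reduceDemo(): Pair<Int?, Int> {
    // ((((1 * 2) * 3) * 4) * 5), returned as a Maybe<Int>
    val product = Observable.just(1, 2, 3, 4, 5)
        .reduce { acc: Int, next: Int -> acc * next }
        .blockingGet()

    // 100 + 1 + 2 + 3 + 4 + 5, returned as a Single<Int>
    val sumFromSeed = Observable.just(1, 2, 3, 4, 5)
        .reduce(100) { acc: Int, next: Int -> acc + next }
        .blockingGet()

    return product to sumFromSeed
}
```

The seeded version is the safer pick when the source might be empty, because you always get a result back.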