Check out the rest of the series:
Knowing your RxJava Operator Toolbelt 1: Creating Observables
Knowing your RxJava Operator Toolbelt 2: Transforming Observables
Knowing your RxJava Operator Toolbelt 4: Filtering Observables
Knowing your RxJava Operator Toolbelt 5: Utility Observables
RxJava is Java’s (and Kotlin’s) form of Reactive Programming, and thus Android’s too. It bestows the ultimate power in async operations through the use of observables. If you’re new, check out my Hardly Comprehensive Introduction.
RxJava is a powerful tool that only gets better with how well you know it. It’s got a high skill cap, and how close you get to it is determined by how well you know your operators.
So you know when you want two different observables to come together in one single stream? Yeah, that is the situation we’re covering.
There are a few different ways to combine these observables, differing in the type or order of the items emitted to your observer, so let’s buckle up and get observing.
Combine Operators
Merge
val o1 = Observable.interval(1, TimeUnit.SECONDS).map { it -> "A$it" }
val o2 = Observable.interval(2, TimeUnit.SECONDS).map { it -> "B$it" }
val combinedObservable = Observable.merge(o1, o2)
    .subscribe { Log.d(LOG_TAG, it) }
Merge is your standard run-of-the-mill combine operator. It combines the observables and lets them emit as one.
Output
A0 A1 B0 A2 A3 B1
Concat
val o1 = Observable.range(0, 3).map { it -> "A$it" }
val o2 = Observable.range(0, 3).map { it -> "B$it" }
val combinedObservable = Observable.concat(o1, o2)
    .subscribe { Log.d(LOG_TAG, it) }
Observables emit in order. Concat waits for the current observable to complete before it subscribes to the next one and starts emitting its items.
Output
A0 A1 A2 B0 B1 B2
Combine Latest
val o1 = Observable.interval(1, TimeUnit.SECONDS).map { it -> "A$it" }
val o2 = Observable.interval(2, TimeUnit.SECONDS).map { it -> "B$it" }
val observableArray = arrayOf(o1, o2)
val combinedObservable = Observable
    .combineLatest(observableArray) { args -> args.map { it -> it.toString() } }
    .subscribe { array ->
        for (item in array) {
            Log.d(LOG_TAG, item)
        }
    }
Whenever either observable emits, the latest items from all observables are passed to a function you specify and its result is emitted. Nothing is emitted until every observable has emitted at least once. It’s a bit more involved as the args supplied by combineLatest arrive as an array of Any (Object in Java), so you have to map them yourself. Here each emission comes in the form of a list of the latest items from each observable.
Output
A0 B0 A1 B0 A2 B0 A3 B0 A3 B1
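If you only have two sources, the manual mapping described above can be avoided. The following is a minimal sketch, assuming RxJava 2.x (the io.reactivex packages) is on the classpath. It uses the typed two-source overload of combineLatest with a BiFunction. The helper name latestPairs is hypothetical, and synchronous Observable.just sources stand in for the interval sources so the result is repeatable.

```kotlin
import io.reactivex.Observable
import io.reactivex.functions.BiFunction

// Hypothetical helper: combines two String sources without going through Array<Any>.
fun latestPairs(): List<String> {
    // Synchronous sources (an assumption for this sketch) keep the result repeatable.
    val o1 = Observable.just("A0", "A1")
    val o2 = Observable.just("B0", "B1")

    return Observable
        // The BiFunction receives both latest values already typed as String.
        .combineLatest(o1, o2, BiFunction<String, String, String> { a, b -> "$a $b" })
        .toList()
        .blockingGet()
}

fun main() {
    latestPairs().forEach { println(it) }
}
```

Because o1 is subscribed first and runs to completion before o2 starts, its latest value is already A1 when B0 arrives, so this prints A1 B0 and then A1 B1.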
Join
val o1 = Observable.interval(1, TimeUnit.SECONDS).map { it -> "A$it" }
val o2 = Observable.interval(2, TimeUnit.SECONDS).map { it -> "B$it" }
val combinedObservable = o1.join<String, Long, Long, String>(
    o2,
    Function { Observable.timer(500, TimeUnit.MILLISECONDS) },
    Function { Observable.timer(0, TimeUnit.MILLISECONDS) },
    BiFunction { l, r -> l + r }
).subscribe { Log.d(LOG_TAG, "Joined result: $it") }
This one is quite involved (and actually a pain to write in Kotlin). When one observable emits, there is a time window for the other to emit (defined by the functions). If the other emits within the specified time window, the results are joined by the Result Selector (the BiFunction) and emitted.
As can be seen, the time window given for when one observable emits first can be different from the other. If the other observable fails to emit during the time window, then the emission is dropped.
The time windows are also independent of each other. Say o1 emits and starts its time window, o2 emits during the time window and the joined result is emitted, but o2’s time window still starts regardless, so if o1 emits again within this time window, another joined result is emitted.
Output
A1 B0 A3 B1 A5 B2 A7 B3
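The independence of the two time windows is easier to see without real timers. Here is a rough sketch, assuming RxJava 2.x, that drives join by hand with PublishSubject sources. The names joinedResults and openWindow are hypothetical, and Observable.never() is swapped in for the timers so that every window simply stays open.

```kotlin
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import io.reactivex.functions.Function
import io.reactivex.subjects.PublishSubject

// Hypothetical helper: pushes items by hand so the order of events is explicit.
fun joinedResults(): List<String> {
    val left = PublishSubject.create<String>()
    val right = PublishSubject.create<String>()
    val results = mutableListOf<String>()

    // Assumption for this sketch: a window that never closes, instead of a timer.
    val openWindow = Function<String, Observable<Any>> { Observable.never() }

    left.join<String, Any, Any, String>(
        right,
        openWindow, // window opened by each left item
        openWindow, // window opened by each right item
        BiFunction<String, String, String> { l, r -> l + r }
    ).subscribe { results += it }

    left.onNext("A0")  // no right item yet, nothing is emitted
    right.onNext("B0") // A0's window is open: emits "A0B0"
    left.onNext("A1")  // B0's window is still open: emits "A1B0"

    return results
}

fun main() {
    println(joinedResults()) // prints [A0B0, A1B0]
}
```

B0 is joined twice here because its own window is still open when A1 arrives, which is the same effect as the second joined result described above.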
SwitchOnNext
val tensObservable = Observable.interval(1, TimeUnit.SECONDS).map { it -> it + 10 }
val twentiesObservable = Observable.interval(1, TimeUnit.SECONDS).map { it -> it + 20 }
val observableWithObservables = Observable.interval(5, TimeUnit.SECONDS)
    .map { it -> if (it < 1) { tensObservable } else { twentiesObservable } }
val combinedObservable = Observable
    .switchOnNext(observableWithObservables)
    .subscribe { Log.d(LOG_TAG, it.toString()) }
Useful if you have an observable of observables! Emits items from the most recently emitted inner observable. When the outer observable emits a new inner observable, it unsubscribes from the previous one and starts emitting from the new one.
Output
10 11 12 13 20 21 22 23
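To watch the switch happen step by step, here is a small sketch, assuming RxJava 2.x, that replaces the interval sources with PublishSubject instances pushed by hand. The name switchedItems and the subject names are hypothetical.

```kotlin
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject

// Hypothetical helper: shows which inner items survive a switch.
fun switchedItems(): List<String> {
    val outer = PublishSubject.create<Observable<String>>()
    val first = PublishSubject.create<String>()
    val second = PublishSubject.create<String>()
    val results = mutableListOf<String>()

    Observable.switchOnNext<String>(outer).subscribe { results += it }

    outer.onNext(first)  // switchOnNext subscribes to `first`
    first.onNext("10")   // forwarded
    outer.onNext(second) // unsubscribes from `first`, subscribes to `second`
    first.onNext("11")   // dropped, nobody is listening to `first` any more
    second.onNext("20")  // forwarded

    return results
}

fun main() {
    println(switchedItems()) // prints [10, 20]
}
```

The item 11 never reaches the observer: once the outer observable has emitted the second inner observable, the first one is no longer subscribed to.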
Zip
val o1 = Observable.interval(1, TimeUnit.SECONDS).map { it -> "A$it" }
val o2 = Observable.interval(3, TimeUnit.SECONDS).map { it -> "B$it" }
val combinedObservable = Observable.zip(
    o1,
    o2,
    BiFunction { t1: String, t2: String -> return@BiFunction "$t1 $t2" }
).subscribe { Log.d(LOG_TAG, it) }
Combines observables via a function and emits a single item for each combination based on the result of this function. Items are paired by position: the first item of o1 with the first item of o2, the second with the second, and so on. Zip waits for both observables to have emitted before releasing the combined emission, so the slower observable sets the pace.
Output
A0 B0 A1 B1 A2 B2 A3 B3
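The pairing by position can be checked with sources of different lengths. This is a minimal sketch, assuming RxJava 2.x, with a hypothetical helper name zippedPairs and synchronous Observable.just sources in place of the intervals.

```kotlin
import io.reactivex.Observable
import io.reactivex.functions.BiFunction

// Hypothetical helper: zips a three-item source with a two-item source.
fun zippedPairs(): List<String> {
    val o1 = Observable.just("A0", "A1", "A2")
    val o2 = Observable.just("B0", "B1")

    return Observable
        .zip(o1, o2, BiFunction<String, String, String> { a, b -> "$a $b" })
        .toList()
        .blockingGet()
}

fun main() {
    println(zippedPairs()) // prints [A0 B0, A1 B1]
}
```

A2 never appears in the result, because o2 completes before it can supply a partner for it.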