
Knowing your RxJava Operator Toolbelt 3: Combining Observables

Check out the rest of the series:
Knowing your RxJava Operator Toolbelt 1: Creating Observables
Knowing your RxJava Operator Toolbelt 2: Transforming Observables
Knowing your RxJava Operator Toolbelt 4: Filtering Observables
Knowing your RxJava Operator Toolbelt 5: Utility Observables

RxJava is the Java (and Kotlin) take on Reactive Programming, and thus Android’s too. It bestows the ultimate power in async operations through the use of observables. If you’re new, check out my Hardly Comprehensive Introduction.

RxJava is a powerful tool that only gets better with how well you know it. It’s got a high skill cap, and how close you get to it is determined by how well you know your operators.

So you know when you want two different observables to come together in one single stream? Yeah, that is the situation we’re covering.

There are a few different ways to ‘combine’ these observables, differing in the type of item your observer receives and the order in which items arrive, so let’s buckle up and get observing.

Combine Operators

Merge

val o1 = Observable.interval(1, TimeUnit.SECONDS).map { it -> "A$it" }
val o2 = Observable.interval(2, TimeUnit.SECONDS).map { it -> "B$it" }

val combinedObservable = Observable.merge(o1, o2).subscribe {
    Log.d(LOG_TAG, it)
}

Merge is your standard run-of-the-mill combine operator. It subscribes to all the observables at once and passes their items along as they arrive, so emissions from the different sources can interleave.

Output

A0
A1
B0
A2
A3
B1

Concat

val o1 = Observable.range(0, 3).map { it -> "A$it" }
val o2 = Observable.range(0, 3).map { it -> "B$it" }

val combinedObservable = Observable.concat(o1, o2).subscribe {
    Log.d(LOG_TAG, it)
}

Observables emit in order. Concat waits for the current observable to complete before it subscribes to the next one, so emissions from different sources never interleave.

Output

A0
A1
A2
B0
B1
B2
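
To see the difference between merge and concat without waiting on real timers, here’s a minimal sketch that runs both over the same two timed sources on a virtual clock. It assumes RxJava 2 package names (io.reactivex) and uses the library’s TestScheduler; the helper names timedSources, mergeOrder and concatOrder are made up for this illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Hypothetical helper: two short timed sources driven by a virtual clock,
// so nothing actually sleeps.
fun timedSources(scheduler: TestScheduler): Pair<Observable<String>, Observable<String>> {
    // A0, A1, A2 arrive 100 ms, 200 ms and 300 ms after subscription
    val a = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler).take(3).map { "A$it" }
    // B0, B1 arrive 250 ms and 500 ms after subscription
    val b = Observable.interval(250, TimeUnit.MILLISECONDS, scheduler).take(2).map { "B$it" }
    return Pair(a, b)
}

fun mergeOrder(): List<String> {
    val scheduler = TestScheduler()
    val (a, b) = timedSources(scheduler)
    // merge subscribes to both sources right away
    val observer = Observable.merge(a, b).test()
    scheduler.advanceTimeBy(1, TimeUnit.SECONDS)
    return observer.values().toList()
}

fun concatOrder(): List<String> {
    val scheduler = TestScheduler()
    val (a, b) = timedSources(scheduler)
    // concat subscribes to b only after a has completed
    val observer = Observable.concat(a, b).test()
    scheduler.advanceTimeBy(1, TimeUnit.SECONDS)
    return observer.values().toList()
}

fun main() {
    println(mergeOrder())   // [A0, A1, B0, A2, B1]
    println(concatOrder())  // [A0, A1, A2, B0, B1]
}
```

With concat, B’s timer only starts once A completes at 300 ms, which is why both B items land after A2.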

Combine Latest

val o1 = Observable.interval(1, TimeUnit.SECONDS).map { it -> "A$it" }
val o2 = Observable.interval(2, TimeUnit.SECONDS).map { it -> "B$it" }
val observableArray = arrayOf(o1, o2)

val combinedObservable = Observable.combineLatest(observableArray) { args ->
    args.map { it -> it.toString() }
}.subscribe { array ->
    for (item in array) { Log.d(LOG_TAG, item) }
}

Whenever either observable emits, the latest items from all observables are combined via a specified function and emitted. Nothing comes out until every observable has emitted at least once. It’s a bit more involved, as the args supplied by the array version of combineLatest are typed as Any (Object in Java), so you have to map them yourself; each emission then arrives as a list of the latest items from each observable.

Output

A0
B0
A1
B0
A2
B0
A3
B0
A3
B1
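
If you only have two sources, you can probably skip the Any mapping altogether. Here’s a minimal sketch using the two-source overload of combineLatest, which takes a BiFunction and keeps the item types. It assumes RxJava 2 package names (io.reactivex) and runs on a TestScheduler so the ordering is deterministic; combineLatestPairs is a made-up name for this illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Hypothetical helper: collects what combineLatest emits for two typed sources.
fun combineLatestPairs(): List<String> {
    val scheduler = TestScheduler()
    // A0, A1, A2 at 100 ms, 200 ms, 300 ms
    val a = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler).take(3).map { "A$it" }
    // B0, B1 at 250 ms, 500 ms
    val b = Observable.interval(250, TimeUnit.MILLISECONDS, scheduler).take(2).map { "B$it" }

    val observer = Observable
        // The BiFunction receives the latest item of each source, already typed as String
        .combineLatest(a, b, BiFunction<String, String, String> { left, right -> "$left $right" })
        .test()

    scheduler.advanceTimeBy(1, TimeUnit.SECONDS)
    return observer.values().toList()
}

fun main() {
    println(combineLatestPairs())  // [A1 B0, A2 B0, A2 B1]
}
```

A0 never shows up here: by the time B emits for the first time at 250 ms, A1 is already the latest item from A.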

Join

val o1 = Observable.interval(1, TimeUnit.SECONDS).map { it -> "A$it" }
val o2 = Observable.interval(2, TimeUnit.SECONDS).map { it -> "B$it" }

val combinedObservable = o1.join<String, Long, Long, String>(o2,
        Function { Observable.timer(500, TimeUnit.MILLISECONDS) },
        Function { Observable.timer(0, TimeUnit.MILLISECONDS) },
        BiFunction { l, r -> "$l $r" })
        .subscribe {
            Log.d(LOG_TAG, "Joined result: $it")
        }

This one is quite involved (and actually a pain to write in Kotlin). When one observable emits, there is a time window for the other to emit (defined by the functions). If the other emits within the specified time window, the results are joined by the Result Selector (the BiFunction) and emitted.

As can be seen, each observable gets its own window length: here an emission from o1 stays open for 500 milliseconds, while an emission from o2 closes almost immediately. If the other observable fails to emit during the time window, no joined result is produced for that emission.

The time windows are also independent of each other. Say o1 emits and opens its time window, then o2 emits inside it and the joined result is emitted. o2’s own time window still opens regardless, so if o1 emits again within that window, another joined result is emitted.

Output

A1 B0
A3 B1
A5 B2
A7 B3
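
The independent windows are easier to follow with numbers that don’t collide, so here’s a minimal sketch on a virtual clock. It assumes RxJava 2 package names (io.reactivex); the window lengths (80 ms and 60 ms) and the name joinedPairs are picked purely for illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import io.reactivex.functions.Function
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Hypothetical helper: collects the joined results for two timed sources.
fun joinedPairs(): List<String> {
    val scheduler = TestScheduler()
    // Left source: A0..A3 at 100 ms, 200 ms, 300 ms, 400 ms
    val left = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler).take(4).map { "A$it" }
    // Right source: B0, B1 at 250 ms, 500 ms
    val right = Observable.interval(250, TimeUnit.MILLISECONDS, scheduler).take(2).map { "B$it" }

    val observer = left.join<String, Long, Long, String>(
        right,
        // Every left item opens a window that closes after 80 ms
        Function<String, Observable<Long>> { Observable.timer(80, TimeUnit.MILLISECONDS, scheduler) },
        // Every right item opens a window that closes after 60 ms
        Function<String, Observable<Long>> { Observable.timer(60, TimeUnit.MILLISECONDS, scheduler) },
        BiFunction<String, String, String> { l, r -> "$l $r" }
    ).test()

    scheduler.advanceTimeBy(1, TimeUnit.SECONDS)
    return observer.values().toList()
}

fun main() {
    println(joinedPairs())  // [A1 B0, A2 B0]
}
```

B0 arrives at 250 ms while A1’s window (200 to 280 ms) is open, giving A1 B0. A2 then arrives at 300 ms while B0’s own window (250 to 310 ms) is still open, giving A2 B0. B1 at 500 ms finds no open window, so it pairs with nothing.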

SwitchOnNext

val tensObservable = Observable.interval(1, TimeUnit.SECONDS).map { it -> it + 10 }
val twentiesObservable = Observable.interval(1, TimeUnit.SECONDS).map { it -> it + 20 }
val observableWithObservables = Observable.interval(5, TimeUnit.SECONDS).map { it ->
    if (it < 1) { tensObservable }
    else { twentiesObservable }
}

val combinedObservable = Observable
        .switchOnNext(observableWithObservables)
        .subscribe { Log.d(LOG_TAG, it.toString()) }

Useful if you have an observable of observables! It emits items from the most recently emitted inner observable: whenever the outer observable emits a new one, switchOnNext unsubscribes from the previous one and starts emitting from the new one.

Output

10
11
12
13
20
21
22
23
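
Here’s a minimal sketch of the same switch on a virtual clock, with timings chosen so the inner emissions never land on the same instant as the switch. It assumes RxJava 2 package names (io.reactivex); the name switchOrder and the 500 ms / 120 ms periods are made up for this illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Hypothetical helper: collects what switchOnNext emits over 1300 ms of virtual time.
fun switchOrder(): List<Long> {
    val scheduler = TestScheduler()
    // The outer observable hands out a new inner observable at 500 ms and at 1000 ms
    val outer = Observable.interval(500, TimeUnit.MILLISECONDS, scheduler)
        .take(2)
        .map { index ->
            val base = (index + 1) * 10  // 10 for the first inner, 20 for the second
            Observable.interval(120, TimeUnit.MILLISECONDS, scheduler).map { it + base }
        }

    val observer = Observable.switchOnNext(outer).test()
    scheduler.advanceTimeBy(1300, TimeUnit.MILLISECONDS)
    return observer.values().toList()
}

fun main() {
    println(switchOrder())  // [10, 11, 12, 13, 20, 21]
}
```

The first inner observable would have emitted 14 at 1100 ms, but it was unsubscribed at 1000 ms when the second one arrived, so that item never reaches the observer.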

Zip

val o1 = Observable.interval(1, TimeUnit.SECONDS).map { it -> "A$it" }
val o2 = Observable.interval(3, TimeUnit.SECONDS).map { it -> "B$it" }

val combinedObservable = Observable.zip(o1, o2, BiFunction { t1: String, t2: String ->
    return@BiFunction "$t1 $t2"
}).subscribe {
    Log.d(LOG_TAG, it)
}

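Combines observables via a function, pairing items by position: the first item of o1 with the first of o2, the second with the second, and so on, emitting a single item for each pair. Zip waits for both observables to have emitted before releasing the combined emission, so the output moves at the pace of the slower observable.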

Output

A0 B0
A1 B1
A2 B2
A3 B3
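
One thing worth knowing is what happens when the sources have different lengths. Here’s a minimal sketch on a virtual clock, assuming RxJava 2 package names (io.reactivex); zippedPairs is a made-up name for this illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Hypothetical helper: collects what zip emits for a 3-item and a 2-item source.
fun zippedPairs(): List<String> {
    val scheduler = TestScheduler()
    // A0, A1, A2 at 100 ms, 200 ms, 300 ms
    val a = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler).take(3).map { "A$it" }
    // B0, B1 at 250 ms, 500 ms
    val b = Observable.interval(250, TimeUnit.MILLISECONDS, scheduler).take(2).map { "B$it" }

    val observer = Observable
        // Items are paired by position, not by arrival time
        .zip(a, b, BiFunction<String, String, String> { l, r -> "$l $r" })
        .test()

    scheduler.advanceTimeBy(1, TimeUnit.SECONDS)
    return observer.values().toList()
}

fun main() {
    println(zippedPairs())  // [A0 B0, A1 B1]
}
```

A2 has no partner because B completes after two items, so zip completes without emitting it.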

 
