Skip to content

Knowing your RxJava Operator Toolbelt 2: Transforming Observables

Check out the rest of the series:
Knowing your RxJava Operator Toolbelt 1: Creating Observables
Knowing your RxJava Operator Toolbelt 3: Combining Observables
Knowing your RxJava Operator Toolbelt 4: Filtering Observables
Knowing your RxJava Operator Toolbelt 5: Utility Observables

RxJava is Java (and Kotlin’s) form of Reactive Programming, and thus Android’s too. It bestows the ultimate power in async operations through the use of observables. If you’re new, check out my Hardly Comprehensive Introduction.

RxJava is a powerful tool that only gets better with how well you know it. It’s got a high skill cap, and how close you get to it is determined by how well you know your operators.

Transforming Observables takes on quite a few different forms to change one observable into another as a product of its outputs. It includes solutions to many MANY developers’ problems. I’m looking at you flatMap.

Transform Operators

Buffer

val observable = Observable.range(1, 10)
        .buffer(2)
        .subscribe()

Waits till a set number of emissions is gathered, then emits them as a list. The above example emits data in groups of 2.

Window

val observable = Observable.range(1, 10)
        .window(5)
        .subscribe()

Divides the observable stream into windows of time, based on a given count. The above example emits groups of data collected over 5 second periods.

Map

val observable = Observable.range(1, 10)
        .map{ n -> n / 2 }
        .subscribe()

Transforms the emissions of the observable through a function. This can be literally any non-async task for said emissions to become something else, like say, transforming emitted integers into their string equivalents.

The above example divides every value by 2 before emitting them.

FlatMap

val observable = Observable.range(1, 10)
        .flatMap { n ->
            val numberIsEven = n % 2 == 0
            Observable.just(numberIsEven)
        }
        .subscribe()

FlatMap… FLATMAP. A god-send, this is. Transforms the emissions of one observable into a new observable then flattens them both into one fat observable.

In short, the output of one observable becomes the input of the other observable and they become one single observable (not a ‘Single’ observable though. That’s different). It’s like an async version of map.

Now it’s important to note that flatMap introduces a separate observable for each item it emits. This means that the order is disregarded and if one item can be emitted ahead of the other, it will be. If you don’t like that, then we still got 2 ‘map’ operators that fixes this.

SwitchMap

val observable = Observable.range(1, 10)
        .switchMap { n ->
            val numberIsEven = n % 2 == 0
            Observable.just(numberIsEven)
        }
        .subscribe()

This is the flatMap where each item before the currently emitted one stops being observed. With flatMap, when each item is updated, the updates keep getting emitted. This will only happen with the latest emission for switchMap.

ConcatMap

val observable = Observable.range(1, 10)
        .concatMap { n ->
            val numberIsEven = n % 2 == 0
            Observable.just(numberIsEven)
        }
        .subscribe()

This is the flatMap that follows the order of the items! This does however come at the cost of speed. The observable waits for each successive item to be emitted before it can emit the next.

Yes I used the same example for each ‘map’ function. Problem? They differ only in the way they emit

Enough with the maps now, let’s proceed.

Scan

val observable = Observable.range(1, 10)
        .scan { previous: Int, new: Int ->
            new + previous
        }
        .subscribe()

Just like map, this applies a function to emitted values but also takes into account the previously emitted value. In the ‘map’ function, you have access to both values (current and previous).

GroupBy

val observable = Observable.range(1, 10)
        .groupBy { n ->
            n % 2 == 0
        }
        .subscribe()

This transforms one observable into several groups based on a given value (a boolean, int, whatever really) and each group is given its own observable and identified by its key. Perfect for grouping emissions of similar types.