Knowing your RxJava Operator Toolbelt 4: Filtering Observables

Check out the rest of the series:
Knowing your RxJava Operator Toolbelt 1: Creating Observables
Knowing your RxJava Operator Toolbelt 2: Transforming Observables
Knowing your RxJava Operator Toolbelt 3: Combining Observables
Knowing your RxJava Operator Toolbelt 5: Utility Observables

RxJava is the Java (and Kotlin) take on Reactive Programming, and thus Android’s too. It gives you a lot of power over async operations through the use of observables. If you’re new, check out my Hardly Comprehensive Introduction.

RxJava is a powerful tool that only gets better with how well you know it. It’s got a high skill cap, and how close you get to it is determined by how well you know your operators.

Filter operations are thankfully some of the simplest to understand, and you’d be a real mug to have done some RxJava before and not know what ‘first’ does, so I’m gonna be blasting through these.

Filter

val observable = Observable.range(0, 10)
        .filter {
            it % 2 == 0
        }.subscribe {
            Log.d(LOG_TAG, "Emitted: $it")
        }

Bare-bones filter. You make the rules with a predicate: return true and the item is emitted, return false and it’s dropped.

Output

0
2
4
6
8

FirstElement

val observable = Observable.range(0, 10)
        .firstElement()
        .subscribe { Log.d(LOG_TAG, "Emitted: $it") }

Emits only the first item and thus returns a Maybe (there could be no first item at all). How can it get simpler than that?

If you want a Single instead, you can use the firstOrError method, which does exactly what it says: it emits the first item, or signals a NoSuchElementException if the source is empty.

Output

0
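
To make the Maybe-versus-Single point concrete, here is a minimal sketch of what happens when there is no first item at all. It assumes RxJava 2 (the io.reactivex package; RxJava 3 moved these classes to io.reactivex.rxjava3.core), swaps Log.d for println so it runs off-device, and the function names are made up for illustration.

```kotlin
import io.reactivex.Observable

// firstElement() on an empty source: the Maybe completes without a value,
// so blockingGet() hands back null.
fun firstOfEmptyAsMaybe(): Int? =
    Observable.empty<Int>().firstElement().blockingGet()

// first(default) returns a Single that falls back to the default item.
fun firstOfEmptyWithDefault(): Int =
    Observable.empty<Int>().first(-1).blockingGet()

// firstOrError() returns a Single that fails with NoSuchElementException.
fun firstOfEmptyOrError(): String =
    try {
        Observable.empty<Int>().firstOrError().blockingGet().toString()
    } catch (e: NoSuchElementException) {
        "NoSuchElementException"
    }

fun main() {
    println(firstOfEmptyAsMaybe())     // null
    println(firstOfEmptyWithDefault()) // -1
    println(firstOfEmptyOrError())     // NoSuchElementException
}
```

The same three flavours exist for the last item: lastElement, last(default) and lastOrError.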

LastElement

val observable = Observable.range(0, 10)
        .lastElement()
        .subscribe { Log.d(LOG_TAG, "Emitted: $it") }

Emits only the last item and thus returns a Maybe. Just like with firstElement, you also have access to a lastOrError method.

Output

9

Skip

val observable = Observable.range(0, 5)
        .skip(3)
        .subscribe { Log.d(LOG_TAG, "Emitted: $it") }

Skip the first n items… That’s it.

Output

3
4

SkipLast

val observable = Observable.range(0, 5)
        .skipLast(3)
        .subscribe { Log.d(LOG_TAG, "Emitted: $it") }

Skip the last n items…. No seriously, that’s it.

Output

0
1

Take

val observable = Observable.range(0, 5)
        .take(3)
        .subscribe { Log.d(LOG_TAG, "Emitted: $it") }

Emit only the first n items. There is also a take(time, unit) overload that emits only the items that arrive within a time period, counted from the moment of subscription.

Output

0
1
2
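
The time-based flavour of take mentioned above is easier to see with a clock you control. This is a rough sketch, assuming RxJava 2 and its TestScheduler (virtual time, so nothing actually sleeps); the function name is hypothetical.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Source ticks every 100 ms (0 at 100 ms, 1 at 200 ms, ...).
// take(350 ms) lets through whatever arrives in the first 350 ms, then completes.
fun takeWithinWindow(): List<Long> {
    val scheduler = TestScheduler()
    val observer = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler)
        .take(350, TimeUnit.MILLISECONDS, scheduler)
        .test()

    // Move virtual time forward by one second in one go.
    scheduler.advanceTimeBy(1, TimeUnit.SECONDS)
    return observer.values()
}

fun main() {
    println(takeWithinWindow()) // [0, 1, 2]
}
```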

TakeLast

val observable = Observable.range(0, 5)
        .takeLast(3)
        .subscribe { Log.d(LOG_TAG, "Emitted: $it") }

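Emit only the last n items, or, with the takeLast(time, unit) overload, only the items emitted within a time period before the observable finishes. Either way, nothing comes out until the source completes, since that’s the only point at which “last” is known.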

Output

2
3
4
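
For the time-based takeLast, a sketch along the same lines, again assuming RxJava 2 and a TestScheduler for virtual time (the function name is made up). The source emits ten items 100 ms apart and then completes; only those from the final 250 ms survive.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Items 0..9 arrive at 100 ms, 200 ms, ..., 1000 ms; the source completes at 1000 ms.
// takeLast(250 ms) keeps only items emitted in the 250 ms before completion.
fun takeLastWithinWindow(): List<Long> {
    val scheduler = TestScheduler()
    val observer = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler)
        .take(10)
        .takeLast(250, TimeUnit.MILLISECONDS, scheduler)
        .test()

    // Nothing is emitted downstream until the source completes.
    scheduler.advanceTimeBy(2, TimeUnit.SECONDS)
    return observer.values()
}

fun main() {
    println(takeLastWithinWindow()) // [7, 8, 9]
}
```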

ElementAt

val observable = Observable.range(0, 5)
        .elementAt(2)
        .subscribe { Log.d(LOG_TAG, "Emitted: $it") }

Emit only the element at index n (zero-based). It comes back as a Maybe, since the index could be past the end… That’s it.

Output

2
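
What about an index that doesn’t exist? A small sketch, assuming RxJava 2 and using hypothetical function names, showing the Maybe completing empty and the overload that takes a default item and returns a Single instead.

```kotlin
import io.reactivex.Observable

// elementAt(index) returns a Maybe: empty (null from blockingGet) if out of range.
fun elementAtAsMaybe(index: Long): Int? =
    Observable.range(0, 5).elementAt(index).blockingGet()

// elementAt(index, default) returns a Single that falls back to the default item.
fun elementAtWithDefault(index: Long): Int =
    Observable.range(0, 5).elementAt(index, -1).blockingGet()

fun main() {
    println(elementAtAsMaybe(2))      // 2
    println(elementAtAsMaybe(10))     // null
    println(elementAtWithDefault(10)) // -1
}
```

There is also elementAtOrError(index), which signals a NoSuchElementException rather than completing quietly.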

IgnoreElements

val observable = Observable.range(0, 5)
        .ignoreElements()
        .subscribe { Log.d(LOG_TAG, "Completed") }

Don’t emit any items, but mirror the observable’s termination notification. It turns the Observable into a Completable. The upstream still does all the work needed to produce those items; they just never reach the subscriber.

Output

Completed
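
To check that the upstream really does still run, here is a minimal sketch (RxJava 2 assumed, function name hypothetical) that records every item in a doOnNext side effect placed before ignoreElements.

```kotlin
import io.reactivex.Observable

// Records what the upstream produced even though nothing is emitted downstream.
fun upstreamItemsSeenWhileIgnoring(): List<Int> {
    val seen = mutableListOf<Int>()
    Observable.range(0, 5)
        .doOnNext { seen += it } // side effect runs for every upstream item
        .ignoreElements()        // Completable: only completion or error gets through
        .blockingAwait()         // returns once the Completable completes
    return seen
}

fun main() {
    println(upstreamItemsSeenWhileIgnoring()) // [0, 1, 2, 3, 4]
}
```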

Debounce

val o1 = Observable.interval(400, TimeUnit.MILLISECONDS).map { it.toInt() }
val o2 = Observable.interval(1000, TimeUnit.MILLISECONDS).map { it.toInt() }

val observable = Observable.combineLatest(o1, o2, BiFunction { t1: Int, t2: Int -> t1 + t2 })
    .debounce(300, TimeUnit.MILLISECONDS)
    .subscribe { Log.d(LOG_TAG, "Emitted $it") }

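Whenever an item is emitted, a timer is started. If another item arrives before the timer runs out, the earlier item is dropped and the timer restarts; an item only gets through once the full timespan passes with nothing newer behind it. A debounce in RxJava is as it would be anywhere.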

Output

2
3
6
7
10
11
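
Real timers make debounce output hard to reason about, so here is a sketch with virtual time instead. It assumes RxJava 2, a PublishSubject as a hand-fed source and a TestScheduler as the clock; the function name is made up.

```kotlin
import io.reactivex.schedulers.TestScheduler
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.TimeUnit

// A burst of 1, 2, 3 spaced 100 ms apart, a quiet gap, then a lone 4.
fun debouncedBursts(): List<Int> {
    val scheduler = TestScheduler()
    val source = PublishSubject.create<Int>()
    val observer = source.debounce(300, TimeUnit.MILLISECONDS, scheduler).test()

    source.onNext(1)
    scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS)
    source.onNext(2) // arrives inside 1's window, so 1 is dropped
    scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS)
    source.onNext(3) // arrives inside 2's window, so 2 is dropped
    scheduler.advanceTimeBy(400, TimeUnit.MILLISECONDS) // 300 ms of quiet: 3 gets through
    source.onNext(4)
    scheduler.advanceTimeBy(400, TimeUnit.MILLISECONDS) // 300 ms of quiet: 4 gets through

    return observer.values()
}

fun main() {
    println(debouncedBursts()) // [3, 4]
}
```

Only the last item of each burst survives, which is why debounce is the usual pick for things like search-as-you-type.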

Distinct

val observable = Observable.just(1, 2, 2, 3, 3, 3)
    .distinct()
    .subscribe { Log.d(LOG_TAG, "Emitted $it") }

All emissions shall be unique! Any duplicates will not be emitted.

Output

1
2
3
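
Two details the example above doesn’t show: duplicates don’t need to be next to each other to be dropped, and distinct can compare by a key instead of the whole item. A short sketch, assuming RxJava 2, with hypothetical function names.

```kotlin
import io.reactivex.Observable

// Non-adjacent duplicates are dropped too: the operator remembers everything it has seen.
fun distinctValues(): List<Int> =
    Observable.just(1, 2, 1, 3, 2)
        .distinct()
        .toList()
        .blockingGet()

// With a key selector, items count as duplicates when their keys match (here: string length).
fun distinctByLength(): List<String> =
    Observable.just("a", "bb", "c", "dd", "eee")
        .distinct { it.length }
        .toList()
        .blockingGet()

fun main() {
    println(distinctValues())   // [1, 2, 3]
    println(distinctByLength()) // [a, bb, eee]
}
```

Because every seen item (or key) is kept around for comparison, it’s worth being careful with distinct on long-running streams.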

Sample

val o1 = Observable.interval(350, TimeUnit.MILLISECONDS).map { it.toInt() }
val o2 = Observable.interval(1000, TimeUnit.MILLISECONDS).map { it.toInt() }

val observable = Observable.combineLatest(o1, o2, BiFunction { t1: Int, t2: Int -> t1 + t2 })
    .sample(1200, TimeUnit.MILLISECONDS)
    .subscribe { Log.d(LOG_TAG, "Emitted $it") }

A periodic time interval is set. At the end of every interval, the most recent item emitted since the previous tick is passed on. If nothing new arrived during the interval, nothing is emitted.

Output

2
6
11
15
20
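
As with debounce, sample is easier to follow on a virtual clock. This sketch assumes RxJava 2, a hand-fed PublishSubject and a TestScheduler; the function name is hypothetical.

```kotlin
import io.reactivex.schedulers.TestScheduler
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.TimeUnit

// The sampler ticks every 300 ms and passes on the latest item seen since the last tick.
fun sampledItems(): List<Int> {
    val scheduler = TestScheduler()
    val source = PublishSubject.create<Int>()
    val observer = source.sample(300, TimeUnit.MILLISECONDS, scheduler).test()

    source.onNext(1)
    source.onNext(2)
    scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS) // tick: emits 2, the latest

    source.onNext(3)
    scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS) // tick: emits 3

    scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS) // tick: nothing new, nothing emitted

    source.onNext(4)
    source.onNext(5)
    source.onNext(6)
    scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS) // tick: emits 6

    return observer.values()
}

fun main() {
    println(sampledItems()) // [2, 3, 6]
}
```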

 
