Check out the rest of the series:
Knowing your RxJava Operator Toolbelt 1: Creating Observables
Knowing your RxJava Operator Toolbelt 2: Transforming Observables
Knowing your RxJava Operator Toolbelt 3: Combining Observables
Knowing your RxJava Operator Toolbelt 5: Utility Observables
RxJava is Java’s (and Kotlin’s) form of Reactive Programming, and thus Android’s too. It bestows the ultimate power in async operations through the use of observables. If you’re new, check out my Hardly Comprehensive Introduction.
RxJava is a powerful tool that only gets better with how well you know it. It’s got a high skill cap, and how close you get to it is determined by how well you know your operators.
Filter operations are thankfully some of the simplest to understand and you’d be a real mug to have done some RxJava before and not know what ‘first’ does, so I’m gonna be blasting through these with a pOW!
Filter
val observable = Observable.range(0, 10)
    .filter { it % 2 == 0 }
    .subscribe { Log.d(LOG_TAG, "Emitted: $it") }
Bare-bones filter. You make the rules with a predicate: return true and the item gets through, return false and it’s dropped.
Output
0 2 4 6 8
FirstElement
val observable = Observable.range(0, 10)
    .firstElement()
    .subscribe { Log.d(LOG_TAG, "Emitted: $it") }
Emits only the first item and thus returns a Maybe (there could be no first item at all). How can it get simpler than that?
If you want a Single instead, you can use the firstOrError method, which does what it says on the tin: it emits the first item, or errors with a NoSuchElementException if there isn’t one.
Output
0
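To make the Maybe versus Single difference concrete, here is a small sketch of what happens when the source is empty. It assumes RxJava 2.x (the `io.reactivex` package) is on the classpath, and the function names `firstOfEmpty` and `firstOrErrorMessage` are made up for illustration.

```kotlin
import io.reactivex.Observable

// Assumption: RxJava 2.x (io.reactivex). Function names are illustrative only.

// firstElement() returns a Maybe, so an empty source simply completes.
// blockingGet() on a Maybe returns null when nothing was emitted.
fun firstOfEmpty(): Int? =
    Observable.empty<Int>().firstElement().blockingGet()

// firstOrError() returns a Single, which must either emit or fail.
// On an empty source it fails with NoSuchElementException.
fun firstOrErrorMessage(source: Observable<Int>): String =
    try {
        "first = ${source.firstOrError().blockingGet()}"
    } catch (e: NoSuchElementException) {
        "source was empty"
    }
```

Calling `firstOrErrorMessage(Observable.range(0, 10))` should give back the first item, while `firstOrErrorMessage(Observable.empty())` should land in the catch branch.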
LastElement
val observable = Observable.range(0, 10)
    .lastElement()
    .subscribe { Log.d(LOG_TAG, "Emitted: $it") }
Emits only the last item and thus returns a Maybe. Just like with firstElement, you also have access to a lastOrError method, which returns a Single.
Output
9
Skip
val observable = Observable.range(0, 5)
    .skip(3)
    .subscribe { Log.d(LOG_TAG, "Emitted: $it") }
Skip the first n items… That’s it.
Output
3 4
SkipLast
val observable = Observable.range(0, 5)
    .skipLast(3)
    .subscribe { Log.d(LOG_TAG, "Emitted: $it") }
Skip the last n items…. No seriously, that’s it.
Output
0 1
Take
val observable = Observable.range(0, 5)
    .take(3)
    .subscribe { Log.d(LOG_TAG, "Emitted: $it") }
Emit only the first n items. There’s also an overload that takes a time window instead, emitting only the items that arrive within that period after you subscribe.
Output
0 1 2
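The time-window flavour of take isn’t shown above, so here’s a rough sketch of it. It assumes RxJava 2.x and uses a `TestScheduler` for virtual time, purely so the result doesn’t depend on real clocks. The name `takeForWindow` and the 100 ms / 350 ms figures are my own picks for illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Assumption: RxJava 2.x. TestScheduler gives us virtual time, so nothing actually sleeps.
fun takeForWindow(): List<Long> {
    val scheduler = TestScheduler()
    val observer = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler) // emits at 100, 200, 300, ... ms
        .take(350, TimeUnit.MILLISECONDS, scheduler) // keep only what arrives in the first 350 ms
        .test()

    // Move the virtual clock well past the window.
    scheduler.advanceTimeBy(1, TimeUnit.SECONDS)
    return observer.values().toList()
}
```

With those numbers, only the three items emitted before the 350 ms mark should make it through.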
TakeLast
val observable = Observable.range(0, 5)
    .takeLast(3)
    .subscribe { Log.d(LOG_TAG, "Emitted: $it") }
Emit only the last n items. There’s also an overload that takes a time window, emitting only the items that arrived within that period before the observable completed.
Output
2 3 4
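The time-window version of takeLast is easier to see than to describe, so here’s a sketch. It assumes RxJava 2.x and again leans on a `TestScheduler` for virtual time. The name `takeLastWindow` and the timings are invented for the example.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Assumption: RxJava 2.x. Virtual time keeps the example repeatable.
fun takeLastWindow(): List<Long> {
    val scheduler = TestScheduler()
    val observer = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler)
        .take(10) // items 0..9 arrive at 100..1000 ms, then the source completes
        .takeLast(250, TimeUnit.MILLISECONDS, scheduler) // keep items from the final 250 ms before completion
        .test()

    scheduler.advanceTimeBy(2, TimeUnit.SECONDS)
    return observer.values().toList()
}
```

Nothing is emitted until the source completes, because the operator can’t know what counts as “the last 250 ms” before then.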
ElementAt
val observable = Observable.range(0, 5)
    .elementAt(2)
    .subscribe { Log.d(LOG_TAG, "Emitted: $it") }
Emit only the element at index n (counting from zero), as a Maybe… That’s it.
Output
2
IgnoreElements
val observable = Observable.range(0, 5)
    .ignoreElements()
    .subscribe { Log.d(LOG_TAG, "Completed") }
Don’t emit any items, but mirror the observable’s termination notification. It turns the observable into a Completable. The upstream still does all the work needed to produce those items; they just never reach the subscriber.
Output
Completed
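If you want to convince yourself that the upstream work still happens, a sketch like this one should do it. It assumes RxJava 2.x, and `countWhileIgnoring` is just a name I made up for the example.

```kotlin
import io.reactivex.Observable

// Assumption: RxJava 2.x. Counts how many items the upstream produced
// even though none of them get past ignoreElements().
fun countWhileIgnoring(): Int {
    var seen = 0
    Observable.range(0, 5)
        .doOnNext { seen += 1 } // side effect runs for every upstream item
        .ignoreElements()       // Observable -> Completable; items are dropped here
        .blockingAwait()        // returns once the Completable completes
    return seen
}
```

The counter ends up at the full item count, which is the point: ignoreElements only hides the items from downstream.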
Debounce
val o1 = Observable.interval(400, TimeUnit.MILLISECONDS).map { it -> it.toInt() }
val o2 = Observable.interval(1000, TimeUnit.MILLISECONDS).map { it -> it.toInt() }
val observable = Observable.combineLatest(o1, o2, BiFunction { t1: Int, t2: Int -> return@BiFunction t1 + t2 })
    .debounce(300, TimeUnit.MILLISECONDS)
    .subscribe { Log.d(LOG_TAG, "Emitted $it") }
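Whenever an item is emitted, a timer starts. If another item arrives before the timespan is up, the earlier item is dropped and the timer restarts; an item is only emitted once the source has stayed quiet for the full timespan. A debounce in RxJava is as it would be anywhere.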
Output
2 3 6 7 10 11
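Timing-based operators are awkward to reason about with real clocks, so here’s a sketch of debounce under virtual time. The behaviour it leans on is that debounce only lets an item through once the source has been quiet for the whole timespan, and that a newer item replaces a pending one. It assumes RxJava 2.x; `debounceBurst` and the timings are invented for illustration.

```kotlin
import io.reactivex.schedulers.TestScheduler
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.TimeUnit

// Assumption: RxJava 2.x. A PublishSubject lets us push items by hand,
// and TestScheduler lets us move the clock by hand.
fun debounceBurst(): List<Int> {
    val scheduler = TestScheduler()
    val source = PublishSubject.create<Int>()
    val observer = source.debounce(300, TimeUnit.MILLISECONDS, scheduler).test()

    source.onNext(1)
    scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS) // only 100 ms of quiet: 1 is still pending
    source.onNext(2)                                    // replaces 1, timer restarts
    scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS)
    source.onNext(3)                                    // replaces 2, timer restarts
    scheduler.advanceTimeBy(400, TimeUnit.MILLISECONDS) // quiet for over 300 ms: 3 goes through
    source.onNext(4)
    scheduler.advanceTimeBy(400, TimeUnit.MILLISECONDS) // quiet again: 4 goes through
    return observer.values().toList()
}
```

The burst of 1, 2, 3 collapses to just the last one, which is why debounce is the usual pick for things like search-as-you-type.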
Distinct
val observable = Observable.just(1, 2, 2, 3, 3, 3)
    .distinct()
    .subscribe { Log.d(LOG_TAG, "Emitted $it") }
All emissions shall be unique! Any duplicates will not be emitted.
Output
1 2 3
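Two details the example above doesn’t show: duplicates don’t need to be next to each other to get dropped, and there’s an overload that takes a key selector so you decide what “duplicate” means. A quick sketch, assuming RxJava 2.x, with function names made up for illustration:

```kotlin
import io.reactivex.Observable

// Assumption: RxJava 2.x.

// distinct() remembers everything it has seen, so the later 1 and 2 are dropped too.
fun distinctValues(): List<Int> =
    Observable.just(1, 2, 1, 3, 2)
        .distinct()
        .toList()
        .blockingGet()

// With a key selector, two items count as duplicates when their keys match.
// Here the key is the string length, so only the first string of each length survives.
fun distinctByLength(): List<String> =
    Observable.just("a", "b", "cc", "dd", "eee")
        .distinct { it.length }
        .toList()
        .blockingGet()
```

Because distinct has to remember every key it has seen, it’s worth being careful with it on long-running or unbounded streams.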
Sample
val o1 = Observable.interval(350, TimeUnit.MILLISECONDS).map { it -> it.toInt() }
val o2 = Observable.interval(1000, TimeUnit.MILLISECONDS).map { it -> it.toInt() }
val observable = Observable.combineLatest(o1, o2, BiFunction { t1: Int, t2: Int -> return@BiFunction t1 + t2 })
    .sample(1200, TimeUnit.MILLISECONDS)
    .subscribe { Log.d(LOG_TAG, "Emitted $it") }
Periodic time intervals are set. At every interval, the most recent item of the observable will be emitted. If nothing new has arrived since the previous interval, nothing is emitted for that one.
Output
2 6 11 15 20
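Here’s the same sampling idea under virtual time, where the ticks are easier to follow than with real intervals. It assumes RxJava 2.x; `sampleTicks` and the one-second period are my own choices for the sketch.

```kotlin
import io.reactivex.schedulers.TestScheduler
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.TimeUnit

// Assumption: RxJava 2.x. Items are pushed by hand and the clock is advanced by hand.
fun sampleTicks(): List<Int> {
    val scheduler = TestScheduler()
    val source = PublishSubject.create<Int>()
    val observer = source.sample(1, TimeUnit.SECONDS, scheduler).test()

    source.onNext(1)
    source.onNext(2)
    scheduler.advanceTimeBy(1, TimeUnit.SECONDS) // first tick: the latest item is 2
    scheduler.advanceTimeBy(1, TimeUnit.SECONDS) // second tick: nothing new, so nothing is emitted
    source.onNext(3)
    scheduler.advanceTimeBy(1, TimeUnit.SECONDS) // third tick: the latest item is 3
    return observer.values().toList()
}
```

Item 1 never shows up because 2 overwrote it before the first tick, and the silent second tick produces no repeat of 2.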