
Introduction to RxJava with Observers and Observables

ReactiveX is an API for asynchronous programming with observable streams. It provides a very beautiful alternative to async tasks, and it has become core functionality across many different programming languages and frameworks.

RxJava is its implementation for Java, and thus, Android (yes, even in today’s Kotlin era).

RxJava is in ever-increasing demand among developers on many platforms, even beyond Android. Its most attractive feature is performing async operations without the use of callbacks.

Instead, Observables and Observers are used in conjunction to emit data (once or multiple times) and include methods of their own to handle what to do each time data is emitted. If that doesn’t make too much sense now, don’t worry, you’re not alone. Read on.

What is an Observable and an Observer?

val myObservable = Observable.just(1, 2, 3)

// subscribe() returns a Disposable; the lambda acts as the observer's onNext handler
val myObserver = myObservable.subscribe { receivedNumber ->
    Log.d(LOG_TAG, "Received Number $receivedNumber")
}

Observable – An object that emits a stream of data and works with an observer which receives that data.

Observer – ‘Subscribes’ to an observable to let it start emitting data and handles what to do upon receiving the data.

In the above example, Observable.just(1, 2, 3) emits the integers 1, 2, and 3 in that order. The observer subscribes to the observable and once it does, receives these numbers in the same order and each time, simply logs them.

Observable Lifecycle

An observer exposes a number of lifecycle callbacks that the observable invokes when it is subscribed to, emits data, completes, or throws an error. I’ll list some of the most commonly used ones below.

onNext – Called whenever new data is emitted. It’s also what you see in the lambda function above (following subscribe).

onComplete – Called when there is no more data to be emitted. As the name implies, the stream of data has been completely emitted.

onError – Called when an error gets thrown by the observable. Proper use of this lifecycle event allows for graceful error handling.

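onSubscribe – Called first, as soon as an observer subscribes to the observable and before any data is emitted. It hands the observer a Disposable that can be used to cancel the subscription.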
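To see the order in which these callbacks fire, here is a minimal sketch that implements a full Observer and records each call. It assumes RxJava 2.x is on the classpath and uses a plain list plus println instead of Log.d so it can run outside Android; the function name recordLifecycle is made up for this illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.Observer
import io.reactivex.disposables.Disposable

// Hypothetical helper: records every lifecycle callback in the order it is invoked.
fun recordLifecycle(): List<String> {
    val events = mutableListOf<String>()

    val observer = object : Observer<Int> {
        override fun onSubscribe(d: Disposable) { events.add("onSubscribe") }
        override fun onNext(t: Int) { events.add("onNext($t)") }
        override fun onError(e: Throwable) { events.add("onError(${e.message})") }
        override fun onComplete() { events.add("onComplete") }
    }

    // Observable.just emits synchronously, so the list is complete once subscribe returns.
    Observable.just(1, 2, 3).subscribe(observer)
    return events
}

fun main() {
    // prints [onSubscribe, onNext(1), onNext(2), onNext(3), onComplete]
    println(recordLifecycle())
}
```

Note that onError and onComplete are mutually exclusive: a stream ends with one or the other, never both.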

Adding RxJava to your App

def rxJavaVersion = '2.2.9'

implementation "io.reactivex.rxjava2:rxjava:${rxJavaVersion}"

If you want to give RxJava a try in your own project, you can do so by adding this to your app/build.gradle file.

Different Types of Observable

The standard Observable emits a continuous stream of data until it is completed. This won’t always be what you want. You might want to emit a single value, emit a value that may or may not be there, or simply invoke other functions after an async task with no return value.

val mySingle = Single.just(1)
val singleObserver = mySingle.subscribe { data ->
    Log.d(LOG_TAG, "Received $data")
}

val myMaybe = Maybe.empty<Int>()
val maybeObserver = myMaybe
        .defaultIfEmpty(1)
        .subscribe { data ->
            Log.d(LOG_TAG, "Received $data")
        }

val myCompletable = Completable.complete()
val completableObserver = myCompletable
        .subscribe {
            Log.d(LOG_TAG, "Task completed")
        }

Single – Emits exactly one value or an error. Its observer has no onNext or onComplete; instead, onSuccess is called once with the value (or onError if something goes wrong).

Maybe – Emits either one or zero values. When a value is emitted, onSuccess is called with it. When zero values are emitted, onSuccess is skipped and onComplete is called right away. You can use the defaultIfEmpty method to always emit a default value where zero values would be emitted.

Completable – Doesn’t emit any values, only onComplete or onError. You can subscribe to it just like a callback with no return value.
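The different callbacks of the three types can be sketched as follows, again assuming RxJava 2.x and recording events in a list rather than logging them. The function name describeSpecialTypes is hypothetical.

```kotlin
import io.reactivex.Completable
import io.reactivex.Maybe
import io.reactivex.Single

// Hypothetical helper: subscribes to one of each type and records which callback fires.
fun describeSpecialTypes(): List<String> {
    val events = mutableListOf<String>()

    // Single: first lambda is onSuccess, second is onError.
    Single.just(1).subscribe(
        { value -> events.add("Single onSuccess($value)") },
        { error -> events.add("Single onError(${error.message})") }
    )

    // Maybe: onSuccess, onError, onComplete, in that parameter order.
    // An empty Maybe skips onSuccess and goes straight to onComplete.
    Maybe.empty<Int>().subscribe(
        { value -> events.add("Maybe onSuccess($value)") },
        { error -> events.add("Maybe onError(${error.message})") },
        { events.add("Maybe onComplete") }
    )

    // Completable: onComplete first, then onError.
    Completable.complete().subscribe(
        { events.add("Completable onComplete") },
        { error -> events.add("Completable onError(${error.message})") }
    )

    return events
}

fun main() {
    // prints [Single onSuccess(1), Maybe onComplete, Completable onComplete]
    println(describeSpecialTypes())
}
```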

Flowables and Backpressure

There is one more type of Observable and that is the Flowable. Just like an Observable, Flowables emit a continuous stream of data until it’s completed but with one key difference:

Imagine data is being emitted faster than your observer can handle it. Dealing with that overload is known as backpressure, and left unhandled it would, in most cases, cause an error. A Flowable is an Observable-like type that includes a backpressure strategy, which tells it what to do when the observer falls behind.

val myFlowable = Observable.range(1, 100000)
    .toFlowable(BackpressureStrategy.DROP)

val flowableObserver = myFlowable.subscribe { data ->
    Log.d(LOG_TAG, "Received $data")
}

There are a few different backpressure strategies to know about:

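Buffer – Buffers events in memory until a subscriber can consume them. BackpressureStrategy.BUFFER itself is unbounded, so a subscriber that never catches up will eventually exhaust memory. The 128 figure is RxJava’s default internal buffer size (Flowable.bufferSize()), used by operators such as observeOn. To cap the buffer and signal an error on overflow, use onBackpressureBuffer(capacity) instead, keeping in mind that larger buffers cost more memory.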

Drop – Simply drops events a subscriber can’t consume.

Latest – Only keeps the latest emitted value until the subscriber can consume it and drops the rest.

Error – Signals a MissingBackpressureException as soon as the subscriber can’t keep up.

val myFlowable = Observable.range(1, 100000)
    .toFlowable(BackpressureStrategy.MISSING)

val flowableObserver = myFlowable
    .onBackpressureDrop()
    .subscribe { data ->
        Log.d(LOG_TAG, "Received $data")
    }

Missing – The lack of a backpressure strategy. You’d use this if you want to handle backpressure downstream with one of the onBackpressureXXX operators, as above, instead of fixing the strategy when the Flowable is created. If nothing downstream handles it, a MissingBackpressureException can be thrown once the subscriber falls behind.
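One way to watch the strategies behave differently is to control exactly how many items the subscriber requests. The sketch below does that with RxJava 2.x's TestSubscriber (via Flowable.test(initialRequest)) rather than a real slow consumer, which is a simplification for illustration; the helper names valuesFor and errorStrategyFails are made up here.

```kotlin
import io.reactivex.BackpressureStrategy
import io.reactivex.Observable
import io.reactivex.exceptions.MissingBackpressureException

// Hypothetical helper: emits 1..10 through the given strategy to a subscriber that
// requests `initialRequest` items up front and `laterRequest` more after emission ends.
fun valuesFor(strategy: BackpressureStrategy, initialRequest: Long, laterRequest: Long = 0): List<Int> {
    val subscriber = Observable.range(1, 10)
        .toFlowable(strategy)
        .test(initialRequest)
    if (laterRequest > 0) subscriber.requestMore(laterRequest)
    return subscriber.values().toList()
}

// Hypothetical helper: with ERROR, the stream fails once demand runs out.
fun errorStrategyFails(): Boolean =
    Observable.range(1, 10)
        .toFlowable(BackpressureStrategy.ERROR)
        .test(3)
        .errors()
        .firstOrNull() is MissingBackpressureException

fun main() {
    // DROP: only 3 items were requested, so 4..10 are discarded.
    println(valuesFor(BackpressureStrategy.DROP, 3))        // [1, 2, 3]
    // LATEST: nothing requested during emission, so only the last item is kept.
    println(valuesFor(BackpressureStrategy.LATEST, 0, 1))   // [10]
    // BUFFER: everything is held in memory until it is requested.
    println(valuesFor(BackpressureStrategy.BUFFER, 0, 10))  // [1, 2, ..., 10]
    // ERROR: signals MissingBackpressureException after the third item.
    println(errorStrategyFails())                           // true
}
```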

Observable vs Flowable

The usual guidance is to use an Observable when you know the data won’t overwhelm your observer, and a Flowable otherwise. I’ll be honest, I haven’t yet found a reason why you can’t just use a Flowable in either case. Maybe Flowables use that little bit of extra memory?

Error Handling

No code is safe from errors. You already know unhandled backpressure can be a cause for exceptions. On top of that, any exception thrown by your own code inside the lambda you pass to subscribe is caught and routed to the observer’s onError.

The great thing is RxJava contains several different ways to handle these errors:

doOnError – Simply perform an action if an error occurs. It only observes the error; the error still travels on to the observer.

val observer = myObservable
    .doOnError { Log.e(LOG_TAG, "ErrorOccurred") }
    .subscribe()

onErrorReturnItem – Return a default item if an error occurs

val observer = myObservable
    .onErrorReturnItem(0)
    .subscribe()

onErrorReturn – Just like onErrorReturnItem, but takes in a function that returns the desired data type (for a dynamic default value)

val observer = myObservable
        // The fallback must match the stream's type (Int here)
        .onErrorReturn { throwable -> if (throwable is IllegalStateException) -1 else 0 }
        .subscribe()

onErrorResumeNext – Returns a default sequence if an error occurs. Can also take a function for dynamic data.

val observer = myObservable
        .onErrorResumeNext(Observable.just(2, 4, 6))
        .subscribe()

retry – Attempt resubscribing to the observable if an error occurs. You can pass in a maximum number of tries, or leave it blank for it to retry infinitely.

val observer = myObservable
        .retry(3)
        .subscribe()

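You can also pass in a boolean predicate to achieve a ‘retry-on-condition’ scenario. The predicate receives the number of retries so far (starting at 1) and the error, and returns true to retry again.

val observer = myObservable
        .retry { retryCount, throwable -> retryCount < 3 }
        .subscribe()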

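If nothing handles the error (no recovery operator such as those above and no onError callback passed to subscribe), RxJava wraps it in an OnErrorNotImplementedException and hands it to its global error handler, which on Android results in a crash. Note that doOnError alone does not prevent this, since it only observes the error.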
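The sketch below puts a few of these options side by side, assuming RxJava 2.x. The failing source and all function names are invented for this illustration, and results are collected into lists (using test() where convenient) instead of being logged.

```kotlin
import io.reactivex.Observable

// Hypothetical source: emits 1, then fails when it reaches 2.
fun failingSource(): Observable<Int> =
    Observable.just(1, 2, 3).map { n ->
        if (n == 2) throw IllegalStateException("boom")
        n
    }

// onErrorReturnItem: the error is replaced by a default item and the stream completes.
fun withFallback(): List<Int> =
    failingSource().onErrorReturnItem(0).test().values().toList()

// Passing a second lambda to subscribe handles the error in the observer itself.
fun withErrorConsumer(): List<String> {
    val events = mutableListOf<String>()
    failingSource().subscribe(
        { n -> events.add("onNext($n)") },
        { e -> events.add("onError(${e.message})") }
    )
    return events
}

// retry: the callable fails on its first two attempts and succeeds on the third.
fun withRetry(): List<Int> {
    var attempts = 0
    return Observable.fromCallable {
        attempts++
        if (attempts < 3) throw IllegalStateException("attempt $attempts failed")
        attempts
    }.retry(3).test().values().toList()
}

fun main() {
    println(withFallback())       // [1, 0]
    println(withErrorConsumer())  // [onNext(1), onError(boom)]
    println(withRetry())          // [3]
}
```

Which option fits depends on whether the stream should recover with a substitute value, try again, or simply report the failure to the observer.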

Further Reading

This article covers the basics of RxJava, and now you should be able to use observables and observers to control the flow of data in your app.

But isn’t RxJava supposed to be asynchronous? Where does that come into play?

That’s all about the threads, my dear. Read my guide on Multithreading to find out.

RxJava also has a great number of operators to help you create, combine, and process data in your observable streams. Read my posts on Knowing your RxJava Operator Toolbelt.
