
Understanding RxJava Subjects and their Different Types

If you have any basic knowledge of RxJava, you’d know that an Observable is a stream ready to emit data when a Subscriber subscribes to it.

A Subject is, well, both an Observable and an Observer (a subscriber). It can subscribe to one or more observables, and can re-emit that data, and possibly new data as well, to any other subscribers. It’s a bridge or proxy you could say. It’s kind of like a signal repeater... on steroids.

Another key thing about Subjects is that they are HOT ( ͡° ͜ʖ ͡°) and you’ll see why that matters in the basic uses below.

Some Basic Uses of Subject

Merging an Unknown Number of Sources

// Sample Observables
val observable1 = Observable.interval(1, TimeUnit.SECONDS).map { "Observable 1: $it" }
val observable2 = Observable.interval(2, TimeUnit.SECONDS).map { "Observable 2: $it" }

// Subscribe Subject to Observables
val subject = PublishSubject.create<String>()
observable1.subscribe(subject)
observable2.subscribe(subject)

// Subscribe Observer to Subject
val observer = subject.subscribe {
    Log.d(LOG_TAG, "onNext: $it")
}

Because a Subject can subscribe to multiple observables, you can re-emit all of them by also using the Subject as an observable. Sure, Observable.merge does this too, but we’re all about that modular and concise code and Subject just wins in that business.

Using the subject also gives you a hot observable instead of the cold one that merge would.
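One difference from merge is worth seeing in code. This is a minimal sketch, assuming RxJava 2 imports (`io.reactivex`; RxJava 3 moved them under `io.reactivex.rxjava3`) and two made-up finite sources. The function names `viaSubject` and `viaMerge` are just for illustration. With finite sources, the first one to complete also completes the Subject, so anything a later source emits is dropped, while merge waits for all of them. The interval sources above never complete, which is why it doesn't show up there.

```kotlin
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject

// What an observer sees when two finite sources are funnelled through a Subject
fun viaSubject(): List<String> {
    val seen = mutableListOf<String>()
    val subject = PublishSubject.create<String>()
    subject.subscribe({ seen.add(it) }, { seen.add("error") }, { seen.add("complete") })
    Observable.just("a").subscribe(subject) // emits "a", then completes the Subject
    Observable.just("b").subscribe(subject) // Subject already terminated, "b" is dropped
    return seen
}

// The same two sources combined with Observable.merge
fun viaMerge(): List<String> {
    val seen = mutableListOf<String>()
    Observable.merge(Observable.just("a"), Observable.just("b"))
        .subscribe({ seen.add(it) }, { seen.add("error") }, { seen.add("complete") })
    return seen
}

fun main() {
    println(viaSubject()) // [a, complete]
    println(viaMerge())   // [a, b, complete]
}
```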

Recasting Data

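val observable = Observable.create<Int> { emitter ->
    emitter.onNext(someIntensiveOperation())
    emitter.onComplete()
}
val subject = PublishSubject.create<Int>()

// Subscribe the observers first so they don't miss the item
val observer1 = subject.subscribe()
val observer2 = subject.subscribe()

// Connect last: the operation runs once and both observers get the result
observable.subscribe(subject)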

If you have an observable that emits data, chances are it does some kind of operation to acquire the data like say, opening a database. If you have multiple subscribers to that observable, it could very well be opening the database each time it’s subscribed to. That is what happens with any cold observable, such as one built with Observable.create or Observable.fromCallable.

You can stick in a Subject between the observable and its subscribers so the Observable only performs its tasks once, the data is passed to the Subject, then the Subject re-emits the data to all its subscribers.

Just keep in mind that the Subject is hot so you don’t want subscribers to miss their timing. If that is an issue for you, then you might want to check out Replay Subject below.
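Here is a rough sketch of that "runs only once" claim, assuming RxJava 2 imports (`io.reactivex`). The counter and the value 42 are stand-ins I made up for the intensive operation, and `runOnceThroughSubject` is a hypothetical name. Note the order: both observers subscribe to the Subject before the Subject is connected to the source, otherwise a synchronous source would be finished before anyone was listening.

```kotlin
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.atomic.AtomicInteger

// Returns (how many times the source ran, everything the two observers received)
fun runOnceThroughSubject(): Pair<Int, List<Int>> {
    val runs = AtomicInteger(0)
    // Stand-in for an expensive operation: counts its own executions
    val source = Observable.fromCallable { runs.incrementAndGet(); 42 }
    val subject = PublishSubject.create<Int>()
    val received = mutableListOf<Int>()

    subject.subscribe { received.add(it) } // observer 1
    subject.subscribe { received.add(it) } // observer 2
    source.subscribe(subject)              // the source runs here, exactly once

    return runs.get() to received
}

fun main() {
    println(runOnceThroughSubject()) // (1, [42, 42])
}
```

Subscribing both observers directly to `source` instead would run the operation twice.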

Converting Cold Observables into Hot Ones

// Yes this is essentially the same code as above
val coldObservable = Observable.create<Int> { emitter ->
    emitter.onNext(someIntensiveOperation())
    emitter.onComplete()
}
val hotSubject = PublishSubject.create<Int>()

// Subscribe before connecting, or the observer misses the item
val observer = hotSubject.subscribe()
coldObservable.subscribe(hotSubject)

To quickly go over, a cold observable runs its whole sequence and emits all its items each time it’s subscribed to. A hot one emits its items once, whether or not anyone is subscribed, and so new subscribers will only receive items emitted after their point of subscription.

Since Subjects are hot, you can use Subjects to convert cold Observables into hot ones by simply letting the Subject re-emit the observable’s data. Just note that this also means observers have the potential to miss data if it is emitted before they subscribe.
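The difference is easiest to see with a late subscriber. This is a small sketch under the assumption of RxJava 2 imports (`io.reactivex`). The function name `coldVersusHot` and the values are mine, and I push values into the Subject by hand to keep things synchronous.

```kotlin
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject

// Returns what a late subscriber receives from a cold source and from a hot Subject
fun coldVersusHot(): Pair<List<Int>, List<Int>> {
    // Cold: every subscription restarts the sequence
    val cold = Observable.just(1, 2, 3)
    val lateOnCold = mutableListOf<Int>()
    cold.subscribe()                      // an earlier subscriber
    cold.subscribe { lateOnCold.add(it) } // still receives 1, 2, 3

    // Hot: items emitted before subscription are gone
    val hot = PublishSubject.create<Int>()
    val lateOnHot = mutableListOf<Int>()
    hot.onNext(1)
    hot.onNext(2)
    hot.subscribe { lateOnHot.add(it) }   // joins after 1 and 2
    hot.onNext(3)

    return lateOnCold to lateOnHot
}

fun main() {
    println(coldVersusHot()) // ([1, 2, 3], [3])
}
```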

Dynamic onNext Calls

val subject = PublishSubject.create<Any>()
subject.onNext("foo")

I’m not the greatest RxJava developer by any means and I get the feeling this may go against some best practices for making production-level apps. If you’re a blogger or a tutor of any kind, using a subject to emit data dynamically by manually calling onNext can do wonders for your teaching. If you’re a more experienced Rx developer who could confirm or debunk this, let me know down in the comments.

Another potential use I can see with subjects is Polymorphism. Since you can emit data like this with onNext, you can continuously emit data of different types to be sent as an Any or Object (For Kotlin and Java respectively, though they’re practically the same). I’m yet to try this out in any practical way though.
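For what it’s worth, here is one way the mixed-type idea could look. It is only a sketch, assuming RxJava 2 imports (`io.reactivex`), and it uses the `ofType` operator so a subscriber can pick out just the type it cares about. The function name `stringsOnly` and the sample values are made up.

```kotlin
import io.reactivex.subjects.PublishSubject

// Pushes mixed values through a Subject<Any> and collects only the Strings
fun stringsOnly(): List<String> {
    val subject = PublishSubject.create<Any>()
    val strings = mutableListOf<String>()

    // ofType filters by class and casts, so this observer only sees Strings
    subject.ofType(String::class.java).subscribe { strings.add(it) }

    subject.onNext("foo")
    subject.onNext(42)    // not a String, filtered out
    subject.onNext("bar")

    return strings
}

fun main() {
    println(stringsOnly()) // [foo, bar]
}
```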

Different Types of Subjects

There are 5 different types of Subjects.

Publish Subject

// Mandatory 1-Liner for Blog Style Points
val subject = PublishSubject.create<Any>()

This is the most basic type of Subject. You may as well just call this “Normal Subject” because that’s pretty much what it is. It follows all the behaviour defined above and… that’s it. The other 4 types just add other bonuses on top.

Behavior Subject

// Mandatory 1-Liner for Blog Style Points
val subject = BehaviorSubject.create<Any>()

This has the added bonus of emitting the last emitted item when it’s subscribed to, before emitting any outgoing items after that.

If a Subject emits 1 and 2, is subscribed to, then emits 3 and 4, a subscriber to a Publish Subject will only receive 3 and 4. With a Behavior Subject, it receives 2, 3 and 4.
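The same sequence written out as code, as a quick sketch assuming RxJava 2 imports (`io.reactivex`). The function name `publishVersusBehavior` is hypothetical.

```kotlin
import io.reactivex.subjects.BehaviorSubject
import io.reactivex.subjects.PublishSubject

// Emit 1, 2, subscribe, emit 3, 4 on both Subject types and compare what arrives
fun publishVersusBehavior(): Pair<List<Int>, List<Int>> {
    val publish = PublishSubject.create<Int>()
    val behavior = BehaviorSubject.create<Int>()
    val fromPublish = mutableListOf<Int>()
    val fromBehavior = mutableListOf<Int>()

    publish.onNext(1); publish.onNext(2)
    behavior.onNext(1); behavior.onNext(2)

    publish.subscribe { fromPublish.add(it) }   // nothing is replayed
    behavior.subscribe { fromBehavior.add(it) } // the latest item (2) is replayed

    publish.onNext(3); publish.onNext(4)
    behavior.onNext(3); behavior.onNext(4)

    return fromPublish to fromBehavior
}

fun main() {
    println(publishVersusBehavior()) // ([3, 4], [2, 3, 4])
}
```

If you want subscribers to get something even before the first onNext, `BehaviorSubject.createDefault(value)` seeds the Subject with an initial item.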

Replay Subject

// Mandatory 1-Liner for Blog Style Points
val subject = ReplaySubject.create<Any>()

You know all that about a subscriber only receiving items from the point of subscription and missing out the rest? Yeah, Replay says goodbye to that.

A Replay Subject emits all the items previously emitted by the Subject before the point of subscription, and then continues to emit outgoing items after that.

In the above example where a Subject emits 1, 2, subscribe, 3, 4, a Replay Subject would emit 1, 2, 3, and 4.
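A sketch of that sequence, assuming RxJava 2 imports (`io.reactivex`). The helper name `lateSubscriberSees` is made up. Replaying everything means buffering everything, so I’ve also included `ReplaySubject.createWithSize`, which caps the buffer to the last n items.

```kotlin
import io.reactivex.subjects.ReplaySubject

// Emit 1, 2, subscribe, emit 3, 4 and return what the late subscriber received
fun lateSubscriberSees(subject: ReplaySubject<Int>): List<Int> {
    val seen = mutableListOf<Int>()
    subject.onNext(1)
    subject.onNext(2)
    subject.subscribe { seen.add(it) } // buffered items are replayed here
    subject.onNext(3)
    subject.onNext(4)
    return seen
}

fun main() {
    // Unbounded buffer: the full history is replayed
    println(lateSubscriberSees(ReplaySubject.create()))          // [1, 2, 3, 4]
    // Buffer capped at 1 item: only the latest item is replayed
    println(lateSubscriberSees(ReplaySubject.createWithSize(1))) // [2, 3, 4]
}
```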

Async Subject

// Mandatory 1-Liner for Blog Style Points
val subject = AsyncSubject.create<Any>()

Observables and Subjects are already asynchronous, aren’t they?

Ok they are, but we’re talking about something special here. An Async Subject emits only the last item it was given, and only once its onComplete method is called. So calling onNext on an Async Subject just updates the item it will emit when it completes.

In the above example where a Subject emits 1, 2, subscribe, 3, 4, an Async Subject will only emit 4, and only once it completes. If the subject instead emits 1, 2, subscribe, complete, the Async Subject will only emit 2.
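Roughly, in code (a sketch assuming RxJava 2 imports from `io.reactivex`, with a made-up function name `asyncSubjectDemo`). It captures what the subscriber has received just before and just after onComplete.

```kotlin
import io.reactivex.subjects.AsyncSubject

// Returns (items received before onComplete, items received after onComplete)
fun asyncSubjectDemo(): Pair<List<Int>, List<Int>> {
    val subject = AsyncSubject.create<Int>()
    val seen = mutableListOf<Int>()

    subject.onNext(1)
    subject.onNext(2)
    subject.subscribe { seen.add(it) }
    subject.onNext(3)
    subject.onNext(4)

    val beforeComplete = seen.toList() // snapshot: nothing has been delivered yet
    subject.onComplete()               // now the last item (4) is delivered

    return beforeComplete to seen
}

fun main() {
    println(asyncSubjectDemo()) // ([], [4])
}
```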

Unicast Subject

This only allows one subscriber on it. It buffers everything emitted before that subscriber arrives, emits those items on subscription, and then any more outgoing items after that… so basically an exclusive Replay Subject.

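Attempting to subscribe with a second observer signals an IllegalStateException to that observer’s onError, which will crash your app if you haven’t supplied an error handler.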
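Here is a small sketch of both behaviours, assuming RxJava 2 imports (`io.reactivex`). The function name `unicastDemo` is hypothetical. The second observer passes an error handler, so the IllegalStateException arrives through onError instead of taking the app down.

```kotlin
import io.reactivex.subjects.UnicastSubject

// Returns (items the first observer received, error type the second observer received)
fun unicastDemo(): Pair<List<Int>, String?> {
    val subject = UnicastSubject.create<Int>()
    val first = mutableListOf<Int>()
    var secondError: String? = null

    subject.onNext(1)
    subject.onNext(2)                   // buffered until someone subscribes
    subject.subscribe { first.add(it) } // receives 1 and 2 right away
    subject.onNext(3)

    // A second observer is rejected through its onError callback
    subject.subscribe({ }, { error -> secondError = error.javaClass.simpleName })

    return first to secondError
}

fun main() {
    println(unicastDemo()) // ([1, 2, 3], IllegalStateException)
}
```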

 
