Unit Testing RxJava Observables

RxJava ships with a happy little class built for use in tests, called TestObserver. That’s what this is all about.

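We can get a TestObserver by calling test() on any observable. This subscribes to your observable and records everything it emits, so you can assert on it afterwards. The subscription happens on the thread your test is running on; where the emissions happen still depends on the schedulers the stream itself uses.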

@Test
fun givenValidResponse_whenGetPost_thenMappedPostReturned() {
  given(placeholderApi.getPost(postId)).willReturn(Single.just(post))
  given(placeholderMapper.mapPost(post)).willReturn(mappedPost)

  // getPost returns a Single; test() is what turns it into a TestObserver
  val postSingle = placeHolderService.getPost(postId)

  postSingle.test()
      .assertComplete()
      .assertNoErrors()
      .assertValue { it == mappedPost }
}

TestObserver then gives you access to plenty of useful methods that let you assert on your stream and its values in whatever way you see fit. There are a lot of these assertions, so your tests can be as comprehensive as you want them to be.

Some of these assertions include:

assertComplete – asserts that the subscriber received exactly one onComplete event

assertValue (and all its variants) – asserts that the subscriber received exactly one value, and that it equals the given value or matches the given predicate

assertError – asserts that the subscriber received a specific error instance / class

assertNever – asserts that the subscriber never received a value matching the given predicate

These are only a few of the many assertions you are provided with.
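To make the list a bit more concrete, here is a minimal sketch of a few of these assertions chained together. The streams are made up for illustration, and the imports assume RxJava 2 (the io.reactivex package); assertNever in particular is an RxJava 2 method.

```kotlin
import io.reactivex.Observable

// Hypothetical streams, used only to show the assertions in action.
fun assertionsOnValues() {
    Observable.just(1, 2, 3).test()
        .assertValues(1, 2, 3)      // every emitted value, in order
        .assertNever { it > 3 }     // no emitted value matched this predicate
        .assertComplete()
        .assertNoErrors()
}

fun assertionsOnError() {
    val failure = IllegalStateException("boom")

    Observable.error<Int>(failure).test()
        .assertError(failure)                             // this exact instance
        .assertError(IllegalStateException::class.java)   // or just its class
        .assertNoValues()
        .assertNotComplete()
}
```

Each assertion throws an AssertionError when it fails, so the bodies of these two functions can be dropped into @Test methods as they are.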

Await

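You might want to test observable streams that are time-based, that emit several values over time, or that run on an infinite timer. In those cases the values arrive asynchronously, and await will help you out: it blocks the test thread until the stream terminates or the given timeout elapses, so your assertions run after the emissions have arrived.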

fun getTimerCounting() = Observable.interval(1000, TimeUnit.MILLISECONDS)

@Test
fun whenTimerIsCounting_thenNumberOfEmissionsReturned() {

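  // take(5) makes the infinite timer complete after 5 values, so await has something to wait for
  val testObserver = getTimerCounting().take(5).test()

  // Blocks in real time until the stream completes or the timeout runs out
  testObserver.await(6000, TimeUnit.MILLISECONDS)
  testObserver.assertValues(0L, 1L, 2L, 3L, 4L)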
}

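In a real scenario, these 0, 1, 2, 3, 4 values are emitted one per second, so they take 5 seconds to arrive. Be aware that await does not fast-forward that time: it blocks the test thread until the stream terminates or the timeout elapses, so this test really does run for about 5 seconds. What await buys you is a way to wait for asynchronous emissions before asserting on them.

If you want tests of slow streams to finish instantly, run the stream on a TestScheduler and move its virtual clock forward with advanceTimeBy. Then your tests that test things that take time don’t need to take time when testing… Just pretend for a second that makes sense, alright? Good. Moving on.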
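If you want those 5 seconds to cost nothing at all, one option is to run the timer on a TestScheduler and move its virtual clock forward by hand. The sketch below assumes RxJava 2 imports and a hypothetical variant of getTimerCounting that accepts the scheduler as a parameter, which the original function does not do.

```kotlin
import io.reactivex.Observable
import io.reactivex.Scheduler
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Hypothetical variant of getTimerCounting that lets the caller choose the scheduler.
fun getTimerCounting(scheduler: Scheduler): Observable<Long> =
    Observable.interval(1000, TimeUnit.MILLISECONDS, scheduler)

fun emissionsAfterFiveVirtualSeconds(): List<Long> {
    val scheduler = TestScheduler()
    val testObserver = getTimerCounting(scheduler).test()

    // Virtual time has not moved yet, so nothing has been emitted.
    testObserver.assertNoValues()

    // Advance the virtual clock by 5 seconds without actually waiting.
    scheduler.advanceTimeBy(5, TimeUnit.SECONDS)

    testObserver.assertValues(0L, 1L, 2L, 3L, 4L)
    testObserver.assertNotComplete() // interval never completes on its own
    return testObserver.values().toList()
}
```

The price is that the code under test has to let you pass the scheduler in (or you have to override the default schedulers in the test setup), but in exchange the test is deterministic and finishes immediately.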

How NOT to test RxJava: BlockingGet

I’ve seen this a couple times so I thought it’s worth a mention.

blockingGet is another function you can call on a Single (or a Maybe) to get its value. It blocks the calling thread until the value arrives, then returns it.

The first test above can be restructured like this.

@Test
fun givenValidResponse_whenGetPost_thenMappedPostReturned() {
  given(placeholderApi.getPost(postId)).willReturn(Single.just(post))
  given(placeholderMapper.mapPost(post)).willReturn(mappedPost)

  // Named result so it does not shadow the post fixture used above
  val result = placeHolderService.getPost(postId).blockingGet()

  assertThat(result).isEqualTo(mappedPost)
}

Looks pretty nice doesn’t it? We get the single value that the Single would emit, then we apply our assertions to it.

Oh my dear Freddy, you are so naive.

Using blockingGet in unit tests is not the grave sin it would be in production code, but it is still a sin, because you are missing out on the assertions you could be making with TestObserver.

With TestObserver you can assert on the whole lifecycle of the observable (onNext, onComplete, onError), not just on the emitted value.
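As a rough illustration, here is what asserting on the error path can look like. failingGetPost is a hypothetical stand-in for a getPost call whose request fails, and the imports assume RxJava 2. With blockingGet the same call would simply throw, and the test would have to catch and unpack the exception.

```kotlin
import io.reactivex.Single
import java.io.IOException

// Hypothetical stand-in for a getPost call whose API request fails.
fun failingGetPost(): Single<String> = Single.error(IOException("network down"))

fun assertFailureLifecycle() {
    failingGetPost().test()
        .assertError(IOException::class.java) // onError was delivered...
        .assertNoValues()                     // ...no value ever was...
        .assertNotComplete()                  // ...and the stream did not complete
}
```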

And when it comes to time-based events, blockingGet makes the test actually wait those 5 seconds or whatever, and it offers no way around that. A TestObserver combined with a TestScheduler lets you advance virtual time and assert straight away.

Post Summary

  • Call test() on your observables to get a TestObserver
  • Use its various assert methods to test the lifecycle and values of its emissions
  • Use await to wait for asynchronous emissions, and a TestScheduler to test time-based observables without actually waiting
  • blockingGet is bad

Happy coding ༼ つ ◕_◕ ༽つ