
RxJs Subjects Emit Synchronous Values

Torgeir "Tor" Helgevold
- JavaScript Developer and Blogger
Published: Thu Aug 10 2017

Working with subjects usually involves “next-ing” out values that subscribers can subscribe to. In this post I will show that the “next-ed” values are received synchronously by the subscriber(s).

The impact of this is that subscribers receive values instantly after next-ing, without having to worry about asynchronous timing issues before any values are received.

This may seem surprising if you compare this behavior to promise callbacks. When dealing with promises, the “then” callbacks are always called asynchronously.
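
For comparison, here is a minimal sketch of the promise behavior, using only the standard Promise API (the `order` array is just an illustrative name). Even a promise that is already resolved defers its “then” callback until the current synchronous code has finished.

```typescript
// Records the order in which the statements below actually run.
const order: string[] = [];

// The promise is already resolved, yet its callback is still deferred.
Promise.resolve('Joe').then(name => {
  order.push(`then callback: ${name}`);
});

// Runs first: the then callback waits for the current synchronous code to finish.
order.push('synchronous code');
```

At this point `order` contains only the synchronous entry; the “then” entry is appended later, once the call stack has cleared.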

Observables can either be asynchronous or synchronous depending on the underlying implementation. An example of an asynchronous observable is one that wraps an http call.

However, when you are dealing with simple next-ing of values from a subject, the values will be received synchronously.
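
The difference between the two kinds of sources can be sketched without RxJs at all. In the sketch below, `syncSource` and `asyncSource` are hypothetical stand-ins for observables, not RxJs APIs: one calls its next callback immediately, the other defers it with `setTimeout`, roughly the way a wrapped http call would deliver its value later.

```typescript
// Hypothetical stand-ins for observables: each "source" takes a next callback.
type NextCallback = (value: string) => void;

// Calls the callback immediately, like next-ing a value on a subject.
function syncSource(next: NextCallback): void {
  next('sync value');
}

// Defers the callback to a later turn of the event loop, like a wrapped http call.
function asyncSource(next: NextCallback): void {
  setTimeout(() => next('async value'), 0);
}

const received: string[] = [];
asyncSource(value => received.push(value));
syncSource(value => received.push(value));
received.push('after subscribing');
```

Right after these statements `received` holds the synchronous value followed by 'after subscribing'. The asynchronous value has not arrived yet, even though `asyncSource` was called first.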

I will illustrate this in a unit test of a simple Subject:

import { Subject } from 'rxjs/Subject';

describe('SubjectTest', () => {
  it('should notify of new friends', () => {
    let subject = new Subject<string>();

    subject.asObservable()
      .subscribe(res => {
        // Runs synchronously, inside the subject.next call below
        expect(res).toBe('Joe');
        console.log('Value Received');
      });

    subject.next('Joe');
    console.log('End of Test');
  });
});

As you can see, there are no asynchronous considerations in this test. The callback of the subscribe call will fire as soon as subject.next is called. As a result “Value Received” is printed to the console before “End of Test”.

Dissecting a Subject

When I first learned about subjects I was a bit surprised by this behavior.

For some reason, I always thought the values would be received asynchronously by the subscriber.

It is actually a great thing that it works this way since it often simplifies code, especially unit tests.

However, how is it implemented?

I think the most important part to understand is that observables are really implemented around a callback registration convention.

When you subscribe to an observable, you are really just passing in callback(s) for the observable to call when certain conditions are met.

Specifically, you register up to three callbacks per subscription (nextValue, error and complete).

See the example below:

myObservable
  .subscribe(
    res => {
      // fires when a new value is received
    },
    error => {
      // fires when the observable errors
    },
    () => {
      // fires when the observable completes
    }
  );

Subjects have a split personality. On one end they are producers of values via next-ing. On the other end they function as a regular observable where next-ed values can be received.

The transfer of a next-ed value to a subscriber is done by calling the registered next callback from within the subject’s next function.

In fact the subject keeps a list of registered next callbacks from all subscribers. Every time you call next on the subject, all registered callbacks are invoked and passed the next-ed value.
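
The idea can be sketched in a few lines. `MiniSubject` below is a hypothetical, heavily simplified stand-in written for illustration, not the RxJs implementation: it only handles next callbacks and ignores error, complete and closed-state handling.

```typescript
// Hypothetical, simplified stand-in for a subject; not the RxJs implementation.
type NextCallback<T> = (value: T) => void;

class MiniSubject<T> {
  // Next callbacks registered by all current subscribers.
  private callbacks: NextCallback<T>[] = [];

  // Registers a callback and returns a function that removes it again.
  subscribe(callback: NextCallback<T>): () => void {
    this.callbacks.push(callback);
    return () => {
      this.callbacks = this.callbacks.filter(cb => cb !== callback);
    };
  }

  // Invokes every registered callback synchronously, before returning.
  next(value: T): void {
    this.callbacks.slice().forEach(callback => callback(value));
  }
}

const log: string[] = [];
const friends = new MiniSubject<string>();
friends.subscribe(name => log.push(`first subscriber: ${name}`));
friends.subscribe(name => log.push(`second subscriber: ${name}`));

friends.next('Joe');
log.push('after next');
```

Because `next` is an ordinary method call that loops over the callbacks, both subscribers have logged their value by the time 'after next' is recorded.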

You can see this if you examine the RxJs Subject code.

Here is the next method from Subject:

next(value?: T) {
  if (this.closed) {
    throw new ObjectUnsubscribedError();
  }
  if (!this.isStopped) {
    const { observers } = this;
    const len = observers.length;
    const copy = observers.slice();
    for (let i = 0; i < len; i++) {
      copy[i].next(value);
    }
  }
}

The observers array contains the registered callbacks from all subscribers. As you can see, the for loop iterates over a copy of that array and invokes each “nextValue” callback in turn.

The result of this is that subscribers will instantly receive values as they are next-ed by the subject.
