Substituting Android’s LiveData: StateFlow or SharedFlow?

Biverk
Nov 23, 2020

Kotlin Coroutines recently introduced two Flow types, SharedFlow and StateFlow, and Android’s community started wondering about substituting LiveData with one of those new types, or both. The main reasons for that are:

  1. LiveData is closely bound to the UI (no natural way to offload work to worker threads), and
  2. LiveData is closely bound to the Android platform.

We can conclude from those two facts that, in Clean Architecture terms, while LiveData works fine for the Presentation Layer, it does not fit well in the Domain Layer, which should be platform-independent; nor in the Data Layer (Repository implementations and Data Sources), which should usually offload work to worker threads.

We could not just substitute LiveData with pure Flow, though. The two main issues with using pure Flow as a LiveData substitute on all app layers are that:

  1. Flow is stateless (no .value access).
  2. Flow is declarative (cold): a flow builder merely describes what the flow is, and it is only materialized when collected. As a result, a new Flow is run (materialized) for each collector, meaning expensive upstream work such as database access is redundantly repeated for each collector.

Those are not to be viewed as intrinsic defects of pure Flow: they are just characteristics that make it a poor fit as a LiveData substitute, but that are very powerful in other contexts.
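To make the "cold" behavior concrete, here is a minimal sketch, assuming kotlinx.coroutines 1.6 or later. The nearbyLocations() function and the upstreamRuns counter are hypothetical stand-ins for an expensive upstream query; they only exist to show that the builder block runs once per collector.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical counter: tracks how many times the upstream block is executed.
var upstreamRuns = 0

// Hypothetical cold flow standing in for an expensive database query.
fun nearbyLocations(): Flow<String> = flow {
    upstreamRuns++          // runs again for every collector, because the flow is cold
    emit("location-A")
    emit("location-B")
}

fun main() = runBlocking {
    val locations = nearbyLocations()   // nothing runs yet: this only describes the flow
    locations.collect { println("first collector: $it") }
    locations.collect { println("second collector: $it") }
    println("upstream ran $upstreamRuns times")   // prints "upstream ran 2 times"
}
```

Note also that there is nothing like locations.value to read here: the only way to get data out of a plain Flow is to collect it.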

Now, SharedFlow and StateFlow provide a solution for both of those issues.

A practical example

Let’s exemplify with a practical use-case. Our use-case is fetching nearby locations. We’ll assume a Firebase Realtime Database is used alongside the GeoFire library, which allows for querying nearby locations.

Using LiveData end-to-end

The Data Source is responsible for connecting to the Firebase Realtime Database through a GeoQuery. When we receive an onGeoQueryReady() or onGeoQueryError(), we update the LiveData value with the aggregate of the locations entered, exited or moved since the last onGeoQueryReady().
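As a rough sketch of what such a Data Source might look like, the class below extends LiveData and registers the GeoFire listener only while it has active observers. The NearbyResult type and the NearbyLocationsLiveData name are hypothetical, and keeping a running map of keys to locations is just one possible way to aggregate the events.

```kotlin
import androidx.lifecycle.LiveData
import com.firebase.geofire.GeoLocation
import com.firebase.geofire.GeoQuery
import com.firebase.geofire.GeoQueryEventListener
import com.google.firebase.database.DatabaseError

// Hypothetical result wrapper, not part of GeoFire.
sealed class NearbyResult {
    data class Success(val locations: Map<String, GeoLocation>) : NearbyResult()
    data class Failure(val error: DatabaseError) : NearbyResult()
}

// Hypothetical Data Source: a LiveData that listens to a GeoQuery while active.
class NearbyLocationsLiveData(private val geoQuery: GeoQuery) : LiveData<NearbyResult>() {

    // Running aggregate of the keys currently inside the query area.
    private val locations = mutableMapOf<String, GeoLocation>()

    private val listener = object : GeoQueryEventListener {
        override fun onKeyEntered(key: String, location: GeoLocation) { locations[key] = location }
        override fun onKeyMoved(key: String, location: GeoLocation) { locations[key] = location }
        override fun onKeyExited(key: String) { locations.remove(key) }

        // Publish a snapshot once GeoFire reports that the pending events are done.
        override fun onGeoQueryReady() { value = NearbyResult.Success(locations.toMap()) }
        override fun onGeoQueryError(error: DatabaseError) { value = NearbyResult.Failure(error) }
    }

    // Called when the number of active observers goes from 0 to 1.
    override fun onActive() {
        geoQuery.addGeoQueryEventListener(listener)
    }

    // Called when the number of active observers goes from 1 to 0.
    override fun onInactive() {
        geoQuery.removeGeoQueryEventListener(listener)
    }
}
```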

Our Repository, ViewModel and Activity should then be as simple as:
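A minimal sketch of those three pieces, under a few assumptions: all class names are hypothetical, locations are simplified to a list of strings, and the ViewModel is assumed to be handed to the Activity by some dependency injection setup before onCreate() runs.

```kotlin
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import androidx.lifecycle.LiveData
import androidx.lifecycle.Observer
import androidx.lifecycle.ViewModel

// Hypothetical Data Source contract: exposes the nearby locations as LiveData.
interface NearbyLocationsDataSource {
    val locations: LiveData<List<String>>
}

// The Repository simply forwards the Data Source's LiveData.
class NearbyLocationsRepository(private val dataSource: NearbyLocationsDataSource) {
    val locations: LiveData<List<String>> get() = dataSource.locations
}

// The ViewModel exposes the same LiveData to the Presentation Layer.
class NearbyLocationsViewModel(repository: NearbyLocationsRepository) : ViewModel() {
    val locations: LiveData<List<String>> = repository.locations
}

class NearbyLocationsActivity : AppCompatActivity() {
    // Assumption: injected before onCreate() is called.
    lateinit var viewModel: NearbyLocationsViewModel

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        // LiveData handles the Lifecycle for us: updates only arrive while started.
        viewModel.locations.observe(this, Observer { nearby ->
            title = "${nearby.size} nearby locations"
        })
    }
}
```

Notice that LiveData, an Android type, leaks into every layer, including the Repository.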

This approach may work fine, until you decide to make the Domain Layer, which contains the Repository interfaces, platform independent (as it should be). Also, once you need to offload work to worker threads on Data Sources, you will see there is no easy, idiomatic way with LiveData.

Using flows on Data Source and Repository

Let us convert the Data Source to use Flow. We have a flow builder, callbackFlow {}, that converts a callback to a cold Flow. When this Flow is collected, it runs the code block passed to the flow builder, adds the GeoQuery listener and reaches awaitClose {}, where it suspends until the Flow is closed (that is, until no one is collecting). When closed, it removes the listener, and the flow is dematerialized.
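Here is a minimal sketch of that conversion. The asFlow() extension name is hypothetical, the running map of keys to locations is one possible aggregation, and trySend() assumes kotlinx.coroutines 1.5 or later (older versions used offer()).

```kotlin
import com.firebase.geofire.GeoLocation
import com.firebase.geofire.GeoQuery
import com.firebase.geofire.GeoQueryEventListener
import com.google.firebase.database.DatabaseError
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow

// Hypothetical extension: exposes a GeoQuery as a cold Flow of location snapshots.
fun GeoQuery.asFlow(): Flow<Map<String, GeoLocation>> = callbackFlow {
    val geoQuery = this@asFlow
    val locations = mutableMapOf<String, GeoLocation>()

    val listener = object : GeoQueryEventListener {
        override fun onKeyEntered(key: String, location: GeoLocation) { locations[key] = location }
        override fun onKeyMoved(key: String, location: GeoLocation) { locations[key] = location }
        override fun onKeyExited(key: String) { locations.remove(key) }

        // Emit a snapshot of the aggregate each time GeoFire reports it is ready.
        override fun onGeoQueryReady() { trySend(locations.toMap()) }

        // Terminate the flow with the error so collectors can handle it.
        override fun onGeoQueryError(error: DatabaseError) { close(error.toException()) }
    }

    // Runs once per collector, when collection starts.
    geoQuery.addGeoQueryEventListener(listener)

    // Suspends until the flow is closed or the collector is cancelled, then cleans up.
    awaitClose { geoQuery.removeGeoQueryEventListener(listener) }
}
```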

Our Repository and ViewModel warrant no changes, but our Activity now receives a Flow and not a LiveData, so it needs to adapt: instead of observing the LiveData, we will collect the Flow.

We use launchWhenStarted {} to collect the Flow so the coroutine will be automatically started only when the Activity reaches the onStart() lifecycle state, and will be automatically paused when it reaches the onStop() lifecycle state. This is akin to the automatic handling of Lifecycle that LiveData gives us.
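As a small sketch of that collection, assuming the lifecycle-runtime-ktx artifact for lifecycleScope and kotlinx.coroutines 1.6 or later. The locations property is a hypothetical stand-in for the Flow exposed by the ViewModel, assumed to be set before onCreate() runs.

```kotlin
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import androidx.lifecycle.lifecycleScope
import kotlinx.coroutines.flow.Flow

class NearbyLocationsActivity : AppCompatActivity() {
    // Hypothetical: the Flow exposed by the ViewModel, assumed to be set before onCreate().
    lateinit var locations: Flow<List<String>>

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        // The block starts running once the Activity is at least STARTED,
        // and is suspended (not cancelled) while it is below STARTED.
        lifecycleScope.launchWhenStarted {
            locations.collect { nearby ->
                title = "${nearby.size} nearby locations"
            }
        }
    }
}
```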

Note: You might choose to keep using LiveData in your Presentation Layer (Activity). In that case, you can easily convert from Flow to LiveData in the ViewModel by using the Flow<T>.asLiveData() extension function. This decision has consequences that we'll talk about in the next section, and we'll show that using SharedFlow and StateFlow end-to-end is more versatile and might fit better in your architecture.

What are the issues with using Flow in the View Layer?

The first problem with this approach is the handling of the Lifecycle, which LiveData does automatically for us. We achieved a similar behavior through the use of launchWhenStarted {} in the example above.

But there’s another problem: because the Flow is declarative and is only run (materialized) upon collection, if we have multiple collectors, a new flow will be built for each collector. Depending on the operations done, such as database or network operations, this can be very inefficient. It can even result in erroneous states, if we require the operations to be done only once for correctness. In our example, we would have one new GeoQuery listener added for each collector.

Note: If you convert your Repository Flow to LiveData by using Flow<T>.asLiveData() in the ViewModel, the LiveData becomes the sole collector for the Flow, and no matter how many observers there are in the Presentation Layer, only one Flow will be collected. However, for that architecture to work well, you’ll need to guarantee that every other component of yours accesses your LiveData from the ViewModel, and never the Flow directly from the Repository. This can prove to be a challenge depending on how decoupled your app is: all components that need the Repository, such as Interactor (use-case) implementations, would now depend on the Activity instance to get the ViewModel instance, and the scope of those components would need to be limited accordingly.

We only want one GeoQuery listener, no matter how many collectors in the View Layer we have. We can achieve this by sharing the flow between all collectors.

SharedFlow to the rescue

SharedFlow is a Flow that allows for sharing itself between multiple collectors, so that only one flow is effectively run (materialized) for all of the simultaneous collectors. If you define a SharedFlow that accesses databases and it is collected by multiple collectors, the database access will only run once, and the resulting data will be shared to all collectors.

StateFlow can also be used to achieve the same behavior: it is a specialized SharedFlow with .value (its current state) and specific SharedFlow configurations (constraints). We’ll talk about those constraints later.

We have an operator for transforming any Flow into a SharedFlow:

    fun <T> Flow<T>.shareIn(
        scope: CoroutineScope,
        started: SharingStarted,
        replay: Int = 0
    ): SharedFlow<T>

Let’s apply this to our Data Source.

The scope is where all computation for materializing the Flow will be done. As our Data Source is a @Singleton, we can use the application process’ LifecycleScope, which is a LifecycleCoroutineScope that is created upon process creation and is only destroyed upon process destruction.

For the started parameter, we can use SharingStarted.WhileSubscribed(), which makes our Flow start sharing (materializing) only when the number of subscribers turns from 0 to 1, and stop sharing when the number of subscribers turns from 1 to 0. This is akin to the LiveData behavior we implemented earlier by adding the GeoQuery listener in the onActive() callback and removing the listener in the onInactive() callback. We could also configure it to be started eagerly (immediately materialized and never dematerialized) or lazily (materialized when first collected, and never dematerialized), but we do want it to stop upstream database collection when not being collected downstream.

For the replay parameter, we can use 1: new subscribers will get the last emitted value immediately upon subscription.
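Putting the three parameters together, a minimal sketch of the shared Data Source could look like the following. The class name and the upstream parameter are hypothetical, and using ProcessLifecycleOwner.get().lifecycleScope as the process-wide scope is an assumption (it requires the lifecycle-process and lifecycle-runtime-ktx artifacts).

```kotlin
import androidx.lifecycle.ProcessLifecycleOwner
import androidx.lifecycle.lifecycleScope
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.shareIn

// Hypothetical Data Source that shares a single upstream flow among all collectors.
class NearbyLocationsDataSource(
    // Hypothetical cold upstream, for example one built with callbackFlow {}.
    upstream: Flow<List<String>>,
    // Assumption: a scope that lives as long as the application process.
    scope: CoroutineScope = ProcessLifecycleOwner.get().lifecycleScope
) {
    val locations: SharedFlow<List<String>> = upstream.shareIn(
        scope = scope,
        // Collect upstream only while there is at least one subscriber.
        started = SharingStarted.WhileSubscribed(),
        // New subscribers immediately receive the latest emitted value.
        replay = 1
    )
}
```

Passing the scope as a constructor parameter is a design choice that also makes it easy to substitute a test scope.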

Now, we might be tempted to think our Activity needs no adjustment. Wrong! There is a gotcha: when collecting the flow in a coroutine launched with launchWhenStarted {}, the coroutine will be paused on onStop() and resumed on onStart(), but it will still be subscribed to the flow. For MutableSharedFlow<T>, it means MutableSharedFlow<T>.subscriptionCount will not change for paused coroutines. To leverage the power of SharingStarted.WhileSubscribed(), we need to actually unsubscribe on onStop(), and subscribe again on onStart(). This means cancelling the collection coroutine and recreating it.

(See this issue and this issue for more details).

Let’s create a class for that general purpose:
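A minimal sketch of such a class, assuming DefaultLifecycleObserver and lifecycleScope from the AndroidX Lifecycle libraries. The FlowObserver name is hypothetical, and observeIn() here is just one possible shape for the extension function.

```kotlin
import androidx.lifecycle.DefaultLifecycleObserver
import androidx.lifecycle.LifecycleOwner
import androidx.lifecycle.lifecycleScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn

// Hypothetical helper: collects the flow between onStart() and onStop() only.
class FlowObserver<T>(
    lifecycleOwner: LifecycleOwner,
    private val flow: Flow<T>
) : DefaultLifecycleObserver {

    private var job: Job? = null

    init {
        lifecycleOwner.lifecycle.addObserver(this)
    }

    // Subscribe: start a fresh collection coroutine every time we become STARTED.
    override fun onStart(owner: LifecycleOwner) {
        job = flow.launchIn(owner.lifecycleScope)
    }

    // Unsubscribe: cancel the coroutine so the subscription count really drops.
    override fun onStop(owner: LifecycleOwner) {
        job?.cancel()
        job = null
    }
}

// Hypothetical extension mirroring launchIn(), but lifecycle-aware.
fun <T> Flow<T>.observeIn(lifecycleOwner: LifecycleOwner): FlowObserver<T> =
    FlowObserver(lifecycleOwner, this)
```

In an Activity's onCreate(), usage could then be as short as viewModel.locations.onEach { render(it) }.observeIn(this), where viewModel.locations and render() are placeholders for your own flow and UI code.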

Now, we can adjust our Activity to use the .observeIn(LifecycleOwner) extension function we just created. The collector coroutine created with observeIn(LifecycleOwner) will be destroyed when the LifecycleOwner's Lifecycle reaches the CREATED state (right before the onStop() call) and will be recreated once it reaches the STARTED state (after the onStart() call).

Note: Why CREATED state? Shouldn’t it be STOPPED state? It sounds counterintuitive at first, but it makes perfect sense. Lifecycle.State only has the following states: CREATED, DESTROYED, INITIALIZED, RESUMED, STARTED. There are no STOPPED and PAUSED states. When lifecycle reaches onPause() , instead of going to a new state, it goes back to the STARTED state. When it reaches onStop() , it goes back to the CREATED state.

We now have a Data Source that materializes once, but shares its data with all its subscribers. Its upstream collection will stop as soon as there are no subscribers and will restart as soon as the first subscriber reappears. It has no dependency on the Android platform, and it is not tied to the main thread (Flow transformations can happen in other threads by simply applying the .flowOn() operator: .flowOn(Dispatchers.IO) or .flowOn(Dispatchers.Default)).

But what if I need to eventually access the current state of the flow without collecting it?

If we really need to access the Flow state with .value just like we can do with LiveData, we can use StateFlow, which is a specialized, constrained SharedFlow.

Instead of applying the shareIn() operator to materialize the flow, we can apply stateIn():

    fun <T> Flow<T>.stateIn(
        scope: CoroutineScope,
        started: SharingStarted,
        initialValue: T
    ): StateFlow<T>

As we can see from the method parameters, there are two basic differences between shareIn() and stateIn():

  1. stateIn() has no support for replay customization. StateFlow is a SharedFlow with a fixed replay=1. That means new subscribers will immediately get the current state upon subscription.
  2. stateIn() requires an initial value. That means if you don’t have an initial value at the time, you will need to either make the StateFlow<T> type T nullable, or use a sealed class to represent an empty initial value.
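As an illustration of the second point, here is a minimal sketch that uses a sealed class to represent the "no value yet" state. The NearbyState type and the asNearbyState() extension are hypothetical names, and locations are simplified to a list of strings.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.stateIn

// Hypothetical state type: Loading stands in for the missing initial value.
sealed class NearbyState {
    object Loading : NearbyState()
    data class Loaded(val locations: List<String>) : NearbyState()
}

// Hypothetical extension: turns a cold flow of locations into a StateFlow.
fun Flow<List<String>>.asNearbyState(scope: CoroutineScope): StateFlow<NearbyState> =
    map<List<String>, NearbyState> { NearbyState.Loaded(it) }
        .stateIn(
            scope = scope,
            started = SharingStarted.WhileSubscribed(),
            // Required by stateIn(): what .value returns before upstream emits anything.
            initialValue = NearbyState.Loading
        )
```

The returned StateFlow can be collected like any other flow, and its .value can be read at any time without collecting.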

From the documentation:

State flow is a shared flow

State flow is a special-purpose, high-performance, and efficient implementation of SharedFlow for the narrow, but widely used case of sharing a state. See the SharedFlow documentation for the basic rules, constraints, and operators that are applicable to all shared flows.

State flow always has an initial value, replays one most recent value to new subscribers, does not buffer any more values, but keeps the last emitted one, and does not support resetReplayCache. A state flow behaves identically to a shared flow when it is created with the following parameters and the distinctUntilChanged operator is applied to it:

    // MutableStateFlow(initialValue) is a shared flow with the following parameters:
    val shared = MutableSharedFlow(
        replay = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    shared.tryEmit(initialValue) // emit the initial value
    val state = shared.distinctUntilChanged() // get StateFlow-like behavior

Use SharedFlow when you need a StateFlow with tweaks in its behavior such as extra buffering, replaying more values, or omitting the initial value.

However, note the obvious compromise in choosing SharedFlow: you will lose StateFlow<T>.value .

Which to choose, StateFlow or SharedFlow?

The easy way to answer this question is trying to answer a few other questions:

“Do I really need to access the flow’s current state at any given time with myFlow.value ?”

If the answer to this question is no, you might consider SharedFlow.

“Do I need to support emitting and collecting repeated values?”

If the answer to this question is yes, you will need SharedFlow.

“Do I need to replay more than the latest value for new subscribers?”

If the answer to this question is yes, you will need SharedFlow.

As we can see, StateFlow for everything is not automatically the right answer.

1. It ignores (conflates) repeated values and this is non-configurable. Sometimes you need to not ignore repeated values, e.g.: a connection attempt that stores the attempt result in a flow, and needs to retry after each failure.

2. Also, it requires an initial value. Because SharedFlow does not have .value, it does not need to be instantiated with an initial value — collectors will just suspend until the first value appears, and no one will try to access .value before any value arrives. If you don’t have an initial value for StateFlow you’ll have to make the StateFlow type nullable T? and use null as the initial value (or declare a sealed class for a default no-value value).

3. Also, you might want to tweak the replay value. SharedFlow can replay the last n values for new subscribers. StateFlow has a fixed replay value of 1 — it only shares the current state value.
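The first point is easy to see in a small experiment. The sketch below assumes kotlinx.coroutines 1.6 or later; the status strings are arbitrary, and the SharedFlow half inspects replayCache rather than using a collector only to keep the example short.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

fun main() = runBlocking {
    // StateFlow: setting a value equal to the current one is ignored.
    val state = MutableStateFlow("DISCONNECTED")
    val seen = mutableListOf<String>()
    val collector = launch { state.collect { seen += it } }
    yield()                        // let the collector receive the initial value
    state.value = "DISCONNECTED"   // equal to the current value: no new emission
    yield()
    state.value = "OK"             // different value: emitted
    yield()
    collector.cancel()
    println(seen)                  // [DISCONNECTED, OK]

    // SharedFlow: repeated values are kept, and replay can be larger than 1.
    val shared = MutableSharedFlow<String>(replay = 3)
    shared.tryEmit("DISCONNECTED")
    shared.tryEmit("DISCONNECTED")
    println(shared.replayCache)    // [DISCONNECTED, DISCONNECTED]
}
```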

Both support the SharingStarted (Eagerly, Lazily or WhileSubscribed()) configuration. I commonly use SharingStarted.WhileSubscribed() and destroy/recreate all my collectors on Activity onStart() / onStop(), so data source upstream collection will stop when the user is not actively using the app (this is akin to removing/re-adding listeners on LiveData onActive() / onInactive()).

The constraints that StateFlow imposes on SharedFlow might not be the best fit for you: you might want to tweak the behavior and choose to use SharedFlow instead. Personally, I rarely ever need to access myFlow.value, and I enjoy SharedFlow's flexibility, so I usually choose SharedFlow.

Read more on StateFlow and SharedFlow in the official documentation.

A practical case where SharedFlow instead of StateFlow is needed

Consider the following wrapper around Google’s Billing Client library. We have a MutableSharedFlow billingClientStatus that stores the current connection status to the billing service. We set its initial value to be SERVICE_DISCONNECTED. We collect billingClientStatus, and when it is not OK, we try to startConnection() to the billing service. If the connection attempt fails, we will emit SERVICE_DISCONNECTED.

In that example, if billingClientStatus were a MutableStateFlow instead of a MutableSharedFlow, when its value is already SERVICE_DISCONNECTED and we try to set it to the same (connection retry failed), it would ignore the update, and consequently, it would not try to reconnect again.

In that case, we need to use a SharedFlow, which supports emitting sequential repeated values.
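The retry logic can be sketched without the real Billing Client API. In the sketch below, BillingStatus and BillingConnection are hypothetical stand-ins for the library's response codes and client, so only the flow mechanics are shown, not the actual library calls.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch

// Hypothetical stand-in for the billing response codes.
enum class BillingStatus { OK, SERVICE_DISCONNECTED }

// Hypothetical stand-in for the billing client's connection API.
interface BillingConnection {
    // Starts a connection attempt and reports its outcome through the callback.
    fun startConnection(onResult: (BillingStatus) -> Unit)
}

class Biller(private val connection: BillingConnection, scope: CoroutineScope) {

    // replay = 1 keeps the latest status; DROP_OLDEST makes tryEmit() always succeed.
    private val billingClientStatus = MutableSharedFlow<BillingStatus>(
        replay = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )

    init {
        // A SharedFlow has no initial value, so we emit one ourselves.
        billingClientStatus.tryEmit(BillingStatus.SERVICE_DISCONNECTED)

        scope.launch {
            billingClientStatus.collect { status ->
                if (status != BillingStatus.OK) {
                    // A failed attempt emits SERVICE_DISCONNECTED again. SharedFlow
                    // delivers the repeated value, which triggers the next retry.
                    connection.startConnection { result -> billingClientStatus.tryEmit(result) }
                }
            }
        }
    }
}
```

A real implementation would likely add a delay or backoff between attempts; this sketch retries immediately to keep the focus on repeated emissions.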

On the GeoFire use-case

If you have a practical need to work with GeoFire, I have developed a library, geofire-ktx, that allows for readily converting a GeoQuery object to a Flow. It also supports fetching a DataSnapshot located under another DatabaseReference root with the same child key as the GeoFire root, as this is a common use-case with GeoQuery. It also supports fetching this data as an instance of a class instead of a DataSnapshot. This is done through Flow transformations. The library source code completes the examples given in this article.
