Change language

Asynchronous Flow in Kotlin

|

The article will be most useful to those who are already familiar with the Android platform, Kotlin and coroutines.

In coroutines, Flow is a class that can return multiple objects one by one or all at once. The key word here is "multiple": this is the main difference between it and the suspend function, which returns a single object and terminates. For example, Flow is much more convenient if you sign up for persistent notifications from your GPS or chat messages. Flow is coroutine-based and is a stream of data that can be processed asynchronously. With Flow, you can send requests to a server or database without blocking the main application flow. Naturally, all data returned by Flow must be of the same type: if a stream is declared as Flow, only objects of Int type can be received from it.

There are three objects involved in the Flow operation:

  • Producer - produces (creates, emits) data as a flow. Data is transmitted in a separate flow due to coroutines.
  • Intermediary - a class or classes that can modify or change the data produced by the Producer. Usually these are some kind of auxiliary classes or so-called mappers. It is not necessary to have mappers, if the data does not need to be modified or changed from one type to another.
  • Consumer is the recipient of the data produced by the Producer.

With a simple example, let's see how Flow can be used in an application. To build the application we will use a simplified analogue of a pure architecture:

  • we will have data stored in a conditional database: Data;
  • there will be a data source, which will retrieve data from the database: DataSource;
  • there will be a repository that will work with our data source: Repository;
  • Repository will be used in our ViewModel, and eventually the data will be displayed in Activity.

Let's start by creating a simple class to pass our data. In our case this is a data class that contains some value in the form of a String. This is data that will be received by our Activity:

internal data class Data(val data: String)

Now let us describe the source of our data. Suppose it is a speculative database and we want to check if data changes in it. Since the data in it will be constantly changing, Flow is ideal for our purposes. Database:

internal object DataBase {
   fun fetchData() = Random.nextInt()
}

Our database has a method that returns the desired information in the form of some random number. In this way we simulate a constant change of data in the database.

Now it's time for DataSource and Flow. DataSource class takes two arguments in constructor: database and period of data update. Period equals to one second specified in milliseconds and contains one variable of Flow type which contains data from our database converted from Int to String. To create a simple flow we need to use flow builder. In our case, it is a simple Flow function where everything happens:

internal class DataSource(
   private val dataBase: DataBase = DataBase,
   private val refreshIntervalMs: Long = 1000
) {
   val data: Flow<String> = flow {
       while (true) {
           val dataFromDataBase = dataBase.fetchData()
           emit(dataFromDataBase.toString())
           delay(refreshIntervalMs)
       }
   }
       .flowOn(Dispatchers.Default)
       .catch { e ->
           println(e.message)//Error!
       }
}


In the infinite loop, we access the database, get a random number and "emit" (function emit) this number already as a String for everyone who is "subscribed" to the flow (remember Producer and Consumer). After that we pause for one second in the loop using the delay function. The flowOn and catch functions are optional: the code will work fine without them. With flowOn you can explicitly specify in which thread to run, while catch will catch errors if they occur during the run.

Now it's time for the repository. In it, we pass our DataSource. The repository also has only one variable of type Flow. Note that DataSource returns its data type as String, but Repository returns Data. In this case, the repository is a mediator:

internal class Repository(dataSource: DataSource = DataSource()) {
 
   val userData: Flow<Data> =
       dataSource.data.map { data -> Data(data) }
   //.onEach { saveInCache(it) }
}

Here we see that the map function is called to the variable of the DataSource data class (this is our Flow), which allows us to save the returned String value to the Data class. The onEach function is optional and shows that the value returned by our DataSource can be saved for later use or an unlimited number of other operations can be performed on it.

It remains to describe the last class of our business logic - ViewModel. ViewModel contains the LiveData to which our Activity is subscribed. All we need to do is to pass our Repository to the ViewModel constructor and start the process of getting data from the Database:

internal class MainViewModel(
   repository: Repository = Repository()
) : ViewModel() {
 
   val liveData: MutableLiveData<Data> = MutableLiveData()
 
   init {
       viewModelScope.launch {
           repository.userData.flowOn(Dispatchers.Main)
               .collect { data ->
                   liveData.value = data
               }
       }
   }
}

This is done at the moment of ViewModel creation (init block). To subscribe to Flow, we need to start the process via coroutines (remember that flow is executed asynchronously). To do this, we have viewModelScope.launch, which we run in the init block (it can also be run inside the suspend function). Then we call the flowOn function of userData where we specify that all data will be displayed in the main thread of the application. The collect function starts the thread directly. As soon as we get another batch of data (once per second), we update LiveData.

In fact, all this can be run on a single line, because the Flow class has special functions for that:

val liveData: LiveData<Data> = repository.userData.asLiveData()
 
/*val liveData: MutableLiveData<Data> = MutableLiveData()
 
init {
   viewModelScope.launch {
       repository.userData.flowOn(Dispatchers.Main)
           .collect { data ->
               liveData.value = data
           }
   }
}*/

All classes of business logic completely:

internal class MainViewModel(
   repository: Repository = Repository()
) : ViewModel() {
   val liveData: LiveData<Data> = repository.userData.asLiveData()
 
   /*val liveData: MutableLiveData<Data> = MutableLiveData()
 
   init {
       viewModelScope.launch {
           repository.userData.flowOn(Dispatchers.Main)
               .collect { data ->
                   liveData.value = data
               }
       }
   }*/
}
 
internal class Repository(dataSource: DataSource = DataSource()) {
 
   val userData: Flow<Data> =
       dataSource.data.map { data -> Data(data) }
   //.onEach { saveInCache(it) }
}
 
internal class DataSource(
   private val dataBase: DataBase = DataBase,
   private val refreshIntervalMs: Long = 1000
) {
   val data: Flow<String> = flow {
       while (true) {
           val dataFromDataBase = dataBase.fetchData()
           emit(dataFromDataBase.toString())
           delay(refreshIntervalMs)
       }
   }
   /*.flowOn(Dispatchers.Default)
   .catch { e ->
       println(e.message)//Error!
   }*/
}
 
internal object DataBase {
   fun fetchData() = Random.nextInt()
}
 
internal data class Data(val data: String)

All that's left to do is to display the whole simple process on the screen. Layout:

<?xml version="1.0" encoding="utf-8"?>
<androidx.constraintlayout.widget.ConstraintLayout xmlns:android="http://schemas.android.com/apk/res/android"
   xmlns:app="http://schemas.android.com/apk/res-auto"
   android:id="@+id/main"
   android:layout_width="match_parent"
   android:layout_height="match_parent">
 
   <TextView
       android:id="@+id/message"
       android:layout_width="wrap_content"
       android:layout_height="wrap_content"
       app:layout_constraintBottom_toBottomOf="parent"
       app:layout_constraintEnd_toEndOf="parent"
       app:layout_constraintStart_toStartOf="parent"
       app:layout_constraintTop_toTopOf="parent" />
</androidx.constraintlayout.widget.ConstraintLayout>

In Activity, we create a ViewModel, subscribe to change the data, and as soon as the data changes, display it on the screen. Everything else happens inside the ViewModel:

class MainActivity : AppCompatActivity() {
 
   override fun onCreate(savedInstanceState: Bundle?) {
       super.onCreate(savedInstanceState)
       setContentView(R.layout.main_activity)
       val textView = findViewById<TextView>(R.id.message)
       ViewModelProvider(this).get(MainViewModel::class.java).liveData.observe(
           this,
           { dataFromDataBase ->
               textView.text = dataFromDataBase.data
           })
   }
}

Launch and enjoy the flow of data:

For more extensive and advanced work with Flow, see this repository: https://github.com/MindorksOpenSource/Kotlin-Flow-Android-Examples