[안드로이드] Flow 공식문서로 이해하기 - 1차 Flow 기초
우아한테크코스에서 flow 를 배우지 않는다는 소식에 혼자 독학하는 flow deep dive! 시작!
https://developer.android.com/kotlin/flow
Flow 란?
Coroutine flow 는 단일 값만 반환하는 suspend function 과 다르게 여러 값을 순차적으로 내보낼 수 있는 type 이다.
예를 들어 이 flow 를 사용하면 데이터베이스의 업데이트를 실시간으로 수신할 수 있다.
flow 는 코루틴 기반으로 빌드되며 여러 값을 제공할 수 있다. 또한 개념적으로 비동기적 계산이 가능한 데이터 스트림이다. 이때 내보내는 값은 flow 처럼 동일한 type 이어야 한다.
flow 는 연속적인 값을 제공한다는 점에서 Iterator 와 비슷하지만 suspend function 을 사용해서 값을 비동기적으로 생성하고 사용한다.
이게 무슨말이냐!
flow 는 main thread 를 blocking 하지 않고 다음 값을 제공하는 네트워크 요청을 안전하게 만들 수 있다.
Stream of Data
flow 를 사용하려면 streams of data 를 알아야한다.
데이터 스트림은 생산자(Producer), 중개자(Intermediary), 소비자(Consumer) 3가지 항목으로 구성된다.
- 생산자(Producer): stream 에 추가할 데이터를 생산한다. 코루틴 덕분에 flow 는 비동기적으로 데이터를 생산할 수 있다.
- (선택사항) 중개자 (Intermediaries): 스트림으로 내보내는 값이나 스트림 자체를 수정할 수 있다.
- 소비자 (Consumer): 스트림의 값들을 소비한다.
안드로이드에서 일반적으로 Repositroy 가 데이터를 생산하는 생산자이며 UI 는 소비자이다. 하지만 UI 가 사용자 입력 이벤트를 생산하고 다른 계층에서 소비하기도 한다. 중개자는 다음 계층에 맞게 data stream 을 수정하는 역할을 한다.
Flow 생성하기
flow 를 생성하기 위해서는 flow Builder 를 사용한다. flow builder 함수는 emit() 을 사용해서 수동으로 새 값을 data stream 에 추가하는 flow 를 생성할 수 있다.
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // 데이터 스트림에 값을 추가
delay(refreshIntervalMs) // 5초 일시정지
}
}
}
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
suspend function 은 하나의 값만 반환할 수 있다. 하지만 flow 를 사용해서 5초 간격으로 데이터 스트림에 값을 추가하고 있다.
Flow 빌더는 코루틴 내에서 실행된다. 따라서 비동기적이지만 몇가지 제한사항을 가진다.
- Flow 는 순차적이다. 생산자가 코루틴 내에 있기 때문에 suspend function 을 호출하면 suspend function 이 반환할 때 까지 생산자는 일시정지한다. 예를 들어 위 코드에서 fetchLatestNews 네트워크 요청이 완료될 때 까지 일시정지한다. 그 다음에 데이터 스트림에 emit 할 수 있다.
- Flow builider 를 사용하는 생산자는 다른 CoroutineContext 에서 value 를 추가할 수 없다. 그러므로 새 코루틴을 만들거나 withContext 블록을 사용해서 emit 해선 안된다. 이런 경우 callbackFlow 를 사용해야한다.
val myFlow = flow {
// GlobalScope.launch { // is prohibited
// launch(Dispatchers.IO) { // is prohibited
// withContext(CoroutineName("myFlow")) { // is prohibited
emit(1) // OK
coroutineScope {
emit(2) // OK -- still the same coroutine
}
}
스트림 수정하기
중개자는 intermediate operators 를 사용해서 데이터 스트림의 값을 소비하지 않고 수정할 수 있다.
중간 연산자 참고
https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* news 를 좋아하는 주제로 필터링한다.
* 또한 각 news 를 캐시에 저장한다.
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
}
Collecting from a flow
terminal operator 를 사용해서 값 수신을 시작할 수 있다. 스트림의 값을 전부 가져오고 싶으면 collect 를 사용하면 된다.
터미널 연산자(terminal operator) 참고
https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// collect 를 사용해서 수신 대기 시작
newsRepository.favoriteLatestNews.collect { favoriteNews ->
}
}
}
}
collect 는 suspend function 이므로 코루틴 스코프 내에서 실행해야한다. collect 를 실행하는 함수도 일시정지 될 수 있다.
while(true)
루프로 항상 활성 상태가 유지될 때 viewModelScope 가 중지되면 데이터 스트림도 종료된다.
다른 연산자를 통해 지정하지 않으면 상태는 콜드 및 지연이다. 즉 flow 에서 터미널 연산자가 호출될 때마다 생산자 코드가 실행된다.
예상치 못한 Exception 이 발생하면 catch 중개 연산자로 처리할 수 있다.
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// latest favorite news 로 작업
}
}
}
}
예외가 발생하는 경우에 새 항목이 수신되지 않아 collect 람다가 호출되지 않는다. 이 대신에 Repository 에서 다른 값을 emit 해 처리할 수 있다.
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// 에러가 발생하면 캐시된 것에서 값을 emit 한다.
.catch { exception -> emit(lastCachedNews()) }
}
다른 CoroutineContext 에서 실행
생산자는 기본적으로 소비자의 CoroutineContext 에서 실행된다.
레포지토리 계층은 viewModelScope 에서 사용하는 Dispatchers.Main 에서 실행되어선 안된다. 데이터 소스 레이어는 Dispatchers.IO 에서 실행해야 한다. Dispatcher를 변경하려면 중간 연산자 flowOn 을 사용하면 된다.
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> // default dispatcher
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news -> // default dispatcher
saveInCache(news)
}
// 위 flow 를 defaultDispatcher 에서 실행 ↑
.flowOn(defaultDispatcher)
// 아래 flow 는 소비자의 context 에서 실행 ↓
.catch { exception ->
emit(lastCachedNews())
}
}