Android

[안드로이드] Flow 공식문서로 이해하기 - 1차 Flow 기초

베르_최성훈 2023. 8. 25. 20:19

우아한테크코스에서 flow 를 배우지 않는다는 소식에 혼자 독학하는 flow deep dive! 시작!

https://developer.android.com/kotlin/flow

 

Android의 Kotlin 흐름  |  Android Developers

Android의 Kotlin 흐름 컬렉션을 사용해 정리하기 내 환경설정을 기준으로 콘텐츠를 저장하고 분류하세요. 코루틴에서 흐름은 단일 값만 반환하는 정지 함수와 달리 여러 값을 순차적으로 내보낼

developer.android.com

 

Flow 란?

 Coroutine flow 는 단일 값만 반환하는 suspend function 과 다르게 여러 값을 순차적으로 내보낼 수 있는 type 이다.

예를 들어 이 flow 를 사용하면 데이터베이스의 업데이트를 실시간으로 수신할 수 있다.

 

  flow 는 코루틴 기반으로 빌드되며 여러 값을 제공할 수 있다. 또한 개념적으로 비동기적 계산이 가능한 데이터 스트림이다. 이때 내보내는 값은 flow 처럼 동일한 type 이어야 한다.

 

flow 는 연속적인 값을 제공한다는 점에서 Iterator 와 비슷하지만 suspend function 을 사용해서 값을 비동기적으로 생성하고 사용한다.

이게 무슨말이냐!

flow 는 main thread 를 blocking 하지 않고 다음 값을 제공하는 네트워크 요청을 안전하게 만들 수 있다.

 

Stream of Data

flow 를 사용하려면 streams of data 를 알아야한다.

데이터 스트림은 생산자(Producer), 중개자(Intermediary), 소비자(Consumer) 3가지 항목으로 구성된다.

  • 생산자(Producer): stream 에 추가할 데이터를 생산한다. 코루틴 덕분에 flow 는 비동기적으로 데이터를 생산할 수 있다.
  • (선택사항) 중개자 (Intermediaries): 스트림으로 내보내는 값이나 스트림 자체를 수정할 수 있다.
  • 소비자 (Consumer): 스트림의 값들을 소비한다.

안드로이드에서 일반적으로 Repositroy 가 데이터를 생산하는 생산자이며 UI 는 소비자이다. 하지만 UI 가 사용자 입력 이벤트를 생산하고 다른 계층에서 소비하기도 한다. 중개자는 다음 계층에 맞게 data stream 을 수정하는 역할을 한다.

Flow 생성하기

flow 를 생성하기 위해서는 flow Builder 를 사용한다. flow builder 함수는 emit() 을 사용해서 수동으로 새 값을 data stream 에 추가하는 flow 를 생성할 수 있다.

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // 데이터 스트림에 값을 추가
            delay(refreshIntervalMs) // 5초 일시정지
        }
    }
}

interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

suspend function 은 하나의 값만 반환할 수 있다. 하지만 flow 를 사용해서 5초 간격으로 데이터 스트림에 값을 추가하고 있다.

Flow 빌더는 코루틴 내에서 실행된다. 따라서 비동기적이지만 몇가지 제한사항을 가진다.

  • Flow 는 순차적이다. 생산자가 코루틴 내에 있기 때문에 suspend function 을 호출하면 suspend function 이 반환할 때 까지 생산자는 일시정지한다. 예를 들어 위 코드에서 fetchLatestNews 네트워크 요청이 완료될 때 까지 일시정지한다. 그 다음에 데이터 스트림에 emit 할 수 있다.

  • Flow builider 를 사용하는 생산자는 다른 CoroutineContext 에서 value 를 추가할 수 없다. 그러므로 새 코루틴을 만들거나 withContext 블록을 사용해서 emit 해선 안된다. 이런 경우 callbackFlow 를 사용해야한다.

val myFlow = flow {
   // GlobalScope.launch { // is prohibited
   // launch(Dispatchers.IO) { // is prohibited
   // withContext(CoroutineName("myFlow")) { // is prohibited
   emit(1) // OK
   coroutineScope {
       emit(2) // OK -- still the same coroutine
   }
}

스트림 수정하기

중개자는 intermediate operators 를 사용해서 데이터 스트림의 값을 소비하지 않고 수정할 수 있다.

중간 연산자 참고

https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/

 

Flow

Flow An asynchronous data stream that sequentially emits values and completes normally or with an exception. Intermediate operators on the flow such as map, filter, take, zip, etc are functions that are applied to the upstream flow or flows and return a do

kotlinlang.org

 

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * news 를 좋아하는 주제로 필터링한다.
     * 또한 각 news 를 캐시에 저장한다.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
}


Collecting from a flow

terminal operator 를 사용해서 값 수신을 시작할 수 있다. 스트림의 값을 전부 가져오고 싶으면 collect 를 사용하면 된다.

터미널 연산자(terminal operator) 참고

https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // collect 를 사용해서 수신 대기 시작
            newsRepository.favoriteLatestNews.collect { favoriteNews -> 
            }
        }
    }
}

collectsuspend function 이므로 코루틴 스코프 내에서 실행해야한다. collect 를 실행하는 함수도 일시정지 될 수 있다.

 

while(true) 루프로 항상 활성 상태가 유지될 때 viewModelScope 가 중지되면 데이터 스트림도 종료된다.

다른 연산자를 통해 지정하지 않으면 상태는 콜드 및 지연이다. 즉 flow 에서 터미널 연산자가 호출될 때마다 생산자 코드가 실행된다.

예상치 못한 Exception 이 발생하면 catch 중개 연산자로 처리할 수 있다.

 

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // latest favorite news 로 작업
                }
        }
    }
}

 

예외가 발생하는 경우에 새 항목이 수신되지 않아 collect 람다가 호출되지 않는다. 이 대신에 Repository 에서 다른 값을 emit 해 처리할 수 있다.

class NewsRepository(...) {
	val favoriteLatestNews: Flow<List<ArticleHeadline>> =
		newsRemoteDataSource.latestNews
			.map { news -> news.filter { userData.isFavoriteTopic(it) } }
			.onEach { news -> saveInCache(news) }
			// 에러가 발생하면 캐시된 것에서 값을 emit 한다.
			.catch { exception -> emit(lastCachedNews()) }
}

다른 CoroutineContext 에서 실행

생산자는 기본적으로 소비자의 CoroutineContext 에서 실행된다.

레포지토리 계층은 viewModelScope 에서 사용하는 Dispatchers.Main 에서 실행되어선 안된다. 데이터 소스 레이어는 Dispatchers.IO 에서 실행해야 한다. Dispatcher를 변경하려면 중간 연산자 flowOn 을 사용하면 된다.

class NewsRepository(
	private val newsRemoteDataSource: NewsRemoteDataSource,
	private val userData: UserData,
	private val defaultDispatcher: CoroutineDispatcher
) {
	val favoriteLatestNews: Flow<List<ArticleHeadline>> =
		newsRemoteDataSource.latestNews
			.map { news -> //  default dispatcher
				news.filter { userData.isFavoriteTopic(it) }
			}
			.onEach { news -> // default dispatcher
				saveInCache(news)
			}
			// 위 flow 를 defaultDispatcher 에서 실행 ↑
			.flowOn(defaultDispatcher)
			// 아래 flow 는 소비자의 context 에서 실행 ↓
			.catch { exception ->
				emit(lastCachedNews())
			}
}