View
오늘 배워 볼 것
오늘은 Flow에 대해서 이야기 해보겠다.
Flow
Flow는 비동기적 데이터 스트림으로 Flow는 연속적으로 데이터를 보냅니다.
Flow는 원어 그래로 Data Stream, 데이터의 흐름을 말하며, 생산자는 데이터를 Flow 타입으로 입력하고 소비자는 Flow에서 데이터를 수집하는 역할을 합니다. 즉 Data Layer단계에서 데이터를 생성하는 생산자 역할이 되면 Presentation Layer가 해당 데이터를 소비하는 소비자라고 볼 수 있겠습니다.
그리고 Flow는 크게 2가지 형식의 데이터 흐름으로 나눌 수 있습니다.
바로 Cold Stream과 Hot Stream이다.
Cold Stream은 우리가 일반적으로 Flow 타입으로 반환하는 것을 말하며, Hot Stream은 대표적으로 StateFlow와 ShareFlow가 있다.
Cold Stream의 경우는 데이터가 내부에서 생성되고 데이터의 생성 시점이 바로 소비자가 데이터 소비를 시작할 때 입니다.
그렇기 때문에 flow builder인 Flow, Flowof, asFlow로 데이터가 내부에서 생성됩니다.
Flow의 소비를 시작하는 함수인 종단연산자(collect, fold, reduce, first등)이 호출되지 않으면 데이터를 생성하지 않습니다.
당연히 중간 연산자(map, onEach, filter)또한 해당 종단연산자가 호출되어야 실행됩니다.
간단히 정리하자면
Cold Stream:
데이터가 내부에서 생성되고, 데이터는 소비자가 소비를 시작할 때 생산된다. 그리고 하나의 생산자에는 하나의 소비자가 존재한다.
Hot Stream은 Colod Stream과 반대로 외부에서 데이터가 생성되며, 생산자와 소비자의 시점이 다른다. 즉 생산자는 소비자를 신경 쓰지 않고 생산합니다. 그렇기 때문에 하나의 생산자에 다수의 소비자가 구독을 할 수 있습니다.
간단히 정리하자면
Hot Stream:
데이터가 외부에서 생산되며 하나의 생산자에 여러 명의 소비자가 존재 가능하다. 그리고 생산자는 소비자의 소비를 신경쓰지 않고 생산하기 때문에 소비자는 소비를 시작한 시점부터 생산된 데이터를 소비하기 시작한다.
Flow 빌더
- flowOf
- 고정된 값들을 방출
- 정해진 값의 set을 내부적으로 반복문을 돌려서 emit 해준다.(여기서 emit은 해당 데이터를 방출한다는 것이다.)
flowOf(0, 1, 2, 3)
.collect{
println(it)
}
우리가 사용하는 기본적인 arrayListOf()같은 느낌이다.
- asFlow()
- 코틀린의 컬렉션이나 시퀀스를 Flow로 바로 변환해주는 함수이다.
listOf(0, 1, 2, 3)
.asFlow()
.collect {
println(it)
}
kotlin의 as와 비슷한 느낌
중간 연산자
map
Stream에서 흘러나온 데이터에 어떤 변경을 할 때 사용
suspend fun performRequest(request: Int): String { delay(1000) return "response $request" } fun main() = runBlocking<Unit> { (1..3).asFlow() .map { performRequest(it) } .collect { println(it) } }
//결과는 response 1
//결과는 response 2
//결과는 response 3
- onEach
- 블럭안에 주어진 특정한 Action(행동)을 하는 flow를 반환
- flow에서 아이템이 흘러나올 때 마다, 실행하도록 해야하는 코드가 있다면 onEach를 통해서 해결할 수 있다.
- upstream 즉 위의 flow에서 흘러나오는 값을 그대로 전달해준다.
```kotlin
val flow = flowOf(1, 2, 3)
flow
.onEach{ item ->
println("Item emitted: $item")
}
.map {item ->
item * 2
}
.collect {result ->
println("result: $result")
}
//Item emitted: 1
//result: 2
//Item emitted: 2
//result: 4
//Item emitted: 3
//result: 6
- filter
- 조건에 맞는 아이템 반환 함수
listOf(0, 1, 2, 3, 4)
.asFlow()
.filter {
it % 2 == 0
}
.collect {
println(it)
}
//0
//2
//4
Flow 공식 문서 예제
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
//데이터 생산
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
//여기 까지
}
}
// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
flow 공식문서를 보면 해당 예제를 볼것이다.
해당 부분은 API에서 데이터를 Flow 형식으로 반환해주는 것이다.
주석에도 나와있듯이다. 해당 부분이 생산자 역할이 되는 것이다.
즉 flow 빌더에서 전달된 suspend 블록은 생산자 블록이 되며 Android에서 생산자와 소비자 간의 계층은 요구사항에 맞게 데이터 스트림을 수정할 수 있으면 이때 사용하는 중간연산자(map, filter 등)가 되는 것이다.
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
API를 통해서 나온 결과 값을 가져오러면 Flow의 terminal연산자(Flow를 수집을 시작하는 일시정지 함수로 collect 연산자가 가장 기본 연산자이다.)를 사용해야서 값을 수신해야되며 스트림의 모든 값을 전송 즉시 가져오려면 collect를 사용해야됩니다.
collect는 새로운 값이 생길 때마다 호출되는 함수를 파라미터로 받으며, suspend 연산이기 때문에 코루틴 안에서 실행되어야 합니다.
즉 결과값을 cold Stream 형식으로 받는다고 보면된다.
그렇기 때문에 해당 데이터를 소비하는 과정에서 여러 군대 에서 동일한 데이터를 소비하게 되면 소비될 때 마다 생성자 코드가 실행되게 되며, 데이터 소스가 서로 다른 고정된 간격으로 여러번 가져오게 된다.
이것을 방지하기 위해서 사용하는 것이다. 바로 Hot Stream이다. 그 중 shareIn은 flow를 StateFlow로 변환할 때, 사용합니다.
즉 Cold를 Hot으로 바꾸어준다고 보면된다.
StateFlow는 컬렉터가 없더라도 데이터를 보관하고 있다가 collect가 되었을 때 모든 곳에 데이터를 방출한다고 보면 될 것 같습니다.
(해당부분은 조금 더 알아봐야될 것 같다.)
StateFlow vs sharedFlow
- StateFlow
- 컬렉터에게 단일의 최신 데이터 값만 업데이터를 보장하는 Hot Stream이다.
- 초기값을 가지며 emit() 혹은 value 로 프로퍼티에 접근가능하다. 또한 이전에 내보낸 값과 동일한 값을 소비자에게 전달하지 않도록 distinctUntilChanged()와 같은 연산자의 기능을 가지고 있다.
- 항상 최신의 값을 가져오기 때문에 최신 상태를 유지하는 데이터 홀더로써 사용이 권장된다.
sharedFlow
- StateFlow와 동일한 Hot Stream이다.
- MutableSharedFlow() 생성자 함수로 만들 수 있으며, 파라미터가 없는 기본 생성자로 만들 수 있지만 특정 파라미터로 옵션들을 설정할 수 있다.
sharedFlow 옵션
- replay : 새로운 구독자에게 이전 이벤트를 전달할 갯수
- extraBufferCapacity : 추가적인 버퍼들 생성하고 emit한 데이터를 버퍼에 유지
- onBufferOverflow : 버퍼가 가득찬 경우 어떤 동작을 할지 정의
현재 계속 보고 있는데 너무 어렵다.
그리고 무엇보다 Flow로 Hot stream을 짜게 되면 해당 부분의 데이터 언제 방출하고 언제 받을지 설계하는게 매우 어려울 것 같다.ㅠㅠ
나머지 이해안되는 부분은 실제로 예제를 만들어서 사용해보면서 이해할까 한다.ㅠㅠ