View
728x90
RxJava란?
- 반응형 프로그램밍으로 데이터가 변하면 알아서 캐치하여 결과로 반영하는 프로그램 기술이다.
- ReactiveX(Reactive Extensions)를 java로 구현한 라이브러리
- ReactiveX = Reactive(비동기 이벤트 방식) + X(이벤트 처리 방식)
- 비동기 이벤트 기반 프로그래밍 라이브러리이다.
- 💡 비동기 프로그래밍이란 특정 코드의 처리가 완료되기 전, 처리하는 도중에 아래로 계속 내려가며 수행 하는 것
- 각각의 데이터를 구독하고 스레드를 지정하여 데이터를 방출 처리 소비 한다.
- 즉 간단히 말해 비동기처리에 유리한 방식의 프로그램 기술이라고 보면될거 같다.
RxJava구성?
RxJava = Observable(관찰할 것) + Observers(관찰자) + Schedulers(일정관리자, 스레드 지정자)
- Observable: 데이터 스트림으로 하나의 스레드에서 다른 스레드로 전달 할 데이터를 압축합니다. 주기적으로 또는 설정에 따라 생애 주기동안 한번만 데이터를 방출합니다.
즉 데이터를 처리하고 방출한다고 보면 됩니다. - Observers: 위에 Observable와 비슷한 단어이므로 헷갈릴 수 있지만 이것은 Observable에서 방출된 데이터 소비하는 구독자 입니다.
- 여기서 구독자라는 말이 어렵게 느껴지기도 하지만 이해하기 쉽게 말하면 우리가 잘아는 구독자라고 보면될 거 같습니다.
- 유튜브, 트위치의 구독자처럼 데이터를 구독하며 알람(데이터의 변화)가 왔을때 영상을 보는 것처럼 변화된 데이터에 대해 특정 처리를 합니다.
- Observers는 subscribeOn() 메서드를 사용해서 Observable를 구독하고 Observable이 방출하는 데이터를 수신합니다.
- Schedulers: Schedulers는 Observable과 Observers에게 실행되어야할 스레드를 알려줍니다.
- Observable에게는 scheduleOn() 메서드를 사용하여 어떤 스레드를 사용할지 알려줍니다.
- Observers에게는 observeOn() 메서드를 사용하여 관찰해야 할 스레드를 알려줍니다.
RxJava 사용법?
그래서 RxJava를 어케 사용하는 것인데? 라는 의문이 있을 것이다.(사실 이게 제일 궁금할 것이다.)
Observer방식(1번방식)
- Observer 인터페이스를 구현한 객체를 subscribe해서 소비자를 추가한 경우.
val observer = object : Observer<Int> {
override fun onComplete() {
// Observable이 완료된 경우
}
override fun onSubscribe(d: Disposable) {
// Observable이 데이터 전달할 준비가 되었을 때.
// 작업 취소를 위한 Disposable에 대한 레퍼런스를 여기서 받음
}
override fun onNext(t: Int) {
// Observable이 데이터를 전달할 때 호출
}
override fun onError(e: Throwable) {
// Observable이 에러를 전달할 때 호출. Error시 Complete없이 종료다.
}
}
Observable.just(1, 2, 3).subscribe(observer)
Consumer 방식 == 소비자 방식(2번방식)
- subscribe()함수를 사용해서 각각의 Consumer(소비자)를 추가한다.
- Consumer(소비자)는 한개의 메소드를 가지는 자바 인터페이스로 SAM을 통해 람다로 표현 가능하다.
- subscribe()의 return 타입이 Disposable이다.
- 주로 이 방식을 사용한다.
💡 SAM : Single Abstract Method의 약자로 단일 추상 메소드를 말한다.
이것은 Kotlin에서 SAM Conversions이 제공되기 대문에 하나의 추상 메서드에 대해서 lambdas(람다)를 제공한다는 것이다.
Ex)
button.setOnclickListener { println(”객체 대신 람다를 넘기고 있다.”) }
val disposable: Disposable = Observable.just(1, 2, 3)
.subscribe(
{ println("onNext $it") }, // onNext: Consumer(소비자)
{ println("onError") }, // onError: Consumer(소비자)
{ println("onComplete") }, // onComplete: Consumer(소비자)
{ println("onSubscribe") } // onSubscribe: Consumer(소비자)
)
- 여기에 2가지 방식을 다쓴 이유는 RxJava에 대해서 찾아보면 보통 1번식에 대한 내용이많다. 하지만 실제 코드에서는 1번 방식보다는 2번 방식을 사용하는 경우가 많은 것을 볼 수 있다.
- 1번 방식은 개념적인 내용이고 2번은 실제 사용적인 내용인거 같다.
- 글작성자인 나는 이 글을 보고 사용자에게 도움이 되었으면하기 때문에 이렇게 2가지를 다 보여주었다.!!!
RxJava함수
위에 여태까지 코드를 보면 여러 함수가 사용되었는데 이해하는데 도움이 되었으면 한다.(다시 읽으면서 복습하도록!!)
Observable의 세가지 알림
- onNext(): Observable이 데이터의 발행을 알린다.
- onComplete(): 모든 데이터의 발행을 완료했음을 알린다.
- 해당 이벤트는 단 한번 만 발생하고, 발생한 후부터는 onNext() 이벤트가 발생하면 안된다.
- onError(): Observable에서 어떤 이유로 에러가 발생했음을 알린다.
- onError()이벤트가 발생하면 이후 onNext()와 onComplete() 이벤트가 발생하면 안된다.
- 즉 Observable의 실행을 종료
just()함수
- 인자로 넣은 데이터를 차례로 발행하려고 Observable을 생성
- 한 개의 값을 넣을 수도 있고 인자로 최대 10개를 넣을 수도 있다.
- 단 타입이 모두 같아야 된다.
- 모든 데이터 발행이 완료 되면 onComplete 이벤트가 발생한다.
Observable.just(1, 2, 3, 4, 5)
.subscribe(System.out::println);
subscribe()함수
- RxJava 내가 동작시기 원하는 것을 사전에 정의해둔 다음 실제 그것이 실행되는 시점을 조절 할 수 있다. 이때 사용하는 함수가 바로 subscribe()이다.
- Observable은 just() 등의 팩토리 함수로 데이터 흐름을 정의한 후 subscribe() 함수를 호출해야 실제로 데이터를 발행한다.
val disposable: Disposable = Observable.just(1, 2, 3)
.subscribe(
{ println("onNext $it") }, // onNext: Consumer(소비자)
{ println("onError") }, // onError: Consumer(소비자)
{ println("onComplete") }, // onComplete: Consumer(소비자)
{ println("onSubscribe") } // onSubscribe: Consumer(소비자)
)
- subscribe()의 인자가 없는 경우
- onNext와 onComplete 이벤트를 뭇하고 onError 이벤트가 발생했을 때만 OnErrorNotImplementedException을 던진다.
- Observable로 작성한 코드를 테스트하거나 디버깅할 때 많이 사용한다.
- subscribe()의 인자가 1개인 경우
- onNext이벤트 처리
- onError이벤트 발생시 OnErrorNotImplementedException을 던진다.
- subscribe()의 인자가 2개인 경우
- onNext와 onError 이벤트를 처리
- subscribe()의 인자가 3개인 경우
- onNext, onError, onComplete 이벤트를 모두 처리
💡 위 함수의 원형은 모두 Disposable 인터페이스의 객체를 리턴한다.
fun dispose()
fun isDisposed(): boolean
- ds dispose()는 Observable에게 더 이상 데이터를 발생하지 않도록 구독을 해지하는 함수
- Observable이 onComplete 알림을 보냈을 때 자동으로 dispose()를 호출해 Observable과 구독자의 관계를 끊는다.
- onComplete 이벤트가 정상적으로 발생했다면 구독자가 별도로 dispose()을 호출할 필요가 없다.
create()함수
- 함수 내부에서 emitter가 직접 onNext, onComplete등으로 데이터를 전달하는 함수
Observable.create<String>{emitter ->
emitter.onNext("Hello")
emitter.onNext("RxJava")
emitter.onComplete()
}.subscribe{println(it)}
interval()함수
- 주어진 주기대로 0부터 1씩 증가된 값을 만드는 연산자
Observable.interval(100, TimeUnit.MILLISECONDS)
.subscribe(::println)
Thread.sleep(300)
range()함수
- range(start,count) : start부터 count만큼 1씩 증가한 데이터를 전달하는 연산자
Observable.range(3, 4).subscribe(::println)ㅇ
repeat()함수
- Observable을 지정한 횟수만큼 반복시키는 연산자(subscribe포함)
val observable = Observable.just("Hello", "World")
.repeat(2)
observable.subscribe(::println)ㄹ
from함수
- Array, Iterable, Callable로부터 Observable을 만드는 연사자
- fromArray, fromIterable, fromCallable
val items = arrayOf("Hello", "World")
Observable.fromArray(*items).subscribe(::println)
map()함수
- 데이터를 변환하는 연산자
Observable.fromIterable(0..3)
.map { "RxJava : $it" }
.subscribe(::println)
💡 `fromIterable` : ArrayList, HashSet처럼 Iterable을 구현한 모든 객체를 ObservableSource로 변환합니다.
xxxMap()함수
- 공통: Observable을 받아 새로운 Observable을 만드는 연산자
- flatMap: 데이터를 병렬적으로 처리
- concatMap: 데이터를 직렬적으로 처리
- switchMap: 중간에 데이터가 들어오면 처리
- 아래 예제를 실행해보면 어떤 느낌인지 알 수 있을 것이다.
Observable.fromIterable(listOf(1, 2, 3, 4, 5, 6))
.xxxMap { original: Int ->
Observable.just("$original plusplus")
.delay(Random.nextLong(5), TimeUnit.SECONDS)
}
.subscribe(::println)
💡
flatMap : 랜덤한 순서 출력
concatMap : 순서대로 출력
switchMap : 6 출력
debounce
- 일정시간 다른 아이템이 생성되지 않으면 데이터를 전달하는 연산자
Observable.interval(250,TimeUnit.MILLISECONDS)
.debounce(333, TimeUnit.MILLISECONDS)
.subscribe(::println) // 출력 없음
Thread.sleep(1000)ㄹ
filter/ofType
- filter : 특정 조건에 맞는 데이터만 전달
- ofType : 특정 타입에 맞는 데이터만 전달. 전달시 typecasting이 되었있음
Observable.just(11,true,"Hello","Rx",false)
.ofType(String::class.java)
.filter { it.length == 2 }
.subscribe(::println)
'TIL > 스터디' 카테고리의 다른 글
hilt가 무엇인디? (3) | 2024.02.27 |
---|---|
안드로이드 Task(간단 포스트) (0) | 2024.02.26 |
Subject 클래스(PublishSubject, ReplaySubject) (0) | 2024.02.22 |
Subject 클래스(AsyncSubject, BehaviorSubject) (0) | 2024.02.21 |
Hot Observable vs Cold Observable (2) | 2024.02.20 |
reply