View

RxJava

제롱구리 2024. 2. 15. 16:49
728x90

RxJava란?

  • 반응형 프로그램밍으로 데이터가 변하면 알아서 캐치하여 결과로 반영하는 프로그램 기술이다.
  • ReactiveX(Reactive Extensions)를 java로 구현한 라이브러리
    • ReactiveX = Reactive(비동기 이벤트 방식) + X(이벤트 처리 방식)
    • 비동기 이벤트 기반 프로그래밍 라이브러리이다.
    • 💡 비동기 프로그래밍이란 특정 코드의 처리가 완료되기 전, 처리하는 도중에 아래로 계속 내려가며 수행 하는 것
  • 각각의 데이터를 구독하고 스레드를 지정하여 데이터를 방출 처리 소비 한다.
  • 즉 간단히 말해 비동기처리에 유리한 방식의 프로그램 기술이라고 보면될거 같다.

RxJava구성?

RxJava = Observable(관찰할 것) + Observers(관찰자) + Schedulers(일정관리자, 스레드 지정자)

  • Observable: 데이터 스트림으로 하나의 스레드에서 다른 스레드로 전달 할 데이터를 압축합니다. 주기적으로 또는 설정에 따라 생애 주기동안 한번만 데이터를 방출합니다.
    데이터를 처리하고 방출한다고 보면 됩니다.
  • Observers: 위에 Observable와 비슷한 단어이므로 헷갈릴 수 있지만 이것은 Observable에서 방출된 데이터 소비하는 구독자 입니다.
    • 여기서 구독자라는 말이 어렵게 느껴지기도 하지만 이해하기 쉽게 말하면 우리가 잘아는 구독자라고 보면될 거 같습니다.
    • 유튜브, 트위치의 구독자처럼 데이터를 구독하며 알람(데이터의 변화)가 왔을때 영상을 보는 것처럼 변화된 데이터에 대해 특정 처리를 합니다.
    • Observers는 subscribeOn() 메서드를 사용해서 Observable를 구독하고 Observable이 방출하는 데이터를 수신합니다.
  • Schedulers: Schedulers는 Observable과 Observers에게 실행되어야할 스레드를 알려줍니다.
    • Observable에게는 scheduleOn() 메서드를 사용하여 어떤 스레드를 사용할지 알려줍니다.
    • Observers에게는 observeOn() 메서드를 사용하여 관찰해야 할 스레드를 알려줍니다.

RxJava 사용법?

그래서 RxJava를 어케 사용하는 것인데? 라는 의문이 있을 것이다.(사실 이게 제일 궁금할 것이다.)

Observer방식(1번방식)

  • Observer 인터페이스를 구현한 객체를 subscribe해서 소비자를 추가한 경우.
val observer = object : Observer<Int> {
        override fun onComplete() {
            // Observable이 완료된 경우
        }
        override fun onSubscribe(d: Disposable) {
            // Observable이 데이터 전달할 준비가 되었을 때.
            // 작업 취소를 위한 Disposable에 대한 레퍼런스를 여기서 받음
        }
        override fun onNext(t: Int) {
            // Observable이 데이터를 전달할 때 호출
        }
        override fun onError(e: Throwable) {
            // Observable이 에러를 전달할 때 호출. Error시 Complete없이 종료다.
        }
    }
    Observable.just(1, 2, 3).subscribe(observer)

Consumer 방식 == 소비자 방식(2번방식)

  • subscribe()함수를 사용해서 각각의 Consumer(소비자)를 추가한다.
  • Consumer(소비자)는 한개의 메소드를 가지는 자바 인터페이스로 SAM을 통해 람다로 표현 가능하다.
  • subscribe()의 return 타입이 Disposable이다.
  • 주로 이 방식을 사용한다.

💡 SAM : Single Abstract Method의 약자로 단일 추상 메소드를 말한다.

이것은 Kotlin에서 SAM Conversions이 제공되기 대문에 하나의 추상 메서드에 대해서 lambdas(람다)를 제공한다는 것이다.

Ex)
button.setOnclickListener { println(”객체 대신 람다를 넘기고 있다.”) }

val disposable: Disposable = Observable.just(1, 2, 3)
        .subscribe(
            { println("onNext $it") }, // onNext: Consumer(소비자)
            { println("onError") }, // onError: Consumer(소비자)
            { println("onComplete") }, // onComplete: Consumer(소비자)
            { println("onSubscribe") } // onSubscribe: Consumer(소비자)
        )
  • 여기에 2가지 방식을 다쓴 이유는 RxJava에 대해서 찾아보면 보통 1번식에 대한 내용이많다. 하지만 실제 코드에서는 1번 방식보다는 2번 방식을 사용하는 경우가 많은 것을 볼 수 있다.
  • 1번 방식은 개념적인 내용이고 2번은 실제 사용적인 내용인거 같다.
  • 글작성자인 나는 이 글을 보고 사용자에게 도움이 되었으면하기 때문에 이렇게 2가지를 다 보여주었다.!!!

RxJava함수

위에 여태까지 코드를 보면 여러 함수가 사용되었는데 이해하는데 도움이 되었으면 한다.(다시 읽으면서 복습하도록!!)

Observable의 세가지 알림

  • onNext(): Observable이 데이터의 발행을 알린다.
  • onComplete(): 모든 데이터의 발행을 완료했음을 알린다.
    • 해당 이벤트는 단 한번 만 발생하고, 발생한 후부터는 onNext() 이벤트가 발생하면 안된다.
  • onError(): Observable에서 어떤 이유로 에러가 발생했음을 알린다.
    • onError()이벤트가 발생하면 이후 onNext()와 onComplete() 이벤트가 발생하면 안된다.
    • 즉 Observable의 실행을 종료

just()함수

  • 인자로 넣은 데이터를 차례로 발행하려고 Observable을 생성
  • 한 개의 값을 넣을 수도 있고 인자로 최대 10개를 넣을 수도 있다.
  • 단 타입이 모두 같아야 된다.
  • 모든 데이터 발행이 완료 되면 onComplete 이벤트가 발생한다.
Observable.just(1, 2, 3, 4, 5)
                .subscribe(System.out::println);

subscribe()함수

  • RxJava 내가 동작시기 원하는 것을 사전에 정의해둔 다음 실제 그것이 실행되는 시점을 조절 할 수 있다. 이때 사용하는 함수가 바로 subscribe()이다.
  • Observable은 just() 등의 팩토리 함수로 데이터 흐름을 정의한 후 subscribe() 함수를 호출해야 실제로 데이터를 발행한다.
val disposable: Disposable = Observable.just(1, 2, 3)
        .subscribe(
            { println("onNext $it") }, // onNext: Consumer(소비자)
            { println("onError") }, // onError: Consumer(소비자)
            { println("onComplete") }, // onComplete: Consumer(소비자)
            { println("onSubscribe") } // onSubscribe: Consumer(소비자)
        )
  • subscribe()의 인자가 없는 경우
    • onNext와 onComplete 이벤트를 뭇하고 onError 이벤트가 발생했을 때만 OnErrorNotImplementedException을 던진다.
    • Observable로 작성한 코드를 테스트하거나 디버깅할 때 많이 사용한다.
  • subscribe()의 인자가 1개인 경우
    • onNext이벤트 처리
    • onError이벤트 발생시 OnErrorNotImplementedException을 던진다.
  • subscribe()의 인자가 2개인 경우
    • onNext와 onError 이벤트를 처리
  • subscribe()의 인자가 3개인 경우
    • onNext, onError, onComplete 이벤트를 모두 처리

💡 위 함수의 원형은 모두 Disposable 인터페이스의 객체를 리턴한다.

fun dispose()
fun isDisposed(): boolean
  • ds dispose()는 Observable에게 더 이상 데이터를 발생하지 않도록 구독을 해지하는 함수
  • Observable이 onComplete 알림을 보냈을 때 자동으로 dispose()를 호출해 Observable과 구독자의 관계를 끊는다.
  • onComplete 이벤트가 정상적으로 발생했다면 구독자가 별도로 dispose()을 호출할 필요가 없다.

create()함수

  • 함수 내부에서 emitter가 직접 onNext, onComplete등으로 데이터를 전달하는 함수
Observable.create<String>{emitter -> 
    emitter.onNext("Hello")
    emitter.onNext("RxJava")
    emitter.onComplete()
}.subscribe{println(it)}

interval()함수

  • 주어진 주기대로 0부터 1씩 증가된 값을 만드는 연산자
Observable.interval(100, TimeUnit.MILLISECONDS)
        .subscribe(::println)
Thread.sleep(300)

range()함수

  • range(start,count) : start부터 count만큼 1씩 증가한 데이터를 전달하는 연산자
Observable.range(3, 4).subscribe(::println)ㅇ

repeat()함수

  • Observable을 지정한 횟수만큼 반복시키는 연산자(subscribe포함)
val observable = Observable.just("Hello", "World")
    .repeat(2)
observable.subscribe(::println)ㄹ

from함수

  • Array, Iterable, Callable로부터 Observable을 만드는 연사자
  • fromArray, fromIterable, fromCallable
val items = arrayOf("Hello", "World")
    Observable.fromArray(*items).subscribe(::println)

map()함수

  • 데이터를 변환하는 연산자
Observable.fromIterable(0..3)
    .map { "RxJava : $it" }
    .subscribe(::println)

💡 `fromIterable` : ArrayList, HashSet처럼 Iterable을 구현한 모든 객체를 ObservableSource로 변환합니다.

xxxMap()함수

  • 공통: Observable을 받아 새로운 Observable을 만드는 연산자
  • flatMap: 데이터를 병렬적으로 처리
  • concatMap: 데이터를 직렬적으로 처리
  • switchMap: 중간에 데이터가 들어오면 처리
  • 아래 예제를 실행해보면 어떤 느낌인지 알 수 있을 것이다.
Observable.fromIterable(listOf(1, 2, 3, 4, 5, 6))
    .xxxMap { original: Int ->
        Observable.just("$original plusplus")
            .delay(Random.nextLong(5), TimeUnit.SECONDS)
    }
    .subscribe(::println)

💡

flatMap : 랜덤한 순서 출력
concatMap : 순서대로 출력
switchMap : 6 출력

debounce

  • 일정시간 다른 아이템이 생성되지 않으면 데이터를 전달하는 연산자
Observable.interval(250,TimeUnit.MILLISECONDS)
    .debounce(333, TimeUnit.MILLISECONDS)
    .subscribe(::println) // 출력 없음
Thread.sleep(1000)ㄹ

filter/ofType

  • filter : 특정 조건에 맞는 데이터만 전달
  • ofType : 특정 타입에 맞는 데이터만 전달. 전달시 typecasting이 되었있음
Observable.just(11,true,"Hello","Rx",false)
    .ofType(String::class.java)
    .filter { it.length == 2 }
    .subscribe(::println)
Share Link
reply
«   2024/12   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30 31