menu

RxJava를 활용한 리액티브 프로그래밍 정리 Vol.1

1장 개요
  • RxJava란?
    • 자바와 안드로이드를 위한 리액티브 프로그래밍 구현체로, 함수형 프로그래밍의 영향을 받았다.
  • 리액티브 프로그래밍이란?
    • 데이터나 이벤트 변화의 반응에 초점을 맞춘 프로그래밍을 뜻한다. (ex. 스프레드 시트)
  • RxJava의 동작방식
    • RxJava의 핵심은 이벤트 스트림을 나타내는 Observable타입이며, 이벤트 밀어내기(Push) 방식을 지향한다.
    • Observable과 그를 구독하는 Observer(Subscriber)의 쌍으로 구성된다.
    • RxJava는 일반적으로 비동기로 동작하지만, 기본값은 동기 방식으로 되어있으며, 명시적 요청이 없다면 동시성 처리를 하지 않는다.
    • 단일 Observable은 동시성/병렬성 둘 다 허용하지 않는다. 대신 여러 비동기 Observable조합을 통해 이를 수행한다.
      • 동시성(Concurrency) : 싱글 코어에서 멀티 스레드를 동작시키는 방식, 동시에 실행되는 것 처럼 보이는 것, 실제론 번갈아 실행
      • 병렬성(Parallelism) : 멀티 코어에서 멀티 스레드를 동작시키는 방식, 실제로 동시에 작업 처리되는 것
    • onNext, onCompleted, onError 이벤트는 동시에 방출되지 않는다. 달리 말해, 하나의 Observable은 항상 직렬화되어 스레드에 안전해야 한다.
    • Observable은 느긋(lazy)하다. Observer가 구독하지 않는 한 이벤트 방출을 시작하지 않는다.
    • Observable은 재사용할 수 있다.
2장 Observable과 Observer, Subscriber
  • Observable
    • rx.Observable<T>는 값이 흐르는 순서를 나타낸다.
    • Observable<T>은 3가지 유형의 이벤트를 만들어 낼 수 있다. (T 자료형의 값, 완료, 오류)
  • Observer, Subscriber
    • Observable은 구독전에는 이벤트를 방출하지 않는다. 따라서 관찰을 시작하려면 subscribe()계통의 메서드를 사용해야 한다.
       Observable<Tweet> tweets = ...
       tweets.subscribe(t -> System.out.println(t));
      
    • 3가지 콜백에 대해 감싸놓은 구현체가 Observer이다.
      Observer<Tweet> observer = new Observer<Tweet>() {
             @Override
             public void onCompleted() {
                 noMore();
             }
      
             @Override
             public void onError(Throwable e) {
                 e.printStackTrace();
      
             }
      
             @Override
             public void onNext(Tweet tweet) {
                 System.out.println(tweet);
             }
         };
      tweets.subscribe(observer);     
      
    • 구독 해지를 할 수 있는 기능이 필요한데, subscription과 subscriber가 있다. subscription은 unsubscribe() 메서드를 이용해 구독을 취소할 수 있도록 하고, Subscriber는 리스너 내부에서 구독해지를 요청한다.
      Subscription subscription = tweets.subscribe(System.out::println);
      subscription.unsubscribe();
      
      Subscriber<Tweet> subscriber = new Subscriber<Tweet>() {
             @Override
             public void onCompleted() {
                 noMore();
             }
      
             @Override
             public void onError(Throwable e) {
                 e.printStackTrace();
      
             }
      
             @Override
             public void onNext(Tweet tweet) {
                 if(tweet.getText().contains("Java")) {
                   unsubscribe();
                 }
                 System.out.println(tweet);
             }
         };
      tweets.subscribe(subscriber);
      
  • Observable 팩토리 메서드
    • Observable.just(value) : 하나의 값을 방출하고 종료한다. 방출 값은 9개까지 가능함.
    • Observable.from(values) : 컬렉션을 인자로 받아, 컬렉션의 인자들을 방춣한다.
    • Observable.range(from, n) : from부터 n개의 정수값을 취해 스트림을 만든다.
    • Observable.empty() : 아무런 값도 방출하지 않고 구독을 종료한다.
    • Observable.never() : 알림, 종료, 오류 어떤 것도 방출하지 않는다.
    • Observable.error() : 모든 구독자에게 onError 알림을 방출한다.
  • RxJava의 비동기속성은 당연시되지만, 사실 대부분 RxJava의 연산자는 스레드풀을 전혀 사용하지 않는다.