RxJava를 활용한 리액티브 프로그래밍 정리 Vol.1
-
date_range 07/05/2018 11:30 infosortJavalabelRxJavaReactiveProgramming
1장 개요
- RxJava란?
- 자바와 안드로이드를 위한 리액티브 프로그래밍 구현체로, 함수형 프로그래밍의 영향을 받았다.
- 리액티브 프로그래밍이란?
- 데이터나 이벤트 변화의 반응에 초점을 맞춘 프로그래밍을 뜻한다. (ex. 스프레드 시트)
- RxJava의 동작방식
- RxJava의 핵심은 이벤트 스트림을 나타내는 Observable타입이며, 이벤트 밀어내기(Push) 방식을 지향한다.
- Observable과 그를 구독하는 Observer(Subscriber)의 쌍으로 구성된다.
- RxJava는 일반적으로 비동기로 동작하지만, 기본값은 동기 방식으로 되어있으며, 명시적 요청이 없다면 동시성 처리를 하지 않는다.
- 단일 Observable은 동시성/병렬성 둘 다 허용하지 않는다. 대신 여러 비동기 Observable조합을 통해 이를 수행한다.
- 동시성(Concurrency) : 싱글 코어에서 멀티 스레드를 동작시키는 방식, 동시에 실행되는 것 처럼 보이는 것, 실제론 번갈아 실행
- 병렬성(Parallelism) : 멀티 코어에서 멀티 스레드를 동작시키는 방식, 실제로 동시에 작업 처리되는 것
- onNext, onCompleted, onError 이벤트는 동시에 방출되지 않는다. 달리 말해, 하나의 Observable은 항상 직렬화되어 스레드에 안전해야 한다.
- Observable은 느긋(lazy)하다. Observer가 구독하지 않는 한 이벤트 방출을 시작하지 않는다.
- Observable은 재사용할 수 있다.
2장 Observable과 Observer, Subscriber
- Observable
rx.Observable<T>
는 값이 흐르는 순서를 나타낸다.Observable<T>
은 3가지 유형의 이벤트를 만들어 낼 수 있다. (T 자료형의 값, 완료, 오류)
- Observer, Subscriber
- Observable은 구독전에는 이벤트를 방출하지 않는다. 따라서 관찰을 시작하려면 subscribe()계통의 메서드를 사용해야 한다.
Observable<Tweet> tweets = ... tweets.subscribe(t -> System.out.println(t));
- 3가지 콜백에 대해 감싸놓은 구현체가 Observer이다.
Observer<Tweet> observer = new Observer<Tweet>() { @Override public void onCompleted() { noMore(); } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onNext(Tweet tweet) { System.out.println(tweet); } }; tweets.subscribe(observer);
- 구독 해지를 할 수 있는 기능이 필요한데, subscription과 subscriber가 있다. subscription은 unsubscribe() 메서드를 이용해 구독을 취소할 수 있도록 하고, Subscriber는 리스너 내부에서 구독해지를 요청한다.
Subscription subscription = tweets.subscribe(System.out::println); subscription.unsubscribe();
Subscriber<Tweet> subscriber = new Subscriber<Tweet>() { @Override public void onCompleted() { noMore(); } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onNext(Tweet tweet) { if(tweet.getText().contains("Java")) { unsubscribe(); } System.out.println(tweet); } }; tweets.subscribe(subscriber);
- Observable은 구독전에는 이벤트를 방출하지 않는다. 따라서 관찰을 시작하려면 subscribe()계통의 메서드를 사용해야 한다.
- Observable 팩토리 메서드
- Observable.just(value) : 하나의 값을 방출하고 종료한다. 방출 값은 9개까지 가능함.
- Observable.from(values) : 컬렉션을 인자로 받아, 컬렉션의 인자들을 방춣한다.
- Observable.range(from, n) : from부터 n개의 정수값을 취해 스트림을 만든다.
- Observable.empty() : 아무런 값도 방출하지 않고 구독을 종료한다.
- Observable.never() : 알림, 종료, 오류 어떤 것도 방출하지 않는다.
- Observable.error() : 모든 구독자에게 onError 알림을 방출한다.
- RxJava의 비동기속성은 당연시되지만, 사실 대부분 RxJava의 연산자는 스레드풀을 전혀 사용하지 않는다.