RxJava - Flowable / Observable と Subscriber / Observer について
今回は、RxJava における Reactive Streams 対応について見てきます。
RxJava における生産者と購読者
Reactive Streamsの概念では、データを生産し通知する役割を担う生産者と通知されてきたデータを受け取って処理を行う消費者がいます。消費者が生産者を購読することで生産者からの通知を受け取ることができ、データを受け取ることが可能になります。
RxJava では、この両者の関係が2つ存在します。
生産者 | 消費者 | |
---|---|---|
Reactive Streams 対応 | Flowable | Subscriber |
Reactive Streams 非対応 | Observable | Observer |
大きな特徴として、Reactive Streams 非対応の Observable / Observer では backpressure がありません。前回、記事でも説明した backpressure は以下のようなことを実現します。
データを通知する側(Publisher)に対して、受け取り手(Subscriber)が自分が処理できる能力の分だけ「これくらいに加減してください」とお願いすることで、データストリームが速すぎて処理能力が追いつかない・・・といった事態を防げる仕組みを backpressure と呼びます。
Flowable と Subscriber
Reactive Streams の生産者 Publisher にあたるものを実装したのが Flowable です。また、Subscriber は Reactive Streams と同様です。Reactive Streams に対応しているため、基本的な仕組みは Reactive Streams と同じと考えても違和感なく扱うことができるようになっています。
- 生産者 Flowable が Subscriber に対して以下の通知を随時行う
- データの準備ができた: onSubscribe
- データを通知する: onNext
- エラーが発生した: onError
- 完了した: onComplete
そして、 Subscription インタフェースを通してデータ数のリクエスト( backpressure を実現できる)や購読の解除を行います。
Observable と Observer
RxJavaの2系(2.x)で提供されている Observable と Observer は、Reactive Streams に基づいた実装ではないため Reactive Streams のインタフェースは用いていません。ですが、 backpressure という特徴がないこと以外は基本的に構成は似ています。
- 生産者 Observable が Observer に対して以下の通知を行う
- データの準備ができた: onSubscribe
- データを通知する: onNext
- エラーが発生した: onError
- 完了した: onComplete
Subscription インタフェースがない代わりに、 Disposable というメソッドを使って購読の解除を行います。Disposable は onSubscribe メソッドの引数として渡され、dispose()
というように呼び出すことで購読を解除することが可能です。
次の記事
参考
RxJavaリアクティブプログラミング (CodeZine BOOKS)
- 作者: 須田智之
- 出版社/メーカー: 翔泳社
- 発売日: 2017/02/17
- メディア: 単行本(ソフトカバー)
- この商品を含むブログ (1件) を見る