22時に寝ようと思って2時に寝る。

備忘録や日記を書いてます。きょうは早く寝よう。

RxJava - Flowable / Observable と Subscriber / Observer について

今回は、RxJava における Reactive Streams 対応について見てきます。

RxJava における生産者と購読者

Reactive Streamsの概念では、データを生産し通知する役割を担う生産者と通知されてきたデータを受け取って処理を行う消費者がいます。消費者が生産者を購読することで生産者からの通知を受け取ることができ、データを受け取ることが可能になります。

RxJava では、この両者の関係が2つ存在します。

生産者 消費者
Reactive Streams 対応 Flowable Subscriber
Reactive Streams 非対応 Observable Observer

大きな特徴として、Reactive Streams 非対応の Observable / Observer では backpressure がありません。前回、記事でも説明した backpressure は以下のようなことを実現します。

データを通知する側(Publisher)に対して、受け取り手(Subscriber)が自分が処理できる能力の分だけ「これくらいに加減してください」とお願いすることで、データストリームが速すぎて処理能力が追いつかない・・・といった事態を防げる仕組みを backpressure と呼びます。

RxJava - Reactive Streams とは - 22時に寝ようと思って2時に寝る。

Flowable と Subscriber

Reactive Streams の生産者 Publisher にあたるものを実装したのが Flowable です。また、Subscriber は Reactive Streams と同様です。Reactive Streams に対応しているため、基本的な仕組みは Reactive Streams と同じと考えても違和感なく扱うことができるようになっています。

  • 生産者 Flowable が Subscriber に対して以下の通知を随時行う
    • データの準備ができた: onSubscribe
    • データを通知する: onNext
    • エラーが発生した: onError
    • 完了した: onComplete

そして、 Subscription インタフェースを通してデータ数のリクエスト( backpressure を実現できる)や購読の解除を行います。

Observable と Observer

RxJavaの2系(2.x)で提供されている Observable と Observer は、Reactive Streams に基づいた実装ではないため Reactive Streams のインタフェースは用いていません。ですが、 backpressure という特徴がないこと以外は基本的に構成は似ています。

  • 生産者 Observable が Observer に対して以下の通知を行う
    • データの準備ができた: onSubscribe
    • データを通知する: onNext
    • エラーが発生した: onError
    • 完了した: onComplete

Subscription インタフェースがない代わりに、 Disposable というメソッドを使って購読の解除を行います。Disposable は onSubscribe メソッドの引数として渡され、dispose()というように呼び出すことで購読を解除することが可能です。

次の記事

azunobu.hatenablog.com

参考

RxJavaリアクティブプログラミング (CodeZine BOOKS)

RxJavaリアクティブプログラミング (CodeZine BOOKS)