22時に寝ようと思って2時に寝る。

備忘録や日記を書いてます。きょうは早く寝よう。

RxJava - Reactive Streams とは

今回は、Reactive Streams について見ていきます。

Reactive Streams は、ライブラリやフレームワークに依存せず、データストリームを非同期的に扱うことができる仕組み・インタフェースを提供しています。

github.com

The purpose of Reactive Streams is to provide a standard for asynchronous stream processing with non-blocking backpressure.

reactive-streams/reactive-streams-jvm: Reactive Streams Specification for the JVM

  • non-blocking で backpressure
  • そういう非同期処理の枠組みを作るのが Reactive Streams

同期・非同期に関わらず、I/O処理などの外部のリソースを呼び出した場合、レスポンスが返ってくるまでの間は呼び出し元のメインスレッドは blocking されます。その反対が non-blocking な状態です。

また、データを通知する側(Publisher)に対して、受け取り手(Subscriber)が自分が処理できる能力の分だけ「これくらいに加減してください」とお願いすることで、データストリームが速すぎて処理能力が追いつかない・・・といった事態を防げる仕組みを backpressure と呼びます。

Reactive Streams では以上の2点を実現できるインタフェースを提供します。

Publisher と Subscriber

基本的な構成を説明します。Reactive Streams では、データを生産し通知する側を Publisher と呼び、通知されたデータを受け取り処理する側のことを Subscriber と呼びます。

  • Publisher: データの生産者,データを通知する
  • Subscriber: データの購読者,通知を受けてデータを受け取り処理する

主な流れを見ていきます。

  1. Publisher が通知する準備が整ったことを Subscriber へ通知する(onSubscribe)
  2. Subscriber は受け取るデータ数を併せてリクエストを送る(Subscription#request)
  3. Publisher はデータ数分データを生成し、データを通知する(onNext)
  4. Subscriber は通知されたデータをもとに処理を行い、再び Publisher へリクエストを送る(Subscription#request)
  5. Publisher はデータを通知する(onNext)
  6. これを完了(onComplete)もしくはエラーが起きる(onError)まで続ける

Publisher の処理と Subscriber の処理が別のスレッド上で行われていて、Publisher の処理のほうが速い場合、 Subscriber は処理が追いつかない状況になりかねません。Subscriber がリクエストの際にデータ数を渡している部分は、 Publisher へ自らの処理できる分だけデータを要求できます。これが backpressure にあたります。

上記の流れをまとめると、 Publisher と Subscriber は4つのプロトコルを用いてデータをやり取りしています。

プロトコル 概要
onSubscribe データの通知の準備が整ったことを通知する
onNext データを通知する
onError エラーの通知
onComplete 完了の通知

また、インタフェースは以下の4つに絞られます。

インタフェース 概要
Publisher データを生産し,通知する役割を担う
Subscriber データの通知を受け取り,処理行う
Subscription データ数のリクエスト,購読の解除を行う
Processor Publisher および Subscriber の両方の性質を併せ持つ

Reactive Streams の概要と、流れについて説明しました。

次の記事

azunobu.hatenablog.com

参考

RxJavaリアクティブプログラミング (CodeZine BOOKS)

RxJavaリアクティブプログラミング (CodeZine BOOKS)