RxJava - BackpressureStrategy の種類
今回は Backpressure の挙動を指定するための列挙型、BackpressureStrategy について見ていきます
BackpressureStrategy とは
Flowable は場合によって生成スピードが Subscribe の処理速度以上に速い場合があります。そういった場合は、データは通知されるのを待つことになります。その待つデータに関する挙動を指定するのが BackpressureStrategy です。
BackpressureStrategy の種類一覧
概要 | |
---|---|
BUFFER | 消費者に通知されるまで、すべてのデータがバッファされる |
DROP | 消費者にデータが通知できるようになるまでに生成されたデータを破棄する |
LATEST | 生成された最新のデータ(1件)のみをバッファし、生成される度に最新のデータで上書きする |
ERROR | 通知待ちとなっているバッファデータが最大バッファサイズを超えた場合に MissingBackpressureException が発生し、エラー通知する |
NONE | 処理は行わず、onBackpressureBuffer(int capacity) または 他のパラメータ化された onBackpressureXXX メソッドを使用する場合に使用可能 |
参考
RxJavaリアクティブプログラミング (CodeZine BOOKS)
- 作者: 須田智之
- 出版社/メーカー: 翔泳社
- 発売日: 2017/02/17
- メディア: 単行本(ソフトカバー)
- この商品を含むブログ (1件) を見る
RxJava - IntelliJ IDEA で RxJava の環境構築
実際に RxJava のサンプルコードを書いていく上での環境を構築していきます。今回はエディタとして IntelliJ IDEA を用います。
動作確認環境
- OS: macOS High Sierra 10.13.6
- IntelliJ IDEA: 2018.2.1 (Ultimate Edition) Build #IU-182.3911.36
- JRE: 1.8.0_152-release-1248-b8 x86_64
- JVM: OpenJDK 64-Bit Server VM by JetBrains s.r.o
環境構築
今回導入する RxJava のバージョンは記事執筆時点で最新の 2.2.0 です。
Release 2.2.0 · ReactiveX/RxJava · GitHub
IntelliJ IDEA で新たにプロジェクトを作成する
- 起動後の
Welcome to IntelliJ IDEA
画面にて、+ Create New Project
を選択
- サイドメニューから
Gradle
を選び、Next
で次へ
Project SDK
について聞かれた場合は IntelliJ IDEA 側でよしなにやってくれていると思いますので、そのとおり進めてくださいGroupId
,ArtifactId
,Version
について- https://improve-future.com/what-are-groupid-artifactid.html でわかりやすく説明されています
GroupId
はプロジェクトを一意に識別できるものでcom.example.azuuun.rxjava
といったフォーマットで入力ArtifactId
は任意のプロジェクト名RxJavaPlayground
といったものを入力Version
はデフォルトでよいと思います
- 次の画面の
module
の設定画面はデフォルトで進めて問題ないです - ProjectName などは先程設定した
ArtifactId
がデフォルトで設定されていますが、適当に名前を入力します
Gradle でライブラリを適用する
- プロジェクトが作成された後、
build.gradle
ファイルを開きます dependencies
にcompile 'io.reactivex.rxjava2:rxjava:2.2.0'
を追記- ファイル右上に
Enabled Auto-Import
といった表示があればクリックします - 正常に導入されるかを確認
サンプルコードを書いてみる
HelloRxJava
というファイルを作成し、以下のコードを書いて実行してみましょう。
import io.reactivex.*; import io.reactivex.schedulers.Schedulers; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; public class HelloRxJava { public static void main(String[] args) throws Exception { Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() { @Override public void subscribe(FlowableEmitter<String> emitter) throws Exception { String[] datas = { "Hello, RxJava", "こんにちは、RxJava" }; for (String data : datas) { if (emitter.isCancelled()) { return; } // データを通知する emitter.onNext(data); } // 完了したことを通知する emitter.onComplete(); } }, BackpressureStrategy.BUFFER); flowable.observeOn(Schedulers.computation()) .subscribe(new Subscriber<String>() { private Subscription subscription; // 購読を開始したときの処理 @Override public void onSubscribe(Subscription s) { // Subscription を Subscriber 内で保持 this.subscription = s; // 受け取るデータ数の要求 this.subscription.request(1L); } // 通知を受け取ったときの処理 @Override public void onNext(String data) { String threadName = Thread.currentThread().getName(); System.out.println(String.format("%s: %s", threadName, data)); // 次に受け取るデータのリクエスト this.subscription.request(1L); } // エラー通知を受け取ったときの処理 @Override public void onError(Throwable t) { t.printStackTrace(); } // 完了通知を受け取ったときの処理 @Override public void onComplete() { String threadName = Thread.currentThread().getName(); System.out.println(String.format("%s: 完了しました", threadName)); } }); Thread.sleep(500L); } }
以下のような出力が出れば、環境は正常に構築できています。
RxComputationThreadPool-1: Hello, RxJava RxComputationThreadPool-1: こんにちは、RxJava RxComputationThreadPool-1: 完了しました
以上です。
次の記事
参考
RxJavaリアクティブプログラミング (CodeZine BOOKS)
- 作者: 須田智之
- 出版社/メーカー: 翔泳社
- 発売日: 2017/02/17
- メディア: 単行本(ソフトカバー)
- この商品を含むブログ (1件) を見る
RxJava - Cold および Hot な生産者(Flowable / Observable)
今回は、生産者(Publisher)は大きく分けて Cold と Hot の2つに分類することができます。それら2つについて説明してきます。
Cold な生産者
- Cold な生産者は 1つの消費者とのみ購読関係を結ぶ
- 1つの消費者と購読関係を結ぶ度にデータを通知するためのタイムラインを生成する
Hot な生産者
- Hot な生産者は 複数の消費者と購読関係を結ぶ
- 既に生成しておいたタイムラインに対して購読関係を結んだ消費者が後から加わるかたち
購読のタイミング
- Cold な生産者に対する購読は、購読開始時点で生産者側でデータの生産が開始される
- Hot な生産者に対する購読は、購読を開始したといっても生産者側でデータが生成されるとは限らない
- Hot な生産者を購読する場合、途中からのデータを通知される可能性もあるため、複数の購読者が同じデータを受け取れるとは限らない
RxJava での Cold / Hot な生産者
- 基本的に生成メソッドで作られる生産者は Cold な生産者である
- Hot な生産者をつくるには Cold な生産者を作ってから Hot な生産者へと変換するメソッドを呼んで変換するか、 Processor や Subject をつくる
- Hot な Flowable / Observable として ConnectionFlowable / ConnectionObservable がある
- Cold な Flowable / Observable を Hot な生産者に変換するメソッドを呼ぶと 上記2つが生成される
次の記事
参考
RxJavaリアクティブプログラミング (CodeZine BOOKS)
- 作者: 須田智之
- 出版社/メーカー: 翔泳社
- 発売日: 2017/02/17
- メディア: 単行本(ソフトカバー)
- この商品を含むブログ (1件) を見る
RxJava - オペレータについて
今回は、Publisher から通知されたデータを受け取って処理する際に用いるオペレータを見ていきます。
段階的にデータを整形する
RxJava では、Publisher(Flowable / Observable)から Subscriber(Subscriber / Observer)にデータが通知される間に Subscriber がデータを利用しやすい形に整形、変換することができます。整形されたデータは再び Flowable / Observable なデータとして返されるため、メソッドチェインする形で段階的にデータを整形することが可能です。こういったデータを生成したり、変換したり、フィルターをかけたりできるメソッドのことを RxJava ではオペレータと呼びます。
メソッドをつなげて Subscriber へ通知されていく流れ
上記の図は、流れてきた数値データを x5 していくオペレータのイメージ図です。丸い円のことをマーブルと呼んだりします。これをコードで表記すると以下のようになります。
public static void main(String[] args) { Flowable<Integer> flowable = // 1 から 10 までの数値を順に通知していくデータを生成 Flowable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) // 通知するデータを 5 倍していく .map(data -> data * 5); flowable.subscribe(data -> System.out.println("data=" + data)); }
期待される実行結果は以下です。
data=5 data=10 data=15 ... data=45 data=50
おおまかな順序は以下のとおりです。
- just メソッドを用いて、引数通して渡した左から順番にデータを通知していく Flowable を生成する
- map メソッドを用いて、 Flowable によって通知されてきたデータを一つずつ 5 倍する
このように通知されてきたデータを一気に処理していくのではなく、オペレータを使用して Observable を返しながら 最終的に通知したいデータへ制御できます。
関数型インタフェースに影響を受けた RxJava
RxJava はメソッドの多くが関数型インタフェースを引数として受け取るようになっており、関数型プログラミングの原則に従い、
- 同じ入力を受け取ったら毎回同じ結果を返す
- 入力値や処理の外部のオブジェクトおよび環境に対して何も変化を起こさない
以上の2つが基本となります。
副作用を避ける
オペレータによって受け取ったデータの状態に変化を与えたり、外部に対して何らかの変化を与えたりすることを副作用と呼びます。処理の外部から参照型のオブジェクトの値を変える、データベースやファイルを更新するといったものが、副作用にあたります。副作用を避けることによって、責任範囲がより明確になります。このことより、以下のことが期待できます。
- 複数のスレッドから参照されるオブジェクトがないため、スレッドセーフになる
- 仮にある実装がパフォーマンス改善のために同期処理から非同期処理に置き換わったとしても、内部の処理は修正せずに動き続ける
データの処理のタイミングに注意する
関数型プログラミングでは、引数で渡したデータはその時点で評価された値を受け取ります。引数に式を記述したとしても、既に評価された値しか渡されません。つまり、メソッドが実行される前から値が決まっています。
例として以下のコードでは、Flowable が生成された際にシステム時間が評価され、値が確定します。
Flowable<Long> flowable = Flowable.just(System.currentTimeMillis());
一方で、以下のコードでは購読されたタイミングでfromCallbackメソッドが呼び出され、呼び出された瞬間にシステムの時間が決定され通知されます。
print(() -> System.currentTimeMills());
つまり、
- justメソッドによるデータを通知する Flowable を何度 onSubscribe で購読したとしても同じ値を返す
- 一方で fromCallBack メソッドによって生成された Flowable は呼び出すたびに式が評価され値が変化する
次の記事
参考
RxJavaリアクティブプログラミング (CodeZine BOOKS)
- 作者: 須田智之
- 出版社/メーカー: 翔泳社
- 発売日: 2017/02/17
- メディア: 単行本(ソフトカバー)
- この商品を含むブログ (1件) を見る
RxJava - Flowable / Observable と Subscriber / Observer について
今回は、RxJava における Reactive Streams 対応について見てきます。
RxJava における生産者と購読者
Reactive Streamsの概念では、データを生産し通知する役割を担う生産者と通知されてきたデータを受け取って処理を行う消費者がいます。消費者が生産者を購読することで生産者からの通知を受け取ることができ、データを受け取ることが可能になります。
RxJava では、この両者の関係が2つ存在します。
生産者 | 消費者 | |
---|---|---|
Reactive Streams 対応 | Flowable | Subscriber |
Reactive Streams 非対応 | Observable | Observer |
大きな特徴として、Reactive Streams 非対応の Observable / Observer では backpressure がありません。前回、記事でも説明した backpressure は以下のようなことを実現します。
データを通知する側(Publisher)に対して、受け取り手(Subscriber)が自分が処理できる能力の分だけ「これくらいに加減してください」とお願いすることで、データストリームが速すぎて処理能力が追いつかない・・・といった事態を防げる仕組みを backpressure と呼びます。
Flowable と Subscriber
Reactive Streams の生産者 Publisher にあたるものを実装したのが Flowable です。また、Subscriber は Reactive Streams と同様です。Reactive Streams に対応しているため、基本的な仕組みは Reactive Streams と同じと考えても違和感なく扱うことができるようになっています。
- 生産者 Flowable が Subscriber に対して以下の通知を随時行う
- データの準備ができた: onSubscribe
- データを通知する: onNext
- エラーが発生した: onError
- 完了した: onComplete
そして、 Subscription インタフェースを通してデータ数のリクエスト( backpressure を実現できる)や購読の解除を行います。
Observable と Observer
RxJavaの2系(2.x)で提供されている Observable と Observer は、Reactive Streams に基づいた実装ではないため Reactive Streams のインタフェースは用いていません。ですが、 backpressure という特徴がないこと以外は基本的に構成は似ています。
- 生産者 Observable が Observer に対して以下の通知を行う
- データの準備ができた: onSubscribe
- データを通知する: onNext
- エラーが発生した: onError
- 完了した: onComplete
Subscription インタフェースがない代わりに、 Disposable というメソッドを使って購読の解除を行います。Disposable は onSubscribe メソッドの引数として渡され、dispose()
というように呼び出すことで購読を解除することが可能です。
次の記事
参考
RxJavaリアクティブプログラミング (CodeZine BOOKS)
- 作者: 須田智之
- 出版社/メーカー: 翔泳社
- 発売日: 2017/02/17
- メディア: 単行本(ソフトカバー)
- この商品を含むブログ (1件) を見る
RxJava - Reactive Streams のルール
Reactive Streamsの仕組みを理解する上で重要なルールを簡単に説明します。
詳しくは、以下の公式リポジトリの README.md を参照してください。
重要なルールは主に4つです。
- 購読を開始する通知である onSubscribe は購読につき一度だけ通知する
- 通知は順を追ってつぎつぎに行われる
- null は通知しない
- Publisherの処理の終了は、 onComplete(処理の完了)または onError (処理の異常終了)を通知することで行う
onSubscribe は購読につき一度だけ通知する
購読の開始を意味する onSubscribe は、一度だけしか行われません。
通知は順を追って次々に行われる
通知は必ず一つずつ順次(シーケンシャルに)行われます。複数の通知が一度に行われることはあり得ません。RxJava では Observable契約 というルールがあり、これがデータが複数同時に通知されることによる不整合を防いでくれています。
null は通知しない
null は通知できません。Reactive Streams で 仮に null が通知されようとしたときは NullPointerException が起こるようになっています。通常のデータの通知である onNext だけでなく、 エラー時の通知 onError においても null は許容されません。
完了またはエラーによって Publisher は処理を終了する
Publisher が処理を終了する契機は2つしかありません。onComplete で完了させるか、 onError によってエラーで終了するか、です。これら2つのいずれかが通知された購読は、それ以降データを通知することはありません。つまり、 onComplete や onError の後に何らかの処理を続行することはできません。
次の記事
参考
RxJavaリアクティブプログラミング (CodeZine BOOKS)
- 作者: 須田智之
- 出版社/メーカー: 翔泳社
- 発売日: 2017/02/17
- メディア: 単行本(ソフトカバー)
- この商品を含むブログ (1件) を見る
RxJava - Reactive Streams とは
今回は、Reactive Streams について見ていきます。
Reactive Streams は、ライブラリやフレームワークに依存せず、データストリームを非同期的に扱うことができる仕組み・インタフェースを提供しています。
The purpose of Reactive Streams is to provide a standard for asynchronous stream processing with non-blocking backpressure.
reactive-streams/reactive-streams-jvm: Reactive Streams Specification for the JVM
- non-blocking で backpressure
- そういう非同期処理の枠組みを作るのが Reactive Streams
同期・非同期に関わらず、I/O処理などの外部のリソースを呼び出した場合、レスポンスが返ってくるまでの間は呼び出し元のメインスレッドは blocking されます。その反対が non-blocking な状態です。
また、データを通知する側(Publisher)に対して、受け取り手(Subscriber)が自分が処理できる能力の分だけ「これくらいに加減してください」とお願いすることで、データストリームが速すぎて処理能力が追いつかない・・・といった事態を防げる仕組みを backpressure と呼びます。
Reactive Streams では以上の2点を実現できるインタフェースを提供します。
Publisher と Subscriber
基本的な構成を説明します。Reactive Streams では、データを生産し通知する側を Publisher と呼び、通知されたデータを受け取り処理する側のことを Subscriber と呼びます。
- Publisher: データの生産者,データを通知する
- Subscriber: データの購読者,通知を受けてデータを受け取り処理する
主な流れを見ていきます。
- Publisher が通知する準備が整ったことを Subscriber へ通知する(onSubscribe)
- Subscriber は受け取るデータ数を併せてリクエストを送る(Subscription#request)
- Publisher はデータ数分データを生成し、データを通知する(onNext)
- Subscriber は通知されたデータをもとに処理を行い、再び Publisher へリクエストを送る(Subscription#request)
- Publisher はデータを通知する(onNext)
- これを完了(onComplete)もしくはエラーが起きる(onError)まで続ける
Publisher の処理と Subscriber の処理が別のスレッド上で行われていて、Publisher の処理のほうが速い場合、 Subscriber は処理が追いつかない状況になりかねません。Subscriber がリクエストの際にデータ数を渡している部分は、 Publisher へ自らの処理できる分だけデータを要求できます。これが backpressure にあたります。
上記の流れをまとめると、 Publisher と Subscriber は4つのプロトコルを用いてデータをやり取りしています。
プロトコル | 概要 |
---|---|
onSubscribe | データの通知の準備が整ったことを通知する |
onNext | データを通知する |
onError | エラーの通知 |
onComplete | 完了の通知 |
また、インタフェースは以下の4つに絞られます。
インタフェース | 概要 |
---|---|
Publisher | データを生産し,通知する役割を担う |
Subscriber | データの通知を受け取り,処理行う |
Subscription | データ数のリクエスト,購読の解除を行う |
Processor | Publisher および Subscriber の両方の性質を併せ持つ |
Reactive Streams の概要と、流れについて説明しました。
次の記事
参考
RxJavaリアクティブプログラミング (CodeZine BOOKS)
- 作者: 須田智之
- 出版社/メーカー: 翔泳社
- 発売日: 2017/02/17
- メディア: 単行本(ソフトカバー)
- この商品を含むブログ (1件) を見る