22時に寝ようと思って2時に寝る。

備忘録や日記を書いてます。きょうは早く寝よう。

RxJava - IntelliJ IDEA で RxJava の環境構築

実際に RxJava のサンプルコードを書いていく上での環境を構築していきます。今回はエディタとして IntelliJ IDEA を用います。

www.jetbrains.com

動作確認環境

  • OS: macOS High Sierra 10.13.6
  • IntelliJ IDEA: 2018.2.1 (Ultimate Edition) Build #IU-182.3911.36
  • JRE: 1.8.0_152-release-1248-b8 x86_64
  • JVM: OpenJDK 64-Bit Server VM by JetBrains s.r.o

環境構築

今回導入する RxJava のバージョンは記事執筆時点で最新の 2.2.0 です。

Release 2.2.0 · ReactiveX/RxJava · GitHub

IntelliJ IDEA で新たにプロジェクトを作成する

  • 起動後の Welcome to IntelliJ IDEA 画面にて、+ Create New Projectを選択

f:id:azuuun:20180818223403p:plain

  • サイドメニューから Gradle を選び、Next で次へ

f:id:azuuun:20180818223330p:plain

  • Project SDKについて聞かれた場合は IntelliJ IDEA 側でよしなにやってくれていると思いますので、そのとおり進めてください
  • GroupId, ArtifactId, Versionについて
    • https://improve-future.com/what-are-groupid-artifactid.html でわかりやすく説明されています
    • GroupId はプロジェクトを一意に識別できるもので com.example.azuuun.rxjava といったフォーマットで入力
    • ArtifactId は任意のプロジェクト名 RxJavaPlayground といったものを入力
    • Version はデフォルトでよいと思います
  • 次の画面の module の設定画面はデフォルトで進めて問題ないです
  • ProjectName などは先程設定した ArtifactId がデフォルトで設定されていますが、適当に名前を入力します

Gradle でライブラリを適用する

f:id:azuuun:20180818224704p:plain

  • プロジェクトが作成された後、build.gradle ファイルを開きます
  • dependenciescompile 'io.reactivex.rxjava2:rxjava:2.2.0'追記
  • ファイル右上に Enabled Auto-Import といった表示があればクリックします
  • 正常に導入されるかを確認

サンプルコードを書いてみる

HelloRxJavaというファイルを作成し、以下のコードを書いて実行してみましょう。

import io.reactivex.*;
import io.reactivex.schedulers.Schedulers;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class HelloRxJava {
    public static void main(String[] args) throws Exception {

        Flowable<String> flowable =
                Flowable.create(new FlowableOnSubscribe<String>() {
                    @Override
                    public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                        String[] datas = { "Hello, RxJava", "こんにちは、RxJava" };
                        for (String data : datas) {
                            if (emitter.isCancelled()) {
                                return;
                            }

                            // データを通知する
                            emitter.onNext(data);
                        }

                        // 完了したことを通知する
                        emitter.onComplete();
                    }
                }, BackpressureStrategy.BUFFER);

        flowable.observeOn(Schedulers.computation())
                .subscribe(new Subscriber<String>() {

                    private Subscription subscription;

                    // 購読を開始したときの処理
                    @Override
                    public void onSubscribe(Subscription s) {
                        // Subscription を Subscriber 内で保持
                        this.subscription = s;
                        // 受け取るデータ数の要求
                        this.subscription.request(1L);
                    }

                    // 通知を受け取ったときの処理
                    @Override
                    public void onNext(String data) {
                        String threadName = Thread.currentThread().getName();
                        System.out.println(String.format("%s: %s", threadName, data));

                        // 次に受け取るデータのリクエスト
                        this.subscription.request(1L);
                    }

                    // エラー通知を受け取ったときの処理
                    @Override
                    public void onError(Throwable t) {
                        t.printStackTrace();
                    }

                    // 完了通知を受け取ったときの処理
                    @Override
                    public void onComplete() {
                        String threadName = Thread.currentThread().getName();
                        System.out.println(String.format("%s: 完了しました", threadName));
                    }
                });

        Thread.sleep(500L);
    }
}

以下のような出力が出れば、環境は正常に構築できています。

RxComputationThreadPool-1: Hello, RxJava
RxComputationThreadPool-1: こんにちは、RxJava
RxComputationThreadPool-1: 完了しました

以上です。

次の記事

azunobu.hatenablog.com

参考

RxJavaリアクティブプログラミング (CodeZine BOOKS)

RxJavaリアクティブプログラミング (CodeZine BOOKS)

RxJava - Cold および Hot な生産者(Flowable / Observable)

今回は、生産者(Publisher)は大きく分けて Cold と Hot の2つに分類することができます。それら2つについて説明してきます。

Cold な生産者

f:id:azuuun:20180818180031p:plain

  • Cold な生産者は 1つの消費者とのみ購読関係を結ぶ
  • 1つの消費者と購読関係を結ぶ度にデータを通知するためのタイムラインを生成する

Hot な生産者

f:id:azuuun:20180818181001p:plain

  • Hot な生産者は 複数の消費者と購読関係を結ぶ
  • 既に生成しておいたタイムラインに対して購読関係を結んだ消費者が後から加わるかたち

購読のタイミング

  • Cold な生産者に対する購読は、購読開始時点で生産者側でデータの生産が開始される
  • Hot な生産者に対する購読は、購読を開始したといっても生産者側でデータが生成されるとは限らない
  • Hot な生産者を購読する場合、途中からのデータを通知される可能性もあるため、複数の購読者が同じデータを受け取れるとは限らない

RxJava での Cold / Hot な生産者

  • 基本的に生成メソッドで作られる生産者は Cold な生産者である
  • Hot な生産者をつくるには Cold な生産者を作ってから Hot な生産者へと変換するメソッドを呼んで変換するか、 Processor や Subject をつくる
  • Hot な Flowable / Observable として ConnectionFlowable / ConnectionObservable がある
    • Cold な Flowable / Observable を Hot な生産者に変換するメソッドを呼ぶと 上記2つが生成される

次の記事

azunobu.hatenablog.com

参考

RxJavaリアクティブプログラミング (CodeZine BOOKS)

RxJavaリアクティブプログラミング (CodeZine BOOKS)

RxJava - オペレータについて

今回は、Publisher から通知されたデータを受け取って処理する際に用いるオペレータを見ていきます。

段階的にデータを整形する

RxJava では、Publisher(Flowable / Observable)から Subscriber(Subscriber / Observer)にデータが通知される間に Subscriber がデータを利用しやすい形に整形、変換することができます。整形されたデータは再び Flowable / Observable なデータとして返されるため、メソッドチェインする形で段階的にデータを整形することが可能です。こういったデータを生成したり、変換したり、フィルターをかけたりできるメソッドのことを RxJava ではオペレータと呼びます。

メソッドをつなげて Subscriber へ通知されていく流れ

f:id:azuuun:20180818001129p:plain

上記の図は、流れてきた数値データを x5 していくオペレータのイメージ図です。丸い円のことをマーブルと呼んだりします。これをコードで表記すると以下のようになります。

public static void main(String[] args) {
    Flowable<Integer> flowable =
        // 1 から 10 までの数値を順に通知していくデータを生成
        Flowable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        // 通知するデータを 5 倍していく
        .map(data -> data * 5);
        
    flowable.subscribe(data -> System.out.println("data=" + data));
}

期待される実行結果は以下です。

data=5
data=10
data=15
...
data=45
data=50

おおまかな順序は以下のとおりです。

  1. just メソッドを用いて、引数通して渡した左から順番にデータを通知していく Flowable を生成する
  2. map メソッドを用いて、 Flowable によって通知されてきたデータを一つずつ 5 倍する

このように通知されてきたデータを一気に処理していくのではなく、オペレータを使用して Observable を返しながら 最終的に通知したいデータへ制御できます。

関数型インタフェースに影響を受けた RxJava

RxJava はメソッドの多くが関数型インタフェースを引数として受け取るようになっており、関数型プログラミングの原則に従い、

  • 同じ入力を受け取ったら毎回同じ結果を返す
  • 入力値や処理の外部のオブジェクトおよび環境に対して何も変化を起こさない

以上の2つが基本となります。

副作用を避ける

オペレータによって受け取ったデータの状態に変化を与えたり、外部に対して何らかの変化を与えたりすることを副作用と呼びます。処理の外部から参照型のオブジェクトの値を変える、データベースやファイルを更新するといったものが、副作用にあたります。副作用を避けることによって、責任範囲がより明確になります。このことより、以下のことが期待できます。

  • 複数のスレッドから参照されるオブジェクトがないため、スレッドセーフになる
  • 仮にある実装がパフォーマンス改善のために同期処理から非同期処理に置き換わったとしても、内部の処理は修正せずに動き続ける

データの処理のタイミングに注意する

関数型プログラミングでは、引数で渡したデータはその時点で評価された値を受け取ります。引数に式を記述したとしても、既に評価された値しか渡されません。つまり、メソッドが実行される前から値が決まっています。

例として以下のコードでは、Flowable が生成された際にシステム時間が評価され、値が確定します。

Flowable<Long> flowable =
    Flowable.just(System.currentTimeMillis());

一方で、以下のコードでは購読されたタイミングでfromCallbackメソッドが呼び出され、呼び出された瞬間にシステムの時間が決定され通知されます。

print(() -> System.currentTimeMills());

つまり、

  • justメソッドによるデータを通知する Flowable を何度 onSubscribe で購読したとしても同じ値を返す
  • 一方で fromCallBack メソッドによって生成された Flowable は呼び出すたびに式が評価され値が変化する

次の記事

azunobu.hatenablog.com

参考

RxJavaリアクティブプログラミング (CodeZine BOOKS)

RxJavaリアクティブプログラミング (CodeZine BOOKS)

RxJava - Flowable / Observable と Subscriber / Observer について

今回は、RxJava における Reactive Streams 対応について見てきます。

RxJava における生産者と購読者

Reactive Streamsの概念では、データを生産し通知する役割を担う生産者と通知されてきたデータを受け取って処理を行う消費者がいます。消費者が生産者を購読することで生産者からの通知を受け取ることができ、データを受け取ることが可能になります。

RxJava では、この両者の関係が2つ存在します。

生産者 消費者
Reactive Streams 対応 Flowable Subscriber
Reactive Streams 非対応 Observable Observer

大きな特徴として、Reactive Streams 非対応の Observable / Observer では backpressure がありません。前回、記事でも説明した backpressure は以下のようなことを実現します。

データを通知する側(Publisher)に対して、受け取り手(Subscriber)が自分が処理できる能力の分だけ「これくらいに加減してください」とお願いすることで、データストリームが速すぎて処理能力が追いつかない・・・といった事態を防げる仕組みを backpressure と呼びます。

RxJava - Reactive Streams とは - 22時に寝ようと思って2時に寝る。

Flowable と Subscriber

Reactive Streams の生産者 Publisher にあたるものを実装したのが Flowable です。また、Subscriber は Reactive Streams と同様です。Reactive Streams に対応しているため、基本的な仕組みは Reactive Streams と同じと考えても違和感なく扱うことができるようになっています。

  • 生産者 Flowable が Subscriber に対して以下の通知を随時行う
    • データの準備ができた: onSubscribe
    • データを通知する: onNext
    • エラーが発生した: onError
    • 完了した: onComplete

そして、 Subscription インタフェースを通してデータ数のリクエスト( backpressure を実現できる)や購読の解除を行います。

Observable と Observer

RxJavaの2系(2.x)で提供されている Observable と Observer は、Reactive Streams に基づいた実装ではないため Reactive Streams のインタフェースは用いていません。ですが、 backpressure という特徴がないこと以外は基本的に構成は似ています。

  • 生産者 Observable が Observer に対して以下の通知を行う
    • データの準備ができた: onSubscribe
    • データを通知する: onNext
    • エラーが発生した: onError
    • 完了した: onComplete

Subscription インタフェースがない代わりに、 Disposable というメソッドを使って購読の解除を行います。Disposable は onSubscribe メソッドの引数として渡され、dispose()というように呼び出すことで購読を解除することが可能です。

次の記事

azunobu.hatenablog.com

参考

RxJavaリアクティブプログラミング (CodeZine BOOKS)

RxJavaリアクティブプログラミング (CodeZine BOOKS)

RxJava - Reactive Streams のルール

Reactive Streamsの仕組みを理解する上で重要なルールを簡単に説明します。

詳しくは、以下の公式リポジトリの README.md を参照してください。

github.com

重要なルールは主に4つです。

  • 購読を開始する通知である onSubscribe は購読につき一度だけ通知する
  • 通知は順を追ってつぎつぎに行われる
  • null は通知しない
  • Publisherの処理の終了は、 onComplete(処理の完了)または onError (処理の異常終了)を通知することで行う

onSubscribe は購読につき一度だけ通知する

購読の開始を意味する onSubscribe は、一度だけしか行われません。

通知は順を追って次々に行われる

通知は必ず一つずつ順次(シーケンシャルに)行われます。複数の通知が一度に行われることはあり得ません。RxJava では Observable契約 というルールがあり、これがデータが複数同時に通知されることによる不整合を防いでくれています。

null は通知しない

null は通知できません。Reactive Streams で 仮に null が通知されようとしたときは NullPointerException が起こるようになっています。通常のデータの通知である onNext だけでなく、 エラー時の通知 onError においても null は許容されません。

完了またはエラーによって Publisher は処理を終了する

Publisher が処理を終了する契機は2つしかありません。onComplete で完了させるか、 onError によってエラーで終了するか、です。これら2つのいずれかが通知された購読は、それ以降データを通知することはありません。つまり、 onComplete や onError の後に何らかの処理を続行することはできません。

次の記事

azunobu.hatenablog.com

参考

RxJavaリアクティブプログラミング (CodeZine BOOKS)

RxJavaリアクティブプログラミング (CodeZine BOOKS)

RxJava - Reactive Streams とは

今回は、Reactive Streams について見ていきます。

Reactive Streams は、ライブラリやフレームワークに依存せず、データストリームを非同期的に扱うことができる仕組み・インタフェースを提供しています。

github.com

The purpose of Reactive Streams is to provide a standard for asynchronous stream processing with non-blocking backpressure.

reactive-streams/reactive-streams-jvm: Reactive Streams Specification for the JVM

  • non-blocking で backpressure
  • そういう非同期処理の枠組みを作るのが Reactive Streams

同期・非同期に関わらず、I/O処理などの外部のリソースを呼び出した場合、レスポンスが返ってくるまでの間は呼び出し元のメインスレッドは blocking されます。その反対が non-blocking な状態です。

また、データを通知する側(Publisher)に対して、受け取り手(Subscriber)が自分が処理できる能力の分だけ「これくらいに加減してください」とお願いすることで、データストリームが速すぎて処理能力が追いつかない・・・といった事態を防げる仕組みを backpressure と呼びます。

Reactive Streams では以上の2点を実現できるインタフェースを提供します。

Publisher と Subscriber

基本的な構成を説明します。Reactive Streams では、データを生産し通知する側を Publisher と呼び、通知されたデータを受け取り処理する側のことを Subscriber と呼びます。

  • Publisher: データの生産者,データを通知する
  • Subscriber: データの購読者,通知を受けてデータを受け取り処理する

主な流れを見ていきます。

  1. Publisher が通知する準備が整ったことを Subscriber へ通知する(onSubscribe)
  2. Subscriber は受け取るデータ数を併せてリクエストを送る(Subscription#request)
  3. Publisher はデータ数分データを生成し、データを通知する(onNext)
  4. Subscriber は通知されたデータをもとに処理を行い、再び Publisher へリクエストを送る(Subscription#request)
  5. Publisher はデータを通知する(onNext)
  6. これを完了(onComplete)もしくはエラーが起きる(onError)まで続ける

Publisher の処理と Subscriber の処理が別のスレッド上で行われていて、Publisher の処理のほうが速い場合、 Subscriber は処理が追いつかない状況になりかねません。Subscriber がリクエストの際にデータ数を渡している部分は、 Publisher へ自らの処理できる分だけデータを要求できます。これが backpressure にあたります。

上記の流れをまとめると、 Publisher と Subscriber は4つのプロトコルを用いてデータをやり取りしています。

プロトコル 概要
onSubscribe データの通知の準備が整ったことを通知する
onNext データを通知する
onError エラーの通知
onComplete 完了の通知

また、インタフェースは以下の4つに絞られます。

インタフェース 概要
Publisher データを生産し,通知する役割を担う
Subscriber データの通知を受け取り,処理行う
Subscription データ数のリクエスト,購読の解除を行う
Processor Publisher および Subscriber の両方の性質を併せ持つ

Reactive Streams の概要と、流れについて説明しました。

次の記事

azunobu.hatenablog.com

参考

RxJavaリアクティブプログラミング (CodeZine BOOKS)

RxJavaリアクティブプログラミング (CodeZine BOOKS)

RxJava - リアクティブプログラミングとは

最近、学び始めたリアクティブプログラミングについて備忘録としてまとめていきます。今回は、概要について触れます。

リアクティブプログラミングとは

リアクティブプログラミング(Reactive Programming)は、最近注目されているプログラミングパラダイムの一つで、特定の言語やフレームワークにかかわらず適用できるスタイルのようなものです。

  • データフローの定義
  • 変更の伝搬

以上の2つの段階を記述することで、あとは変更があるたびに伝搬させることができ、何らかのイベントに反応するプログラムを比較的容易に実現することが可能になります。

例えば、GPSの位置情報を取得するときに適用できます。移動し座標(緯度・軽度)が変化するたびに位置情報データが送信され、移動をやめるとデータの送信も止まるように、一定期間の位置情報データを一片に送信するのではなく、変化が起きる度にデータを送信していくのが特徴です。

リアクティブプログラミングでは、このような流れのことをデータストリームと呼びます。ListやCollectionといった一塊のデータの集合だけではなく、将来的に発生するであろうデータも含めてデータの集合体として扱います。

データを受け取ることに徹する

キーボード上で「hello」と打ち込む操作はどうでしょうか。

  1. h keyを打ち込む
  2. e keyを打ち込む
  3. l key を打ち込む
  4. l key を打ち込む
  5. o key を打ち込む

上記のように 1. の段階で、それ以降なにが打ち込まれるかに関わらず「何らかの key を押した」というイベントのデータが生成されたとみなすことができます。 key を複数回押した場合、その押した回数だけ「key を押した」というデータが生成されているということです。つまり、キーボード入力の一つ一つのキータッチにおいても、データストリームとして扱うことが可能です。

ここで重要なのは、リアクティブプログラミングはこのようなデータストリームから流れてくるデータに対して、受け取り手のロジックは受け取る度にデータを順次処理していく仕組みになっていることです。ロジック側が何らかの契機に自分からどんなイベントが発生したのかを能動的に取得していくのではなく、送らてきた(通知された)データを受け取る度に処理をするリアクティブなロジックになっています。

インクリメンタルサーチ

Image from Gyazo

Googleなどの検索窓に何らかのキーワードを打ち込むと、打ち込んでから何ミリ秒か後に「もしかして、これをお探しですか?」といくつかの候補を提案してくれる機能があります。これはインクリメンタルサーチと呼ばれ、あらゆる場面で目にする一般的なものです。

このインクリメンタルサーチに、リアクティブプログラミングを活用できます。先ほどの例のように、キーボードで何らかのキーワードを入力されたのと同時に検索窓のロジックへデータが送信(通知)されます。通知されたあと次の通知を待ちつつ、候補のキーワードを探しリストの形でUIに出力する処理が走ります。これがリアクティブプログラミングではない場合、「検索する」ボタンであったりキーボードで Enter キーを叩くなどしないとユーザーは検索結果を受け取ることができません。

検索窓にイベントリスナーを用意する実装との違い

今の説明だけでは、検索窓に「キーが入力されたこと」を監視するイベントリスナーをつくり、同様の処理をさせるロジックを書くことと何ら変わらないように思えます。しかし、「何が」データが発生した後の具体的な処理を担うのかが違ってきます。

イベントリスナーで監視する場合、検索窓(フォーム)がキーが入力されたことを検知し、フォーム自体が「フォームの下に検索候補のリストを表示させる」ことまでを担当します。

リアクティブプログラミングでは、検索窓が「フォームの下に検索候補のリストを表示させる」ことまで担当しません。あくまでも、検索窓は通知を送るだけに徹しており、その通知を受け取り検索候補を表示するのは「検索候補を表示するリスト」自身です。

結局、どんなメリットがあるか

  • どのロジックが何をするか、といった責任の影響範囲が明確になる
  • データが生まれる側は「データを通知する」という部分までが自身の責任になり、そのデータの受け取り手がそのデータをどのように使用するかは気にしなくて良い
  • 気にしなくてもいいので、通知を送ったあとはすぐに次のデータの監視へ徹することができる(受け取り手の処理が完了するのを待つ必要もない)
    • この性質が非同期処理と相性が良く、非同期処理にまつわるプログラムを容易に実装することができる

次の記事

azunobu.hatenablog.com

参考

RxJavaリアクティブプログラミング (CodeZine BOOKS)

RxJavaリアクティブプログラミング (CodeZine BOOKS)