
関数型プログラミングのトピックに興味を持ち、私は岐路に立たされました-慣れるためにどのフレームワークを選択するか。 ReactiveCocoaはiOS界のベテランであり、彼に関する多くの情報を持っています。 しかし、彼はObjective-Cで成長しましたが、これは問題ではありませんが、現時点では主にSwiftについて書いていますが、元々言語のすべての長所を考慮して設計されたソリューションを採用したいと思います。 RxSwiftはReactive Extensionsの同じポートであり、長い歴史がありますが、ポート自体は新しく、Swift専用に書かれています。 私はそれにとどまることにしました。
しかし、RxSwiftのドキュメントの特異性は、すべてのコマンドの説明がreactx.ioにあることであり、主に一般的な情報であり、開発者の手はまだRxSwift専用のドキュメントを作成するためにまだ到達していません。 一部のチームには実装に微妙な点がありますが、一般的なドキュメントには言及しかないものがあります。
Wikiのすべての章をRxSwift githubで読んだ後、私はすぐに公式の例を扱うことにしました。これはRXでは機能しないことが明らかになりました。基本をよく理解する必要があります。 理解するのが最も難しいチーム、次に理解できると思われるチームを分析し始めましたが、それらについて自分に質問すると、正しく答える方法を推測しているだけであることがわかりましたが、わかりません。
一般的に、これ以上何もせずに、私はすべてのRxSwiftオペレーターを解決することにしました。 プログラミングで何かを理解する最良の方法は、コードを実行して、その動作を確認することです。 次に、リアクティブプログラミングの詳細を考慮すると、スキームは非常に便利です。ロシア語で簡単に説明します。 今日の仕事を終えて、リアクティブプログラミングのトピックだけに注目している人と結果を共有しないのは罪だと思いました。
カットの下にたくさんの写真とテキスト、たくさん!まず、
公式ドキュメントをご覧になることをお勧めします。基本ではなく、RxSwiftコマンドの主な本質と具体性を説明しました。
スキーム内のボール、いわゆる
RxMarblesで「
遊び回る 」こともできます
。iPhone/ iPad用の無料バージョンがあります
そのため、この記事では、すべて(まあ、またはほぼすべて)のRxSwiftコマンドを検討します。それぞれについて、簡単な説明、図(意味がある場合)、コード、実行結果を示し、必要に応じて、ログへのコード実行結果の出力についてコメントします。
この記事では、各チームの見出しは公式ドキュメントへのリンクです。 私は、チームのすべてのニュアンスを翻訳するという目標を自分で設定しませんでした。
ここに、公式のRxSwiftリポジトリをクローンし、サンドボックス(DescribeOperators.playground)を追加
したgithubへのリンクがあります。ここでは、記事とほぼ同じコードになります。
そして、ここにすべてのコマンドがmindMapの形式で収集さ
れるPDFへの
リンクがあります。これにより、すべてのコマンドをすばやく表示できます。 チームで作業するために必要なパラメーターをどのように、どのように使用するかを確認するために、PDFのコードが適用されます。 最初は、このPDFのために、すべてを開始しました-スキームを備えたすべてのコマンドが明確に表示されるドキュメントを手元に用意しました。 PDFは(重量ではなくワークスペースの点で)巨大であることが判明しましたが、iPad 2でもすべてが正常に表示されることを確認しました。
PMのすべてのエラーについて書いてください。テキストの4回目の校正で私の目が呪われた後、作業量が少し多すぎることがわかりました。
私の仕事が誰かに役立つことを願っています。 始めましょう。
内容
注釈オブザーバブルの作成
asObservable作成する延期空っぽエラー間隔ただ決しての範囲repeatElementタイマー組み合わせ観察可能
amb組み合わせる連結併合するstartWithswitchLatestwithLatestFromzipフィルタリング
distinctUntilChangedelementAtフィルターignoreElementsサンプル独身飛ばすスキップ(期間)skipUntilスキップしながらskipWhileWithIndex取る取る(期間)takeLasttakeUntil取りながらtakeWhileWithIndexスロットル変換
緩衝flatMapflatMapFirstflatMapLatestflatMapWithIndex地図mapWithIndex窓数学および集計演算子
減らすスキャンtoArrayエラー処理
catchErrorcatchErrorJustReturn再試行再試行するときConnectable Observableを操作するための演算子
マルチキャスト発行するrefCount返信するreplayAllヘルパーメソッド
デバッグdoOn / doOnNextdelaySubscriptionobserveOn購読するsubscribeOnタイムアウトを使用してスキームでは、
Source Observableとして
Source / SOという表記を使用し、
Result Observableとして
RO / Resultを使用します。
補助コードとして、createSequenceWithWait関数を使用します。これは、要素間の指定された間隔で要素の配列からObservableを作成します。
public enum ResultType { case Infinite case Completed case Error } public func createSequenceWithWait<T, U>(array: [T], waitTime: Int64 = 1, resultType: ResultType = .Completed, describer: ((value: T) -> U)? = nil) -> Observable<U> { return Observable<U>.create{ observer in for (idx, letter) in array.enumerate() { let time = dispatch_time(dispatch_time_t(DISPATCH_TIME_NOW), waitTime * Int64(idx) * Int64(NSEC_PER_SEC)) dispatch_after(time, dispatch_get_main_queue()) { if let describer = describer { let value = describer(value: letter) observer.on(.Next(value)) } else { observer.on(.Next(letter as! U)) } } } if resultType != .Infinite { let allTime = dispatch_time(dispatch_time_t(DISPATCH_TIME_NOW), waitTime * Int64(array.count) * Int64(NSEC_PER_SEC)) dispatch_after(allTime, dispatch_get_main_queue()) { switch resultType { case .Completed: observer.onCompleted() case .Error: observer.onError(RxError.Unknown) default: break } } } return NopDisposable.instance } }
関数の例-コンソールで出力を分離するだけで、そのコードは次のとおりです(RxSwiftから取得)
public func example(description: String, action: () -> ()) { print("\n--- \(description) example ---") action() }
時間遅延を処理する必要があるすべての例で、このコードがサンドボックスで実行される場合、登録する必要があります
import XCPlayground XCPlaygroundPage.currentPage.needsIndefiniteExecution = true
また、読者は、リアクティブプログラミングが一般的であり、特にRxSwiftが何であるかについて一般的な理解を持っていることも理解されています。 別の導入部をブロックするのが理にかなっているかどうかはわかりません。
オブザーバブルの作成
Obxableへの変換をサポートする場合、このメソッドはRxSwiftクラスに実装されます。 例:ControlEvent、ControlProperty、変数、ドライバー
example("as Observable") { let variable = Variable<Int>(0) variable.asObservable().subscribe { e in print(e) } variable.value = 1 }
コンソール:
--- as Observable example --- Next(0) Next(1) Completed
この例では、VariableをObservableに変換し、そのイベントにサブスクライブしました。
このメソッドを使用すると、ゼロからObservableを作成し、どの要素といつ生成するかを完全に制御できます。
example("create") { let firstSequence = Observable<AnyObject>.of(1, 2, 3) let secondSequence = Observable<AnyObject>.of("A", "B", "C") let multipleSequence = Observable<Observable<AnyObject>>.create { observer in observer.on(.Next(firstSequence)) observer.on(.Next(secondSequence)) return NopDisposable.instance } let concatSequence = multipleSequence.concat() concatSequence.subscribe { e in print(e) } }
コンソール:
--- create example --- Next(1) Next(2) Next(3) Next(A) Next(B) Next(C)
この例では、Observableを手動で作成し、2つのObservableを生成します。その後、その要素を
concatコマンドと組み合わせ、結果のObservableに
サブスクライブします
このステートメントを使用すると、subscribeを使用してサブスクライブするまでObservableの作成を遅らせることができます
example("without deferred") { var i = 1 let justObservable = Observable.just(i) i = 2 _ = justObservable.subscribeNext{ print ("i = \($0)") } } example("with deferred") { var i = 1 let deferredJustObservable = Observable.deferred{ Observable.just(i) } i = 2 _ = deferredJustObservable.subscribeNext{ print ("i = \($0)") } }
コンソール:
--- without deferred example --- i = 1 --- with deferred example --- i = 2
最初のケースでは、Observable.just(i)を使用してObservableがすぐに作成され、iの値を変更しても、このシーケンスによって生成される要素には影響しません。 2番目のケースでは、deferredを使用して作成し、サブスクライブする前にiの値を変更できます
Completedで終わる空のシーケンス

example("empty") { let sequence = Observable<Int>.empty() sequence.subscribe { e in print(e) } }
コンソール:
1つのイベント-エラーで構成されるシーケンスを作成します

example("error") { let sequence = Observable<Int> .error(RxError.Unknown) sequence.subscribe { e in print(e) } }
コンソール:
--- error example --- Error(Unknown error occured.)
指定した頻度で、0から1ずつ増加する無限シーケンスを作成します

example("interval") { let sequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance) sequence.subscribe { e in print(e) } }
コンソール:
--- interval example --- Next(0) Next(1) Next(2) Next(3) Next(4) ....
完了する任意の値のシーケンスを作成します。

example("just") { let sequence = Observable.just(100) sequence.subscribe { e in print(e) } }
コンソール:
オブザーバーが呼び出されることのない空のシーケンス、つまり イベントは生成されません

example("never") { let sequence = Observable<Int>.never() sequence.subscribe { e in print(e) } }
コンソール:
すべての要素が完了した後の可変変数からのシーケンス

example("simple of") { let sequence = Observable.of(1, 2) sequence.subscribe { e in print(e) } } example("of for Observables") { let firstSequence = Observable<AnyObject>.of(1, 2, 3) let secondSequence = Observable<AnyObject>.of("A", "B", "C") let bothSequence = Observable.of(firstSequence, secondSequence) let mergedSequence = bothSequence.merge() mergedSequence.subscribe { e in print(e) } }
コンソール:
--- simple of example --- Next(1) Next(2) Completed --- of for Observables example --- Next(1) Next(2) Next(3) Next(A) Next(B) Next(C) Completed
最初のケースでは、2つの数字のシーケンスを作成しました。 2つのObservableの2番目で、それらはマージ演算子を使用して結合されました
指定された値から指定された回数だけ1ずつ増加する有限数の要素を持つシーケンスを作成します。すべての要素の後にCompletedが生成されます

example("range") { let sequence = Observable.range(start: 5, count: 3) sequence.subscribe { e in print(e) } }
コンソール:
--- range example --- Next(5) Next(6) Next(7) Completed
エレメントは、5から始まり、1の増分で3回生成されました
指定された要素を遅滞なく作成します。 完了イベントまたはエラーイベントは生成されません。

example("repeatElement") { let sequence = Observable.repeatElement(1, scheduler: MainScheduler.instance) sequence.subscribe { e in print(e) } }
コンソール:
--- repeatElement example --- Next(1) Next(2) Next(3) .....
示された頻度と開始時の遅延の可能性を伴う、1の増分で0から増加する無限シーケンス。 完了イベントまたはエラーイベントは生成されません。

example("timer") { let sequence = Observable<Int64>.timer(2, period: 3, scheduler: MainScheduler.instance) sequence.subscribe { e in print(e) } }
コンソール:
--- timer example --- Next(0) Next(1) Next(2)
この例では、シーケンスは3秒ごとに2秒の遅延で要素の生成を開始します
組み合わせ観察可能
SO = [Observable<T>] SO1, SO2 = Observable<T> RO = Observable<T>
すべての観測可能なSOから、最初に要素の生成を開始するSOが選択され、その要素はROで複製され、残りのSOは無視されます

example("amb") { let subjectA = PublishSubject<Int>() let subjectB = PublishSubject<Int>() let subjectC = PublishSubject<Int>() let subjectD = PublishSubject<Int>() let ambSequence = [subjectA, subjectB, subjectC, subjectD].amb() ambSequence.subscribe { e in print(e) } subjectC.onNext(0) subjectA.onNext(3) subjectB.onNext(102) subjectC.onNext(1) subjectD.onNext(45) }
コンソール:
--- amb example --- Next(0) Next(1)
なぜなら 最初にsubjectC要素を生成しました-その要素のみがROで複製され、残りは無視されます
SO = SO1, SO2,... SON = Observable<T> RO = Observable<f(T,T)>
すべてのObservablesが少なくとも1つの要素を生成するとすぐに、これらの要素は渡された関数のパラメーターとして使用され、この関数の結果はROによって要素として生成されます。 さらに、Observable要素が生成されると、結合されたすべてのObservableの最後の要素を含む新しい関数結果が生成されます

example("combineLatest") { let firstSequence = createSequenceWithWait([1,2,3], waitTime: 2) { element in "\(element)" }.debug("firstSequence") let secondSequence = createSequenceWithWait(["A", "B", "C"], waitTime: 1) { element in "\(element)" } .delaySubscription(3, scheduler: MainScheduler.instance) .debug("secondSequence") let concatSequence = Observable.combineLatest(firstSequence, secondSequence) { first, second -> String in "\(first) - \(second)" } concatSequence.subscribe { e in print(e) } }
コンソール:
--- combineLatest example --- 2016-04-12 16:59:35.421: firstSequence -> subscribed 2016-04-12 16:59:35.422: secondSequence -> subscribed 2016-04-12 16:59:35.434: firstSequence -> Event Next(1) 2016-04-12 16:59:37.423: firstSequence -> Event Next(2) 2016-04-12 16:59:38.423: secondSequence -> Event Next(A) Next(2 - A) 2016-04-12 16:59:39.423: firstSequence -> Event Next(3) Next(3 - A) 2016-04-12 16:59:39.522: secondSequence -> Event Next(B) Next(3 - B) 2016-04-12 16:59:40.622: secondSequence -> Event Next(C) Next(3 - C) 2016-04-12 16:59:41.722: firstSequence -> Event Completed 2016-04-12 16:59:41.722: firstSequence -> disposed 2016-04-12 16:59:41.722: secondSequence -> Event Completed 2016-04-12 16:59:41.722: secondSequence -> disposed Completed
この例では、createSequenceWithWaitを使用してObservableを作成し、要素が異なる遅延で生成されるようにして、要素がどのように混合されているかを確認します。
firstSequenceは、secondSequenceがAを生成する前に1と2を生成することができたため、1が破棄され、最初の出力は
2-Aでし
た
SO = Observable<Observable<T>> SO1, SO2 = Observable<T> RO = Observable<T>
ROでは、要素は最初に最初のObservableのすべての要素を含み、次にのみ次のObservableを含みます。 これは、最初のObservableがCompletedを生成しない場合、2番目のObservableがROに到達しないことを意味します。 現在のObservableのエラーはROに転送されます

example("concat object method") { let firstSequence = Observable<AnyObject>.of(1, 2, 3) let secondSequence = Observable<AnyObject>.of("A", "B", "C") let concatSequence = firstSequence.concat(secondSequence) concatSequence.subscribe { e in print(e) } } example("concat from array") { let firstSequence = Observable.of(1,2,3) let secondSequence = Observable.of(4,5,6) let concatSequence = Observable.of(firstSequence, secondSequence) .concat() concatSequence.subscribe { e in print(e) } }
コンソール:
--- concat object method example --- Next(1) Next(2) Next(3) Next(A) Next(B) Next(C) Completed --- concat from array example --- Next(1) Next(2) Next(3) Next(4) Next(5) Next(6) Completed
最初の例では、2番目のObservableを最初のObservableに添付します。
2番目では、配列からシーケンスを生成します。
SO = Observable<Observable<T>> RO = Observable<T>
RO要素には、元のObservableでリリースされた順序で元のObservableの要素が含まれます

example("simple merge") { let firstSequence = Observable<AnyObject>.of(1, 2, 3) let secondSequence = Observable<AnyObject>.of("A", "B", "C") let bothSequence = Observable.of(firstSequence, secondSequence) let mergedSequence = bothSequence.merge() mergedSequence.subscribe { e in print(e) } } example("merge with wait") { let firstSequence = createSequenceWithWait([1,2,3]) { element in "\(element)" } let secondSequence = createSequenceWithWait(["A", "B", "C"], waitTime: 2) { element in "\(element)" } let bothSequence = Observable.of(firstSequence, secondSequence) let mergedSequence = bothSequence.merge() mergedSequence.subscribe { e in print(e) } }
コンソール:
--- simple merge example --- Next(1) Next(2) Next(3) Next(A) Next(B) Next(C) Completed --- merge with wait example --- Next(1) Next(A) Next(2) Next(3) Next(B) Next(C) Completed
最初の例では、遅延なしに作成された2つのシーケンスをマージします。その結果、最初のObservableは、2番目のObservableがすべての要素を生成してから、2番目のシーケンスがこれを開始します。結果はconcatと同じです。
2番目のケースでは、シーケンスの生成が遅れており、RO内の要素が元のObservableで生成された順序で混同されていることは明らかです。
SO = Observable<T> RO = Observable<T>
引数として渡された要素は、SOの先頭に追加されます

example("startWith") { let sequence = Observable.of(1, 2, 3).startWith(0) sequence.subscribe { e in print(e) } }
コンソール:
--- startWith example --- Next(0) Next(1) Next(2) Next(3) Completed
SO = Observable<Observable<T>> RO = Observable<T>
最初に、SOによって生成されたO1をサブスクライブします。その要素はROでミラー生成されます。 次のObservableがSOから生成されるとすぐに、前のObservableの要素は破棄されます。 O1からのサブスクリプション解除があり、O2にサブスクライブします。 したがって、ROでは、要素は最後に生成されたObservableからのみです

example("switchLatest") { let varA = Variable<Int>(0) let varB = Variable<Int>(100) let proxyVar = Variable(varA.asObservable()) let concatSequence = proxyVar.asObservable().switchLatest() concatSequence.subscribe { e in print(e) } varA.value = 1 varA.value = 2 varB.value = 3 proxyVar.value = varB.asObservable() varB.value = 4 varA.value = 5 } example("switchLatest") { let observableA = Observable<Int>.create{ observer in delay(0) { observer.on(.Next(10)) } delay(3) { observer.on(.Next(20)) } delay(5) { observer.onCompleted() } return NopDisposable.instance }.debug("oA") let observableB = Observable<Int>.create{ observer in delay(0) { observer.on(.Next(100)) } delay(1) { observer.on(.Next(200)) } delay(2) { observer.onCompleted() } return NopDisposable.instance }.debug("oB") let observableC = Observable<Int>.create{ observer in delay(0) { observer.on(.Next(1000)) } delay(1) { observer.on(.Next(2000)) } delay(2) { observer.onCompleted() } return NopDisposable.instance }.debug("oC") let subjects = [observableA, observableB, observableC] let sequence:Observable<Observable<Int>> = createSequenceWithWait([observableA, observableB, observableC],waitTime:1) {$0} let switchLatestSequence:Observable<Int> = sequence.switchLatest() switchLatestSequence.subscribe { e in print(e) } }
コンソール:
--- switchLatest example --- Next(0) Next(1) Next(2) Next(3) Next(4) Completed --- switchLatest example --- 2016-04-12 17:15:22.710: oA -> subscribed 2016-04-12 17:15:22.711: oA -> Event Next(10) Next(10) 2016-04-12 17:15:23.797: oA -> disposed // oB 2016-04-12 17:15:23.797: oB -> subscribed 2016-04-12 17:15:23.797: oB -> Event Next(100) Next(100) 2016-04-12 17:15:24.703: oB -> disposed // oC 2016-04-12 17:15:24.703: oC -> subscribed 2016-04-12 17:15:24.703: oC -> Event Next(1000) Next(1000) 2016-04-12 17:15:25.800: oC -> Event Next(2000) Next(2000) 2016-04-12 17:15:26.703: oC -> Event Completed 2016-04-12 17:15:26.703: oC -> disposed Completed
最初の例は、Observableを手動で再接続したときの静的モードでのコマンドの動作を示しています。
2番目には、遅延のあるシーケンスがあります。 SOのobservableA、observableB、observableCは1秒ごとに1回生成されます。 それらの要素はさまざまな遅延で生成されます。
SO1, SO2 = Observable<T> RO = Observable<f(T,T)>
O1が要素を生成するとすぐに、O2で少なくとも1つの要素が生成されるかどうかがチェックされます。生成される場合、O1およびO2の最後の要素が取得され、転送された関数の引数として使用されます。その結果は、ROによって要素として生成されます

example("withLatestFrom") { let varA = Variable<Int>(0) let varB = Variable<Int>(10) let withLatestFromSequence = varA.asObservable().withLatestFrom(varB.asObservable()) { "\($0) - \($1)" } withLatestFromSequence.subscribe { e in print(e) } varA.value = 1 varA.value = 2 varB.value = 20 varB.value = 30 varA.value = 5 varA.value = 6 }
コンソール:
--- withLatestFrom example --- Next(0 - 10) Next(1 - 10) Next(2 - 10) Next(5 - 30) Next(6 - 30) Completed
SO = Observable<Observable<T>> RO = Observable<f(T,T)>
RO要素は元のObservableによって生成された要素の組み合わせであり、ユニオンは解放された要素のインデックスに移動します

example("zip with simple Variable") { let varA = Variable<Int>(0) let varB = Variable<Int>(10) let zippedSequence = Observable.zip(varA.asObservable(), varB.asObservable()) { "\($0) - \($1)" } zippedSequence.subscribe { e in print(e) } varA.value = 1 varA.value = 2 varB.value = 20 varB.value = 30 varA.value = 3 varA.value = 4 } example("zip with PublishSubject") { let subjectA = PublishSubject<Int>() let subjectB = PublishSubject<Int>() let zippedSequence = Observable.zip(subjectA, subjectB) { "\($0) - \($1)" } zippedSequence.subscribe { e in print(e) } subjectA.onNext(0) subjectA.onNext(1) subjectA.onNext(2) subjectB.onNext(100) subjectB.onNext(101) subjectA.onNext(3) subjectB.onNext(102) subjectA.onNext(4) }
コンソール:
--- zip with simple Variable example --- Next(0 - 10) Next(1 - 20) Next(2 - 30) Completed --- zip with PublishSubject example --- Next(0 - 100) Next(1 - 101) Next(2 - 102)
例は、元のObservableで生成された順序で要素が結合されることを示しています。
フィルタリング
すべての繰り返し
連続要素をスキップします

example("distinctUntilChanged") { let sequence = Observable.of(1, 2, 2, 3, 4, 4, 4, 1).distinctUntilChanged() sequence.subscribe { e in print(e) } }
コンソール:
--- distinctUntilChanged example --- Next(1) Next(2) Next(3) Next(4) Next(1) Completed
シーケンス全体に固有ではない要素は破棄されますが、連続する要素のみが破棄されるという微妙な点があります。
アカウント内のNによって発行された要素のみがROに分類されます

example("elementAt") { let sequence = Observable.of(0, 10, 20, 30, 40) .elementAt(2) sequence.subscribe { e in print(e) } }
コンソール:
指定された条件を満たさない要素はすべて破棄されます。

example("filter") { let sequence = Observable.of(1, 20, 3, 40) .filter{ $0 > 10} sequence.subscribe { e in print(e) } }
コンソール:
--- filter example --- Next(20) Next(40) Completed
すべての要素を破棄し、完了およびエラーの端末メッセージのみを送信します

example("ignoreElements") { let sequence = Observable.of(1, 2, 3, 4) .ignoreElements() sequence.subscribe { e in print(e) } }
コンソール:
生成されたサンプラーシーケンスの各要素(タイマーとして認識される)-元のシーケンスの
最後に解放された要素を取得し、以前に生成されていない場合はROで複製します。

example("sampler") { let sampler = Observable<Int>.interval(1, scheduler: MainScheduler.instance).debug("sampler") let sequence:Observable<Int> = createSequenceWithWait([1,2,3,4], waitTime: 3).sample(sampler) sequence.subscribe { e in print(e) } }
コンソール:
--- sampler example --- 2016-04-12 18:28:20.322: sampler -> subscribed 2016-04-12 18:28:21.323: sampler -> Event Next(0) Next(1) 2016-04-12 18:28:22.324: sampler -> Event Next(1) // RO , .. 2016-04-12 18:28:23.323: sampler -> Event Next(2) Next(2) 2016-04-12 18:28:24.323: sampler -> Event Next(3) // RO , .. 2016-04-12 18:28:25.323: sampler -> Event Next(4) // RO , .. 2016-04-12 18:28:26.323: sampler -> Event Next(5) Next(3) ...
elements> 1の場合、元のシーケンスから唯一の要素が取得されます-エラーを生成します。 述語オプションがあります

example("single generate error") { let sequence = Observable.of(1, 2, 3, 4).single() sequence.subscribe { e in print(e) } } example("single") { let sequence = Observable.of(1, 2, 3, 5).single { $0 % 2 == 0 } sequence.subscribe { e in print(e) } }
コンソール:
--- single generate error example --- Next(1) Error(Sequence contains more than one element.) --- single example --- Next(2) Completed
最初の例では、元のシーケンスに複数の要素があったため、SOで2番目の要素が生成されたときにエラーが生成されました
2番目の例では、1つの要素のみが述語条件を満たしているため、エラーは生成されませんでした
SOから、最初のN個の要素を破棄します

example("skip") { let sequence = Observable.of(1, 2, 3, 4).skip(2) sequence.subscribe { e in print(e) } }
コンソール:
--- skip example --- Next(3) Next(4) Completed
SOから、最初のNで生成された最初の要素を破棄します

example("skip duration with wait") { let sequence = createSequenceWithWait([1,2,3,4]) { $0 }.skip(2, scheduler: MainScheduler.instance) sequence.subscribe { e in print(e) } }
コンソール:
--- skip duration with wait example --- Next(3) Next(4) Completed
パラメーターとして渡されたシーケンスによる要素の生成の前に生成された要素をSOから破棄します

example("skipUntil") { let firstSequence = createSequenceWithWait([1,2,3,4]) { $0 } let secondSequence = Observable.just(1) .delaySubscription(1, scheduler: MainScheduler.instance) let skippedSequence = firstSequence.skipUntil(secondSequence) skippedSequence.subscribe { e in print(e) } }
コンソール:
--- skipUntil example --- Next(3) Next(4) Completed
delaySubscriptionコマンドを使用して、secondSequenceの要素の生成が1秒間遅延したため、firstSequenceの要素は1秒後にのみROで複製され始めました。
渡された関数によって返される条件が真になるまで、SOから要素を破棄します

example("skipWhile") { let firstSequence = [1,2,3,4,0].toObservable() let skipSequence = firstSequence.skipWhile { $0 < 3 } skipSequence.subscribe { e in print(e) } }
コンソール:
--- skipWhile example --- Next(3) Next(4) Next(0) Completed
渡された関数によって返される条件が真になるまで、SOから要素を破棄します。 skipWhileとの違いは、関数に渡される別のパラメーターが生成された要素のインデックスであることです

example("skipWhileWithIndex") { let firstSequence = [1,2,5,0,7].toObservable() let skipSequence = firstSequence.skipWhileWithIndex{ value, idx in value < 4 || idx < 2 } skipSequence.subscribe { e in print(e) } }
コンソール:
--- skipWhileWithIndex example --- Next(5) Next(0) Next(7) Completed
最初のN個の要素のみがSOから取得されます

example("take") { let sequence = Observable.of(1, 2, 3, 4).take(2) sequence.subscribe { e in print(e) } }
コンソール:
--- take example --- Next(1) Next(2) Completed
最初のN秒間に生成された要素のみがSOから取得されます

example("take duration with wait") { let sequence = createSequenceWithWait([1,2,3,4]) { $0 } let takeSequence = sequence.take(2, scheduler: MainScheduler.instance) takeSequence.subscribe { e in print(e) } }
コンソール:
--- take duration with wait example --- Next(1) Next(2) Completed
最後のN個の要素のみがSOから取得されます。 つまり、SOが要素の生成を終了しなかった場合、ROに入る要素はありません。

example("takeLast") { let sequence = Observable.of(1, 2, 3, 4).takeLast(2) sequence.subscribe { e in print(e) } } example("takeLast with wait") { let sequence = createSequenceWithWait([1,2,3,4]) { $0 } let takeSequence = sequence.takeLast(2) takeSequence.subscribe { e in print(e) } }
コンソール:
--- takeLast example --- Next(3) Next(4) Completed --- takeLast with wait example --- Next(3) Next(4) Completed
2番目の例は、SO内の要素の生成の完了を待機することによるRO内の要素の生成の遅延を示すために与えられています。
パラメーターとして渡されたシーケンスによる要素の生成の開始前にリリースされた要素は、SOから取得されます

example("takeUntil") { let stopSequence = Observable.just(1) .delaySubscription(2, scheduler: MainScheduler.instance) let sequence = createSequenceWithWait([1,2,3,4]) { $0 } .takeUntil(stopSequence) sequence.subscribe { e in print(e) } }
コンソール:
--- takeUntil example --- Next(1) Next(2) Completed
返された関数の条件が返されるまで、要素はSOから取得されます

example("takeWhile") { let sequence = [1,2,3,4].toObservable().takeWhile{ $0 < 3 } sequence.subscribe { e in print(e) } }
コンソール:
--- takeWhile example --- Next(1) Next(2) Completed
返された関数の条件が真になるまで、要素はSOから取得されます。 takeWhileとの違いは、関数に渡される別のパラメーターが生成された要素のインデックスであることです

example("takeWhileWithIndex") { let sequence = [1,2,3,4,5,6].toObservable() .takeWhileWithIndex{ (val, idx) in val % 2 == 0 || idx < 3 } sequence.subscribe { e in print(e) } }
コンソール:
--- takeWhileWithIndex example --- Next(1) Next(2) Next(3) Next(4) Completed
SOから要素のみが取得され、その後N秒間新しい要素はありません。

example("throttle") { let sequence = Observable.of(1, 2, 3, 4) .throttle(1, scheduler: MainScheduler.instance) sequence.subscribe { e in print(e) } } example("throttle with wait") { let sequence = createSequenceWithWait([1,2,3,4]) { $0 } .throttle(0.5, scheduler: MainScheduler.instance) sequence.subscribe { e in print(e) } }
コンソール:
--- throttle example --- Next(4) Completed --- throttle with wait example --- Next(1) Next(2) Next(3) Next(4) Completed
最初のケースでは、SOは遅延なく要素を生成するため、最後の要素のみに新しい要素はありません。
2番目の例では、要素はスロットルに送信されるN秒よりも遅く生成されるため、生成される各要素には十分な時間間隔があります。
変換
SO = Observable<>> RO = Observable<[T]>
特定のルールに従ってSOの要素が配列に結合され、ROで生成されます。 パラメータとして、count、配列内の要素の最大数、およびtimeSpanが渡され、現在の配列がSO要素から満たされるまで待機する最大時間を渡します。 したがって、RO要素は配列[T]であり、長さは0からcountです。

example("buffer") { let varA = Variable<Int>(0) let bufferSequence = varA.asObservable() .buffer(timeSpan: 3, count: 3, scheduler: MainScheduler.instance) bufferSequence.subscribe { e in print("\(NSDate()) - \(e)") } varA.value = 1 varA.value = 2 varA.value = 3 delay(3) { varA.value = 4 varA.value = 5 delay(5) { varA.value = 6 } } }
コンソール: --- buffer example --- 2016-04-12 16:10:58 +0000 - Next([0, 1, 2]) 2016-04-12 16:11:01 +0000 - Next([3]) 2016-04-12 16:11:04 +0000 - Next([4, 5]) 2016-04-12 16:11:07 +0000 - Next([6]) 2016-04-12 16:11:07 +0000 - Completed
配列の長さは3として指定されました-3つの要素が生成されるとすぐに-要素[0、1、2]がROで生成されました-要素3の生成後、3秒の遅延があり、タイムアウトが機能し、配列は完全に埋められませんでした。要素5を生成した後の遅延についても同じことが言えます
各SO要素は個別のObservableに変わり、[O1、O2、O3 ...]のすべての要素がROに結合されます。RO内の要素の生成順序は、ソース[O1、O2、O3 ...]での生成時間に依存します(マージコマンドの場合と同様)
example("flatMap with wait") { let sequence:Observable<Int> = createSequenceWithWait([0,1,2], waitTime: 1) { $0 } let flatMapSequence:Observable<String> = sequence.flatMap{val in createSequenceWithWait([10,11,12], waitTime: 2) { element in "\(element) - \(val)" } } flatMapSequence.subscribe { e in print(e) } }
コンソール: --- flatMap with wait example --- Next(10 - 0) Next(10 - 1) Next(11 - 0) Next(10 - 2) Next(11 - 1) Next(12 - 0) Next(11 - 2) Next(12 - 1) Next(12 - 2) Completed
各SO要素は個別のObservableに変わります。1)最初に、O1にサブスクライブし、その要素はROでミラー生成されます。O1は要素を生成しますが、SOから生成される後続のObservableはすべて破棄されますが、サブスクライブしません。2)O1が終了するとすぐに-新しいObservableが生成されると、サブスクライブされ、その要素がROで複製されます。ポイント1を繰り返しますが、O1の代わりに、最後に生成されたObservableを取得します
example("flatMapFirst") { let sequence:Observable<Int> = Observable.of(10, 20, 30) .debug("sequence") let flatMapSequence:Observable<String> = sequence .flatMapFirst{val in Observable.of(0, 1, 2) .map{"\($0) - \(val)" } } flatMapSequence.subscribe { e in print(e) } } example("flatMapFirst with delay") { let subjectA = Observable<Int>.create{ observer in delay(0) { observer.on(.Next(10)) } delay(1) { observer.on(.Next(20)) } delay(7) { observer.onCompleted() } return NopDisposable.instance }.debug("sA") let subjectB = Observable<Int>.create{ observer in delay(0) { observer.on(.Next(100)) } delay(1) { observer.on(.Next(200)) } delay(2) { observer.onCompleted() } return NopDisposable.instance }.debug("sB") let subjectC = Observable<Int>.create{ observer in delay(0) { observer.on(.Next(1000)) } delay(1) { observer.on(.Next(2000)) } delay(2) { observer.onCompleted() } return NopDisposable.instance }.debug("sC") let subjects = [subjectA, subjectB, subjectC] let sequence:Observable<Int> = createSequenceWithWait([0, 1, 2],waitTime:4){$0} .debug("sequence") let flatMapSequence:Observable<Int> = sequence.flatMapFirst{val in return subjects[val].asObservable() }.debug("flatMapSequence") flatMapSequence.subscribe { e in print(e) } }
コンソール: --- flatMapFirst example --- 2016-04-12 19:19:46.915: sequence -> subscribed 2016-04-12 19:19:46.916: sequence -> Event Next(10) Next(0 - 10) Next(1 - 10) Next(2 - 10) 2016-04-12 19:19:46.918: sequence -> Event Next(20) Next(0 - 20) Next(1 - 20) Next(2 - 20) 2016-04-12 19:19:46.919: sequence -> Event Next(30) Next(0 - 30) Next(1 - 30) Next(2 - 30) 2016-04-12 19:19:46.921: sequence -> Event Completed Completed 2016-04-12 19:19:46.921: sequence -> disposed --- flatMapFirst with delay example --- 2016-04-12 19:19:46.925: flatMapSequence -> subscribed 2016-04-12 19:19:46.926: sequence -> subscribed 2016-04-12 19:19:46.935: sequence -> Event Next(0) // SO 1 2016-04-12 19:19:46.935: sA -> subscribed // Observable sA, 2016-04-12 19:19:46.936: sA -> Event Next(10) 2016-04-12 19:19:46.936: flatMapSequence -> Event Next(10) Next(10) 2016-04-12 19:19:47.936: sA -> Event Next(20) 2016-04-12 19:19:47.936: flatMapSequence -> Event Next(20) Next(20) 2016-04-12 19:19:50.926: sequence -> Event Next(1) // SO 2 , sA , sB , 2016-04-12 19:19:53.935: sA -> Event Completed 2016-04-12 19:19:53.936: sA -> disposed // sA , 2016-04-12 19:19:55.137: sequence -> Event Next(2) // SO 3 2016-04-12 19:19:55.137: sC -> subscribed // .. Observable ( sA , sB - ) - 2016-04-12 19:19:55.137: sC -> Event Next(1000) 2016-04-12 19:19:55.137: flatMapSequence -> Event Next(1000) Next(1000) 2016-04-12 19:19:56.236: sC -> Event Next(2000) 2016-04-12 19:19:56.236: flatMapSequence -> Event Next(2000) Next(2000) 2016-04-12 19:19:57.335: sC -> Event Completed 2016-04-12 19:19:57.336: sC -> disposed 2016-04-12 19:19:58.926: sequence -> Event Completed 2016-04-12 19:19:58.926: flatMapSequence -> Event Completed Completed 2016-04-12 19:19:58.926: sequence -> disposed
最初の例は、Observableは、新しいObservableにサブスクライブする順番になるまでに要素を生成します-これはすでに許可されているため、すべてのObservableの要素がROに入ります要素生成
各SO要素は個別のObservableに変わります。最初にO1にサブスクライブすると、その要素はROでミラー生成されます。次の要素がSOから解放され、次のObservableがそれに基づいて生成されるとすぐに、前のObservableの要素は破棄されます。登録解除が発生します。したがって、ROでは-最後に生成されたObservableの要素
example("flatMapLatest") { let sequence:Observable<Int> = Observable.of(10, 20, 30) let flatMapSequence = sequence.flatMapLatest{val in Observable.of(0, 1, 2) .map{"\($0) - \(val)" } } flatMapSequence.subscribe { e in print(e) } } example("flatMapLatest with delay") { let subjectA = Observable<Int>.create{ observer in delay(0) { observer.on(.Next(10)) } delay(3) { observer.on(.Next(20)) } delay(5) { observer.onCompleted() } return NopDisposable.instance }.debug("sA") let subjectB = Observable<Int>.create{ observer in delay(0) { observer.on(.Next(100)) } delay(1) { observer.on(.Next(200)) } delay(2) { observer.onCompleted() } return NopDisposable.instance }.debug("sB") let subjectC = Observable<Int>.create{ observer in delay(0) { observer.on(.Next(1000)) } delay(1) { observer.on(.Next(2000)) } delay(2) { observer.onCompleted() } return NopDisposable.instance }.debug("sC") let subjects = [subjectA, subjectB, subjectC] let sequence:Observable<Int> = createSequenceWithWait([0, 1, 2],waitTime:1) {$0} .debug("sequence") let flatMapSequence:Observable<Int> = sequence.flatMapLatest{val in return subjects[val].asObservable() }.debug("flatMapSequence") flatMapSequence.subscribe { e in print(e) } }
コンソール: --- flatMapLatest example --- Next(0 - 10) Next(1 - 10) Next(2 - 10) Next(0 - 20) Next(1 - 20) Next(2 - 20) Next(0 - 30) Next(1 - 30) Next(2 - 30) Completed --- flatMapLatest with delay example --- 2016-04-12 19:30:50.309: flatMapSequence -> subscribed 2016-04-12 19:30:50.310: sequence -> subscribed 2016-04-12 19:30:50.318: sequence -> Event Next(0) // SO 1 , sA 2016-04-12 19:30:50.319: sA -> subscribed // sA 2016-04-12 19:30:50.319: sA -> Event Next(10) // 2016-04-12 19:30:50.319: flatMapSequence -> Event Next(10) // flatMap Next(10) // RO 2016-04-12 19:30:51.310: sequence -> Event Next(1) // SO 2 , sA 2016-04-12 19:30:51.311: sA -> disposed // sA , 2016-04-12 19:30:51.311: sB -> subscribed // Observable sB 2016-04-12 19:30:51.311: sB -> Event Next(100) 2016-04-12 19:30:51.311: flatMapSequence -> Event Next(100) Next(100) 2016-04-12 19:30:52.310: sequence -> Event Next(2) 2016-04-12 19:30:52.311: sB -> disposed 2016-04-12 19:30:52.311: sC -> subscribed 2016-04-12 19:30:52.311: sC -> Event Next(1000) 2016-04-12 19:30:52.311: flatMapSequence -> Event Next(1000) Next(1000) 2016-04-12 19:30:53.372: sequence -> Event Completed 2016-04-12 19:30:53.372: sequence -> disposed 2016-04-12 19:30:53.372: sC -> Event Next(2000) 2016-04-12 19:30:53.372: flatMapSequence -> Event Next(2000) Next(2000) 2016-04-12 19:30:54.501: sC -> Event Completed 2016-04-12 19:30:54.501: sC -> disposed 2016-04-12 19:30:54.501: flatMapSequence -> Event Completed Completed
最初の例は、Observableは、新しいObservableにサブスクライブするまでに、その要素を生成する時間があります-前の要素はすでにすべての要素を生成しているため、すべてのObservableの要素はROに入ります。2番目の例では、生成の遅延により、以前のObservableからサブスクライブ解除
各SO要素は個別のObservableに変わり、[O1、O2、O3 ...]のすべての要素がROに結合されます。ROでの要素の生成順序は、ソース[O1、O2、O3 ...]での生成時間に依存します(マージコマンドの場合と同様)。flatMapとの違いは、関数に渡される別のパラメーターが生成された要素のインデックスであることです
example("flatMapWithIndex") { let sequence:Observable<Int> = Observable.of(10, 20, 30) let flatMapSequence:Observable<String> = sequence.flatMapWithIndex{val, idx in Observable.of("A", "B", "C").map{"index: (\(idx)) - \($0) - \(val)"} } print(flatMapSequence.dynamicType) flatMapSequence.subscribe { e in print(e) } } example("flatMapWithIndex with wait") { let sequence:Observable<Int> = createSequenceWithWait([0,1,2], waitTime: 1) { $0 } let flatMapSequence:Observable<String> = sequence.flatMapWithIndex{val, idx in createSequenceWithWait(["A","B","C"], waitTime: 2) { element in "index: (\(idx)) - \(element) - \(val)" } } print(flatMapSequence.dynamicType) flatMapSequence.subscribe { e in print(e) } }
コンソール: FlatMapWithIndex<Int, Observable<String>> Next(index: (0) - A - 10) Next(index: (0) - B - 10) Next(index: (0) - C - 10) Next(index: (1) - A - 20) Next(index: (1) - B - 20) Next(index: (1) - C - 20) Next(index: (2) - A - 30) Next(index: (2) - B - 30) Next(index: (2) - C - 30) Completed --- flatMapWithIndex with wait example --- FlatMapWithIndex<Int, Observable<String>> Next(index: (0) - A - 0) Next(index: (1) - A - 1) Next(index: (0) - B - 0) Next(index: (2) - A - 2) Next(index: (1) - B - 1) Next(index: (0) - C - 0) Next(index: (2) - B - 2) Next(index: (1) - C - 1) Next(index: (2) - C - 2) Completed
Observable<T> -> Observable<U>
SO要素は、生成される順序を変更せずに変換されます。値だけでなく、要素のタイプも変更できます。
example("map") { let sequence = Observable.of(1, 2, 3) .map{ "\($0 * 5)" } sequence.subscribe { e in print(e) } }
コンソール: --- map example --- Next(5) Next(10) Next(15) Completed
Observable<T> -> Observable<U>
SO要素は、生成される順序を変更せずに変換されます。値だけでなく、要素のタイプも変更できます。mapとの違いは、関数に渡される別のパラメーターが生成された要素のインデックスであることです
example("mapWithIndex") { let sequence = Observable.of("A", "B", "C") .mapWithIndex({ "\($0) / \($1)" }) sequence.subscribe { e in print(e) } }
コンソール: --- mapWithIndex example --- Next(A / 0) Next(B / 1) Next(C / 2) Completed
SO = Observable<T> RO = Observable<Observable<T>>
SOの要素は、特定の規則に従って、生成された新しいObservableに転送されます。パラメーターとして、countが渡されます。これは、各Observableによって生成される要素の最大数であり、timeSpanは、現在のObservableがSO要素から満たされるまで待機する最大時間です。したがって、ObservableであるRO要素は、生成される要素の数は0からNです。バッファーとの主な違いは、SO要素は生成されるObservableによって即座にミラーリングされることです。バッファの場合は、パラメータとして指定された最大時間( )
example("window") { let varA = Variable<Int>(0) let bufferSequence:Observable<Observable<Int>> = varA.asObservable() .window(timeSpan: 3, count: 3, scheduler: MainScheduler.instance) .debug("bufferSequence") bufferSequence.subscribe { e in if case .Next(let observable) = e { print("\(NSDate()) - Observable") observable.subscribe { val in print(val) } } } varA.value = 1 varA.value = 2 varA.value = 3 delay(4) { varA.value = 4 varA.value = 5 delay(4) { varA.value = 6 } } }
コンソール: --- window example --- 2016-04-12 19:51:54.372: bufferSequence -> subscribed 2016-04-12 19:51:54.373: bufferSequence -> Event Next(RxSwift.AddRef<Swift.Int>) 2016-04-12 16:51:54 +0000 - Observable Next(0) Next(1) Next(2) Completed 2016-04-12 19:51:54.377: bufferSequence -> Event Next(RxSwift.AddRef<Swift.Int>) 2016-04-12 16:51:54 +0000 - Observable Next(3) Completed 2016-04-12 19:51:57.378: bufferSequence -> Event Next(RxSwift.AddRef<Swift.Int>) 2016-04-12 16:51:57 +0000 - Observable Next(4) Next(5) Completed 2016-04-12 19:52:00.380: bufferSequence -> Event Next(RxSwift.AddRef<Swift.Int>) 2016-04-12 16:52:00 +0000 - Observable Next(6) Completed 2016-04-12 19:52:02.895: bufferSequence -> Event Completed
この例では、遅延を使用します。これは、生成されたObservableの部分的な充足を達成するのに役立ちます数学および集計演算子
各SO要素は、転送された関数を使用して変換され、操作の結果はパラメーターとして次のステップで関数に渡されます。SOが最終状態を生成すると、ROは結果を生成します。ROは1つの要素のみを生成します。
example("reduce") { let sequence = Observable.of(1, 2, 3, 4) .reduce(1) { $0 * $1 } sequence.subscribe { e in print(e) } }
コンソール: --- reduce example --- Next(24) Completed
各SO要素は、転送された関数を使用して変換され、操作の結果はROで生成されますが、さらに、次のステップでパラメーターとして関数に渡されます。reduceとは異なり、ROの要素数はSOの要素数に等しくなります。
example("scan") { let sequence = Observable.of(1, 2, 3).scan(10) { result, element in return result + element } sequence.subscribe { e in print(e) } } example("scan multiply") { let sequence = Observable.of(2, 3, 5).scan(10) { result, element in return result * element } sequence.subscribe { e in print(e) } }
コンソール: --- scan example --- Next(11) Next(13) Next(16) Completed --- scan multiply example --- Next(20) Next(60) Next(300) Completed
SO = Observable<T> RO = Observable<[T]>
最終状態の生成後のSOのすべての要素が配列に結合され、ROが生成されます
example("toArray") { let sequence = Observable.of(1, 2, 3) let arraySequence = sequence.toArray() arraySequence.subscribe { e in print(e) } }
コンソール:
エラー処理
SOから生成されたエラーをインターセプトし、新しいObservableに置き換えることができます。これにより、要素が生成されます
example("with catchError") { let sequenceWithError = Observable<Int>.create { observer in observer.on(.Next(1)) observer.on(.Next(2)) observer.on(.Next(3)) observer.on(.Next(4)) observer.onError(RxError.Unknown) observer.on(.Next(5)) return NopDisposable.instance } let sequenceIgnoreError = sequenceWithError.catchError{ error in return Observable.of(10, 11, 12) } sequenceIgnoreError.subscribe { e in print(e) } }
コンソール: --- with catchError example --- Next(1) Next(2) Next(3) Next(4) Next(10) Next(11) Next(12) Completed
要素4が生成された後、RxError.Unknownエラーが生成されましたが、それをインターセプトし、代わりに新しいObservableを返しました
SOから生成されたエラーをインターセプトし、指定された要素に置き換えることができます。その後、SOはCompletedを生成します
example("with catchErrorJustReturn") { let sequenceWithError = Observable.of(1, 2, 3, 4) .concat(Observable.error(RxError.Unknown)) .concat(Observable.just(5)) let sequenceIgnoreError = sequenceWithError.catchErrorJustReturn(-1) sequenceIgnoreError.subscribe { e in print(e) } }
コンソール: --- with catchErrorJustReturn example --- Next(1) Next(2) Next(3) Next(4) Next(-1) Completed
要素4が生成された後、RxError.Unknownエラーが生成されましたが、それをインターセプトし、代わりに-1要素を返しました
SOから生成されたエラーをインターセプトし、渡されたパラメーターに応じて、エラーが再発しないことを期待して、必要な回数だけSOを最初から開始しようとします。
example("retry full sequence") { let sequenceWithError = Observable.of(1, 2, 3, 4).concat(Observable.error(RxError.Unknown)) let wholeSequenceWithErrorRetry = sequenceWithError.retry(2) wholeSequenceWithErrorRetry.subscribe { e in print(e) } }
コンソール: --- retry full sequence example --- Next(1) Next(2) Next(3) Next(4) Next(1) Next(2) Next(3) Next(4) Error(Unknown error occured.)
なぜなら
再試行(2)演算子が適用されました-SOでオーバーサブスクリプションを1回繰り返しましたが、エラーが繰り返され、ROで生成されました。したがって、再試行(1)-1回の繰り返しは行いません。
SOから生成されたエラーをインターセプトし、エラーのタイプに応じて、ROにスローされたエラーを再生成して実行を完了するか、Observable(tryObservable)を生成します。エラーが発生することを期待して、各正しい要素の生成はSO消えます。tryObservableがエラーで終了した場合-ROにスローされ、そこで実行が終了します
example("retryWhen") { var counter = 0 let sequenceWithError = Observable<Int>.create { observer in observer.on(.Next(1)) observer.on(.Next(2)) observer.on(.Next(3)) observer.on(.Next(4)) counter += 1 if counter < 3 { observer.onError(RxError.Unknown) } observer.on(.Next(5)) return NopDisposable.instance }.debug("with error") let sequenceWithoutError = Observable<Int>.create { observer in observer.on(.Next(10)) //observer.onError(RxError.NoElements) return NopDisposable.instance }.debug("without error") let retrySequence = sequenceWithError.retryWhen{ (error: Observable<RxError>) -> Observable<Int> in let seq:Observable<Int> = error.flatMap { (generatedError: RxError) -> Observable<Int> in if case .Unknown = generatedError { return sequenceWithoutError } return Observable<Int>.error(generatedError) } return seq }//.debug() retrySequence.subscribe { e in print(e) } }
コンソール: --- retryWhen example --- 2016-04-12 20:18:04.484: with error -> subscribed 2016-04-12 20:18:04.485: with error -> Event Next(1) Next(1) 2016-04-12 20:18:04.486: with error -> Event Next(2) Next(2) 2016-04-12 20:18:04.486: with error -> Event Next(3) Next(3) 2016-04-12 20:18:04.487: with error -> Event Next(4) Next(4) 2016-04-12 20:18:04.487: with error -> Event Error(Unknown error occured.) 2016-04-12 20:18:04.488: without error -> subscribed 2016-04-12 20:18:04.488: without error -> Event Next(10) 2016-04-12 20:18:04.489: with error -> disposed 2016-04-12 20:18:04.489: with error -> subscribed 2016-04-12 20:18:04.489: with error -> Event Next(1) Next(1) 2016-04-12 20:18:04.490: with error -> Event Next(2) Next(2) 2016-04-12 20:18:04.490: with error -> Event Next(3) Next(3) 2016-04-12 20:18:04.490: with error -> Event Next(4) Next(4) 2016-04-12 20:18:04.491: with error -> Event Error(Unknown error occured.) 2016-04-12 20:18:04.491: without error -> subscribed 2016-04-12 20:18:04.492: without error -> Event Next(10) 2016-04-12 20:18:04.492: with error -> disposed 2016-04-12 20:18:04.492: with error -> subscribed 2016-04-12 20:18:04.493: with error -> Event Next(1) Next(1) 2016-04-12 20:18:04.493: with error -> Event Next(2) Next(2) 2016-04-12 20:18:04.493: with error -> Event Next(3) Next(3) 2016-04-12 20:18:04.494: with error -> Event Next(4) Next(4) 2016-04-12 20:18:04.494: with error -> Event Next(5) Next(5)
sequenceWithErrorの生成に変数iの増分を埋め込んだので、3回目の試行でエラーが消えます。RxError.Overflowエラーの生成のコメントを外す場合、retryWhenステートメントでエラーをインターセプトせず、ROに転送しますConnectable Observableを操作するための演算子
ソースSOから要素をパラメーターとして渡されたサブジェクトにプロキシすることができます。このサブジェクトをサブスクライブする必要があります。サブジェクト要素の生成は、接続演算子を呼び出した後に開始されます。 example("multicast") { let subject = PublishSubject<Int>() let firstSequence = createSequenceWithWait([0,1,2,3,4,5]) { $0 } .multicast(subject) delay(2) { _ = subject.subscribe { e in print("first: \(e)") } } delay(3) { _ = subject.subscribe { e in print("second: \(e)") } } firstSequence.connect() }
コンソール: --- multicast example --- first: Next(2) first: Next(3) second: Next(3) first: Next(4) second: Next(4) first: Next(5) second: Next(5) first: Completed second: Completed
publish = multicast + replay subjectサブスクライブ後もイベントを生成しないConnectable Observableを作成できます。このようなObservableの生成を開始するには、connectコマンドを指定する必要があります。これにより、サブスクライブがいつ実行されたかに関係なく、複数のObserverを1つのObservableに署名し、要素の生成を同時に開始できます。
example("subscribe connectable sequnce with connect") { let sequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance).debug("sequence").publish() var disposable1: Disposable! var disposable2: Disposable! disposable1 = sequence.subscribe { e in print("first: \(e)") } delay(2) { disposable2 = sequence.subscribe { e in print("second: \(e)") } } delay(4) { sequence.connect() } delay(8) { disposable1.dispose() disposable2.dispose() } }
コンソール: --- subscribe connectable sequnce with connect example --- 2016-04-12 21:35:32.130: sequence -> subscribed 2016-04-12 21:35:33.131: sequence -> Event Next(0) first: Next(0) second: Next(0) 2016-04-12 21:35:34.131: sequence -> Event Next(1) first: Next(1) second: Next(1) 2016-04-12 21:35:35.132: sequence -> Event Next(2) first: Next(2) second: Next(2) 2016-04-12 21:35:36.132: sequence -> Event Next(3) 2016-04-12 21:35:37.132: sequence -> Event Next(4)
ご覧のとおり、サブスクリプションが異なる時間に行われた場合でも、connectコマンドが呼び出されるまで、要素の生成は開始されませんでした。しかし、debugコマンドのおかげで、誰もがサブスクライブを解除した後でも、シーケンスが要素を生成し続けたことは明らかです。
Connectableから通常のObservableを作成できます。この通常のObservableへのサブスクライブの最初の呼び出しの後、ConnectableはSOにサブスクライブします。それは次のようことが判明publishSequence SO.publish =()refCountSequence publishSequence.refCount =()SOの長いrefCountSequenceに署名された少なくとも一つである限り要素を生成し続けます。refCountSequenceへのすべてのサブスクリプションがキャンセルされるとすぐに-SOからのサブスクリプション解除およびpublishSequenceがあります
example("with refCount") { let sequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance).debug("sequence") let publishSequence = sequence.publish() // Connectable Observable let refCountSequence = publishSequence.refCount().debug("refCountSequence") let subscription1 = refCountSequence.subscribe{ e in print("first: \(e)") } let subscription2 = refCountSequence.subscribe{ e in print("second: \(e)") } delay(2) { subscription1.dispose() // } delay(3) { subscription2.dispose() // , Observable refCount . Observable SO } delay(5) { _ = refCountSequence.subscribe { e in print("after: \(e)") } } }
コンソール: --- with refCount example --- 2016-04-12 20:25:24.154: refCountSequence -> subscribed // refCountSequence 1 2016-04-12 20:25:24.155: sequence -> subscribed // publishSequence SO 2016-04-12 20:25:24.156: refCountSequence -> subscribed // // refCountSequence 2 2016-04-12 20:25:25.156: sequence -> Event Next(0) 2016-04-12 20:25:25.156: refCountSequence -> Event Next(0) first: Next(0) 2016-04-12 20:25:25.156: refCountSequence -> Event Next(0) second: Next(0) 2016-04-12 20:25:26.156: sequence -> Event Next(1) 2016-04-12 20:25:26.156: refCountSequence -> Event Next(1) first: Next(1) 2016-04-12 20:25:26.157: refCountSequence -> Event Next(1) second: Next(1) 2016-04-12 20:25:26.353: refCountSequence -> disposed // refCountSequence 1 2016-04-12 20:25:27.156: sequence -> Event Next(2) // SO , .. refCountSequence 2016-04-12 20:25:27.157: refCountSequence -> Event Next(2) second: Next(2) 2016-04-12 20:25:27.390: refCountSequence -> disposed // refCountSequence 2 2016-04-12 20:25:27.390: sequence -> disposed // refCountSequence , publishSequence SO 2016-04-12 20:25:29.157: refCountSequence -> subscribed // refCountSequence 2016-04-12 20:25:29.157: sequence -> subscribed // .. - publishSequence SO 2016-04-12 20:25:30.158: sequence -> Event Next(0) 2016-04-12 20:25:30.159: refCountSequence -> Event Next(0) after: Next(0) 2016-04-12 20:25:31.158: sequence -> Event Next(1) 2016-04-12 20:25:31.159: refCountSequence -> Event Next(1) after: Next(1) 2016-04-12 20:25:32.159: sequence -> Event Next(2) 2016-04-12 20:25:32.159: refCountSequence -> Event Next(2) after: Next(2) ....
SOが正常な場合、Connectableに変換します。その後、connect()を呼び出した後にサブスクライブするすべての人は、最初の要素として生成された最後のN個の要素を即座に受け取ります。すべてがサブスクライブされていない場合でも、Connectableはエレメントを生成し続けます
example("replay") { let firstSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance).replay(2) let firstDisposable = firstSequence.subscribe { e in print("first: \(e)") } firstSequence.connect() var secondDisposable: Disposable! delay(3) { secondDisposable = firstSequence.subscribe { e in print("second: \(e)") } } delay(4) { firstDisposable.dispose() } delay(5) { secondDisposable.dispose() } delay(7) { firstSequence.subscribe { e in print("third: \(e)") } } }
コンソール: --- replay example --- first: Next(0) first: Next(1) first: Next(2) second: Next(1) second: Next(2) first: Next(3) // 1 second: Next(3) second: Next(4) // 2 , SO // third: Next(5) third: Next(6)
SOが正常な場合、Connectableに変換します。connect()の呼び出し後にサブスクライブするすべてのユーザーは、以前に生成されたすべての要素を最初に受け取ります。すべてがサブスクライブされていない場合でも、Connectableはエレメントを生成し続けます
example("replayAll") { let firstSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance).replayAll() let firstDisposable = firstSequence.subscribe { e in print("first: \(e)") } firstSequence.connect() var secondDisposable: Disposable! delay(3) { secondDisposable = firstSequence.subscribe { e in print("second: \(e)") } } delay(4) { firstDisposable.dispose() } delay(5) { secondDisposable.dispose() } delay(7) { firstSequence.subscribe { e in print("third: \(e)") } } }
コンソール: --- replayAll example --- first: Next(0) first: Next(1) first: Next(2) second: Next(0) second: Next(1) second: Next(2) first: Next(3) // 1 second: Next(3) second: Next(4) // 2 , SO // third: Next(0) third: Next(1) third: Next(2) third: Next(3) third: Next(4) third: Next(5) third: Next(6) third: Next(7)
ヘルパーメソッド
デバッグ
ROはSOを完全に複製しますが、タイムスタンプを持つすべてのイベントがログに記録されます example("debug") { let sequence = Observable<AnyObject>.of(1, 2, 3) .debug("sequence") .subscribe{} }
コンソール: --- debug example --- 2016-04-12 21:41:08.467: sequence -> subscribed 2016-04-12 21:41:08.469: sequence -> Event Next(1) 2016-04-12 21:41:08.469: sequence -> Event Next(2) 2016-04-12 21:41:08.469: sequence -> Event Next(3) 2016-04-12 21:41:08.469: sequence -> Event Completed
ROはSOを完全に複製しますが、SOライフサイクルからのすべてのイベントのインターセプターを埋め込みます example("simple doOn") { let firstSequence = Observable.of(1,2).doOn{e in print(e) } firstSequence.subscribeNext{ e in
コンソール: --- simple doOn example --- Next(1) Next(2) Completed
SOからROに要素を複製しますが、パラメーターとして時間遅延を指定します
example("delaySubscription") { let sequence = Observable.of(1, 2, 3).debug("sequence") .delaySubscription(3, scheduler: MainScheduler.instance).debug("delayed sequence") sequence.subscribe { e in print(e) } }
コンソール: --- delaySubscription example --- 2016-04-12 21:44:05.226: delayed sequence -> subscribed // delayed sequence 5 2016-04-12 21:44:08.228: sequence -> subscribed // SO 3 2016-04-12 21:44:08.229: sequence -> Event Next(1) 2016-04-12 21:44:08.229: delayed sequence -> Event Next(1) Next(1) 2016-04-12 21:44:08.229: sequence -> Event Next(2) 2016-04-12 21:44:08.229: delayed sequence -> Event Next(2) Next(2) 2016-04-12 21:44:08.229: sequence -> Event Next(3) 2016-04-12 21:44:08.229: delayed sequence -> Event Next(3) Next(3) 2016-04-12 21:44:08.230: sequence -> Event Completed 2016-04-12 21:44:08.230: delayed sequence -> Event Completed Completed 2016-04-12 21:44:08.230: sequence -> disposed
作業オブザーバーを実行するスケジューラーを示します。GUIで作業する場合は特に重要です example("without observeOn") { let sequence = Observable<AnyObject>.of(1, 2, 3) sequence.subscribe { e in print("\(NSThread.currentThread())\(e)") } } example("with observeOn") { let queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0) let sequence = Observable<AnyObject>.of(1, 2, 3) sequence.observeOn(ConcurrentDispatchQueueScheduler.init(queue: queue)) .subscribe { e in print("\(NSThread.currentThread())\(e)") } }
コンソール: --- without observeOn example --- <NSThread: 0x7fac1ac13240>{number = 1, name = main}Next(1) <NSThread: 0x7fac1ac13240>{number = 1, name = main}Next(2) <NSThread: 0x7fac1ac13240>{number = 1, name = main}Next(3) <NSThread: 0x7fac1ac13240>{number = 1, name = main}Completed --- with observeOn example --- <NSThread: 0x7fac1ae50b50>{number = 3, name = (null)}Next(1) <NSThread: 0x7fac1ae50b50>{number = 3, name = (null)}Next(2) <NSThread: 0x7fac1ae50b50>{number = 3, name = (null)}Next(3) <NSThread: 0x7fac1ae50b50>{number = 3, name = (null)}Completed
ご覧のとおり、observeOnのおかげで、subscribe内のコードを別のスレッドで実行できました。
ObservableをObserverにリンクするオペレーターにより、Observableからのすべてのイベントをサブスクライブできます。 example("subscribe") { let firstSequence = Observable.of(1) firstSequence.subscribe { e in print(e) } firstSequence.subscribeCompleted { print("!completed") } firstSequence.subscribeNext{next in print("next: \(next)") } } example("subscribeNext") { let firstSequence = Observable.of(1) firstSequence.subscribeNext{next in print("next: \(next)") } } example("subscribeCompleted") { let firstSequence = Observable.of(1) firstSequence.subscribeCompleted { print("!completed") } } example("subscribeError") { let firstSequence = Observable<Int>.error(RxError.ArgumentOutOfRange) firstSequence.subscribeError {e in print("!error \(e)") } }
コンソール: --- subscribe example --- Next(1) Completed !completed next: 1 --- subscribeNext example --- next: 1 --- subscribeCompleted example --- !completed --- subscribeError example --- !error Argument out of range.
表示される4つのフォーム:subscribe、subscribeNext、subscribeCompleted、subscribeError
監視可能な作業を行うスケジューラを示します。GUIで作業する場合は特に重要です example("with subscribeOn and observeOn") { let queue1 = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0) let queue2 = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0) print("init thread: \(NSThread.currentThread())") let sequence = Observable<Int>.create { observer in print("observable thread: \(NSThread.currentThread())") observer.on(.Next(1)) observer.on(.Next(2)) observer.on(.Next(3)) observer.on(.Completed) return NopDisposable.instance } .subscribeOn(SerialDispatchQueueScheduler(internalSerialQueueName: "queue1")) .observeOn(SerialDispatchQueueScheduler(internalSerialQueueName: "queue2")) sequence.subscribe { e in print("observer thread: \(NSThread.currentThread()) \(e)") } }
コンソール: --- with subscribeOn and observeOn example --- init thread: <NSThread: 0x7ff6914132b0>{number = 1, name = main} // #1 observable thread: <NSThread: 0x7ff6916a0cb0>{number = 4, name = (null)} // subscribeOn Observable #4 #1 observer thread: <NSThread: 0x7ff6914137d0>{number = 5, name = (null)} Next(1) // observer' 1 4, observeOn. observeOn, subscribeOn #4 observer thread: <NSThread: 0x7ff6914137d0>{number = 5, name = (null)} Next(2) observer thread: <NSThread: 0x7ff6914b1b40>{number = 3, name = (null)} Next(3) observer thread: <NSThread: 0x7ff6914b1b40>{number = 3, name = (null)} Completed
SOからROに要素を複製しますが、指定した時間内にSOが要素を生成しなかった場合-ROはエラーを生成します
example("failed timeout ") { let sequence = createSequenceWithWait([1,2,3,4]) { $0 } let timeoutSequence = sequence.timeout(0.9, scheduler: MainScheduler.instance) timeoutSequence.subscribe { e in print(e) } }
コンソール: --- failed timeout example --- Next(1) Error(Sequence timeout.)
ROが有効な間のみ有効なリソースを作成するようにObservableに指示できます。2つのファクトリーがパラメーターとして渡され、1つはリソースを生成し、2つ目のObservableは単一の存続期間を持ちます。 class FakeDisposable: Disposable { func dispose() { print("disposed") } } example("using") { let sequence = Observable.using({ return FakeDisposable() }, observableFactory: { d in Observable.just(1) }) as Observable<Int> sequence.subscribe { e in print(e) } }
コンソール: --- using example --- Next(1) Completed disposed
ご覧のとおり、Observableが要素の生成を完了した後、FakeDisposableリソースでdisposeメソッドが呼び出されました。