Observable 合并

startWith

需要注意的是 startWith 中的事件值的类型,和它后续的事件值类型,必须是相同的

/// Connectivity states used as sample values.
/// `wifi` and `none` have no explicit raw value, so their raw value is the case name.
enum Condition: String {
    case cellular = "Cellular"
    case wifi
    case none
}

// Keeps every subscription in these snippets alive until the bag is released.
let bag = DisposeBag()

// Simulated request: emits a single response string and then completes.
let request = Observable<String>.create { observer in
    observer.onNext("Reponse from server.")
    observer.onCompleted()
    return Disposables.create()
}

// `startWith` emits the given value before anything produced by `request`;
// the value must have the same element type as the sequence (String here).
request
    .startWith(Condition.wifi.rawValue)
    .subscribe(onNext: { value in
        dump(value)
    })
    .disposed(by: bag)
// Output:
// - "wifi"
// - "Reponse from server."

concat

把多个原本相互独立的 Observable 首尾相接、按顺序串行处理。可以合并多个 Observable,假设合并了 [A、B]:

  • A 执行完后才会对 B 进行监听
  • A 和 B 都执行完才会执行 subscribe 的 Completed、Disposed
  • Error 的情况
    • B 如果在等待状态(还没有监听),不会触发 subscribe 的 onError
    • 在监听状态会触发 onError -> onDisposed
// Demonstrates `concat`: queueB is only listened to after queueA has completed.
self.example("concat 串行") {
    enum E: Error {
        case demo
    }

    // Two independent subjects used as the sources to chain together.
    let queueA = PublishSubject<String>()
    let queueB = PublishSubject<String>()
    // Chain queueA first, then queueB. `asObservable()` exposes the readable side of a subject.
    let sequence = Observable.concat([queueA.asObservable(), queueB.asObservable()])
    // Equivalent instance-method form:
    // let sequence2 = queueA.concat(queueB.asObservable())
    sequence.subscribe(onNext: { msg in
        dump(msg)
    }, onError: {
        print($0)
    }, onCompleted: {
        print("Completed")
    }, onDisposed: {
        print("Disposed")
    }).disposed(by: bag)

    queueA.onNext("A1")
    queueA.onNext("A11")
    // queueA.onError(E.demo) // A is being listened to: the subscriber gets onError -> onDisposed

    // queueB.onError(E.demo) // B is not subscribed yet, so nothing reaches the subscriber at this point.
    // NOTE(review): a terminated PublishSubject replays its terminal event to later subscribers,
    // so this error would presumably surface as soon as A completes - verify before relying on it.
    queueB.onNext("B1") // dropped: B is not subscribed until A completes

    queueA.onNext("A2")
    queueA.onNext("A21")
    queueA.onCompleted() // A is finished; from here on B is listened to

    queueB.onNext("B2")
    // queueB.onError(E.demo) // B is being listened to: the subscriber gets onError -> onDisposed
    queueB.onNext("B3")
    queueB.onCompleted() // B is finished as well, so the subscriber receives Completed

    // Expected output:
    // - "A1"
    // - "A11"
    // - "A2"
    // - "A21"
    // - "B2"
    // - "B3"
    // Completed
    // Disposed
}

## merge

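- 同时监听所有被合并的 Observable,事件按实际发生的先后顺序转发
- 所有 Observable 都完成后,subscribe 才会收到 Completed、Disposed
- 任意一个 Observable 发生 onError,subscribe 会立即触发 onError -> onDisposed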

// Demonstrates `merge`: both subjects are listened to at the same time.
self.example("merge 并行") {
    enum E: Error {
        case demo
    }

    // Two independent subjects used as the sources to merge.
    let queueA = PublishSubject<String>()
    let queueB = PublishSubject<String>()
    // Merge queueA and queueB. `asObservable()` exposes the readable side of a subject.
    // Array form:
    // let sequence = Observable.merge([queueA.asObservable(), queueB.asObservable()])
    let sequence = Observable.of(queueA.asObservable(), queueB.asObservable()).merge()
    // Limiting the number of concurrently subscribed sources; with 1 it behaves like concat:
    // let sequence = Observable.of(queueA.asObservable(), queueB.asObservable()).merge(maxConcurrent: 1)

    sequence.subscribe(onNext: { msg in
        dump(msg)
    }, onError: {
        print($0)
    }, onCompleted: {
        print("Completed")
    }, onDisposed: {
        print("Disposed")
    }).disposed(by: bag)

    queueA.onNext("A1")
    queueA.onNext("A2")
    // queueA.onError(E.demo) // an error from either source ends the merge: onError -> onDisposed
    queueB.onNext("B1")
    queueA.onNext("A3")
    queueA.onCompleted()
    queueB.onNext("B2")
    queueB.onCompleted() // both sources are finished: Completed, then Disposed

    // Expected output:
    // - "A1"
    // - "A2"
    // - "B1"
    // - "A3"
    // - "B2"
    // Completed
    // Disposed

    // For both concat and merge, an error from any source that is currently being
    // listened to terminates the combined Observable.
}

Observables 中的事件合并

combineLatest

对最新的事件进行合并,可以合并不同类型的事件

/// Sample error used to terminate a subject in the error scenario.
enum E: Error {
    case demo
}

let queueA = PublishSubject<String>()
let queueB = PublishSubject<String>()

// Join the most recent value of each subject with a comma.
// Nothing is emitted until both subjects have produced at least one value.
let combined = Observable.combineLatest(queueA, queueB) { (latestA, latestB) -> String in
    "\(latestA),\(latestB)"
}
combined
    .subscribe(onNext: { dump($0) })
    .disposed(by: bag)

// Feed queueA first, then queueB.
for value in ["A1", "A2", "A3"] {
    queueA.onNext(value)
}
for value in ["B1", "B2"] {
    queueB.onNext(value)
}
// Output:
// - "A3,B1"
// - "A3,B2"

如果其中一个 Observable 中途完成,则会保留它的最后一个事件,并继续与另一个 Observable 的后续事件进行合并。

// Alternative event sequence: queueA completes part-way through.
queueA.onNext("A1")
queueA.onNext("A2")
queueA.onNext("A3")
queueB.onNext("B1")
queueA.onCompleted() // queueA is done; its last value "A3" keeps being combined
queueB.onNext("B2")
queueB.onNext("B3")
// Output:
// - "A3,B1"
// - "A3,B2"
// - "A3,B3"

如果发生 error 则立即结束

// Alternative event sequence: queueA fails with an error part-way through.
queueA.onNext("A1")
queueB.onNext("B1")
queueA.onNext("A2")
queueA.onError(E.demo) // terminates the combined sequence; the events below produce no output
queueB.onNext("B2")
queueB.onNext("B3")
// Output:
//    - "A1,B1"
//    - "A2,B1"
// Unhandled error happened: demo
// subscription called from:
// NOTE(review): the last two lines look like RxSwift's log for an error that reaches a
// subscription with no onError handler - confirm against the subscribe call in use.

zip

顾名思义,像拉链一样:按顺序把两个 Observable 的事件一一配对后合并,每个事件只会被配对(消费)一次。

/// Sample error type; it is not triggered in this snippet.
enum E: Error {
    case demo
}

let queueA = PublishSubject<String>()
let queueB = PublishSubject<String>()

// Pair the n-th value of queueA with the n-th value of queueB and join them with a comma.
let zipped = Observable.zip(queueA, queueB) { (fromA, fromB) -> String in
    "\(fromA),\(fromB)"
}
zipped
    .subscribe(onNext: { dump($0) })
    .disposed(by: bag)

// Feed queueA first, then queueB, then complete both.
for value in ["A1", "A2", "A3"] {
    queueA.onNext(value)
}
for value in ["B1", "B2"] {
    queueB.onNext(value)
}
queueA.onCompleted()
queueB.onCompleted()
// Output:
// - "A1,B1"
// - "A2,B2"
// "A3" never gets a partner from queueB, so it is not emitted.