Observable 合并

startWith

需要注意的是 startWith 中的事件值的类型,和它后续的事件值类型,必须是相同的

enum Condition: String {
    case cellular = "Cellular"
    case wifi
    case none
}
let bag = DisposeBag()
let request = Observable<String>.create { (ob) -> Disposable in
    ob.onNext("Reponse from server.") //模拟服务器返回数据
    ob.onCompleted()
    return Disposables.create()
}

// 有了这个思路之后,我们就可以在订阅的代码里,根据当前网络条件,以及服务器的返回值,做各种后续操作了。
request.startWith(Condition.wifi.rawValue)
    .subscribe(onNext: { dump($0) })
    .disposed(by: bag)
// - "wifi"
// - "Reponse from server."

concat

把两个并行的 Observable 合并起来串行处理。可以合并多个 observer ,假设合并了[A、B]

  • A 执行完后才会对 B 进行监听
  • A 和 B 都执行完才会执行 subscribe 的 Completed、Disposed
  • Error 的情况
    • B 如果在等待状态(还没有监听),不会触发 subscribe 的 onError
    • 在监听状态会触发 onError -> onDisposed
enum E: Error {
        case demo
    }

    // (只能合并相同类型的信号)
    let queueA = PublishSubject<String>()
    let queueB = PublishSubject<String>()
    // 先处理完queueA中的事件之后,再开始处理queueB中的事件
    let sequence = Observable.concat([queueA.asObserver(), queueB.asObserver()])
    // let sequence2 = queueA.concat(queueB.asObserver()) 效果和上边相同
    sequence.subscribe(onNext: { msg in
        dump(msg)
        // - "A1"
        // - "A11"
        // - "A2"
        // - "A21"
        // - "B2"
        // - "B3"
        //Completed
        //Disposed
    }, onError: {
        print($0)
    }, onCompleted: {
        print("Completed")
    }, onDisposed: {
        print("Disposed")
    }).disposed(by: bag)

    queueA.onNext("A1")
    queueA.onNext("A11")
    // queueA.onError(E.demo) //如果有error的情况会直接触发 onError -> onDisposed

    // queueB.onError(E.demo) //如果是排队中触发不会对合成的Observable产生影像
    queueB.onNext("B1")

    queueA.onNext("A2")
    queueA.onNext("A21")
    queueA.onCompleted() // 执行完A之后才会进行监听B

    queueB.onNext("B2")
    // queueB.onError(E.demo) 没有在排队中,会直接触发 onError -> onDisposed
    queueB.onNext("B3")
    queueB.onCompleted() // B也完成后,才会触发subscribe的completed
}

## merge

- 只能合并相同类型的信号,多个信号并行。
- 可以指定最大并行的数量
- 并行中信号任意一个执行 onError,会触发 subscribe  error->completed

self.example("merge 并行") {
    enum E: Error {
        case demo
    }

    // (只能合并相同类型的信号)
    let queueA = PublishSubject<String>()
    let queueB = PublishSubject<String>()
    // 先处理完queueA中的事件之后,再开始处理queueB中的事件
    // let sequence = Observable.merge([queueA.asObserver(), queueB.asObserver()])
    let sequence = Observable.of(queueA.asObserver(), queueB.asObserver()).merge() // 效果同上
    // let sequence = Observable.of(queueA.asObserver(), queueB.asObserver()).merge(maxConcurrent: 1) // 同时订阅的数量. 1同concat的效果

    sequence.subscribe(onNext: { msg in
        dump(msg)
    }, onError: {
        print($0)
    }, onCompleted: {
        print("Completed")
    }, onDisposed: {
        print("Disposed")
    }).disposed(by: bag)

    queueA.onNext("A1")
    queueA.onNext("A2")
    // queueA.onError(E.demo) // 任意发生error旧值执行disposed
    queueB.onNext("B1")
    queueA.onNext("A3")
    queueA.onCompleted()
    queueB.onNext("B2")
    queueB.onCompleted() // 仍然是都执行completed后才进行 disposed
    // 另外一点和concat不同的是,只要merge中的任何一个Sub-observable发生Error事件,合并后的Observable就会结束掉,大家可以自己试一下。

Observables 中的事件合并

combineLatest

对最新的事件进行合并,可以合并不同类型的事件

enum E: Error {
    case demo
}

let queueA = PublishSubject<String>()
let queueB = PublishSubject<String>()
Observable.combineLatest(queueA, queueB, resultSelector: { (a, b) -> String in
    a + "," + b
}).subscribe(onNext: { msg in
    dump(msg)
}).disposed(by: bag)
queueA.onNext("A1")
queueA.onNext("A2")
queueA.onNext("A3")
queueB.onNext("B1")
queueB.onNext("B2")
// 两两进行计算
// - "A3,B1"
// - "A3,B2"

如果中间有完成的,则将会记录它最后一个是事件。与后续事件进行合并

queueA.onNext("A1")
queueA.onNext("A2")
queueA.onNext("A3")
queueB.onNext("B1")
queueA.onCompleted() // 如果中间有完成的,则将会记录它最后一个是事件。与后续事件进行合并
queueB.onNext("B2")
queueB.onNext("B3")
// - "A3,B1"
// - "A3,B2"
// - "A3,B3"

如果发生 error 则立即结束

// 如果发生error则立即结束
queueA.onNext("A1")
queueB.onNext("B1")
queueA.onNext("A2")
queueA.onError(E.demo)
queueB.onNext("B2")
queueB.onNext("B3")
//    - "A1,B1"
//    - "A2,B1"
//Unhandled error happened: demo
//subscription called from:

zip

顾名思义,拉链。合并并消费

enum E: Error {
    case demo
}

let queueA = PublishSubject<String>()
let queueB = PublishSubject<String>()
Observable.zip(queueA, queueB, resultSelector: { (a, b) -> String in
    a + "," + b
}).subscribe(onNext: { msg in
    dump(msg)
}).disposed(by: bag)

queueA.onNext("A1")
queueA.onNext("A2")
queueA.onNext("A3")
queueB.onNext("B1")
queueB.onNext("B2")
queueA.onCompleted()
queueB.onCompleted()
// - "A1,B1"
// - "A2,B2"
// 只会配对一次,可以理解为合并完就被消费掉了