忽略事件

ignoreElements

忽略掉所有的.next 事件,只接受.completed 事件

let tasks = PublishSubject<String>()
let bag = DisposeBag()

tasks.ignoreElements()
    .subscribe { print($0) }
    .disposed(by: bag)

tasks.onNext("T1")
tasks.onNext("T2")
tasks.onNext("T3")
tasks.onCompleted()

skip(n)

忽略前 n 个信号

let tasks = PublishSubject<String>()
let bag = DisposeBag()

tasks.skip(2)
    .subscribe { print($0) }
    .disposed(by: bag)

tasks.onNext("T1")
tasks.onNext("T2")
tasks.onNext("T3")
tasks.onCompleted()

skipWhile

当遇到第一个不满足条件的事件之后,就不再忽略任何事件了

let tasks = PublishSubject<String>()
let bag = DisposeBag()

tasks.skipWhile { $0 == "T2" }
    .subscribe { print($0) }
    .disposed(by: bag)

tasks.onNext("T1")
tasks.onNext("T2")
tasks.onNext("T3")
tasks.onCompleted()
// next(T1)
// next(T2)
// next(T3)
// completed

skipUntil

和 skipWhile 类似,不过忽略条件为另外一个事件序列中的事件

let tasks = PublishSubject<String>()
            let bag = DisposeBag()
            let bossIsAngry = PublishSubject<Void>()
            tasks.skipUntil(bossIsAngry)
                .subscribe { print($0) }
                .disposed(by: bag)

            tasks.onNext("T1")
            tasks.onNext("T2")
            bossIsAngry.onNext(())
            tasks.onNext("T3")
            tasks.onNext("T4")
            tasks.onCompleted()
            // next(T3)
            // next(T4)
            // next(T5)
            // completed

distinctUntilChanged

忽略序列中连续重复的事件

let tasks = PublishSubject<String>()
let bag = DisposeBag()

tasks.distinctUntilChanged()
    .subscribe { print($0) }
    .disposed(by: bag)

tasks.onNext("T1")
tasks.onNext("T2")
tasks.onNext("T2")
tasks.onNext("T3")
tasks.onNext("T4")
tasks.onNext("T4")
// next(T1)
// next(T2)
// next(T3)
// next(T4)

获取事件

elementAt(n)

取下标为 n 的元素

let tasks = PublishSubject<String>()
let bag = DisposeBag()

tasks.elementAt(0)
    .subscribe { print($0) }
    .disposed(by: bag)

tasks.onNext("T1")
tasks.onNext("T2")
tasks.onNext("T3")
tasks.onCompleted()

filter

let tasks = PublishSubject<String>()
let bag = DisposeBag()

tasks.filter { $0 == "T1" }
    .subscribe { print($0) }
    .disposed(by: bag)

tasks.onNext("T1")
tasks.onNext("T2")
tasks.onNext("T3")
tasks.onCompleted()

take(n)

除了选择订阅单一事件之外,我们也可以选择一次性订阅多个事件,例如,选择序列中的前两个事件

let tasks = PublishSubject<String>()
let bag = DisposeBag()

tasks.take(2) // 取前2个
    .subscribe { print($0) }
    .disposed(by: bag)

tasks.onNext("T1")
tasks.onNext("T2")
tasks.onNext("T3")
tasks.onCompleted()

takeLast(count)

取倒数的 n 个数

let tasks = PublishSubject<String>()
let bag = DisposeBag()

tasks.takeLast(1)
    .subscribe { print($0) }
    .disposed(by: bag)

tasks.onNext("T1")
tasks.onNext("T2")
tasks.onNext("T3")
tasks.onCompleted()

takeWhile

只要条件为 true 就一直订阅下去”这样的概念

let tasks = PublishSubject<String>()
let bag = DisposeBag()

tasks.takeWhile { $0 != "T2" } // 只要不是T2,就会一直订阅
    .subscribe { print($0) }
    .disposed(by: bag)

tasks.onNext("T1")
tasks.onNext("T2")
tasks.onNext("T3")
tasks.onCompleted()

takeUntil

发生之前一直订阅,条件为另外一个信号

let tasks = PublishSubject<String>()
let bossHasGone = PublishSubject<Void>()
let bag = DisposeBag()

tasks.takeUntil(bossHasGone).subscribe {
    print($0)
}
.disposed(by: bag)

tasks.onNext("T1")
tasks.onNext("T2")
tasks.onNext("T3")
tasks.onNext("T4")
bossHasGone.onNext(())
tasks.onNext("T5")
tasks.onCompleted()
// next(T1)
// next(T2)
// next(T3)
// next(T4)
// completed

Transform 操作符

toArray

Observable<T> 中所有的事件值,在订阅的时候,打包成一个 Array返回给订阅者。只会在结束后计算.

Observable.of(1, 2, 3)
        .toArray()
        .subscribe(onSuccess: { arr in
            print(arr) // [1,2,3]
        }, onError: { error in
            print(error)
        })
        .disposed(by: bag)

    // 把原始Observable中所有的事件值变成一个数组,只要原始Observable不结束,这个转换就不会发生
    let numbers = PublishSubject<Int>()
    numbers.asObservable()
        .toArray()
        .subscribe(onSuccess: { arr in
            print("subject:\(arr)") // [1, 2, 4]
        }, onError: { error in
            print(error)
        }).disposed(by: bag)
    numbers.onNext(1)
    numbers.onNext(2)
    numbers.onNext(4)
    numbers.onCompleted()

scan

数据两两进行运算,如进行求和运算等。 每次有事件的时候都会执行.

 let obs = Observable<Int>.create { (ob) -> Disposable in
        ob.onNext(1)
        ob.onNext(2)
        ob.onNext(3)
        ob.onNext(4)
        ob.onNext(5)
        return Disposables.create()
    }
    obs.scan(0, accumulator: { (sum, new) -> Int in
        sum + new
    }).subscribe(onNext: { value in
        print(value) // 1 3 6 ....
    }).disposed(by: bag)

map

self.example("map") {
    Observable.of(1, 2, 3).map {
        value in value * 2
    }.subscribe(onNext: {
        print($0)
    }).disposed(by: bag)
}

其他

share

 let obs = Observable<Int>.create { (ob) -> Disposable in
        ob.onNext(1)
        ob.onNext(2)
        ob.onNext(3)
        ob.onNext(4)
        ob.onNext(5)
        return Disposables.create()
    }.share()

    obs.subscribe(onNext: { code in
        print("1---\(code)")
    }).disposed(by: bag)

一个比较好的介绍说明: https://www.jianshu.com/p/08b30b4181ea

publish

publish 用于向所有订阅者“统一”发布事件

let interval = Observable<Int>
    .interval(1, scheduler: MainScheduler.instance)
    .publish()

加上 publish,我们发现不管从何时开始监听,统一时间监听到的信号是相同的(晚监听的不会去从 1 开始)

multicast

可以让原事件序列中的事件通过另外一个 subject 对象代为传递

let supervisor = PublishSubject<Int>()

_ = supervisor.subscribe(onNext: {
    print("Supervisor: event \($0)") })

let interval = Observable<Int>
    .interval(1, scheduler: MainScheduler.instance)
    .multicast(supervisor)

个人理解为,对原始信号进行监听。

replay(n)

订阅时,回放历史事件

buffer

可以想象的是,随意使用replayAll很容易导致问题,特别是当历史事件很多的时候,就非常容易导致资源被耗尽。为此,我们还可以为事件的回放在特定的时间范围里,指定一个最大事件数量。这个operator叫做buffer。我们直接来看代码:

let interval = Observable<Int>
    .interval(1, scheduler: MainScheduler.instance)
    .buffer(timeSpan: 4, count: 2, scheduler: MainScheduler.instance)

要注意的是,使用了buffer之后,interval就不再是connectable observable了。它有三个参数:

  • timeSpan:缓冲区的时间跨度,尽管interval每隔1秒钟发生一次事件,但经过buffer处理后,就变成了最长timeSpan秒发生一次事件了,事件的值,就是由所有缓存的事件值构成的数组。如果timeSpan过后没有任何事件发生,就向事件的订阅者发送一个空数组;
  • count:缓冲区在timeSpan时间里可以缓存的最大事件数量,当达到这个值之后,buffer就会立即把缓存的事件用一个数组发送给订阅者,并重置timeSpan;
  • scheduler:表示Observable事件序列发生在主线程,在后面的内容里,我们还会专门介绍RxSwift中的各种scheduler;

flatMap / flatMapLast

struct Player {
var score: Variable<Int>
}

let John = Player(score: Variable(70))
let Jole = Player(score: Variable(90))

let players = PublishSubject<Player>()

players.asObservable()
    .flatMap { // .flatMapLatest
        $0.score.asObservable()
    }
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: self.bag)

players.onNext(John)
players.onNext(Jole)

John.score.value = 100
Jole.score.value = 101
// 70
// 90
// 100
// 101

若使用.flatMapLatest,则打印
// 70
// 90
// 101
  • flatMap

把它原序列中的每个事件,都变换成一个 Observable。
因此,再加入了 Jole 之后,flatMap 一共变换出了两个 Observable, 经过 flatMap 合并过的 Observable会按发生的顺序,反映 John 和 Jole 中的所有事件。

  • flatMapLatest

当原序列中有新事件发生的时候,flatMapLatest 就会自动取消上一个事件的订阅,然后转换到新事件的订阅。