Observable

Observable 可以理解为时间序列。(仅供订阅)

代码示例:

// Observable.of("1","2","3","4","5","6")
// Observable.from(["1","2","3","4","5","6"])
// observable进行异步操作
let observalble = Observable.of("1", "2", "3", "4", "5", "6", "8")
    .map { Int($0) }
    .filter {
        if let item = $0, item % 2 == 0 {
            print("xxxx:\(item)") // 被订阅后才会被真正执行
            return true
        }
        return false
    }

//上面的不会执行,只有订阅时才会执行
ob.subscribe { event in
    print(event)
}

//next(Optional(2))
//next(Optional(4))
//next(Optional(6))
//completed

// subscribe 表示全程进行订阅
_ = observalble.subscribe { event in
    print("event:\(event)")
}
// 使用skip来表示模拟半路进行订阅
_ = observalble.skip(2).subscribe { item in
    print("skiped first 2:\(item)")
}
// next 以及 completed
_ = observalble.skip(2).subscribe(
    onNext: { event in
        print("on next:\(event!)")
    },
    onError: { _ in },
    onCompleted: {
        print("onCompleted xxx")
    },
    onDisposed: {
        // 资源回收的理解
        // 1. 如果序列是有限长的,执行完会进行释放
        // 2. 如果序列是无线长的,就会有资源占用
        // 3. disposeBag的理解:将所有的订阅对象放到disposeBag里,当bag销毁的时候就进行zbag里装的订阅释放
    }
)

资源回收的理解:

  1. 如果序列是有限长的,执行完会进行释放
  2. 如果序列是无线长的,就会有资源占用
  3. disposeBag 的理解:将所有的订阅对象放到 disposeBag 里,当 bag 销毁的时候就进行 zbag 里装的订阅释放
  4. 对于一个Observable来说,除了所有订阅者都取消订阅会导致其被回收之外,Observable自然结束(onCompleted)或发生错误结束(onError)也会自动让所有订阅者取消订阅,并导致Observable占用的资源被回收。
let bag = DisposeBag()
func interval(){
    _ = Observable<Int>
        //interval 参数1表示发送的时间间隔。参数2表示所在的现成
        .interval(1, scheduler: MainScheduler.instance)
        .subscribe{ print($0)}
        .disposed(by: self.bag)
}

Create

当需要精确控制发送给订阅者的成功、失败等,就需要用到 Observable 的 create 方法。

/**
    Creates an observable sequence from a specified subscribe method implementation.

    - seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html)

    - parameter subscribe: Implementation of the resulting observable sequence's `subscribe` method.
    - returns: The observable sequence with the specified implementation for the `subscribe` method.
    */
public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
    return AnonymousObservable(subscribe)
}

使用举例:

enum CustomError: Error {
    case somethingError
}

let create = Observable<Int>.create { (observer) -> Disposable in
    observer.onNext(10)
    observer.onNext(11)
    observer.onError(CustomError.somethingError)
    observer.onCompleted()
    return Disposables.create()
}

create.subscribe(
    onNext: { print($0) },
    onError: { print($0) },
    onCompleted: { print("completed") },
    onDisposed: { print("Game Over") }
).disposed(by: self.bag)

使用 do 来进行处理操作

create.do(
    onNext: { print("do:\($0)") },
    onError: { print("do:\($0)") },
    onCompleted: { print("completed") }
)
.subscribe(
    onNext: { print($0) },
    onError: { print($0) },
    onCompleted: { print("completed") },
    onDisposed: { print("Game Over") }
).disposed(by: self.bag)

使用 debug 进行方便的调试,会打印出事件发生的时间等。

 create
.debug()
.subscribe(
    onNext: { print($0) },
    onError: { print($0) },
    onCompleted: { print("completed") },
    onDisposed: { print("Game Over") }
).disposed(by: self.bag)

Subject

Subject 可以做 Observable,也可以做 Observe.

PublishSubject

PublishSubject 就像个出版社,到处收集内容,此时它是一个 Observer,然后发布给它的订阅者,此时,它是一个 Observable。

let subject = PublishSubject<String>()

let sub1 = subject.subscribe(onNext: { msg in
    print("###:\(msg)")
})

// 当我们把subject当作Observer(观察者)的时候,可以使用onNext方法给它发送事件:
subject.onNext("Episode1 update 1")
subject.onNext("Episode1 update 2")

当订阅者销毁之后不会再接受信息:

let subject = PublishSubject<String>()

let sub1 = subject.subscribe(onNext: { msg in
    print("###:\(msg)")
})

subject.onNext("Episode1 update 1")
subject.onNext("Episode1 update 2")
// 释放后不会收到
sub1.dispose()

//sub2不会接收到订阅之前的消息
let sub2 = subject.subscribe(onNext: { msg in
    print("###sub2:\(msg)") // ###sub2:Episode1 update 2
})
subject.onNext("Episode1 update 2")

2019091815688162845001.png

BehaviorSubject

BehaviorSubject和PublisherSubject唯一的区别,就是只要有人订阅,它就会向订阅者发送最新的一次事件作为“试用”。

20190918156881644760395.png

初始化的时候给定默认值,当之前没有信号时,收到默认信号。

let subject = BehaviorSubject<String>(value: "default value")
let sub1 = subject.subscribe { (msg) in
    print("sub1:\(msg)") //sub1:next(default value)
}

ReplaySubject

可以订阅历史消息,但是没有默认消息。可以指定缓冲大小buffeSize,

let subject = ReplaySubject<String>.create(bufferSize: 2)

20190918156881717922801.png

Variable

除了事件序列之外,在平时的编程中我们还经常需遇到一类场景,就是需要某个值是有“响应式”特性的。例如可以通过设置这个值来动态控制按钮是否禁用,是否显示某些内容等。为了方便这个操作,RxSwift还提供了一个特殊的subject,叫做Variable。

目前Variable已经标记为过期,被BehaviorRelay所替代。

let stringValue = Variable("Episode1")
// 使用时,需要先明确使用asObservable
_ = stringValue
    .asObservable()
    .subscribe {
        print($0)
    }
// 更新为新值
stringValue.value = "Episode2"

//中途添加另一个监听者
_ = stringValue
    .asObservable()
    .subscribe {
        print("22-\($0)")
    }
stringValue.value = "Episode3"

//log情况
next(Episode1)
next(Episode2)
22-next(Episode2)
next(Episode3)
22-next(Episode3)
completed
22-completed

需要注意:

  • Variable其实是对BehaviorRelay的封装,也需要一个默认的初始值进行初始化
    • 具有BehaviorReplay的特性,会向订阅者发送上一个event以及之后创建的event
  • 不同之处,会将当前发出的值保存为自己的状态。表达一个“响应式”值的语义。
    • 绝不会发生.error事件
    • 无需手动给它发送.complete事件表示完成

BehaviorRelay

  • BehaviorRelay是作为Variable的替代品出现。具有Variable的特性
  • 但是需要注意的是,即使销毁时也不会自动发送 .complete 的 event
  • 通过设置accept()方法进行值的修改
// variable已经被BehaviorRelay替代,需要import RxCoca
let stringValue = BehaviorRelay<String>(value: "Episode1-xx1")
// 使用时,需要先明确使用asObservable
_ = stringValue
    .asObservable()
    .subscribe {
        print($0)
}
// 设置新值
stringValue.accept("Episode2-xx2")

如果需要新值合并到旧值,可以通过accept()方法与value属性来实现。

let disposeBag = DisposeBag()
         
//创建一个初始值为包含一个元素的数组的BehaviorRelay
let subject = BehaviorRelay<[String]>(value: ["1"])
    
//修改value值
subject.accept(subject.value + ["2", "3"])
    
//第1次订阅
subject.asObservable().subscribe {
    print("第1次订阅:", $0)
    }.disposed(by: disposeBag)
    
//修改value值
subject.accept(subject.value + ["4", "5"])
    
//第2次订阅
subject.asObservable().subscribe {
    print("第2次订阅:", $0)
    }.disposed(by: disposeBag)
    
//修改value值
subject.accept(subject.value + ["6", "7"])

参考

RxSwift的使用详解7

泊学