observeOn vs. subscribeOn【译】

跨线程订阅的问题似乎在 RxSwift Slack 上一次又一次的被提到。这个解释起来也是非常的简单所以我觉得放在博客当中是一个好主意,无论你在何时需要,都可以通过连接进行访问,我也不用一次又一次的回复。

可观察序列的订阅(Observable subscriptions)


subscribing(订阅) 和 observing(观察) 方面的术语还有一点混乱,所以让我们先来解决这个问题(不要跳过本章节!)。

让我们看看 observable subscription 的工作原理。我们可以将订阅分成 3 个部分:

  1. 首先,你定义了一个 Observable,在某些情况下,你在闭包中提供一些代码来执行工作并向任何观察者(observers)发出元素。当你创建了一个可观察序列的时候,代码将会被保存以供将来使用,但是不会立即执行。如果没有观察者 - Observable 只是坐着等待并不做任何事情。
  2. 在为订阅建模时,你可以使用一些运算符(如 mapfilter 等)来处理发出的元素。
  3. 只有当你在一个 Observable 上调用 subscribe(...) 方法时,你才能“打开它”。调用 subscribe(...) 实际上将执行你在第1部分(上面)的块中提供的代码。

所以从这个意义上说,这里有两个要点:

  1. subscription code 是从你的 subscribe() 中调用并且位于 Observable.create { ... } 中的代码。这是创建订阅并且生成元素的代码。
  2. observation code 是你观察元素的地方 - 这是你在 onNext: { ... }onCompleted: {...} 等提供的代码。这是你进行观察的地方。

调度程序(Schedulers)


RxSwift 附带了许多预定义的调度程序,可以在大多数情况下满足你的所有需求。这个主题有一个简短的文档:schedulers at RxSwift repo.

在这片文章中,我们将使用两个调度程序:

  • MainScheduler.instance 工作于主线程上。
  • ConcurrentDispatchQueueScheduler 它使用 GCD 在给定队列上执行工作。

Subscribing and subscribeOn


接下来让我们来看一下 subscribeOn 操作 - 它允许你更改将在其上执行订阅代码(subscription code)的调度程序。

默认情况下,订阅代码将在与调用 subscribe() 的代码相同的线程上运行,除非你使用 subscribeOn(...) 更改上下文。

例如:

1
2
3
4
5
6
7
8
9
Observable<Int>.create { observer in
observer.onNext(1)
sleep(1)
observer.onNext(2)
return Disposables.create()
}
.subscribe(onNext: { el in
print(Thread.isMainThread)
})

如果将此代码放在 viewDidLoad 中,则会因为在订阅代码中使用了 sleep 而阻塞主线程。

你的 onNext 中的代码将会输出 true,因为它一直在主线程上运行。

1
[main] subscribe() -> [main] create{ ... } -> [main] onNext { ... }

现在你可以通过插入 subscribeOn 来更改你订阅的调度程序:

1
2
3
4
5
6
7
8
9
10
Observable<Int>.create { observer in
observer.onNext(1)
sleep(1)
observer.onNext(2)
return Disposables.create()
}
.subscribeOn(ConcurrentDispatchQueueScheduler(qos: .background))
.subscribe(onNext: { el in
print(Thread.isMainThread)
})

这次你将在订阅时切换线程:

1
[main] subscribe() -> [background] create{ ... } -> [background] onNext { ... }

onNext 将会输出 false

Observing and observeOn


现在让我们来观察序列的元素。这部分涉及你的观察代码。

在我们之前的例子中,你将订阅切换到了后台,因为它执行了一些阻塞线程的操作。但你实际上想要的是在主线程上运行 onNext {..} 中的代码,以便更新应用程序的 UI。

这可以通过使用 observeOn 来实现。顺便说一句,你可以将 observeOnsubscribeOn 放在响应链的任何地方 - 顺序并不重要。

RxSwift 包含一个使用主线程 MainScheduler.instance 的共享调度程序,因此你可以使用它来轻松地观察元素:

1
2
3
4
5
6
7
8
9
10
11
Observable<Int>.create { observer in
observer.onNext(1)
sleep(1)
observer.onNext(2)
return Disposables.create()
}
.observeOn(MainScheduler.instance)
.subscribeOn(ConcurrentDispatchQueueScheduler(qos: .background))
.subscribe(onNext: { el in
print(Thread.isMainThread)
})

这将执行如下代码:

1
[main] subscribe() -> [background] create{ ... } -> [main] onNext { ... }

如果你做了很多异步工作,那么这是一种你经常会使用的模式,,所以早点习惯它 - 最好。

我希望这篇简短的文章能够清楚地说明命名和用法。

原文地址:observeOn vs. subscribeOn