在正确的线程上使用Observable【译】

尽管很多人了解 RxJava 的基本逻辑,但是在 Observable 链和操作符究竟运行在哪个线程,仍然会有许多困惑。

首先,让我们梳理清晰,在 RxJava 中 .subsribeOn().observeOn() 区别:

  1. .subsribeOn() 操作符可以改变 Observable 应该在哪个调度器上执行任务。
  2. .observeOn() 操作符可以改变 Observable 将在哪个调度器上发送通知。
  3. 另外,你需要知道,默认情况下,链上的操作符将会在调用 .subsribeOn() 的那个线程上执行任务。

一些例子


主线程或者 .subscribe() 所在线程

如果在 Android 的 Activity下onCreate() 方法中,也就是主线程中使用如下代码:

1
2
Observable.just(1,2,3)
.subscribe( );

表现会像这样:

调用 .subscribeOn()

尽管代码片段在主线程中,但是整个代码块将运行在 .subscribeOn() 定义的线程上:

1
2
3
Observable.just(1,2,3)
.subscribeOn(Schedulers.newThread())
.subscribe();

表现会像这样:

调用 .observeOn()

如果你的代码片段在主线程中,默认情况下Observable的创建是在 .subscribeOn() 定义的线程上,但是,调用 .observeOn() 之后,余下的代码将会执行在 .observeOn() 所定义的线程上:

1
2
3
Observable.just(1,2,3)
.observeOn(Schedulers.newThread())
.subscribe();

合并逻辑

照理合并操作符,放在一起就像这样:

1
2
3
4
Observable.just(1,2,3)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe();

一些技巧


UI 线程运行异常

1
2
3
Observable.just(1,2,3)
.subscribeOn(Schedulers.newThread())
.subscribe(/** 与UI线程相关的逻辑 **//);

很明显,这是错的。

保证逻辑运行在工作线程中

如果存在以下代码片段:

1
2
3
4
5
6
7
Observable.just(1,2,3)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())

.flatMap(/** 与UI线程无关的逻辑**//)

.subscribe();

请用以下代码替代:

1
2
3
4
5
6
7
Observable.just(1,2,3)
.subscribeOn(Schedulers.newThread())

.flatMap(/** 与UI线程无关的逻辑**//)

.observeOn(AndroidSchedulers.mainThread())
.subscribe();

通过用第二段代码代替第一段,.flatMap() 操作符(或者在这一点的其他逻辑操作符)将运行在后台线程。这样做就不会阻塞 UI 线程,同时可以防患 ANR 或其他类似问题的发生。看起来有点像 AsyncTask 模式,尽可能的把逻辑放在的 .doInBackground() 中,而不是 .onPostExecute()

取决于更早的 .subscribeOn()

以下代码:

1
2
3
4
Observable.just(1,2,3)
.subscribeOn(thread1)
.subscribeOn(thread2)
.subscribe();

因为 thread1 的逻辑将会覆盖 thread2,所以 Observable 的创建和 .subscribe() 的逻辑处理都将运行在 thread1 中。因此,根本没有必要写多个 .subscribeOn() 操作符。

原文链接:

Observe on the correct thread