iOS 并发,锁,线程同步【二】Operation

在之前的一篇文章中【iOS 并发,锁,线程同步【一】GCD】,我们讨论了一下 GCD 的并发,锁和线程同步的问题,今天,我们来讨论一下 Operation 的并发与线程同步。

Operation 中,我们一般是将所有的 Operation 添加到 OperationQueue 中进行执行,这里需要注意一点,**Operation 添加到队列当中,默认就是执行的并发操作。我们可以设置队列的最大并发数 maxConcurrentOperationCount。如果我们在 OperationQueue 中想要执行串行任务的话,很简单,将 maxConcurrentOperationCount 设置成为1即可。 maxConcurrentOperationCount 的默认值为-1,那么默认情况下的并发数是多少呢?这个是由系统内存和 CPU 决定的,可能内存多久开多一点,内存少就开少一点。**最大并发数建议 2~3,如果并发数太多会导致 UI 卡顿。

不添加到队列当中的 Operation,我们可以调用 start() 方法开始一个操作,也可以调用 cancel() 取消等待中的操作,注意:已经开始执行的操作是没法取消的。代码示例如下:

1
2
3
4
5
let opt = BlockOperation {
print("Operation")
}
opt.start() // 开始执行任务
opt.cancel() // 取消等待中的任务

如果我们需要进行线程同步该怎么做?GCD 中我们可以用 DispatchGroup,在 Operation 中我们可以用一个 addDependency() 的方法。这个方法意味着,某个任务的执行,依赖着其他任务执行完成后才回去执行。代码我们可以这么写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
let queue = OperationQueue()
queue.maxConcurrentOperationCount = 3
let a = BlockOperation {
sleep(2)
print("a")
}
let b = BlockOperation {
print("b")
}
let c = BlockOperation {
print("c")
}

c.addDependency(a)
c.addDependency(b)

queue.addOperation(c)
queue.addOperation(a)
queue.addOperation(b)

这里的 c 操作,需要等到 a, b 完成之后才会执行。运行结果如下:

1
2
3
b
a
c

OK,接下来我们来一点在并发中进行数据写入的操作,代码作如下修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
let queue = OperationQueue()
queue.maxConcurrentOperationCount = 3
var ary: [Int] = []
let a = BlockOperation {
for item in 0..<5 {
ary.append(item)
}
}
let b = BlockOperation {
for item in 5..<10 {
ary.append(item)
}
}
let c = BlockOperation {
print(ary.sorted())
}

c.addDependency(a)
c.addDependency(b)

queue.addOperation(c)
queue.addOperation(a)
queue.addOperation(b)

运行结果:

1
[0, 2, 3, 4, 6, 8, 9]

很明显,运行结果是错的。并发当中对同一个数据源进行写的操作时,一定要注意加锁。具体可以看我的上一遍文章:iOS 并发,锁,线程同步【一】GCD,这里我就不在啰嗦。

接下来,我们做一点代码优化,如果我想要实现 n 个任务,每个任务都是向数组中添加数字,每个任务的循环范围按照 05,510,10~15 这样的规律,最后我们输出 ary 中的值。

很明显,向上面的写法太过笨拙。那么我们进行一个函数的抽象,我们先来写一个产生 task 的函数。

首先我们先来定义一个 task:

1
typealias task = () -> ()

接下来,我们需要将产生的 task 添加到数组中,这里需要充分利用函数式编程的优点,方法看起来是这样:

1
2
3
4
5
6
7
8
9
10
11
func makeTask(taskCount: Int, opt: @escaping (_ currentIdx: Int) -> task) -> [task] {
// 如果任务数为0,就返回空数组
guard taskCount > 0 else {
return []
}
var tasks: [task] = []
for idx in 0..<taskCount {
tasks.append(opt(idx))
}
return tasks
}

第一个参数 taskCount 是最大任务数,第二个参数 opt 是执行的任务,是一个闭包,因为任务会存到一个数组中,供后面的方法使用,所以这个闭包是可逃逸的。我们在执行闭包的时候,需要传入一个参数,这个参数表示了当前生成的是第几个 task,返回值也是个闭包,也就是我们最终要执行的 task。OK,经过函数化,我们就可以产生任意数量、任意操作的 task 了。

接下来我们来处理一下并发的方法,它看起来是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func concurrent(tasks: [task], complationHandle: @escaping () -> ()) {
let sema = DispatchSemaphore(value: 1)
let queue = OperationQueue()
queue.maxConcurrentOperationCount = 3
let result = BlockOperation {
complationHandle()
}
tasks.forEach { t in
let o = BlockOperation {
sema.wait()
t()
sema.signal()
}
result.addDependency(o)
queue.addOperation(o)
}
queue.addOperation(result)
}

原理也很简单,我们在一个 forEach 当中设置好 Operation 的 task 与依赖关系。这里用了信号量锁,来保证数据的正确性。最后我们在 complationHandle 这个闭包中处理同步后的数据。

我们来使用一下,感受一下函数式的灵活、强大之处:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var ary: [Int] = []

let tasks = makeTask(taskCount: 20) { idx in
return {
print("task idx: \(idx)")
for item in (idx * 5)..<(idx * 5 + 5) {
ary.append(item)
}
}
}

concurrent(tasks: tasks) {
print(ary.sorted())
}

输出结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
task idx: 0
task idx: 1
task idx: 2
task idx: 3
task idx: 5
task idx: 6
task idx: 4
task idx: 7
task idx: 9
task idx: 10
task idx: 11
task idx: 8
task idx: 13
task idx: 12
task idx: 14
task idx: 15
task idx: 18
task idx: 16
task idx: 17
task idx: 19
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]

调用还是有点麻烦?没关系我们可以将两个方法合成一个,我们把 makeTaskconcurrent 方法设置成为私有(private),接下来写一个开放接口方法:

1
2
3
4
public func tasksToConcurrent(taskCount: Int, opt: @escaping (_ currentIdx: Int) -> task, complationHandle: @escaping () -> ()) {
let tasks = makeTask(taskCount: 20) { idx in return opt(idx) }
concurrent(tasks: tasks, complationHandle: complationHandle)
}

我们就可以这样来调用:

1
2
3
4
5
6
7
8
9
10
tasksToConcurrent(taskCount: 20, opt: { idx in
return {
print("task idx: \(idx)")
for item in (idx * 5)..<(idx * 5 + 5) {
ary.append(item)
}
}
}) {
print(ary.sorted())
}

感觉比刚开始的写法简洁了不少!!!