
GCD Thread Management: From Getting Started to Giving Up

2019/06/27

The Source

I had just finished writing a batch of business code when a thought popped into my head: objects are generally deallocated automatically, and threads are likewise reclaimed automatically, so how does that automatic reclamation actually work? No time like the present, so I went looking for the GCD source, libdispatch; the version used here is libdispatch-1008.200.78.tar.gz.

When multithreading is needed, most of us reach for the same basic incantation:

dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{

});

So what series of operations hides behind this all-too-familiar snippet?

likely() and unlikely()

Since the source code is littered with likely() and unlikely() operations, let's briefly explain these two macros first.

#define likely(x)   __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)

// We do not expect foo() to be called, because we expect x to be 0
if (unlikely(x)) { foo(); }

The main job of __builtin_expect is to tell the compiler the expected outcome of a conditional branch, so that code can be laid out to avoid wasting time on taken jmp instructions. So likely() marks conditions that are "very likely to happen", while unlikely() marks conditions that are unlikely to happen.
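As a minimal sketch of the pattern (the macro definitions are the ones above; `parse_nonnegative` is a made-up example function, and this requires GCC or Clang for `__builtin_expect`):

```c
#include <assert.h>

#define likely(x)   __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)

// Parse a non-negative value; the error path is marked as the cold branch,
// so the compiler can lay the hot path out as straight-line code.
static int parse_nonnegative(int v, int *out)
{
    if (unlikely(v < 0)) {
        return -1;          // cold: we expect this to almost never happen
    }
    *out = v;               // hot: falls through without a taken jump
    return 0;
}
```

The hint changes only code layout and branch prediction, never the result, so a mispredicted hint costs performance, not correctness.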

dispatch_queue_global_t

/*!
 * @function dispatch_get_global_queue
 *
 * @abstract
 * Returns a global concurrent queue matching the given parameters.
 *
 * @param identifier
 * The priority of the queue:
 * - DISPATCH_QUEUE_PRIORITY_HIGH
 * - DISPATCH_QUEUE_PRIORITY_DEFAULT
 * - DISPATCH_QUEUE_PRIORITY_LOW
 * - DISPATCH_QUEUE_PRIORITY_BACKGROUND
 *
 * @param flags
 * Reserved for future use; for now it does nothing, and passing any
 * value other than zero may result in a NULL return value.
 *
 * @result
 * The requested global concurrent queue.
 */
dispatch_queue_global_t
dispatch_get_global_queue(long identifier, unsigned long flags);

DISPATCH_DECL_SUBCLASS(dispatch_queue_global, dispatch_queue);

dispatch_get_global_queue() hands back a dispatch_queue_global_t. This queue is an abstraction over the system thread pool and is a special queue: calling dispatch_suspend(), dispatch_resume(), dispatch_set_context(), and the like on it has no effect.

The source shows that dispatch_queue_global is built on dispatch_queue, and a dispatch_queue is like a newborn that can be shaped in several directions; the common shapes are serial and concurrent queues, all serviced by threads from a pool maintained by the system. Conceptually, a dispatch queue may have its own thread of execution, since interaction between queues is highly asynchronous.

* Dispatch queues are reference counted via calls to dispatch_retain() and
* dispatch_release(). Pending workitems submitted to a queue also hold a
* reference to the queue until they have finished. Once all references to a
* queue have been released, the queue will be deallocated by the system.

The comment above states that dispatch queues are also managed by reference counting, and that pending work items submitted to a queue hold a reference to it as well. Once all references have been released, the queue is deallocated by the system.

dispatch_retain()

/*!
 * @function dispatch_retain
 *
 * @abstract
 * Increments the reference count of a dispatch object.
 */
void
dispatch_retain(dispatch_object_t object);

void
dispatch_retain(dispatch_object_t dou)
{
	DISPATCH_OBJECT_TFB(_dispatch_objc_retain, dou);
	(void)_os_object_retain(dou._os_obj);
}

dispatch_retain() calls _os_object_retain():

_os_object_t
_os_object_retain(_os_object_t obj)
{
	int xref_cnt = _os_object_xrefcnt_inc_orig(obj);
	if (unlikely(xref_cnt < 0)) {
		_OS_OBJECT_CLIENT_CRASH("Resurrection of an object");
	}
	return obj;
}

The implementation of _os_object_retain() is simple: it increments the reference count, and if the count was already below zero it crashes. Were you trying to resurrect the object??

dispatch_release()

Naturally, where there is a retain there must be a release; the ecosystem has to stay balanced.

/*!
 * @function dispatch_release
 *
 * @abstract
 * Decrements the reference count of a dispatch object.
 *
 * @discussion
 * When all references have been released, the dispatch object is
 * deallocated asynchronously; the system does not guarantee that the
 * reference handed to the client is the last or only one.
 */
void
dispatch_release(dispatch_object_t object);

void
dispatch_release(dispatch_object_t dou)
{
	DISPATCH_OBJECT_TFB(_dispatch_objc_release, dou);
	_os_object_release(dou._os_obj);
}

dispatch_release() calls _os_object_release():

void
_os_object_release(_os_object_t obj)
{
	int xref_cnt = _os_object_xrefcnt_dec(obj);
	if (likely(xref_cnt >= 0)) {
		return;
	}
	if (unlikely(xref_cnt < -1)) {
		_OS_OBJECT_CLIENT_CRASH("Over-release of an object");
	}
	return _os_object_xref_dispose(obj);
}

Here the reference count is decremented first, then two checks follow: if the result is >= 0 nothing further happens, if it is < -1 we crash, and exactly -1 leads into _os_object_xref_dispose(). (Why -1? The external reference count is biased: a freshly created object stores 0 while holding one reference, so the stored value is "reference count minus one", and reaching -1 means the last reference was just dropped.)
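The biased-count bookkeeping can be modeled in a few lines. This is a hypothetical single-threaded toy, not the real atomic implementation; `toy_obj`, `fate`, and both helpers are invented names, mirroring the fact that _os_object_xrefcnt_inc_orig observes the value before the increment while _os_object_xrefcnt_dec observes the value after the decrement:

```c
#include <assert.h>

// Toy model of the biased external refcount: a freshly created object
// stores 0 while holding one reference, so the stored value is
// "reference count minus one" and hitting -1 means zero references left.
typedef struct { int xref_cnt; } toy_obj;

typedef enum { LIVE, DISPOSE, CRASH_RESURRECTION, CRASH_OVERRELEASE } fate;

// Like _os_object_retain: crash if the object was already dead.
static fate toy_retain(toy_obj *o)
{
    int orig = o->xref_cnt++;            // observe the value *before*
    return (orig < 0) ? CRASH_RESURRECTION : LIVE;
}

// Like _os_object_release: >= 0 stays alive, -1 disposes, < -1 crashes.
static fate toy_release(toy_obj *o)
{
    int cnt = --o->xref_cnt;             // observe the value *after*
    if (cnt >= 0) return LIVE;
    if (cnt < -1) return CRASH_OVERRELEASE;
    return DISPOSE;                      // cnt == -1: last reference gone
}
```

Walking a retain/release pair through this model reproduces exactly the three outcomes in the source: live, dispose at -1, and the two crash paths.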

void
_os_object_xref_dispose(_os_object_t obj)
{
	struct _os_object_s *o = (struct _os_object_s *)obj;
	_os_object_xrefcnt_dispose_barrier(o);
	[obj _xref_dispose];
}

_os_object_xrefcnt_dispose_barrier() contains a familiar word, barrier; here it refers to a memory barrier that orders the refcount teardown, part of the asynchronous destruction mentioned above rather than a dispatch barrier.

- (void)_xref_dispose {
	_dispatch_queue_xref_dispose((struct dispatch_queue_s *)self);
	[super _xref_dispose];
}

The style changes abruptly: how did we suddenly end up in Objective-C?

void
_dispatch_queue_xref_dispose(dispatch_queue_t dq)
{
	uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
	if (unlikely(_dq_state_is_suspended(dq_state))) {
		long state = (long)dq_state;
		if (sizeof(long) < sizeof(uint64_t)) state = (long)(dq_state >> 32);
		if (unlikely(_dq_state_is_inactive(dq_state))) {
			// Arguments for and against this assert are within 6705399
			DISPATCH_CLIENT_CRASH(state, "Release of an inactive object");
		}
		DISPATCH_CLIENT_CRASH(dq_state, "Release of a suspended object");
	}
	os_atomic_or2o(dq, dq_atomic_flags, DQF_RELEASED, relaxed);
}

This block checks whether the queue is currently inactive or suspended; if so it crashes, otherwise it proceeds to os_atomic_or2o().
The trail ends here for now: os_atomic_or2o() ORs the DQF_RELEASED flag into the queue's atomic flags, marking it as released, and with that the release flow is complete.
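The os_atomic_or2o() step can be pictured with C11 atomics. A minimal sketch, assuming a made-up bit position (the real DQF_RELEASED value is an internal libdispatch detail) and a standalone flags word instead of a struct field:

```c
#include <assert.h>
#include <stdatomic.h>

// The real DQF_RELEASED bit position is internal to libdispatch;
// this value is purely illustrative.
#define TOY_DQF_RELEASED (1u << 4)

// os_atomic_or2o(dq, dq_atomic_flags, DQF_RELEASED, relaxed) boils down
// to a relaxed atomic fetch-OR on the flags field: the queue is marked
// released without taking any lock, and other flag bits are preserved.
static unsigned mark_released(_Atomic unsigned *flags)
{
    return atomic_fetch_or_explicit(flags, TOY_DQF_RELEASED,
            memory_order_relaxed);
}
```

fetch-OR returns the previous value, so a caller could even detect a double-mark, though the source above does not need to.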

dispatch_get_global_queue

OK, with a basic understanding of the retain and release machinery, let's go back to the familiar dispatch_get_global_queue: how is this special queue created?

static inline dispatch_queue_global_t
_dispatch_get_root_queue(dispatch_qos_t qos, bool overcommit)
{
	if (unlikely(qos < DISPATCH_QOS_MIN || qos > DISPATCH_QOS_MAX)) {
		DISPATCH_CLIENT_CRASH(qos, "Corrupted priority");
	}
	return &_dispatch_root_queues[2 * (qos - 1) + overcommit];
}

dispatch_queue_global_t
dispatch_get_global_queue(long priority, unsigned long flags)
{
	dispatch_assert(countof(_dispatch_root_queues) ==
			DISPATCH_ROOT_QUEUE_COUNT);

	if (flags & ~(unsigned long)DISPATCH_QUEUE_OVERCOMMIT) {
		return DISPATCH_BAD_INPUT;
	}
	dispatch_qos_t qos = _dispatch_qos_from_queue_priority(priority);
#if !HAVE_PTHREAD_WORKQUEUE_QOS
	if (qos == QOS_CLASS_MAINTENANCE) {
		qos = DISPATCH_QOS_BACKGROUND;
	} else if (qos == QOS_CLASS_USER_INTERACTIVE) {
		qos = DISPATCH_QOS_USER_INITIATED;
	}
#endif
	if (qos == DISPATCH_QOS_UNSPECIFIED) {
		return DISPATCH_BAD_INPUT;
	}
	return _dispatch_get_root_queue(qos, flags & DISPATCH_QUEUE_OVERCOMMIT);
}

After a series of validity checks, the matching queue (the overcommit variant when requested) is taken from the static array of root queues.
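The indexing expression `2 * (qos - 1) + overcommit` is the whole trick: each QoS level owns a pair of root queues, with the overcommit variant stored right after the plain one. A sketch of just that mapping, assuming this libdispatch version's QoS numbering (MAINTENANCE = 1 up through USER_INTERACTIVE = 6, giving 12 root queues):

```c
#include <assert.h>
#include <stdbool.h>

// Mirrors &_dispatch_root_queues[2 * (qos - 1) + overcommit]: QoS levels
// are numbered from 1, each level owns two adjacent slots, and the
// overcommit variant sits at the odd index of the pair.
static int root_queue_index(int qos, bool overcommit)
{
    return 2 * (qos - 1) + (overcommit ? 1 : 0);
}
```

So a default-QoS request lands in the middle of the array, and flipping the overcommit flag only ever moves you by one slot.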

dispatch_get_main_queue

dispatch_queue_main_t
dispatch_get_main_queue(void)
{
	return DISPATCH_GLOBAL_OBJECT(dispatch_queue_main_t, _dispatch_main_q);
}

struct dispatch_queue_static_s _dispatch_main_q = {
	DISPATCH_GLOBAL_OBJECT_HEADER(queue_main),
#if !DISPATCH_USE_RESOLVERS
	.do_targetq = _dispatch_get_default_queue(true),
#endif
	.dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(1) |
			DISPATCH_QUEUE_ROLE_BASE_ANON,
	.dq_label = "com.apple.main-thread",
	// Note dq_width = 1 here, i.e. a maximum concurrency of 1
	.dq_atomic_flags = DQF_THREAD_BOUND | DQF_WIDTH(1),
	.dq_serialnum = 1,
};

#define _dispatch_get_default_queue(overcommit) \
		_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS + \
				!!(overcommit)]._as_dq

The main queue, too, is drawn from the root queues.

dispatch_queue_create

Now that we have some understanding of the two common system queues, let's look at how to create our own.
The code is long; feel free to skip straight to the analysis.

dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
	return _dispatch_lane_create_with_target(label, attr,
			DISPATCH_TARGET_QUEUE_DEFAULT, true);
}

static dispatch_queue_t
_dispatch_lane_create_with_target(const char *label, dispatch_queue_attr_t dqa,
		dispatch_queue_t tq, bool legacy)
{
	dispatch_queue_attr_info_t dqai = _dispatch_queue_attr_to_info(dqa);

	//
	// Step 1: Normalize arguments (qos, overcommit, tq)
	//

	dispatch_qos_t qos = dqai.dqai_qos;
#if !HAVE_PTHREAD_WORKQUEUE_QOS
	if (qos == DISPATCH_QOS_USER_INTERACTIVE) {
		dqai.dqai_qos = qos = DISPATCH_QOS_USER_INITIATED;
	}
	if (qos == DISPATCH_QOS_MAINTENANCE) {
		dqai.dqai_qos = qos = DISPATCH_QOS_BACKGROUND;
	}
#endif // !HAVE_PTHREAD_WORKQUEUE_QOS

	_dispatch_queue_attr_overcommit_t overcommit = dqai.dqai_overcommit;
	if (overcommit != _dispatch_queue_attr_overcommit_unspecified && tq) {
		if (tq->do_targetq) {
			DISPATCH_CLIENT_CRASH(tq, "Cannot specify both overcommit and "
					"a non-global target queue");
		}
	}

	if (tq && dx_type(tq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE) {
		// Handle discrepancies between attr and target queue, attributes win
		if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
			if (tq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) {
				overcommit = _dispatch_queue_attr_overcommit_enabled;
			} else {
				overcommit = _dispatch_queue_attr_overcommit_disabled;
			}
		}
		if (qos == DISPATCH_QOS_UNSPECIFIED) {
			qos = _dispatch_priority_qos(tq->dq_priority);
		}
		tq = NULL;
	} else if (tq && !tq->do_targetq) {
		// target is a pthread or runloop root queue, setting QoS or overcommit
		// is disallowed
		if (overcommit != _dispatch_queue_attr_overcommit_unspecified) {
			DISPATCH_CLIENT_CRASH(tq, "Cannot specify an overcommit attribute "
					"and use this kind of target queue");
		}
	} else {
		if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
			// Serial queues default to overcommit!
			overcommit = dqai.dqai_concurrent ?
					_dispatch_queue_attr_overcommit_disabled :
					_dispatch_queue_attr_overcommit_enabled;
		}
	}
	if (!tq) {
		tq = _dispatch_get_root_queue(
				qos == DISPATCH_QOS_UNSPECIFIED ? DISPATCH_QOS_DEFAULT : qos,
				overcommit == _dispatch_queue_attr_overcommit_enabled)->_as_dq;
		if (unlikely(!tq)) {
			DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute");
		}
	}

	//
	// Step 2: Initialize the queue
	//

	if (legacy) {
		// if any of these attributes is specified, use non legacy classes
		if (dqai.dqai_inactive || dqai.dqai_autorelease_frequency) {
			legacy = false;
		}
	}

	const void *vtable;
	dispatch_queue_flags_t dqf = legacy ? DQF_MUTABLE : 0;
	if (dqai.dqai_concurrent) {
		vtable = DISPATCH_VTABLE(queue_concurrent);
	} else {
		vtable = DISPATCH_VTABLE(queue_serial);
	}
	switch (dqai.dqai_autorelease_frequency) {
	case DISPATCH_AUTORELEASE_FREQUENCY_NEVER:
		dqf |= DQF_AUTORELEASE_NEVER;
		break;
	case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM:
		dqf |= DQF_AUTORELEASE_ALWAYS;
		break;
	}
	if (label) {
		const char *tmp = _dispatch_strdup_if_mutable(label);
		if (tmp != label) {
			dqf |= DQF_LABEL_NEEDS_FREE;
			label = tmp;
		}
	}

	dispatch_lane_t dq = _dispatch_object_alloc(vtable,
			sizeof(struct dispatch_lane_s));
	_dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
			DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
			(dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0));

	dq->dq_label = label;
	dq->dq_priority = _dispatch_priority_make((dispatch_qos_t)dqai.dqai_qos,
			dqai.dqai_relpri);
	if (overcommit == _dispatch_queue_attr_overcommit_enabled) {
		dq->dq_priority |= DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
	}
	if (!dqai.dqai_inactive) {
		_dispatch_queue_priority_inherit_from_target(dq, tq);
		_dispatch_lane_inherit_wlh_from_target(dq, tq);
	}
	_dispatch_retain(tq);
	dq->do_targetq = tq;
	_dispatch_object_debug(dq, "%s", __func__);
	return _dispatch_trace_queue_create(dq)._dq;
}

Step 1: validate and normalize qos, overcommit, and tq (the overcommit attribute only makes sense with a global target queue; after a series of checks, a target queue is settled on, which may be a root queue or the caller-supplied one).
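The one non-obvious rule buried in Step 1 is the overcommit fallback. A distilled sketch, using an invented enum (`oc_t`) and helper name to stand in for _dispatch_queue_attr_overcommit_t and the inline logic:

```c
#include <assert.h>
#include <stdbool.h>

typedef enum { OC_UNSPECIFIED, OC_ENABLED, OC_DISABLED } oc_t;

// Distills the Step 1 fallback: with no explicit attribute (and no target
// queue to inherit from), serial queues default to overcommit, meaning
// the system may spin up an extra thread rather than make them wait;
// concurrent queues do not.
static oc_t resolve_overcommit(oc_t attr, bool concurrent)
{
    if (attr != OC_UNSPECIFIED) return attr;      // attributes win
    return concurrent ? OC_DISABLED : OC_ENABLED; // serial => overcommit
}
```

This matches the "Serial queues default to overcommit!" comment in the source above.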

Step 2: decide from the attributes whether the queue is serial or concurrent, determine whether it needs autorelease handling, attach the label, allocate memory, initialize the queue, and finally store the parameters settled earlier into the new queue one by one.

At this point we have a reasonable picture of how queues are created; next we should look at how the system dispatches tasks onto them.

dispatch_sync()

void
dispatch_sync(dispatch_queue_t queue, DISPATCH_NOESCAPE dispatch_block_t block);

Note that calling dispatch_sync() with the current queue as the target will deadlock. dispatch_sync() is also prone to deadlocks caused by mutual waiting on locks, so if you are not fully comfortable with it, prefer dispatch_async().

void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
	uintptr_t dc_flags = DC_FLAG_BLOCK;
	if (unlikely(_dispatch_block_has_private_data(work))) {
		return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
	}
	_dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}

static void
_dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
		uintptr_t dc_flags)
{
	_dispatch_sync_f_inline(dq, ctxt, func, dc_flags);
}

static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, uintptr_t dc_flags)
{
	if (likely(dq->dq_width == 1)) {
		return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
	}

	if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
		DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
	}

	dispatch_lane_t dl = upcast(dq)._dl;
	// Global concurrent queues and queues bound to non-dispatch threads
	// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
	if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
		return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
	}

	if (unlikely(dq->do_targetq->do_targetq)) {
		return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
	}
	_dispatch_introspection_sync_begin(dl);
	_dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
			_dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
}

After this chain of calls we arrive at _dispatch_sync_f_inline(). From the main queue definition above we know that serial queues have dq_width = 1, so a sync onto a serial queue takes the barrier path via _dispatch_barrier_sync_f(), which in turn calls _dispatch_barrier_sync_f_inline():

static inline void
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, uintptr_t dc_flags)
{
	dispatch_tid tid = _dispatch_tid_self();

	if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
		DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
	}

	dispatch_lane_t dl = upcast(dq)._dl;
	// The more correct thing to do would be to merge the qos of the thread
	// that just acquired the barrier lock into the queue state.
	//
	// However this is too expensive for the fast path, so skip doing it.
	// The chosen tradeoff is that if an enqueue on a lower priority thread
	// contends with this fast path, this thread may receive a useless override.
	//
	// Global concurrent queues and queues bound to non-dispatch threads
	// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
	if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
		return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
				DC_FLAG_BARRIER | dc_flags);
	}

	if (unlikely(dl->do_targetq->do_targetq)) {
		return _dispatch_sync_recurse(dl, ctxt, func,
				DC_FLAG_BARRIER | dc_flags);
	}
	_dispatch_introspection_sync_begin(dl);
	_dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
			DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
					dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
}

The comment in this code roughly says: the more correct approach would be to merge the QoS of the thread that acquired the barrier lock into the queue state, but that is too expensive for the fast path; the accepted tradeoff is that a contending enqueue from a lower-priority thread may give this thread a useless priority override. It also notes that global concurrent queues and queues bound to non-dispatch threads always fall into the slow case, i.e. those queues always reach _dispatch_sync_f_slow() under dispatch_sync().

static void
_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
		dispatch_function_t func, uintptr_t top_dc_flags,
		dispatch_queue_class_t dqu, uintptr_t dc_flags)
{
	// Fetch the queues
	dispatch_queue_t top_dq = top_dqu._dq;
	dispatch_queue_t dq = dqu._dq;
	if (unlikely(!dq->do_targetq)) {
		return _dispatch_sync_function_invoke(dq, ctxt, func);
	}

	// Fetch the current priority
	pthread_priority_t pp = _dispatch_get_priority();
	// Build the sync context
	struct dispatch_sync_context_s dsc = {
		.dc_flags = DC_FLAG_SYNC_WAITER | dc_flags,
		.dc_func = _dispatch_async_and_wait_invoke,
		.dc_ctxt = &dsc,
		.dc_other = top_dq,
		.dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
		.dc_voucher = _voucher_get(),
		.dsc_func = func,
		.dsc_ctxt = ctxt,
		.dsc_waiter = _dispatch_tid_self(),
	};

	_dispatch_trace_item_push(top_dq, &dsc);
	// Pass the sync context and the current queue in: check whether the
	// previous work has finished and whether this queue needs to wait
	__DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);

	// If this context's function slot is empty, recurse down to the
	// next queue and complete
	if (dsc.dsc_func == NULL) {
		dispatch_queue_t stop_dq = dsc.dc_other;
		return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
	}

	_dispatch_introspection_sync_begin(top_dq);
	_dispatch_trace_item_pop(top_dq, &dsc);
	_dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func, top_dc_flags
			DISPATCH_TRACE_ARG(&dsc));
}

The final _dispatch_sync_invoke_and_complete_recurse() is where the block's body actually runs. If there is no further task queue, the dispatch_sync() flow ends here (we finally made it, orz!).

dispatch_async()

With dispatch_sync() analyzed, it is dispatch_async()'s turn.

void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	uintptr_t dc_flags = DC_FLAG_CONSUME;
	dispatch_qos_t qos;

	qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
	_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}

static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
		dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
	if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
		_dispatch_trace_item_push(dqu, dc);
	}
#else
	(void)dc_flags;
#endif
	return dx_push(dqu._dq, dc, qos);
}

At dx_push() the trail goes cold (oh no!). All the source gives us is a macro:

#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)

So let's follow dq_push() instead:

void (*const dq_push)(dispatch_queue_class_t, dispatch_object_t, \
		dispatch_qos_t)

Following the vine leads to _dispatch_root_queue_push():

void
_dispatch_root_queue_push(dispatch_queue_global_t rq, dispatch_object_t dou,
		dispatch_qos_t qos)
{
#if DISPATCH_USE_KEVENT_WORKQUEUE
	dispatch_deferred_items_t ddi = _dispatch_deferred_items_get();
	if (unlikely(ddi && ddi->ddi_can_stash)) {
		dispatch_object_t old_dou = ddi->ddi_stashed_dou;
		dispatch_priority_t rq_overcommit;
		rq_overcommit = rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;

		if (likely(!old_dou._do || rq_overcommit)) {
			dispatch_queue_global_t old_rq = ddi->ddi_stashed_rq;
			dispatch_qos_t old_qos = ddi->ddi_stashed_qos;
			ddi->ddi_stashed_rq = rq;
			ddi->ddi_stashed_dou = dou;
			ddi->ddi_stashed_qos = qos;
			_dispatch_debug("deferring item %p, rq %p, qos %d",
					dou._do, rq, qos);
			if (rq_overcommit) {
				ddi->ddi_can_stash = false;
			}
			if (likely(!old_dou._do)) {
				return;
			}
			// push the previously stashed item
			qos = old_qos;
			rq = old_rq;
			dou = old_dou;
		}
	}
#endif
#if HAVE_PTHREAD_WORKQUEUE_QOS
	if (_dispatch_root_queue_push_needs_override(rq, qos)) {
		return _dispatch_root_queue_push_override(rq, dou, qos);
	}
#else
	(void)qos;
#endif
	_dispatch_root_queue_push_inline(rq, dou, dou, 1);
}

Judging from the parameters this is the global concurrent queue's enqueue routine. Skipping the branches that are not expected to be taken, it ends up calling _dispatch_root_queue_push_inline():

static inline void
_dispatch_root_queue_push_inline(dispatch_queue_global_t dq,
		dispatch_object_t _head, dispatch_object_t _tail, int n)
{
	struct dispatch_object_s *hd = _head._do, *tl = _tail._do;
	if (unlikely(os_mpsc_push_list(os_mpsc(dq, dq_items), hd, tl, do_next))) {
		return _dispatch_root_queue_poke(dq, n, 0);
	}
}

Interestingly, unlikely() shows up here.

// Returns true when the queue was empty and the head must be set
#define os_mpsc_push_item(Q, tail, _o_next) ({ \
	os_mpsc_node_type(Q) _tail = (tail); \
	os_mpsc_push_list(Q, _tail, _tail, _o_next); \
})

The source comment tells us the push only returns true when the queue was empty, so only the first task to land on an empty queue triggers _dispatch_root_queue_poke(); hence the unlikely().
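That contract ("return whether the queue *was* empty") can be sketched with an ordinary singly-linked list. This is a single-threaded toy with invented names (`toy_queue`, `toy_push_was_empty`), not the lock-free multi-producer structure libdispatch actually uses:

```c
#include <stdbool.h>
#include <stddef.h>

// Append to the tail and report whether the queue was empty. Only that
// first push needs to wake a worker (_dispatch_root_queue_poke); every
// later push finds the queue non-empty, which is why the poke call sits
// under unlikely() in _dispatch_root_queue_push_inline.
typedef struct node { struct node *next; } node_t;
typedef struct { node_t *head, *tail; } toy_queue;

static bool toy_push_was_empty(toy_queue *q, node_t *n)
{
    n->next = NULL;
    node_t *prev_tail = q->tail;
    q->tail = n;
    if (prev_tail != NULL) {
        prev_tail->next = n;
        return false;          // workers are already being drained
    }
    q->head = n;
    return true;               // was empty: caller must poke a worker
}
```

The real os_mpsc_push_list does the tail swap with an atomic exchange so that many producers can push concurrently against one consumer, but the was-empty signal is the same idea.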

_dispatch_root_queue_poke(), after a series of condition checks, calls _dispatch_root_queue_poke_slow():

static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
	int remaining = n;
	int r = ENOSYS;

	_dispatch_root_queues_init();
	_dispatch_debug_root_queue(dq, __func__);
	_dispatch_trace_runtime_event(worker_request, dq, (uint64_t)n);

#if !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_ROOT_QUEUES
	if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE)
#endif
	{
		_dispatch_root_queue_debug("requesting new worker thread for global "
				"queue: %p", dq);
		r = _pthread_workqueue_addthreads(remaining,
				_dispatch_priority_to_pp_prefer_fallback(dq->dq_priority));
		(void)dispatch_assume_zero(r);
		return;
	}
#endif // !DISPATCH_USE_INTERNAL_WORKQUEUE

#if DISPATCH_USE_PTHREAD_POOL
	...
#endif // DISPATCH_USE_PTHREAD_POOL
}

We focus on the DISPATCH_USE_PTHREAD_ROOT_QUEUES path here and skip the DISPATCH_USE_INTERNAL_WORKQUEUE code.
It goes on to call _pthread_workqueue_addthreads(), and the trail goes cold again (I give up, orz!).
After some thought, it turns out this is a pthread function, found in darwin-libpthread-330.220.2:

int
_pthread_workqueue_addthreads(int numthreads, pthread_priority_t priority)
{
	int res = 0;

	if (__libdispatch_workerfunction == NULL) {
		return EPERM;
	}

#if TARGET_OS_OSX
	// <rdar://problem/37687655> Legacy simulators fail to boot
	//
	// Older sims set the deprecated _PTHREAD_PRIORITY_ROOTQUEUE_FLAG wrongly,
	// which is aliased to _PTHREAD_PRIORITY_SCHED_PRI_FLAG and that XNU
	// validates and rejects.
	//
	// As a workaround, forcefully unset this bit that cannot be set here
	// anyway.
	priority &= ~_PTHREAD_PRIORITY_SCHED_PRI_FLAG;
#endif

	res = __workq_kernreturn(WQOPS_QUEUE_REQTHREADS, NULL, numthreads,
			(int)priority);
	if (res == -1) {
		res = errno;
	}
	return res;
}

This function hands the thread request off to the kernel, and with that dispatch_async()'s userspace work is done. But did you really think it ends here?

OK, on to the kernel side. First order of business: crash the program!! An out-of-bounds array access or the like will do.

reason: '*** -[__NSSingleObjectArrayI objectAtIndex:]: index 2 beyond bounds [0 .. 0]'
*** First throw call stack:
(
0 CoreFoundation 0x00000001029936fb __exceptionPreprocess + 331
1 libobjc.A.dylib 0x0000000101f37ac5 objc_exception_throw + 48
2 CoreFoundation 0x00000001028e31cf -[__NSSingleObjectArrayI objectAtIndex:] + 111
3 YTPerformance_Example 0x000000010160eeec __33-[YTAppFluencyMonitor commonInit]_block_invoke + 124
4 libdispatch.dylib 0x000000010579bd7f _dispatch_call_block_and_release + 12
5 libdispatch.dylib 0x000000010579cdb5 _dispatch_client_callout + 8
6 libdispatch.dylib 0x000000010579f7b9 _dispatch_queue_override_invoke + 1022
7 libdispatch.dylib 0x00000001057ad632 _dispatch_root_queue_drain + 351
8 libdispatch.dylib 0x00000001057adfca _dispatch_worker_thread2 + 130
9 libsystem_pthread.dylib 0x0000000105b856b3 _pthread_wqthread + 583
10 libsystem_pthread.dylib 0x0000000105b853fd start_wqthread + 13
)

Now the call chain is clearly visible:

start_wqthread -> _pthread_wqthread -> _dispatch_worker_thread2 ->
_dispatch_root_queue_drain -> _dispatch_queue_override_invoke ->
_dispatch_client_callout -> _dispatch_call_block_and_release

The kernel side is covered in the follow-up, GCD Thread Management (Kernel).

CATALOG
  1. The Source
  2. likely() and unlikely()
  3. dispatch_queue_global_t
  4. dispatch_retain()
  5. dispatch_release()
  6. dispatch_get_global_queue
  7. dispatch_get_main_queue
  8. dispatch_queue_create
  9. dispatch_sync()
  10. dispatch_async()