简述

所谓原子操作(atomic),就是不可分割的操作,在其操作所属期间,不会因为线程调度而被打断。

单处理器单核系统

在单处理器系统上,如果操作是在单个CPU指令中实现的,则该操作始终是原子的。

多处理器或多核系统

在多处理器系统上,确保原子性存在一点困难。要达到原子操作,就需要进行相应的处理。比如我们经常听到的自旋锁互斥锁信号量等线程同步方案。

本文旨在讲解 内存管理 - 引用计数中涉及到的原子操作,别有一番风味,请慢慢享用~

iOS 引用计数关键源码

objc_object源码中,关于内存应用计数相关的方法,通过一个宏判断实现了两套逻辑,代码如下:

1
2
3
4
5
6
7
8
9
#if SUPPORT_NONPOINTER_ISA

......

#else

......

#endif

objc_object 相关方法列表

我们这里只分析SUPPORT_NONPOINTER_ISA下的objc_object::rootRetain,具体源码如下:

  • 调用retain,其实会来到这里,不做过多解释;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
ALWAYS_INLINE id 
objc_object::rootRetain(bool tryRetain, bool handleOverflow)
{
if (isTaggedPointer()) return (id)this;

bool sideTableLocked = false;
bool transcribeToSideTable = false;

isa_t oldisa;
isa_t newisa;

// ❓❓❓问题点1:这里怎么会有一个 do while 循环❓❓❓
do {
transcribeToSideTable = false;
oldisa = LoadExclusive(&isa.bits);
newisa = oldisa;
// ① 非nonpointer
if (slowpath(!newisa.nonpointer)) {
ClearExclusive(&isa.bits);
if (!tryRetain && sideTableLocked) sidetable_unlock();
if (tryRetain) return sidetable_tryRetain() ? (id)this : nil;
else return sidetable_retain();
}

// ② nonpointer (重点分析)
// don't check newisa.fast_rr; we already called any RR overrides
if (slowpath(tryRetain && newisa.deallocating)) {
ClearExclusive(&isa.bits);
if (!tryRetain && sideTableLocked) sidetable_unlock();
return nil;
}
uintptr_t carry;
// 进行 引用计数加一 操作
// ❓❓❓问题点2:这里加一操作为什么没有上述lock、unlock之类的加锁操作❓❓❓
newisa.bits = addc(newisa.bits, RC_ONE, 0, &carry); // extra_rc++

if (slowpath(carry)) {
// 这里其实是 newisa.extra_rc++ 溢出的逻辑(不是本文分析重点)
// newisa.extra_rc++ overflowed
if (!handleOverflow) {
ClearExclusive(&isa.bits);
return rootRetain_overflow(tryRetain);
}
// Leave half of the retain counts inline and
// prepare to copy the other half to the side table.
if (!tryRetain && !sideTableLocked) sidetable_lock();
sideTableLocked = true;
transcribeToSideTable = true;
newisa.extra_rc = RC_HALF;
newisa.has_sidetable_rc = true;
}
} while (slowpath(!StoreExclusive(&isa.bits, oldisa.bits, newisa.bits)));

if (slowpath(transcribeToSideTable)) {
// Copy the other half of the retain counts to the side table.
sidetable_addExtraRC_nolock(RC_HALF);
}

if (slowpath(!tryRetain && sideTableLocked)) sidetable_unlock();
return (id)this;
}

源码中发现的问题

  • 问题①:为什么要加一个 do while 循环?

  • 问题②:newisa.bits = addc(newisa.bits, RC_ONE, 0, &carry);为什么不需要加锁等相关操作,那么该操作如何保证线程同步的呢?

  • 一个猜想: 莫非这个addc加一操作是通过 do while 来实现原子操作的?

简化源码

  • 精简掉 关于 SideTable 存储引用计数的部分及 isa.extra_rc++ 溢出后的处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ALWAYS_INLINE id 
objc_object::rootRetain(bool tryRetain, bool handleOverflow)
{
isa_t oldisa;
isa_t newisa;

// ❓❓❓问题点1:这里怎么会有一个 do while 循环❓❓❓
do {

oldisa = LoadExclusive(&isa.bits);
newisa = oldisa;

// ❓❓❓问题点2:这里加一操作为什么没有上述lock、unlock之类的加锁操作❓❓❓
// 进行 引用计数加一 操作
newisa.bits = addc(newisa.bits, RC_ONE, 0, &carry); // extra_rc++

} while (slowpath(!StoreExclusive(&isa.bits, oldisa.bits, newisa.bits)));

return (id)this;
}

slowpath 和 fastpath

1
2
#define fastpath(x) (__builtin_expect(bool(x), 1))
#define slowpath(x) (__builtin_expect(bool(x), 0))
  • 这个指令是gcc引入的,我们可以用__builtin_expect来向编译器提供分支预测信息;
  • __builtin_expect(long exp, long c)代表的意思是:预测 exp === c;

由此我们可以知晓,上述源码中:

  • !StoreExclusive(&isa.bits, oldisa.bits, newisa.bits)结果为0 的可能性更大;
  • 也就是说这个do while 执行循环的可能性会比较小(不然是何其的消耗性能,这只是从代码层面上面的分析,这里提出,只是为了避免被这个do while纸老虎给哄住。我们继续);

我们从 do while 中的条件:while (slowpath(!StoreExclusive(&isa.bits, oldisa.bits, newisa.bits))); 入手分析

重点来了~

LoadExclusive 和 StoreExclusive

  • Exclusive: 独有的;排外的;专一的;

在源码中StoreExclusive对于不同的处理器架构有三种不同的实现:

  • __arm64__
  • __arm__
  • __x86_64__ || __i386__

LoadExclusive & StoreExclusive

我们先从 arm64 开始分析

arm64

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// Pointer-size register prefix for inline asm
# if __LP64__
# define p "x" // true arm64
# else
# define p "w" // arm64_32
# endif

static ALWAYS_INLINE uintptr_t
LoadExclusive(uintptr_t *src)
{
uintptr_t result;
asm("ldxr %" p "0, [%x1]"
: "=r" (result)
: "r" (src), "m" (*src));
return result;
}

static ALWAYS_INLINE bool
StoreExclusive(uintptr_t *dst, uintptr_t oldvalue __unused, uintptr_t value)
{
uint32_t result;
asm("stxr %w0, %" p "2, [%x3]"
: "=&r" (result), "=m" (*dst)
: "r" (value), "r" (dst));
return !result;

/*
ldaxr w1, [x0] add
w1, w1, #0x1 stlxr
w2, w1, [x0]
cbnz w2, top
*/
}

  • 如果不清楚__LP64__的可以翻看我之前的一片文章;
  • 内部实现居然是汇编,我们将重要的指令提出进行分析:ldxrstxr 指令;
  • ldxrLoadExclusive 的重要内部实现
  • stxrStoreExclusive 的重要内部实现

汇编指令: ldxr 和 stxr

  • 这是ARM中的原子操作指令,是不是发现了新大陆。

ldxr 指令

  • Load exclusive register

  • ldxr - ARM 官方文档

    1
    2
    LDXR  Wt, [Xn|SP{,#0}]    ; 32-bit general registers
    LDXR Xt, [Xn|SP{,#0}] ; 64-bit general registers
  • Wt:32位的通用寄存器名称,范围:0 ~ 31

  • Xt:64位的通用寄存器名称,范围:0 ~ 31

  • Xn|SP:64位的通用基址寄存器或堆栈指针,范围:0 ~ 31

  • 说明:32位/64位 代表寄存器的 容量范围 可以理解为寄存器的 编号

stxr 指令

  • Store exclusive register, returning status.

  • stxr - ARM 官方文档

    1
    2
    STXR  Ws, Wt, [Xn|SP{,#0}]    ; 32-bit general registers
    STXR Ws, Xt, [Xn|SP{,#0}] ; 64-bit general registers
  • Wt:32位的通用寄存器名称,范围:0 ~ 31

  • Xt:64位的通用寄存器名称,范围:0 ~ 31

  • Ws:32位的通用寄存器名称,存储 exclusive 的状态

  • Xn|SP:64位的通用基址寄存器或堆栈指针,范围:0 ~ 31

  • 说明:32位/64位 代表寄存器的 容量范围 可以理解为寄存器的 编号

关于Exclusive accesses更多内容可以参考ARM开发者文档中对Exclusive accesses 的介绍

x86_64 || i386

1
2
3
4
5
6
7
8
9
10
11
static ALWAYS_INLINE uintptr_t 
LoadExclusive(uintptr_t *src)
{
return *src;
}

static ALWAYS_INLINE bool
StoreExclusive(uintptr_t *dst, uintptr_t oldvalue, uintptr_t value)
{
return __sync_bool_compare_and_swap((void **)dst, (void *)oldvalue, (void *)value);
}

__sync_bool_compare_and_swap 内置函数

  • Compare And Swap,简称CAS;
  • 简单来说就是,在写入新值之前, 先根据内存地址读出此刻内存真实值,然后与此刻操作期望值进行比较,当且仅当此刻内存真实值此刻操作期望值一致时,才将此刻操作期望值写入,并返回true
  • 可能有点绕,联想着多线程的数据竞争慢慢体会;
    • 比如:此刻期望将数据修改为10,但是由于多线程的缘故,此刻内存中的真实值并不一定为10,如果不为10,就不执行写入操作;

do while 循环的作用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 简化代码
objc_object::rootRetain()
{
isa_t oldisa; // 用于存储此刻内存中的真实值
isa_t newisa; // 用于存储此刻操作期的期望值

oldisa = LoadExclusive(&isa.bits); // 从内存中读取此刻真实值
newisa = oldisa;

do {
// extra_rc++ 溢出标记
uintptr_t carry;
// extra_rc++
newisa.bits = addc(newisa.bits, RC_ONE, 0, &carry);
} while (slowpath(!StoreExclusive(&isa.bits, oldisa.bits, newisa.bits)));
}

这样就很直观了~

  • 注意while 条件中的 取反操作 !;
  • 如果写入不成功,那就一直执行循环,直到写入成功;
  • 当然,由我们之前对slowpathLoadExclusive / StoreExclusive的分析,可知:预测分析执行循环的概率比较低,只有在线程数据竞争的时候发生;正常情况下,是不会进入循环的(执行一次do);
  • 有点自旋锁的味道,慢慢体会;