0%

Java 锁机制(上)之 CAS

引言

介绍完了 Synchronized 就不得不提另外一套锁机制 —— Lock 家族。本系列文章照旧分为上、中、下三篇,分别介绍 CAS、ReetrantLock、AQS 共享锁。

CAS

从 Synchronized 源码中我们可以看到其中大量运用了 CAS 操作,那么什么是 CAS 呢?CAS 全称 Compare And Swap,它其实是无锁策略的实现,主要依赖于 CPU 原子指令的实现。

核心思想

在 Synchronized 的源码中,我们经常会看到这样的语句:

1
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(prototype_header, obj->mark_addr(), mark)

想必大家已经很熟悉了,它的意思就是:比较 obj->mark_addr() 地址指向的内容是否是 mark,如果是则将该地址的内容指向 prototype_header 然后返回原地址的内容 mark

而 Java 中,主要的使用方式如下:

1
2
AtomicInteger ai = new AtomicInteger(10);
boolean isSuccess = ai.compareAndSet(10, 12);

如上所示,先创建一个值为 10 的原子化类,然后调用该类的 compareAndSet 方法,如果原有的值就是 10,那么就将该值改为 12。如果替换成功那么返回 true,否则返回 false。

从以上两个示例我们可以了解到,Compare And Swap 中的 Compare 比较的就是替换地址原有的值与你期望的值,如果两者相等,说明当前没有其他线程进行修改,此时就可以进行 Swap 操作。

那么可能你会问了,如果比较之后,其他线程修改了该值呢?其实这点不用担心,因为 CAS 借助的是 CPU 原子指令。

从另外一个角度来说,CAS 其实属于乐观派,之所以不提锁,是因为 CAS 无任何加锁行为。为什么说是乐观派呢?是因为它始终假定原地址没有变更。如果当前线程竞争比较激烈,那么乐观派的 CAS 就不再是考虑的范畴了,此时就需要锁的介入。

Unsafe

介绍 Java 层的 CAS 操作就不得不提位于 sun.misc 包中的 Unsafe 类。CAS 的底层操作都是交由 Unsafe 来实现。该类之所以称为不安全,是因为它中的方法都跟底层有关如:可以直接操作内存,不受 JVM 管理,意味着需要自己释放内存;不少方法需要提供原始地址以及被替换对象的地址,一旦出现问题就会导致 JVM 程序崩溃等。

既然 Unsafe 类的方法可以直接操作内存,那也就意味着它的运行速度要比普通的 Java 程序快,也就更适合高并发场景。

操作分类

Unsafe 是一系列执行底层代码的方法集合,我们可以按照操作的类型对其进行分类。

类、对象、变量的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
* 在不调用构造函数的情况下创建对象实例
*/
public native Object allocateInstance(Class<?> cls)
throws InstantiationException;

/**
* 在不进行安全检测的情况下由 JVM 定义类,默认情况下,该类的保护域名与类加载器与方法调用者类相同
*/
public native Class<?> defineClass(String name,
byte[] b,
int off,
int len,
ClassLoader loader,
ProtectionDomain protectionDomain);

/**
* 定义一个匿名类,该类对系统字典、类加载器透明
*/
public native Class<?> defineAnonymousClass(
Class<?> hostClass,
byte[] data,
Object[] cpPatches);

/**
* 获取该字段在所处对象中的偏移量
*/
public native long objectFieldOffset(Field f);

/**
* 获取静态字段在类中的偏移量
*/
public native long staticFieldOffset(Field f);

/**
* 获取指定对象指定偏移量的 Int 值,对象 o 为起始地址,offset 相对于起始地址的偏移量。
* 相似的方法还有 getLong、getByte 等。
*/
public native int getInt(Object o, long offset);

/**
* 根据对象及偏移量设置 x
*/
public native void putInt(Object o, long offset, int x);

/**
* 根据对象及偏移量获取 volatile 变量,即获取当前变量的内存最新值
*/
public native int getIntVolatile(Object o, long offset);

/*
* 根据偏移量设置 x,该值会立马可见
*/
public native void putIntVolatile(Object o, long offset, int x);

...

内存的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* 获取指定大小的内存,由 freeMemory 释放
*/
public native long allocateMemory(long bytes);

/**
* 根据原有分配的内存地址重新分配指定大小的内存,由 freeMemory 释放
*/
public native long reallocateMemory(long address, long bytes);

/**
* 用于释放 allocateMemory、reallocateMemory 分配的内存
*/
public native void freeMemory(long address);

/**
* 用于将指定对象指定偏移量的指定字节数设置成指定的值
*/
public native void setMemory(Object o, long offset, long bytes, byte value);

/**
* 将指定对象指定偏移量的数值复制到目标对象目标偏移量对应的地址中
*/
public native void copyMemory(Object srcBase,
long srcOffset,
Object destBase,
long destOffset,
long bytes);

/**
* 获取指定地址中的 Int 值,相似的方法还有 getLong、getChar 等
*/
public native int getInt(long address);

/**
* 除了 getXXX 系列,当然还有对应的 putXXX 系列,将 address 指定的值修改为 x
*/
public native void putInt(long address, int x);

数组的操作

1
2
3
4
5
6
7
8
9
10
/**
* 获取指定数组类中第一个元素的偏移地址
*/
public native int arrayBaseOffset(Class<?> arrayClass);

/**
* 获取指定数据第一个元素所占的内存空间
*/
public native int arrayIndexScale(Class<?> arrayClass);

CAS 操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 /**
* Unsafe 提供了三个 CAS 操作方法,分别针对对象、int 值、long 值的
*/
public final native boolean compareAndSwapObject(Object o,
long offset,
Object expected,
Object x);

public final native boolean compareAndSwapInt(Object o,
long offset,
int expected,
int x);

public final native boolean compareAndSwapLong(Object o,
long offset,
long expected,
long x);

除此之外,JDK 8 还提供了基于以上三种 CAS 操作的变种,这里就以其中一个方法为例进行说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 将对象 o 对应偏移量 offset 的值加上 delta
*/
public final long getAndAddLong(Object o, long offset, long delta) {
long v;
do {
// 获取当前指定偏移量对应的最新值 v
v = getLongVolatile(o, offset);
// 通过 compareAndSwapLong 方法将最新值 v 加上 delta 并设置回内存,直到成功
} while (!compareAndSwapLong(o, offset, v, v + delta));
// 返回旧值
return v;
}

内存屏障

有 CAS 操作,那对于内存屏障的操作肯定也必不可少。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 读屏障,该方法之前的所有读操作一定在该屏障之前完成
*/
public native void loadFence();

/**
* 写屏障,该方法之前的所有写操作一定在该屏障之前完成
*/
public native void storeFence();

/**
* 读写屏障,该方法之前的所有读写一定在该屏障之前完成
*/
public native void fullFence();

对于更多的 API 这里就不再介绍了。

底层实现

我们看到大部分方法都是 native 修饰的,也就是说都是本地方法,既然讲到这里了,那不妨在深挖一下看看底层是如何实现的。

当然了,为了不偏移文章主题,这里只介绍 CAS 操作的底层实现。其他 Unsafe 包下的源码大家可以按照同样的思路来分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/**
* 路径:hostspot/src/share/vm/prims/unsafe.cpp
*
* java 中的方法签名:
* compareAndSwapObject(Object o,long offset,Object expected,Object x)
* 有了以上对照,我们知道 C++ 中各个参数的含义:
*
* @param jobject obj 对应 o 也就是目标对象
* @param jlong offset 变量在 o 中的偏移量
* @param jobject e_h 该变量中的原始值也就是期望值
* @param jobject x_h 需要替换的新值
*/
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapObject(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jobject e_h, jobject x_h))
UnsafeWrapper("Unsafe_CompareAndSwapObject");
// 解析底层对应的 java 对象
oop x = JNIHandles::resolve(x_h);
oop e = JNIHandles::resolve(e_h);
oop p = JNIHandles::resolve(obj);
// 根据目标对象以及偏移地址算出 oop 中该变量的地址
HeapWord* addr = (HeapWord *)index_oop_from_field_offset_long(p, offset);
// 执行 CAS
oop res = oopDesc::atomic_compare_exchange_oop(x, addr, e, true);
// 只要 res 与原始值 e 相等则说明替换成功
jboolean success = (res == e);
if (success)
update_barrier_set((void*)addr, x);
return success;
UNSAFE_END

/**
* Unsafe_CompareAndSwapInt
*/
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
// 计算底层偏移地址
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
// 由 Atomic::cmpxchg(x, addr, e) 方法实现
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END

/**
* CompareAndSwapLong
*/
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapLong(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jlong e, jlong x))
UnsafeWrapper("Unsafe_CompareAndSwapLong");
Handle p (THREAD, JNIHandles::resolve(obj));
jlong* addr = (jlong*)(index_oop_from_field_offset_long(p(), offset));
// 判断 CPU 是否支持 CMPXCHG8 指令
if (VM_Version::supports_cx8())
// 执行 CAS 替换
return (jlong)(Atomic::cmpxchg(x, addr, e)) == e;
else {
jboolean success = false;
ObjectLocker ol(p, THREAD);
if (*addr == e) { *addr = x; success = true; }
return success;
}
UNSAFE_END

以上存在两种 CAS,oopDesc::atomic_compare_exchange_oopAtomic::cmpxchg,我们看下两者有什么不同:

oopDesc::atomic_compar_exchange_oop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* @param exchange_value 替换的新值
* @param dest 替换的目标地址
* @param compare_value 原始值
* @param prebarrier true
*/
inline oop oopDesc::atomic_compare_exchange_oop(oop exchange_value,
volatile HeapWord *dest,
oop compare_value,
bool prebarrier) {
// 是否启用指针压缩
if (UseCompressedOops) {
if (prebarrier) {
update_barrier_set_pre((narrowOop*)dest, exchange_value);
}
// encode exchange and compare value from oop to T
narrowOop val = encode_heap_oop(exchange_value);
narrowOop cmp = encode_heap_oop(compare_value);
// 我们重点看这条语句
narrowOop old = (narrowOop) Atomic::cmpxchg(val, (narrowOop*)dest, cmp);
// decode old from T to oop
return decode_heap_oop(old);
} else {
if (prebarrier) {
update_barrier_set_pre((oop*)dest, exchange_value);
}
return (oop)Atomic::cmpxchg_ptr(exchange_value, (oop*)dest, compare_value);
}
}

如果未启用指针压缩则通过 Atomic::cmpxchg_ptr 完成 CAS,如果启用指针压缩,那么执行 Atomic::cmpxchg

Atomic::cmpxchg

Atomic::cmpxchg 根据不同的系统会有所差异,这里我们以 Linux 为例进行说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// hotspot/src/os_cpu/linux_x86/vm/atomic_linux_x86.inline.hpp

// Adding a lock prefix to an instruction on MP machine
#define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: "

inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
// 判断是否是多核处理器
int mp = os::is_MP();
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return exchange_value;
}

汇编扩展以 __asm__ 开头表示后面部分为汇编,volatile 严禁将此处的汇编语句和其他语句进行重组优化。整个语法包含四部分,之间使用 : 分隔,基本形式如下:

1
2
3
4
5
6
__asm__(
"指令,每条指令之后最好使用 \n\t 分隔"
:输出寄存器,() 内为 C 程序值
:输入寄存器,() 内为 C 程序值
:会被修改的寄存器
)

LOCK_IF_MP 为宏定义,包含了多条分号分割的汇编指令。从注释中可以看出如果运行在多核处理器上则需要添加 lock 前缀,否则只需要执行 cmpxchgl。而 lock 前缀属于内存屏障,它的作用是在执行后面指令的过程中锁住总线或者缓存行以保证一致性。

以上汇编最终的格式如下:

1
2
3
4
5
6
7
cmp $0, #mp 
je 1f
lock
1:cmpxchgl %1,(%3)
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory"

我们来翻译以上内容:

  1. cmp 是比较命令,$0 代表立即数 0。如果 mp 等于 0,那么将 ZF(零标志)位置为 1。

  2. je 比较跳转指令,如果 ZF 等于 1 那么就跳转至 1 标签处(f 表示 forward 向前,b 表示 backword 向后)。

  3. lock 表示是否要锁住总线以执行后面的指令。

  4. 1: 相当于标签,cmpxchgl %1,(%3) 则是标签对应的指令。% 后边跟的数字代表输出和输入寄存器顺序编号,顺序从输出寄存器序列从左到右,从上到下以 %0 开始一直到 %9 10 个寄存器。示例中,输出的 eax 为 %0,输出的四个参数 exchange_value、compare_value、dest、mp 分别为 %1、%2、%3、%4。

    而 cmpxchgl %1,(%3) 也就等价于 cmpxchgl exchange_value,(dest),(dest) 表示 dest 地址所存的值,而 cmpxchgl 还有一个隐含操作数 eax,也就是说先比较输入的 eax 值(compare_value)是否和 dest 所存的值相等,如果相等则把 %1 (exchange_value) 的值写入 dest 所指向的地址。如果不想等则把 dest 地址所存的值写入 eax。

作为输出中的 =a(exchange_value) 指令,= 表示输出,a 表示 eax 寄存器,整个指令则表示把 eax 中的值写入 exchange_value 变量中。所以回归到 (jint)(Atomic::cmpxchg(x, addr, e)) == e 函数,如果 eax 值与 dest 值相等,就会把 eax 存的输入值 compare_value 赋值给输出的 exchange_value,那么函数就会返回 true。如果不想等,则把 dest 指向的值赋值给 eax 进而赋值给 exchange_value 函数最终返回 false。

另外,输入中的 r 表示任意寄存器,cc 则告诉编译器该指令的执行会影响到标志寄存器,要每次重新读取。memory 告诉编译器该指令需要重新从内存中读取最新值而不是从缓存了的寄存器中读取。

使用

介绍完了 Unsafe 类,那么该如何使用它?其实我们只要能获取 Unsafe 的实例就行了,不过从代码中我们看到 Unsafe 的构造方法都是私有的,也就是说外界无法获取。不过它对外提供了 getUnsafe() 方法:

1
2
3
4
5
6
7
8
9
10
private Unsafe() {}
private static final Unsafe theUnsafe = new Unsafe();

@CallerSensitive
public static Unsafe getUnsafe() {
Class<?> caller = Reflection.getCallerClass();
if (!VM.isSystemDomainLoader(caller.getClassLoader()))
throw new SecurityException("Unsafe");
return theUnsafe;
}

原子操作包

有了 CAS 及 Unsafe 类的基础之后,我们就可以介绍 JUC 并发包提供的 atomic 类了。

基本类型原子操作类

atomic 包针对基本类型提供了 AtomicBoolean、AtomicInteger、AtomicLong 三个,它们的实现大同小异,这里我们以 AtomicLong 为例进行说明。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
public class AtomicLong extends Number implements java.io.Serializable {
private static final long serialVersionUID = 1927816293512124184L;
/**
* 获取 Unsafe 类实例,用于执行 CAS 操作
*/
private static final Unsafe unsafe = Unsafe.getUnsafe();
/**
* 这里保存 AtomicLong 属性 value 的偏移量
*/
private static final long valueOffset;

/**
* 用于保存当前 CPU 是否支持无锁的 CAS 操作
*/
static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();

/**
* 本地方法,用于判断当前 CPU 是否支持无锁的 CAS 操作,并将结果
* 缓存至 VM_SUPPORTS_LONG_CAS
*/
private static native boolean VMSupportsCS8();

/**
* 具体的 long 值保存,volatile 保证内存的可见性
*/
private volatile long value;

static {
try {
// 获取变量 value 相对于 AtomicLong 实例的偏移量
valueOffset = unsafe.objectFieldOffset
(AtomicLong.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}

/**
* 构造函数,定义初始值
*/
public AtomicLong(long initialValue) {
value = initialValue;
}

/**
* 0 值构造器
*/
public AtomicLong() {
}

/**
* get 方法,用于获取当前 long 值
*/
public final long get() {
return value;
}

/**
* set 方法,用于设置新值,会立即被其他线程发现
*/
public final void set(long newValue) {
value = newValue;
}

/**
* 这个方法就比较有意思了,从名字来看的话大概跟延迟设置有关系。其实通过该
* 方法设置的值不会立即被其他线程看到,所以这就是它名字的由来。
*/
public final void lazySet(long newValue) {
unsafe.putOrderedLong(this, valueOffset, newValue);
}

/**
* 从名字来看,该方法会设置新值然后返回旧值
*/
public final long getAndSet(long newValue) {
return unsafe.getAndSetLong(this, valueOffset, newValue);
}

/**
* 传统的 CAS 操作,如果当前 values 等于 expect 则替换为 update,如果成功
* 返回 true,否则返回 false
*/
public final boolean compareAndSet(long expect, long update) {
return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
}
/**
* 类似于 compareAndSet
*/
public final boolean weakCompareAndSet(long expect, long update) {
return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
}

/**
* 将当前值递增 1,并返回递增前的值
*/
public final long getAndIncrement() {
return unsafe.getAndAddLong(this, valueOffset, 1L);
}

/**
* 将当前值递减 1,并返回递减前的值
*/
public final long getAndDecrement() {
return unsafe.getAndAddLong(this, valueOffset, -1L);
}

/**
* 递增指定的值,并返回前一个值
*/
public final long getAndAdd(long delta) {
return unsafe.getAndAddLong(this, valueOffset, delta);
}

/**
* 递增值并返回递增之后的值
*/
public final long incrementAndGet() {
return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
}

/**
* 递减值并返回递减后的值
*/
public final long decrementAndGet() {
return unsafe.getAndAddLong(this, valueOffset, -1L) - 1L;
}

/**
* 递增指定的值并返回递增后的值,从方法实现上也能看出这点
*/
public final long addAndGet(long delta) {
return unsafe.getAndAddLong(this, valueOffset, delta) + delta;
}

/**
* 这个方法就有点东西了,LongUnaryOperator 是一个函数式接口,从名子
* 上可以看出它是个一元的,也就是说它接收一个 long 类型的参数并返回
* 另一个 long 值。
*
* 操作上,先获取当前的 value 作为 prev,然后通过 updateFunction 获取
* 新值 next。然后通过 compareAndSet 进行设置,直到成功。最后返回新值。
*/
public final long updateAndGet(LongUnaryOperator updateFunction) {
long prev, next;
do {
prev = get();
next = updateFunction.applyAsLong(prev);
} while (!compareAndSet(prev, next));
return next;
}

/**
* 类似于 updateAndGet(LongUnaryOperator updateFunction),只不过这次是
* 基于二元操作来获取新值。二元操作的入参分别是方法的入参 x 以及当前值
* prev。最后返回前一个值。
*/
public final long getAndAccumulate(long x,
LongBinaryOperator accumulatorFunction) {
long prev, next;
do {
prev = get();
next = accumulatorFunction.applyAsLong(prev, x);
} while (!compareAndSet(prev, next));
return prev;
}

/**
* 这次是返回新值
*/
public final long accumulateAndGet(long x,
LongBinaryOperator accumulatorFunction) {
long prev, next;
do {
prev = get();
next = accumulatorFunction.applyAsLong(prev, x);
} while (!compareAndSet(prev, next));
return next;
}

/**
* toString 方法
*/
public String toString() {
return Long.toString(get());
}

// 以下方法都是强制转换成对应的类型
public int intValue() {
return (int)get();
}

public long longValue() {
return get();
}

public float floatValue() {
return (float)get();
}

public double doubleValue() {
return (double)get();
}
}

是不是有了 CAS 及 Unsafe 的基础再来看 AtomicLong 的实现就简单很多了?

引用类型原子操作类

原理类似,直接上代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class AtomicReference<V> implements java.io.Serializable {
private static final long serialVersionUID = -1848883965231344442L;
/**
* 获取 Unsafe 实例
*/
private static final Unsafe unsafe = Unsafe.getUnsafe();
// value 变量相对于 AtomicReference 实例的偏移量
private static final long valueOffset;

static {
try {
// 获取偏移量
valueOffset = unsafe.objectFieldOffset
(AtomicReference.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}

/**
* 引用类型的 value
*/
private volatile V value;

public final V getAndAccumulate(V x,
BinaryOperator<V> accumulatorFunction) {
V prev, next;
do {
prev = get();
next = accumulatorFunction.apply(prev, x);
} while (!compareAndSet(prev, next));
return prev;
}
...
}

与基础类型的操作大同小异无非就是将值类型换成了引用类型。

数组类型原子操作类

针对于数组类型,atomic 包提供了AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray 三种类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class AtomicIntegerArray implements java.io.Serializable {
private static final long serialVersionUID = 2862133569453604235L;
/**
* 获取 Unsafe 实例
*/
private static final Unsafe unsafe = Unsafe.getUnsafe();
/**
* 获取数组的起始地址
*/
private static final int base = unsafe.arrayBaseOffset(int[].class);
private static final int shift;
/**
* 保存数值的具体数组
*/
private final int[] array;

static {
// 获取数组每个元素的大小
int scale = unsafe.arrayIndexScale(int[].class);
// 判断每个元素所占空间是否是 2 的幂次
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
// 计算偏移量,Integer.numberOfLeadingZeros 计算入参的前导零的个数
shift = 31 - Integer.numberOfLeadingZeros(scale);
}

/**
* shift 用在该方法中,那么该方法的作用又是什么呢?通过方法的调用关系,
* 我们知道入参 i 为数组的索引。而该方法就是计算每个索引的内存偏移量。
*
* 我们以如下 5 个 int 元素组成的数组为例进行说明:
* 0x01 0x05 0x09 0x0D 0x11
* [ 1, 3, 5, 7, 8]
* / \
* |
* arr
* 其中 base 为数组 arr 的起始地址,也就是 0x01。由于 scale 为 4,那么
* shift = 31 - 29 = 2。通过以下方法,当 i = 0 时,0 << 2 + 0x01 = 0x01;
* i = 2 是, 2 << 2 + 0x01 = 0x09。
*/
private static long byteOffset(int i) {
return ((long) i << shift) + base;
}

/**
* 返回索引 i 对应的内存偏移量,并对索引进行越界检查
*/
private long checkedByteOffset(int i) {
if (i < 0 || i >= array.length)
throw new IndexOutOfBoundsException("index " + i);

return byteOffset(i);
}

/**
* 方法中的参数 i 为数组的索引,通过 checkedByteOffset 方法计算出该索引
* 对应的数组内存偏量,然后通过 unsafe.getAndSetInt 执行 CAS 操作。其他
* 方法类似,这里就不过多介绍了。
*/
public final int getAndSet(int i, int newValue) {
return unsafe.getAndSetInt(array, checkedByteOffset(i), newValue);
}

...
}

属性更新原子操作类

我们依然可以使用原子操作类来更新对象中的某个属性以满足多线程安全。atomic 包提供了 AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater 三种类型的属性更新。

我们以 Integer 类型的属性更新器为例进行说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
/**
* 该类被定义为抽象类,而它的实现就是私有内部类 AtomicIntegerFieldUpdaterImpl。
*/
public abstract class AtomicIntegerFieldUpdater<T> {

/**
* 获取字段修改器,由子类实现,并对类中该字段进行相关校验
*/
@CallerSensitive
public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass,
String fieldName) {
return new AtomicIntegerFieldUpdaterImpl<U>
(tclass, fieldName, Reflection.getCallerClass());
}
/**
* 子类
*/
private static final class AtomicIntegerFieldUpdaterImpl<T>
extends AtomicIntegerFieldUpdater<T> {
/**
* 获取 Unsafe 实例
*/
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
//
private final long offset;
/**
* if field is protected, the subclass constructing updater, else
* the same as tclass
*/
private final Class<?> cclass;
/**
* 字段所属类
*/
private final Class<T> tclass;
AtomicIntegerFieldUpdaterImpl(final Class<T> tclass,
final String fieldName,
final Class<?> caller) {
final Field field;
final int modifiers;
try {
// 找到指定字段对象
field = AccessController.doPrivileged(
new PrivilegedExceptionAction<Field>() {
public Field run() throws NoSuchFieldException {
return tclass.getDeclaredField(fieldName);
}
});
modifiers = field.getModifiers();
// 验证字段的访问权限
sun.reflect.misc.ReflectUtil.ensureMemberAccess(
caller, tclass, null, modifiers);
ClassLoader cl = tclass.getClassLoader();
ClassLoader ccl = caller.getClassLoader();
if ((ccl != null) && (ccl != cl) &&
((cl == null) || !isAncestor(cl, ccl))) {
sun.reflect.misc.ReflectUtil.checkPackageAccess(tclass);
}
} catch (PrivilegedActionException pae) {
throw new RuntimeException(pae.getException());
} catch (Exception ex) {
throw new RuntimeException(ex);
}
// 必须为指定类型,这里是 integer
if (field.getType() != int.class)
throw new IllegalArgumentException("Must be integer type");
// 必须由 volatile 修饰以保证可见性
if (!Modifier.isVolatile(modifiers))
throw new IllegalArgumentException("Must be volatile type");

this.cclass = (Modifier.isProtected(modifiers) &&
tclass.isAssignableFrom(caller) &&
!isSamePackage(tclass, caller))
? caller : tclass;
this.tclass = tclass;
// 获取该字段的偏移量
this.offset = U.objectFieldOffset(field);
}

/**
* 检查指定对象是否是该类的实例
*/
private final void accessCheck(T obj) {
if (!cclass.isInstance(obj))
throwAccessCheckException(obj);
}

/**
* 由 Unsafe 执行 CAS 操作
*/
public final boolean compareAndSet(T obj, int expect, int update) {
accessCheck(obj);
return U.compareAndSwapInt(obj, offset, expect, update);
}
...
}
...
}

与其他原子类的操作相似,唯一不同的是对于字段的校验比较严格:

  • 字段必须是对应的类型(如:针对于 Integer 的属性更新,字段就必须是 int 类型)。
  • 字段必须是 volatile,毕竟需要保证内存可见性。
  • 字段必须对当前的 updater 可见(如果不是发生在当前类内部的原子操作则不能使用 private、protected 修饰符;如果是子类修改父类字段那么操作符必须是 protected 以上;如果同属一个 package 则必须满足 default 以上)。

除此之外还有一些隐式的规则:

  • 字段不能被 final 修饰,毕竟常量不可更改。
  • 字段不能被 static 修饰,因为修改的是对象字段而不是类字段。

CAS 如此完美吗?

无锁的 CAS 就完美了吗?其实并非如此!从它的实现上来看,虽然说与期望值一致那么说明并没有线程参与竞争或者说该值没有修改过并不严谨。因为还有可能该值已经被其他线程修改了,只不过在进行比较的时候又再一次修改回了原值,也就是所谓的 ABA 问题。

那么 CAS 如果解决 ABA 问题呢?有两种方案:

  • 基于时间戳控制的 AtomicStampedReference。
  • 基于 boolean 值控制的 AtomicMarkableReference。

AtomicStampedReference

我们看下 AtomicStampedReference 如何完美解决 ABA 问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public class AtomicStampedReference<V> {
/**
* 必不可少的 Unsafe 实例
*/
private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
/**
* pair 字段的偏移量
*/
private static final long pairOffset =
objectFieldOffset(UNSAFE, "pair", AtomicStampedReference.class);

/**
* volatile 类型的 pari 对象
*/
private volatile Pair<V> pair;

/**
* 对外暴露的 AtomicStampedReference 构造器,需要提供初始值及初始时间戳
*/
public AtomicStampedReference(V initialRef, int initialStamp) {
pair = Pair.of(initialRef, initialStamp);
}

/**
* @param expectedReference 预期的引用
* @param newReference 新值
* @param expectedStamp 预期的时间戳
* @param newStamp 产生的新时间戳
*/
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
// 取当前 pair 对象
Pair<V> current = pair;
// 对期望值、期望时间戳进行校验,并根据时机使用 casPair 完成 cas 替换
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}

/**
* 执行 CAS 的辅助方法完成 pair 对象的替换
*/
private boolean casPair(Pair<V> cmp, Pair<V> val) {
return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}

/**
* AtomicStampedReference 的私有内部类
*/
private static class Pair<T> {
/**
* 原始对象引用
*/
final T reference;
/**
* 时间戳
*/
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
/**
* 对 AtomicStampedReference 暴露的辅助方法,用以构造 Pair 对象
*/
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}

}

因为有了时间戳的参与,所以也就完美解决了 ABA 问题。

AtomicMarkableReference

与 AtomicStampedReference 维护时间戳的原理不同,AtomicMarkableReference 内部维护 boolean 变量来避免 ABA 问题。也正是如此,通过维护 boolean 状态转换 的 AtomicMarkableReference 并不能完全避免 ABA 问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public class AtomicMarkableReference<V> {
/**
* 熟的不能在熟的 Unsafe 实例
*/
private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
/**
* pair 字段的偏移量
*/
private static final long pairOffset =
objectFieldOffset(UNSAFE, "pair", AtomicMarkableReference.class);
/**
* pair 对象
*/
private volatile Pair<V> pair;

/**
* 通过初始引用及初始 boolean 标记构造原子类对象
*/
public AtomicMarkableReference(V initialRef, boolean initialMark) {
pair = Pair.of(initialRef, initialMark);
}

/**
* @param expectedReference 期望值
* @param newReference 新值
* @param expectedMark 期望 boolean 标记
* @param newMark 新标记
*/
public boolean compareAndSet(V expectedReference,
V newReference,
boolean expectedMark,
boolean newMark) {
Pair<V> current = pair;

// 根据期望值跟期望标记来进行判断,并根据 casPair 完成 CAS 替换
return
expectedReference == current.reference &&
expectedMark == current.mark &&
((newReference == current.reference &&
newMark == current.mark) ||
casPair(current, Pair.of(newReference, newMark)));
}

/**
* 真正执行 CAS 的方法
*/
private boolean casPair(Pair<V> cmp, Pair<V> val) {
return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}

/**
* 内部维护的 Pair
*/
private static class Pair<T> {
/**
* 原始对象引用
*/
final T reference;
/**
* 基于 boolean 的标记
*/
final boolean mark;
private Pair(T reference, boolean mark) {
this.reference = reference;
this.mark = mark;
}
/**
* 对外提供的辅助方法
*/
static <T> Pair<T> of(T reference, boolean mark) {
return new Pair<T>(reference, mark);
}
}

private volatile Pair<V> pair;
}

基于布尔的实现不像时间戳可以一直递增,布尔仍然会存在 ABA 问题。

总结

至此 CAS 就介绍完了,与 Synchronized 相比,CAS 这种自旋的方式只是在逻辑上让当前线程进行阻塞,是一种用户态的行为。并非让线程真正挂起从而进行用户态到内核态的转换,所以它的性能比较好。但是如果竞争比较激烈,自旋一直失败并且到达指定阈值,系统也会让当前线程挂起从而让出 CPU 资源。那么此时 CAS 就不再是优先选择的方案了。