0%

Java 锁机制之 Synchronized 实现(上)

引言

为了加快程序的运行效率,我们引入了多线程的概念。但是速度提升的同时引来了共享资源安全性的问题。为了保证多线程下程序的准确性,锁机制就是我们需要掌握的一种手段。本文将介绍的就是 synchronized 的实现原理(以 JDK 8 为基础)。

线程安全问题

不是所有程序在多线程下就是非安全的,我们判断一个程序是否是多线程安全的需要有下面三个要素:

  • 多线程

    如果程序是单线程的,那么它肯定是安全的,因为没有人跟你竞争。

  • 共享资源

    如果资源都是线程独占式的(局部变量),那么也不涉及到安全问题。

  • 非原子性操作

    对资源的操作如果是原子性的,那么也不会存在线程安全性问题。

所以只要同时满足以上三种条件,那么该程序一定是非线程安全性的。

synchronized

synchronized 是众多锁中的一种实现方式,我们可以用它对共享资源的操作加锁以实现安全访问。synchronized 存在三种锁类型:

  • this

    指代当前对象实例,比如修饰成员方法时。要想访问当前资源必须持有当前对象的锁。

  • class

    类对象锁,比如修饰静态方法。由于同一个类可以生成多个对象,那么这些对象独享同一把锁。

  • object

    对象锁,比如同步代码块,你可以指定任意对象为当前资源上锁。当然了,如果指定的是 this 或者 class 那么作用等同于上述两者。

Java 对象内存布局

讲解锁实现之前就需要先了解锁的存储,要想了解锁的存储就得知道对象的内存布局:

我们把焦点放到对象头中的 MarkWord 结构,根据操作系统位数的不同,MarkWord 又分为 32 位及 64 位,如下图:

  • 32 位

  • 64 位

从以上的结构中,我们看出了锁的相关信息了,其中,锁分为无锁、偏向锁、轻量级锁、重量级锁。每个级别它们各自存储的数据结构也有所差异,那也就是说不同的场景对应不同的锁级别。既然我们知道锁是存储在 MarkWord 中的,那么我们就需要看看锁到底是怎么实现的了。

锁的入口

我们知道 .java 文件经过编译之后都会变成 .class 的字节码,这些字节码都会交由 JVM 进行解释执行。而对于 synchronized 而言,它存在 monitorenter、monitorexit 两个指令。如果修饰方法时,那么是不存在以上两条指令的,只是对方法加了一个 flag,标识出当前方法是同步方法。那么它们的入口是否一样呢?

再开始之前,有必要先了解几个基础概念:

markWord

C++ 中的 markWord 对象是 markOopDesc,这里我们需要记住锁的几种状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* jdk/hotspot/src/share/vm/oops/markOop.hpp
*/
class markOopDesc: public oopDesc {
public:
enum {
locked_value = 0, // 00 轻量级锁
unlocked_value = 1, // 01 无锁
monitor_value = 2, // 10 重量级锁
marked_value = 3, // 11 GC 标记
biased_lock_pattern = 5 // 101 偏向锁
};
}

BasicLock

获取锁时,会将对象头中的 markword 复制一份无锁版保留下来。解锁时,再替换回去,用于还原锁状态。

1
2
3
4
5
6
7
/**
* jdk/hotspot/src/share/vm/runtime/basicLock.hpp
*/
class BasicLock VALUE_OBJ_CLASS_SPEC {
private:
volatile markOop _displaced_header;
}

BasicObjectLock

该对象维护锁与目标对象的关联关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* jdk/hotspot/src/share/vm/runtime/basicLock.hpp
*/
class BasicLock VALUE_OBJ_CLASS_SPEC {
private:
volatile markOop _displaced_header; // 获取锁时,会将对象头中的 markword 复制一份无锁版保留下来。解锁时,再替换回去
}

class BasicObjectLock VALUE_OBJ_CLASS_SPEC {
private:
BasicLock _lock; // 当前对象拥有的锁记录
oop _obj; // 锁对应的 Java 对象
}

偏向锁

偏向锁,顾名思义就是偏向某一方的锁。从 markWord 结构中,我们可以看到当对象处于偏向锁模式时,MarkWord 由线程 ID、Epoch、对象年龄、是否偏向锁、锁标志几个字段组成。

如果一个线程能够将自身的线程 id 保存到 markWord 中,同时将偏向标志打开,就说明该线程成功获取了偏向锁。当线程下次再进来时,跟 markword 中的线程 id 比较,如果一致,说明当前线程已经拥有了锁,那就不需要再次获取锁。

有了以上的基础,我们就开始吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
/**
* jdk/hotspot/src/share/vm/interpreter/bytecodeInterpreter.cpp
* -------------------------------------------------------
* 代码中使用了大量 CAS 操作,如:oldValue Atomic::cmpxchg_ptr(newVal, address, oldValue),当 address 中的值与 oldValue 一致时,则将 address 中的值替换为 newVal,并返回原始值。
*
*/
void BytecodeInterpreter::run(interpreterState istate) {
...
// 如果是同步方法,对应于 synchronized 修饰的方法(含有 ACC_SYNCHRONIZED)
if (METHOD->is_synchronized()) {
oop rcvr;
if (METHOD->is_static()) {
// 获取此方法所属类的常量池进而拿到这个类的 Class 实例
rcvr = METHOD->constants()->pool_holder()->java_mirror();
} else {
// 如果是非静态方法,那就是当前对象本身了
rcvr = LOCALS_OBJECT(0);
VERIFY_OOP(rcvr);
}
// 拿到基础对象锁(维护了当前对象与锁的对应关系)
BasicObjectLock* mon = &istate->monitor_base()[-1];
// 获取锁对应的 Java 对象
oop monobj = mon->obj();
// 锁对应的 Java 对象与当前对象肯定是一致的,否则说明初始化出了问题
assert(mon->obj() == rcvr, "method monitor mis-initialized");

bool success = UseBiasedLocking;
// 如果开启了偏向锁功能(默认开启)
if (UseBiasedLocking) {
// 获取当前对象的 markWord
markOop mark = rcvr->mark();
// 如果处于偏向状态(锁标记为 101)
if (mark->has_bias_pattern()) {
// 检查偏向锁的拥有者以及纪元的最新状态

/*
* yy 主要由 threadId、epoch、biased、lock 四个字段组成,这几个字段都可能不为 0。
*
* 如果 epoch、biased、lock 不为 0,说明对象头中的 mark 与 class 中的 mark 不一致,此时已经不是最新状态了。
*
* 如果 threadId 不为 0,那么此时的偏向线程是不确定的,具体需要根据其他字段的状态来选择是重新偏向到当前线程还是撤销偏向
*
*/
intptr_t xx = ((intptr_t) THREAD) ^ (intptr_t) mark;
xx = (intptr_t) rcvr->klass()->prototype_header() ^ xx;
intptr_t yy = (xx & ~((int) markOopDesc::age_mask_in_place));

if (yy != 0 ) {

// 如果 yy 与偏向锁相与为 0 说明锁的偏向未发生改变
if (yy & markOopDesc::biased_lock_mask_in_place == 0 ) {

// 说明 epoch 未发生变化,也就是说当前的 epoch 是有效的,那么接下来的操作才是合法的。因为原型头中的 epoch 只在安全点才会更新
if (yy & markOopDesc::epoch_mask_in_place == 0) {
// 此时只保留对象的锁偏向状态,年龄、epoch,清除锁的偏向线程(此时,并不知道当前的 owner 是谁)
intptr_t unbiased = (intptr_t) mark & (markOopDesc::biased_lock_mask_in_place |
markOopDesc::age_mask_in_place |
markOopDesc::epoch_mask_in_place);

// 尝试重新偏向到当前线程
if (Atomic::cmpxchg_ptr((intptr_t)THREAD | unbiased, (intptr_t*) rcvr->mark_addr(), unbiased) != unbiased) {
// 如果偏向失败,那么需要进入 InterpreterRuntime::monitorenter 方法,重新撤销再偏向
CALL_VM(InterpreterRuntime::monitorenter(THREAD, mon), handle_exception);
}
} else {
// epoch 已经过期,那么说明当前的偏向线程已经无效了。此时就以原型头中的值作为 CAS 的比较值进行重新偏向
try_rebias:
xx = (intptr_t) rcvr->klass()->prototype_header() | (intptr_t) THREAD;
// 将当前线程替换掉原型头中的偏向线程
if (Atomic::cmpxchg_ptr((intptr_t)THREAD | (intptr_t) rcvr->klass()->prototype_header(),
(intptr_t*) rcvr->mark_addr(),
(intptr_t) mark) != (intptr_t) mark) {
// 偏向失败那就执行 InterpreterRuntime::monitorenter
CALL_VM(InterpreterRuntime::monitorenter(THREAD, mon), handle_exception);
}
}
} else {
// 如果锁的偏向状态发生了变化,那就需要撤销偏向。用原型头中的值替换掉对象头中的值,如果失败,说明有其他线程正在竞争该撤销操作,这是没问题的。
try_revoke_bias:
xx = (intptr_t) rcvr->klass()->prototype_header() | (intptr_t) THREAD;

if (Atomic::cmpxchg_ptr(rcvr->klass()->prototype_header(),
(intptr_t*) rcvr->mark_addr(),
mark) == mark) {
// (*counters->revoked_lock_entry_count_addr())++;
success = false;
}
}
}
} else {
// 此时未处于偏向状态
cas_label:
success = false;
}
}
// 如果未启用偏向锁、未偏向或者偏向失败
if (!success) {
// 获取当前 mark 对象的一个无锁版本
markOop displaced = rcvr->mark()->set_unlocked();
// 更新锁对象中的 markword 记录
mon->lock()->set_displaced_header(displaced);

// 将当前对象设置为无锁状态
if (Atomic::cmpxchg_ptr(mon, rcvr->mark_addr(), displaced) != displaced) {
// 替换失败,说明发生了线程竞争,如果当前线程是锁的拥有者,那就清空基础锁对象中的 markword 记录,毕竟此时的场景是锁重入
if (THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
mon->lock()->set_displaced_header(NULL);
} else {
// 替换失败,并且当前线程并非锁的拥有者,说明当前线程获取锁失败,那么执行 InterpreterRuntime::monitorenter
CALL_VM(InterpreterRuntime::monitorenter(THREAD, mon), handle_exception);
}
}
}
}

}
...
// 如果是同步代码块,那么就会走这个入口
CASE(_monitorenter): {
// 获取当前对象
oop lockee = STACK_OBJECT(-1);
CHECK_NULL(lockee);

// 遍历当前线程栈,查找空闲的 LockRecord
BasicObjectLock* limit = istate->monitor_base();
BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
BasicObjectLock* entry = NULL;
while (most_recent != limit ) {
// 如果当前的锁记录没有目标对象,就把它作为候选
if (most_recent->obj() == NULL) entry = most_recent;
// 说明已经存在当前对象的锁记录(锁重入)
else if (most_recent->obj() == lockee) break;
most_recent++;
}
// 存在可用的锁记录
if (entry != NULL) {
// 将锁记录与当前对象绑定
entry->set_obj(lockee);
// 将当前对象的 markword 复制成一份无锁版的,displaced mark word
markOop displaced = lockee->mark()->set_unlocked();
// 将 displaced mark word 设置到当前锁记录中
entry->lock()->set_displaced_header(displaced);
// 更新对象中的锁记录信息
if (Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
// 如果更新失败,说明其他线程参与了竞争,那就判断当前线程是否是锁记录的持有者
if (THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
// 如果是,说明发生了锁重入,那么将当前栈中的 markWord 至为 null
entry->lock()->set_displaced_header(NULL);
} else {
// 说明当前线程非锁的持有线程,并且获取锁失败,那就需要执行 monitorenter
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
}
UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
} else {
// 如果未找到可用的锁记录,就需要告诉系统创建一个新的
istate->set_msg(more_monitors);
UPDATE_PC_AND_RETURN(0); // Re-execute
}
}
// 退出同步代码块
CASE(_monitorexit): {
// 获取当前对象
oop lockee = STACK_OBJECT(-1);
CHECK_NULL(lockee);

BasicObjectLock* limit = istate->monitor_base();
BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
// 遍历当前线程拥有的 Lock Record
while (most_recent != limit ) {
// 如果找到当前线程对应的锁记录
if ((most_recent)->obj() == lockee) {
// 拿到锁
BasicLock* lock = most_recent->lock();
// 拿到锁记录中的 displaced mark word
markOop header = lock->displaced_header();
// 取消锁与对象的绑定,释放 Lock Record
most_recent->set_obj(NULL);

if (header != NULL) {
// 将 displaced mark word 替换回原有对象(displaced mark word 是先前对象 mark word 的一个无锁版本),表示当前对象处于无锁状态
if (Atomic::cmpxchg_ptr(header, lockee->mark_addr(), lock) != lock) {
// 如果替换失败,说明发生了竞争,需要重新绑定然后交由 monitorexit 来处理
most_recent->set_obj(lockee);
CALL_VM(InterpreterRuntime::monitorexit(THREAD, most_recent), handle_exception);
}
}
UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
}
most_recent++;
}
// Need to throw illegal monitor state exception
CALL_VM(InterpreterRuntime::throw_illegal_monitor_state_exception(THREAD), handle_exception);
ShouldNotReachHere();
}

以上就是同步代码块、同步方法的入口点。程序进入 InterpreterRuntime::monitorenter 方法的条件就是当前线程获取偏向锁失败(注意:此时就已经存在 LockRecord 了,大家可以比较一下偏向锁跟轻量级锁中,LockRecord 的作用是否一致)。我们把以上逻辑以图例的形式表现出来:

  • 同步方法

  • 同步代码块

  • 同步代码块退出指令

如果获取偏向锁失败的话,就需要执行 InterpreterRuntime::monitorenter 方法:

monitorenter

InterpreterRuntime::monitorenter 方法是 monitorenter 指令的入口,该方法通过判断是否启用偏向锁来选择快速进入还是慢进入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* IRT_ENTRY_NO_ASYNC 为宏定义(jdk/hotspot/src/share/vm/runtime/interfaceSupport.hpp)
*
* #define IRT_ENTRY_NO_ASYNC(result_type, header) \
* result_type header { \
* ThreadInVMfromJavaNoAsyncException __tiv(thread);\
* VM_ENTRY_BASE(result_type, header, thread) \
* debug_only(VMEntryWrapper __vew;)
*
* #define IRT_END }
* -------------------------------------------------------
* 参数:JavaThread 为 Java 线程对象的封装
* BasicObjectLock 为基础锁对象,维护了锁与 Java 对象的关联关系
* -------------------------------------------------------
* Handle 对象维护了描述 Java 对象的 oop 以及 Java 对象类对象的 klass
*
*/
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
// 记录统计信息
if (PrintBiasedLockingStatistics) {
// 递增慢进入数量
Atomic::inc(BiasedLocking::slow_path_entry_count_addr());
}

// 将对象封装成句柄,后续的对象访问通过句柄实现
Handle h_obj(thread, elem->obj());
assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
"must be NULL or an object");

// 如果启用偏向锁(默认开启),那么可以尝试快速进入,避免锁膨胀
if (UseBiasedLocking) {
// Retry fast entry if bias is revoked to avoid unnecessary inflation
ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
} else {
ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
}
assert(Universe::heap()->is_in_reserved_or_null(elem->obj()),
"must be NULL or an object");
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
IRT_END

fast_enter

jdk/hotspot/src/share/vm/runtime/synchronizer.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
// 判断是否启用偏向锁,如果未启用,还是得走慢进入逻辑
if (UseBiasedLocking) {
// 判断当前是否处于安全点
if (!SafepointSynchronize::is_at_safepoint()) {
// 尝试获取偏向锁
BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
// 如果当前状态为撤销与重偏向,那么返回
if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
return;
}
} else {
// 如果在安全点,那么需要撤销偏向
assert(!attempt_rebias, "can not rebias toward VM thread");
// 撤销偏向
BiasedLocking::revoke_at_safepoint(obj);
}
// 偏向锁获取失败,那么到了这里锁的偏向状态应该已经撤销,然后开始获取轻量级锁
assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
}
// 获取轻量级锁
slow_enter (obj, lock, THREAD) ;
}

//--------------------------辅助方法--------------------------//
// jdk/hotspot/src/share/vm/runtime/safepoint.hpp
class SafepointSynchronize : AllStatic {
public:
enum SynchronizeState {
_synchronized = 2 // 所有的 Java 线程全部停在安全点,只有 jvm 线程在运行
};

public:
// 判断当前是否处于安全点
inline static bool is_at_safepoint() {
return _state == _synchronized;
}
}
  • revoke_and_rebias

    jdk/hotspot/src/share/vm/runtime/biasedLocking.cpp

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
      /**
    * Handle 内部维护了 oop 对象,同时 Handle 重写了 -> 操作符,所以
    * obj->mark()、obj->klass() 其实调用的是 oop(实际上是 oopDesc ) 的对应方法。
    */
    BiasedLocking::Condition BiasedLocking::revoke_and_rebias(Handle obj, bool attempt_rebias, TRAPS) {

    // 只有在非安全点的时候才能继续执行
    assert(!SafepointSynchronize::is_at_safepoint(), "must not be called while at safepoint");

    // 拿到 markWord 对象
    markOop mark = obj->mark();
    /*
    * 匿名偏向就是当前 markWord 中已打开偏向标志位,但是 threadId 为空
    *
    * 如果处于匿名偏向状态,但是不允许重新偏向,那么就需要撤销偏向
    */
    if (mark->is_biased_anonymously() && !attempt_rebias) {

    markOop biased_value = mark;

    // 获取一个相同年龄的非偏向对象
    markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());

    // 通过 CAS 操作将 Java 对象中的偏向状态设置成未偏向,以此达到撤销偏向的目的
    markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);

    // 说明 CAS 成功(如果成功了,说明当前没有线程参与竞争)
    if (res_mark == biased_value) {
    // 返回已撤销
    return BIAS_REVOKED;
    }

    }
    // 当前处于偏向模式
    else if (mark->has_bias_pattern()) {
    // 获取 Java 对象的类对象
    Klass* k = obj->klass();
    // 拿到类对象中的 markWord
    markOop prototype_header = k->prototype_header();

    /*
    * 如果此时类的状态为未偏向
    */
    if (!prototype_header->has_bias_pattern()) {
    markOop biased_value = mark;
    // 替换
    markOop res_mark = (markOop) Atomic::cmpxchg_ptr(prototype_header, obj->mark_addr(), mark);
    assert(!(*(obj->mark_addr()))->has_bias_pattern(), "even if we raced, should still be revoked");
    // 撤销成功
    return BIAS_REVOKED;
    }
    /*
    * 当前偏向时间戳过期的话说明该对象已经偏向失效了。根据具体需求即 attempt_rebias 来决定是重新偏向还是撤销偏向
    */
    else if (prototype_header->bias_epoch() != mark->bias_epoch()) {
    // 如果需要重新偏向
    if (attempt_rebias) {
    assert(THREAD->is_Java_thread(), "");

    markOop biased_value = mark;
    // 既然是重新偏向,就需要将当前线程记录下来,同时还有对象的年龄及偏向时间
    markOop rebiased_prototype = markOopDesc::encode((JavaThread*) THREAD, mark->age(), prototype_header->bias_epoch());
    // CAS 更新
    markOop res_mark = (markOop) Atomic::cmpxchg_ptr(rebiased_prototype, obj->mark_addr(), mark);

    if (res_mark == biased_value) {
    // 撤销成功并且已重新偏向当前线程
    return BIAS_REVOKED_AND_REBIASED;
    }
    } else {
    // 如果只是单纯的撤销偏向
    markOop biased_value = mark;
    // 那就生成一个相同年龄非偏向的新对象头
    markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
    markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
    if (res_mark == biased_value) {
    // 撤销偏向成功
    return BIAS_REVOKED;
    }
    }
    }
    }
    // 如果对象头为不可偏向、CAS 更新失败就会进行启发式更新,该方法会对撤销偏向锁计数进行递增(注意这里重写的括号操作符 obj())
    HeuristicsResult heuristics = update_heuristics(obj(), attempt_rebias);

    // 不可偏向
    if (heuristics == HR_NOT_BIASED) {
    return NOT_BIASED;
    }
    // 未到批处理阈值
    else if (heuristics == HR_SINGLE_REVOKE) {
    // 获取类对象
    Klass *k = obj->klass();
    // 取类对象中的头信息
    markOop prototype_header = k->prototype_header();

    // 如果当前线程就是偏向锁的所有者,同时偏向还有效,那就由当前线程完成撤销
    if (mark->biased_locker() == THREAD &&
    prototype_header->bias_epoch() == mark->bias_epoch()) {
    ResourceMark rm;

    // 信息记录
    if (TraceBiasedLocking) {
    tty->print_cr("Revoking bias by walking my own stack:");
    }

    // 注意 obj() 返回的是内部的 oop 对象,也就是当前对应的 Java 对象
    BiasedLocking::Condition cond = revoke_bias(obj(), false, false, (JavaThread*) THREAD);

    // 因为已经撤销了,那就把当前线程拥有的 monitor 对象置 NULL
    ((JavaThread*) THREAD)->set_cached_monitor_info(NULL);

    assert(cond == BIAS_REVOKED, "why not?");
    return cond;
    } else {
    // 如果当前线程不是偏向锁偏向的线程,那么就由虚拟机线程处理了。
    VM_RevokeBias revoke(&obj, (JavaThread*) THREAD);
    VMThread::execute(&revoke);
    return revoke.status_code();
    }
    }
    // 如果既不是 HR_BULK_REVOKE 也不是 HR_BULK_REBIAS,那不就有问题了?
    assert((heuristics == HR_BULK_REVOKE) ||
    (heuristics == HR_BULK_REBIAS), "?");

    // 只能交由虚拟机线程完成批处理了)
    VM_BulkRevokeBias bulk_revoke(&obj, (JavaThread*) THREAD,
    (heuristics == HR_BULK_REBIAS),
    attempt_rebias);
    VMThread::execute(&bulk_revoke);
    return bulk_revoke.status_code();
    }

    JVM 内部为每个类维护了一个偏向锁 revoke 计数器,记录每个对象的撤销次数。当这个计数达到指定阈值(BiasedLockingBulkRebiasThreshold, 20)时,JVM 认为该类的偏向锁有问题了,需要重新偏向。批量操作的含义就是将该类的所有对象进行重偏向(bulk rebias)。

当 bulk rebias 时,会对这个类的 epoch 加一,后期为该类分配对象时都以该值为基础,同时还要对当前已获得偏向锁的 epoch 加一,并将这些锁记录保存在方法栈里。

判断一个对象是否是获得偏向锁的条件是:markWord 后三位为 101,线程 ID 字段等于当前线程,epoch 与该对象所属类的 epoch 相同。如果 epoch 不一样,说明进行了重偏向,只不过没有更新到该对象中,所以也是无效的,即使线程 ID 一样。

如果该类的 revoke 计数继续增加达到另一个阈值(BiasedLockingBulkRevokeThreshold, 40),那就说明该类不再适合偏向,需要进行 bulk revoke 了。如以下代码:

  • update_heuristics

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    static HeuristicsResult update_heuristics(oop o, bool allow_rebias) {
    // 获取 markWord 对象
    markOop mark = o->mark();
    // 未处于偏移模式
    if (!mark->has_bias_pattern()) {
    return HR_NOT_BIASED;
    }

    // 获取类对象
    Klass* k = o->klass();
    jlong cur_time = os::javaTimeMillis();
    // 最近一次批量撤销偏向锁的时间
    jlong last_bulk_revocation_time = k->last_biased_lock_bulk_revocation_time();

    // 获取撤销次数
    int revocation_count = k->biased_lock_revocation_count();

    /*
    * BiasedLockingBulkRebiasThreshold = 20
    * BiasedLockingBulkRevokeThreshold = 40
    * BiasedLockingDecayTime = 25000ms
    */
    if ((revocation_count >= BiasedLockingBulkRebiasThreshold) &&
    (revocation_count < BiasedLockingBulkRevokeThreshold) &&
    (last_bulk_revocation_time != 0) &&
    (cur_time - last_bulk_revocation_time >= BiasedLockingDecayTime)) {

    // 重置撤销次数
    k->set_biased_lock_revocation_count(0);
    revocation_count = 0;
    }

    // 如果小于撤销阈值,那么递增
    if (revocation_count <= BiasedLockingBulkRevokeThreshold) {
    revocation_count = k->atomic_incr_biased_lock_revocation_count();
    }

    // 满足撤销阈值,那么需要批量撤销
    if (revocation_count == BiasedLockingBulkRevokeThreshold) {
    return HR_BULK_REVOKE;
    }
    // 满足重偏向阈值,需要批量重偏向
    if (revocation_count == BiasedLockingBulkRebiasThreshold) {
    return HR_BULK_REBIAS;
    }
    // 只是单纯撤销就行了
    return HR_SINGLE_REVOKE;
    }

    如果当前处于非安全点并且撤销重偏向成功的话,那么就说明拿到了锁无需进行锁膨胀。但是,如果当前就在安全点,那么就需要执行安全点的撤销逻辑了。

  • revoke_at_safepoint

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    void BiasedLocking::revoke_at_safepoint(GrowableArray<Handle>* objs) {
    // 必须处于安全点
    assert(SafepointSynchronize::is_at_safepoint(), "must only be called while at safepoint");
    int len = objs->length();
    for (int i = 0; i < len; i++) {
    oop obj = (objs->at(i))();
    // 对撤销操作进行计数,并获取当前撤销类型
    HeuristicsResult heuristics = update_heuristics(obj, false);
    if (heuristics == HR_SINGLE_REVOKE) {
    // 撤销偏向
    revoke_bias(obj, false, false, NULL);
    } else if ((heuristics == HR_BULK_REBIAS) ||
    (heuristics == HR_BULK_REVOKE)) {
    // 批量撤销
    bulk_revoke_or_rebias_at_safepoint(obj, (heuristics == HR_BULK_REBIAS), false, NULL);
    }
    }
    // 遍历所有线程,逐步清除缓存的 monitor 信息
    clean_up_cached_monitor_info();
    }

    原本计划是用一篇文章将 synchronized 这块核心知识点讲完,但是既然写了,那就从头至尾详详细细的梳理一遍。当然,换来的就是文章篇幅过长,既然如此,那干脆将 synchronized 这块内容分为两篇吧。