0%

Java 锁机制之 Synchronized 实现(下)

引言

上篇文章,我们介绍了锁的入口、偏向锁的偏向与撤销。本文继续讲解锁的膨胀。

轻量级锁

系统未启用、偏向失败,synchronized 并不会立即升级成重量级锁,而是尝试使用轻量级锁的一种优化手段,此时 markWord 会变成轻量级锁结构。轻量级锁之所以能够提升性能是依据“对于绝大部分的锁,在整个同步周期内都不存在竞争”。

轻量级锁 markWord 结构

LockRecord

如果升级成轻量级锁时,当前的 markWord 结构就会变为指向 Lock Record 的指针。其实 Lock Record 并不是在锁升级为轻量级锁时产生的,它在偏向锁阶段就已经产生了。

slow_enter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* jdk/src/share/vm/runtime/synchronizer.cpp
*/
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
// 获取当前对象的 markWord,obj->mark() 实际调用的是 oopDesc 的 mark() 方法
markOop mark = obj->mark();
// 此时锁的偏向模式应该是撤销状态,毕竟轻量级锁就没有偏向谁这个概念了
assert(!mark->has_bias_pattern(), "should not see bias pattern here");

// 判断当前是否是无锁状态,也就是锁的标志位为 01
if (mark->is_neutral()) {

// 把当前对象的 markWord 保存到 displaced mark word
lock->set_displaced_header(mark);
// 更新 markWord 使其指向 BasicLock,如果成功,说明获取轻量级锁成功;如果多个线程竞争,那么失败的线程就会继续执行锁的膨胀操作
if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {
TEVENT (slow_enter: release stacklock) ;
return ;
}
} else
// 既然已经是有锁状态,先判断锁标志是否为 00(轻量级锁)以及当前线程是否是锁的持有者
if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
assert(lock != mark->locker(), "must not re-lock the same lock");
assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
// 既然是锁的持有者,说明是锁的重入,将 displaced mark word 置空
lock->set_displaced_header(NULL);
return;
}
// 这个优化可以忽略不计了
#if 0
// The following optimization isn't particularly useful.
if (mark->has_monitor() && mark->monitor()->is_entered(THREAD)) {
lock->set_displaced_header (NULL) ;
return ;
}
#endif

// 走到这,说明设置 displaced 头失败了,那么对于该值是多少也没什么意义。不过为了避免歧义,这里将锁标记设置为 11(3)。
lock->set_displaced_header(markOopDesc::unused_mark());
// 开始膨胀
ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
}

膨胀到轻量级锁时,首先通过对象的 markWord 判断锁的状态。如果是无锁态(锁标记为 01),那么将当前对象的 markWord 复制一份到 LockRecord,通过 CAS 将 markWord 指向 LockRecord。如果成功,说明没有发生竞争,该线程获取到了锁。

如果当前对象有锁(标记为 00)并且当前线程就是锁的持有者,那么说明是锁的重入,此时将 LockRecord 的 markWord 置空。

除此,说明多个线程在竞争锁,那么需要将 RecordLock 中的 markWord 锁标记改为 11,然后进行膨胀。

重量级锁

此时就属于传统的锁了,markWord 变为重量锁结构,也就是说指向 monitor 的指针:

monitor 对象

当 markWord 指向 monitor 时,说明此时已经处于重量级锁状态了。我们看下它的结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* jdk/hotspot/src/share/vm/runtime/objectMonitor.hpp
*
* 只介绍部分相关属性
*/
class ObjectMonitor {
private:
// markWord 对象头
volatile markOop _header;
// 记录个数,_WaitSet + _EntryList
volatile intptr_t _count;
// 监视器锁宿主对象,也就是属于哪个对象
void* volatile _object;
// 多个线程争抢锁,会先存入这个单向链表
ObjectWaiter * volatile _cxq;
protected:
// 等待线程的个数
volatile intptr_t _waiters;
// 重入次数
volatile intptr_t _recursions;
// 拥有该 monitor 对象的线程或者基础锁
void * volatile _owner;
// 处于 wait 状态的线程集合,即调用 wait() 方法的线程
ObjectWaiter * volatile _WaitSet;
// 由于等待锁而 block 的线程集合
ObjectWaiter * volatile _EntryList
// 上一个拥有 monitor 对象的线程 id
volatile jlong _previous_owner_tid;

ObjectMonitor() {
_header = NULL;
_count = 0;
_waiters = 0,
_recursions = 0;
_object = NULL;
_owner = NULL;
_WaitSet = NULL;
_WaitSetLock = 0 ;
_Responsible = NULL ;
_succ = NULL ;
_cxq = NULL ;
FreeNext = NULL ;
_EntryList = NULL ;
_SpinFreq = 0 ;
_SpinClock = 0 ;
OwnerIsThread = 0 ;
_previous_owner_tid = 0;
}
}

了解了 monitor 对象结构之后,我们看下锁的膨胀过程:

inflate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
/**
* jdk/src/share/vm/runtmie/synchronizer.cpp
*/
ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self, oop object) {
// Inflate mutates the heap ...
// Relaxing assertion for bug 6320749.
assert (Universe::verify_in_progress() ||
!SafepointSynchronize::is_at_safepoint(), "invariant") ;

for (;;) {
// 获取 markWord 对象
const markOop mark = object->mark() ;
// 此时不应处于偏向模式
assert (!mark->has_bias_pattern(), "invariant") ;

/*
* markWord 可能存在的状态:
* Inflated - 膨胀完成,即:拥有了 monitor
* Stack-locked - 栈锁定,需要强迫它膨胀
* INFLATING - 膨胀中
* Neutral - 无锁态,此时正在积极参与膨胀
* BIASED - 非法态,理论上不应该出现
*/

// 此时已膨胀完成,因为它也已经拥有了 monitor 对象(锁标志为 10)
if (mark->has_monitor()) {
// 获取当前对象对应的 monitor
ObjectMonitor * inf = mark->monitor() ;
// 必须是无锁状态
assert (inf->header()->is_neutral(), "invariant");
// 对象关联是否一致
assert (inf->object() == object, "invariant") ;
// 验证 monitor 有效性
assert (ObjectSynchronizer::verify_objmon_isinpool(inf), "monitor is invalid");
// 都没有问题,那就直接返回当前 monitor 对象
return inf ;
}

// 膨胀中,该状态只是一个暂时的中间态。只有一个线程可以完成膨胀,其他线程都需要等待。
if (mark == markOopDesc::INFLATING()) {
TEVENT (Inflate: spin while INFLATING) ;
// 调用 ReadStableMark 方法进行等待,方法内部会调用 os::NakedYield() 释放 CPU 资源
ReadStableMark(object) ;
continue ;
}
/*
* 如果是栈锁状态,也就是锁标志为 00 的轻量级锁(可能被当前线程持有,也可能被其他线程持有)
*
*/
if (mark->has_locker()) {
/*
* 为当前线程分配一个 objectMonitor 对象
*
* 如果按照常规方式分配 ObjectMonitor 对象,那么会增加 markWord 中出现 INFLATED 状态的间隔,从而增加锁竞争的概率。
* 所以这里使用线程私有的空闲 ObjectMonitor 列表分配的方式来降低锁的竞争。
* 这样还有一个好处就是不用考虑 CAS 设置 INFLATING 的时机。
* 具体逻辑可以参考 omAlloc 源码。
*
*/
ObjectMonitor * m = omAlloc (Self) ;
// 初始化 monitor 对象,降低 markWord 出现 INFLATING 的时间
m->Recycle();
m->_Responsible = NULL ;
m->OwnerIsThread = 0 ;
m->_recursions = 0 ;
m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ;

// 将 markWord 设置为 INFLATING 状态
markOop cmp = (markOop) Atomic::cmpxchg_ptr (markOopDesc::INFLATING(), object->mark_addr(), mark) ;

// 如果失败了,那就释放掉分配的 monitor 对象
if (cmp != mark) {
omRelease (Self, m, true) ;
continue ; // Interference -- just retry
}
/*
* 此时,已经成功的将 INFLATING(0)设置到了 markWord 中。这也是唯一成为 INFLATING 的方式。
* 只有成功设置为 INFLATING(0)的线程才能膨胀成功。
*
* 为什么要 CAS 一个 0 到 markWord 而不是直接将 stack-locked 更新到 INFLATED 状态呢?我们需要考虑当线程解锁 stack-locked 时会发生什么。
* 它会通过 CAS 将 displaced header 的值重新替换掉 markWord。
*
* inflate() 例程必须将 header 值从所有者堆栈上的 basiclock 复制到 objectMonitor,同时保证 hashCode 不变。如果持有者在 markWord 为 0 时决定释放锁,那么解锁操作将会失败。最终控制权会从 slow_exit 转为 infalte。持有者将会自旋等待直到 0 值消失。换句话说,处于膨胀阶段,如果持有者释放锁时碰巧是 0 值,那么会导致持有者暂停。重要的一点是,当 ojbect-> mark 为 0 时,mark->displaced_mark_helper 是稳定版。
*
* 而 0 就是作为膨胀过程中的一个“繁忙”标志。
*/

// 关闭重量锁的标志位,此时处于无锁状态 0 1
markOop dmw = mark->displaced_mark_helper() ;
// 校验无锁
assert (dmw->is_neutral(), "invariant") ;

// 设置 monitor header 值
m->set_header(dmw) ;
// 拥有该 monitor 对象的基础锁(可以根据与线程的关联关系优化成线程,同时设置字段 m->OwnerIsThread = 1)
m->set_owner(mark->locker());
// 监视器的宿主对象
m->set_object(object);

// 确保 monitor 的状态
guarantee (object->mark() == markOopDesc::INFLATING(), "invariant") ;
// 设置 markWord 值,并且将当前锁基本设置为重量级锁 1 0
object->release_set_mark(markOopDesc::encode(m));

// Hopefully the performance counters are allocated on distinct cache lines
// to avoid false sharing on MP systems ...
if (ObjectMonitor::_sync_Inflations != NULL) ObjectMonitor::_sync_Inflations->inc() ;
TEVENT(Inflate: overwrite stacklock) ;
if (TraceMonitorInflation) {
if (object->is_instance()) {
ResourceMark rm;
tty->print_cr("Inflating object " INTPTR_FORMAT " , mark " INTPTR_FORMAT " , type %s",
(void *) object, (intptr_t) object->mark(),
object->klass()->external_name());
}
}
// 膨胀成功,返回 monitor 对象
return m ;
}

// 接下来针对于无锁场景
assert (mark->is_neutral(), "invariant");
// 根据当前线程分配出一个 monitor 对象
ObjectMonitor * m = omAlloc (Self) ;
// 初始化 monitor 对象
m->Recycle();
m->set_header(mark);
m->set_owner(NULL);
m->set_object(object);
// 既然是无锁,那么持有者就是线程
m->OwnerIsThread = 1 ;
m->_recursions = 0 ;
m->_Responsible = NULL ;
m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ; // consider: keep metastats by type/class

// 将 markWord 指向 monitor 地址
if (Atomic::cmpxchg_ptr (markOopDesc::encode(m), object->mark_addr(), mark) != mark) {
// 如果竞争失败,那么需要释放 monitor
m->set_object (NULL) ;
m->set_owner (NULL) ;
m->OwnerIsThread = 0 ;
m->Recycle() ;
omRelease (Self, m, true) ;
m = NULL ;
continue ;
// interference - the markword changed - just retry.
// The state-transitions are one-way, so there's no chance of
// live-lock -- "Inflated" is an absorbing state.
}

// Hopefully the performance counters are allocated on distinct
// cache lines to avoid false sharing on MP systems ...
if (ObjectMonitor::_sync_Inflations != NULL) ObjectMonitor::_sync_Inflations->inc() ;
TEVENT(Inflate: overwrite neutral) ;
if (TraceMonitorInflation) {
if (object->is_instance()) {
ResourceMark rm;
tty->print_cr("Inflating object " INTPTR_FORMAT " , mark " INTPTR_FORMAT " , type %s",
(void *) object, (intptr_t) object->mark(),
object->klass()->external_name());
}
}
// 成功则返回 monitor
return m ;
}
}

/**
* Inflated 状态的转换是在 markOopDesc::encode(m) 方法中
* jdk/src/share/vm/oops/markOop.hpp
*/
static markOop encode(ObjectMonitor* monitor) {
intptr_t tmp = (intptr_t) monitor;
// monitor_value = 2(1 0)
return (markOop) (tmp | monitor_value);
}

锁的膨胀其实就是获取 ObjectMonitor 对象的过程,那么锁的竞争发生在什么地方呢?不知道大家是否还记得 inflate 方法调用的地方:

1
2
// 开始膨胀
ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);

当 inflate 方法拿到 ObjectMonitor 对象之后,就会调用它的 enter 方法,ObjectMonitor::enter 方法就是锁竞争的核心方法了!

enter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
/**
* 在深入了解之前,需要先了解一下具体的操作理论,如:monitor 列表、线程驻留等。
*
* 1、如果线程通过 CAS 成功的将 monitor 字段 _owner 从 null 转换为 non-null,那么此线程便拥有了 monitor 的控制权。
*
* 2、不变性:任何时刻,一个线程只能出现在 monitor 列表 cxq、EntryList、WaitSet 中的一个。
*
* 3、参与竞争的线程通过 CAS 将自己 push 到 cxq 中并自旋/挂起。
*
* 4、当线程得到锁之后,它必须从 cxq 或 EntryList 中出队。
*
* 5、获取到锁的线程出队之前会标记以及唤醒 EntryList 中作为“假定继承人” 的后继线程。重要的是,该线程并不会取消后继线程与 EntryList 的链接。
* 线程被唤醒之后会重新竞争 monitor 的控制权,它要不获取到锁要不重新挂起它自己。退出的线程并不会授权或传递控制权给它的后继线程。
* 相反,退出的线程会释放控制权以及尽可能的唤醒后继线程,所以后继线程可以竞争(重新)锁的控制权。如果 EntryList 为空而 cxq 非空,退出线程会转移 cxq 到 EntryList 中。通过使用 CAS 置 null 来分离 cxq 将线程从 cxq 折叠到 EntryList 来实现。其中 EntryList 是双向链表而 cxq 是单向链表,这是由于基于 CAS 的 “push” 入队最近到达的线程来决定的。
*
* 6、并发不变性:
* 1、只有 monitor 拥有者线程可以访问或者修改 EntryList。monitor 的互斥属性可以保护 EntryList 免于并发问题。
* 2、只有 monitor 拥有者可以分离 cxq。
*
* 7、monitor 列表(cxq、EntryList、WaitSet)的操作虽然避免了锁,但是它们并非是非锁的,除了入队。
*
* 8、可以多个线程并发入队 cxq 但只允许一个线程分离,这个机制避免了 ABA 问题。
*
* 9、总结起来就是,cxq 跟 EntryList 是由尝试获取锁的驻留线程组成的逻辑队列。通过这两个不同的队列来提高常量时间出队操作的概率以及降低链表末端的热度。关键一点就是降低 monitor 锁持有的时间。即使少量固定的自旋也能很大程度上降低 EntryList、cxq 上的入队出队次数进而减少内部锁及 monitor 元数据的竞争。
* cxq 保存着最近到达的线程并且由于是通过 CAS push 的操作,所以必须以单向链表的后进先出的顺序出队。当解锁线程发现 EntryList 为空而 _cxq 非空时,会在解锁时将 _cxq 转移到 EntryList。
* EntryList 按线性队列规则排序,它也可以被组织成双向链表或循环双向链表,便于我们可以在常量时间内进行插入、删除操作。队列在 exit() 调用时会强制调整,解锁线程会转移 cxq 到 EntryList,此时可以根据需要对 EntryList 中的线程进行排序以及重排序。
*
* 10、monitor 同步子系统避免了使用除 park-unpark 平台相关的本地同步机制原语。也就是说 monitor 的实现仅仅依赖于原子化操作及 park-unpark。monitor 子系统管理所有的 RUNNING->BLOCKED、BLOCKED->READY 的转换而底层系统管理 RADY<->RUN 的转换。
*
* 11、wait() 方法会将调用者线程寄宿在 WaitSet 中。
*
* 12、notify()、notifyAll() 方法只是负责将线程从 WaitSet 转移到 EntryList 或者 cxq 中。notify() 唤醒被通知者的方式比较低效,被通知者可能只是简单的将自己刺入到通知者持有的锁中。
*/
void ATTR ObjectMonitor::enter(TRAPS) {
// The following code is ordered to check the most common cases first
// and to reduce RTS->RTO cache line upgrades on SPARC and IA32 processors.
Thread * const Self = THREAD ;
void * cur ;

// 将当前线程设置为 monitor 的持有者
cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;

// 说明设置 owner 成功
if (cur == NULL) {
// Either ASSERT _recursions == 0 or explicitly set _recursions = 0.
assert (_recursions == 0 , "invariant") ;
assert (_owner == Self, "invariant") ;
// CONSIDER: set or assert OwnerIsThread == 1
// 竞争锁成功,返回
return ;
}

// 如果是锁的重入
if (cur == Self) {
// TODO-FIXME: check for integer overflow! BUGID 6557169.
// 自增重入次数,然后返回
_recursions ++ ;
return ;
}

// 如果当前线程就是锁的持有者(cur 为基础锁对象而非线程)
if (Self->is_lock_owned ((address)cur)) {
assert (_recursions == 0, "internal state error");
// 重入次数改为 1
_recursions = 1 ;
// Commute owner from a thread-specific on-stack BasicLockObject address to a full-fledged "Thread *".
// 将持有者改为当前线程
_owner = Self ;
// 将持有者类型改为线程
OwnerIsThread = 1 ;
return ;
}

// 走到这里了,说明发生了竞争
assert (Self->_Stalled == 0, "invariant") ;
Self->_Stalled = intptr_t(this) ;

// 入队之前先进行一轮自旋
if (Knob_SpinEarly && TrySpin (Self) > 0) {
assert (_owner == Self , "invariant") ;
assert (_recursions == 0 , "invariant") ;
assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
Self->_Stalled = 0 ;
// 成功获取锁,返回
return ;
}

// 持有者并非当前线程
assert (_owner != Self , "invariant") ;
// 当前线程并非后继线程
assert (_succ != Self , "invariant") ;
// 属于 Java 线程
assert (Self->is_Java_thread() , "invariant") ;
// 转换为 Java 线程
JavaThread * jt = (JavaThread *) Self ;
// 未处于安全点
assert (!SafepointSynchronize::is_at_safepoint(), "invariant") ;
// 线程状态并未阻塞
assert (jt->thread_state() != _thread_blocked , "invariant") ;
// monitor 所属对象非空
assert (this->object() != NULL , "invariant") ;
// cxq、EntryList 非空
assert (_count >= 0, "invariant") ;

// Prevent deflation at STW-time. See deflate_idle_monitors() and is_busy().
// Ensure the object-monitor relationship remains stable while there's contention.
// 递增计数
Atomic::inc_ptr(&_count);

EventJavaMonitorEnter event;

{ // 保存当前进程状态,同时将当前状态改为 BLOCKED_ON_MONITOR_ENTER
JavaThreadBlockedOnMonitorEnterState jtbmes(jt, this);

DTRACE_MONITOR_PROBE(contended__enter, this, object(), jt);
if (JvmtiExport::should_post_monitor_contended_enter()) {
JvmtiExport::post_monitor_contended_enter(jt, this);
}

OSThreadContendState osts(Self->osthread());
ThreadBlockInVM tbivm(jt);

// 设置线程等待获取锁的 monitor 对象
Self->set_current_pending_monitor(this);

// TODO-FIXME: change the following for(;;) loop to straight-line code.
for (;;) {
jt->set_suspend_equivalent();
// cleared by handle_special_suspend_equivalent_condition()
// or java_suspend_self()

// 如果获取锁失败,那么需要通过自旋的方式等待锁释放
EnterI (THREAD) ;

if (!ExitSuspendEquivalent(jt)) break ;

//
// We have acquired the contended monitor, but while we were
// waiting another thread suspended us. We don't want to enter
// the monitor while suspended because that would surprise the
// thread that suspended us.
//
_recursions = 0 ;
_succ = NULL ;
exit (false, Self) ;

jt->java_suspend_self();
}
Self->set_current_pending_monitor(NULL);
}

Atomic::dec_ptr(&_count);
assert (_count >= 0, "invariant") ;
Self->_Stalled = 0 ;

// Must either set _recursions = 0 or ASSERT _recursions == 0.
assert (_recursions == 0 , "invariant") ;
assert (_owner == Self , "invariant") ;
assert (_succ != Self , "invariant") ;
assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;

// The thread -- now the owner -- is back in vm mode.
// Report the glorious news via TI,DTrace and jvmstat.
// The probe effect is non-trivial. All the reportage occurs
// while we hold the monitor, increasing the length of the critical
// section. Amdahl's parallel speedup law comes vividly into play.
//
// Another option might be to aggregate the events (thread local or
// per-monitor aggregation) and defer reporting until a more opportune
// time -- such as next time some thread encounters contention but has
// yet to acquire the lock. While spinning that thread could
// spinning we could increment JVMStat counters, etc.

DTRACE_MONITOR_PROBE(contended__entered, this, object(), jt);
if (JvmtiExport::should_post_monitor_contended_entered()) {
JvmtiExport::post_monitor_contended_entered(jt, this);
}

if (event.should_commit()) {
event.set_klass(((oop)this->object())->klass());
event.set_previousOwner((TYPE_JAVALANGTHREAD)_previous_owner_tid);
event.set_address((TYPE_ADDRESS)(uintptr_t)(this->object_addr()));
event.commit();
}

if (ObjectMonitor::_sync_ContendedLockAttempts != NULL) {
ObjectMonitor::_sync_ContendedLockAttempts->inc() ;
}
}

  • enterI

    获取锁失败时,是通过 enterI 方法进行自旋等待锁释放。所以我们深入 enterI 看看具体的逻辑。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    214
    215
    216
    void ATTR ObjectMonitor::EnterI (TRAPS) {
    Thread * Self = THREAD ;
    assert (Self->is_Java_thread(), "invariant") ;
    assert (((JavaThread *) Self)->thread_state() == _thread_blocked , "invariant") ;

    // 尝试获取一次锁,万一成了呢
    if (TryLock (Self) > 0) {
    assert (_succ != Self , "invariant") ;
    assert (_owner == Self , "invariant") ;
    assert (_Responsible != Self , "invariant") ;
    return ;
    }

    DeferredInitialize () ;

    // We try one round of spinning *before* enqueueing Self.
    //
    // If the _owner is ready but OFFPROC we could use a YieldTo()
    // operation to donate the remainder of this thread's quantum
    // to the owner. This has subtle but beneficial affinity
    // effects.

    // 尝试一次自旋
    if (TrySpin (Self) > 0) {
    assert (_owner == Self , "invariant") ;
    assert (_succ != Self , "invariant") ;
    assert (_Responsible != Self , "invariant") ;
    return ;
    }

    // 如果还是没有获取到锁,那么就需要入队挂起了
    assert (_succ != Self , "invariant") ;
    assert (_owner != Self , "invariant") ;
    assert (_Responsible != Self , "invariant") ;

    // Enqueue "Self" on ObjectMonitor's _cxq.
    //
    // Node acts as a proxy for Self.
    // As an aside, if were to ever rewrite the synchronization code mostly
    // in Java, WaitNodes, ObjectMonitors, and Events would become 1st-class
    // Java objects. This would avoid awkward lifecycle and liveness issues,
    // as well as eliminate a subset of ABA issues.
    // TODO: eliminate ObjectWaiter and enqueue either Threads or Events.
    //

    // 将当前线程封装成 ObjectWaiter 对象
    ObjectWaiter node(Self) ;
    Self->_ParkEvent->reset() ;
    node._prev = (ObjectWaiter *) 0xBAD ;
    node.TState = ObjectWaiter::TS_CXQ ;

    ObjectWaiter * nxt ;
    // 通过自旋的方式将线程存入 _cxq
    for (;;) {
    // 采用头插法入队 _cxq:thread-2 -> thread-1 -> thread-0
    node._next = nxt = _cxq ;
    // 存入成功则跳出循环,每次修改 _cxq 头结点为新入队节点
    if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;

    // 如果 _cxq 变更导致 CAS 失败,只需要重试就行了。当然,这可以再尝试获取锁一次
    if (TryLock (Self) > 0) {
    assert (_succ != Self , "invariant") ;
    assert (_owner == Self , "invariant") ;
    assert (_Responsible != Self , "invariant") ;
    return ;
    }
    }

    /*
    * 尝试为 monitor 对象分配一个 “责任” 线程,如果竞争的线程都挂起了,那么即使 monitor 解锁了,也没有线程能获取到。
    * 所以需要有一个定时挂起的线程来负责检查并唤醒相应线程,该线程就是 “责任” 线程。
    */
    if ((SyncFlags & 16) == 0 && nxt == NULL && _EntryList == NULL) {
    // Try to assume the role of responsible thread for the monitor.
    // CONSIDER: ST vs CAS vs { if (Responsible==null) Responsible=Self }
    Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
    }

    // The lock have been released while this thread was occupied queueing
    // itself onto _cxq. To close the race and avoid "stranding" and
    // progress-liveness failure we must resample-retry _owner before parking.
    // Note the Dekker/Lamport duality: ST cxq; MEMBAR; LD Owner.
    // In this case the ST-MEMBAR is accomplished with CAS().
    //
    // TODO: Defer all thread state transitions until park-time.
    // Since state transitions are heavy and inefficient we'd like
    // to defer the state transitions until absolutely necessary,
    // and in doing so avoid some transitions ...

    TEVENT (Inflated enter - Contention) ;
    int nWakeups = 0 ;
    int RecheckInterval = 1 ;

    for (;;) {
    // 拿到锁退出
    if (TryLock (Self) > 0) break ;
    assert (_owner != Self, "invariant") ;

    // 不存在 “责任” 线程时,设置 “责任” 线程
    if ((SyncFlags & 2) && _Responsible == NULL) {
    Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
    }

    // 如果当前线程是 “责任” 线程,那么定时挂起当前线程,否则直接挂起
    if (_Responsible == Self || (SyncFlags & 1)) {
    TEVENT (Inflated enter - park TIMED) ;
    Self->_ParkEvent->park ((jlong) RecheckInterval) ;
    // Increase the RecheckInterval, but clamp the value.
    RecheckInterval *= 8 ;
    if (RecheckInterval > 1000) RecheckInterval = 1000 ;
    } else {
    TEVENT (Inflated enter - park UNTIMED) ;
    Self->_ParkEvent->park() ;
    }

    // 尝试获取锁
    if (TryLock(Self) > 0) break ;

    // 记录无效的唤醒次数
    TEVENT (Inflated enter - Futile wakeup) ;
    if (ObjectMonitor::_sync_FutileWakeups != NULL) {
    ObjectMonitor::_sync_FutileWakeups->inc() ;
    }
    ++ nWakeups ;

    // Assuming this is not a spurious wakeup we'll normally find _succ == Self.
    // We can defer clearing _succ until after the spin completes
    // TrySpin() must tolerate being called with _succ == Self.
    // Try yet another round of adaptive spinning.
    if ((Knob_SpinAfterFutile & 1) && TrySpin (Self) > 0) break ;

    // We can find that we were unpark()ed and redesignated _succ while
    // we were spinning. That's harmless. If we iterate and call park(),
    // park() will consume the event and return immediately and we'll
    // just spin again. This pattern can repeat, leaving _succ to simply
    // spin on a CPU. Enable Knob_ResetEvent to clear pending unparks().
    // Alternately, we can sample fired() here, and if set, forgo spinning
    // in the next iteration.

    if ((Knob_ResetEvent & 1) && Self->_ParkEvent->fired()) {
    Self->_ParkEvent->reset() ;
    OrderAccess::fence() ;
    }
    if (_succ == Self) _succ = NULL ;

    // Invariant: after clearing _succ a thread *must* retry _owner before parking.
    OrderAccess::fence() ;
    }

    // 走到这里说明已经获取了锁
    assert (_owner == Self , "invariant") ;
    assert (object() != NULL , "invariant") ;
    // I'd like to write:
    // guarantee (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
    // but as we're at a safepoint that's not safe.

    // 获取到锁之后的取消链接
    UnlinkAfterAcquire (Self, &node) ;
    // 如果后继线程是当前线程,将后继线程置 null
    if (_succ == Self) _succ = NULL ;

    assert (_succ != Self, "invariant") ;
    // 如果当前线程是 “责任” 线程
    if (_Responsible == Self) {
    // “责任” 线程置 null
    _Responsible = NULL ;
    OrderAccess::fence(); // Dekker pivot-point

    // We may leave threads on cxq|EntryList without a designated
    // "Responsible" thread. This is benign. When this thread subsequently
    // exits the monitor it can "see" such preexisting "old" threads --
    // threads that arrived on the cxq|EntryList before the fence, above --
    // by LDing cxq|EntryList. Newly arrived threads -- that is, threads
    // that arrive on cxq after the ST:MEMBAR, above -- will set Responsible
    // non-null and elect a new "Responsible" timer thread.
    //
    // This thread executes:
    // ST Responsible=null; MEMBAR (in enter epilog - here)
    // LD cxq|EntryList (in subsequent exit)
    //
    // Entering threads in the slow/contended path execute:
    // ST cxq=nonnull; MEMBAR; LD Responsible (in enter prolog)
    // The (ST cxq; MEMBAR) is accomplished with CAS().
    //
    // The MEMBAR, above, prevents the LD of cxq|EntryList in the subsequent
    // exit operation from floating above the ST Responsible=null.
    }

    // We've acquired ownership with CAS().
    // CAS is serializing -- it has MEMBAR/FENCE-equivalent semantics.
    // But since the CAS() this thread may have also stored into _succ,
    // EntryList, cxq or Responsible. These meta-data updates must be
    // visible __before this thread subsequently drops the lock.
    // Consider what could occur if we didn't enforce this constraint --
    // STs to monitor meta-data and user-data could reorder with (become
    // visible after) the ST in exit that drops ownership of the lock.
    // Some other thread could then acquire the lock, but observe inconsistent
    // or old monitor meta-data and heap data. That violates the JMM.
    // To that end, the 1-0 exit() operation must have at least STST|LDST
    // "release" barrier semantics. Specifically, there must be at least a
    // STST|LDST barrier in exit() before the ST of null into _owner that drops
    // the lock. The barrier ensures that changes to monitor meta-data and data
    // protected by the lock will be visible before we release the lock, and
    // therefore before some other thread (CPU) has a chance to acquire the lock.
    // See also: http://gee.cs.oswego.edu/dl/jmm/cookbook.html.
    //
    // Critically, any prior STs to _succ or EntryList must be visible before
    // the ST of null into _owner in the *subsequent* (following) corresponding
    // monitorexit. Recall too, that in 1-0 mode monitorexit does not necessarily
    // execute a serializing instruction.

    if (SyncFlags & 8) {
    OrderAccess::fence() ;
    }
    return ;
    }
    • tryLock

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      // Caveat: TryLock() is not necessarily serializing if it returns failure.
      // Callers must compensate as needed.
      int ObjectMonitor::TryLock (Thread * Self) {
      for (;;) {
      // monitor 当前的持有者
      void * own = _owner ;
      // 如果不为 null,说明已经有归属,那么上锁肯定失败
      if (own != NULL) return 0 ;
      // 如果 CAS 成功,说明获取到了锁
      if (Atomic::cmpxchg_ptr (Self, &_owner, NULL) == NULL) {
      // 重复次数肯定为 0,毕竟刚获得锁
      assert (_recursions == 0, "invariant") ;
      // 持有者是否为自己
      assert (_owner == Self, "invariant") ;
      // 说明上锁成功了
      return 1 ;
      }
      // 上锁失败
      if (true) return -1 ;
      }
      }

      总结

      enter 方法主要做了以下几件事:
    • 通过 CAS 操作,将当前线程设置到 monitor 对象的 _owner 字段中以获取锁,成功则返回。
    • 如果 _owner 指向的就是当前线程,说明发生了锁重入,递增 _recursions。
    • 如果当前线程获取锁成功,那么就将 _recursions 设置为 1,同时 _owner 指向当前线程。
    • 如果失败,需要等待锁的释放。

    如果获取锁失败,那么就需要通过自旋的方式等待锁的释放,也就是 enterI 方法要做的事:

    • 将当前线程封装成状态为 TS_CXQ 的 ObjectWaiter。
    • 通过自旋操作将 ObjectWaiter 节点 push 到 _cxq 队列中。
    • 进入到 _cxq 队列中的节点继续通过自旋的方式尝试获取锁,如果达到指定阈值还未成功,则通过 park 将自己挂起等待被唤醒。

锁的释放

轻量级锁的释放

轻量级锁的释放是通过 monitorexit 调用完成的。

monitorexit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// monitorexit 指令定义
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorexit(JavaThread* thread, BasicObjectLock* elem))
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
Handle h_obj(thread, elem->obj());
assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
"must be NULL or an object");
// 先判断当前的锁状态
if (elem == NULL || h_obj()->is_unlocked()) {
THROW(vmSymbols::java_lang_IllegalMonitorStateException());
}
// 释放锁
ObjectSynchronizer::slow_exit(h_obj(), elem->lock(), thread);

// Free entry. This must be done here, since a pending exception might be installed on
// exit. If it is not cleared, the exception handling code will try to unlock the monitor again.
// 将锁指向的目标对象设置为 null
elem->set_obj(NULL);
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
IRT_END

slow_exit

1
2
3
4
// 由 fast_exit 完成锁的释放
void ObjectSynchronizer::slow_exit(oop object, BasicLock* lock, TRAPS) {
fast_exit (object, lock, THREAD) ;
}

fast_exit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) {
// 锁状态不能处于偏移模式
assert(!object->mark()->has_bias_pattern(), "should not see bias pattern here");
// if displaced header is null, the previous enter is recursive enter, no-op

// 获取锁中的 displaced_header 用于还原 markWord
markOop dhw = lock->displaced_header();
markOop mark ;
// 如果为 null,说明前一个锁的进入是重入的,除了校验之外无需做任何操作
if (dhw == NULL) {
// Recursive stack-lock.
// Diagnostics -- Could be: stack-locked, inflating, inflated.
// 去对象中的 markWord
mark = object->mark() ;
// 有锁状态
assert (!mark->is_neutral(), "invariant") ;
// 轻量级锁并且并非处于膨胀中状态,那么当前线程必须为锁的持有者
if (mark->has_locker() && mark != markOopDesc::INFLATING()) {
assert(THREAD->is_lock_owned((address)mark->locker()), "invariant") ;
}
// 重量级锁
if (mark->has_monitor()) {
ObjectMonitor * m = mark->monitor() ;
assert(((oop)(m->object()))->mark() == mark, "invariant") ;
assert(m->is_entered(THREAD), "invariant") ;
}
return ;
}

// 非锁重入,那么取 markWord
mark = object->mark() ;

// 如果是轻量级锁,那么只需用 displaced_header 替换回 markWord 即可
if (mark == (markOop) lock) {
assert (dhw->is_neutral(), "invariant") ;
if ((markOop) Atomic::cmpxchg_ptr (dhw, object->mark_addr(), mark) == mark) {
TEVENT (fast_exit: release stacklock) ;
return;
}
}
// 替换失败升级为重量级锁
ObjectSynchronizer::inflate(THREAD, object)->exit (true, THREAD) ;
}

轻量级锁的释放很简单,只是将当前线程栈中锁记录的 displaced_header 头替换回对象 markWord 即可。如果替换失败,那么需要膨胀成重量级锁,实现重量级锁的释放逻辑。

重量级锁的释放

重量级锁的释放由 ObjectMonitor::exit 方法来实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) {
Thread * Self = THREAD ;
// 当前 _owner 未指向当前线程
if (THREAD != _owner) {
// 如果 _owner 指向了当前线程的栈锁
if (THREAD->is_lock_owned((address) _owner)) {
// 该场景下肯定未发生重入(因为重入的时候会进行栈锁到线程的替换)
assert (_recursions == 0, "invariant") ;
// 将栈锁替换为线程
_owner = THREAD ;
// 重入次数为 0
_recursions = 0 ;
// owner 指向的类型改为线程
OwnerIsThread = 1 ;
} else {
// NOTE: we need to handle unbalanced monitor enter/exit
// in native code by throwing an exception.
// TODO: Throw an IllegalMonitorStateException ?
TEVENT (Exit - Throw IMSX) ;
assert(false, "Non-balanced monitor enter/exit!");
if (false) {
THROW(vmSymbols::java_lang_IllegalMonitorStateException());
}
return;
}
}

// 如果发生了锁的重入
if (_recursions != 0) {
// 递减次数,然后返回
_recursions--;
TEVENT (Inflated exit - recursive) ;
return ;
}

// Invariant: after setting Responsible=null an thread must execute
// a MEMBAR or other serializing instruction before fetching EntryList|cxq.
if ((SyncFlags & 4) == 0) {
// “责任” 线程置 null
_Responsible = NULL ;
}

#if INCLUDE_TRACE
// get the owner's thread id for the MonitorEnter event
// if it is enabled and the thread isn't suspended
if (not_suspended && Tracing::is_event_enabled(TraceJavaMonitorEnterEvent)) {
_previous_owner_tid = SharedRuntime::get_java_tid(Self);
}
#endif

for (;;) {
// 释放锁的线程为当前锁的持有者
assert (THREAD == _owner, "invariant") ;


if (Knob_ExitPolicy == 0) {
// 释放锁,将 _owner 置为 null
OrderAccess::release_store_ptr (&_owner, NULL) ;
// 写读屏障
OrderAccess::storeload() ; // See if we need to wake a successor
if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
TEVENT (Inflated exit - simple egress) ;
return ;
}
TEVENT (Inflated exit - complex egress) ;

/*
* 通常退出的线程负责确保后继线程,但是如果其他的后继线程已经就绪或者
* 其他进入的线程都在自旋,那么当前线程可以简单地将 _owner 置为 null
* 而不用唤醒后继线程。“责任” 也会转移到已经就绪后者正在运行的后继线程。
*
* 变量 _succ 表示后继线程已经唤醒但还未运行,它对减少无效的唤醒频率至关重要。
* Enter() 中的自旋线程可以将 _succ 设置为非 null,在当前实现中,自旋
* 线程通过设置 _succ 使得退出线程避免唤醒后继线程(当然还有另外一种
* 方案是,退出线程释放锁后在通过短暂自旋以查看是否有自旋线程在尝试获取锁
* 如果是,退出线程可以迅速退出而免于唤醒后继线程,否则就需要出队并唤醒一个后继。注意,我们需要缩短 post-drop 的自旋时间但是不能短于最坏情况下的往返缓存行迁移时间。
* 释放的锁需要对自旋者可见,自旋者获取的锁也必须对退出的线程可见。
*
* 只有锁的持有者才能变更 EntryList 或者转移 _cxq,如果获取锁失败,
* 那么确保后继线程的责任就落实到后续的新持有者了。
*/
if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
return ;
}
TEVENT (Exit - Reacquired) ;
} else {
if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
OrderAccess::release_store_ptr (&_owner, NULL) ; // drop the lock
OrderAccess::storeload() ;
// Ratify the previously observed values.
if (_cxq == NULL || _succ != NULL) {
TEVENT (Inflated exit - simple egress) ;
return ;
}

/* 如果处于快速退出的线程与慢进入的线程发生了竞争,那么有以下两种选择:
* A. 尝试重新获取锁。
* 如果 CAS 失败并立即返回,那么要不重启/新执行退出操作要不执行
* 以下唤醒后继线程的代码。
* B. 如果构成 EntryList/cxq 的元素是 TSM,那么我们可以简单的唤醒
* 一个引导线程而无需设置 _succ。
*/
if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
TEVENT (Inflated exit - reacquired succeeded) ;
return ;
}
TEVENT (Inflated exit - reacquired failed) ;
} else {
TEVENT (Inflated exit - complex egress) ;
}
}

guarantee (_owner == THREAD, "invariant") ;

ObjectWaiter * w = NULL ;
int QMode = Knob_QMode ;

if (QMode == 2 && _cxq != NULL) {
// QMode == 2 : cxq 优先于 EntryList.
// 尝试直接唤醒 cxq 中的后继线程,如果成功,后继线程需要从 cxq 中解绑。
w = _cxq ;
assert (w != NULL, "invariant") ;
assert (w->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
// 唤醒节点
ExitEpilog (Self, w) ;
return ;
}

if (QMode == 3 && _cxq != NULL) {
// 将 cxq 转移到 EntryList 中
w = _cxq ;
// 将 cxq 中的元素置空
for (;;) {
assert (w != NULL, "Invariant") ;
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
assert (w != NULL , "invariant") ;

ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
// 变更状态
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}


ObjectWaiter * Tail ;
// 找到 EntryList 的尾节点
for (Tail = _EntryList ; Tail != NULL && Tail->_next != NULL ; Tail = Tail->_next) ;
if (Tail == NULL) {
_EntryList = w ;
} else {
// 双向指针,将 _cxq 中的元素追加到 EntryList 的尾部
Tail->_next = w ;
w->_prev = Tail ;
}

// Fall thru into code that tries to wake a successor from EntryList
}

if (QMode == 4 && _cxq != NULL) {
w = _cxq ;
for (;;) {
assert (w != NULL, "Invariant") ;
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
assert (w != NULL , "invariant") ;

ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}

// 将 EntryList 追加到 _cxq 的尾部
if (_EntryList != NULL) {
q->_next = _EntryList ;
_EntryList->_prev = q ;
}
// 将 EntryList 头指针指向 cxq 头部元素,也就是说 EntryList 头部元素是最新运行的线程
_EntryList = w ;

// Fall thru into code that tries to wake a successor from EntryList
}

w = _EntryList ;
if (w != NULL) {
// I'd like to write: guarantee (w->_thread != Self).
/*
* 实践中,正在退出的线程也可能出现在 EntryList 中。当线程 T1 调用
* 0.wait() 时,会将 T1 入队到 0 的 waitset 中,接着调用 exit()
* 通过将 0._owner 设置为 null 来释放锁。T2 获取到了 0 并且调用
* 0.notify(),此时 T1 就会从 0 的 waitset 转移到 0 的 EntryList
* 中。T2 就会释放锁,将 _owner 设置为 null,此时 T2 注意到
* EntryList 已经被填充,那么处于 EntryList 中的 T1 就会重新申请锁,
* 但是此时它发现它就处于 EntryList 中。
*
*/
// 鉴于此,我们不得不考虑 w 与 self 关联的情况。
assert (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
// 唤醒节点
ExitEpilog (Self, w) ;
return ;
}

// EntryList 为空,此时查看 _cxq 是否也为空,如果如此,那么继续执行循环
w = _cxq ;
if (w == NULL) continue ;

// _cxq 不为空,那么将 _cxq 置空
for (;;) {
assert (w != NULL, "Invariant") ;
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
TEVENT (Inflated exit - drain cxq into EntryList) ;
// 此时 w 肯定是不为空的,它指向了原 _cxq 队列
assert (w != NULL , "invariant") ;
// EntryList 肯定为空
assert (_EntryList == NULL , "invariant") ;

// Convert the LIFO SLL anchored by _cxq into a DLL.
// The list reorganization step operates in O(LENGTH(w)) time.
// It's critical that this step operate quickly as
// "Self" still holds the outer-lock, restricting parallelism
// and effectively lengthening the critical section.
// Invariant: s chases t chases u.
// TODO-FIXME: consider changing EntryList from a DLL to a CDLL so
// we have faster access to the tail.

if (QMode == 1) {
// QMode == 1 : 将 cxq 转移至 EntryList,并对集合中的元素进行反转
ObjectWaiter * s = NULL ;
/*
* thread-2 -> thread-1 -> thread-0 (_cxq,后进先出)
* w t
*/
ObjectWaiter * t = w ;
ObjectWaiter * u = NULL ;
while (t != NULL) {
// 当前 w 指向的元素状态必须为 TS_CXQ
guarantee (t->TState == ObjectWaiter::TS_CXQ, "invariant") ;
t->TState = ObjectWaiter::TS_ENTER ;
u = t->_next ;
t->_prev = u ;
t->_next = s ;
s = t;
t = u ;
}
/*
* thread-0 <-> thread-1 <-> thread-2
* s
*/
_EntryList = s ;
assert (s != NULL, "invariant") ;
} else {
// QMode == 0 or QMode == 2
_EntryList = w ;
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
/*
* thread-2 <-> thread-1 <-> thread-0
* EntryList
*/
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
// 变成双链表
p->_prev = q ;
q = p ;
}
}

// In 1-0 mode we need: ST EntryList; MEMBAR #storestore; ST _owner = NULL
// The MEMBAR is satisfied by the release_store() operation in ExitEpilog().

// See if we can abdicate to a spinner instead of waking a thread.
// A primary goal of the implementation is to reduce the
// context-switch rate.
if (_succ != NULL) continue;

w = _EntryList ;
if (w != NULL) {
// 状态校验
guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
// 唤醒节点
ExitEpilog (Self, w) ;
return ;
}
}
}

ExitEpilog

ExitEpilog 负责节点的唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) {
// 持有者必须为释放锁的线程
assert (_owner == Self, "invariant") ;

// 大体流程:
// 1. ST _succ = wakee
// 2. membar #loadstore|#storestore;
// 2. ST _owner = NULL
// 3. unpark(wakee)

// 如果后继功能开启,那么将 _succ 指向等待唤醒的节点线程
_succ = Knob_SuccEnabled ? Wakee->_thread : NULL ;
ParkEvent * Trigger = Wakee->_event ;

// Hygiene -- once we've set _owner = NULL we can't safely dereference Wakee again.
// The thread associated with Wakee may have grabbed the lock and "Wakee" may be
// out-of-scope (non-extant).
Wakee = NULL ;

// 释放锁
OrderAccess::release_store_ptr (&_owner, NULL) ;
OrderAccess::fence() ; // ST _owner vs LD in unpark()

if (SafepointSynchronize::do_call_back()) {
TEVENT (unpark before SAFEPOINT) ;
}

DTRACE_MONITOR_PROBE(contended__exit, this, object(), Self);
// 唤醒线程
Trigger->unpark() ;

// Maintain stats and report events to JVMTI
if (ObjectMonitor::_sync_Parks != NULL) {
ObjectMonitor::_sync_Parks->inc() ;
}
}