Java并发领域中ReentrantReadWriteLock源码深度解析概览
目录
- 1. ReentrantReadWriteLock是什么
- 2. 非公平ReentrantReadWriteLock
- 2.1. 特性阐述
- 2.2. 使用方式
- 2.3. 源码剖析
- 2.3.1. uml图相关
- 2.3.2. 构造函数细节
- 2.3.3. 读锁加锁流程
- 2.3.3.1. 借助AQS添加共享锁
- 2.3.3.1.1. 利用Sync尝试添加共享锁
- 2.3.3.1.1.1. 判定读操作是否需阻塞(非公平场景)
- 2.3.3.1.1.2. 快速加锁失败时的死循环加锁处理
- 2.3.3.1.2. 加锁失败后的AQS队列入队与阻塞
- 2.3.3.1.2.1. 加入AQS队列并阻塞
- 2.3.4. 读锁解锁流程
- 2.3.4.1. 借助AQS释放共享锁
- 2.3.4.1.1. 利用Sync尝试释放锁
- 2.3.4.1.2. 所有共享锁释放完毕后唤醒AQS队列头节点后续节点
- 2.3.4.1.2.1. 唤醒AQS队列头节点的下一个节点
- 2.3.5. 写锁加锁流程
- 2.3.5.1. 调用AQS添加互斥锁
- 2.3.5.1.1. 利用Sync尝试添加互斥锁
- 2.3.5.1.1.1. 判定写操作是否需阻塞(非公平场景)
- 2.3.5.1.2. 加锁失败后的AQS队列入队与阻塞
- 2.3.6. 写锁解锁流程
- 2.3.6.1. 调用AQS释放互斥锁
- 2.3.6.1.1. 调用Sync尝试释放互斥锁
- 2.3.6.1.2. 尝试解锁成功后唤醒AQS队列头节点后续节点
- 3. 公平ReentrantReadWriteLock
- 3.1. 特性说明
- 3.2. 使用示例
- 3.3. 源码剖析
- 3.3.1. uml相关内容
- 3.3.2. 构造函数详情
- 3.3.3. 读锁加锁流程
- 3.3.3.1. 借助AQS添加共享锁
- 3.3.3.1.1. 利用Sync尝试添加共享锁
- 3.3.3.1.1.1. 判定读操作是否需阻塞(公平场景)
- 3.3.3.1.1.2. 快速加锁失败时的死循环加锁处理
- 3.3.3.1.2. 加锁失败后的AQS队列入队与阻塞
- 3.3.3.1.2.1. 加入AQS队列并阻塞
- 3.3.4. 读锁解锁流程
- 3.3.4.1. 借助AQS释放共享锁
- 3.3.4.1.1. 利用Sync尝试释放锁
- 3.3.4.1.2. 所有共享锁释放完毕后唤醒AQS队列头节点后续节点
- 3.3.4.1.2.1. 唤醒AQS队列头节点的下一个节点
- 3.3.5. 写锁加锁流程
- 3.3.5.1. 调用AQS添加互斥锁
- 3.3.5.1.1. 利用Sync尝试添加互斥锁
- 3.3.5.1.1.1. 判定写操作是否需阻塞(公平场景)
- 3.3.5.1.2. 加锁失败后的AQS队列入队与阻塞
- 3.3.6. 写锁解锁流程
- 3.3.6.1. 调用AQS释放互斥锁
- 3.3.6.1.1. 调用Sync尝试释放互斥锁
- 3.3.6.1.2. 尝试解锁成功后唤醒AQS队列头节点后续节点
- 4. 参考
1. ReentrantReadWriteLock是什么
ReentrantLock能确保同一时刻仅有一个线程可在临界区进行读或写操作,这意味着即便有两个读线程同时读取数据,ReentrantLock也只允许其中一个通过。然而,我们期望的是读操作能够并发执行,一旦有写操作,其他线程就需等待。具体情况如下表所示:
是否可以同时进行 | 读 | 写 |
---|---|---|
读 | √ | × |
写 | × | × |
基于此需求,ReentrantReadWriteLock应运而生。
2. 非公平ReentrantReadWriteLock
2.1. 特性阐述
在非公平模式下,不管队列前面是否有线程在排队等待获取锁,线程都会直接去尝试抢占锁。
2.2. 使用方式
以下是在非公平模式下的使用示例代码:
public class ReadWriteLockTest
{
private static ReadWriteLock lock = new ReentrantReadWriteLock();//默认是非公平模式
private static Lock readLock = lock.readLock();
private static Lock writeLock = lock.writeLock();
private static List<Integer> data = new ArrayList<>();
public static void main(String[] args) throws InterruptedException
{
Thread readThread = new Thread(() -> {
while (true)
{
try
{
TimeUnit.MILLISECONDS.sleep(500);
readLock.lock();
System.out.println(Thread.currentThread().getName() + " read: " + data);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
finally
{
readLock.unlock();
}
}
});
Thread readThread2 = new Thread(() -> {
while (true)
{
try
{
TimeUnit.MILLISECONDS.sleep(300);
readLock.lock();
System.out.println(Thread.currentThread().getName() + " read: " + data);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
finally
{
readLock.unlock();
}
}
});
Thread writeThread = new Thread(() -> {
int i = 0;
while (true)
{
try
{
TimeUnit.MILLISECONDS.sleep(200);
writeLock.lock();
if (i % 2 == 0)
{
data.add(i);
}else
{
data.remove(0);
}
i++;
}
catch (InterruptedException e)
{
e.printStackTrace();
}
finally
{
writeLock.unlock();
}
}
});
readThread.start();
readThread2.start();
writeThread.start();
readThread.join();
readThread2.join();
writeThread.join();
}
}
2.3. 源码剖析
2.3.1. uml图相关
以下是相关的uml图示意:
@startuml
skinparam classAttributeIconSize 0
interface AQS{
}
class Sync{
}
interface Lock{
}
interface ReadWriteLock{
}
class ReentrantReadWriteLock{
}
class WriteLock{
}
class ReadLock{
}
class WriteLock{
}
Lock <|-- ReadLock
Lock <|-- WriteLock
ReadWriteLock <|-- ReentrantReadWriteLock
AQS <|-- Sync
ReentrantReadWriteLock --> ReadLock
ReentrantReadWriteLock --> WriteLock
ReadLock --> Sync
WriteLock --> Sync
@enduml
2.3.2. 构造函数细节
- ReentrantReadWriteLock构造函数:
public ReentrantReadWriteLock() {
// 默认使用非公平模式
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
// 根据传入的fair参数初始化Sync,若为false则使用NonfairSync
sync = fair ? new FairSync() : new NonfairSync();
// 初始化读写锁
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
- ReentrantReadWriteLock.ReadLock构造函数:
protected ReadLock(ReentrantReadWriteLock lock) {
// 保存ReentrantReadWriteLock的Sync
sync = lock.sync;
}
- ReentrantReadWriteLock.WriteLock构造函数:
protected WriteLock(ReentrantReadWriteLock lock) {
// 保存ReentrantReadWriteLock的Sync
sync = lock.sync;
}
2.3.3. 读锁加锁流程
- ReentrantReadWriteLock.readLock方法:
// 返回读锁实例
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
- ReadLock.lock方法:
public void lock() {
// 调用AQS添加共享锁
sync.acquireShared(1);
}
2.3.3.1. 借助AQS添加共享锁
- AQS acquireShared方法:
public final void acquireShared(int arg) {
// 调用ReentrantReadWriteLock的Sync的tryAcquireShared方法
// 如果返回值小于0,说明加锁失败,需要执行doAcquireShared入队并阻塞
// 如果返回值大于等于0,说明加锁成功,执行后续逻辑
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
2.3.3.1.1. 利用Sync尝试添加共享锁
- ReentrantReadWriteLock.Sync.tryAcquireShared方法:
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 如果写锁数量不为0且加锁线程不是当前线程,返回-1表示加锁失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
// 非公平锁下判断读操作是否需阻塞
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 处理读锁相关的状态和计数
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 快速加锁失败时的处理
return fullTryAcquireShared(current);
}
2.3.3.1.1.1. 判定读操作是否需阻塞(非公平场景)
- ReentrantReadWriteLock.NonfairSync.readerShouldBlock方法:
final boolean readerShouldBlock() {
// 非公平模式下,若队列头节点是互斥节点(加写锁的节点)则需阻塞读操作
return apparentlyFirstQueuedIsExclusive();
}
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
2.3.3.1.1.2. 快速加锁失败时的死循环加锁处理
- ReentrantReadWriteLock.Sync.fullTryAcquireShared方法:
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
} else if (readerShouldBlock()) {
if (firstReader == current) {
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh;
}
return 1;
}
}
}
2.3.3.1.2. 加锁失败后的AQS队列入队与阻塞
- doAcquireShared方法:
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null;
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2.3.3.1.2.1. 加入AQS队列并阻塞
- addWaiter方法:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
参考:5.AQS.md
2.3.4. 读锁解锁流程
- ReentrantReadWriteLock.ReadLock.unlock方法:
public void unlock() {
// 调用AQS释放共享锁
sync.releaseShared(1);
}
2.3.4.1. 借助AQS释放共享锁
- AbstractQueuedSynchronizer.releaseShared方法:
public final boolean releaseShared(int arg) {
// 调用Sync尝试释放共享锁,如果释放成功则执行doReleaseShared
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
2.3.4.1.1. 利用Sync尝试释放锁
- ReentrantReadWriteLock.Sync.tryReleaseShared方法:
```java
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
文章整理自互联网,只做测试使用。发布者:Lomu,转转请注明出处:https://www.it1024doc.com/12891.html