博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【Java并发编程实战】——AbstractQueuedSynchronizer源码分析(一)
阅读量:4180 次
发布时间:2019-05-26

本文共 14128 字,大约阅读时间需要 47 分钟。

常说的 AQS 是什么

AbstractQueuedSynchronizer同步器(简称AQS),是用来构建锁或者其他同步组件的基础框架,使用一个 volatile 的整数成员标识锁的状态,通过内置的FIFO队列来实现多线程间竞争和等待。

同步队列的基本结构,一个指向头结点,一个指向尾节点,有节点加入都进入到队尾,设置本节点为尾节点,并修改链表关系。

修改尾结点执行基于CAS操作的 compareAndSetTail(pred, node)
同步队列基本结构
步骤说明:线程先获取同步状态,判断是否成功,成功继续运行线程,失败则自旋转加入到同步队列末尾。加入队列还没有获取到锁会被阻塞,线程被中断或者被前驱节点唤醒都会从阻塞恢复运行,接下来会继续获取同步状态,成功获取锁喉,设置自己为头结点,继续运行,最后唤醒头结点的后续节点。
在这里插入图片描述

AbstractQueuedSynchronizer 源码

先来看AQS 的成员、属性

/**  * 当前拥有独占访问权限的线程,继承自AbstractOwnableSynchronizer. */private transient Thread exclusiveOwnerThread;/**  * 同步队列的头结点,状态一定不为 CANCELLED.  */private transient volatile Node head;/** * 同步队列的尾结点. */private transient volatile Node tail;/** * 锁的同步状态,0表示没有被占用,大于0表示被占用即有线程持有了锁. */private volatile int state;/** * 允许自旋的纳秒时间,在此时间内线程不会阻塞,短时间内可以提高性能. */static final long spinForTimeoutThreshold = 1000L;/** * 同步(阻塞)队列的结点. */static final class Node {
/** 表明当前节点在共享模式下 */ static final Node SHARED = new Node(); /** 表明当前节点在独占模式下 */ static final Node EXCLUSIVE = null; /** 表明节点被取消 */ static final int CANCELLED = 1; /** 表明后续节点处于阻塞状态,此节点释放了锁后,会唤醒后续节点*/ static final int SIGNAL = -1; /** 表明节点阻塞在等待队列,其他线程调用 single() 方法后,会将此节点从等待队列移动到同步队列 */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ /** 表明下一个次共享是同步状态应该无条件地传播 */ static final int PROPAGATE = -3; /** * 节点的状态(CANCELLED、SIGNAL、CONDITION 、PROPAGATE ) */ volatile int waitStatus; /** * 上一个节点 */ volatile Node prev; /** * 下一个节点 */ volatile Node next; /** * 将节点加入队列的线程. */ volatile Thread thread; /** * 下一个等待队列中的节点,或者是特殊的共享节点. */ Node nextWaiter;}

介绍 AbstractQueuedSynchronizer 同步器之前,首先了解下 Lock 的使用方式,ReentrantLock 的实现主要是内聚了一个 AQS 的子类来完成控制访问的。

Lock lock = new ReentrantLock();lock.lock();try {
...} finally {
lock.unlock();}//ReentrantLock构造器,参数代表是否公平锁,默认非公平锁public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();}/** * Sync object for non-fair locks */static final class NonfairSync extends Sync {
...}/** * Base of synchronization control for this lock. Subclassed * into fair and nonfair versions below. Uses AQS state to * represent the number of holds on the lock. */abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L; /** * Performs {@link Lock#lock}. The main reason for subclassing * is to allow fast path for nonfair version. */ abstract void lock();}

reentrantLock.lock() 是怎么控制锁的呢?这里以非公平锁为例子,为了看代码方便,将调用步骤展示在一起

public void lock() {
//sync = new NonfairSync() sync.lock();}/** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */final void lock() {
//不公平锁,新线程直接竞争锁,不需要判断同步队列中是否有节点 if (compareAndSetState(0, 1)) //能进入代表竞争成功,调用父类的方法设置当前线程为锁拥有者 setExclusiveOwnerThread(Thread.currentThread()); else //不成功,调用模板方法再次尝试获取锁 acquire(1);}//模板方法,独占模式获取锁,忽略异常public final void acquire(int arg) {
//调用AQS实现类的方法尝试获取锁 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);}/** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. *///非公平获取锁final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread(); int c = getState(); //0代表可以加锁 if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current); //设置 State 和独占线程后返回成功 return true; } } //已经有线程持有锁了 else if (current == getExclusiveOwnerThread()) {
//能进入说明持有锁的线程就是自己,将state加一 //重入锁,同一个线程可以多次获取锁,重入一次就加一 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //锁被别人获取了,获取失败 return false;}

假设锁获取失败,tryAcquire 返回 false,如果锁获取成功,后面的就都不用执行了

public final void acquire(int arg) {
if (!tryAcquire(arg) && //先执行 addWaiter 方法 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //如果被中断,重新设置一下中断标识 selfInterrupt();}/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node *///创建一个节点,将节点入阻塞队列末尾private Node addWaiter(Node mode) {
//新建一个节点,节点类型为Node.EXCLUSIVE,代表是独占模式 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure //先尝试一次将节点设置成队尾节点 Node pred = tail; //队尾不为空 if (pred != null) {
//将新建的节点的前驱指向原来的队尾节点 node.prev = pred; //CAS操作,设置节点为阻塞队列的尾节点 if (compareAndSetTail(pred, node)) {
//成功后,将原来的队尾节点的后驱指向自己,阻塞队列是一个双向链表 pred.next = node; return node; } } //之前的一次尝试失败,原因:队列为空、其他进程也在加入队尾CAS操作失败 enq(node); //入尾队列后,返回此节点 return node;}/** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */private Node enq(final Node node) {
//这里循环,一定要将节点入阻塞队列的队尾 for (;;) {
Node t = tail; //队尾为空,一定要初始化 if (t == null) {
// Must initialize //这里可能失败,因为可能多个线程竞争锁同时都在初始化,失败了也没关系外层还有循环再重新试一次 if (compareAndSetHead(new Node())) //初始化后,头尾节点是一样的 tail = head; } else {
//队尾不为空,和上面的逻辑一样,成功了就建立好双向连接 node.prev = t; //注意,执行CAS成功,代表设置了本节点为尾节点,本节点的前驱指向原尾节点。 if (compareAndSetTail(t, node)) {
//将原尾节点的后驱指向本节点,这一步是在CAS之后,其他线程不一定立即可见 //任何时候要获取队列所有节点,需要从尾部往前遍历 t.next = node; return t; } } }}/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting *///再次获取锁final boolean acquireQueued(final Node node, int arg) {
//获取锁是否失败标识 boolean failed = true; try {
//是否被中断标识 boolean interrupted = false; //退出循环的条件是获取锁成功,或者 tryAcquire() 发生异常 for (;;) {
//获取前驱 final Node p = node.predecessor(); //前驱为头节点才可以尝试获取锁,//tryAcquire参考之前说明 if (p == head && tryAcquire(arg)) {
//获取锁成功,设置本节点为队列头节点,已经获取到了锁,不用CAS setHead(node); //原头节点可以释放了 p.next = null; // help GC failed = false; //返回中断标识,也就是被中断了也会获取到锁,然后返回 //之后在 selfInterrupt() 方法中再重新设置中断标志,交给原调用者去处理中断 //不处理中断,传递保留中断标识,这是处理中断的一种方式 return interrupted; } //获取锁失败,判断是否需要阻塞此进程 if (shouldParkAfterFailedAcquire(p, node) && //前面一个节点还没有释放锁,阻塞此进程 parkAndCheckInterrupt()) interrupted = true; } } finally {
//如果失败,取消获取锁 if (failed) //能进入的这里的唯一情况是 tryAcquire() 执行时异常 //本次分析的 reentrantLock.lock() 不会执行到这里来 //tryAcquire()由AQS的子类实现,一般是不会到这里的,除非子类实现的有问题 //取消当前节点 cancelAcquire(node); }}/** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; //前驱状态为-1,表明后续节点需要被阻塞,且当前驱节点释放了同步状态或者被取消,后续节点需要被唤醒 //返回true此节点先阻塞 if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; //前驱状态为1,表明前驱超时或者被中断 if (ws > 0) {
/* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ //前驱的前驱有可能也被取消了,一直向前找,直到第一个前驱状态<=0,将此节点的前驱指向找到的节点 //有没有发现这里没有使用CAS操作节点的前驱,能进入到这里表明此节点是尾结点或者在尾节点前面 //就算有其它线程新加入尾节点,此节点排在新加入节点前面 //本节点未被取消前,新加入的节点能修改自己的状态,但是是修改不了自己的前驱,不用担心并发修改 do {
node.prev = pred = pred.prev; } while (pred.waitStatus > 0); //将找到的节点的后驱指向此节点,之后返回重新获取锁 pred.next = node; } else {
/* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ //初始化的头节点状态为0,此时将头节点状态改为-1 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } //返回重新获取锁 return false;}/** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted */private final boolean parkAndCheckInterrupt() {
//阻塞当前线程 LockSupport.park(this); //线程恢复后,check当前线程是否处于interrupt,并清除interrupt状态 return Thread.interrupted();}/** * Cancels an ongoing attempt to acquire. * * @param node the node */private void cancelAcquire(Node node) {
// Ignore if node doesn't exist //节点不存在直接返回 if (node == null) return; node.thread = null; // Skip cancelled predecessors //前一个节点也可能被取消了 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. //记录找到的前驱指向的下一个节点 Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. //一定要将node节点的状态设置为取消,不管其他线程怎么操作本节点 //别的节点可以读到本节点的状态,然后跳过本节点 node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. //如果此节点为尾结点,设置前驱为尾结点 //CAS失败了(代表有新节点加入)也没关系,如果有后续节点加入队列会跳过本节点 if (node == tail && compareAndSetTail(node, pred)) {
//设置前驱节点的后驱为空,失败也没关系,新加入的节点会维护好链表关系的 compareAndSetNext(pred, predNext, null); } else {
// If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; //前驱不是头结点,且前驱节点的状态为-1,且前驱的线程不为空,尝试设置前驱的后继 if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
Node next = node.next; //后驱不为空,且没有被中断,设置前驱的后驱为本节点的后驱,即跳过本节点 if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else {
//前驱是头结点或者前面失败,无条件唤醒后驱,让后驱维护好链表关系 //唤醒后驱节点,不管前驱头结点执行完怎么释放锁,本节点提前唤醒后驱,保证后驱能运行 //后驱获取了CPU时间执行acquireQueued(),发现前驱不是头结点 //进入 shouldParkAfterFailedAcquire() 中,跳过中断的本节点 //维护好链表关系,继续执行acquireQueued() //这个时候分两种情况:1.头节点已经释放了锁,那本节点后驱完全可以获取到锁然后执行; // 2.头节点还没有释放,那本节点后驱会再次阻塞,等待被头节点唤醒 unparkSuccessor(node); } node.next = node; // help GC } //综上各种CAS,失败的情况比较多,链表的关系主要由后驱维护 //只要保证本节点状态为1,保证后驱节点不阻塞在本节点就不会有问题}

获取锁失败,会创建一个包含当前线程的节点加入到阻塞队列中,然后此线程阻塞,这个节点前驱执行完释放锁的时候会唤醒此节点。

reentrantLock.unlock() 释放锁,执行这个方法的线程一定要是锁的拥有者,执行完后,恢复头节点之后的第一个节点。

public void unlock() {
sync.release(1);} /** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */public final boolean release(int arg) {
//尝试释放锁 if (tryRelease(arg)) {
//释放锁成功,唤醒后续节点 Node h = head; //头结点状态为空代表阻塞队列没有节点,不需要释放 //参考 shouldParkAfterFailedAcquire() 方法,为0的情况不需要唤醒 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false;}protected final boolean tryRelease(int releases) {
int c = getState() - releases; //若当前线程没有持有锁,抛出运行时异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); //可能此线程获取了多次锁,需要对应调用多次才会完全释放锁 boolean free = false; if (c == 0) {
//当state为0时,当前线程完全释放了锁 free = true; setExclusiveOwnerThread(null); } setState(c); return free;}/** * Wakes up node's successor, if one exists. * * @param node the node */ //唤醒此节点的下一个节点private void unparkSuccessor(Node node) {
/* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) //什么时候这里会失败呢? //在找遍所有可能修改状态的地方后,按照 ReentrantLock 的逻辑: //此节点被取消的话,ws 大于 0,不会进入这里, //有新节点加入到此节点后驱,可能将此状态从 0 设置为 Node.SIGNAL ,这里也不会报错 //因此如果是 ReentrantLock 类,个人觉得这里根本不会报错,不执行这一步都可以 compareAndSetWaitStatus(node, ws, 0); //唯一的可能是 node 为共享锁,两个线程同时执行 doReleaseShared 唤醒此节点 //假设线程A先执行将 node 变为 0,线程B发现 node 为 0 //这时线程B会将 node 变为 Node.PROPAGATE,保证将状态传递下去,于此同时 //node节点已经被唤醒,node也会执行 unparkSuccessor() ,导致此节点 CAS 失败 //后面分析 Semaphore 会再次提到 /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; //下一个节点可能为空或者被取消了 if (s == null || s.waitStatus > 0) {
s = null; //需要从后往前遍历,因为这个时候可能有新节点加入到队尾 for (Node t = tail; t != null && t != node; t = t.prev) //节点不为取消状态才唤醒 if (t.waitStatus <= 0) //找到第一个状态<=0,且不为本节点的阻塞节点就返回 s = t; } if (s != null) //唤醒找到的被阻塞的节点 LockSupport.unpark(s.thread);}

后续线程被唤醒后,会恢复执行 parkAndCheckInterrupt() ,之后检查中断标识,重新竞争锁。

ReentrantLock 公平锁、等待队列的分析请看

转载地址:http://lmrai.baihongyu.com/

你可能感兴趣的文章
Android native和h5混合开发几种常见的hybrid通信方式
查看>>
Vista/Win7 UAC兼容程序开发指南
查看>>
IOS程序开发框架
查看>>
安装jdk的步骤
查看>>
简述JAVA运算符
查看>>
简易ATM源代码及运行结果
查看>>
简述Java中的简单循环
查看>>
用JAVA实现各种乘法表
查看>>
for双重循环实现图形
查看>>
Java类和对象基础
查看>>
简述Java继承和多态
查看>>
Java中Arrays工具类的用法
查看>>
简述JAVA抽象类和接口
查看>>
JAVA常用基础类
查看>>
简述Java异常处理
查看>>
简述Java集合框架
查看>>
jQuery+ajax实现省市区(县)下拉框三级联动
查看>>
Spring中的AOP 面向切面编程
查看>>
简述Spring中的JDBC框架
查看>>
MyBatis 动态SQL
查看>>