AbstractQueuedSynchronizer

AbstractQueuedSynchronizer

是 Java 并发包的基础工具类,是实现 ReentrantLock、CountDownLatch、CyclicBarrier、Semaphore、FutureTask 等类的基础
属性如下:

// 头结点,你直接把它当做 当前持有锁的线程
private transient volatile Node head;
// 阻塞的尾节点,每个新的节点进来,都插入到最后,也就形成了一个链表
private transient volatile Node tail;
// 这个是最重要的,代表当前锁的状态,0代表没有被占用,大于 0 代表有线程持有当前锁
// 这个值可以大于 1,是因为锁可以重入,每次重入都加上 1
private volatile int state;
// 代表当前持有独占锁的线程,举个最重要的使用例子,因为锁可以重入
private transient Thread exclusiveOwnerThread; //继承自AbstractOwnableSynchronizer

其内部维护了一个等待队列 –包含头节点及阻塞队列 –由Node 组成的双向链表

Node 的结构简介

// 取值为上面的1、-1、-2、-3,或者0(以后会讲到)
// 这么理解,暂时只需要知道如果这个值 大于0 代表此线程取消了等待,
// ps: 半天抢不到锁,不抢了,ReentrantLock 是可以指定timeouot的。。。
volatile int waitStatus;
// 前驱节点的引用
volatile Node prev;
// 后继节点的引用
volatile Node next;
// 这个就是线程本尊
volatile Thread thread;

关键方法:

  • lock();(-子类) —> acquire()(-AQS); —> tryAcquire()(-子类);acquireQueued();addWaiter();enq();(-AQS); —> 结束
  • unlock();(-子类) —> release()(-AQS); —> tryRelease()(-子类);unparkSuccessor();(-AQS); —> 结束
  • tryAcquire(arg) 尝试获取下锁,如果返回true(整个条件队列为空或者是重入锁), 也就结束了;-子类实现
    下面方法 AQS 自己实现
  • addWaiter(Node mode) 线程包装成node,同时进入到队列中
  • enq(final Node node) 初始化头节点 或 自旋的方式入队
  • acquireQueued(final Node node, int arg) 真正的线程挂起,被唤醒然后去获取锁,都在这个方法里了;(返回true的话将进入 selfInterrupt(),所以正常情况下,应该返回false)
  • shouldParkAfterFailedAcquire(Node pred, Node node) “当前线程没有抢到锁,是否需要挂起当前线程?” 或者说 保证当前节点的prev.waitStatus== Node.SIGNAL
  • parkAndCheckInterrupt() 负责挂起线程的
  • tryRelease(int releases) 减少state的值,返回boolean, 是否完全释放锁
  • unparkSuccessor(Node node) 唤醒后继节点

tail==head的时候,其实阻塞队列是空的。(==null或者==new Node())
前驱节点的 waitStatus == -1 ,说明前驱节点状态正常,当前线程需要挂起
在并发环境下,加锁和解锁需要以下三个部件的协调:锁状态、线程的阻塞和解除阻塞、阻塞队列。


ReentrantLock,Condition

公平锁和非公平锁只有两处不同:
1. 非公平锁在调用 lock 后,首先就会调用 CAS 进行一次抢锁,如果这个时候恰巧锁没有被占用,那么直接就获取到锁返回了。
2. 在 tryAcquire 方法中,如果发现锁这个时候被释放了(state == 0),非公平锁会直接 CAS 抢锁,但是公平锁会判断等待队列是否有线程处于等待状态,如果有则不去抢锁,乖乖排到后面。

AbstractQueuedSynchronizer 类中的 ConditionObject

ReentrantLock 维持了一个阻塞队列;Condition 维持了一个条件队列
ReentrantLock与Condition都是Node组成的链表,prev 和 next 用于实现阻塞队列的双向链表,nextWaiter 用于实现条件队列的单向链表
关键属性

// 条件队列的第一个节点
private transient Node firstWaiter;
// 条件队列的最后一个节点
private transient Node lastWaiter;

Condition 是依赖于 ReentrantLock 的,不管是调用 await 进入等待还是 signal 唤醒,都必须获取到锁才能进行操作。

  1. 一个 ReentrantLock 实例可以通过多次调用 newCondition() 来产生多个 Condition 实例;
  2. 每个 condition 有一个关联的条件队列,await() 方法即可将当前线程包装成 Node 后加入到条件队列中,然后阻塞在这里;
  3. signal() 触发一次唤醒,唤醒的是队头,会将对应condition的条件队列的 firstWaiter(队头) 移到阻塞队列的队尾,等待获取锁。

关键方法:

  • addConditionWaiter() 将当前线程对应的节点入队,插入队尾
  • fullyRelease(node) 完全释放独占锁
  • while (!isOnSyncQueue(node)) 自旋,等待进入阻塞队列;isOnSyncQueue(node) 判断是否在阻塞队列。
  • checkInterruptWhileWaiting(node) 用于判断是否在线程挂起期间发生了中断
  • transferAfterCancelledWait(Node node) 判断在 signal 之前还是之后发生的中断,并将signal之前中断的节点转移到阻塞队列
  • acquireQueued(node, savedState) 上一节的方法,返回boolean代表线程是否被中断
  • reportInterruptAfterWait(int interruptMode) 处理中断状态
  • transferForSignal(Node node) 将节点从条件队列转移到阻塞队列,如果前驱节点取消或者前驱节点 CAS 失败,会唤醒线程。

有以下三种情况会让 LockSupport.park(this); 这句返回继续往下执行:(首先执行checkInterruptWhileWaiting())

  1. 常规路径。signal -> 转移节点到阻塞队列 -> 获取了锁(unpark) interruptMode==0
  2. 线程中断。在 park 的时候,另外一个线程对这个线程进行了中断 signal 之前中断,interruptMode==THROW_IE(-1)
  3. signal 的时候我们说过,转移以后的前驱节点取消了,或者对前驱节点的CAS操作失败了 signal 之后中断,interruptMode==REINTERRUPT(1)
  4. 假唤醒。这个也是存在的,和 Object.wait() 类似,都有这个问题

注意:

  • 前驱节点取消了,或者对前驱节点的CAS操作失败 –(wait()方法 没有对前驱节点做后续处理)
  • while (!isOnSyncQueue(node)); Thread.yield();
    signal 方法会将节点转移到阻塞队列,但是可能还没完成,这边自旋等待其完成。当然,这种事情还是比较少的吧:signal 调用之后,没完成转移之前,发生了中断
  • 带超时机制的 await
    await(long time, TimeUnit unit) 调用 parkNanos 方法来休眠指定的时间,醒来后判断是否 signal 调用了,调用了就是没有超时-true,否则就是超时了-false。(不带超时参数的 await 是 park,然后等待别人唤醒)
  • AQS 独占锁的取消排队
    lock() 仅设置线程的中断状态而已,不做任何处理
    lockInterruptibly() 发生中断异常时会跳到 cancelAcquire(node),把节点的 waitStatus 设置为 Node.CANCELLED,从而取消排队。

    再说 java 线程中断和 InterruptedException 异常 –即怎么用 线程中断??

  1. 不是说我们中断某个线程,这个线程就停止运行了。中断代表线程状态,每个线程都关联了一个中断状态,是一个 true 或 false 的 boolean 值,初始值为 false。
  2. 如果线程处于以下三种情况,那么当线程被中断的时候,能自动感知到(跳出该方法):
    • wait() join() sleep(long)
      中断线程会从这些方法中立即返回,抛出 InterruptedException 异常,同时重置中断状态为 false。
    • 实现了 InterruptibleChannel 接口的类中的一些 I/O 阻塞操作
      中断线程会导致这些方法抛出 ClosedByInterruptException 并重置中断状态。
    • Selector 中的 select 方法
      一旦中断,方法立即返回
      对于以上 3 种情况是最特殊的,因为他们能自动感知到中断(这里说自动,当然也是基于底层实现),并且在做出相应的操作后都会重置中断状态为 false。
      那是不是只有以上 3 种方法能自动感知到中断呢?不是的,如果线程阻塞在 LockSupport.park(Object obj) 方法,也叫挂起,这个时候的中断也会导致线程唤醒,但是唤醒后不会重置中断状态,所以唤醒后去检测中断状态将是 true。
      LockSupport.unpark(s.thread);不会改变s.thread的中断状态,即s.thread.isInterrupted()==false。
  3. 除了几个特殊类(如 Object,Thread等)外,感知中断并提前返回是通过轮询中断状态来实现的。我们自己需要写可中断的方法的时候,就是通过在合适的时机(通常在循环的开始处)去判断线程的中断状态,然后做相应的操作(通常是方法直接返回或者抛出异常)。
  4. 在并发包中,有很多方法,分别为响应中断和不响应中断 的操作

CountDownLatch,CyclicBarrier

CountDownLatch 这个类是比较典型的 AQS 的共享模式的使用

  • CountDownLatch 的使用场景:有 m 个线程是做任务的,有 n 个线程在某个栅栏上等待这 m 个线程做完任务,直到所有 m 个任务完成后,n 个线程同时通过栅栏。
  • 套路分析:AQS 里面的 state 是一个整数值,这边用一个 int count 参数其实初始化就是设置了这个值,所有调用了 await 方法的等待线程会挂起,然后有其他一些线程会做 state = state - 1 操作,当 state 减到 0 的同时,那个将 state 减为 0 的线程会负责唤醒 所有调用了 await 方法的线程。

关键方法:
1.await() 是一个阻塞方法,当 state 减为 0 的时候,await 方法才会返回。
—> tryAcquireShared(int acquires) 只有当 state == 0 的时候,这个方法才会返回 1
—> doAcquireSharedInterruptibly(arg) 获取共享锁
—> setHeadAndPropagate(node, r) –(setHead(node);doReleaseShared();//先把 head 给占了,然后唤醒队列中其他的线程)
2.countDown() 每次调用都会将 state 减 1,直到 state 的值为 0 唤醒所有等待的线程。
—> tryReleaseShared(int releases) 只有当 state 减为 0 的时候,才返回 true
—> doReleaseShared() —> unparkSuccessor(h) 唤醒 head 的后继节点

重点分析:

// 调用这个方法的时候,state == 0  
private void doReleaseShared() {  
    for (;;) {  
        Node h = head;  
        // 1. h == null: 说明阻塞队列为空  
        // 2. h == tail: 说明头结点可能是刚刚初始化的头节点,  
        //   或者是普通线程节点,但是此节点既然是头节点了,那么代表已经被唤醒了,阻塞队列没有其他节点了  
        // 所以这两种情况不需要进行唤醒后继节点  
        if (h != null && h != tail) {  
            int ws = h.waitStatus;  
            // t4 将头节点(此时是 t3)的 waitStatus 设置为 Node.SIGNAL(-1) 了  
            if (ws == Node.SIGNAL) {  
                // 这里 CAS 失败的场景请看下面的解读  
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))  
                    continue;            // loop to recheck cases  
                // 就是这里,唤醒 head 的后继节点,也就是阻塞队列中的第一个节点  
                // 在这里,也就是唤醒 t4  
                unparkSuccessor(h);  
            }  
            else if (ws == 0 &&  
                     // 这个 CAS 失败的场景是:执行到这里的时候,刚好有一个节点入队,入队会将这个 ws 设置为 -1  
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))  
                continue;                // loop on failed CAS  
        }  
        // 如果到这里的时候,前面唤醒的线程已经占领了 head,那么再循环  
        // 否则,就是 head 没变,那么退出循环,  
        // 退出循环是不是意味着阻塞队列中的其他节点就不唤醒了?当然不是,唤醒的线程之后还是会调用这个方法的  
        if (h == head)                   // loop if head changed  
            break;  
    }  
}    

我们分析下最后一个 if 语句,然后才能解释第一个 CAS 为什么可能会失败:

  1. h == head:说明头节点还没有被刚刚用 unparkSuccessor 唤醒的线程(这里可以理解为 t4)占有,此时 break 退出循环。
  2. h != head:头节点被刚刚唤醒的线程(这里可以理解为 t4)占有,那么这里重新进入下一轮循环,唤醒下一个节点(这里是 t5 )。我们知道,等到 t4 被唤醒后,其实是会主动唤醒 t5、t6、t7…,那为什么这里要进行下一个循环来唤醒 t5 呢?我觉得是出于吞吐量的考虑。
    满足上面的 2 的场景,那么我们就能知道为什么上面的 CAS 操作 compareAndSetWaitStatus(h, Node.SIGNAL, 0) 会失败了?
    因为当前进行 for 循环的线程到这里的时候,可能刚刚唤醒的线程 t4 也刚刚好到这里了,那么就有可能 CAS 失败了。
    CountDownLatch 完全没看出来共享模式。。。s.isShared() ??

CyclicBarrier

“可重复使用的栅栏”,与 CountDownLatch 不同,它是 ReentrantLock 和 Condition 的组合使用。
操作方法只有一个:await();每个线程 await 一次,count 减1,当最后一个线程 await 的时候,那么就来齐了,大家一起通过栅栏。
其他方法:
nextGeneration() –开启新的一代 相当重新初始化 CyclicBarrier
breakBarrier() –打破一个栅栏

Semaphore

类似一个资源池,每个线程需要调用 acquire() 方法获取资源,然后才能执行,执行完后,需要 release 资源,让给其他的线程用。
Semaphore 类似于操作系统中的信号量,可以控制对互斥资源的访问线程数。–by cyc2018
套路解读:

  • 创建 Semaphore 实例的时候,需要一个参数 permits,这个基本上可以确定是设置给 AQS 的 state 的,然后每个线程调用 acquire 的时候,执行 state = state - 1,release 的时候执行 state = state + 1,当然,acquire 的时候,如果 state = 0,说明没有资源了,需要等待其他线程 release。
  • availablePermits() 获取 state 值。