锁与AQS

提到Java里的互斥同步锁,除了JVM提供的synchronized关键字之外,还有java.util.concurrent的locks包提供的可重入锁ReentrantLock,这篇文章主要从源码的角度来分析ReentrantLock及其实现基础AQS。

ReentrantLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
/** Synchronizer providing all implementation mechanics */
private final Sync sync;

/**
* Base of synchronization control for this lock. Subclassed
* into fair and nonfair versions below. Uses AQS state to
* represent the number of holds on the lock.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
... ...
}

static final class NonfairSync extends Sync {
... ...
}

static final class FairSync extends Sync {
... ...
}

... ...
}

首先先看可重入锁的源码,它有一个Sync类型的成员变量,根据注视可以知道这个对象实际上实现了锁的所有机制。而Sync是ReentrantLock内部的一个静态类,继承自AbstractQueuedSynchronizer类也就是我们所熟知的AQS;而且ReentrantLock与Synchronized比较不同的一点在于,ReentrantLock除了默认的不公平锁之外,也可以实现公平锁。观察源码可以知道,ReentrantLock实现的公平锁和不公平锁也都是继承自Sync。所以可以说,ReentrantLock的锁的机制是通过AQS实现的,而它仅仅定义了自己的同步语义。因此,接下来我们重点看一下AQS是如何实现的。

AbstractQueuedSynchronizer(AQS)

打开AbstractQueuedSynchronizer.java,可以看到官方对于AQS的介绍

1
2
3
4
5
6
7
8
9
10
11
12
13
* Provides a framework for implementing blocking locks and related
* synchronizers (semaphores, events, etc) that rely on
* first-in-first-out (FIFO) wait queues. This class is designed to
* be a useful basis for most kinds of synchronizers that rely on a
* single atomic {@code int} value to represent state. Subclasses
* must define the protected methods that change this state, and which
* define what that state means in terms of this object being acquired
* or released. Given these, the other methods in this class carry
* out all queuing and blocking mechanics. Subclasses can maintain
* other state fields, but only the atomically updated {@code int}
* value manipulated using methods {@link #getState}, {@link
* #setState} and {@link #compareAndSetState} is tracked with respect
* to synchronization.

AbstractQueuedSynchronizer(AQS)为实现阻塞锁和基于FIFO等待队列的相关同步器提供了一个框架。AQS使用一个单一原子状态的Integer值表示同步器的状态。它的子类必须重写protected方法,以改变状态;同时也要定义从获取或释放对象的角度来说这些状态的含义。为了这些,AQS类中的其他方法处理所有队列阻塞机制。子类可以维护其他状态字段,但只有使用get和set以及compareAndSetState方法原子地更新表示状态的Integer值。

关于如何使用AQS,官方说明是AQS的子类应该被定义成非公共的内部类,然后实现外部类的同步机制即可。AQS并没有实现任何同步接口,它仅仅提供了对于同步状态的获取与释放的方法供具体的自定义的锁和同步器调用。

AQS定义的两种资源访问模式

  • 独占模式:只有一个线程可以访问,ReentrantLock
  • 共享模式:多个线程可以同时访问,Semaphore、CountDownLatch、ReadWriteLock,CyclicBarrier

同步器的同步状态定义和修改

1
2
3
4
5
6
7
8
private volatile int state;

protected final int getState() { return state;}
protected final void setState(int newState) {state = newState;}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

数据结构

双链链表

AQS提供的队列是一个双向链表,定义了两个Node节点表示头和尾节点。

1
2
3
4
5
6
7
8
9
10
11
12
* Head of the wait queue, lazily initialized.  Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;

/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;

状态位

1
2
3
4
/**
* The synchronization state.
*/
private volatile int state;

AQS使用一个int型的变量表示当前线程的状态,通过setter和getter获得或修改它的值。

每个线程想要获得排他锁时会通过CAS操作尝试去修改state,如果修改成功则表示可以获得该锁。

模版方法模式

AQS采用了模版方法设计模式,它暴露出一些方法供子类重写,而在同步器内部的模版方法会调用这些被子类重写的方法。举个例子:

同步器中的tryAcquire方法定义如下:

1
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException();}

ReentrantLock类中的静态类NonfairSync中重写了tryAcquire方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

而在AQS同步器中的模版方法acqiure中调用tryAcquire方法。

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

所以,NonfairSync调用继承的模版方法acquire时,内部的tryAcquire就会替换成自己实现的代码逻辑。这就是同步器中模版方法模式的应用。

锁机制

那么我们现在看一下AQS获取互斥锁的实现机制。

根据上一节NonfairSync的代码来说,它会首先通过CAS尝试修改state状态位,如果修改成功,则可以获得排他锁;否则调用acquire方法。

acquire方法是AQS内部定义的模版方法,它首先尝试再一次去获得锁,如果再一次获取失败,会执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

我们首先看addWaiter方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
 private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

代码比较简单,就是把当前线程加入到阻塞队列的尾部并返回这个节点对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

而在acquireQueued中则是通过自旋操作判断当前节点是否可以获得锁,条件是它已经处于队列的首部并且已经获得了锁。

总结

  1. ReentrantLock是基于AQS实现的。

  2. AQS是一个同步器,其他的同步组件(不光是锁,还包括CountDownLatch等)都是依赖于AQS实现的,这些同步组件的实现都是通过定义继承AQS的内部静态类。

  3. AQS采用模版方法设计,它定义的peotected方法可以被子类重写,而在模版方法中会调用重写的方法。

  4. AQS负责处理队列和线程的阻塞机制,而Lock等同步组件则专注于定义同步语义。

  5. AQS的核心数据结构是一个双向链表,用于存放阻塞的线程;还有一个int型的状态值state,表明同步状态,修改状态值底层采用CAS(compare and swap)

0%