

相对于 synchronized 它具备如下特点
  • 可中断
  • 可以设置超时时间
  • 可以设置为公平锁
  • 支持多个条件变量
相同点 与 synchronized 一样,都支持可重入
// 获取锁 reentrantLock.lock(); try { // 临界区 } finally { // 释放锁 reentrantLock.unlock(); }


可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁 如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住
@Slf4j(topic = "c.TestReentrant") public class TestReentrant { static ReentrantLock lock = new ReentrantLock(); public static void main(String[] args) { method1(); } public static void method1() { lock.lock(); try { log.debug("execute method1"); method2(); } finally { lock.unlock(); } } public static void method2() { lock.lock(); try { log.debug("execute method2"); method3(); } finally { lock.unlock(); } } public static void method3() { lock.lock(); try { log.debug("execute method3"); } finally { lock.unlock(); } } }


必须使用lock.lockInterruptibly() 获取锁
如果是不可中断模式,那么即使使用了 interrupt 也不会让等待中断
@Slf4j(topic = "c.TestInterrupt") public class TestInterrupt { public static void main(String[] args) { test1(); } private static void test2() { ReentrantLock lock = new ReentrantLock(); Thread t1 = new Thread(() -> { log.debug("启动..."); lock.lock(); try { log.debug("获得了锁"); } finally { lock.unlock(); } }, "t1"); lock.lock(); log.debug("获得了锁"); t1.start(); try { sleep(1); t1.interrupt(); log.debug("执行打断"); sleep(1); } finally { log.debug("释放了锁"); lock.unlock(); } } private static void test1() { ReentrantLock lock = new ReentrantLock(); Thread t1 = new Thread(() -> { log.debug("启动..."); try { lock.lockInterruptibly(); } catch (InterruptedException e) { e.printStackTrace(); log.debug("等锁的过程中被打断"); return; } try { log.debug("获得了锁"); } finally { lock.unlock(); } }, "t1"); lock.lock(); log.debug("获得了锁"); t1.start(); try { sleep(1); t1.interrupt(); log.debug("执行打断"); } finally { lock.unlock(); } } }


@Slf4j(topic = "c.TestTimeout") public class TestTimeout { public static void main(String[] args) { test1(); } //超时失败 private static void test1() { ReentrantLock lock = new ReentrantLock(); Thread t1 = new Thread(() -> { log.debug("启动..."); try { if (!lock.tryLock(1, TimeUnit.SECONDS)) { log.debug("获取等待 1s 后失败,返回"); return; } } catch (InterruptedException e) { e.printStackTrace(); } try { log.debug("获得了锁"); } finally { lock.unlock(); } }, "t1"); lock.lock(); log.debug("获得了锁"); t1.start(); try { sleep(2); } finally { lock.unlock(); } } //立即失败 private static void test2() { ReentrantLock lock = new ReentrantLock(); Thread t1 = new Thread(() -> { log.debug("启动..."); if (!lock.tryLock()) { log.debug("获取立刻失败,返回"); return; } try { log.debug("获得了锁"); } finally { lock.unlock(); } }, "t1"); lock.lock(); log.debug("获得了锁"); t1.start(); try { sleep(2); } finally { lock.unlock(); } } }
使用 tryLock 解决哲学家就餐问题
public class TestDeadLock { public static void main(String[] args) { Chopstick c1 = new Chopstick("1"); Chopstick c2 = new Chopstick("2"); Chopstick c3 = new Chopstick("3"); Chopstick c4 = new Chopstick("4"); Chopstick c5 = new Chopstick("5"); new Philosopher("苏格拉底", c1, c2).start(); new Philosopher("柏拉图", c2, c3).start(); new Philosopher("亚里士多德", c3, c4).start(); new Philosopher("赫拉克利特", c4, c5).start(); new Philosopher("阿基米德", c5, c1).start(); } } @Slf4j(topic = "c.Philosopher") class Philosopher extends Thread { Chopstick left; Chopstick right; public Philosopher(String name, Chopstick left, Chopstick right) { super(name); this.left = left; this.right = right; } @Override public void run() { while (true) { // 尝试获得左手筷子 if (left.tryLock()) { try { // 尝试获得右手筷子 if (right.tryLock()) { try { eat(); } finally { right.unlock(); } } } finally { left.unlock(); } } } } private void eat() { log.debug("eating..."); Sleeper.sleep(1); } } class Chopstick extends ReentrantLock { String name; public Chopstick(String name) { = name; } @Override public String toString() { return "筷子{" + name + '}'; } }


ReentrantLock 默认是不公平的
public class TestFair { public static void main(String[] args) throws InterruptedException { ReentrantLock lock = new ReentrantLock(); lock.lock(); for (int i = 0; i < 20; i++) { new Thread(() -> { lock.lock(); try { System.out.println(Thread.currentThread().getName() + " running..."); } finally { lock.unlock(); } }, "t" + i).start(); } // 1s 之后去争抢锁 Thread.sleep(1000); for (int i = 0; i < 5; i++) { new Thread(() -> { lock.lock(); try { System.out.println(Thread.currentThread().getName() + " running..."); } finally { lock.unlock(); } }, "强行插入").start(); } lock.unlock(); } } // 强行插入 running... // 强行插入 running... // t0 running... // t1 running... // t2 running... // t3 running... // t4 running... // t5 running... // t6 running... // t7 running... // t8 running... // t9 running... // t10 running... // t11 running... // t12 running... // t13 running... // t14 running... // t15 running... // t16 running... // t18 running... // t17 running... // t19 running... // 强行插入 running... // 强行插入 running... // 强行插入 running...
ReentrantLock lock = new ReentrantLock(true);
t0 running... t1 running... t2 running... t3 running... t4 running... t5 running... t6 running... t9 running... t8 running... t7 running... t10 running... t12 running... t11 running... t13 running... t14 running... t15 running... t16 running... t17 running... t18 running... t19 running... 强行插入 running... 强行插入 running... 强行插入 running... 强行插入 running... 强行插入 running...


/** * Creates an instance of {@code ReentrantLock}. * This is equivalent to using {@code ReentrantLock(false)}. */ public ReentrantLock() { sync = new NonfairSync(); } public void lock() { sync.lock(); }
NonfairSync 继承自 AQS
/** * Sync object for non-fair locks */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { //首先用 cas 尝试(仅尝试一次)将 state 从 0 改为 1, 如果成功表示获得了独占锁 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } /** Sync 继承过来的方法, 方便阅读, 放在此处 * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 如果还没有获得锁 if (c == 0) { // 尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
没有竞争时 work flow
notion image
Thread-1 执行了
  1. CAS 尝试将 state 由 0 改为 1,结果失败
  1. 进入 tryAcquire 逻辑,这时 state 已经是1,结果仍然失败
  1. 接下来进入 addWaiter 逻辑,构造 Node 队列
      • 图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态
      • Node 的创建是懒惰的
      • 其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程
notion image
public final void acquire(int arg) { // 当 tryAcquire 返回为 false 时, 先调用 addWaiter, 接着 acquireQueued if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { // 将当前线程关联到一个 Node 对象上, 模式为独占模式 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure //如果 tail 不为 null, cas 尝试将 Node 对象加入 AQS 队列尾部 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { // 双向链表 = node; return node; } } //尝试将 Node 加入 AQS, enq(node); return node; } /** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize //还没有, 设置 head 为哨兵节点(不对应线程,状态为 0) if (compareAndSetHead(new Node())) tail = head; } else { // cas 尝试将 Node 对象加入 AQS 队列尾部 node.prev = t; if (compareAndSetTail(t, node)) { = node; return t; } } } }
当前线程进入 acquireQueued 逻辑
  1. acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
  1. 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
  1. 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false
    1. notion image
  1. shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时state 仍为 1,失败
  1. 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回true
  1. 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示)
    1. notion image
  1. 再次有多个线程经历上述过程竞争失败,变成这个样子
    1. notion image
/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // 上一个节点是 head, 表示轮到自己(当前线程对应的 node)了, 尝试获取 if (p == head && tryAcquire(arg)) { // 获取成功, 设置自己(当前线程对应的 node)为 head setHead(node); = null; /// 上一个节点 help GC failed = false;// 返回中断标记 false return interrupted; } //判断是否应当 park, 进入 if (shouldParkAfterFailedAcquire(p, node) && // park 等待, 此时 Node 的状态被置为 Node.SIGNAL parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //获取上一个节点的状态 int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. // 上一个节点都在阻塞, 那么自己也阻塞好了 */ return true; // > 0 表示取消状态 if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ // 上一个节点取消, 那么重构删除前面所有取消的节点, 返回到外层循环重试 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ // 这次还没有阻塞 // 但下次如果重试不成功, 则需要阻塞,这时需要设置上一个节点状态为 Node.SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1;
  • 设置 exclusiveOwnerThread 为 null
  • state = 0
notion image
当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程
找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1
回到 Thread-1 的 acquireQueued 流程
notion image
  • exclusiveOwnerThread 为 Thread-1,state = 1
  • head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread
  • 原本的 head 因为从链表断开,而可被垃圾回收
如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了
notion image
如果不巧又被 Thread-4 占了先
  • Thread-4 被设置为 exclusiveOwnerThread,state = 1
  • Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞
/** * Attempts to release this lock. * * <p>If the current thread is the holder of this lock then the hold * count is decremented. If the hold count is now zero then the lock * is released. If the current thread is not the holder of this * lock then {@link IllegalMonitorStateException} is thrown. * * @throws IllegalMonitorStateException if the current thread does not * hold this lock */ public void unlock() { sync.release(1); } /** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(int arg) { //尝试释放锁, 进入 if (tryRelease(arg)) { // 队列头节点 unpark Node h = head; // 队列不为 null // waitStatus == Node.SIGNAL 才需要 unpark if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 支持锁重入, 只有 state 减为 0, 才释放成功 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. 如果状态为 Node.SIGNAL 尝试重置状态为 0 */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // 找到需要 unpark 的节点, 但本节点从 AQS 队列中脱离, 是由唤醒节点完成的 Node s =; // 不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }


final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 如果还没有获得锁 if (c == 0) { // 尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 支持锁重入, 只有 state 减为 0, 才释放成功 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }


  1. 不可打断模式
    1. public final void acquire(int arg) { // 当 tryAcquire 返回为 false 时, 先调用 addWaiter, 接着 acquireQueued if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); = null; failed = false; // 还是需要获得锁后, 才能返回打断状态 return interrupted; } //判断是否应当 park, 进入 if (shouldParkAfterFailedAcquire(p, node) && // park 等待, 此时 Node 的状态被置为 Node.SIGNAL parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private final boolean parkAndCheckInterrupt() { // 如果打断标记已经是 true, 则 park 会失效 LockSupport.park(this); // interrupted 会清除打断标记 return Thread.interrupted(); } static void selfInterrupt() { // 重新产生一次中断 Thread.currentThread().interrupt(); }
  1. 可打断模式
    1. public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); setHead(node); = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }


与非公平锁主要区别在于 tryAcquire 方法的实现
static final class FairSync extends Sync { public class ConditionObject implements Condition, { private static final long serialVersionUID = 1173984872572414699L; /** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter; /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 先检查 AQS 队列中是否有前驱节点, 没有才去竞争 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; // h != t 时表示队列中有 Node // (s = == null 表示队列中还有没有老二 // 或者队列中老二线程不是此线程 return h != t && ((s = == null || s.thread != Thread.currentThread()); } }


每个条件变量其实就对应着一个等待队列,其实现类是 ConditionObject,它是AQS的一个静态内部类
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
开始 Thread-0 持有锁,调用 await,进入 ConditionObject 的 addConditionWaiter 流程 创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部
notion image
(1) addConditionWaiter 源码
//添加一个 Node 至等待队列 private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. // 所有已取消的 Node 从队列链表删除, 见 (2) if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } // 创建一个关联当前线程的新 Node, 添加至队列尾部 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
接下来进入 AQS 的 fullyRelease 流程,释放同步器上的锁
notion image
unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功
notion image
park 阻塞 Thread-0
notion image
//外部类方法, 方便阅读, 放在此处 //(4)因为某线程可能重入,需要将 state 全部释放 final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } /** * Wakes up node's successor, if one exists. private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s =; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
假设 Thread-1 要来唤醒 Thread-0
进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即 Thread-0 所在 Node
notion image
/** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } /** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的 waitStatus 改为 0,Thread-3 的waitStatus 改为 -1
notion image
/** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal) */ final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
Thread-1 释放锁,进入 unlock 流程