所有分类
  • 所有分类
  • 未分类

Java-ReentrantLock的用法和原理

简介

说明

本文介绍Java的JUC中的ReentrantLock(可重入独占式锁)。包括:用法、原理。

概述

ReentrantLock主要利用AQS队列来实现。它支持公平锁和非公平锁。

AQS队列使用了CAS,所以ReentrantLock有CAS的优缺点。优点:性能高。缺点:CPU占用高。

ReentrantLock的流程

  1. state初始化为0,表示未锁定状态
  2. A线程lock()时,会调用tryAcquire()获取锁并将state+1
  3. 其他线程tryAcquire获取锁会失败,直到A线程unlock() 到state=0,其他线程才有机会获取该锁。
  4. A释放锁之前,自己可以重复获取此锁(state累加),这就是可重入的概念。

注意:获取多少次锁就要释放多少次锁,保证state能回到0。 

示例

private Lock lock = new ReentrantLock();
 
public void test(){
    lock.lock();
    try{
        doSomeThing();
    }catch (Exception e){
        // ignored
    }finally {
        lock.unlock();
    }
}

公平与非公平

ReentrantLock的默认实现是非公平锁,但是也可以设置为公平锁。

  • 非公平锁
    • 如果同时还有另一个线程进来尝试获取,那么有可能会让这个线程抢先获取;
  • 公平锁
    • 如果同时还有另一个线程进来尝试获取,当它发现自己不是在队首的话,就会排到队尾,由队首的线程获取到锁。

ReentrantLock提供了两个构造器,分别是

public ReentrantLock() {
    sync = new NonfairSync();
}
 
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

NonfairSync的lock()方法

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

首先用一个CAS操作,判断state是否是0(表示当前锁未被占用),如果是0则把它置为1,并且设置当前线程为该锁的独占线程,表示获取锁成功。当多个线程同时尝试占用同一个锁时,CAS操作只能保证一个线程操作成功,剩下的只能去排队啦。

“非公平”即体现在这里,如果占用锁的线程刚释放锁,state置为0,而排队等待锁的线程还未唤醒时,新来的线程就直接抢占了该锁,那么就“插队”了。

FairSync的lock()方法

final void lock() {
    acquire(1);
}

直接调用acquire(1)方法。

非公平锁lock()原理

场景

简述:A线程获得锁,B和C线程失败,B和C执行acquire(1);

本处以非公平锁(NonfairSync)示例进行讲解,假设有如下场景:有三个线程去竞争锁,假设线程A的CAS操作成功了,拿到了锁开开心心的返回了,那么线程B和C则设置state失败,走到了else里面。

NonfairSync的lock()方法

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

 acquire()方法

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

(1)尝试获得锁

简述:尝试去获取锁。如果尝试获取锁成功,方法直接返回。

非公平锁tryAcquire的流程是:

  1. 检查state字段;
  2. 若为0:表示锁未被占用,那么尝试占用;
    若不为0:检查当前锁是否被自己占用,若是,则state+1(重入锁的次数)。
  3. 若以上两点都失败,则获取锁失败,返回false。
tryAcquire(arg)
final boolean nonfairTryAcquire(int acquires) {
    //获取当前线程
    final Thread current = Thread.currentThread();
    //获取state变量值
    int c = getState();
    if (c == 0) { //没有线程占用锁
        if (compareAndSetState(0, acquires)) {
            //占用锁成功,设置独占线程为当前线程
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) { //当前线程已经占用该锁
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // 更新state值为新的重入次数
        setState(nextc);
        return true;
    }
    //获取锁失败
    return false;
}

(2)入队

由于上文中提到线程A已经占用了锁,所以B和C执行tryAcquire失败,并且入等待队列(acquireQueued(addWaiter(Node.EXCLUSIVE), arg)))。如果线程A拿着锁死死不放,那么B和C就会被挂起。

先看下入队的过程。先看addWaiter(Node.EXCLUSIVE)  

/**
 * 将新节点和当前线程关联并且入队列
 * @param mode 独占/共享
 * @return 新节点
 */
private Node addWaiter(Node mode) {
    //初始化节点,设置关联线程和模式(独占 or 共享)
    Node node = new Node(Thread.currentThread(), mode);
    // 获取尾节点引用
    Node pred = tail;
    // 尾节点不为空,说明队列已经初始化过
    if (pred != null) {
        node.prev = pred;
        // 设置新节点为尾节点
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 尾节点为空,说明队列还未初始化,需要初始化head节点并入队新节点
    enq(node);
    return node;
}

B、C线程同时尝试入队列,由于队列尚未初始化,tail==null,故至少会有一个线程会走到enq(node)。我们假设同时走到了enq(node)里。

/**
 * 初始化队列并且入队新节点
 */
private Node enq(final Node node) {
    //开始自旋
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            // 如果tail为空,则新建一个head节点,并且tail指向head
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            // tail不为空,将新节点入队
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

这里体现了经典的自旋+CAS组合来实现非阻塞的原子操作。由于compareAndSetHead的实现使用了unsafe类提供的CAS操作,所以只有一个线程会创建head节点成功。假设线程B成功,之后B、C开始第二轮循环,此时tail已经不为空,两个线程都走到else里面。假设B线程compareAndSetTail成功,那么B就可以返回了,C由于入队失败还需要第三轮循环。最终所有线程都可以成功入队

当B、C入等待队列后,此时AQS队列如下:

(3)挂起

B和C相继执行acquireQueued(final Node node, int arg)。这个方法让已经入队的线程尝试获取锁,若失败则会被挂起。

/**
 * 已经入队的线程尝试获取锁
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true; //标记是否成功获取锁
    try {
        boolean interrupted = false; //标记线程是否被中断过
        for (;;) {
            final Node p = node.predecessor(); //获取前驱节点
            //如果前驱是head,即该结点已成老二,那么便有资格去尝试获取锁
            if (p == head && tryAcquire(arg)) {
                setHead(node); // 获取成功,将当前节点设置为head节点
                p.next = null; // 原head节点出队,在某个时间点被GC回收
                failed = false; //获取成功
                return interrupted; //返回是否被中断过
            }
            // 判断获取失败后是否可以挂起,若可以则挂起
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                // 线程若被中断,设置interrupted为true
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

code里的注释已经很清晰的说明了acquireQueued的执行流程。假设B和C在竞争锁的过程中A一直持有锁,那么它们的tryAcquire操作都会失败,因此会走到第2个if语句中。 

再看下shouldParkAfterFailedAcquire和parkAndCheckInterrupt流程吧

/**
 * 判断当前线程获取锁失败之后是否需要挂起.
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //前驱节点的状态
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // 前驱节点状态为signal,返回true
        return true;
    // 前驱节点状态为CANCELLED
    if (ws > 0) {
        // 从队尾向前寻找第一个状态不为CANCELLED的节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 将前驱节点的状态设置为SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
  
/**
 * 挂起当前线程,返回线程中断状态并重置
 */
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

线程入队后能够挂起的前提是,它的前驱节点的状态为SIGNAL,它的含义是:“Hi,前面的兄弟,如果你获取锁并且出队后,记得把我唤醒!”。所以shouldParkAfterFailedAcquire会先判断当前节点的前驱是否状态符合要求,若符合则返回true,然后调用parkAndCheckInterrupt,将自己挂起;如果不符合,再看前驱节点是否>0(CANCELLED),若是那么向前遍历直到找到第一个符合要求(状态不大于0)的前驱,若不是则将前驱节点的状态设置为SIGNAL。

整个流程中,如果前驱结点的状态不是SIGNAL,那么自己就不能安心挂起,需要去找个安心的挂起点,同时可以再尝试下看有没有机会去尝试竞争锁。

最终队列可能会如下图所示

总结

用一张流程图总结一下非公平锁的获取锁的过程。 

非公平锁unlock()原理

public void unlock() {
    sync.release(1);
}
  
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

如果理解了加锁的过程,那么解锁看起来就容易多了。流程大致为先尝试释放锁,若释放成功,那么查看头结点的状态是否为SIGNAL,如果是则唤醒头结点的下个节点关联的线程,如果释放失败那么返回false表示解锁失败。这里我们也发现了,每次都只唤起头结点的下一个节点关联的线程。

最后我们再看下tryRelease的执行过程

/**
 * 释放当前线程占用的锁
 * @param releases
 * @return 是否释放成功
 */
protected final boolean tryRelease(int releases) {
    // 计算释放后state值
    int c = getState() - releases;
    // 如果不是当前线程占用锁,那么抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        // 锁被重入次数为0,表示释放成功
        free = true;
        // 清空独占线程
        setExclusiveOwnerThread(null);
    }
    // 更新state值
    setState(c);
    return free;
}

这里入参为1。tryRelease的过程为:当前释放锁的线程若不持有锁,则抛出异常。若持有锁,计算释放后的state值是否为0,若为0表示锁已经被成功释放,并且则清空独占线程,最后更新state值,返回free。     

公平锁原理

公平锁和非公平锁不同之处在于,公平锁在获取锁的时候,不会先去检查state状态,而是直接执行aqcuire(1);

超时机制

在ReetrantLock的tryLock(long timeout, TimeUnit unit) 提供了超时获取锁的功能。它的语义是在指定的时间内如果获取到锁就返回true,获取不到则返回false。这种机制避免了线程无限期的等待锁释放。那么超时的功能是怎么实现的呢?我们还是用非公平锁为例来一探究竟。

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

还是调用了内部类里面的方法。我们继续向前探究 

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

这里的语义是:如果线程被中断了,那么直接抛出InterruptedException。如果未中断,先尝试获取锁,获取成功就直接返回,获取失败则进入doAcquireNanos。tryAcquire我们已经看过,这里重点看一下doAcquireNanos做了什么。 

/**
 * 在有限的时间内去竞争锁
 * @return 是否获取成功
 */
private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    // 起始时间
    long lastTime = System.nanoTime();
    // 线程入队
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        // 又是自旋!
        for (;;) {
            // 获取前驱节点
            final Node p = node.predecessor();
            // 如果前驱是头节点并且占用锁成功,则将当前节点变成头结点
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            // 如果已经超时,返回false
            if (nanosTimeout <= 0)
                return false;
            // 超时时间未到,且需要挂起
            if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                // 阻塞当前线程直到超时时间到期
                LockSupport.parkNanos(this, nanosTimeout);
            long now = System.nanoTime();
            // 更新nanosTimeout
            nanosTimeout -= now - lastTime;
            lastTime = now;
            if (Thread.interrupted())
                //相应中断
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

doAcquireNanos的流程简述为:线程先入等待队列,然后开始自旋,尝试获取锁,获取成功就返回,失败则在队列里找一个安全点把自己挂起直到超时时间过期。这里为什么还需要循环呢?因为当前线程节点的前驱状态可能不是SIGNAL,那么在当前这一轮循环中线程不会被挂起,然后更新超时时间,开始新一轮的尝试 

轮询与中断

ReentrantLock被保留了下来的原因是:ReentrantLock比synchronied多了两个功能:可轮询、可中断。

1、可轮询

原书上面的例子看着比较复杂,但意思很简单。一个转账的操作,要么在规定的时间内完成,要么在规定的时间内告诉调用者,操作没有完成。这个例子就是要了ReentrantLock的可轮询特性,就是在规定的时间内,反复去试图获得一个锁,如果获得成功,就能完成转账操作,如果在规定的时间内,没有获得这个锁,那么就是转账失败。如果使用synchronized的话,肯定是无法做到的。

public boolean transferMoney(Account fromAcct,  
                             Account toAcct,  
                             DollarAmount amount,  
                             long timeout,  
                             TimeUnit unit)  
        throws InsufficientFundsException, InterruptedException {  
    long fixedDelay = getFixedDelayComponentNanos(timeout, unit);  
    long randMod = getRandomDelayModulusNanos(timeout, unit);  
    long stopTime = System.nanoTime() + unit.toNanos(timeout);  
  
    while (true) {  
        if (fromAcct.lock.tryLock()) {  
            try {  
                if (toAcct.lock.tryLock()) {  
                    try {  
                        if (fromAcct.getBalance().compareTo(amount) < 0)  
                            throw new InsufficientFundsException();  
                        else {  
                            fromAcct.debit(amount);  
                            toAcct.credit(amount);  
                            return true;  
                        }  
                    } finally {  
                        toAcct.lock.unlock();  
                    }  
                 }  
             } finally {  
                 fromAcct.lock.unlock();  
             }  
         }  
         if (System.nanoTime() < stopTime)  
             return false;  
         NANOSECONDS.sleep(fixedDelay + rnd.nextLong() % randMod);  
     }  
}  

 2、可中断

在synchronied的代码中,进入临界区的代码是无法中断的,这个很不灵活,如果我们使用一个线程池来分发任务,如果一个代码长期占有锁肯定会影响到线程池的其他任务,因此,加入中断机制提高了对任务更强的控制性。

public boolean sendOnSharedLine(String message)  
        throws InterruptedException {  
    lock.lockInterruptibly();  
    try {  
        return cancellableSendOnSharedLine(message);  
    } finally {  
        lock.unlock();  
    }  
}  
  
private boolean cancellableSendOnSharedLine(String message)  
    throws InterruptedException { ... }

公平性:ReentrantLock默认采用非公平锁,synchronized锁也是采用的非公平锁。
如果你没有要求锁有可轮询和可中断的需求,还是使用synchronized内置锁吧。

0

评论6

请先

  1. 另外acquire方法里,if处理执行的selfInterrupt方法,点进去看跳到了Thread类里,加了synchronized (blockerLock),调用了native的interrupt0方法,看注释写着是为了中断线程。
    小楊同学 2024-02-21 0
    • 是。如果获得锁失败、加入队列失败,就会挂起(中断掉)。
      自学精灵 2024-02-21 0
  2. 非公平锁挂起场景acquireQueued里if(p == head && tryAcquire(arg)),if用的是短路&&,判断前节点不是head就直接去执行另一个if了,所以这里应该不会去tryAcquire尝试获取锁。
    小楊同学 2024-02-21 0
    • p是当前node的前一个节点,不是当前节点。
      自学精灵 2024-02-21 0
      • 是的,说错了,应该是如果前一个节点不是head,就不再尝试获取锁。所以理论上一次lock会至少尝试获取一次锁。
        小楊同学 2024-02-21 0
        • 嗯,是的
          自学精灵 2024-02-21 0
显示验证码
没有账号?注册  忘记密码?

社交账号快速登录