Java并发编程之可重入锁ReentrantLock_JAVA_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > JAVA > Java并发编程之可重入锁ReentrantLock

Java并发编程之可重入锁ReentrantLock

 2019/7/18 19:09:55  yuqingshui  程序员俱乐部  我要评论(0)
  • 摘要:ReentrantLock简介ReentrantLock是一种可重入的独占锁。ReentrantLock构造方法://默认构建非公平锁publicReentrantLock(){sync=newNonfairSync();}//传入公平参数,构建公平锁/非公平锁publicReentrantLock(booleanfair){sync=fair?newFairSync():newNonfairSync();}从构造方法可知
  • 标签:Java 编程 Java并发 Ant

ReentrantLock简介

ReentrantLock是一种可重入的独占锁。ReentrantLock构造方法:

class="hljs java" style="font-family: Menlo, Monaco, Consolas, 'Courier New', monospace; font-size: 13px; display: block; color: #333333; margin: 0px; padding: 0px; border-radius: 3px; overflow: visible; line-height: 1.6;">    //默认构建非公平锁
    function" style="">public ReentrantLock() {
        sync = new NonfairSync();
    }
    //传入公平参数,构建公平锁/非公平锁
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

从构造方法可知,ReentrantLock支持公平锁FairSync和非公平锁NonfairSync。默认情况下,创建ReentrantLock实例会得到一个非公平锁。其锁的功能由内部类Sync完成,Sync扩展自AQS,并依赖AQS的基础行为。Sync的抽象方法lock()由子类FairSync和NonfairSync各自实现。先以NonfairSync为例介绍非公平锁的加锁和解锁原理。

非公平锁

NonfairSync是ReentrantLock的非公平锁实现,它主要实现Sync中定义的抽象方法lock()和AQS中未实现的抽象方法tryAcquire。

加锁

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
        //执行锁定
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
        //尝试获取锁
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

加锁时,首先通过CAS操作将状态从0修改为1,如果修改成功,则修改锁的持有者为当前线程,加锁成功。

什么是CAS?这里简单说明下:

CAS 指的是现代 CPU 广泛支持的一种对内存中的共享数据进行操作的一种特殊指令。这个指令会对内存中的共享数据做原子的读写操作。CAS有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则返回V

如果修改状态失败,说明锁的状态不为0,仍被其他线程持有。调用acquire(1),acquire在AQS中提供了默认实现:

   public final void acquire(int arg) {
       if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
          selfInterrupt();
   }

调用tryAcquire()尝试获取锁,tryAcquire()调用nonfairTryAcquire方法,进入源码:

    final boolean nonfairTryAcquire(int acquires) {
         final Thread current = Thread.currentThread();
         int c = getState();
         //如果锁状态为0,则尝试锁定,成功返回true
         //这里是为了在并发条件下增加获取锁的可能性;若刚好之前持有锁的线程已经释放锁,则当前线程可再次尝试
         if (c == 0) {
             if (compareAndSetState(0, acquires)) {
                 setExclusiveOwnerThread(current);
                 return true;
             }
         }//否则,如果当前线程为锁的持有者,则state+acquires,这里即体现了ReentrantLock的可重入性
         else if (current == getExclusiveOwnerThread()) {
             int nextc = c + acquires;
             if (nextc < 0) // overflow
                 throw new Error("Maximum lock count exceeded");
             setState(nextc);
             return true;
         }
         return false;
    }

如果此次尝试成功获取锁,直接返回。否则获取锁失败,将当前线程加入等待队列。ReentrantLock是可重入的独占锁,因此以独占模式在链表中添加等待节点。

    private Node addWaiter(Node mode) {
        //创建当前结点
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        //tail节点不空,则使用CAS操作将node添加到链表尾部,成功则返回;失败则进入enq
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //tail节点为空,调用enq初始化链表
        //或节点加入链表尾部CAS操作失败,说明当前存在竞争
        enq(node);
        return node;
    }
    private Node enq(final Node node) {
        //由于这里存在多线程并发问题,使用自旋保证node能够添加到链表中,因此enq本身是线程安全
        for (;;) {
            Node t = tail;
            //再次判断tail指向的节点是否为空,之所以再次判断,可能有其他线程已经初始化链表
            if (t == null) { 
                //多线程并发时,只有一个线程执行compareAndSetHead返回成功,执行失败的线程将进入else代码块
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                //这里同样存在多线程并发,一次只会有一个线程成功,失败的线程将进入下一次循环,直至成功
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

addWaiter保证将节点加入等待队列。若队列非空,首先会尝试一次将节点插入队列,若成功则无需进入自旋代码块。通常情况下,没有竞争时,无需自旋即可完成。进入自旋通常是因为CAS操作竞争比较激烈。当前节点加入等待队列之后,进入acquireQueued等待挂起。

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //获取当前结点的前驱节点
                final Node p = node.predecessor();
                //前驱节点是head,表示当前节点之前已无节点在等待锁,再次尝试获取锁
                //这里不会和等待队列中的其他线程发生锁竞争,但会和尝试获取锁且尚未进入等待队列的线程发生竞争
                if (p == head && tryAcquire(arg)) {
                    //获取锁成功,则设置node为新的head
                    setHead(node);
                    //释放节点,帮助GC
                    p.next = null; 
                    failed = false;
                    return interrupted;
                }
                //前驱节点不是head
                //parkAndCheckInterrupt会挂起当前线程,直到调用release收到唤醒信号
                if (shouldParkAfterFailedAcquire(p, node)&&parkAndCheckInterrupt())
                    interrupted = true;//如果线程被中断,则将interrupted设为true
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    //将node设为头部节点,并将thread和prev置空
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

如果当前节点的前驱节点为头节点,当前线程会再次尝试获取锁。如果再次尝试失败或当前节点的前驱节点不是头节点,调用shouldParkAfterFailedAcquire判断是否应当挂起当前线程,看实现:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //表示pred的后继节点node可以安全的挂起
        if (ws == Node.SIGNAL)
            return true;
        //表示pred处于取消状态
        if (ws > 0) {
            
            do {
                //从链表中移除pred,一直循环直到pred前面的节点不是取消状态
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            //修改next指针
            pred.next = node;
        } else {
            //ws为0或者PROPAGATE,将pred状态设置为SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    //阻塞当前线程,并返回线程的中断情况
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        unsafe.park(false, 0L);
        setBlocker(t, null);
    }
    private static void setBlocker(Thread t, Object arg) {
        unsafe.putObject(t, parkBlockerOffset, arg);
    }

只有当前节点的前驱节点的状态为SIGNAL时,线程可安全挂起。其他情况线程会再次进入自旋。

总体看来,shouldParkAfterFailedAcquire就是靠前驱节点判断当前线程是否应该被挂起,如果前驱节点为SIGNAL,则挂起当前线程;如果前继节点处于CANCELLED状态,移除取消节点,并更新当前节点pred指针;前驱节点状态为0(默认进入队列即为0)或PROPAGATE,设前驱节点的状态为SIGNAL,当前线程再次进入自旋,还是未获取锁就会被安全挂起。acquireQueued是一个自旋操作,线程在此挂起,直到被唤醒,满足p == head,执行tryAcquire(arg)尝试获取锁,成功则将当前节点设为新的head,该操作非常重要(想想为什么,后面解释)。

解锁

解锁操作实际调用AQS中的release方法:

    public void unlock() {
        sync.release(1);
    }
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            //head不空且waitStatus不为0,唤醒后继节点;waitStatus为0则说明无线程在等待队列中。
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

release首先调用tryRelease尝试释放锁。

      protected final boolean tryRelease(int releases) {
            //如果线程被锁定多次,则需要进行多次释放
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

调用tryRelease返回true,说明锁已释放,判断head节点,若还有后继节点,调用unparkSuccessor进行唤醒。执行唤醒操作前,首先将当前节点(这里是head)的waitStatus置为0,再判断后继节点是否为取消状态,若为取消状态,需要使用pred指针从tail向前遍历,找到最靠近head的非CANCEL节点,使用LockSupport进行唤醒。

    private void unparkSuccessor(Node node) {
        
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        //获取后继节点
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {//s为空或node处于取消状态
            s = null;
            //从链表尾部向前遍历,直到找到位于链表最前端且waitStatus小于0的节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);//唤醒线程
    }

被唤醒的线程将继续在acquireQueued的自旋操作中进行锁竞争,直到成功获取锁。

上面提到acquireQueued中,线程(假设为线程B)获取锁之后,会调用setHead将当前节点设为新的head。原因是,当持有锁的线程(假设为线程A)释放锁之后,是否需要唤醒后继节点是根据head的waitStatus是否为0来判定,当调用unparkSuccessor进行唤醒时,又会将head的waitStatus置为0。假设线程B之后仍然有线程C在阻塞,线程A唤醒B后将head的waitStatus置为0,若不执行setHead,线程B释放锁时,看到head的waitStatus为0,则将不会唤醒线程C,线程C及后面的节点将一直阻塞。当执行setHead之后,若线程B之后仍然有线程在阻塞,则新的head节点waitStatus必然是SIGNAL,唤醒操作将有序进行。好精妙,佩服!!!

公平锁

FairSync是ReentrantLock的公平锁实现,公平锁的加锁原理:

static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;
        final void lock() {
            acquire(1);
        }
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

由FairSync源码可知,加锁方法直接调用acquire,然后调用tryAcquire方法。与NonfairSync最大的区别就在于tryAcquire实现不同,首先调用hasQueuedPredecessors确定队列中没有等待线程,或者队列中第一个等待者为当前线程,FairSync才会尝试获取锁。否则,直接进入等待队列。

    public final boolean hasQueuedPredecessors() {
        Node t = tail; 
        Node h = head;
        Node s;
        //h != t,说明队列不空
        //(s = h.next) == null,队列中无排队线程
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

?

公平锁实现完全遵循FIFO的原则,在进入等待队列之前不会存在锁竞争的问题。即所有线程按公平原则获取锁,如果锁未被占用且队列为空,获取锁;如果队列为空,自动去排队,不去争抢锁。通常情况下,公平锁的效率不如非公平锁。

发表评论
用户名: 匿名