多线程并发 (五) ReentrantLock 使用和源码

章节:
多线程并发 (一) 了解 Java 虚拟机 - JVM 
多线程并发 (二) 了解 Thread
多线程并发 (三) 锁 synchronized、volatile 
多线程并发 (四) 了解原子类 AtomicXX 属性地址偏移量
多线程并发 (五) ReentrantLock 使用和源码 
多线程并发 (六) 了解死锁
多线程并发 (七) 线程池

对于多线程并发学过了并发产生的原因,并发产生的问题,并发产生问题的解决方式,对于之前介绍的并发问题的解决方式有synchronzied、volatile、原子类型无锁控制。了解最后一个锁ReentrantLock重入锁。ReentrantLock的实现其实是利用了CAS + volatile+LockSupport 的方式控制线程安全的,也就是面试经常问道,不用锁如何控制多线程安全。

1.ReentrantLock简单使用

ReentrantLock和synchronzied都是独占式重入锁,之前介绍过ReentrantLock是显示锁、synchronzied是内部锁,对于synchronzied的使用十分简单,能满足我们工作中的大部分需求。相对于ReentrantLock的使用就比synchronzied略有复杂,但是ReentrantLock能解决业务比较复杂的场景。

1) 对比

  1. synchronzied锁的是对象(锁是保存在对象头里面的,根据对象头数据来标识是否有线程获得锁/争抢锁),ReentrantLock锁的是线程(根据进入的线程和int类型的state标识锁的获得/争抢)
  2. synchronzied通过Object中的wait()/nofify()方法实现线程间通讯,ReentrantLock通过Condition的await()/signal()方法实现线程间通讯
  3. synchronzied是非公平锁,ReentrantLock可选择公平锁/非公平锁
  4. synchronzied涉及到锁的升级无锁->偏向锁->自旋锁->向OS申请重量级锁,ReentrantLock实现不涉及锁,利用CAS自旋机制和volatile同步队列实现锁的功能
  5. ReentrantLock具有tryLock尝试获取锁以及tryLock timeout,可主动release释放使用灵活

2) 简单例子

public class Test {

    static ReentrantLock lock = new ReentrantLock();
    static Condition condition = lock.newCondition();
    public static void main(String[] args) throws InterruptedException {

        lock.lock();
        new Thread(new SignalThread()).start();
        System.out.println("等待通知");
        try {
            condition.await();
        } finally {
            lock.unlock();
        }
        System.out.println("恢复运行");
    }
    static class SignalThread implements Runnable {

        @Override
        public void run() {
            lock.lock();
            try {
                condition.signal();
                System.out.println("通知");
            } finally {
                lock.unlock();
            }
        }
    }
}

使用了Condition + Reentrantlock实现线程间通信,和synchronzied的使用其实差别不大,使用的时候要保证lock()和unLock()方法的调用对应,调用次数保证相同。对于ReentrantLock的使用不做太多介绍,不熟悉的可以搜索用法。

2.ReentrantLock 源码实现

ReentrantLock其实是对 AbstractQueuedSynchronizer 子类 Sync 的一个封装,可以把ReentrantLock理解成一个包装类,主要逻辑都在AbstractQueuedSynchronizer(AQS) 和 Sync 子类里面,所以首先我们要学习的源码要从AQS开始。
代码结构图:

可知 ReentrantLock 分为公平锁FairSync和非公平锁NofairSync,这两种锁都是继承自Sync,并且是AQS的子类。
学习源码我们从两方面入手:1.数据结构、2.算法代码

  1. AQS的数据结构
    AQS是一个同步队列,是以Node类为一个节点的双向链表并且有首和尾指针。
         // 首指针
         private transient volatile Node head;
         // 尾指针
         private transient volatile Node tail;
         // 是否有线程占用:0-无,1-有线程占用,>1-当前线程重入的次数
         private volatile int state;

    AQS中主要有三个参数而且都是被volatile修饰的,其中他们的更新方式是通过CAS机制Unsafe更新的,这块可以看多线程并发 (四) 了解原子类 AtomicXX 属性地址偏移量,CAS机制 了解CAS的参数含义。
    Node内部类:

    static final class Node {
            volatile int waitStatus; //当前线程的等待状态
            volatile Node prev;        
            volatile Node next;      
            volatile Thread thread;  //当前线程
    }

    1)prev:指向前一个结点的指针
    2)  next:指向后一个结点的指针
    3)  thread:当前结点表示的线程,因为同步队列中的结点内部封装了之前竞争锁失败的线程,故而结点内部必然有一个对应线         程实例的引用
    4)  waitStatus:对于重入锁而言,主要有3个值。
            0:初始化状态;
           -1(SIGNAL):当前结点表示的线程在释放锁后需要唤醒后续节点的线程;
            1(CANCELLED):在同步队列中等待的线程等待超时或者被中断,取消继续等待

    1)队列中每个Node节点就代表一个等待获取锁的线程,其中head指的那个node节点就是当前当用锁的节点,当n1释放锁之后就会唤醒n2一次类推
    2)当有新线程n3加入队列时候,就会从tail尾部加入,改变tail的指向。
    从上图容易知道队列的结构,具体如何被添加进入队列又是如何释放的,继续看算法~

  2. 算法
    1)线程被加锁/加入队列
    从简单使用引入
     public static void main(String[] args) throws InterruptedException {
            ReentrantLock lock = new ReentrantLock();//初始化锁类型
    
            lock.lock(); //进入加锁流程
            try {
    
                } finally {
                    lock.unlock(); //释放锁
                }
        }
    初始化时候不传参数就是非公平锁,传参数跟据参数类型判断
       public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
        }
    主要以 NonfairSync 非公平锁代码为例,当调用lock()方法之后进入加锁流程
    final void lock() {
                if (compareAndSetState(0, 1)) //判断是否有线程获取了锁
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
         1)compareAndSetState(0, 1) 利用CAS机制判断state属性是否被其他线程修改了,state = 0未被其他线程占用,state > 1被其他线程占用了
        2)如果没被其他线程占用即state = 0,这时把当前线程设置到 AbstractOwnableSynchronizer 内存表示当前占用的线程
        3)如果state != 0 ,继续 acquire(1) 把当前线程加入等待队列
     public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
         1)首先会 tryAcquire(arg) 这个方法子类必须实现会引用到
       final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState(); // 当前状态
                if (c == 0) { // 非公平的这里会再次尝试获取锁的机会和上面类似
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }//如果还是当前的线程说明当前线程重入了这个锁,state +1 
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false; // 不是当前线程并且锁被其他线程占用了 返回false
            }
    可以看到这个方法主要分 三部分
        1)第一部分if 中如果state=0了,那就直接占用这个锁,这里也是非公平锁的体现,并没有从队列中取,直接把锁让给了当前申请的线程
        2)第二部分else if 中如果还是当前的线程那state +1 ,表示当前线程重入了这个锁 
        3)三 是个新的线程进入并且锁被其他线程占用,返回false
    所以回到上面 当tryAcquire(arg) 返回true 结束,返回false继续走
     acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    这里涉及到了两个方法,首先是加入队列然后从队列中取出线程。主要逻辑处
    首先看addWaiter()方法
     private Node addWaiter(Node mode) {
            Node node = new Node(mode); //创建一个新节点,mode = null
    
            for (;;) { //无线循环
                Node oldTail = tail; //拿到当前的未队列,
                if (oldTail != null) { //不为空
                    U.putObject(node, Node.PREV, oldTail);
                    if (compareAndSetTail(oldTail, node)) { //移动尾部指针对象
                        oldTail.next = node; //把当前node加入队列
                        return node;
                    }
                } else {
                    initializeSyncQueue(); //为空初始化 看下方
                }
            }
        }
    
      private final void initializeSyncQueue() {
            Node h;
            if (U.compareAndSwapObject(this, HEAD, null, (h = new Node()))) //给head赋值
                tail = h; //给tail赋值
        }
    这块代码比较简单,主要说一下for循环中 oldTail !=null的那块
          1)U.putObject(node, Node.PREV, oldTail); 这个是Unsafe 中的方法,意思是把oldTail 赋值给node中的 prev。
          2)compareAndSetTail(oldTail, node) if 判断中的这块代码,意思是把tail这个指针从之前的oldTail指向node 看图

            例如之前 tail = n2(oldTail) ,现在加入了一个线程n3,这时候 tail = n3 
         3)oldTail.next = node;  看图就是 n2.next = n3
    继续回到acquireQueued()方法
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    通过addWaiter方法现在队列中已经加入了一个新的node节点。继续看acquireQueued方法
       final boolean acquireQueued(final Node node, int arg) {
            try {
                boolean interrupted = false;
                for (;;) { // 死循环
                    final Node p = node.predecessor(); //获取当前节点的上一个节点
                    if (p == head && tryAcquire(arg)) { //判断是否是head节点
                        setHead(node); // 把当前节点设置成head
                        p.next = null; // 把之前的head节点从链表中释放,让内存回收
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) && 
                        parkAndCheckInterrupt()) // 暂停当前线程
                        interrupted = true;
                }
            } catch (Throwable t) {
                cancelAcquire(node);
                throw t;
            }
        }
    这块代码比较好理解,我们传如的node是addwaiter方法return回来的,就是我们链表中最后一个节点,for循环中先通过node拿到最后一个节点的上一个结点,和head 首节点做比较,相同继续让当前线程 tryAcquire 获取当前锁,如果成功了那就是说上一个节点已经把锁释放了,当前节点就是链表中唯一一个节点了,然后把之前的节点p从链表中移除等待gc回收。如果获取没有成功判断是否需要暂停当前线程,如果pre节点的线程为SIGNAL状态那就调用LockSupport暂停当前线程。不然就一直循环直到前一个节点是head节点并且释放了锁。
      private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this); // 暂停当前线程
            return Thread.interrupted();
        }
    在这里主要把线程暂停,因为当前node的前一个node释放锁之后会通知他。
    2)线程释放锁/从队列中移除
    释放锁相对简单通过主动调用unLock()方法,
     public final boolean release(int arg) {
            if (tryRelease(arg)) { //释放 state = 0
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h); // 解除线程的park等待
                return true;
            }
            return false;
        }

    先是释放state的值,因为他是锁是否被占用的标识。然后unpark线程。

      private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus; 
            if (ws < 0)
                node.compareAndSetWaitStatus(ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next; // 把取消的线程移除,轮寻直到线程没有被取消
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node p = tail; p != node && p != null; p = p.prev)
                    if (p.waitStatus <= 0)
                        s = p;
            }
            if (s != null)
                LockSupport.unpark(s.thread); //释放当前节点的线程
        }


    好的博文:
    ReentrantLock应用
    ReentrantLock源码

翻遍朋友圈,也就这几张雪人图有点意思

 

  • 8
    点赞
  • 17
    收藏
    觉得还不错? 一键收藏
  • 打赏
    打赏
  • 2
    评论

“相关推荐”对你有帮助么?

  • 非常没帮助
  • 没帮助
  • 一般
  • 有帮助
  • 非常有帮助
提交
评论 2
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

WangRain1

你的鼓励将是我创作的最大动力

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值