不止八股:阿里内部语雀一些有趣的并发编程笔试题1——一半容量才可以出队的阻塞队列

0丶引入

笔者社招一年半经验跳槽加入阿里约1年时间,无意间发现一些阿里语雀上的一些面试题题库,出于学习目的在此进行记录。

  • 这一篇主要写一些有趣的笔试题(非leetcode),这些有的考验并发编程,有的考验设计能力。
  • 笔者不是什么技术大牛,此处笔试题充满主观思考,并不一定是满分答案,欢迎评论区一起探讨。
  • 不止八股:面试题之外,笔者会更多的思考下底层原理,不只是简单的背诵。

下面这个题目也是笔者面试阿里笔试做过的一道笔试题,现在回想自己那时候写的也是一坨

1 题目

通过List实现一个阻塞队列类,该队列有最大长度限制,以下是该队列的一些特性

  • take方法:获取队列中队头元素,当队列中元素数量超过一半时,则可以出队,否则阻塞当前线程
  • offer(E element)方法:插入元素到队尾,当队列大小已经超过最大限制时,则阻塞当前线程
  • size()返回当前队列大小

2 笔者的题解

感觉这个队列现实中没啥实际用途,为啥超过一半才能出队
这一题主要考察候选人对并发编程的理解,可是笔者工作也没用到这些知识呀doge

在juc相关笔记中,我们学习了Condition和Object#wait notify的原理和基本使用,也学习了juc中常见的阻塞队列原理,这里可用使用等待唤醒进行实现。

2.1 基于Object等待唤醒的简单版本

2.1.1代码

import java.util.List;

public class HalfTakeAvailableBlockingQueueV1<E> {

    private List<E> delegateList;

    private int maxSize;

    private final Object lock;

    public HalfTakeAvailableBlockingQueueV1(List<E> delegateList, int maxSize) {
        this.delegateList = delegateList;
        this.maxSize = maxSize;
        lock = new Object();
    }

    /***
     * take方法:获取队列中队头元素,当队列中元素数量超过一半时,则可以出队,否则阻塞当前线程
     * @return 队列头部元素
     * @throws InterruptedException
     */
    public E take() throws InterruptedException {
        synchronized (lock) {
            // 使用while 避免虚假唤醒
            while (size() <= maxSize / 2) {
                lock.wait();
            }

            // 这里要使用remove,得删去
            E e = delegateList.remove(0);
            // notifyAll 不能和上面的remove换位置
            lock.notifyAll();
            return e;
        }
    }

    /**
     * offer(E element)方法:插入元素到队尾,当队列大小已经超过最大限制时,则阻塞当前线程
     *
     * @param e e
     */
    public void offer(E e) throws InterruptedException {
        synchronized (lock) {
            while (size() > maxSize) {
                lock.wait();
            }
            
            delegateList.add(e);
            // notifyAll 和上面的不能调换位置
            lock.notifyAll();
        }
    }

    /**
     * 需要加锁,否则有线程可见性问题
     * @return
     */
    public int size(){
        synchronized (lock){
            return delegateList.size();
        }
    }

}

2.1.2细节

虽然这个版本似乎很简单,但是还是存在一些细节。

  1. 命名

    HalfTakeAvailable是不是很见名知意,一半的容量才能获取。

  2. 使用while而不是if

    while (条件)
    为什么使用的while,而不是使用if?

    想想一种case,线程A和线程B阻塞于offer,线程C获取锁正在执行take,执行完take后进行notifyAll。随后线程A获得锁结束了if进行元素的塞入,此时size大于了maxSize,但是线程A的唤醒导致了线程B继续执行,线程B也结束了if,继续塞入了元素。

    这种情况被称作虚假唤醒

  3. size方法为什么要加锁

    保证线程可见性
    ,本质上size方法就是读没有并发修改的问题,这里加synchronized只是为了保证线程可见性

    当线程释放锁时,它所做的更改会被刷新到主内存中,同样,当线程获取锁时,它会从主内存中读取共享变量的最新值。这意味着通过synchronized 块的同步变量的更新对所有其他线程是可见的

  4. notifyAll不能和数据操作调换位置


    image-20240106115447716

    为什么不能先notifyAll然后再remove昵?

    在涉及等待/通知模式的同步代码中,典型的模式是
    先改变状态或条件(比如执行 remove(0) 移除队列头部元素)

    然后调用 notifyAll() 或 notify() 来唤醒等待该条件的线程

    如果你先调用 notifyAll(),然后再改变状态(执行 remove(0)),可能会出现以下问题:


    • 之前被唤醒的线程可能无法察觉到状态改变: 如果你先发送通知,那些等待的线程可能会在状态实际改变之前被唤醒(比如,某个元素还没从队列中移除)。那些线程可能检查到状态尚未改变(因为 remove(0) 还没执行),然后无意义地再次进入等待状态。

    • 可能会发生竞态条件: 如果一个线程执行完 notifyAll() 并且在 remove(0) 执行之前失去了锁,那么等待的线程将会开始运行。此时,由于元素还未被移除,它们可能看到错误的状态,或者在执行它们各自的操作时与其他线程发生冲突。

    • 降低了效率: 当线程被过早地唤醒,它们可能只是进行一次无效的状态检查然后再次挂起。这不仅浪费了CPU资源,还增加了线程上下文切换的开销。

2.2.3 这个版本的不足

  1. 为什么size需要加锁

    这里加锁不是因为存在并发更新的问题,而是需要保证线程可见性,但是这里的加锁却导致offer和take的阻塞,以及多个size不能并行。

    如何优化——volatile

  2. 一个等待条件的弊端

    想象一种case,当前容量为maxSize - 1,线程AB都在执行offer,线程A执行成功了进行唤醒了线程B,线程B唤醒后自旋while继续wait。

    有什么问题:线程B不应该被唤醒,应该是执行take的线程来唤醒执行offer的线程

    如何解决:多个等待条件,仿照ArrayBlockingQueue

2.2 使用volatile优化size

我们在HalfTakeAvailableBlockingQueueV2中,使用
private volatile int size
来记录当前容量,然后offer和take的时候修改加锁修改size。

然后size方法就不需要加锁了。

image-20240106120858629
image-20240106120921590

2.3 使用Condition实现多等待条件

我们使用juc中的ReentrantLock来解决并发修改问题,ReentrantLock#newCondition可创建多种等待唤醒条件,我们下面的代码创建了两个Condition

  • takeAvailable:如果无法take那么在此Condition上进行等待
  • offerAvailable:如果无法offer那么在此Condition上进行等待

从而实现offer的线程来唤醒阻塞在take上的线程,减少无意义的唤醒,这里也可以看出ReentrantLock相对于synchronized更牛的一点

import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class HalfTakeAvailableBlockingQueueV3<E> {
    private List<E> delegateList;
    private int maxSize;
    private volatile int size;
    private final ReentrantLock lock;

    /**
     * 可拿取条件
     */
    private final Condition takeAvailable;
    /**
     * 可塞入条件
     */
    private final Condition offerAvailable;

    public HalfTakeAvailableBlockingQueueV3(List<E> delegateList, int maxSize) {
        this.delegateList = delegateList;
        this.maxSize = maxSize;
        lock = new ReentrantLock();
        takeAvailable = lock.newCondition();
        offerAvailable = lock.newCondition();
        size = 0;
    }

    /***
     * take方法:获取队列中队头元素,当队列中元素数量超过一半时,则可以出队,否则阻塞当前线程
     * @return 队列头部元素
     * @throws InterruptedException
     */
    public E take() throws InterruptedException {
        lock.lock();
        try {
            // 使用while 避免虚假唤醒
            while (size() <= maxSize / 2) {
                takeAvailable.await();
            }

            // 这里要使用remove,得删去
            E e = delegateList.remove(0);
            size--;
            offerAvailable.signalAll();
            return e;
        } finally {
            lock.unlock();
        }
    }

    /**
     * offer(E element)方法:插入元素到队尾,当队列大小已经超过最大限制时,则阻塞当前线程
     *
     * @param e e
     */
    public void offer(E e) throws InterruptedException {
        lock.lock();
        try {
            while (size() > maxSize) {
                // 无法offer 那么await
                offerAvailable.await();
            }
            delegateList.add(e);
            size++;
            // 可拿去唤醒
            takeAvailable.signalAll();
        } finally {
            lock.unlock();
        }
    }


    public int size() {
        return size;
    }

}

2.3.1继续优化

这里其实还有两个点可优化

  • 如果容量大于等于一半才进行唤醒!

image-20240106122227548

  • 使用signal而不是signalAll

    signal只会唤醒一个消费者线程,线程这里我们也只塞了一个元素,不需要signalAll从而减少无意义的锁竞争,take部分同理

public void offer(E e) throws InterruptedException {
    lock.lock();
    try {
        while (size() > maxSize) {
            // 无法offer 那么await
            offerAvailable.await();
        }
        delegateList.add(e);
        size++;
        if (size>=maxSize/2){
            // 可拿唤醒
            takeAvailable.signal();
        }
    } finally {
        lock.unlock();
    }
}

3.不止八股

3.1 synchronized 原理

ObjectMonitor的结构:

ObjectMonitor() {
    _header       = NULL;
    _count        = 0;  //锁计数器
    _waiters      = 0,
    _recursions   = 0;
    _object       = NULL;
    _owner        = NULL;
    _WaitSet      = NULL; //处于wait状态的线程,会被加入到_WaitSet
    _WaitSetLock  = 0 ;
    _Responsible  = NULL ;
    _succ         = NULL ;
    _cxq          = NULL ;
    FreeNext      = NULL ;
    _EntryList    = NULL ; //处于等待锁block状态的线程,会被加入到该列表
    _SpinFreq     = 0 ;
    _SpinClock    = 0 ;
    OwnerIsThread = 0 ;
  }

当线程想要进入同步块(即被 synchronized 关键字修饰的方法或代码块)时,它必须首先获取该对象的内置锁。如果锁已经被另一个线程持有,该线程将会被阻塞,直到锁被释放。锁机制确保在任何时刻,只有一个线程能持有对象的内置锁,从而保障了同步块内代码的线程安全性。

image

ObjectMonitor 是如何实现线程安全的:

  • 互斥(Mutual Exclusion):

    当一个线程(称为线程A)尝试执行 synchronized 代码块时,它会尝试获取与对象关联的内置锁。
    如果该锁已经被另一个线程(称为线程B)持有,则线程A会被放入等待锁(_entryList)的队列中,并被阻塞。
    当线程B释放锁时,线程A或其他等待锁的线程将有机会获取锁。
    操作原子性(Operation Atomicity):在持有锁的过程中,线程可以原子性地执行同步代码块中的操作。意味着这些操作不会被其他同步代码块并发执行的线程打断。

  • 可见性(Visibility):

    当线程释放锁时,其所做的更改会立即对其他线程可见。即将工作内存中所作的更改同步到主内存,这确保了对共享变量的更改会及时地被其他线程察觉。

  • 等待/通知机制(Wait/Notify Mechanism):

    wait()、notify() 和 notifyAll() 方法提供了线程间的协调机制。
    当一个线程调用 wait() 方法时,它会释放当前持有的锁并进入等待状态,直到另一个线程调用 notify() 或 notifyAll()。
    调用 notify() 会随机唤醒等待队列中的一个线程,而 notifyAll() 会唤醒所有等待的线程,但这些线程仍需要在锁可用时竞争获取锁

3.2 ReentrentLock&Conditio原理

image

1. ReentrentLock 如何实现线程安全

JUC源码学习笔记1——AQS独占模式和ReentrantLock

ReentrentLock 基于AQS的独占模式,本质上是使用一个双向链表实现了
同步队列

img

AQS内部是state来标识当前同步状态,在不同的juc工具类中,state的含义各不相同。在ReentrentLock 中state标识占有锁线程加锁的次数(从而实现可重入)当state=0的时候,ReentrentLock 会使用CAS来改变state的值,cas成功的线程意味着加锁成功,反之加锁失败

  • 加锁失败的线程将自旋+cas的入队——将自己封装成node链接到队列尾部,然后调用LockSupport#park进行阻塞等待

  • 加锁成功的线程将成为同步队列的头部,继续执行后续的业务逻辑,执行完成后调用unlock解锁将修改state的值-1(重入多少次就要释放多少次),如果state的值为0意味着锁被释放,这时候将
    唤醒后续节点,如果后续节点为null or 后续节点阻塞等待的时候被打断那么从队列尾部找最靠近头部
    的阻塞于锁的线程调用LockSupport#unpark进行唤醒

    为什么要从尾部找?=>
    JUC源码学习笔记1——AQS独占模式和ReentrantLock

那么为什么cas可保证执行过程的原子性?LockSupport#park和unpark是如何实现线程的挂起和唤醒的?

  • CPU硬件级别的CAS是作为单个原子指令执行的,这意味着一旦CAS指令开始执行,它的整个状态转换(读取-比较-交换)过程就会在一个不可分割的步骤中完成。这个过程是连续的,CPU不会在CAS指令执行到一半时中断并切换到另一个线程。

    那么,CPU是如何保证这一点的呢?


    • 锁总线(Locking the Bus):

      通过锁定CPU和内存间的总线。在执行这样的原子操作时,处理器会向总线发出一个锁定的信号,没有其他的处理器可以访问内存,直到原子操作完成。

    • 缓存锁定(Cache Locking):
      在多核处理器中通常使用MESI(Modified, Exclusive, Shared, Invalid)协议来确保缓存一致性。当执行原子操作的时候,如果数据被缓存在当前核的缓存中并且是以独占状态存在的,那么该核可以直接对这个缓存行进行操作,而无需锁定总线。


    重要的是,虽然线程可能因为时间片用尽而被挂起,但CPU内部的原子指令一旦开始执行,就会完成整个指令序列,中间不会被操作系统的线程调度打断。

  • LockSupport是Java并发包里一个提供基本线程同步原语的工具类,其中park和unpark是它的两个核心方法。这两个方法为线程提供了阻塞(等待)和唤醒的能力,它们不需要同步器对象(比如锁或者信号量),可以非常灵活地使用。


    • park方法:

      当一个线程调用park时,如果已经有一个unpark操作授予了该线程一个许可(permit),它会立即返回。否则,它可能会阻塞。线程调用park时会被挂起(进入等待状态),等待unpark的唤醒,或者响应中断。

    • unpark方法:

      unpark方法用于“唤醒”或者“重新调度”一个被park挂起的线程。这个方法会给指定的线程提供一个许可(如果之前没有许可的话),使得当该线程调用park方法时不会被阻塞,或者如果该线程已经在调用park时挂起了,那么它将被唤醒。


    在Java中,LockSupport的实现依赖了本地方法,调用了操作系统层面的同步机制。简单来说,就是基于操作系统提供的挂起(suspend)和恢复(resume)线程的原语来实现的。

    在Linux平台下,park和unpark是通过pthread库中的条件变量(condition variables)来实现的。当线程调用park方法时,实际上它调用了本地方法,该本地方法会使用条件变量进行等待。当另一个线程调用unpark时,它实际上会发送信号到条件变量来唤醒挂起的线程。

2. Condition如何实现等待唤醒

JUC源码学习笔记3——AQS等待唤醒机制Condition和CyclicBarrier,BlockingQueue

  • 等待(await方法)

    如果当前线程未持有与Condition相关联的Lock,则将抛出IllegalMonitorStateException。
    将当前线程封装成一个节点,和其他等待的线程一起加入到AQS的条件队列。条件队列是一个单向队列,用来存放调用await()进入等待状态的线程。
    释放已持有的锁并将当前线程挂起
    。在AQS中叫做"fullyRelease",意味着可能会释放多个重入锁。这一操作必须确保至少能原子地释放锁并挂起线程,不然就可能出现死锁。
    如果在等待过程中线程被中断,根据不同的await()实现,线程可能会抛出InterruptedException,或者线程的中断状态将被设置。

  • 唤醒(signal或signalAll方法)

    如果当前线程未持有与Condition相关联的Lock,则将抛出IllegalMonitorStateException。
    对于signal()方法,AQS将从条件队列中选择一个等待最长时间的线程节点,并将其转移到同步队列。同步队列是用来竞争获取锁的队列。
    对于signalAll()方法,AQS将把条件队列的所有线程节点全部转移到同步队列。
    被移动到同步队列的线程节点将在锁释放时竞争获取锁,一旦获取锁成功,它们可以从await()方法返回并继续执行。

3. ReentrentLock如何实现线程可见性

这里说的不是AQS state属性的可见性,因为state是volatile修饰,自然保证了其可见性

看下面这个代码,为什么list没用被volatile修饰,但是保证了其线程可见性,main线程可用读到writeThread对list的修改

image-20240107160123065

这一切都是是因为unlock释放锁,会修改volatile修饰的state变量

对于volatile变量的写操作,会在其后插入一个内存屏障。在Java中,volatile变量的写操作后通常会插入一个"store-store"屏障(Store Barrier),以及一个"store-load"屏障。这些内存屏障确保了以下几点:

  • Store-Store Barrier:这个屏障防止volatile写与之前普通写的指令重排序。也就是说,对volatile变量的写操作之前的所有普通写操作都必须在volatile写之前完成,确保了volatile写之前的修改对于其他线程是可见的。
  • Store-Load Barrier:这个屏障防止volatile写与之后的读写操作重排序。它确保了volatile变量写之后的读取操作不会在volatile写之前发生。这保证了volatile写操作的结果对后续操作是可见的。

在x86架构中,由于其内存模型的特性,每次对volatile变量的写操作通常会伴随着一个lock前缀的指令,该指令实际上会执行一个"全能型"的内存屏障(Memory Fence),它包括了上述的store-store和store-load屏障,同时还会兼具load-store屏障的效果。这种屏障确保了指令执行的顺序性和数据的可见性。

volatile的这些内存屏障特性是由Java内存模型(JMM)强制实施的,JVM负责在编译时期和运行时期将这些规则映射到具体的硬件指令上。

也就是说
对volatile变量的写操作确实可以确保在这次写操作之前的所有普通变量的修改对其他线程是可见的

这是因为对volatile变量的写操作会在其写操作后插入一个内存屏障,防止之前的写入操作被重排序到这个内存屏障之后

简单来说,按照下面的顺序执行操作:

  • 线程A修改普通变量x。
  • 线程A写入volatile变量v。

这时,根据happens-before原则,当线程A写入volatile变量v后,任何线程B读取同一个volatile变量v,并看到线程A写入的值,那么它也保证看到线程A在写入volatile变量v之前对普通变量x所做的修改。

这个特性在并发编程中经常用来确保重要信号、状态或数据的传递是准确且及时的。当使用volatile变量作为状态标志或锁的一部分时,这个特性特别有用。

4.感想

看似很简单的一题,其实知识点也是蛮多的,结合这些原理可以看出——编程没有魔法,都是操作系统提供的机制吧,操作系统牛逼!

标签: none

添加新评论