JDK数组阻塞队列源码深入剖析

科技资讯 投稿 29000 0 评论

JDK数组阻塞队列源码深入剖析

从零开始自己动手写阻塞队列当中我们仔细介绍了阻塞队列提供给我们的功能,以及他的实现原理,并且基于谈到的内容我们自己实现了一个低配版的数组阻塞队列。在这篇文章当中我们将仔细介绍JDK具体是如何实现数组阻塞队列的。

阻塞队列的功能

并发不安全的,但是阻塞队列在并发下情况是安全的。阻塞队列的主要的需求如下:

  • 队列基础的功能需要有,往队列当中放数据,从队列当中取数据。

  • 所有的队列操作都要是并发安全的。

  • 当队列满了之后再往队列当中放数据的时候,线程需要被挂起,当队列当中的数据被取出,让队列当中有空间的时候线程需要被唤醒。

  • 当队列空了之后再往队列当中取数据的时候,线程需要被挂起,当有线程往队列当中加入数据的时候被挂起的线程需要被唤醒。

  • 在我们实现的队列当中我们使用数组去存储数据,因此在构造函数当中需要提供数组的初始大小,设置用多大的数组。

数组阻塞队列设计

阅读这部分内容你需要熟悉可重入锁和条件变量的使用。

数组的循环使用

我们知道上图在加入数据的时候不仅将数组后半部分的空间使用完了,而且可以继续使用前半部分没有使用过的空间,也就是说在队列内部实现了一个循环使用的过程。

字段设计

    /** The queued items */
    final Object[] items; // 这个就是具体存储数据的数组

    /** items index for next take, poll, peek or remove */
    int takeIndex; // 因为是队列 因此我们需要知道下一个出队的数据的下标 这个就是表示下一个将要出队的数据的下标

    /** items index for next put, offer, or add */
    int putIndex; // 我们同时也需要下一个入队的数据的下标

    /** Number of elements in the queue */
    int count; // 统计队列当中一共有多少个数据

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock; // 因为阻塞队列是一种可以并发使用的数据结构

    /** Condition for waiting takes */
    private final Condition notEmpty; // 这个条件变量主要用于唤醒被 take 函数阻塞的线程 也就是从队列当中取数据的线程

    /** Condition for waiting puts */
    private final Condition notFull; // 这个条件变量主要用于唤醒被 put 函数阻塞的线程 也就是从队列当中放数据的线程

构造函数

构造函数的主要功能是申请指定大小的内存空间,并且对类的成员变量进行赋值操作。

public ArrayBlockingQueue(int capacity {
  // capacity 表示用与存储数据的数组的长度
  this(capacity, false;
}
// fair 这个参数主要是用于说明 是否使用公平锁
// 如果为 true 表示使用公平锁 执行效率低 但是各个线程进入临界区的顺序是先来后到的顺序 更加公平
// 如果为 false 表示使用非公平锁 执行效率更高
public ArrayBlockingQueue(int capacity, boolean fair {
  if (capacity <= 0
    throw new IllegalArgumentException(;
  this.items = new Object[capacity];
  // 对变量进行赋值操作
  lock = new ReentrantLock(fair;
  notEmpty = lock.newCondition(;
  notFull =  lock.newCondition(;
}

put函数

这个函数是阻塞队列对核心的函数之一了,首先我们需要了解的是,如果一个线程调用了这个函数往队列当中加入数据,如果此时队列已经满了则线程需要被挂起,如果没有满则需要将数据加入到队列当中,也就是将数据存储到数组当中。注意还有一个很重要的一点是,当我们往队列当中加入一个数据之后需要发一个信号给其他被函数阻塞的线程,因为这些线程在取数据的时候可能队列当中已经空了,因此需要将这些线程唤醒。

public void put(E e throws InterruptedException {
  checkNotNull(e; // 保证输入的数据不为 null 代码在下方
  final ReentrantLock lock = this.lock;
  // 进行加锁操作,因为下面是临界区
  lock.lockInterruptibly(;
  try {
    while (count == items.length // 如果队列已经满了 也就是队列当中数据的个数 count == 数组的长度的话 就需要将线程挂起
      notFull.await(;
    // 当队列当中有空间的之后将数据加入到队列当中 这个函数在下面仔细分析 代码在下方
    enqueue(e;
  } finally {
    lock.unlock(;
  }
}

private static void checkNotNull(Object v {
  if (v == null
    throw new NullPointerException(;
}

private void enqueue(E x {
  // assert lock.getHoldCount( == 1;
  // assert items[putIndex] == null;
  // 进入这个函数的线程已经在 put 函数当中加上锁了 因此这里不需要加锁
  final Object[] items = this.items;
  items[putIndex] = x;
  if (++putIndex == items.length // 因为这个数据是循环使用的 因此可以回到下标为0的位置
    // 因为队列当中的数据可以出队 因此下标为 0 的位置不存在数据可以使用
    putIndex = 0;
  count++;
  // 在这里需要将一个被 take 函数阻塞的线程唤醒 如果调用这个方法的时候没有线程阻塞
  // 那么调用这个方法相当于没有调用 如果有线程阻塞那么将会唤醒一个线程
  notEmpty.signal(;
}

注意:这里有一个地方非常容易被忽略,那就是在将线程挂起的时候使用的是循环而不是条件语句,代码:

final ReentrantLock lock = this.lock;
lock.lockInterruptibly(;
try {
  while (count == items.length
    notFull.await(;
  enqueue(e;
} finally {
  lock.unlock(;
}

这是因为,线程被唤醒之后并不会立马执行,因为线程在调用方法的之后会释放锁🔒,他想再次执行还需要再次获得锁,然后就在他获取锁之前的这段时间里面,可能其他的线程也会从数组当中放数据,因此这个线程执行的时候队列可能还是满的,因此需要再次判断,否则就会覆盖数据,像这种唤醒之后并没有满足线程执行条件的现象叫做虚假唤醒,因此大家在写程序的时候要格外注意,当需要将线程挂起或者唤醒的之后,最好考虑清楚,如果不确定可以使用替代,这样的话更加保险。

take函数

public E take( throws InterruptedException {
  final ReentrantLock lock = this.lock;
  // 因为取数据的代码涉及到数据竞争 也就是说多个线程同时竞争 数组数据items 因此需要用锁保护起来
  lock.lockInterruptibly(;
  try {
    // 当 count == 0 说明队列当中没有数据
    while (count == 0
      notEmpty.await(;
    // 当队列当中还有数据的时候可以将数据出队
    return dequeue(;
  } finally {
    lock.unlock(;
  }
}

private E dequeue( {
  // assert lock.getHoldCount( == 1;
  // assert items[takeIndex] != null;
  final Object[] items = this.items;
  @SuppressWarnings("unchecked"
  // 取出数据
  E x = (E items[takeIndex];
  items[takeIndex] = null; // 将对应的位置设置为 null 数据就可以被垃圾收集器回收了
  if (++takeIndex == items.length
    takeIndex = 0;
  count--;
  // 迭代器也需要出队 如果不了
  if (itrs != null
    itrs.elementDequeued(;
  // 调用 signal 函数 将被 put 函数阻塞的线程唤醒 如果调用这个方法的时候没有线程阻塞
  // 那么调用这个方法相当于没有调用 如果有线程阻塞那么将会唤醒一个线程
  notFull.signal(;
  return x;
}

同样的道理这里也需要使用循环去进行阻塞,否则可能存在虚假唤醒,可能队列当中没有数据返回的数据为 null,而且会破坏队列的结构因为会涉及队列的两个端点的值的改变,也就是和的改变。

offer函数

public boolean offer(E e {
  checkNotNull(e;
  final ReentrantLock lock = this.lock;
  lock.lock(;
  try {
    // 如果队列当中的数据个数和数组的长度相等 说明队列满了 直接返回 false 即可
    if (count == items.length
      return false;
    else {
      enqueue(e;
      return true;
    }
  } finally {
    lock.unlock(;
  }
}

add函数

这个函数和上面两个函数的意义也是一样的,只不过当队列满了之后这个函数会抛出异常。

public boolean add(E e {
  if (offer(e
    return true;
  else
    throw new IllegalStateException("Queue full";
}

poll函数

这个函数和take函数的作用差不多,但是这个函数不会阻塞,当队列当中没有数据的时候直接返回null,有数据的话返回数据。

public E poll( {
  final ReentrantLock lock = this.lock;
  lock.lock(;
  try {
    return (count == 0 ? null : dequeue(;
  } finally {
    lock.unlock(;
  }
}

总结

在本篇文章当中主要介绍了JDK内部是如何实现的,如果你对锁和队列的使用有一定的了解本篇文章应该还是比较容易理解的。在实现当中有以下需要注意的点:

  • 函数,如果在往队列当中加入数据的时候队列满了,则需要将线程挂起。在队列当中有空间之后,线程被唤醒继续执行,在往队列当中加入了数据之后,需要调用方法,唤醒被函数阻塞的线程。

  • 函数,如果在往队列当中取出数据的时候队列空了,则需要将线程挂起。在队列当中有数据之后,线程被唤醒继续执行,在从队列当中取出数据之后,需要调用方法,唤醒被函数阻塞的线程。

  • 在调用函数的时候,需要小心虚假唤醒现象。

编程笔记 » JDK数组阻塞队列源码深入剖析

赞同 (38) or 分享 (0)
游客 发表我的评论   换个身份
取消评论

表情
(0)个小伙伴在吐槽