胖胖的枫叶
主页
博客
知识图谱
产品设计
数据分析
企业架构
项目管理
效率工具
全栈开发
后端
前端
测试
运维
数据
面试
  • openJdk-docs
  • spring-projects-docs
  • mysql-docs
  • redis-commands
  • redis-projects
  • apache-rocketmq
  • docker-docs
  • mybatis-docs
  • netty-docs
  • journaldev
  • geeksforgeeks
  • 浮生若梦
  • 后端进阶
  • 并发编程网
  • 英语肌肉记忆锻炼软件
  • 墨菲安全
  • Redisson-docs
  • jmh-Visual
  • 美团技术
  • MavenSearch
主页
博客
知识图谱
产品设计
数据分析
企业架构
项目管理
效率工具
全栈开发
后端
前端
测试
运维
数据
面试
  • openJdk-docs
  • spring-projects-docs
  • mysql-docs
  • redis-commands
  • redis-projects
  • apache-rocketmq
  • docker-docs
  • mybatis-docs
  • netty-docs
  • journaldev
  • geeksforgeeks
  • 浮生若梦
  • 后端进阶
  • 并发编程网
  • 英语肌肉记忆锻炼软件
  • 墨菲安全
  • Redisson-docs
  • jmh-Visual
  • 美团技术
  • MavenSearch
  • 博客

    • 博客迁移说明
    • 2024年
    • 2023年
    • 2022年
    • 2021年
    • 2020年
    • 2019年
    • 2018年

之前在订单场景使用过DelayedQueue延时列队,实际上还有阻塞列队的用法。刚工作那会写爬虫的时候有使用过。这里总结下使用。

BlockingQueue

  • 阻塞列队,BlockingQueue是java.util.concurrent包下的实现类。提供了线程安全的列队访问方式,当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。

  • 一般使用场景,生产者消费者。
  • 列队类型有两种,无限列队、有限列队。
    • 无限队列 (unbounded queue ) - 几乎可以无限增长。
      • BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>();
        • 默认构造函数将容量设置成 Integer.MAX_VALUE
    • 有限队列 ( bounded queue ) - 定义了最大容量。
      • BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(10);
      • BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(10);
        • ArrayBlockingQueue初始化的时候必须指定容器大小。
        • 同时可以指定是否公平锁。
  • LinkedBlockingDeque 与 ArrayBlockingQueue 都是 FIFO ,而PriorityBlockingQueue元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即容量没有上限,前面2种都是有界队列。
  • DelayQueue基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

ArrayBlockingQueue

    // 列队存储数据
    final Object[] items;

    /** 元素索引位置,用于读取、取出 删除等做作*/
    int takeIndex;

    /** 下一个 put、offer 或 add 元素的索引 */
    int putIndex;

    /** 列队中的元素数量 */
    int count;

    /** 锁 */
    final ReentrantLock lock;

    /** 不为空的锁条件 */
    private final Condition notEmpty;

    /** 存储空间未满的锁条件 */
    private final Condition notFull;

    /**
     * 当前活动迭代器的共享状态,如果已知不存在则为 null。允许队列操作更新迭代器状态
     */
    transient Itrs itrs = null;
offer
    // 实际调用的是offer方法 
    // 如果可以在不超出队列容量的情况下立即插入指定元素,则在此队列的尾部插入指定元素,成功时返回true ,如果此队列已满则抛出IllegalStateException 。
    public boolean add(E e) {
        return super.add(e);
    }
    
    // 如果可以在不超过队列容量的情况下立即插入,则在此队列的尾部插入指定元素,成功时返回true ,如果队列已满则返回false 。
    public boolean offer(E e) {
        // 不能插入null ,否则异常。
        checkNotNull(e);
        // 获取锁
        final ReentrantLock lock = this.lock;
        // 加锁
        lock.lock();
        try {
            // 判断容器是否满了
            if (count == items.length)
                return false;
            else { 
                //
                enqueue(e);
                return true;
            }
        } finally {
            // 解锁
            lock.unlock();
        }
    }
   // 在当前放置位置插入元素,前进和信号。仅在持有锁时调用。
   private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        // 唤醒存储空间未满的锁条件
        notEmpty.signal();
    }

take
    public E take() throws InterruptedException {
        // 获取锁
        final ReentrantLock lock = this.lock;
        // 可中断锁,防止应为await()方法中断异常导致异常退出。
        lock.lockInterruptibly();
        try {
            // 判断当前容器长度是否为0,如果为0则不能空锁条件阻塞。
            while (count == 0)
                notEmpty.await();
            // 获取元素
            return dequeue();
        } finally {
            // 解锁
            lock.unlock();
        }
    }
    
    // 
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        // 唤醒存储空间未满的锁条件
        notFull.signal();
        return x;
    }

LinkedBlockingQueue

   static class Node<E> {
        E item;
      
        Node<E> next;

        Node(E x) { item = x; }
    }

    /** 容量限制,如果没有,则为 Integer.MAX_VALUE */
    private final int capacity;

    /** 当前的元素数量 */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * 链表头,不存在则为null。
     */
    transient Node<E> head;

    /**
     * 链表尾部,不存在则为null。
     */
    private transient Node<E> last;

    /** take poll remove 锁  */
    private final ReentrantLock takeLock = new ReentrantLock();

    /**  不为空的锁条件 */
    private final Condition notEmpty = takeLock.newCondition();

    /** 添加元素时候 put offer 锁 */
    private final ReentrantLock putLock = new ReentrantLock();

    /** 存储空间未满的锁条件 */
    private final Condition notFull = putLock.newCondition();
offer
   // 在双向链表中插入元素   
    public boolean add(E e) {
        return super.add(e);
    }
    // 在尾部插入元素
    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
     // 
   public boolean offer(E e) {
        // 判断元素是否为null
        if (e == null) throw new NullPointerException();
        // 记录当前元素长度
        final AtomicInteger count = this.count;
        // 如果容量已满,则返回插入失败
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e); 
        // 获取put锁
        final ReentrantLock putLock = this.putLock;
        // 加锁
        putLock.lock();
        try {
            // 如果没有满,在链表尾部插入数据
            if (count.get() < capacity) {
                enqueue(node);
                // 原子加一
                c = count.getAndIncrement();
                // 判断队列+1是否已满
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            // 写锁解锁
            putLock.unlock();
        }
        // 如果插入成功,则发送不为空锁的信号。
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        // 判断元素是否为空
        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        // 获取put锁
        final ReentrantLock putLock = this.putLock; 
        // AtomicInteger 存放当前元素长度
        final AtomicInteger count = this.count;
        // 可中断锁,防止应为await()方法中断异常导致异常退出。
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            // put解锁
            putLock.unlock();
        }
         // 如果插入成功,则发送不为空的信号。
        if (c == 0)
            signalNotEmpty();
        return true;
    }
    
    // 将节点链接为最后一个元素,如果已满则返回 false
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }
take
    // 阻塞等待获取双向链表头元素   
     public E take() throws InterruptedException {
        E x;
        int c = -1;
        // 使用AtomicInteger保存元素中的个数
        final AtomicInteger count = this.count;
        // 获取take锁
        final ReentrantLock takeLock = this.takeLock;
        // 可中断锁,防止应为await()方法中断异常导致异常退出。
        takeLock.lockInterruptibly();
        try {
            // 如果容器长度为0,则为空锁阻塞。
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                // 唤醒存储空间未满的锁条件
                notEmpty.signal();
        } finally {
            // take锁解开
            takeLock.unlock();
        }
        if (c == capacity)
            // 通知take poll 锁信号。
        
            signalNotFull();
        return x;
    }
   
   private void signalNotFull() {
        // 获取put锁
        final ReentrantLock putLock = this.putLock;
        // put加锁
        putLock.lock();
        try {
            // 唤醒那些等待队列未满信号的线程,被唤醒的线程会尝试向队列中放入元素。
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }
    

    private void signalNotEmpty() {
        // 获取take锁,
        final ReentrantLock takeLock = this.takeLock;
        // take加锁
        takeLock.lock();
        try {
            // 唤醒那些等待队列非空信号的线程,被唤醒的线程会尝试从队列中取出元素。
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
         // 唤醒存储空间未满的锁条件
        notFull.signal();
        return x;
    }

PriorityBlockingQueue


    /**
     * 默认容器大小
     */
    private static final int DEFAULT_INITIAL_CAPACITY = 11;

    /**
     * 限制数组最大数量
     */
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    // 数组元素
    private transient Object[] queue;

    // 数组长度
    private transient int size;

    // 数组中的元素需要实现Comparator接口,比较大小获取优先级。
    private transient Comparator<? super E> comparator;

    // 锁
    private final ReentrantLock lock;

    // 如果列队是空的,就阻塞下去。这里只有一个条件变量,因为这个队列是无界的,向队列中插入数据的话就用CAS操作就行了
    private final Condition notEmpty;

    // 一个自旋锁,CAS使得同时只有一个线程可以进行扩容,0表示没有进行扩容,1表示正在进行扩容
    private transient volatile int allocationSpinLock;

    // 默认的构造方法,初始化容器大小
    public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }

    public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }
    //初始化数组、锁、条件变量还有比较器
    public PriorityBlockingQueue(int initialCapacity,
                                Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}
offer
    // 将指定元素插入此优先级队列
    public boolean add(E e) {
        return offer(e);
    } 
    // 将指定元素插入此优先级队列。由于队列是无界的,这个方法永远不会返回fals
    public boolean offer(E e) {
        // 判断是否为null
        if (e == null)
            throw new NullPointerException();
        // 获取锁
        final ReentrantLock lock = this.lock;
        // 加锁
        lock.lock();
        int n, cap;
        Object[] array;
        // 当数组中的实际数量 大于等于 数组容量,就进行扩容。
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            // 如果默认比较器为null。
            if (cmp == null)
                // 使用元素中Comparable方法比较优先级
                siftUpComparable(n, e, array);
            else
                // 默认比较器存在就使用默认的比较优先级
                siftUpUsingComparator(n, e, array, cmp);
            // 数组长度+1
            size = n + 1;
            // 唤醒存储空间未满的锁条件
            notEmpty.signal();
        } finally {
            // 解锁
            lock.unlock();
        }
        return true;
    }

  // 扩容方法,扩容50%
  private void tryGrow(Object[] array, int oldCap) {
        // 先释放锁,扩容可能会很慢,防止降低性能。
        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;
        // 设置自旋锁,如果为0标识当前没有扩容,则使用CAS将自旋锁设置为1.
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                // 计算扩容后的大小,如果当前数组容量小于64,新数组容量就是2n+2,大于64,新的容量就是3n/2
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                // 判断新的数组大小释放超过最容量限制,如果超过了,则在老的数组上面+1
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    // 如果大于最大数组大小,就抛出异常。
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                // 扩容完成自旋锁设置为0
                allocationSpinLock = 0;
            }
        }
        // 第一个线程在上面的if中执行CAS成功之后,第二个线程就会到这里,然后执行yield方法让出CPU,尽量让第一个线程执行完毕;
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
        // 之前已经释放锁了,这里再次加锁。
        lock.lock();
        // 复制老的数组到新的数组。
        if (newArray != null && queue == array) {
            queue = newArray;
            // 又是System.arraycopy
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }
take
  // 阻塞获取头元素
  public E take() throws InterruptedException {
        // 获取锁
        final ReentrantLock lock = this.lock;
        // 可中断锁,防止应为await()方法中断异常导致异常退出。
        lock.lockInterruptibly();
        try {
            // 判断容器大小是否0,如果是0则等待。
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            // 解锁
            lock.unlock();
        }
    }
    // 获取头元素
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        // 唤醒存储空间未满的锁条件
        notFull.signal();
        return x;
    }

演示代码

ArrayBlockingQueueManager

/**
 * @author z201.coding@gmail.com
 **/
@Lazy(false)
@Component
@Slf4j
public class ArrayBlockingQueueManager {
    // 必须指定容器大小
    private final static BlockingQueue<BlockingDto>  blockingQueue = new ArrayBlockingQueue(1000);

    @PostConstruct
    public void setup() {
        new Thread(() -> {
            BlockingDto blockingDto = null;
            for (; ; ) {
                try {
                    blockingDto = blockingQueue.take();
                    log.info("ArrayBlockingQueueManager  tate {}", blockingDto.getId());
                } catch (Exception e) {
                    log.error("执行队列_异常:" + e);
                }
            }
        }).start();
    }

    public void addItem(BlockingDto blockingDto) {
        if (!blockingQueue.contains(blockingDto)) {
            blockingQueue.add(blockingDto);
        }
    }

}

LinkedBlockingQueueManager

/**
 * @author z201.coding@gmail.com
 **/
@Lazy(false)
@Component
@Slf4j
public class LinkedBlockingQueueManager {

    private final static BlockingQueue<BlockingDto> blockingQueue = new LinkedBlockingQueue<>();

    @PostConstruct
    public void setup() {
        new Thread(() -> {
            BlockingDto blockingDto = null;
            for (; ; ) {
                try {
                    blockingDto = blockingQueue.take();
                    log.info("LinkedBlockingDequeManager tate {}", blockingDto.getId());
                } catch (Exception e) {
                    log.error("执行队列_异常:" + e);
                }
            }
        }).start();
    }

    public void addItem(BlockingDto blockingDto) {
        if (!blockingQueue.contains(blockingDto)) {
            blockingQueue.add(blockingDto);
        }
    }

}

PriorityBlockingQueueManager

/**
 * @author z201.coding@gmail.com
 **/
@Lazy(false)
@Component
@Slf4j
public class PriorityBlockingQueueManager {

    private final static BlockingQueue<BlockingDto> blockingQueue = new PriorityBlockingQueue<>();

    @PostConstruct
    public void setup() {
        new Thread(() -> {
            BlockingDto blockingDto = null;
            for (; ; ) {
                try {
                    blockingDto = blockingQueue.take();
                    log.info("PriorityBlockingQueueManager tate {} ", blockingDto.getId());
                } catch (Exception e) {
                    log.error("执行队列_异常:" + e);
                }
            }
        }).start();
    }

    public void addItem(BlockingDto blockingDto) {
        if (!blockingQueue.contains(blockingDto)) {
            blockingQueue.add(blockingDto);
        }
    }
}

测试代码

/**
 * @author z201.coding@gmail.com
 **/

@Getter
@Setter
public class BlockingDto implements Comparable<BlockingDto> {

    private Long id;

    public BlockingDto(Long id) {
        this.id = id;
    }
    // 使用 PriorityBlockingQueue 需要实现Comparable 用于排序功能。
    @Override
    public int compareTo(BlockingDto o) {
        if (this.id > o.getId()) {
            return 1;
        }
        return -1;
    }

}


/**
 * @author z201.coding@gmail.com
 **/
@Component
@Slf4j
@Lazy(false)
public class AppCommandLineRunner implements CommandLineRunner {

    private ArrayBlockingQueueManager arrayBlockingQueueManager;

    private LinkedBlockingQueueManager linkedBlockingQueueManager;

    private PriorityBlockingQueueManager priorityBlockingQueueManager;

    @Autowired
    public AppCommandLineRunner(ArrayBlockingQueueManager arrayBlockingQueueManager,
                                LinkedBlockingQueueManager linkedBlockingQueueManager,
                                PriorityBlockingQueueManager priorityBlockingQueueManager) {
        this.arrayBlockingQueueManager = arrayBlockingQueueManager;
        this.linkedBlockingQueueManager = linkedBlockingQueueManager;
        this.priorityBlockingQueueManager = priorityBlockingQueueManager;
    }

    @Override
    public void run(String... args) throws Exception {
        List<BlockingDto> blockingDtoList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            blockingDtoList.add(new BlockingDto(RandomUtil.randomLong(Long.valueOf(i),Integer.MAX_VALUE)));
        }
        for (BlockingDto dto : blockingDtoList) {
            arrayBlockingQueueManager.addItem(dto);
            linkedBlockingQueueManager.addItem(dto);
            priorityBlockingQueueManager.addItem(dto);
        }
    }
}

Console
[Thread-6]  PriorityBlockingQueueManager tate 2888990 
[Thread-4]  ArrayBlockingQueueManager  tate 2888990
[Thread-5]  linkedBlockingQueue tate 2888990
[Thread-6]  PriorityBlockingQueueManager tate 143639403 
[Thread-4]  ArrayBlockingQueueManager  tate 203825030
[Thread-5]  linkedBlockingQueue tate 203825030
[Thread-4]  ArrayBlockingQueueManager  tate 778725076
[Thread-6]  PriorityBlockingQueueManager tate 203825030 
[Thread-5]  linkedBlockingQueue tate 778725076
[Thread-4]  ArrayBlockingQueueManager  tate 476610227
[Thread-6]  PriorityBlockingQueueManager tate 327313816 
[Thread-4]  ArrayBlockingQueueManager  tate 143639403
[Thread-5]  linkedBlockingQueue tate 476610227
[Thread-6]  PriorityBlockingQueueManager tate 457729417 
[Thread-5]  linkedBlockingQueue tate 143639403
[Thread-4]  ArrayBlockingQueueManager  tate 543223451
[Thread-6]  PriorityBlockingQueueManager tate 476610227 
[Thread-5]  linkedBlockingQueue tate 543223451
[Thread-6]  PriorityBlockingQueueManager tate 543223451 
[Thread-5]  linkedBlockingQueue tate 327313816
[Thread-4]  ArrayBlockingQueueManager  tate 327313816
[Thread-6]  PriorityBlockingQueueManager tate 719396010 
[Thread-5]  linkedBlockingQueue tate 1956060571
[Thread-4]  ArrayBlockingQueueManager  tate 1956060571
[Thread-5]  linkedBlockingQueue tate 457729417
[Thread-6]  PriorityBlockingQueueManager tate 778725076 
[Thread-4]  ArrayBlockingQueueManager  tate 457729417
[Thread-5]  linkedBlockingQueue tate 719396010
[Thread-4]  ArrayBlockingQueueManager  tate 719396010
[Thread-6]  PriorityBlockingQueueManager tate 1956060571 

END

Last Updated:
Contributors: 庆峰