0%

SpringBoot-Blocking-Queue

之前在订单场景使用过DelayedQueue延时列队,实际上还有阻塞列队的用法。刚工作那会写爬虫的时候有使用过。这里总结下使用。

BlockingQueue

  • 阻塞列队,BlockingQueue是java.util.concurrent包下的实现类。提供了线程安全的列队访问方式,当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。

  • 一般使用场景,生产者消费者。
  • 列队类型有两种,无限列队、有限列队。
    • 无限队列 (unbounded queue ) - 几乎可以无限增长。
      • BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>();
        • 默认构造函数将容量设置成 Integer.MAX_VALUE
    • 有限队列 ( bounded queue ) - 定义了最大容量。
      • BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(10);
      • BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(10);
        • ArrayBlockingQueue初始化的时候必须指定容器大小。
        • 同时可以指定是否公平锁。
  • LinkedBlockingDequeArrayBlockingQueue 都是 FIFO ,而PriorityBlockingQueue元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即容量没有上限,前面2种都是有界队列。
  • DelayQueue基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

ArrayBlockingQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 列队存储数据
final Object[] items;

/** 元素索引位置,用于读取、取出 删除等做作*/
int takeIndex;

/** 下一个 put、offer 或 add 元素的索引 */
int putIndex;

/** 列队中的元素数量 */
int count;

/** 锁 */
final ReentrantLock lock;

/** 不为空的锁条件 */
private final Condition notEmpty;

/** 存储空间未满的锁条件 */
private final Condition notFull;

/**
* 当前活动迭代器的共享状态,如果已知不存在则为 null。允许队列操作更新迭代器状态
*/
transient Itrs itrs = null;
offer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
 // 实际调用的是offer方法 
// 如果可以在不超出队列容量的情况下立即插入指定元素,则在此队列的尾部插入指定元素,成功时返回true ,如果此队列已满则抛出IllegalStateException 。
public boolean add(E e) {
return super.add(e);
}

// 如果可以在不超过队列容量的情况下立即插入,则在此队列的尾部插入指定元素,成功时返回true ,如果队列已满则返回false 。
public boolean offer(E e) {
// 不能插入null ,否则异常。
checkNotNull(e);
// 获取锁
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
// 判断容器是否满了
if (count == items.length)
return false;
else {
//
enqueue(e);
return true;
}
} finally {
// 解锁
lock.unlock();
}
}
// 在当前放置位置插入元素,前进和信号。仅在持有锁时调用。
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
// 唤醒存储空间未满的锁条件
notEmpty.signal();
}
take
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public E take() throws InterruptedException {
// 获取锁
final ReentrantLock lock = this.lock;
// 可中断锁,防止应为await()方法中断异常导致异常退出。
lock.lockInterruptibly();
try {
// 判断当前容器长度是否为0,如果为0则不能空锁条件阻塞。
while (count == 0)
notEmpty.await();
// 获取元素
return dequeue();
} finally {
// 解锁
lock.unlock();
}
}

//
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 唤醒存储空间未满的锁条件
notFull.signal();
return x;
}

LinkedBlockingQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
static class Node<E> {
E item;

Node<E> next;

Node(E x) { item = x; }
}

/** 容量限制,如果没有,则为 Integer.MAX_VALUE */
private final int capacity;

/** 当前的元素数量 */
private final AtomicInteger count = new AtomicInteger();

/**
* 链表头,不存在则为null。
*/
transient Node<E> head;

/**
* 链表尾部,不存在则为null。
*/
private transient Node<E> last;

/** take poll remove 锁 */
private final ReentrantLock takeLock = new ReentrantLock();

/** 不为空的锁条件 */
private final Condition notEmpty = takeLock.newCondition();

/** 添加元素时候 put offer 锁 */
private final ReentrantLock putLock = new ReentrantLock();

/** 存储空间未满的锁条件 */
private final Condition notFull = putLock.newCondition();
offer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
// 在双向链表中插入元素   
public boolean add(E e) {
return super.add(e);
}
// 在尾部插入元素
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
//
public boolean offer(E e) {
// 判断元素是否为null
if (e == null) throw new NullPointerException();
// 记录当前元素长度
final AtomicInteger count = this.count;
// 如果容量已满,则返回插入失败
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
// 获取put锁
final ReentrantLock putLock = this.putLock;
// 加锁
putLock.lock();
try {
// 如果没有满,在链表尾部插入数据
if (count.get() < capacity) {
enqueue(node);
// 原子加一
c = count.getAndIncrement();
// 判断队列+1是否已满
if (c + 1 < capacity)
notFull.signal();
}
} finally {
// 写锁解锁
putLock.unlock();
}
// 如果插入成功,则发送不为空锁的信号。
if (c == 0)
signalNotEmpty();
return c >= 0;
}

public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
// 判断元素是否为空
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
// 获取put锁
final ReentrantLock putLock = this.putLock;
// AtomicInteger 存放当前元素长度
final AtomicInteger count = this.count;
// 可中断锁,防止应为await()方法中断异常导致异常退出。
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
// put解锁
putLock.unlock();
}
// 如果插入成功,则发送不为空的信号。
if (c == 0)
signalNotEmpty();
return true;
}

// 将节点链接为最后一个元素,如果已满则返回 false
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
take
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
 // 阻塞等待获取双向链表头元素   
public E take() throws InterruptedException {
E x;
int c = -1;
// 使用AtomicInteger保存元素中的个数
final AtomicInteger count = this.count;
// 获取take锁
final ReentrantLock takeLock = this.takeLock;
// 可中断锁,防止应为await()方法中断异常导致异常退出。
takeLock.lockInterruptibly();
try {
// 如果容器长度为0,则为空锁阻塞。
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
// 唤醒存储空间未满的锁条件
notEmpty.signal();
} finally {
// take锁解开
takeLock.unlock();
}
if (c == capacity)
// 通知take poll 锁信号。

signalNotFull();
return x;
}

private void signalNotFull() {
// 获取put锁
final ReentrantLock putLock = this.putLock;
// put加锁
putLock.lock();
try {
// 唤醒那些等待队列未满信号的线程,被唤醒的线程会尝试向队列中放入元素。
notFull.signal();
} finally {
putLock.unlock();
}
}


private void signalNotEmpty() {
// 获取take锁,
final ReentrantLock takeLock = this.takeLock;
// take加锁
takeLock.lock();
try {
// 唤醒那些等待队列非空信号的线程,被唤醒的线程会尝试从队列中取出元素。
notEmpty.signal();
} finally {
takeLock.unlock();
}
}

private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 唤醒存储空间未满的锁条件
notFull.signal();
return x;
}

PriorityBlockingQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

/**
* 默认容器大小
*/
private static final int DEFAULT_INITIAL_CAPACITY = 11;

/**
* 限制数组最大数量
*/
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

// 数组元素
private transient Object[] queue;

// 数组长度
private transient int size;

// 数组中的元素需要实现Comparator接口,比较大小获取优先级。
private transient Comparator<? super E> comparator;

// 锁
private final ReentrantLock lock;

// 如果列队是空的,就阻塞下去。这里只有一个条件变量,因为这个队列是无界的,向队列中插入数据的话就用CAS操作就行了
private final Condition notEmpty;

// 一个自旋锁,CAS使得同时只有一个线程可以进行扩容,0表示没有进行扩容,1表示正在进行扩容
private transient volatile int allocationSpinLock;

// 默认的构造方法,初始化容器大小
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}

public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
//初始化数组、锁、条件变量还有比较器
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
offer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
  // 将指定元素插入此优先级队列
public boolean add(E e) {
return offer(e);
}
// 将指定元素插入此优先级队列。由于队列是无界的,这个方法永远不会返回fals
public boolean offer(E e) {
// 判断是否为null
if (e == null)
throw new NullPointerException();
// 获取锁
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
int n, cap;
Object[] array;
// 当数组中的实际数量 大于等于 数组容量,就进行扩容。
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
// 如果默认比较器为null。
if (cmp == null)
// 使用元素中Comparable方法比较优先级
siftUpComparable(n, e, array);
else
// 默认比较器存在就使用默认的比较优先级
siftUpUsingComparator(n, e, array, cmp);
// 数组长度+1
size = n + 1;
// 唤醒存储空间未满的锁条件
notEmpty.signal();
} finally {
// 解锁
lock.unlock();
}
return true;
}

// 扩容方法,扩容50%
private void tryGrow(Object[] array, int oldCap) {
// 先释放锁,扩容可能会很慢,防止降低性能。
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
// 设置自旋锁,如果为0标识当前没有扩容,则使用CAS将自旋锁设置为1.
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
// 计算扩容后的大小,如果当前数组容量小于64,新数组容量就是2n+2,大于64,新的容量就是3n/2
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
// 判断新的数组大小释放超过最容量限制,如果超过了,则在老的数组上面+1
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
// 如果大于最大数组大小,就抛出异常。
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
// 扩容完成自旋锁设置为0
allocationSpinLock = 0;
}
}
// 第一个线程在上面的if中执行CAS成功之后,第二个线程就会到这里,然后执行yield方法让出CPU,尽量让第一个线程执行完毕;
if (newArray == null) // back off if another thread is allocating
Thread.yield();
// 之前已经释放锁了,这里再次加锁。
lock.lock();
// 复制老的数组到新的数组。
if (newArray != null && queue == array) {
queue = newArray;
// 又是System.arraycopy
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
take
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 阻塞获取头元素
public E take() throws InterruptedException {
// 获取锁
final ReentrantLock lock = this.lock;
// 可中断锁,防止应为await()方法中断异常导致异常退出。
lock.lockInterruptibly();
try {
// 判断容器大小是否0,如果是0则等待。
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
// 解锁
lock.unlock();
}
}
// 获取头元素
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 唤醒存储空间未满的锁条件
notFull.signal();
return x;
}

演示代码

ArrayBlockingQueueManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* @author z201.coding@gmail.com
**/
@Lazy(false)
@Component
@Slf4j
public class ArrayBlockingQueueManager {
// 必须指定容器大小
private final static BlockingQueue<BlockingDto> blockingQueue = new ArrayBlockingQueue(1000);

@PostConstruct
public void setup() {
new Thread(() -> {
BlockingDto blockingDto = null;
for (; ; ) {
try {
blockingDto = blockingQueue.take();
log.info("ArrayBlockingQueueManager tate {}", blockingDto.getId());
} catch (Exception e) {
log.error("执行队列_异常:" + e);
}
}
}).start();
}

public void addItem(BlockingDto blockingDto) {
if (!blockingQueue.contains(blockingDto)) {
blockingQueue.add(blockingDto);
}
}

}

LinkedBlockingQueueManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* @author z201.coding@gmail.com
**/
@Lazy(false)
@Component
@Slf4j
public class LinkedBlockingQueueManager {

private final static BlockingQueue<BlockingDto> blockingQueue = new LinkedBlockingQueue<>();

@PostConstruct
public void setup() {
new Thread(() -> {
BlockingDto blockingDto = null;
for (; ; ) {
try {
blockingDto = blockingQueue.take();
log.info("LinkedBlockingDequeManager tate {}", blockingDto.getId());
} catch (Exception e) {
log.error("执行队列_异常:" + e);
}
}
}).start();
}

public void addItem(BlockingDto blockingDto) {
if (!blockingQueue.contains(blockingDto)) {
blockingQueue.add(blockingDto);
}
}

}

PriorityBlockingQueueManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* @author z201.coding@gmail.com
**/
@Lazy(false)
@Component
@Slf4j
public class PriorityBlockingQueueManager {

private final static BlockingQueue<BlockingDto> blockingQueue = new PriorityBlockingQueue<>();

@PostConstruct
public void setup() {
new Thread(() -> {
BlockingDto blockingDto = null;
for (; ; ) {
try {
blockingDto = blockingQueue.take();
log.info("PriorityBlockingQueueManager tate {} ", blockingDto.getId());
} catch (Exception e) {
log.error("执行队列_异常:" + e);
}
}
}).start();
}

public void addItem(BlockingDto blockingDto) {
if (!blockingQueue.contains(blockingDto)) {
blockingQueue.add(blockingDto);
}
}
}

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/**
* @author z201.coding@gmail.com
**/

@Getter
@Setter
public class BlockingDto implements Comparable<BlockingDto> {

private Long id;

public BlockingDto(Long id) {
this.id = id;
}
// 使用 PriorityBlockingQueue 需要实现Comparable 用于排序功能。
@Override
public int compareTo(BlockingDto o) {
if (this.id > o.getId()) {
return 1;
}
return -1;
}

}


/**
* @author z201.coding@gmail.com
**/
@Component
@Slf4j
@Lazy(false)
public class AppCommandLineRunner implements CommandLineRunner {

private ArrayBlockingQueueManager arrayBlockingQueueManager;

private LinkedBlockingQueueManager linkedBlockingQueueManager;

private PriorityBlockingQueueManager priorityBlockingQueueManager;

@Autowired
public AppCommandLineRunner(ArrayBlockingQueueManager arrayBlockingQueueManager,
LinkedBlockingQueueManager linkedBlockingQueueManager,
PriorityBlockingQueueManager priorityBlockingQueueManager) {
this.arrayBlockingQueueManager = arrayBlockingQueueManager;
this.linkedBlockingQueueManager = linkedBlockingQueueManager;
this.priorityBlockingQueueManager = priorityBlockingQueueManager;
}

@Override
public void run(String... args) throws Exception {
List<BlockingDto> blockingDtoList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
blockingDtoList.add(new BlockingDto(RandomUtil.randomLong(Long.valueOf(i),Integer.MAX_VALUE)));
}
for (BlockingDto dto : blockingDtoList) {
arrayBlockingQueueManager.addItem(dto);
linkedBlockingQueueManager.addItem(dto);
priorityBlockingQueueManager.addItem(dto);
}
}
}
Console
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
[Thread-6]  PriorityBlockingQueueManager tate 2888990 
[Thread-4] ArrayBlockingQueueManager tate 2888990
[Thread-5] linkedBlockingQueue tate 2888990
[Thread-6] PriorityBlockingQueueManager tate 143639403
[Thread-4] ArrayBlockingQueueManager tate 203825030
[Thread-5] linkedBlockingQueue tate 203825030
[Thread-4] ArrayBlockingQueueManager tate 778725076
[Thread-6] PriorityBlockingQueueManager tate 203825030
[Thread-5] linkedBlockingQueue tate 778725076
[Thread-4] ArrayBlockingQueueManager tate 476610227
[Thread-6] PriorityBlockingQueueManager tate 327313816
[Thread-4] ArrayBlockingQueueManager tate 143639403
[Thread-5] linkedBlockingQueue tate 476610227
[Thread-6] PriorityBlockingQueueManager tate 457729417
[Thread-5] linkedBlockingQueue tate 143639403
[Thread-4] ArrayBlockingQueueManager tate 543223451
[Thread-6] PriorityBlockingQueueManager tate 476610227
[Thread-5] linkedBlockingQueue tate 543223451
[Thread-6] PriorityBlockingQueueManager tate 543223451
[Thread-5] linkedBlockingQueue tate 327313816
[Thread-4] ArrayBlockingQueueManager tate 327313816
[Thread-6] PriorityBlockingQueueManager tate 719396010
[Thread-5] linkedBlockingQueue tate 1956060571
[Thread-4] ArrayBlockingQueueManager tate 1956060571
[Thread-5] linkedBlockingQueue tate 457729417
[Thread-6] PriorityBlockingQueueManager tate 778725076
[Thread-4] ArrayBlockingQueueManager tate 457729417
[Thread-5] linkedBlockingQueue tate 719396010
[Thread-4] ArrayBlockingQueueManager tate 719396010
[Thread-6] PriorityBlockingQueueManager tate 1956060571

END