今天我們來聊一聊以數(shù)組為數(shù)據(jù)結(jié)構(gòu)的阻塞隊(duì)列 ArrayBlockingQueue,它實(shí)現(xiàn)了 BlockingQueue 接口,繼承了抽象類 AbstractQueue。
BlockingQueue 提供了三個(gè)元素入隊(duì)的方法。
boolean add(E e);
boolean offer(E e);
void put(E e) throws InterruptedException;
三個(gè)元素出隊(duì)的方法。
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
boolean remove(Object o);
一起來看看,ArrayBlockingQueue 是如何實(shí)現(xiàn)的吧。
初識(shí)
首先看一下 ArrayBlockingQueue 的主要屬性和構(gòu)造函數(shù)。
屬性
//存放元素
final Object[] items;
//取元素的索引
int takeIndex;
//存元素的索引
int putIndex;
//元素的數(shù)量
int count;
//控制并發(fā)的鎖
final ReentrantLock lock;
//非空條件信號(hào)量
private final Condition notEmpty;
//非滿條件信號(hào)量
private final Condition notFull;
transient Itrs itrs = null;
從以上屬性可以看出:
- 以數(shù)組的方式存放元素。
- 用 putIndex 和 takeIndex 控制元素入隊(duì)和出隊(duì)的索引。
- 用重入鎖控制并發(fā)、保證線程的安全。
構(gòu)造函數(shù)
ArrayBlockingQueue 有三個(gè)構(gòu)造函數(shù),其中 public ArrayBlockingQueue(int capacity, boolean fair, Collection c)
構(gòu)造函數(shù)并不常用,暫且不提。看其中兩個(gè)構(gòu)造函數(shù)。
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
//構(gòu)造數(shù)組
this.items = new Object[capacity];
//默認(rèn)以非公平鎖初始化 ReentrantLock
lock = new ReentrantLock(fair);
//創(chuàng)建兩個(gè)條件信號(hào)量
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
可以看出 ArrayBlockingQueue 必須再創(chuàng)建時(shí)傳入數(shù)組的大小。
元素入隊(duì)
ArrayBlockingQueue 有 add()、offer()、put()、offer(E e, long timeout, TimeUnit unit) 方法用來元素的入隊(duì)。
add
//ArrayBlockingQueue.add()
public boolean add(E e) {
//調(diào)用父類的 AbstractQueue.add() 方法
return super.add(e);
}
//AbstractQueue.add()
public boolean add(E e) {
//調(diào)用 ArrayBlockingQueue.offer(),成功則返回 true,否則拋出異常
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
//ArrayBlockingQueue.offer()
public boolean offer(E e) {
//非空檢查
checkNotNull(e);
//加鎖
final ReentrantLock lock = this.lock;
lock.lock();
try {
//數(shù)組滿了,返回 false
if (count == items.length)
return false;
else {
//添加元素
enqueue(e);
return true;
}
} finally {
//解鎖
lock.unlock();
}
}
//ArrayBlockingQueue.enqueue()
private void enqueue(E x) {
final Object[] items = this.items;
//直接放到 putIndex 的位置
items[putIndex] = x;
//如果索引滿了,putIndex 就從 0 開始,為什么呢?
if (++putIndex == items.length)
putIndex = 0;
//數(shù)量加一
count++;
//數(shù)組里面有數(shù)據(jù)了,對(duì) notEmpty 條件隊(duì)列進(jìn)行通知
notEmpty.signal();
}
上面留下了一個(gè)坑,索引等于數(shù)組的長(zhǎng)度的時(shí)候,索引就從 0 開始了。其實(shí)很簡(jiǎn)單,這個(gè)數(shù)組是不是先入先出的,0 索引的數(shù)組先入隊(duì),也是先出隊(duì)的。這時(shí)候 0 索引的位置就空了,所以 putIndex 到達(dá)數(shù)組長(zhǎng)度的時(shí)候就可以從 0 開始。這里可以看出,ArrayBlockingQueue 是絕對(duì)不可以修改數(shù)組長(zhǎng)度的,一旦初始化后長(zhǎng)度就不能再改變了。
put
//ArrayBlockingQueue.put()
public void put(E e) throws InterruptedException {
//非空檢查
checkNotNull(e);
final ReentrantLock lock = this.lock;
//加鎖
lock.lockInterruptibly();
try {
//數(shù)組滿了,線程加入 notFull 隊(duì)列中等待被喚醒
while (count == items.length)
notFull.await();
//添加元素
enqueue(e);
} finally {
//解鎖
lock.unlock();
}
}
offer
ArrayBlockingQueue 中有兩個(gè) offer() 方法,offer(E e) 和 offer(E e, long timeout, TimeUnit unit),add() 方法調(diào)用的就是 offer(E e) 方法。
//ArrayBlockingQueue.offer(E e, long timeout, TimeUnit unit)
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
//非空檢查
checkNotNull(e);
//將時(shí)間轉(zhuǎn)換為納秒
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
//加鎖
lock.lockInterruptibly();
try {
//當(dāng)數(shù)組滿了
while (count == items.length) {
//時(shí)間到了,元素還沒有入隊(duì),則返回 false
if (nanos <= 0)
return false;
//線程加入 notFull 隊(duì)列中,等待被喚醒,到達(dá) nanos 時(shí)間返回剩余的 nanos 時(shí)間
nanos = notFull.awaitNanos(nanos);
}
//元素入隊(duì)
enqueue(e);
return true;
} finally {
//解鎖
lock.unlock();
}
}
以上就是所有的元素入隊(duì)的方法,可以得出一些結(jié)論:
- add() 元素滿了,就拋出異常。
- offer() 元素滿了,返回 false。
- put() 元素滿了,線程阻塞等待被入隊(duì)。
- offer(E e, long timeout, TimeUnit unit) 加入超時(shí)時(shí)間,如果時(shí)間到了元素還是沒有被入隊(duì),則返回 false
移除元素
ArrayBlockingQueue 提供了 poll()、take()、poll(long timeout, TimeUnit unit)、remove() 方法用于元素的出隊(duì)。
poll
ArrayBlockingQueue 中有兩個(gè) poll() 方法,poll() 和 poll(long timeout, TimeUnit unit)。
//ArrayBlockingQueue.poll()
public E poll() {
final ReentrantLock lock = this.lock;
//加鎖
lock.lock();
try {
//沒有元素返回 null,否則元素出隊(duì)
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
//ArrayBlockingQueue.dequeue()
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//獲取 takeIndex 上的元素
E x = (E) items[takeIndex];
//設(shè)置 takeIndex 索引上的元素為 null
items[takeIndex] = null;
//當(dāng) takeIndex 長(zhǎng)度是數(shù)組長(zhǎng)度,takeIndex 索引從 0 開始
if (++takeIndex == items.length)
takeIndex = 0;
//元素?cái)?shù)量 -1
count--;
if (itrs != null)
//更新迭代器
itrs.elementDequeued();
//喚醒 notFull 的等待隊(duì)列,其中等待的第一個(gè)線程可以添加元素了
notFull.signal();
return x;
}
//ArrayBlockingQueue.poll(long timeout, TimeUnit unit)
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
////將時(shí)間轉(zhuǎn)換為納秒
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
//加鎖
lock.lockInterruptibly();
try {
//數(shù)組為空,超時(shí)還沒有元素出隊(duì),則返回 null
while (count == 0) {
if (nanos <= 0)
return null;
//線程加入 notEmpty 中,等待被喚醒,到達(dá) nanos 時(shí)間返回剩余的 nanos 時(shí)間
nanos = notEmpty.awaitNanos(nanos);
}
//元素出隊(duì)
return dequeue();
} finally {
lock.unlock();
}
}
take
//ArrayBlockingQueue.take()
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//加鎖
lock.lockInterruptibly();
try {
//無元素
while (count == 0)
//將線程加入 notEmpty 的等待隊(duì)列中,等待被入隊(duì)的元素喚醒
notEmpty.await();
//元素出隊(duì)
return dequeue();
} finally {
//解鎖
lock.unlock();
}
}
remove
//ArrayBlockingQueue.remove()
public boolean remove(Object o) {
//非空檢查
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
//加鎖
lock.lock();
try {
if (count > 0) {
//入隊(duì)元素的索引
final int putIndex = this.putIndex;
//出隊(duì)元素的索引
int i = takeIndex;
do {
//找到元素
if (o.equals(items[i])) {
removeAt(i);
return true;
}
//i 等于數(shù)組長(zhǎng)度的時(shí)候,從 0 開始
if (++i == items.length)
i = 0;
// i == putIndex 說明已經(jīng)遍歷了一遍
} while (i != putIndex);
}
return false;
} finally {
//解鎖
lock.unlock();
}
}
//ArrayBlockingQueue.removeAt()
void removeAt(final int removeIndex) {
final Object[] items = this.items;
//需要出隊(duì)的 removeIndex 正好是 takeIndex
if (removeIndex == takeIndex) {
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
//更新迭代器
if (itrs != null)
itrs.elementDequeued();
} else {
final int putIndex = this.putIndex;
// 循環(huán)移動(dòng)元素,將 next 元素向前移動(dòng) 1 個(gè)
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex) {
items[i] = items[next];
i = next;
} else {
//設(shè)置 i 索引的位置為空,putIndex 索引為 i
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
// 喚醒 notFull 隊(duì)列中等待的線程,通知可以元素入隊(duì)了
notFull.signal();
}
以上就是所有的元素出隊(duì)的方法,可以得出一些結(jié)論:
- poll() 元素出隊(duì)為空,則返回空
- take() 元素出隊(duì)為空的時(shí)候,會(huì)阻塞線程
- remove() 元素出隊(duì)的時(shí)候可能會(huì)移動(dòng)數(shù)組
- poll(long timeout, TimeUnit unit) 加入超時(shí)時(shí)間,如果時(shí)間到了還是沒有元素需要出隊(duì),則返回 null
總結(jié)
ArrayBlockingQueue 可以被用在生產(chǎn)者和消費(fèi)者模型中。
- ArrayBlockingQueue,不能被擴(kuò)容,初始化被指定容量。
- 利用 putIndex 和 takeIndex 循環(huán)利用數(shù)組。
- 利用了 ReentrantLock 和 兩個(gè) Condition 保證了線程的安全。
-
接口
+關(guān)注
關(guān)注
33文章
9001瀏覽量
153735 -
函數(shù)
+關(guān)注
關(guān)注
3文章
4381瀏覽量
64865 -
數(shù)據(jù)結(jié)構(gòu)
+關(guān)注
關(guān)注
3文章
573瀏覽量
40748 -
數(shù)組
+關(guān)注
關(guān)注
1文章
420瀏覽量
26542
發(fā)布評(píng)論請(qǐng)先 登錄
SystemVerilog中的類構(gòu)造函數(shù)new
一個(gè)基于多屬性協(xié)商的效用函數(shù)研究
基于生成函數(shù)的格雷對(duì)分析與構(gòu)造
基于plateaued函數(shù)的平衡布爾函數(shù)構(gòu)造
如何深度解析C++拷貝構(gòu)造函數(shù)詳細(xì)資料說明

Linux共享庫的構(gòu)造函數(shù)和析構(gòu)函數(shù)

類的拷貝構(gòu)造函數(shù)主要用途是什么?
C++:詳談構(gòu)造函數(shù)

C++:詳談拷貝構(gòu)造函數(shù)

C++之拷貝構(gòu)造函數(shù)的淺copy及深copy
c++中構(gòu)造函數(shù)學(xué)習(xí)的總結(jié)(一)
基于布爾函數(shù)導(dǎo)數(shù)的布爾置換構(gòu)造
2.10 學(xué)生類-構(gòu)造函數(shù) (15分)

評(píng)論