LinkedBlockingQueue是使用独占锁(ReentrantLock)实现的阻塞队列,是线程安全的。底层使用的数据结构为单向链表。只能从队头获取元素,只能从队尾添加元素,且添加元素和获取元素都有独立的锁,从而实现读写分离,读写操作可并行执行。
类图如下:
head、last:队列头节点和队列尾节点。
count:队列中元素的个数,初始值为0。
takeLock:获取队头元素时,使用该独占锁
putLock:在队尾位置添加元素时,使用该独占锁
notEmpty:出队时的条件队列
notFull:入队时的条件队列
类图中查看属性类型。
添加数据:
boolean offer(E e):添加数据,队满则抛弃
void put(E e):添加数据,队满则阻塞。推荐
源码解析
boolean offer(E e):向队列尾部添加一个元素,队满返回false且遗弃该元素。源码如下
public boolean offer(E e) {
//当元素为空时,抛出NullPointerException异常
if (e == null) throw new NullPointerException();
//当队满时,遗弃数据元素并返回false
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
//构造新节点,并获取putLock独占锁
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//入队且元素个数加1
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty(); //唤醒出队时阻塞,放入notEmpty条件队列中的线程
return c >= 0;
}
//往队尾添加新的节点
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
void put(E e):向队列尾部添加一个元素,队满线程阻塞,并放入notFull添加队列。源码如下
public void put(E e) throws InterruptedException {
//数据元素为空,返回NullPointerException异常
if (e == null) throw new NullPointerException();
int c = -1;
//创建新节点并获取putLock独占锁
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//队满时,线程阻塞,放入notFull条件队列
while (count.get() == capacity) {
notFull.await();
}
//入队且元素个数加1
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
区别:1、队满时,offer()方法遗弃数据元素并返回false,put()方法会把线程阻塞并放入notFull条件队列中
2、如果在被阻塞时被其他线程设置中断标识时,offer()方法不做处理,put()方法会抛出InterruptedException异常。
获取数据:
E take():获取并弹出元素,队空则阻塞 推荐
E poll():获取并弹出元素,队空则返回null
E peek():获取不弹出
源码解析
E take():弹出并获取队头元素。队列为空,线程阻塞且放入notEmpty条件队列中
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
//获取takeLock独占锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//队列为空时,线程阻塞并放入notEmpty条件队列中
while (count.get() == 0) {
notEmpty.await();
}
//删除且返回队头元素
x = dequeue();
//元素个数减1
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull(); //唤醒入队时阻塞,放入notFull条件队列中的线程
return x;
}
//删除队头并返回队头元素
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
E poll():弹出并获取数据元素,队列为空时,线程不会阻塞且返回null。
public E poll() {
final AtomicInteger count = this.count;
//队列为空,返回null
if (count.get() == 0)
return null;
E x = null;
int c = -1;
//获取takeLock 锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
//删除并返回队头元素
x = dequeue();
//元素个数减1
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull(); //唤醒入队时阻塞,放入notFull条件队列中的线程
return x;
}
E peek():获取队头元素,但不弹出队头元素
public E peek() {
//队列为空,返回null
if (count.get() == 0)
return null;
//获取takeLock 锁,并返回队头元素
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
其他方法:
boolean remove(Object o):删除指定元素。
说明:调用remove方法时,由于获取了putLock和takeLock独占锁。所以遍历查找元素是线程安全的。此时,调用入队和出队操作的线程都会被阻塞。
public boolean remove(Object o) {
//删除的元素为空返回false
if (o == null) return false;
//获取putLock和takeLock独占锁
fullyLock();
try {
//对队头向队尾依次查找,存在,删除并返回true.不存在,返回false
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
//释放putLock和takeLock独占锁
fullyUnlock();
}
}
//获取putLock和takeLock独占锁
void fullyLock() {
putLock.lock();
takeLock.lock();
}
//释放putLock和takeLock独占锁
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
int size():获取元素个数。
模拟场景:一个餐馆,由一个厨师和一个服务员,接到订单后,服务员需等到厨师做好食物,厨师准备好食物后,会通知服务员取走食物。然后继续等待。厨师代表生产者,服务员代表消费者
/**
* 订单
*/
public class Order {
private static final int foodMaterialCount=10;
private static int initCount=0;
private int finishCount;
public Order() {
finishCount=++initCount;
if(finishCount==foodMaterialCount){
System.out.println("食材已经用完,餐馆打样了");
System.exit(0);
}
}
public int getFinishCount() {
return finishCount;
}
}
/**
* 服务员
*/
public class Waiter extends Thread{
private BlockingQueue<Order> blockingQueue;
public Waiter(BlockingQueue blockingQueue) {
this.blockingQueue=blockingQueue;
}
@Override
public void run() {
while (true){
try {
Order take = blockingQueue.take();
System.out.println("服务员"+Thread.currentThread().getId()+"接到通知,取走第"+ take.getFinishCount() +"份食物");
System.out.println();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
/**
* 厨师
*/
public class Chef extends Thread{
private BlockingQueue<Order> blockingQueue;
public Chef(BlockingQueue blockingQueue) {
this.blockingQueue=blockingQueue;
}
@Override
public void run() {
while (true){
try {
System.out.println("厨师"+Thread.currentThread().getId()+"接到订单,并做好食物");
Order order=new Order();
blockingQueue.put(order);
System.out.println("厨师"+Thread.currentThread().getId()+"---制作了第"+ order.getFinishCount() +"份食物-- 通知服务员取走食物 ------");
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
测试代码:
public static void main(String[] args) {
BlockingQueue<Order> blockingQueue=new LinkedBlockingDeque<>();
Waiter waiter = new Waiter(blockingQueue);
Chef chef = new Chef(blockingQueue);
waiter.start();
chef.start();
}
测试结果:
参考:《java并发编程之美》
因篇幅问题不能全部显示,请点此查看更多更全内容