博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
11.并发包阻塞队列之LinkedBlockingQueue
阅读量:6614 次
发布时间:2019-06-24

本文共 6619 字,大约阅读时间需要 22 分钟。

jdk1.7.0_79 

  在上文中简要解析了ArrayBlockingQueue部分源码,在本文中同样要介绍的是Java并发包中的阻塞队列LinkedBlockingQueueArrayBlockingQueue队列是由数组实现,而LinkedBlockingQueue队列的实现则是链表(单向链表)实现,所以在LinkedBlockingQueue有一个Node内部类来表示链表的节点。  

static final class Node
{   E item;//入队元素   Node
next;//指向后继节点   Node(E x) {     item = x;   } }

  同样它也有3个构造方法,与ArrayBlockingQueue略有不同。

1 public LinkedBlockingQueue() {  2   this(Integer.MAX_VALUE)//默认构造容量为int型的最大值队列  3 }  4 public LinkedBlockingQueue(int capacity) {  5   if (capacity <= o) throw new IllegalArgumentException();  6   this.capacity = capacity;  7   last = head = new Node
(null);//头指针和尾指针指向头节点(null) 8 } 9 public LinkedBlockingQueue(Collection
c ) { 10   this(Integer.MAX_VALUE); 11   final ReentrantLock putLock = this.putLock; 12   putLock.lock();//这里和ArrayBlockingQueue也会获取锁,但它同样不是为了互斥操作,同样也是为了保证其可见性。 13   try { 14    int n = 0; 15   for (E e : c) { 16    if (e == null) 17    throw new NullPointerException(); 18   if (n == capacity) 19    throw new IllegalStateException("Queue full"); 20    enqueue(new Node
(e));//入队 21   ++n; 22   } 23   count.set(n); 24   } finally { 25    putLock.unlock(); 26   } 27 }

  在第12行中获取锁是为了保证可见性,这个的原因我认为是,线程T1是实例化LinkedBlockingQueue对象,T2是对实例化的LinkedBlockingQueue对象做入队操作(当然要保证T1T2的执行顺序),如果不对它进行加锁操作(加锁会保证其可见性,也就是写回主存),T1的集合c有可能只存在T1线程维护的缓存中,并没有写回主存,T2中实例化的LinkedBlockingQueue维护的缓存以及主存中并没有集合c,此时就因为可见性造成数据不一致的情况,引发线程安全问题。 

  在了解完LinkedBlockingQueue的构造方法后,我们回过头来看LinkedBlockingQueue的两个成员变量: 

private final ReentrantLock takeLock = new ReentrantLock(); private final ReentrantLock putLock = new ReentrantLock();

  可见LinkedBlockingQueue中有两个锁,一个是为了锁住入队操作,一个是为了锁住出队操作。而在ArrayBlockingQueue中则只有一个锁,同时锁住队列的入队、出队操作。 

private final Condition notEmpty = takeLock.newCondition(); private final Condition notFull = putLock.newCondition();

  这两个成员变量则是线程等待队列,一个是出队锁上的等待队列,一个是入队锁上的等待队列。在ArrayBlockingQueue也有两个等待队列,一个是非空等待队列,另一个则是非满等待队列,在这一点上两者一致。 

队列元素的插入 

 

抛出异常 

返回值(非阻塞) 

一定时间内返回值 

返回值(阻塞) 

插入 

add(e)//队列未满时,返回true;队列满则抛出IllegalStateException(“Queue full”)异常——AbstractQueue 

offer(e)//队列未满时,返回true;队列满时返回false。非阻塞立即返回。 

offer(e, time, unit)//设定等待的时间,如果在指定时间内还不能往队列中插入数据则返回false,插入成功返回true 

put(e)//队列未满时,直接插入没有返回值;队列满时会阻塞等待,一直等到队列未满时再插入。 

  LinkedBlockingQueue中并没有像ArrayBlockingQueue那样重写了AbstractQueueadd方法而直接调用父类的add方法,所以LinkedBlockingQueue#add方法与ArrayBlockingQueue#add一样,都是直接调用其AbstractQueue

//AbstractQueue#add,这是一个模板方法,只定义add入队算法骨架,成功时返回true,失败时抛出IllegalStateException异常,具体offer实现交给子类实现。 public boolean add(E e) {   if (offer(e))//offer方法由Queue接口定义     return true;   else     throw new IllegalStateException(); }
1 //LinkedBlockingQueue#offer  2 public boolean offer(E e) {  3   if (e == null) throw new NullPointerException();  4   final AtomicInteger count = this.count;//原子型int变量,线程安全,指向队列数据量引用  5   if (count.get() == capacity) //当数据量等于队列容量时,无法入队,返回false  6     return false;  7   int c = -1;  8   Node
node = new Node(e); 9   final ReentrantLock putLock = this.putLock;//插入锁 10   putLock.lock();//获得插入锁 11   try { 12     if (count.get() < capacity) { 13       enqueuer(node);//入队 14       c = count.getAndIncrement();//队列数据总数自增+1后返回 15       if (c + 1 < capacity) 16         notFull.signal();//唤醒非满等待队列上的线程 17     } 18   } finally { 19     putLock.unlock(); 20   } 21   if (c == 0) 22     signalNotEmpty();//队列中刚好有一个数据,唤醒非空等待队列 23   return c >= 0 24 }

  在第10行是获取插入锁,和ArrayBlockingQueue只有一个锁不同的是,LinkedBlockingQueue分为入队锁和出队锁,也就是说对于ArrayBlockingQueue同时只能有一个线程对它进行入队或者出队操作,而对于LinkedBlockingQueue来说同时能有两个线程对队列进行入队或者出队操作。 

  前两个addoffer方法都是非阻塞的,对于put方法则是阻塞的,线程会一直阻塞直到线程非空或者非满,但是它在阻塞时能被线程中断返回。

//LinkedBlockingQueue#put public void put(E e) throws InterruptedException {   if (e == null) throws new NullPointerException();   int c = -1;   Node
node = new Node(e);   final ReentrantLock putLock = this.putLock;   final AtomicInteger count = this.count;   putLock.lockInterrupted();//能被线程中断地获取锁   try {     while (count.get() == capacity) {
//队列数据量等于队列容量       notFull.await();//休眠非满等待队列上的线程     }     enqueuer(node);//入队     c = count.getAndIncrement();//队列数据总数自增+1后返回     if (c + 1 < capacity)//还没有达到队列容量       notFull.signal();//唤醒非满等待队列上的线程   } finally {     putLock.unlock();   }   if (c == 0)   signalNotEmpty();//唤醒非空等待队列上的线程 }

  队列插入的最后一个方法来看上面出现的enqueue入队方法。

private void enqueuer(Node
node) {   last = last.next = node;//将LinkedBlockingQueue中指向队尾的last.next指向新加入的node节点 }

队列元素的删除 

抛出异常 

返回值(非阻塞) 

一定时间内返回值 

返回值(阻塞) 

remove()//队列不为空时,返回队首值并移除;队列为空时抛出NoSuchElementException()异常——AbstractQueue 

poll()//队列不为空时返回队首值并移除;队列为空时返回null。非阻塞立即返回。 

poll(time, unit)//设定等待的时间,如果在指定时间内队列还未孔则返回null,不为空则返回队首值 

take(e)//队列不为空返回队首值并移除;当队列为空时会阻塞等待,一直等到队列不为空时再返回队首值。 

//AbstractQueue#remove,同样这也是一个模板方法,定义删除队列元素的算法骨架,具体实现由子类来实现poll方法 public E remove() {    E x = poll();//poll方法由Queue接口定义    if (x != null)      return x;    else      throw new NoSuchElementException();  }
//LinkedBlockingQueue#poll public E poll() {   final AtomicInteger count = this.count;   if (count.get() == 0)      return null;   E x = null;   int c = -1;   final ReentrantLock takeLock = this.takeLock;   takeLock.lock();//获取出队锁   try {     if (count.get() > 0) {
//队列不为空       x = dequeuer();//出队       c = count.getAndDecrement();//队列数据自减-1返回       if ( c > 1)         notEmpty.signal();//唤醒非空等待队列上的线程     }   } finally {     takeLock.unlock();   }   if (c == capacity)     signalNotFull();//唤醒非满等待队列上的线程   return x; }

  前两个removepoll方法都是非阻塞的,对于take方法则是阻塞的,线程会一直阻塞直到线程非空或者非满,但是它在阻塞时能被线程中断返回。

public E take() throws InterruptedException {   E x;   int c = -1;   final AtomicInteger count = this.count;   final ReentrantLock takeLock = this.takeLock;   take.lockInterruptibly();//可被线程中断返回地获取锁   try {     while (count.get() == 0) {
//队列数据为空       notEmpty.await();//休眠非空等待队列上的线程     }     x = dequeuer();//此时非空等待队列上的线程被唤醒,队列数据不为空,出队     c = count.getAndDecrement();   if (c > 1)     notEmpty.signal();//唤醒非空等待队列上的线程   } finally {     takeLock.unlock();   }   if (c == capacity)     signalNotFull();//唤醒非满等待队列   return x; }

  队列出队的最后一个方法来看上面出现的dequeue入队方法

private E dequeue() {   Node
h = head;//头节点,为空   Node
first = h.next;   h.next = h;//此时没有节点指向头节点,便于GC   head = first;   E x = first.item;   first.item = null;   return x; }

  最后一个方法size。

public int size() {   return count.get();//和ArrayBlockingQueue类似,与ConcurrentLinkedQueue不同,没有遍历整个队列,而是直接返回count变量。此处的count是AtomicInteger变量。 }

 


这是一个能给程序员加buff的公众号 

 

转载地址:http://qweso.baihongyu.com/

你可能感兴趣的文章
SpringBoot权限控制
查看>>
阿里云中间件技术 促进互联网高速发展
查看>>
智能时代悄然到来 物联网称王将引爆传感器产业
查看>>
物理隔离计算机被USB蜜蜂刺破 数据通过无线信号泄露
查看>>
利用一点机器学习来加速你的网站
查看>>
中国域名现状:应用水平较低,安全仍存隐患
查看>>
Java中HashMap的原理分析
查看>>
React Native入门项目与解析
查看>>
云计算:大势所趋 你准备好了么?
查看>>
数据资产的运营商--天市大数据交易平台
查看>>
中小企业如何成功转型跨境电商
查看>>
java中文乱码解决之道(二)—–字符编码详解:基础知识 + ASCII + GB**
查看>>
《ANTLR 4权威指南》——2.5 语法分析树监听器和访问器
查看>>
02_JNI中Java代码调用C代码,Android中使用log库打印日志,javah命令的使用,Android.mk文件的编写,交叉编译...
查看>>
TIOBE 2016 年 5 月编程语言排行榜:Ruby 排名创历史新高
查看>>
这些国货,在阿里平台上被美国剁手党抢疯了
查看>>
《Excel 职场手册:260招菜鸟变达人》一第 2 招 常用快捷键Windows与Mac对照
查看>>
《Greenplum企业应用实战》一第1章 Greenplum简介1.1 Greenplum的起源和发展历程
查看>>
开源世界已成围城:成本让企业蜂拥而来,也让企业退缩转投
查看>>
嵌入式实时应用开发实战(原书第3版)》——3.3 保护模式架构
查看>>