即日起在codingBlog上分享您的技术经验即可获得积分,积分可兑换现金哦。

[JAVA学习与进阶学习与进阶笔记-77]相关BlockingQueue

编程语言 sinat_36263171 86℃ 0评论

java.util.concurrent


Interface BlockingQueue

All Superinterfaces:


Collection, Iterable, Queue


All Known Subinterfaces:


BlockingDeque, TransferQueue

All Known Implementing Classes:


LinkedBlockingQueue,


ArrayBlockingQueue,


LinkedBlockingDeque,


**************


LinkedTransferQueue, 【ConcurrentLinkedQueue、SynchronousQueue(公平模式)和LinkedBlockingQueue的超集,性能更优】


SynchronousQueue,


**************


DelayQueue,


PriorityBlockingQueue,

BlockingQueue implementations are designed to be used primarily for producer-consumer queues, but additionally support the Collection interface. So, for example, it is possible to remove an arbitrary element from a queue using remove(x). However, such operations are in general not performed very efficiently, and are intended for only occasional use, such as when a queued message is cancelled.
【BlockingQueue主要适用于生产者-消费者的场景。如果当做集合来使用,也可以,但通常效率不高。】

BlockingQueue implementations are thread-safe.
【BlockingQueue是线程安全的,基本操作都使用了锁或者其它同步操作确保是原子操作。】

各个子类的说明:


【LinkedBlockingQueue】


以链表形式实现的队列,其内部的基本元素为Node对象。


典型的FIFO队列,典型的操作为poll和put,分别在head和tail进行操作,分别使用了takeLock和putLock来进行同步。


上述的takeLock和putLock均使用了condition来配合使用。

<<补充blocking的实现原理>>
    LinkedBlockingQueue 通过reentrantLock的condition分别在队列为空时,对take线程进行阻塞,在队列满时,对put线程进行阻塞。
    condition的使用跟object对象锁非常类似,首先线程要持有锁,当条件满足时,调用condition.await()使当前线程加入到此condition
的等待队列,线程进入休眠状态(等待唤醒或中断),同时线程会释放锁(区别于Thread.sleep(time),sleep不释放锁)。
    当条件不满足时(限制解除),调用condition.signal()唤醒等待队列的线程,这些线程重新进入调度机制供调度(调度哪个线程来执行
基本上是随机的,取决于线程调度策略以及线程的状态),这个过程类似于Object.notifyAll()以及Object.notify()。

<<补充与Array的性能比较、适用场景>>
    整体的比较,这篇文章说的相对较全面:http://blog.csdn.net/zcc_0015/article/details/11695191
    Array的特点是长度固定,遍历查找比List要快;缺点也非常明显,在中间进行删除、插入操作开销较大,而且伸缩性差,queue的长度
快速增加而Array长度不足时,需要频繁地进行内存申请,数据拷贝,内存回收,效率较低。
    List的优点与缺点正好相反,除了遍历速度较慢,其它方面全面占优。当queue的长度非常大时,List在内存消耗上的优势较明显,因为
List不需要预先申请一块大的内存,因此在内存占用的伸缩性上要好很多。
    在BlockingQueue的实现中,Linked分别使用了2个锁(以及condition)来控制take和put操作,因此两种操作可并行,吞吐量更高;而Array
使用1个锁(搭配这把锁的2个condition)来控制take和put操作,因此两种操作不能并行,推测这样实现的原因,在于Array的put,take操作
影响的是一整块内存,如果Array头元素被取出,整体需要前移,此时如果另外一个线程在队列尾进行put操作,则会产生意外的结果,这点也
是源自Array与List在特性上的差异。

    实际的使用中:
    SynchronousQueue 可以看做是capacity为1的blockingQueue;
    ArrayBlockingQueue 适用于 线程少 (<20) ,Queue长度短 (<30) 
    LinkedBlockingQueue 线程多(>20),Queue长度长(>30)

综上:
    在需要考虑伸缩性以及不需要极致效率的通用场景下,使用Linked,Synchronous以及Array仅在特定的场景下使用,如queue的size可以准确
预估且put与take并发不频繁的场景。
    虽然Linked的Node创建和回收可能会对效率有一些影响(产生大量的Node内存垃圾对GC有影响),但在大部分的情况下,我认为这种影响甚微,
Linked的适用性明显强于Array,除非测试证明Linked不合适,否则尽量使用Linked,至少可以有更好的内存使用灵活性。

【ArrayBlockingQueue】


内部承载实质是一个Object数组: final Object[] items = new Object[capacity];


其在接口上与LinkedBlockingQueue基本一致,性能的差异主要表现在链表和数组实现相同的操作时的差异。


ArrayBlockingQueue 的put和take操作不能并发,因为它们使用同一把锁控制。

【LinkedBlockingDeque】


因为是双端队列,因此操作上比单端的FIFO队列要多在head插入,在tail取出元素的操作,其它基本一致。

【LinkedTransferQueue】


ConcurrentLinkedQueue、SynchronousQueue(公平模式)和LinkedBlockingQueue的超集。


所谓transfer就是转交,producer调用transfer方法,将对象转交给consumer,直到这个过程完成,producer才会从阻塞状态恢复,而不是


像LinkedBlockingQueue那样producer只需要向queue放入一个对象就跑路了。


LinkedTransferQueue是SynchronousQueue的升级版,性能更佳,LinkedTransferQueue的性能分别是SynchronousQueue的3倍(非公平模式)


和14倍(公平模式)。


因为像ThreadPoolExecutor这样的类在任务传递时都是使用SynchronousQueue,所以使用LinkedTransferQueue来代替SynchronousQueue也会


使得ThreadPoolExecutor得到相应的性能提升。


回想起使用ThreadPoolExecutor方法时,有一个参数是指定任务队列,这个任务队列在coresize的线程不够用时,用来缓存任务,如果队列满,


则threadpool才会考虑逐步增加core thread,直到上限,然后触发丢弃策略。这里的core threads其实就是任务的consumer线程,而提交任务给


threadpool的业务线程则是producer,因此可以看到,指定的任务队列的特性,将影响threadpool的行为:


1、SynchronousQueue:默认长度为0,capacity为1,隐性实现了transfer接口,当任务被提交给线程池时,任务会立刻被取走执行(如果有


空闲工作线程的话)


2、LinkedBlockingQueue/ArrayBlockingQueue:普通的FIFO队列,可以是无界/有界,建议有界,可以有效防止内存被快速耗尽。


3、LinkedTransferQueue:比 SynchronousQueue 有更好的性能。

    LinkedTransferQueue的独特之处在于它实现了 TransferQueue 接口:
int getWaitingConsumerCount()
    Returns an estimate of the number of consumers waiting to receive elements via BlockingQueue.take() or timed poll.

boolean hasWaitingConsumer()
    Returns true if there is at least one consumer waiting to receive an element via BlockingQueue.take() or timed poll.

void    transfer(E e)
    Transfers the element to a consumer, waiting if necessary to do so.

boolean tryTransfer(E e)
    Transfers the element to a waiting consumer immediately, if possible.true if the element was transferred, else false
    [失败则将element加入到tail然后producer返回]

boolean tryTransfer(E e, long timeout, TimeUnit unit)
    Transfers the element to a consumer if it is possible to do so before the timeout elapses.

    transfer的实现原理简述:
        consumer预先往queue中插入一个null节点,同时阻塞等待,此时正好有producer要插入对象,发现有null节点,则直接放入null
    节点,同时调用类似于condition.signal()这样的方法,唤醒被阻塞而休眠的consumer线程,从而完成一个transfer(转交)过程。

注 1:
    特别注意,LinkedTransferQueue是无界的(unbounded),在没有consumer线程只有producer线程的情况下可能迅速耗尽内存。

注 2:
    线程调用transfer而阻塞,线程是在自旋,超时后将调用Thread.yield(),当前线程切换到就绪状态,重新竞争CPU,避免自旋线程长期占用
    CPU。

【 SynchronousQueue 】


Java6的并发编程包中的SynchronousQueue是一个没有数据缓冲的BlockingQueue,生产者线程对其的插入操作put必须等待消费者的移除


操作take,反过来也一样。


因为没有数据缓存,因此无法直接查看队列中是否有数据,数据是在producer和consumer之间直接传递的。该队列的peek()方法永远返回null,


它的队列中的第一个元素是等待将数据交给consumer的producer线程。


该队列隐性实现了transfer接口,因此其外部行为与LinkedTransferQueue类似,只是transfer方法被put所替代。

【 DelayQueue 】


无界队列,内部由PriorityBlockingQueue承载(此队列内部是Array),DelayQueue用于放置实现了Delayed接口的对象,其中的对象只能在


到期时才能从队列中取走。DelayQueue是有序队列,使用conpareTo方法排序。


Delayed接口的定义如下:


public interface Delayed extends Comparable {


/**


* Returns the remaining delay associated with this object, in the


* given time unit.


*


* @param unit the time unit


* @return the remaining delay; zero or negative values indicate


* that the delay has already elapsed


*/


long getDelay(TimeUnit unit);


}

    实现Delayed接口的对象也需要实现 compareTo方法,该方法是从Comparable接口继承而来:
    public interface Comparable {
        public int compareTo(T o);
    }

    /*测试验证一下几种情形:
     * 1、加入多个Delayed对象,观察DelayQueue的排序;
     *      默认根据delay按照升序排列
     * 2、consumer执行take操作,在Delayed条件不满足时,consumer线程的状态;
     *      consumer执行take操作会被阻塞,知道getDelay返回0或者negetive
     * 3、多个consumer的场景
     *      DelayQueue对于consumer与LinkedBlockingQueue等队列一致
     * 3、总结其应用场景
     *      DelayQueue适用于需要控制时序的场景*/

     <>
        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();  /*获取锁*/
            try {
                for (;;) {
                    E first = q.peek();   /*看队列头是否有数据*/
                    if (first == null)
                        available.await();   /*队列为空,则consumer进入等待队列,此为BlockingQueue的特性*/
                    else {
                        long delay = first.getDelay(TimeUnit.NANOSECONDS);  /*获取first的delay,delay的值是关键*/
                        if (delay <= 0)     /*delay时间到,取出数据并返回*/
                            return q.poll();
                        else if (leader != null)    /*delay时间未到,且前面leader线程在等待执行,则当前线程休眠,作为follower*/
                            available.await();
                        else {              /*delay时间未到,leader空缺,则当前线程成为leader,并休眠到delay超时,然后重新判断*/
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay); /*挂起当前线程直到中断、signaled、或超时*/
                            } finally {     /*将leader复位*/
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {    /*在返回之前需要执行,如果队列头还有数据,且没有leader在执行,则激活一下condition上等待队列的followers*/
                if (leader == null && q.peek() != null)
                    available.signal();
                lock.unlock();  /*释放锁,然后才执行return*/
            }
        }

    综上:
        DelayQueue通过lock的condition.awaitNanos(delay)来让执行take操作的consumer进入休眠状态delay长的时间,然后让consumer
    重新判断first的delay是否<=0(超时),接着使用poll()将first取走返回给consume。

【PriorityBlockingQueue】


内部是数组实现的,private transient Object[] queue;


构造函数:


1、如果不指定大小,则默认是11,该队列是无界的,容量不够的时候会自动扩容


public PriorityBlockingQueue() {


this(DEFAULT_INITIAL_CAPACITY, null);


}

    2、public PriorityBlockingQueue(int initialCapacity,
                             Comparator comparator)
        需要指定一个Comparator,元素类型的下限是E

    <>
        指定比较器,需要在创建时指定一个Comparator,Comparator的泛型T就是放入队列的元素类型,在Comparator的compare方法中
    对两两元素进行比较,在offer方法中,最终会调用siftUpUsingComparator方法,加入指定的比较器,推测内部算法是基于二叉树的快速
    排序算法。
        PriorityBlockingQueue 并没有强制需要指定比较器,如果没有比较器,则使用natural compare,如填入整数,则按从小到大排列。
        PriorityBlockingQueue 并没有强制指定添加的元素要实现Comparable接口。

    <<适用场景>>
        在P-C的场景下,需要对待处理的对象进行优先级控制的时候,使用此队列,判断优先级的方式可以由使用者指定或使用默认比较器的
    默认比较规则。如DelayQueue,通过对delay的比较,对元素进行了排序,然后通过condition的await操作让consumer等待到delay时间到
    来实现功能。有个区别就是DelayQueue要求其元素必须实现Delayed接口,而非直接指定Comparator。

【总结】


通过调试与分析上述Queue的代码,有以下收获:


1、基本掌握了BlockingQueue家族成员的使用方法和场景


2、理解了Blocking的实现原理,并学习了lock的condition的使用


3、理解了LinkedBlockingQueue如何使用put和take两把锁来提高读/写操作的并发处理能力和吞吐量


4、比较了ArrayBlockingQueue与LinkedBlockingQueue的性能优劣


5、SynchronousQueue与LinkedTransferQueue的各自特点,尤其是前者的容量为0,后者的transfer以及tryTransfer方法的使用,以及两者作为


线程池的任务队列对线程池性能的影响,对内存消耗的影响


6、ScheduleExecutorService与DelayQueue,使用DelayQueue控制任务的执行


7、使用PriorityBlockingQueue实现DelayQueue

转载请注明:CodingBlog » [JAVA学习与进阶学习与进阶笔记-77]相关BlockingQueue

喜欢 (0)or分享 (0)
发表我的评论
取消评论

*

表情