|
统队列是生产者线程和消费者线程从同一个队列中存取数据,必然需要互斥访问,在互相同步等待中浪费了宝贵的时间,使队列吞吐量受影响。双缓冲队使用两个队列,将读写分离,一个队列专门用来读,另一个专门用来写,当读队列空或写队列满时将两个队列互换。这里为了保证队列的读写顺序,当读队列为空且写队列不为空时候才允许两个队列互换。
经过测试性能较JDK自带的queue的确有不小提高。
测试是和JDK6中性能最高的阻塞Queue:java.util.concurrent.ArrayBlockingQueue做比较,这个队列是环形队列的实现方式,性能还算不错,不过我们的目标是没有最好,只有更好。
测试场景:
起若干个生产者线程,往Queue中放数据,起若干个消费者线程从queue中取数据,统计每个消费者线程取N个数据的平均时间。
数据如下:
场景1
生产者线程数:1
消费者线程数:1
Queue容量:5w
取元素个数:1000w
JDK ArrayBlockingQueue用时平均为: 5,302,938,177纳秒
双缓冲队列用时平均为: 5,146,302,116纳秒
相差大概160毫秒
场景2:
生产者线程数:5
消费者线程数:4
Queue容量:5w
取元素个数:1000w
JDK ArrayBlockingQueue用时平均为: 32,824,744,868纳秒
双缓冲队列用时平均为: 20,508,495,221纳秒
相差大概12.3秒
可见在生产者消费者都只有一个的时候存和取的同步冲突比较小,双缓冲队列优势不是很大,当存取线程比较多的时候优势就很明显了。
队列主要方法如下:
[pre]/** * * CircularDoubleBufferedQueue.java * 囧囧有神 * @param <E>2010-6-12 */public class CircularDoubleBufferedQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable{ private static final long serialVersionUID = 1L; private Logger logger = Logger.getLogger(CircularDoubleBufferedQueue.class.getName()); /** The queued items */ private final E[] itemsA; private final E[] itemsB; private ReentrantLock readLock, writeLock; private Condition notEmpty; private Condition notFull; private Condition awake; private E[] writeArray, readArray; private volatile int writeCount, readCount; private int writeArrayHP, writeArrayTP, readArrayHP, readArrayTP; public CircularDoubleBufferedQueue(int capacity) { if(capacity<=0) { throw new IllegalArgumentException("Queue initial capacity can't less than 0!"); } itemsA = (E[])new Object[capacity]; itemsB = (E[])new Object[capacity]; readLock = new ReentrantLock(); writeLock = new ReentrantLock(); notEmpty = readLock.newCondition(); notFull = writeLock.newCondition(); awake = writeLock.newCondition(); readArray = itemsA; writeArray = itemsB; } private void insert(E e) { writeArray[writeArrayTP] = e; ++writeArrayTP; ++writeCount; } private E extract() { E e = readArray[readArrayHP]; readArray[readArrayHP] = null; ++readArrayHP; --readCount; return e; } /** *switch condition: *read queue is empty && write queue is not empty * *Notice:This function can only be invoked after readLock is * grabbed,or may cause dead lock * @param timeout * @param isInfinite: whether need to wait forever until some other * thread awake it * @return * @throws InterruptedException */ private long queueSwitch(long timeout, boolean isInfinite) throws InterruptedException { writeLock.lock(); try { if (writeCount <= 0) { logger.debug("Write Count:" + writeCount + ", Write Queue is empty, do not switch!"); try { logger.debug("Queue is empty, need wait...."); if(isInfinite && timeout<=0) { awake.await(); return -1; } else { return awake.awaitNanos(timeout); } } catch (InterruptedException ie) { awake.signal(); throw ie; } } else { E[] tmpArray = readArray; readArray = writeArray; writeArray = tmpArray; readCount = writeCount; readArrayHP = 0; readArrayTP = writeArrayTP; writeCount = 0; writeArrayHP = readArrayHP; writeArrayTP = 0; notFull.signal(); logger.debug("Queue switch successfully!"); return -1; } } finally { writeLock.unlock(); }} public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if(e == null) { throw new NullPointerException(); } long nanoTime = unit.toNanos(timeout); writeLock.lockInterruptibly(); try { for (;;) { if(writeCount < writeArray.length) { insert(e); if (writeCount == 1) { awake.signal(); } return true; } //Time out if(nanoTime<=0) { logger.debug("offer wait time out!"); return false; } //keep waiting try { logger.debug("Queue is full, need wait...."); nanoTime = notFull.awaitNanos(nanoTime); } catch(InterruptedException ie) { notFull.signal(); throw ie; } } } finally { writeLock.unlock(); } } public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanoTime = unit.toNanos(timeout); readLock.lockInterruptibly(); try { for(;;) { if(readCount>0) { return extract(); } if(nanoTime<=0) { logger.debug("poll time out!"); return null; } nanoTime = queueSwitch(nanoTime, false); } } finally { readLock.unlock(); } }} [/pre]Ps:测试时候要把queue类中debug关掉,否则打印debug日志会对queue性能有不小的影响。
[/td][/tr][/table] |
|