Java学习者论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

手机号码,快捷登录

恭喜Java学习者论坛(https://www.javaxxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,购买链接:点击进入购买VIP会员
JAVA高级面试进阶视频教程Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程

Go语言视频零基础入门到精通

Java架构师3期(课件+源码)

Java开发全终端实战租房项目视频教程

SpringBoot2.X入门到高级使用教程

大数据培训第六期全套视频教程

深度学习(CNN RNN GAN)算法原理

Java亿级流量电商系统视频教程

互联网架构师视频教程

年薪50万Spark2.0从入门到精通

年薪50万!人工智能学习路线教程

年薪50万!大数据从入门到精通学习路线年薪50万!机器学习入门到精通视频教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程 MySQL入门到精通教程
查看: 666|回复: 0

[算法学习]服务器端利器--java双缓冲队列

[复制链接]
  • TA的每日心情
    开心
    2021-3-12 23:18
  • 签到天数: 2 天

    [LV.1]初来乍到

    发表于 2014-12-2 00:06:35 | 显示全部楼层 |阅读模式
    统队列是生产者线程和消费者线程从同一个队列中存取数据,必然需要互斥访问,在互相同步等待中浪费了宝贵的时间,使队列吞吐量受影响。双缓冲队使用两个队列,将读写分离,一个队列专门用来读,另一个专门用来写,当读队列空或写队列满时将两个队列互换。这里为了保证队列的读写顺序,当读队列为空且写队列不为空时候才允许两个队列互换。 经过测试性能较JDK自带的queue的确有不小提高。  测试是和JDK6中性能最高的阻塞Queue:java.util.concurrent.ArrayBlockingQueue做比较,这个队列是环形队列的实现方式,性能还算不错,不过我们的目标是没有最好,只有更好。

    测试场景:
        起若干个生产者线程,往Queue中放数据,起若干个消费者线程从queue中取数据,统计每个消费者线程取N个数据的平均时间。
      
       
       
         
       

         
       
      
    数据如下:
    场景1
    生产者线程数:1
    消费者线程数:1
    Queue容量:5w
    取元素个数:1000w
    JDK ArrayBlockingQueue用时平均为:  5,302,938,177纳秒
    双缓冲队列用时平均为:                      5,146,302,116纳秒
    相差大概160毫秒

    场景2:
    生产者线程数:5
    消费者线程数:4
    Queue容量:5w
    取元素个数:1000w
    JDK ArrayBlockingQueue用时平均为:  32,824,744,868纳秒
    双缓冲队列用时平均为:                      20,508,495,221纳秒
    相差大概12.3秒

    可见在生产者消费者都只有一个的时候存和取的同步冲突比较小,双缓冲队列优势不是很大,当存取线程比较多的时候优势就很明显了。
      队列主要方法如下: /**
    *
    * CircularDoubleBufferedQueue.java
    * ��有神
    * @param <E>2010-6-12
    */
    public class CircularDoubleBufferedQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable
    {
            private static final long serialVersionUID = 1L;
            private Logger logger =   Logger.getLogger(CircularDoubleBufferedQueue.class.getName());
        /** The queued items  */
        private final E[] itemsA;
        private final E[] itemsB;
           
            private ReentrantLock readLock, writeLock;
            private Condition notEmpty;
            private Condition notFull;
            private Condition awake;
           
           
        private E[] writeArray, readArray;
        private volatile int writeCount, readCount;
        private int writeArrayHP, writeArrayTP, readArrayHP, readArrayTP;
           
           
            public CircularDoubleBufferedQueue(int capacity)
            {
                    if(capacity<=0)
                    {
                            throw new IllegalArgumentException("Queue initial capacity can"t less than 0!");
                    }
                   
                    itemsA = (E[])new Object[capacity];
                    itemsB = (E[])new Object[capacity];
                    readLock = new ReentrantLock();
                    writeLock = new ReentrantLock();
                   
                    notEmpty = readLock.newCondition();
                    notFull = writeLock.newCondition();
                    awake = writeLock.newCondition();
                   
                    readArray = itemsA;
                    writeArray = itemsB;
            }
           
            private void insert(E e)
            {
                    writeArray[writeArrayTP] = e;
                    ++writeArrayTP;
                    ++writeCount;
            }
           
            private E extract()
            {
                    E e = readArray[readArrayHP];
                    readArray[readArrayHP] = null;
                    ++readArrayHP;
                    --readCount;
                    return e;
            }
           
            /**
             *switch condition:
             *read queue is empty && write queue is not empty
             *
             *Notice:This function can only be invoked after readLock is
             * grabbed,or may cause dead lock
             * @param timeout
             * @param isInfinite: whether need to wait forever until some other
             * thread awake it
             * @return
             * @throws InterruptedException
             */
            private long queueSwitch(long timeout, boolean isInfinite) throws InterruptedException
            {
            writeLock.lock();
            try
            {
                    if (writeCount <= 0)
                    {
        logger.debug("Write Count:" + writeCount + ", Write Queue is empty, do not switch!");
                            try
                            {
                                    logger.debug("Queue is empty, need wait....");
                                    if(isInfinite && timeout<=0)
                                    {
                                            awake.await();
                                            return -1;
                                    }
                                    else
                                    {
                                            return awake.awaitNanos(timeout);
                                    }
                            }
                            catch (InterruptedException ie)
                            {
                                    awake.signal();
                                    throw ie;
                            }
                    }
                    else
                    {
                            E[] tmpArray = readArray;
                            readArray = writeArray;
                            writeArray = tmpArray;
                            readCount = writeCount;
                            readArrayHP = 0;
                            readArrayTP = writeArrayTP;
                            writeCount = 0;
                            writeArrayHP = readArrayHP;
                            writeArrayTP = 0;
                                   
                            notFull.signal();
                            logger.debug("Queue switch successfully!");
                            return -1;
                    }
            }
            finally
            {
                    writeLock.unlock();
            }
    }
            public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException
            {
                    if(e == null)
                    {
                            throw new NullPointerException();
                    }
                   
                    long nanoTime = unit.toNanos(timeout);
                    writeLock.lockInterruptibly();
                    try
                    {
                            for (;;)
                            {
                                    if(writeCount < writeArray.length)
                                    {
                                            insert(e);
                                            if (writeCount == 1)
                                            {
                                                    awake.signal();
                                            }
                                            return true;
                                    }
                                   
                                    //Time out
                                    if(nanoTime<=0)
                                    {
                                            logger.debug("offer wait time out!");
                                            return false;
                                    }
                                    //keep waiting
                                    try
                                    {
                                            logger.debug("Queue is full, need wait....");
                                            nanoTime = notFull.awaitNanos(nanoTime);
                                    }
                                    catch(InterruptedException ie)
                                    {
                                            notFull.signal();
                                            throw ie;
                                    }
                            }
                    }
                    finally
                    {
                            writeLock.unlock();
                    }
            }
            public E poll(long timeout, TimeUnit unit) throws InterruptedException
            {
                    long nanoTime = unit.toNanos(timeout);
                    readLock.lockInterruptibly();
                   
                    try
                    {
                            for(;;)
                            {
                                    if(readCount>0)
                                    {
                                            return extract();
                                    }
                                   
                                    if(nanoTime<=0)
                                    {
                                            logger.debug("poll time out!");
                                            return null;
                                    }
                                    nanoTime = queueSwitch(nanoTime, false);
                            }
                    }
                    finally
                    {
                            readLock.unlock();
                    }
            }
    }
                      [/code] 附带队列类代码和测试类代码如下: double_queue.zip 欢迎大家提意见!
    Ps:测试时候要把queue类中debug关掉,否则打印debug日志会对queue性能有不小的影响。  



      
      
       
       

         
       

         
       
      
    复制代码

    源码下载:http://file.javaxxz.com/2014/12/2/000634984.zip
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|Java学习者论坛 ( 声明:本站资料整理自互联网,用于Java学习者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2024-4-16 14:08 , Processed in 0.535234 second(s), 48 queries .

    Powered by Discuz! X3.4

    © 2001-2017 Comsenz Inc.

    快速回复 返回顶部 返回列表