读DelayQueen源码_JAVA_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > JAVA > 读DelayQueen源码

读DelayQueen源码

 2017/8/7 18:31:18  红领巾丶  程序员俱乐部  我要评论(0)
  • 摘要://一个基于二叉堆优先级的延迟取出队列。//先看构造函数。publicDelayQueue(){}publicDelayQueue(Collection<?extendsE>c){this.addAll(c);}//将collection插入到内部的PriorityQueue中。publicbooleanaddAll(Collection<?extendsE>c){if(c==null)thrownewNullPointerException();if(c==this
  • 标签:源码
class="java">
//一个基于二叉堆优先级的延迟取出队列。
//先看构造函数。

public DelayQueue() {}

public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

 //将collection插入到内部的PriorityQueue中。
 public boolean addAll(Collection<? extends E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        boolean modified = false;
        for (E e : c)
            if (add(e))
                modified = true;
        return modified;
    }


    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

//插入元素是插入到内部的PriorityQueue中。
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
	    //如果插入的是头元素(即当前头元素变了)
            if (q.peek() == e) {
	    //将leader线程置为null
                leader = null;
            //唤醒等待的线程
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e);
    }


    public void put(E e) {
        offer(e);
    }
//获取元素但是并不移出
     public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
	    //用底层的priorityQueue实现。
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

//获取头元素如果头元素为空或者还没有到出列时间返回null。
     public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
	   //获取头元素
            E first = q.peek();
	    //如果头元素是空或者获取到的头元素还没有到出列时间返回null
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                return null;
            else
                return q.poll();
        } finally {
            lock.unlock();
        }
    }

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
		//如果没空
                if (first == null) {
		//超时了返回null
                    if (nanos <= 0)
                        return null;
                    else
		    //挂起当前线程nanos时间
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
		    //到时间了可以取出
                    if (delay <= 0)
                        return q.poll();
		    //超时了返回null
                    if (nanos <= 0)
                        return null;
		    //如果等待时间小于出列时间或者leader线程!=null则挂起当前线程
		    if (nanos < delay || leader != null)
                        nanos = available.awaitNanos(nanos);
                    else {
		    //没有其他leader线程则将自己置为leader线程
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            long timeLeft = available.awaitNanos(delay);
                            nanos -= delay - timeLeft;
                        } finally {
			  //如果leader是自己
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
	    //释放锁
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

//可阻塞的获取队列头结点
 public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
		//没有获取到则等待
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    //到达队列出列时间返回
		    if (delay <= 0)
                        return q.poll();
		    //如果leader不等于null阻塞当前线程让leader线程取走当前元素
                    else if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
			//当前线程设置为leader
                        leader = thisThread;
                        try {
			    //等待delay时间被唤醒
                            available.awaitNanos(delay);
                        } finally {
			   //醒来之后如果我是leader让其他线程变为leader。
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
	    //如果leader为空并且有头唤醒其他线程。
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }


    /**leader线程是当前正在获取头节点的线程。在offer的时候如果插入的元素变为了头结点,
    此时将leader置为null,重新竞争leader。
    DelayQueen内部使用的是一个PriorityQueen类实现,所以可以保证插入顺序的优先级。
    然后根据实现Delayed接口的getDelay方法来获取元素是否已经到出列时间。
    */

//获取队列长度
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }
//获取容量(没有容量限制所以返回integer最大值)
public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

//移除元素
public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }


 public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }
//移出队列中的可用元素并将他们封装到collection中
 public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            for (;;) {
                E first = q.peek();
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                    break;
                c.add(q.poll());
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }
//最多移除maxElements个可用元素到collection中。
public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            while (n < maxElements) {
                E first = q.peek();
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                    break;
                c.add(q.poll());
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }


发表评论
用户名: 匿名