读SynchronousQueue源码_JAVA_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > JAVA > 读SynchronousQueue源码

读SynchronousQueue源码

 2017/8/10 12:31:19  红领巾丶  程序员俱乐部  我要评论(0)
  • 摘要://先看构造方法publicSynchronousQueue(){this(false);}//公平模式或者非公平模式publicSynchronousQueue(booleanfair){transferer=fair?newTransferQueue():newTransferStack();}//所有的生产和消费都走的这个方法Objecttransfer(Objecte,booleantimed,longnanos){SNodes=null
  • 标签:源码 Ron
class="java">
//先看构造方法
public SynchronousQueue() {
        this(false);
    }
 //公平模式或者非公平模式
 public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue() : new TransferStack();
    }



//所有的生产和消费都走的这个方法
Object transfer(Object e, boolean timed, long nanos) {
            
            SNode s = null; // constructed/reused as needed
	    //e==null是消费者,否则为生产者。
            int mode = (e == null) ? REQUEST : DATA;

            for (;;) {
                SNode h = head;
		//如果头为空或者有相同的模式说明没有其他线程
                if (h == null || h.mode == mode) {  
		  //说明不能等待
                    if (timed && nanos <= 0) {    
		     //再次判断有没有head
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);     // pop cancelled node
                        else
                            return null;
                      //如果可以等待,CAS设置当前node为head。
                    } else if (casHead(h, s = snode(s, e, h, mode))) {
		        //一直自旋等到匹配
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s) {               // wait was cancelled
                            clean(s);
                            return null;
                        }
                        if ((h = head) != null && h.next == s)
                            casHead(h, s.next);     // help s's fulfiller
                        return (mode == REQUEST) ? m.item : s.item;
                    }
                } else if (!isFulfilling(h.mode)) { // try to fulfill
                    if (h.isCancelled())            // already cancelled
                        casHead(h, h.next);         // pop and retry
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                        for (;;) { // loop until matched or waiters disappear
                            SNode m = s.next;       // m is s's match
                            if (m == null) {        // all waiters are gone
                                casHead(s, null);   // pop fulfill node
                                s = null;           // use new node next time
                                break;              // restart main loop
                            }
                            SNode mn = m.next;
			    //尝试匹配
                            if (m.tryMatch(s)) {
                                casHead(s, mn);     // pop both s and m
                                return (mode == REQUEST) ? m.item : s.item;
                            } else                  // lost match
                                s.casNext(m, mn);   // help unlink
                        }
                    }
                } else {                            // help a fulfiller
                    SNode m = h.next;               // m is h's match
                    if (m == null)                  // waiter is gone
                        casHead(h, null);           // pop fulfilling node
                    else {
                        SNode mn = m.next;
                        if (m.tryMatch(h))          // help match
                            casHead(h, mn);         // pop both h and m
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    }
                }
            }
        }


boolean casHead(SNode h, SNode nh) {
            return h == head &&
                UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
        }


 SNode awaitFulfill(SNode s, boolean timed, long nanos) {
            //阻塞版本timed为false所以这里是0
            long lastTime = timed ? System.nanoTime() : 0;
            Thread w = Thread.currentThread();
            SNode h = head;
	    //是否需要旋转如果需要则自旋需要则根据timed判断自旋多少次。
	    //这里有一个疑问为什么要自旋而不直接挂起。我觉得是因为效率问题。
	    //自旋等待可以减少线程调度。
            int spins = (shouldSpin(s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted())
                    s.tryCancel();
                SNode m = s.match;
                if (m != null)
                    return m;
                if (timed) {
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                    if (nanos <= 0) {
                        s.tryCancel();
                        continue;
                    }
                }
                if (spins > 0)
                    spins = shouldSpin(s) ? (spins-1) : 0;
                else if (s.waiter == null)
                    s.waiter = w; // establish waiter so can park next iter
               //自旋完了就挂起
		else if (!timed)
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }

boolean shouldSpin(SNode s) {
            SNode h = head;
	    //如果是头节点或者头节点为null
            return (h == s || h == null || isFulfilling(h.mode));
        }

 boolean tryMatch(SNode s) {
                if (match == null &&
                    UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
                    Thread w = waiter;
                    if (w != null) {    // waiters need at most one unpark
                        waiter = null;
                        LockSupport.unpark(w);
                    }
                    return true;
                }
                return match == s;
            }


/**
总结:这个类相当于一个生产者对应一个消费者。一个人生产了就必须要有一个来消费。
相当于一对一。一个线程进来然后设置为头结点,然后自旋等待。另外一个线程进来,发现有头结点了,
说明有线程了。然后将自己设置为头结点,并将next指向刚才的头结点。然后尝试匹配。匹配成功。
判断当前线程的模式,是生产者则取头结点的值,消费者则取第二个节点的值。
*/

发表评论
用户名: 匿名