ReentrantLock,Condition_JAVA_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > JAVA > ReentrantLock,Condition

ReentrantLock,Condition

 2018/1/26 12:50:00  knight_black_bob  程序员俱乐部  我要评论(0)
  • 摘要:publicclassReentrantLockAndConditionTest{publicstaticvoidmain(String[]args){ReentrantLockQueuequeue=newReentrantLockQueue();for(inti=0;i<100;i++){queue.put("a");Stringstring=queue.getString();System.out.println(string);}
  • 标签:Ant

?

?

?

class="java" name="code">public class ReentrantLockAndConditionTest {

    public static void main(String[] args) {
    	ReentrantLockQueue queue =new  ReentrantLockQueue();
    	for (int i = 0; i < 100; i++) {
			queue.put("a");
			String string = queue.getString();
			System.out.println(string); 
		}
    	
	}


    public abstract class MessageQueue<T>{
    	private Queue<T> queue;
    	private List<FailedMessageWrap> resendList;
    	protected int resendSleepInterval = 1000 * 60 ;
    	protected int maxFailedCount = 10;
    	private Lock sendLock = new ReentrantLock();
    	private Condition sendCondition = sendLock.newCondition();
    	private Lock resendLock = new ReentrantLock();
    	private volatile boolean stopRequired ;
    	 
    	public MessageQueue(){ 
    		queue = new LinkedList<T>(); 
    		resendList = new LinkedList<FailedMessageWrap>(); 
    		stopRequired = false;
    		 
    		ExecutorService sendService = Executors.newFixedThreadPool(1);
    		for (int i = 0; i < 1; i++) {
    			sendService.execute(new SendTask());
    		} 
    		Executors.newSingleThreadExecutor().execute(new ResendTask());
    	}
    	 
    	public void send(T message){
    		if(message == null){
    			return;
    		}
    		
    		try {
    			sendLock.lock(); 
    			queue.add(message); 
    			sendCondition.signalAll();
    		}finally{
    			sendLock.unlock();
    		}
    		
    	}
    	 
    	public void stop(){
    		stopRequired = true;
    	}
    	 
    	protected abstract boolean doSend(T message);
    	 
    	class FailedMessageWrap{
    		private T message;
    		private int failedCount;
    		
    		FailedMessageWrap(T message){
    			this.message = message;
    			failedCount = 0;
    		}

    		public int getFailedCount() {
    			return failedCount;
    		}
     
    		public void increaseFailedCount() {
    			this.failedCount += 1;
    		}

    		public T getMessage() {
    			return message;
    		}
    		
    	}
     
    	class SendTask implements Runnable{
    		@Override
    		public void run() {
    			while(!stopRequired){
    				T message;
    				
    				try {
    					sendLock.lock(); 
    					message = queue.poll();
    					if(message == null){
    						try {
    							sendCondition.await();
    						} catch (Exception e) {
    							e.printStackTrace();
    						}
    						continue;
    					}
    				}finally{
    					sendLock.unlock();
    				}
    					
    				if(!doSend(message)){
    					try {
    						resendLock.lock(); 
    						resendList.add(new FailedMessageWrap(message));
    					} finally{
    						resendLock.unlock();
    					}
    				}
    			
    			}
    			
    		}
    	}
    	 
    	class ResendTask implements Runnable{
    		@Override
    		public void run() { 
    			while(!stopRequired){
    				
    				try {
    					Thread.sleep(resendSleepInterval);
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				} 
    				List<FailedMessageWrap> removeList = new ArrayList<FailedMessageWrap>();
    				
    				try {
    					resendLock.lock();
    					 
    					for(FailedMessageWrap messageWrap : resendList){
    						if(messageWrap.getFailedCount() > maxFailedCount){
    							removeList.add(messageWrap);
    							continue;
    						}
    						
    						T message =  messageWrap.getMessage(); 
    						if(!doSend(message)){
    							messageWrap.increaseFailedCount();
    						}else{
    							removeList.add(messageWrap);
    						}
    					}
    				 
    					for (FailedMessageWrap messageWrap : removeList) {
    						resendList.remove(messageWrap);
    					}
    				} finally{
    					resendLock.unlock();
    				}
    			
    			}
    		}
    	}
    	

    }
    
    
	
    public static class ReentrantLockQueue{
    	private ReentrantLock  lock = new ReentrantLock();
    	private Queue<String> queue = new LinkedList<String>(); 
    	public  void put(String s){
    		try{
	    		lock.lock();
	    		queue.add(s);
    		}catch(Exception e){
    			
    		}finally{
    			lock.unlock();
    		}
    	}
    	
    	public String getString(){
    		try{
    			lock.lock();
        		String poll = queue.poll();
        		return poll;
    		}catch(Exception e){
    			
    		}finally{
    			lock.unlock();
    		}
			return null;
    	}
    	
    }
	
    
    
	
	

}

?

?

?

?

?

?

?

?

?

?

?

?

?

捐助开发者?

在兴趣的驱动下,写一个免费的东西,有欣喜,也还有汗水,希望你喜欢我的作品,同时也能支持一下。 当然,有钱捧个钱场(支持支付宝和微信 以及扣扣群),没钱捧个人场,谢谢各位。

?

个人主页:http://knight-black-bob.iteye.com/



?
?
?谢谢您的赞助,我会做的更好!

上一篇: Collections,Synchronized 下一篇: join
发表评论
用户名: 匿名