回忆背调业务核心组件的开发_JAVA_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > JAVA > 回忆背调业务核心组件的开发

回忆背调业务核心组件的开发

 2020/1/11 3:32:23  herman_liu76  程序员俱乐部  我要评论(0)
  • 摘要:这是我第一次做java技术比较全面和复杂的系统,当时刚从事互联网开发,它与传统单机增删改查的Web应用差别很大,那只是业务复杂。当时除了学习很多工具技巧外包括maven/git使用都才入门,线程处理的相关技术整合使用还不多,自己一个人一下做这个还是蛮有压力的,新的工作需要打响第一P。这时我只看过一部分dubbo与druid源码,有了些想法,最后一步步也顺利完成了。开发一个系统其实就是组织一个工厂,安排各种人员,各司其职又相互协作,处理业务的过程。不过后来又看了些源码,比如rocketmq
  • 标签:开发
    这是我第一次做java技术比较全面和复杂的系统,当时刚从事互联网开发,它与传统单机增删改查的Web应用差别很大,那只是业务复杂。当时除了学习很多工具技巧外包括maven/git使用都才入门,线程处理的相关技术整合使用还不多,自己一个人一下做这个还是蛮有压力的,新的工作需要打响第一P。这时我只看过一部分dubbo与druid源码,有了些想法,最后一步步也顺利完成了。
    开发一个系统其实就是组织一个工厂,安排各种人员,各司其职又相互协作,处理业务的过程。不过后来又看了些源码,比如rocketmq,发现有些相似的思想,也有更多设计显的稚嫩。现在看的多了,更可以参考优秀的设计,无法结构/代码/细节功能都可以向高水平看齐。

    两年前做的系统,有时介绍起来都想不起来细节了,因为很多地方都反复想了多个方案,比如是否按类型分线程池处理等。所以打算总结一下,随便分析一下可以改进的地方。

# 1. 系统的功能与特点

?    这个系统有点类似于消息中间件的开发,调查服务端(以下简称  SS)接收的消息是一个个背调请求,系统要持久化,每个请求要再要分解出一个个单项调查子任务,推送到各个调查客户端(以下简称  SC)进行消费,及时的调查返回结果,人工调查 SC只返回收到任务。

?    比如用户选择调查一个人的身份/学历/处罚/不良信息。SS接收这个任务,拆解后分别发送给处理身份,学历,处罚,不良信息的四类SC,每类可能不只一个应用,也许有两个调用不良信息的SC都可以完成单独的任务。这个系统有以下几个特点:

  • - SS与SC之间通过公司自己开发的基于netty的通讯组件进行,我参与了这个组件开发,在分析通讯协议设计的文章中介绍了我们的通讯组件。rocketmq中也是自己的通讯组件,自己的专用协议。
  • - 这个过程中还要维护调查客户端的连接,状态,同类的客户端要进行选择性的推送。这个类似于nameServer的功能,有心跳功能,要监听通讯通道的状态,我们集成在SS中,更没有实现高可用。
  • - SS要嵌入到一个web应用中,接收企业服务应用过来的http背调请求,未来内部使用还考虑也支持自己的remote组件,比如相应的企业服务中可以引用背调请求的生产者。当然对外部推广客户还是要http的方式的跨防火墙请求。
  • - 每一个SC也会嵌入一个web应用中,可以查看接收的单项调查子任务的情况。SC要容纳各种子任务的实现,通过配置化只加载其中的一种。SC只处理push过来的子任务,没实现主动pull。
  • - SS收到的消息要支持持久化,提供相应的接口,可以配置具体的方式。
  • - 异步处理为主。比如SC完成的结果后通知SS,SS有监听器异步监听完成情况,进行更新。
  • - 后期由于提出了人工干预同类SC的处理,以及有人工电话调查SC并准备使用dubbo技术,所以又改造了一下,增加了中间处理层(以下简称SM),同类的SC注册到SM中,而SM又注册到SS中。SM也有局部的nameServer的功能,也有业务功能。



# 2. 项目的组成

? 项目由一个总的pom工程,和三个jar工程组成,分别是survey-client,survey-middle,survey-server组成。



? 下面以survey-server为主,其它两个模块简单介绍。

# 3. 服务端的设计

## 3.1 总体设计

? 服务端有两大功能,分别是处理背调请求,维护客户端的状态。有点类似与rocketmq的broker与nameSvr的两大功能。

? 这两个功能都有核心类,因为因为附属功能都在其中,所以我当时喜欢叫Container,也许现在会学rocketmq叫controller了。两个核心类不需要远程通讯,但它们相互引用。

? TaskContainer管理任务,AppContainer管理注册的客户端,它们由一个inti类进行统一启动两个核心类,以及注入持久化实现类。同时init类还会启动一个守护进程,输出一些核心类中的重要日志信息给控制台。




## 3.2 通讯中间件使用

基于netty开发。

服务端事先配置可以连接的客户端的信息,包括appKey,code,appSecret等信息。如果客户端连接成功了,会有一个sessionId,由服务端保存。其内部有验证,重连等机制。

通讯层本身有连接心跳功能,但上层还需要一个业务心跳,传递业务执行情况与服务器状态变化 ,上层选择客户端时就可以基于多种策略了。

### 3.2.1 服务端发消息与处理

**服务端的启动:**

class="java">```java
//服务端启动,包括端口以及可允许连接的客户端信息。这些信息用于底层校验客户端
ServerInit.init(serverPort, clientAppList);
```


**服务端push消息给客户端:**

一般通讯层服务端会返回连接好的channel给使用者,使用者可以包装成自己的channel进行使用。这里并不提供channel,其内部持有。外部使用如下方式:

```java
//ServerPushHandler是通讯层的类,有静态方法发信息给客户端。
//根据客户端的sessionId发送一个【任务(名称)】以及数据,并设置好返回结果的回调对象来异步处理返回值。
//当然也可以当成dispatcher来用。
boolean bln = ServerPushHandler.pushBySessionId(subTaskInfo.getSurveyClient().getSessionId(), "assignTaskToClient", body, new ServerPushTaskCallback());
```


客户端会注册对应【任务(名称)】的处理类,来处理接收到的数据并返回值。

```java
//客户端配置对应的处理类。
PushReceiverCenter.registReceiver("assignTaskToClient", new PushReceiverFeedbackHandler(apiInvorkerInterface));
```


### 3.2.2 客户端发消息给与服务端处理

**客户端启动:**

```java
//设置状态监听类。底层netty监控到状态变化会通知这个类对象来处理
client = ClientCenter.getAClient(this.serverIP, serverPort, this.appkey, this.appSecret,clientStatusListener);
```


**发送与接收处理:**

```java
//客户端发送业务心跳数据的方式,消息本身包含【消息名称】
//同时还设置了服务端返回值的处理类,确认服务端已经收到了。
MiddleMsg msg = new MiddleMsg("getClientSystemInfo", body);
ResultObject res = client.sendMsg(msg, new ReportClientInfoFeedbackHandler());


//服务端注册消息的处理类,根据【消息名称】,选择对应的处理类
register.addMsgServiceHandler("getClientSystemInfo",RecvClientInfoHandler);
```




### 3.2.3 设计改进

? 在rocketmq中,通讯层与核心类不直接引用,中间有一个outApi的类隔离着,对核心类与其它类提供所有的通讯功能服务。

? 我的设计中,通讯层属于AppContainer,直接使用了,考虑整体通讯的统一性,应该被一个独立的类被引用使用,独立的类负责通讯,注册消息处理类等功能。



## 3.2 客户端管理核心类的功能

### 3.2.1 属性

? 客户端管理类,主要有配置的客户端与在线的客户端。由于由服务端与客户端两层改为三层,并且原始的配置值被意义被要求改变,比如appKey由一个客户端变成了一类客户端。还涉及到客户端配置管理服务不能及时变化造成的一定冗余。有些传参被要求变动,还涉及到通讯层的参数变动,所以属性中有些意义发生变化。

? ClientApp是通讯层要的配置客户端类,SurveyApp是本业务中的客户端。

```java
public class AppContainer {
	
	private static final Logger logger =Logger.getLogger(AppContainer.class);
	private String                           serverIp;
	private String                           serverPort;
	/**用户单例锁*/
	private static Boolean lockSigleton = true; 
	private static AppContainer appContainer;//单例使用
	
	/**配置的app,由于底层没有code,用appKey记录,所以监听底层的Client用AppKey确定。
	//【appKey--(SurveyApp--SurveyClientList)】*/
	public Map<String,SurveyApp> appHolder=new HashMap<String,SurveyApp>();
	
	/**配置的app,任务用Code记录(在任务处理中,因为子任务中使用AppCode指明使用的App类型,这是从产品那边配置的,不太可能用AppKey这个不是很清晰的码)
	 * 【appCode--(SurveyApp--SurveyClientList)】
	 */
	public Map<String,SurveyApp> appCodeHolder=new HashMap<String,SurveyApp>();
	
	/**配置的Client(ClientCode--SurveyClient)*/
	public Map<String,SurveyClient> clientHolder=new HashMap<String,SurveyClient>();
	
	/**可维护的Client在线列表。
	 * <key为appKey(代表一类客户端),内部一组客户端,有并发控制<sessionId,client>>
	 【appKey--(sessionId--onlineClient)】
	 */
	public volatile Map<String, ConcurrentHashMap<String, SurveyClient>> onlineClientMap=new HashMap<String, ConcurrentHashMap<String,SurveyClient>>();
	
	/**根据配置的SurveyClient,生成ClientApp列表,仅用于启动中间件前给底层中间件传递*/
	private List<ClientApp> clientAppList=new ArrayList<ClientApp>();
	
	private TaskContainer taskContainer;//引用任务处理容器
	
	public static SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
	
	public boolean isMiddlewearStarted=false;
	
```


### 3.2.2 主要方法

- 启动通讯服务端,注册三个业务处理类:客户端状态监听,处理客户端业务心跳,处理客户端完成任务情况上报处理。

```java
    public void initMiddlerWareStart(){
		logger.info("【背调中心】注册客户心跳消息与任务完成消息的回调处理");
		try {
			// 获取应用client基本信息
			MsgServiceHandlerRegister register = MsgServiceHandlerRegister.getRegister();
			//注册事件处理类
			MsgServiceHandlerRegister.setEventHandlerClass(ClientStatusListener.class);
	register.addMsgServiceHandler("getClientSystemInfo",RecvClientInfoHandler.class);
			register.addMsgServiceHandler("subTaskFinishInfo", RecvClientTaskHandler.class);
			new Thread(new Runnable(){
			    @Override
			    public void run() {
			        // TODO Auto-generated method stub
			    	try {
			    		isMiddlewearStarted=true;
			    		ServerInit.init(StringUtils.isEmpty(serverPort)?9166:Integer.parseInt(serverPort), clientAppList);
					} catch (Exception e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
						isMiddlewearStarted=false;
					}
			    }
			}).start();
			logger.debug("【背调中心】启动消息服务成功!端口:" + serverPort);
		} catch (Exception e) {
			isMiddlewearStarted=false;
			e.printStackTrace();
			logger.error("【背调中心】启动消息服务失败!异常:" + e.toString());
		}
    }
```


- ClientStatusListener主要监听客户端登录成功事件,以及客户端断开连接事件。登录成功将产生一个surveyClient,并以sessionId为key存在map中。surveyClient还有一部分业务信息,比如权重等,来自业务心跳给补充。

```java
//public class ClientStatusListener extends AbstractEventHandler中的方法。
//将产生一个surveyClient客户端。
@Override
	public void loginSuccess(EventInfo res) {
		// TODO Auto-generated method stub
		super.loginSuccess(res);
		ContainerInit containerInit = ContainerInit.getInstance();
		if (containerInit != null) {
			ClientApp clientApp = res.getAppinfo();
			Map<String, SurveyClient> surveyClientMap = AppContainer.instance.onlineClientMap.get(clientApp.getAppKey());
			if (surveyClientMap == null)
				surveyClientMap = new ConcurrentHashMap<String, SurveyClient>();
			SurveyApp surveyApp = AppContainer.appHolder.get(clientApp.getAppKey());
			// 1.新建一个调查客户端,以sessionId为key,记录在app表下面。
			// surveyClient中一部分来源上监听,另一部分信息要来源于业务心跳。
			SurveyClient surveyClient = new SurveyClient(clientApp.getIp(), "", clientApp.getSessionId(), clientApp.getChannelId(), surveyApp);
			surveyClientMap.put(clientApp.getSessionId(), surveyClient);
			// 2.新建一个准备放置客户端下的子任务。
			TaskContainer.clientSubTaskInfoMap.put(clientApp.getSessionId(), new ArrayList<SubTaskInfo>());// 新建此客户端下的子任务容器
			logger.info("【中间件状态监听】此APP当前客户端总数:" + surveyClientMap.size());
		}
	}


//public class RecvClientInfoHandler implements MsgServiceHandler中的方法。
//将对surveyClient客户端信息进行补充。包括客户端的类型code,权重,更新时间等,未来增加其它性能数据。
	@Override
	public MiddleMsg handleMsgEvent(MsgEvent dm, MiddleMsg msg) {
		String body = msg.getBody() + "";
		String sessionId = msg.getHeader().getSessionID();
		String returnCode = "";
		SurveyResponse td = new SurveyResponse();
		try {
			ClientRealData clientRealData = JsonUtils.toBean(body, ClientRealData.class);
			//将客户端的实时信息设置到在线客户端的属性中
			Map<String, SurveyClient> onlineClientMap=(Map<String, SurveyClient>) AppContainer.onlineClientMap.get(clientRealData.getAppkey());
			SurveyClient surveyClient=onlineClientMap.get(sessionId);
//			logger.debug("【处理客户心跳】客户端【存在吗】?:"+surveyClient!=null);
			if(surveyClient!=null){
				logger.debug("【处理客户心跳】clientRealData:"+clientRealData.getClientCode()+"@"+clientRealData.getAppCode());
				surveyClient.setClientCode(clientRealData.getClientCode());
				surveyClient.setUpdateTime(new Date());
				surveyClient.setWeight(clientRealData.getWeight() == null ? "60" : clientRealData.getWeight());
			}
			returnCode = "success";
		} catch (Exception e) {
			e.printStackTrace();
			returnCode = "failure";
			logger.error("【处理客户心跳】消息失败!异常:" + e.toString());
		}
		td.setCode(returnCode);
		msg.setBody(td);
		return msg;
	}
```


  根据业务的类型与企业客户的级别,选择一个可用的客户端,这部分改变比较大,包括构建treemap得到权重与概率与级别要求,同类型还要按已经分配的任务决定给最少任务的。如果正好又不能用了,再递归找一个可用的。

```java
	public static SurveyClient getClientByUserRankAndClinetLever(String appCode,int userRank) {
		// 从appCode得到appKey,从而找到可用的在线客户端.让前端根据appKey来分子任务不可靠
		SurveyApp surveyApp = appCodeHolder.get(appCode);
		logger.debug("【策略2】未配置客户端,返回!appKey:"+appCode);
		return getClientByUserRankAndClinetLeverByAppkey(surveyApp.getAppKey(),userRank);
	}
	private static SurveyClient getClientByUserRankAndClinetLeverByAppkey(String appKey,int userRank) {
		// 从appCode得到appKey,从而找到可用的在线客户端.让前端根据appKey来分子任务不可靠
		//SurveyApp surveyApp = appCodeHolder.get(appCode);
		Map<String, SurveyClient> onlineSurveyClientMap = onlineClientMap.get(appKey);
		if(onlineSurveyClientMap==null){
			logger.info("【策略2】未配置客户端,返回!appKey:"+appKey);
			return null;
		}
		
		int onlineClientNum=onlineSurveyClientMap.size();
		logger.debug("【策略2】surveyApp(code|在线客户端数):"+appKey+"|" + onlineClientNum);
		// 用于权重随机的参数对象
		//logger.debug("【策略】可选用客户端的数:" + onlineClientNum);
		Map<String, Integer> canUseClient = new HashMap<String, Integer>();//用于treemap排序。
		List<SurveyClient> sameLeverClientList = new LinkedList<SurveyClient>();//用于同级别客户端存放
		if (onlineClientNum == 0) {
			logger.info("【策略2】备选客户端为0,返回!");
			return null;
		}else {
			Iterator iter = onlineSurveyClientMap.entrySet().iterator();
			while (iter.hasNext()) {
				Map.Entry<String, SurveyClient> entry = (Map.Entry<String, SurveyClient>) iter.next();
				Integer eachWeight = new Integer(entry.getValue().getWeight() == null ? Constants.CLIENT_BASE_LEVER+"" : entry.getValue().getWeight());
				canUseClient.put(entry.getKey(), eachWeight);
				logger.debug("【策略2】循环可选用客户端详细信息(sessionId|weight):" + entry.getKey() + "|" + eachWeight);
				if(onlineClientNum==1) return entry.getValue();
			}
		}
		
		//找出可用的客户端,产生一个map,再构建treemap,用策略得到一个值。
		logger.info("【策略2】准备筛选的客户端个数:"+canUseClient.size());//canUseClient不含有重复的,所以得到选择的客户端值,还要再处理多个同值的情况。
		RankAndLeverTreeMapSelect rankAndLeverTreeMapSelect = new RankAndLeverTreeMapSelect(canUseClient,0);
		Integer chooseValue = rankAndLeverTreeMapSelect.chooseValue();
		logger.debug("【策略2】选择出的客户端lever:"+chooseValue);
		if(chooseValue==null) return null;
		for(SurveyClient surveyClient:onlineSurveyClientMap.values()){
			if(surveyClient.getWeight() == null) surveyClient.setWeight(Constants.CLIENT_BASE_LEVER+"");
			logger.debug("【策略2】当前比对的surveyClient.getWeight(null=60):"+(surveyClient.getWeight()));
			//if(StringUtils.isBlank(surveyClient.getWeight())) continue;//没有就是60分
			if(chooseValue.intValue()==new Integer(surveyClient.getWeight()).intValue()) sameLeverClientList.add(surveyClient);
		}
		if(sameLeverClientList.size()==1) return sameLeverClientList.get(0);
		//如果同一个值的客户端有多个,再排序,取任务最少的一个。
		Collections.sort(sameLeverClientList,new Comparator<SurveyClient>() {
			@Override
			public int compare(SurveyClient surveyClient1, SurveyClient surveyClient2) {
				//以下如果改变顺序则调换一下参数位置
				return surveyClient1.getTaskCount()-(surveyClient2.getTaskCount());
			}
			
		});
		SurveyClient surveyClient=sameLeverClientList.get(0);
		sameLeverClientList.clear();
		
		//如果找到的客户端不能用,就递归找一个,同时移除这个客户端
		SecretManagement m = ServerGlobal.sessionWithAppKeys.get(surveyClient.getSessionId());
		if(m!=null  && m.getChannel()!=null && m.getChannel().isWritable()){
			return surveyClient;
		}
		else{
			Map<String, SurveyClient> surveyClientMap=AppContainer.onlineClientMap.get(appKey);
			if(surveyClientMap.containsKey(surveyClient.getSessionId())){
				logger.debug("【策略。推送失败移除客户端再递归获取】sessionId:"+surveyClient.getSessionId());
				surveyClientMap.remove(surveyClient.getSessionId());// 根据通讯客户端,移除里面的调查客户端对象
			}
			return getClientByUserRankAndClinetLeverByAppkey(appKey,userRank);
		}
	}
```


```java
    /**rankAndLeverTreeMapSelect.chooseValue();
     * 策略:
     * 1.如果有用户级别值,比如90分,那100、80、70、60、40、20分的客户端中,选择最近的80分的客户端。
     * 2.如果用户没有级别,那80/70/60/40/20中,选择及格的最低的60,如果都不及格,选择最高的40。
     * 3.选择了一个分值的客户端,如果这里面有多个,再随机选择一个(未来根据完成情况或者性能)
     * <P></P>
     * @return
     */
	@Deprecated
    public K choose() {
...
    }
    
    /**
     * 考虑到重复情况,不能返回key了,只能返回特定value后再循环处理。
     * @return
     */
    public Integer chooseValue() {
    	if(treeMap.size()==0) return null;
    	if(treeMap.size()==1) return treeMap.firstEntry().getKey();
        if(_userRank>0d){//如果有用户级别
        	logger.debug("有用户级别值,找接近最大的。_userRank:"+_userRank);
        	SortedMap<Integer, K> headMap = this.treeMap.headMap(_userRank, true);
        	logger.debug("_userRank & headMap.size:"+_userRank+"|"+headMap.size());
        	if(headMap.size()==0) return treeMap.firstEntry().getKey();//如果找不到,给最低的。
        	return headMap.lastKey();
        }
        else{//如果无用户级别
        	logger.debug("无用户级别值,找及格里最小的。_baseLever:"+_baseLever);
        	SortedMap<Integer, K> tailMap = this.treeMap.tailMap(_baseLever, true);
        	logger.debug("_baseLever & headMap.size:"+_userRank+"|"+tailMap.size());
        	if(tailMap.size()==0) return treeMap.lastEntry().getKey();//如果都生活及格,找一个最大的。
        	return tailMap.firstKey();
        }
    }
```


## 3.3 业务处理核心类的功能

### 3.3.1 TaskContainer主要的属性

包括了背调任务存放,失败处理队列,子任务分发线程池,外部持久化接口实现,子任务完成监听类。超时时间,尝试次数配置。

```java
/**
 * 任务容器-管理背调任务与相关子任务
 * @author liujun
 * @date 2018年1月18日 上午10:33:01
 */
public class TaskContainer {
	private static final Logger logger = Logger.getLogger(TaskContainer.class);

	/** 实时总任务信息(taskid---TaskInfo(SubTaskInfoMap)) */
	public volatile static Map<String, TaskInfo> taskInfoMap = new ConcurrentHashMap<String, TaskInfo>();

	/** 任务在内存中允许的最大存放数 */
	private static Integer maxTaskMapSize=Integer.MAX_VALUE;
	/**
	 * 实时子任务信息(subtaskid---SubTaskInfo)
	 * 目的:子任务完成后,根据subTaskId从这里快速拿到对应的子任务。从上面的主任务不方便找。
	 * 不需要了,子任务中带有主任务ID,所以还是先拿主任务,再取子任务处理。
	 */
//	public volatile static Map<String, SubTaskInfo> subTaskInfoMap = new ConcurrentHashMap<String, SubTaskInfo>();
	
	/**使用阻塞队列,放置所有要处理的失败子任务.失败的任务先会再回线程池,之后超时会触发返回*/
	BlockingQueue<SubTaskInfo> subTaskInfoQueue = new LinkedBlockingQueue<SubTaskInfo>();
	
	/**任务超时是否自动处理,此超时不是推送客端尝试多次,而是等待子任务完成*/
	public boolean autoDealTimeoutSubtask = false;
	/**
	 * 子任务推送的最多尝试次数
	 */
	public static int maxRePushSubTaskTimes=5;
	/**
	 * 等待子任务完成的超时的时间
	 */
//	public static long maxTimeoutSubTaskDealTime=24*60*60000L;
	
	/**线上子任务超时时间*/
	public static long maxTimeoutOnlineSubTaskTime=60*1000L;
	
	/**主任务超时时间*/
	public static long maxTimeoutTaskTime=2*24*60*60*1000L;

	/**
	 * 一个配置的client下的实时子任务信息(sessionId-List<SubTaskInfo>)
	 * 用sessionId方便应对底层的上下线变化。中间件的事件只能得到sessionId,没有ClientCode。
	 */
	public volatile static Map<String, List<SubTaskInfo>> clientSubTaskInfoMap = new ConcurrentHashMap<String, List<SubTaskInfo>>();

	/** app,client配置容器类 */
	private AppContainer appContainer;

	/** 子任务分派用线程池 */
	private static ExecutorService executor = Executors.newCachedThreadPool();

	/** 监听器-子任务完成 */
	public SubTaskListener subTaskListener = new SubTaskListener();

	/** 监听器-客户端上下线事件 */
	public ClientStatusListener clientStatusListener = new ClientStatusListener();

	/** 任务池锁 */
	private Object TaskLock = new Object();

	/** 任务处理线程 */
	public SubTaskRedo subTaskRedo = new SubTaskRedo();
	/**
	 * 外部持久化任务接口
	 */
	public TaskPersistenceInterface taskPersistenceInterface;
```


### 3.3.2 主要方法

#### 接收背调任务及选择客户端后发出去

在initContainer中已经用TaskFactory,把请求参数处理成TaskInfo对象了,处理过程中已经持久化了。
```java
		String queryJsonStr = jsonParam.toString();
		logger.info("【发起任务】背调任务请求参数:" + queryJsonStr);
		if (containerInit != null) {
			logger.info("---------【用户发起背调了...】--------");
			TaskInfo taskInfo = TaskFactory.creatTaskInfo(jsonParam);
			taskContainer.createAndPutTaskPool(taskInfo);
		} else {
			logger.warn("【发起任务】背调中心没有启动");
		}
```


taskContainer处理背调任务:

```java
	/**
	 * <P>
	 * 根据提交请求,生成主任务子任务,放入登记的map,并放入子任务队列
	 * </P>
	 * @param paras
	 * @param appCode
	 * @throws SurveyException 
	 */
	public boolean createAndPutTaskPool (TaskInfo taskInfo) throws SurveyException {
		boolean createResult=false;
//		TaskInfo taskInfo = TaskFactory.creatTaskInfo(paras);
		logger.debug("【主任务Map添加】当前总数:" + taskInfoMap.size());
		if (taskInfoMap.size() >= maxTaskMapSize) {
			logger.error("【任务登记MAP】已满!");
			throw new SurveyException("任务登记已经满");
//			return false;
		} else {
			taskInfoMap.put(taskInfo.getTaskId(), taskInfo);
			Iterator<Map.Entry<String,SubTaskInfo>> iter =taskInfo.getSubTaskMap().entrySet().iterator();
			while (iter.hasNext()) {
				Map.Entry<String, SubTaskInfo> entry = (Map.Entry<String, SubTaskInfo>) iter.next();
				try {
					startExecutorCompletionService(entry.getValue());
				} catch (InterruptedException | ExecutionException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
					throw new SurveyException("任务提交线程池失败",e);
				}
			}
			logger.debug("【主任务池添加】当前总数+1后:" + taskInfoMap.size());
			//改阻塞队列不需要另加锁
//			synchronized (TaskLock) {
//				TaskLock.notifyAll();
//			}
			createResult = true;
		}
		logger.debug("【任务登记MAP】情况如下:");
		Iterator<Map.Entry<String,TaskInfo>> iter =taskInfoMap.entrySet().iterator();
		while (iter.hasNext()) {
			Map.Entry<String, TaskInfo> entry = (Map.Entry<String, TaskInfo>) iter.next();
			logger.debug("【任务登记MAP】主任务id|(总|推|执|完):"+entry.getValue().getTaskId()+"|"+entry.getValue().getTotalTask()+"|"+entry.getValue().getPushTask()+"|"+entry.getValue().getExecuteTask()+"|"+entry.getValue().getCompleteTask() );
		}
		return createResult;
	}
```


上面的拆分后的子任务处理:startExecutorCompletionService(entry.getValue());

```java
	/**
	 * <P>将子任务设置一个可用的客户端后,通过线程池执行发送</P>
	 * 
	 * @param subTaskInfo
	 * @param taskContainer
	 * @throws InterruptedException
	 * @throws ExecutionException
	 */
	public void startExecutorCompletionService(SubTaskInfo subTaskInfo) throws InterruptedException, ExecutionException {

		if (StringUtils.isBlank(subTaskInfo.getAppCode()) && StringUtils.isBlank(subTaskInfo.getAppKey())) {
			logger.warn("【分派预处理】子任务未设置AppCode或者AppKey,子任务Id:" + subTaskInfo.getSubTaskId());
			return;
		}
		if (!StringUtils.isBlank(subTaskInfo.getStatus())) {
			//已经有状态的,都是从库里加载的,不处理了,只放池子里。等线下的回调,或者人工处理,或者超时了。
			logger.warn("【分派预处理】子任务已经有状态,不再分配推 。子任务Id:" + subTaskInfo.getSubTaskId()+",状态:"+subTaskInfo.getStatus());
//			if(Constants.TASK_TYPE_ONLINE.equals(subTaskInfo.getSubTaskType()))
//				subTaskInfoQueue.put(subTaskInfo);//如果成功的,并且是线上的,进行超时处理。
			return;
		}
		// 从在线客户端表中取一个
//		SurveyClient surveyClient = AppContainer.getWeightRandomClient(subTaskInfo.getAppCode());
		SurveyClient surveyClient =null;
		if(StringUtils.isBlank(subTaskInfo.getAppKey())){
			logger.debug("【老版本-按AppCode分配】-----------旧版getAppCode--"+subTaskInfo.getAppCode());
			surveyClient=AppContainer.getClientByUserRankAndClinetLever(subTaskInfo.getAppCode(),0);
		}else
		{
			logger.debug("【新版本-按Appkey分配】-----------新版getAppKey--"+subTaskInfo.getAppKey());
			surveyClient=AppContainer.getClientByUserRankAndClinetLeverByAppkey(subTaskInfo.getAppKey(),0);
		}
		if (surveyClient == null || surveyClient.getClientCode() == null) {// 后面表示没有业务心跳补充属性
//			logger.debug("");
			logger.warn("【分派预处理】暂无可用的客户端");
			logger.debug("");
			subTaskInfo.setExeErrorCount(subTaskInfo.getExeErrorCount()+1);
//			subTaskInfoQueue.put(subTaskInfo);//没客户处理,则回炉
			
			logger.debug("【分派预处理】原来回炉再次尝试,现在直接返回任务推送失败,再推无意义");
			// 自动处理,作为失败子任务返回
			JSONObject finishJason = new JSONObject();
			finishJason.put("code", "failure");
			finishJason.put("msg", "任务推送失败");
			JSONObject finishData = new JSONObject();
			finishData.put("taskId", subTaskInfo.getTaskInfo().getTaskId());
			finishData.put("subTaskId", subTaskInfo.getSubTaskId());
			finishData.put("remark", "暂无可用的客户端");
			finishJason.put("data", finishData);
			logger.debug("【子任务队列消费】:【暂无可用的客户端");
			//通用处理完成或者失败的子任务。推送试过了也就当子任务结束了。
			updateSubTaskStatus(finishJason);
			return;
		} 
		
		//设置执行子任务的在线客户端,之前有设置过其它的,也会被替换成当前的
		String sessionId=surveyClient.getSessionId();
		logger.debug("【分派预处理】策略选出的客户端sessionId为:" +sessionId );
		subTaskInfo.setSurveyClient(surveyClient);
		subTaskInfo.setClientCode(surveyClient.getClientCode());//标识最后一次使用的客户端

		//一个在线客户端下所有的任务中加入此任务。用于客户端下线后,更新下面的子任务
		 List<SubTaskInfo> subTaskInfoList =clientSubTaskInfoMap.get(sessionId);
		 if(subTaskInfoList==null) subTaskInfoList=new ArrayList<SubTaskInfo>();
		 subTaskInfoList.add(subTaskInfo);

		// 交线程池执行分派子任务
		Future<String> future = executor.submit(new AssignTask(subTaskInfo));
		String exeResult = "";
		try {
			exeResult = future.get(15L, TimeUnit.SECONDS);
		} catch (TimeoutException|ExecutionException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
			logger.warn("【分派预处理】出错e:" + e.getMessage());
			exeResult = "failure";
		} finally {
			logger.info("【分派预处理】子任务分派结果为:" + exeResult);
			boolean isSubTaskSendOk = "success".equals(exeResult);
			//推送后的维护
			clientSubTaskInfoMap.get(sessionId).remove(subTaskInfo);//从当前客户端下移除子任务
			subTaskInfo.setSurveyClient(null);//子任务清除客户端
            
			if(!isSubTaskSendOk){
				logger.info("【分派预处理】子任务分派不成功,重新回队列subTaskId:" + subTaskInfo.getSubTaskId());
				subTaskInfo.setExeErrorCount(subTaskInfo.getExeErrorCount()+1);
				subTaskInfoQueue.put(subTaskInfo);//不成功,则回炉
			}else{
				subTaskInfo.setAsignStatus("success");//表示分配成功,等结果了
				if(Constants.TASK_TYPE_ONLINE.equals(subTaskInfo.getSubTaskType()))
				subTaskInfoQueue.put(subTaskInfo);//如果成功的,并且是线上的,进行超时处理。
			}
			
			//对主任务进行状态标识
			TaskInfo taskInfo = subTaskInfo.getTaskInfo();
			synchronized (taskInfo) {
				TaskInfo.modifyTaskInfoPush(taskInfo, isSubTaskSendOk);
			}
		}
	}
```


Future<String> future = executor.submit(new AssignTask(subTaskInfo));中的线程池任务。

```java
	/**
	 * <P>线程池执行发送任务</P>
	 * 
	 * @author liujun
	 * @date 2018年1月15日 下午5:36:48
	 */
	static class AssignTask implements Callable<String> {
		private SubTaskInfo subTaskInfo;

		public AssignTask(SubTaskInfo subTaskInfo) {
			this.subTaskInfo = subTaskInfo;
		}

		@Override
		public String call() throws Exception {
			// Thread.sleep(3000);
			 logger.info("【推送线程任务】任务执行...");
			// if(true) return "success";//测试

//			String body = subTaskInfo.getParaObj().toString();
			//推送的子任务,只包括总任务ID,子任务ID,其它都是一个json中。
			SubTaskData subTaskData=new SubTaskData();
			subTaskData.setTaskId(subTaskInfo.getTaskInfo().getTaskId());
			subTaskData.setSubTaskId(subTaskInfo.getSubTaskId());
			subTaskData.setSubTaskType(subTaskInfo.getSubTaskType());
			subTaskData.setQueryJsonStr(subTaskInfo.getParaObj().toString());
			//这两个用于异步任务时,客户端按里面的IP,PORT发结果消息上来。
			//不管是什么,都加上这个。如果多个中间层,那要按这个回复信息。
			subTaskData.setServerIp(ContainerInit.getInstance().appContainer.getServerIp());
			subTaskData.setServerPort(ContainerInit.getInstance().appContainer.getServerPort());
			
			String body =(JsonUtils.toString(subTaskData));
			
			logger.debug("----------------【推送子任务】--------------------body:"+body);
			// String appCode = subTaskInfo.getAppCode();

			if (subTaskInfo.getSurveyClient() == null) {
				logger.info("【推送线程任务】任务未设置执行客户端:" + subTaskInfo.getDescription());
				return "failure";
			}
			String clientSessionId=subTaskInfo.getSurveyClient().getSessionId();
			logger.info("【推送任务任务】推送目标sissionId:" + clientSessionId);
			boolean bln = ServerPushHandler.pushBySessionId(subTaskInfo.getSurveyClient().getSessionId(), "assignTaskToClient", body, new ServerPushTaskCallback());
			logger.info("【推送任务任务】bln:"+bln);
			// boolean bln =
			// ServerPushHandler.pushByAppKey(subTaskInfo.getSurveyClient().getClientCode(),
			// "assignTaskToClient", body);// 消息推送,推送所有服务器
			
			//不管成功不成功,去除子任务与动态客户端的关联(成功就不要关联了,不成功也应该换其它的客户端了)
//			List thisClientSubTaskList=clientSubTaskInfoMap.get(clientSessionId);
//			if(thisClientSubTaskList==null) thisClientSubTaskList=new ArrayList<SubTaskInfo>();
//			logger.info("【推送任务推送】当前session下的子任务数:" + thisClientSubTaskList.size());
//			if (thisClientSubTaskList.size() > 0) {
//				thisClientSubTaskList.remove(subTaskInfo);
//				subTaskInfo.setSurveyClient(null);
//			}
//			Map<String, List<SubTaskInfo>> 
			
			//注意:【在返回值的furturn中处理移除或者再入队的操作】
			
			if (bln) {
				logger.info("【推送任务推送】子任务成功");
				subTaskInfo.setAsignStatus("success");
				return "success";
			} else {// 推送失败
				logger.error("【推送任务推送】子任务推送失败:" + subTaskInfo.getSubTaskId());
				subTaskInfo.setAsignStatus("failure");

				//按说客户端下线可以事件中移除,但偶尔出现没有移除,所以在这里将无法推送的,
				//将这个不可用的channel的的客户端移除
				//注意:【在返回值的furturn中处理移除或者再入队的操作】
				Map<String, SurveyClient> surveyClientMap=AppContainer.onlineClientMap.get(subTaskInfo.getSurveyClient().getSurveyApp().getAppKey());
				if(surveyClientMap.containsKey(subTaskInfo.getSurveyClient().getSessionId())){
					logger.info("【推送失败移除客户端】sessionId:"+subTaskInfo.getSurveyClient().getSessionId());
					surveyClientMap.remove(subTaskInfo.getSurveyClient().getSessionId());// 根据通讯客户端,移除里面的调查客户端对象
				}
				return "failure";
			}
		}
	}
```


#### 接收客户端任务完成的handler

将完成情况告诉配置进来的监听器,监听器会发起更新子任务的相关操作。

```java
	@Override
	public MiddleMsg handleMsgEvent(MsgEvent dm, MiddleMsg msg) {
		// TODO Auto-generated method stub
		String body = msg.getBody() + "";
		String code = "";
		logger.debug("【得到Client子任务返回结果】API返回子任务结果:" + body);
		SurveyResponse td = new SurveyResponse();
		try {
			JSONObject tdObject = JsonUtils.toJSONObject(body);
			// 子任务失败原因
			subTaskListener.onSubTaskFinished(tdObject);
			code = "success";
		} catch (Exception e) {
			e.printStackTrace();
			code = "failure";
			logger.error("【得到Client子任务返回结果】背调中心消息处理任务完成消息失败!异常:" + e.toString());
		}
		td.setCode(code);
		msg.setBody(td);
		return msg;

	}
```


监听后发起的更新操作:

```java
	/**
	 * <P>
	 * 根据监听的子任务完成结果,更新任务的状态,都完成后有可能从总任务表中移除,并且调用外部接口,进行持久化操作。
	 * </P>
	 * 
	 * @param finishJason
	 */
	public void updateSubTaskStatus(JSONObject finishJason) {
		// 根据子任务完成情况,修改子任务的状态,以及主任务的状态
		logger.info("【修改子任务的完成状态】:"+finishJason.toString());
		
		try {
			String code = finishJason.getString("code");
			String msg = finishJason.getString("msg");
			String subTaskId = finishJason.getJSONObject("data").getString("subTaskId");
			String taskId = finishJason.getJSONObject("data").getString("taskId");
			String remark = (finishJason.getJSONObject("data").containsKey("remark"))? finishJason.getJSONObject("data").getString("remark"):"";
//			String clientCode = finishJason.getString("clientCode");
			TaskInfo taskInfo=taskInfoMap.get(taskId);
			if(taskInfo==null){
				logger.info("【修改子任务的完成状态】内存中已经没有这个主任务了!taskId:"+taskId);
				//如果需要,找不到主任务了,还可以直接入库
				//如果非线上任务,有可能内存中没有了,因为非线上,内存中存在的时间太长了,占用比较大
				//是否都完成,以及上报都在接口中实现
				if(taskPersistenceInterface!=null) taskPersistenceInterface.modifySubTask2DB(finishJason);//子任务入库
				return;
			}
			SubTaskInfo subTaskInfo = taskInfo.getSubTaskMap().get(subTaskId);
			
			
//			if(Constants.TASK_TYPE_ONLINE.equals(subTaskInfo.getSubTaskType())){
//				logger.info("【修改子任务的完成状态】线上子任务不能人工处理!subTaskInfo.getSubTaskType():"+subTaskInfo.getSubTaskType());
//				//如果需要,找不到主任务了,还可以直接入库
//				return;
//			}
			//1.【收到】
			if("received".equals(code)) {
				logger.info("【修改子任务的完成状态,并移除】子任务为异步的,对方已经收到!code:"+code);
				subTaskInfo.setStatus(Constants.TASK_DOING);
				subTaskInfo.setDescription("子应用已经收到子任务");
				synchronized (taskInfo) {
					taskInfo.setReceivedAsyncTask(taskInfo.getReceivedAsyncTask()+1);
				}
				//这里啥也不做。因为是异步的,只是对方已经收到了。
				return;
			}
			if(Constants.TASK_FAILURE.equals(subTaskInfo.getStatus()) ||  Constants.TASK_DONE.equals(subTaskInfo.getStatus())  ){
				//这里啥也不做。可能总任务检测时设置了结果
				return;
			}
			subTaskInfo.setSurveyClient(null);
//			subTaskInfo.setUpdateTime(new Date());//数据持久化时再设置
			//2.【不成功】 默认成功。成功时...(一定是执行且成功的)
			if(!"success".equals(code)) {
				logger.info("【修改子任务的完成状态,并移除】子任务推送或者执行出错了!返回code:"+code);
				subTaskInfo.setStatus(Constants.TASK_FAILURE);
				subTaskInfo.setDescription(msg+remark);//中文失败与原因
				//这里是否加入出错队列再处理?还是先入库,以后从库中加载呢?都可以,目前先入。如果推送失败的,已经推过多次了,如果执行失败的,先不再推送了。
//				return;
			}//3.【成功】
			else{
				logger.info("【修改子任务的完成状态,并移除】子任务执行成功!code:"+code);
				String hasData = (finishJason.getJSONObject("data").containsKey("hasData"))? finishJason.getJSONObject("data").getString("hasData"):"";
				Integer dataCount =0;
				try {
					dataCount = (finishJason.getJSONObject("data").containsKey("dataCount"))? finishJason.getJSONObject("data").getInt("dataCount"):0;
				} catch (Exception e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
				subTaskInfo.setHasData(hasData);
				subTaskInfo.setDataCount(dataCount);
				subTaskInfo.setStatus(Constants.TASK_DONE);
			}

			
			if(taskPersistenceInterface!=null) taskPersistenceInterface.modifySubTask2DB(subTaskInfo);//子任务入库
			synchronized (taskInfo) {//计算完成数
				//执行数据+1(包括推失败的,对方收到的异步的),完成数要看成功才成。
				TaskInfo.modifyTaskInfoFinish(taskInfo, "success".equals(code));
//				logger.info("【修改子任务的完成状态,并移除】总任务【移除前】有:"+taskInfoMap.size());
//				logger.info("【修改主任务的状态,并可能移除】当前主任务的-总|执|完|线:" + taskInfo.getTotalTask() + "|" + taskInfo.getExecuteTask()+ "|" + taskInfo.getCompleteTask()+ "|" + taskInfo.getOnlineTask());
			}
			
			//del--->当执行数与线上数一样时。线上都完成了。就持久化,但不移除。当与总数一样时,持久化并移除。
			//都执行了就移除,并持久化。但如果不全是线上的,置的状态不一样
			if (taskInfo.getExecuteTask().intValue() == taskInfo.getTotalTask().intValue()) {
				//【移除的情况:】当线上数与总数一样的时候,全完成了。就从内存中移除,并且调外部接口类进行持久化。线下子任务没有超时机制,可能一直接没反馈,由主任务总超时处理。
//				if (taskInfo.getOnlineTask().intValue() == taskInfo.getTotalTask().intValue()  || taskInfo.getExecuteTask().intValue() == taskInfo.getTotalTask().intValue() ) {
				//如果没有异步的任务
				if (taskInfo.getReceivedAsyncTask()==0 ) {
					//主任务状态为:
					boolean isAllSubtaskOk=taskInfo.getCompleteTask().intValue()==taskInfo.getExecuteTask();
					taskInfo.setStatus(isAllSubtaskOk?Constants.TASK_DONE:Constants.TASK_FAILURE);
//					if(taskInfo.getCompleteTask().intValue() == taskInfo.getExecuteTask().intValue()) taskInfo.setStatus(Constants.TASK_FAILURE);
//					taskInfoMap.remove(taskInfo.getTaskId());// 从任务记录中移除
//					if(taskPersistenceInterface!=null) taskPersistenceInterface.modifyTask2DB(taskInfo);//主任务入库
					logger.info("【修子任务的完成状态,全部完成并移除】移除任务id:" + taskInfo.getTaskId());
				}else{
					//【持久化】都移除,线下不适合在内存中长时间放。
					taskInfo.setStatus(Constants.TASK_DOING);
					logger.info("【修子任务的完成状态,只持久化线上部分】任务id:" + taskInfo.getTaskId()+"。此时收到线下任务回复数为:"+taskInfo.getReceivedAsyncTask());
				}
				taskInfoMap.remove(taskInfo.getTaskId());// 从任务记录中移除(线下的不适合长时间放在内存中)
				logger.info("【修改任务的完成状态,全部完成并移除】有完成后移除。总任务【移除后】有:" + taskInfoMap.size());
				if(taskPersistenceInterface!=null) taskPersistenceInterface.modifyTask2DB(taskInfo);//主任务入库
			}

		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
			logger.info("【修改子任务的状态,并移除】出错:"+e.toString());
		}
	}
```


#### 其它方法介绍

监听客户端下线时(客户端管理里也有去监听),将其它持有的子任务中标识的客户端置空,并移除重新选择客户端。

```
public synchronized void updateSubTaskInfoByOffline(String clientSessionId)
```


定时任务:清理、失败任务没超时,没超重试次数时的再次执行。

```java
    /**
     * <P>守护线程中定时会清理超时的主任务。(线上子任务等待结果超时在队列里处理,线下超时不处理,由主任务超时处理)</P>
     */
    public void dealTimeoutTask(TaskInfo taskInfo)

    /**
	 * <P>用于处理阻塞对列里的子任务,再扔进线程池。</P>
	 * 1.反复处理再分配的子任务,一定次数后超时。主要是原通讯没返回sessionId,监听移除有问题,所以多试几次。
	 * 每次试如果不能会真正移除不在线的客户端。
	 * 2.处理线上子任务,已经推送出去了,状态发生变化。反复检测是不是超时没有收到反馈。有反馈的会设置子任务状态,就不再入队了。
	 * @author liujun
	 * @date 2018年1月22日 上午9:59:10
	 */
    private class SubTaskRedo implements Runnable
```

此外,还有从持久层加载可以再执行的任务,提供人工控制的接口方法等。

# 4. 中间层的设计

## 4.1 作为客户端的核心处理类AsClientContainer

这个对接前面提到的SS。

```java
public class AsClientContainer {

	private static final Logger logger = Logger.getLogger(AsClientContainer.class);

	public static final long THEARTBEAT_INTERVAL = 3 * 1000L;
	
	private static JSONArray REMORTSERVERS=new JSONArray();
	
	public static Map<String,JSONObject> REMORTSERVERS_MAP =new HashMap<String,JSONObject>();

	/*子任务存放*/
//	public volatile static Map<String, JSONObject> subTaskInfoMap = new ConcurrentHashMap<String, JSONObject>();
	/**使用阻塞队列,放置所有要处理的子任务*/
	public volatile static BlockingQueue<JSONObject> subTaskInfoQueue = new LinkedBlockingQueue<JSONObject>();
	/**
	 * 需要人工干预的子任务池
	 */
	public volatile static Map<String, JSONObject> subTaskInfoMap = new ConcurrentHashMap<String,JSONObject>();
	/*同步对象存放*/
	public volatile static Map<String, PushFuture<CommonReturnData>> syncKey = new ConcurrentHashMap<String, PushFuture<CommonReturnData>>();
	
	/**用户单例线程锁*/
	private static Boolean lockSigleton = true;
	/**用户单例对象*/
	private static AsClientContainer clientContainer;

	public TaskConsumer taskConsumer = new TaskConsumer();
	
	public TimeoutSubTask timeoutSubTask = new TimeoutSubTask();
	
	public OfflineSendInterface offlineSendInterface;
	
	public DateFormatInterface dateFormatInterface;
	
	/** 失败的任务用线程池 */
	private static ExecutorService executor = Executors.newCachedThreadPool();
	
	private ScheduledExecutorService executorTimeout = Executors.newScheduledThreadPool(1);
	
	public static AtomicLong                 subTaskNum                = new AtomicLong();

```


任务消费,对重试任务,线上任务,人工任务都分别进行处理,人工任务的处理由外部提供处理类,真正实现由dubbo完成。

```java
	private class TaskConsumer implements Runnable {
		@Override
		public void run() {
			while (true) {
				try {
					//因为这个队列中都是出问题的子任务,所以要等待一下处理。
					Thread.sleep(1500);// 调节频率,过快容易撑死~~
//					logger.debug("【子任务队列】的任务数1:" + taskNum);
					JSONObject subtaskInfo=subTaskInfoQueue.take();
					logger.debug("【子任务队列消费】取出子任务JASON:"+subtaskInfo.toString());
					
					String subTaskType=subtaskInfo.containsKey("subTaskType")?subtaskInfo.getString("subTaskType"):null;
					//!!!!!!检查这个子任务是还否可以复制之前的结果,如果可以就复制出来,返回一个成功的结果。
					//这里交冯实现的接口,由于工作变动,还没出来。
					
//					logger.debug("【子任务队列】的任务数2:" + subTaskInfoQueue.size());
					//如果重试了50次或者超时了5分钟,那么子任务失败吧
					long reDoTime=new Date().getTime()-subtaskInfo.getLong("startDate");
					logger.debug("reDoTime:"+reDoTime+"。testnum:"+subtaskInfo.getInt("testNum"));
					if(subtaskInfo.getInt("testNum")>Integer.parseInt(MiddleConfig.getFailureSubTaskMaxRetryTimes()) || (reDoTime>Long.parseLong(MiddleConfig.getFailureSubTaskMaxRetryTimes())) ){
						logger.debug("【子任务队列消费】子任务超时失败:"+subtaskInfo.getString("subTaskId"));
						logger.debug("【子任务队列消费】子任务超时失败,尝试次数为:"+subtaskInfo.getInt("testNum"));
						
						AsClientContainer.subTaskInfoMap.remove(subtaskInfo.getString("subTaskId"));
						
						//通用处理完成或者失败的子任务
						Thread.sleep(1000);// 调节频率,过快容易撑死~~
						String isAsync=subtaskInfo.containsKey("isAsync")? subtaskInfo.getString("isAsync"):null;
						//1.【推失败了,如果是异步的,就发消息给服务端】
						if("async".equals(isAsync) || !Constants.TASK_TYPE_ONLINE.equals(subTaskType)){
//							AsClientContainer.sendTaskAsyncResult2Server(asyncServerCode,finishJason);
							//如果异步调用失败。
							logger.debug("【推送异步任务】超时了,发消息给服务器");
							JSONObject finishJason=new JSONObject();
							finishJason.put("code", "failure");
							finishJason.put("msg", "OFFLINE_RPC_FAIL中间层任务调用C端失败");
							finishJason.put("data", subtaskInfo);//这里面有ip/port用于异步。
							
							
							//如果是线下的推送或者调用失败了,只持久化到本地,再重试。或者超时。不可以迅速返回失败的。
							sendTaskAsyncResult2Server(finishJason);
							return;

						}
						//2.【如果是线上任务推送失败,设置同步等待对象。】
						JSONObject finishJason=new JSONObject();
						finishJason.put("code", "failure");
						finishJason.put("msg", "中间层任务推送失败");
						JSONObject finishData=new JSONObject();
						finishData.put("taskId", subtaskInfo.getString("taskId"));
						finishData.put("subTaskId", subtaskInfo.getString("subTaskId"));
						finishData.put("remark", "重试了"+subtaskInfo.getInt("testNum")+"次,用时"+reDoTime+"ms");
						finishJason.put("data", finishData);
						PushFuture<CommonReturnData> responseFuture = AsClientContainer.syncKey.get(subtaskInfo.getString("subTaskId"));
						if(responseFuture!=null){
							CommonReturnData response=new CommonReturnData();
							response.setCode("failure");
							response.setMsg("任务超时失败");
							response.setData(finishData);
							responseFuture.setResponse(response);
							logger.debug("【推送任务任务】超时了,设置同步对象的返回值");
						}
						else{
							logger.debug("【推送任务任务】设置超时时,同步对象已经被移除。");
						}
					}
					else//如果是正常处理子任务
					{
						//如果非线上任务,就走外部接口(注入的实现类)发出去(实现类会持久化,再发的)
						logger.debug("subTaskType:"+subTaskType+"。offlineSendInterface:"+offlineSendInterface);
						if(!Constants.TASK_TYPE_ONLINE.equals(subTaskType)){
							try {
								if(offlineSendInterface!=null){
									offlineSendInterface.sendOfflineQuery(subtaskInfo);
								}else{
									logger.warn("【找不到外部(非线上子任务)调用的接口】");
									throw new SurveyException("找不到外部(非线上子任务)调用的接口实现类");
								}
								logger.info("【推送任务任务】成功推送非线上任务到C端!");
							} catch (SurveyException e) {
								// TODO Auto-generated catch block
								e.printStackTrace();
								logger.warn("【推送任务任务】推送非线上任务到C端失败!");
								ReDoTask reDoTask = new ReDoTask(subtaskInfo);
								executor.submit(reDoTask);
							}
						} 
						else //如果是线上的,就推送出去。
						{
							SurveyClient surveyClient = AsServerContainer.getClientByUserRankAndClinetLever(0);
							// 如果找到策略的客户端
							if (surveyClient != null && surveyClient.getSessionId() != null) {
								// 开始推送
								String body = JsonUtils.toString(subtaskInfo);
								String clientSessionId = surveyClient.getSessionId();
								logger.info("【推送任务任务】推送目标sissionId:" + clientSessionId);
								boolean bln = ServerPushHandler.pushBySessionId(clientSessionId, "assignTaskToClient", body, new MiddlePushClientTaskCallback());
								logger.info("【推送任务任务】bln:" + bln);
								if (bln) {
									logger.info("【推送任务任务】成功!");
									// 推成功了,但一直不返回,也是个问题。不过同步对象会被移除的。

								} else {
									logger.warn("【推送任务任务】推送失败!");
									ReDoTask reDoTask = new ReDoTask(subtaskInfo);
									executor.submit(reDoTask);
								}
							} else {// 没有可用的客户端
								logger.info("【推送任务任务】bln:没有可用的客户端,直接失败返回。次数:" + subtaskInfo.getInt("testNum"));
								ReDoTask reDoTask = new ReDoTask(subtaskInfo);
								executor.submit(reDoTask);
							}
						}
					}
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}
	}
```


## 4.2 作为服务的核心处理类AsServerContainer

```java
public class AsServerContainer {
	private static final Logger logger = Logger.getLogger(AsServerContainer.class);

	/** 实时总任务信息(taskid---TaskInfo(SubTaskInfoMap)) */
	public static volatile Map<String, SurveyClient> onlineClientMap=new ConcurrentHashMap<String,SurveyClient>();

	public boolean isMiddlewearStarted=false;
	
	/** 任务在内存中允许的最大存放数 */
//	private static Integer maxTaskMapSize=Integer.MAX_VALUE;

	/**
	 * 可接入的类型列表,目前一个中间层只支持一种类型的接口接入。(同一类型的app都一样,不同的不一样)
	 */	
	private static List<ClientApp> clientAppList=new ArrayList<ClientApp>();
	
//	static{
//		//设置有效的客户端
//
//	}
	
	/**
	 * 失败子任务重复处理次数、与超时处理的时间
	 */
	public static int maxReDealSubTaskTimes=20;
	/**
	 * 失败子任务重复处理次数、与超时处理的时间
	 */
	public static long maxTimeoutSubTaskTime=60000L;
	
	/**主任务超时时间*/
	public static long maxTimeoutTaskTime=150000L;

	/**
	 * 一个配置的client下的实时子任务信息(sessionId-List<SubTaskInfo>)
	 * 用sessionId方便应对底层的上下线变化。中间件的事件只能得到sessionId,没有ClientCode。
	 */
//	public volatile static Map<String, List<SubTaskInfo>> clientSubTaskInfoMap = new ConcurrentHashMap<String, List<SubTaskInfo>>();

	/**用户单例线程锁*/
	private static Boolean lockSigleton = true; 
	private static AsServerContainer asServerContainer;
```




# 5. 客户端的设计

## 5.1 核心类的设计

属性如下,主要功能有子任务执行与客户端业务心跳。

```java
/**
 * 背调中间层容器-管理通讯层并处理背调任务
 * @author liujun
 */
public class ClientContainer {
	private static final Logger logger = Logger.getLogger(ClientContainer.class);

	public static final long THEARTBEAT_INTERVAL = 3 * 1000L;
	/**appKey-同类的客户端相同*/
	private String appkey;
	/**appSec-同类的客户端相同*/
	private String appSecret;
	/**客户端标识Code*/
	private String clientCode;
	/**所连接服务器IP*/
	private String serverIP;
	/**此客户端的权重*/
	private String weight;
	/**通讯层客户端*/
	Client client = null;
	/**子任务处理接口对象*/
	ApiInvorkerInterface apiInvorkerInterface;
	/**监听器-监听通讯层客户端状态*/
	ClientStatusListener clientStatusListener;
	/**客户端上报状态传输对象*/
	ClientRealData clientRealData;
	/**客户端是否连接状态标识*/
	private boolean isConnected = false;
	private Object lock=new Object();
//	ScheduledExecutorService service = Executors.newScheduledThreadPool(1);//不需要线程池,只要一个循环的守护线程
	/**用户单例线程锁*/
	private static Boolean lockSigleton = true;
	/**用户单例对象*/
	private static ClientContainer clientContainer;
```


## 5.2 背调任务执行

每一个包装的Web客户端,要实现这个接口来做具体任务。

```java
/**
 * 调用第三方功能的接口,请各个第三方接口应用实现些接口
 * <P></P>
 * @author liujun
 * @date 2018年1月12日 下午5:10:02
 */
public interface ApiInvorkerInterface {
	/**
	 * 根据请求参数与所选择的一组app产生任务并处理
	 * <P></P>
	 * @param paras
	 * @param appCode
	 */
	public CommonReturnData dealSubTaskByApi(JSONObject taskData) throws SurveyException;
}
```


# 6.  其它

## 6.1 与Web应用的整合

外部Web应用提供数据持久化与具体任务执行等接口的实现。实现都是spring容器中的类,由一个统一管理的@Componet类@Autowired这些实现,在其afterPropetiesSet方法时启动核心业务组件,并按提供的接口注入实现类。当然也可以考虑都交给spring管理。

## 6.2 子任务相关Web工程

近20个客户端我做了一个例子工程,后来交办出去发现在clone工程,于是我要求只用一个工程,各种实现类通过配置的不同进行加载。

## 6.3 反思

  • 除了前面提到的通讯层更加独立外,细节问题不少,比如线程池使用不够规范。
  • 有些参数不完全确定就用jason。一些参数要根据测试从外部配置进来。
  • 内部系统的启动停止最好由smartCycle控制,这里用afterPropetiesSet只管启动,没有优雅的停止。
  • 有些优化还需要相关应用与人员配合才能整体优化,比如通讯层,配置服务。
  • 如果要高可用,还有非常多的工作要做。


虽然很多问题,但运行稳定,可以顺利的工作。由于手上还有其它任务,比如企业服务,调查发起,定单处理,被查人同意,根据结果费用核算等应用与功能处理。所以除了功能变更,没真正进行重构过。
上一篇: Java 版SpringCloud分布式微服务b2b2c电子商务-定时任务 下一篇: 没有下一篇了!
发表评论
用户名: 匿名