大数据平台数据交换(结构化集群HDFS client API 文件上传)_JAVA_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > JAVA > 大数据平台数据交换(结构化集群HDFS client API 文件上传)

大数据平台数据交换(结构化集群HDFS client API 文件上传)

 2017/3/8 5:34:39  JsonLiangyoujun  程序员俱乐部  我要评论(0)
  • 摘要:本文介绍大数据平台数据交换案例,本案例为结构化集群HDFSclientAPI文件上传,结构化集群HDFSclientAPI文件下载和非结构化集群HDFSclientAPI文件上传/下载在此不做介绍(做法类似,自行脑补)。大数据平台数据交互整体架构主要讲解一下思路,给出部分代码,可根据实际情况进行修改或优化。定时任务:时间为每天上午10:00:00,task()方法为要执行的任务(文件上传)//定时任务主流程publicvoidtimer(){Calendarcalendar=Calendar
  • 标签:API 上传 文件 client 数据
    本文介绍大数据平台数据交换案例,本案例为结构化集群HDFS client API 文件上传,结构化集群HDFS client API 文件下载和非结构化集群HDFS client API 文件上传/下载在此不做介绍(做法类似,自行脑补)。

    大数据平台数据交互整体架构


    主要讲解一下思路,给出部分代码,可根据实际情况进行修改或优化。

    定时任务:时间为每天上午10:00:00,task()方法为要执行的任务(文件上传)
class="java" name="code">//定时任务主流程
    public void timer() {  
        Calendar calendar = Calendar.getInstance();  
        calendar.set(Calendar.HOUR_OF_DAY, 10);  // 控制时  
        calendar.set(Calendar.MINUTE, 00);       // 控制分  
        calendar.set(Calendar.SECOND, 00);       // 控制秒  
  
        Date time = calendar.getTime();         // 获取执行任务的时间  
  
        Timer timer = new Timer();
        //执行定时任务
        timer.scheduleAtFixedRate(new TimerTask() {  
            public void run() {  
                try {
					task();
				} catch (SQLException e) {
					log.error("获取数据失败!");
					//失败后的处理(如:入库记录,然后手工推送或者查询数据库自动推送)
					//do something for resolve error 
					e.printStackTrace();
				}
            }  
        }, time, 1000 * 60 * 60 * 24);// 这里设定循环时间为每天  
    }

    task()执行的任务:此处为部分代码,笔者只为介绍流程,不提供完整代码
/**
		 * windows环境生成文件路径
		 * **/  		
		String dbFilename = "F:/bigdata_file/KKPAY_ROUTE_INFO-a10.TXT";
    	String okFilename = "F:/bigdata_file/KKPAY_ROUTE_INFO-a10.OK";
		/**
		 * linux环境生成文件路径
		 * **/  	 	
    	//String dbFilename = "/data/app/bigdata/KKPAY_ROUTE_INFO-a10.TXT";
    	//String okFilename = "/data/app/bigdata/KKPAY_ROUTE_INFO-a10.OK";
    	
    	createFile(dbData,dbFilename);
    	createFile(OkData,okFilename);
    	
    	//上传数据文件和标志性文件,如果失败则5分钟后再次上传,直到上传成功为止
    	while(uploadFlag){
    		upload(dbFilename,okFilename);
    		try {
				Thread.sleep(1000*60*5);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
    	}

    获取数据:这里主要是注意防止数据串行,另外这里我对每个字段之间加了分隔符进行分割
DBUtil dbUtil = new DBUtil();
    	Connection conn = dbUtil.getConnection();
    	//PreparedStatement是statement的子类,提高了效率,扩展性安全性
    	PreparedStatement preparedState = null;
    	ResultSet resultSet = null;
    	
    	StringBuffer dbData = new StringBuffer();
		try {						
			preparedState = (PreparedStatement) conn.prepareStatement("select * from kkpay_route_info");					
			resultSet = preparedState.executeQuery();
			while(resultSet.next()){				
				for(int i = 1;i <= resultSet.getMetaData().getColumnCount();i++){
					if(i == resultSet.getMetaData().getColumnCount()){
						dbData.append(resultSet.getString(i).replace("\r\n", "").replace("\r\t", "") + "\r\n");
					}else{
						dbData.append(resultSet.getString(i).replace("\r\n", "").replace("\r\t", "") + "\001");
					}
				}
				datenum ++;
			}
		} catch (SQLException e) {
			throw e;
		}finally{
			if(resultSet!=null){
				log.info("获取数据成功!");
				try {
					resultSet.close();
				} catch (SQLException e) {
					e.printStackTrace();
				}
			}
			if(preparedState!=null){
				try {
					preparedState.close();
				} catch (SQLException e) {
					e.printStackTrace();
				}
			}
		}

    调用API上传文件:上传数据文件和标志性文件
//调用API上传文件
    public void upload(String dbFileSourcepath,String OkFileSourcepath){
    	HdfsClient client = HdfsClientInstance.getInstance();		
    	HdfsClientConf conf = new HdfsClientConf();
    	/**
    	 * linux环境配置文件读取路径
    	 * **/
    	//conf.setConfigFilePath("/data/app/bigdata/conf");
    	//conf.setKeyTabFile("/data/app/bigdata/conf/pcc_user.keytab");
    	/**
    	 * windows环境配置文件读取路径
    	 * **/
    	conf.setConfigFilePath("F:/conf");
    	conf.setKeyTabFile("F:/conf/pcc_user.keytab");
    	
    	conf.setKeyTabUserName("pcc_user@TDH");
    	client.setHdfsClientConf(conf);	
    	//上传数据文件
    	Map<String,String> dbFileRetrunMsg = client.putFileHdfs(dbFileSourcepath,"PCC");
    	if("0".equals(dbFileRetrunMsg.get("code"))){    		
        	Map<String,String> okFileRetrunMsg = null;
        	//上传标记性文件(由于这时数据文件已经上传成功,所以标志性文件必须有重发机制保证两个文件的同步)
        	while(uploadFlag){
        		okFileRetrunMsg = client.putFileHdfs(OkFileSourcepath, "PCC");
        		if("0".equals(okFileRetrunMsg.get("code"))){
            		uploadFlag = false;
            		log.info("数据文件(" + dbFileSourcepath +")、标志性文件(" + OkFileSourcepath + ")上传成功!");
            	}
        	}      	  		
    	}else{
    		log.error(dbFileRetrunMsg.get("ERROR_DESC"));
    	} 	
    }

    另外需要做以下配置:
    1.配置大数据平台Guardian 文件


    2.配置大数据平台HOSTS(windows hosts文件默认路径如图所示,linux一般在/etc目录下)


    3.配置大数据平台XMLKEYTAB文件(以笔者代码所指路径为例如图)

   
    eclipse打成可运行jar包,如图

    运行方式:java -jar *.jar(*.jar指的是你的jar包名称)
    以下分别为eclipse控制台、windows(cmd)环境、linux环境输出结果截图:

   



    本文永久地址:http://jsonliangyoujun.iteye.com/blog/2360795
  • 大小: 254.9 KB
  • 大小: 33.9 KB
  • 大小: 26.1 KB
  • 大小: 22 KB
  • 大小: 126.1 KB
  • 大小: 109.7 KB
  • 大小: 782.3 KB
  • 大小: 38.2 KB
  • 查看图片附件
发表评论
用户名: 匿名