HDFS-压缩_JAVA_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > JAVA > HDFS-压缩

HDFS-压缩

 2013/12/23 13:09:06  zjhdreams  程序员俱乐部  我要评论(0)
  • 摘要:http://marsorp.iteye.com/blog/1559613文件压缩带来了两大益处1)减少存贮空间2)加速网络(磁盘)传输。基于大数据的传输,都需要经过压缩处理。压缩格式压缩格式工具算法文件扩展名可分块DEFLATEN/ADEFLATE.deflateNogzipgzipDEFLATE.gzNobzip2bzip2bzip2.bz2YesLZOlzopLZO.lzoNoSnappyN/ASnappy.snappyNo压缩及解压缩文件解压实例Java代码复制代码收藏代码1
  • 标签:压缩
http://marsorp.iteye.com/blog/1559613
文件压缩带来了两大益处1)减少存贮空间2)加速网络(磁盘)传输。基于大数据的传输,都需要经过压缩处理。
压缩格式
压缩格式 工具 算法 文件扩展名 可分块
DEFLATE N/A DEFLATE .deflate No
gzip gzip DEFLATE .gz No
bzip2 bzip2 bzip2 .bz2 Yes
LZO lzop LZO .lzo No
Snappy N/A Snappy .snappy No



压缩及解压缩



文件解压实例





Java代码 复制代码 收藏代码
1.package com.bigdata.io; 
2. 
3.import java.io.IOException; 
4.import java.io.InputStream; 
5.import java.io.OutputStream; 
6.import java.net.URI; 
7. 
8.import org.apache.hadoop.conf.Configuration; 
9.import org.apache.hadoop.fs.FileSystem; 
10.import org.apache.hadoop.fs.Path; 
11.import org.apache.hadoop.io.IOUtils; 
12.import org.apache.hadoop.io.compress.CompressionCodec; 
13.import org.apache.hadoop.io.compress.CompressionCodecFactory; 
14. 
15.public class FileDecompressor { 
16.     
17.    public static void main(String[] args) throws IOException { 
18.        String uri = args[0]; 
19.        Configuration conf = new Configuration(); 
20.        FileSystem fs = FileSystem.get(URI.create(uri),conf); 
21.        Path inputPath = new Path(uri); 
22.         
23.        CompressionCodecFactory factory = new CompressionCodecFactory(conf); 
24.        // io.compression.codecs 定义列表中的一个 
25.        CompressionCodec codec  = factory.getCodec(inputPath);  
26.        if(null == codec){ 
27.            System.err.println("No codec for " + uri); 
28.            System.exit(-1); 
29.        } 
30.         
31.        String outputUri = CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension()); 
32.        InputStream in = null; 
33.        OutputStream out = null; 
34.        try{ 
35.            in = codec.createInputStream(fs.open(inputPath)); 
36.            out = fs.create(new Path(outputUri)); 
37.            IOUtils.copyBytes(in, out, conf); 
38.        }finally{ 
39.            IOUtils.closeStream(in); 
40.            IOUtils.closeStream(out); 
41.        } 
42.    } 
43.} 


  $hadoop jar stream.jar com.bigdata.io.FileDecompressor test.txt.gz 

   12/06/14 09:48:28 INFO util.NativeCodeLoader: Loaded the native-hadoop library

   12/06/14 09:48:28 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library



Native Libraries

Native gzip 库减少解压缩时间在50%,压缩时间在10%(同java实现的压缩算法)

禁用Native Library , 需要设置属性hadoop.native.lib 为 false







Java代码 复制代码 收藏代码
1.package com.bigdata.io; 
2. 
3.import java.io.IOException; 
4. 
5.import org.apache.hadoop.conf.Configuration; 
6.import org.apache.hadoop.io.IOUtils; 
7.import org.apache.hadoop.io.compress.CodecPool; 
8.import org.apache.hadoop.io.compress.CompressionCodec; 
9.import org.apache.hadoop.io.compress.CompressionOutputStream; 
10.import org.apache.hadoop.io.compress.Compressor; 
11.import org.apache.hadoop.util.ReflectionUtils; 
12. 
13.public class PooledStreamCompressor { 
14. 
15.    public static void main(String[] args) throws ClassNotFoundException { 
16.        String codecClassName = args[0]; 
17.        Class<?> codecClass = Class.forName(codecClassName); 
18.         
19.        Configuration conf = new Configuration(); 
20.        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf); 
21.        Compressor compressor = null; 
22.         
23.        try { 
24.            compressor = CodecPool.getCompressor(codec); 
25.            CompressionOutputStream out = codec.createOutputStream(System.out, compressor); 
26.            IOUtils.copyBytes(System.in, out,4096,false); 
27.            out.finish(); 
28.        } catch (IOException e) { 
29.            e.printStackTrace(); 
30.        }finally{ 
31.            CodecPool.returnCompressor(compressor); 
32.        } 
33.    } 
34.} 

 


echo "Text" | hadoop jar stream.jar com.bigdata.io.PooledStreamCompressor org.apache.hadoop.io.compress.GzipCodec | gunzip -

12/06/14 10:57:49 INFO util.NativeCodeLoader: Loaded the native-hadoop library

12/06/14 10:57:49 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library

12/06/14 10:57:49 INFO compress.CodecPool: Got brand-new compressor

Text




使用压缩格式规则



      压缩格式的使用取决于你的应用程序。你想最大化您的应用程序的速度,或者是你更关注关于保持存储成本下降?在一般情况下,你应该尝试不同的策略,根据数据集和基准测试,找到最好的办法。

      对于大文件,如日志文件,选项有:

      1.存储成非压缩格式

      2.使用一个支持块分割的压缩格式,比如bzip2(当然bzip2比较慢)或者可以使用LZO(被索引后支持块分割)

      3.把应用分割成大快,每个大快可以支持任意的压缩格式(不用关心在HDFS中是否可以分割)。在这种情况,你需要计算一下你的压缩后的尺寸,要同HDFS Block的大小相匹配

      4.使用Sequence 文件,支持压缩及分块

      5.使用Avro数据文件,支持压缩及分块,就像Sequence文件,但比Sequence要更高级,可以被多种语言支持

      对于大文件,你不应该使用不支持分块的压缩。因为,你失去了地方,使MapReduce应用非常没有效率

      为了归档目的,考虑Hadoop 归档格式,虽然他不支持压缩。
MapReduce中使用压缩

步骤:

      1.设置mapred.output.compress=true

      2.设置mapred.output.compression.codec=the classname of the above list

      或者

      FileOutputFormat.setCompressOutput(job,true)

      FileOutputFormat.setOutputCompressorClass(job,GzipCodec.class)



Java代码 复制代码 收藏代码
1.public class MaxTemperatureWithCompression { 
2.  public static void main(String[] args) throws Exception { 
3.    if (args.length != 2) { 
4.    System.err.println("Usage: MaxTemperatureWithCompression <input path> " + 
5.    "<output path>"); 
6.    System.exit(-1); 
7.    } 
8.    Job job = new Job(); 
9.    job.setJarByClass(MaxTemperature.class); 
10.    FileInputFormat.addInputPath(job, new Path(args[0])); 
11.    FileOutputFormat.setOutputPath(job, new Path(args[1])); 
12.    job.setOutputKeyClass(Text.class); 
13.    job.setOutputValueClass(IntWritable.class); 
14.<span style="color: rgb(255, 0, 0);">    FileOutputFormat.setCompressOutput(job, true); 
15.    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);</span> 
16.    job.setMapperClass(MaxTemperatureMapper.class); 
17.    job.setCombinerClass(MaxTemperatureReducer.class); 
18.    job.setReducerClass(MaxTemperatureReducer.class); 

Java代码 复制代码 收藏代码
1.<span style="white-space: normal;"><pre class="java" name="code">System.exit(job.waitForCompletion(true) ? 0 : 1);</pre> 
2.</span> 
3.    } 
4.} 

复制代码收藏代码

   对于SequenceFile格式

   默认压缩类型是RECORD,建议更改为BLOCk,压缩一组record

   SqeuenceFileOutputFormat.setOutputCompressionType()设置这个压缩类型


MapReduce压缩属性汇总


Property name Type Default value Description
mapred.output.compress boolean false Compress outputs
mapred.output.compression.codec Class name org.apache.hadoop.io.compress.DefaultCodec The compression codec to use for outputs
mapred.output.compression.type String RECORD Type type of compression to use for SequenceFile outputs:NONE,RECORD,or BLOCK.

Map阶段压缩输出(即中间结果是否压缩)

建议使用性能比较好的LZO、Snappy

Map 输出压缩属性汇总
Property name Type Default value Description
mapred.compress.map.output boolean false Compress map outputs
mapred.map.output.compression.codec Class org.apache.hadoop.io.compress.DefaultCodec The compression codec to use for map outputs.
Map阶段使用压缩样例

Java代码 复制代码 收藏代码
1.Configuration conf = new Configuration(); 
2.conf.setBoolean("mapred.compress.map.output", true); 
3.conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, 
4.CompressionCodec.class); 
5.Job job = new Job(conf); 

 
发表评论
用户名: 匿名