在Hadoop的streaming中使用自定义的inputformat和outputformat澳门金沙30064在线网站

By admin in 计算机教程 on 2019年11月5日

系统默认的LineRecordReader是按照每行的偏移量做为map输出时的key值,每行的内容作为map的value值,默认的分隔符是回车和换行。

Hadoop的streaming中有一个选项是指定输入输出格式化的:

现在要更改map对应的输入的<key,value>值,key对应的文件的路径(或者是文件名),value对应的是文件的内容(content)。

  1. -inputformat TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName Optional.  
  2. -outputformat TextOutputFormat(default)|JavaClassName  Optional.  

那么我们需要重写InputFormat和RecordReader,因为RecordReader是在InputFormat中调用的,当然重写RecordReader才是重点!

但是在0.14版本之后,hadoop不再支持带多个jar包文件,所以,如果要使用自己定义的Inputformat或者Outputformat,就得将对应的class文件加入到hadoop-streaming-1.0.1.jar中去,比如:

下面看代码InputFormat的重写:

  1. jar uf ../../contrib/streaming/hadoop-streaming-1.0.1.jar org/apache/hadoop/streaming/*.class   
  1. public class chDicInputFormat extends FileInputFormat<Text,Text>  
  2.     implements JobConfigurable{  
  3.     private CompressionCodecFactory compressionCodecs = null;  
  4.     public void configure(JobConf conf) {  
  5.         compressionCodecs = new CompressionCodecFactory(conf);  
  6.     }  
  7.     /** 
  8.      * @brief isSplitable 不对文件进行切分,必须对文件整体进行处理 
  9.      * 
  10.      * @param fs 
  11.      * @param file 
  12.      * 
  13.      * @return false 
  14.      */  
  15.     protected boolean isSplitable(FileSystem fs, Path file) {  
  16.     //  CompressionCodec codec = compressionCodecs.getCode(file);
      
  17.         return false;//以文件为单位,每个单位作为一个split,即使单个文件的大小超过了64M,也就是Hadoop一个块得大小,也不进行分片
      
  18.     }  
  19.   
  20.     public RecordReader<Text,Text> getRecordReader(InputSplit genericSplit,  
  21.                             JobConf job, Reporter reporter) throws IOException{  
  22.         reporter.setStatus(genericSplit.toString());  
  23.         return new chDicRecordReader(job,(FileSplit)genericSplit);  
  24.     }  
  25.   
  26. }  

然后在-inputformat后面就可以直接带类名了。

下面来看RecordReader的重写:

下面通过一个例子来说明下,实现Map的输入<key,value>,key为文件名,value为文档的整篇内容:

  1. public class chDicRecordReader implements RecordReader<Text,Text> {  
  2.     private static final Log LOG = LogFactory.getLog(chDicRecordReader.class.getName());  
  3.     private CompressionCodecFactory compressionCodecs = null;  
  4.     private long start;  
  5.     private long pos;  
  6.     private long end;  
  7.     private byte[] buffer;  
  8.     private String keyName;  
  9.     private FSDataInputStream fileIn;  
  10.       
  11.     public chDicRecordReader(Configuration job,FileSplit split) throws IOException{  
  12.         start = split.getStart(); //从中可以看出每个文件是作为一个split的   
  13.         end = split.getLength() + start;  
  14.         final Path path = split.getPath();  
  15.         keyName = path.toString();  
  16.         LOG.info(“filename in hdfs is : ” + keyName);  
  17.         final FileSystem fs = path.getFileSystem(job);  
  18.         fileIn = fs.open(path);  
  19.         fileIn.seek(start);  
  20.         buffer = 澳门金沙30064在线网站,new byte[(int)(end – start)];  
  21.         this.pos = start;  
  22.   
  23.     }  
  24.   
  25.     public Text createKey() {  
  26.         return new Text();  
  27.     }  
  28.   
  29.     public Text createValue() {  
  30.         return new Text();  
  31.     }  
  32.   
  33.     public long getPos() throws IOException{  
  34.         return pos;  
  35.     }  
  36.   
  37.     public float getProgress() {  
  38.         if (start == end) {  
  39.             return 0.0f;  
  40.         } else {  
  41.             return Math.min(1.0f, (pos – start) / (float)(end – start));  
  42.         }  
  43.     }  
  44.   
  45.         public boolean next(Text key, Text value) throws IOException{  
  46.         while(pos < end) {  
  47.             key.set(keyName);  
  48.             value.clear();  
  49.             fileIn.readFully(pos,buffer);  
  50.             value.set(buffer);  
  51.     //      LOG.info(“—内容: ” + value.toString());
      
  52.             pos += buffer.length;  
  53.             LOG.info(“end is : ” + end  + ” pos is : ” + pos);  
  54.             return true;  
  55.         }  
  56.         return false;  
  57.     }  
  58.   
  59.     public void close() throws IOException{  
  60.         if(fileIn != null) {  
  61.             fileIn.close();  
  62.         }  
  63.           
  64.     }  
  65.   
  66. }  

1.定义自己的InputFormat:

通过上面的代码,然后再在main函数中设置InputFormat对应的类,就可以使用这种新的读入格式了。

ContentRecordReder.java

更多Hadoop相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13
  1. package org.apache.hadoop.streaming;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. //import org.apache.commons.logging.Log;
      
  6. //import org.apache.commons.logging.LogFactory;
      
  7. import org.apache.hadoop.conf.Configuration;  
  8. import org.apache.hadoop.fs.FSDataInputStream;  
  9. import org.apache.hadoop.fs.FileSystem;  
  10. import org.apache.hadoop.fs.Path;  
  11. import org.apache.hadoop.io.Text;  
  12. import org.apache.hadoop.io.compress.CompressionCodecFactory;  
  13. import org.apache.hadoop.mapred.FileSplit;  
  14. import org.apache.hadoop.mapred.RecordReader;  
  15.   
  16. import com.sun.org.apache.commons.logging.Log;  
  17. import com.sun.org.apache.commons.logging.LogFactory;  
  18.   
  19. public class ContentRecordReder implements RecordReader<Text,Text> {  
  20.     private static final Log LOG = LogFactory.getLog(ContentRecordReder.class.getName());    
  21.     private CompressionCodecFactory compressionCodecs = null;    
  22.     private long start;    
  23.     private long pos;    
  24.     private long end;    
  25.     private byte[] buffer;    
  26.     private String keyName;    
  27.     private FSDataInputStream fileIn;    
  28.         
  29.     public ContentRecordReder(Configuration job,FileSplit split) throws IOException{    
  30.         start = split.getStart(); //从中可以看出每个文件是作为一个split的     
  31.         end = split.getLength() + start;  
  32.         final Path path = split.getPath();  
  33.         keyName = path.toString();    
  34.         LOG.info(“filename in hdfs is : ” + keyName);    
  35.         System.out.println(“filename in hdfs is : ” + keyName);  
  36.         final FileSystem fs = path.getFileSystem(job);    
  37.         fileIn = fs.open(path);    
  38.         fileIn.seek(start);    
  39.         buffer = new byte[(int)(end – start)];    
  40.         this.pos = start;  
  41.   
  42.     }    
  43.     
  44.     public Text createKey() {    
  45.         return new Text();    
  46.     }    
  47.     
  48.     public Text createValue() {    
  49.         return new Text();    
  50.     }    
  51.     
  52.     public long getPos() throws IOException{    
  53.         return pos;    
  54.     }    
  55.     
  56.     public float getProgress() {    
  57.         if (start == end) {    
  58.             return 0.0f;    
  59.         } else {    
  60.             return Math.min(1.0f, (pos – start) / (float)(end – start));    
  61.         }    
  62.     }    
  63.     
  64.     public boolean next(Text key, Text value) throws IOException{    
  65.         while(pos < end) {    
  66.             key.set(keyName);    
  67.             value.clear();    
  68.             fileIn.readFully(pos,buffer);    
  69.             value.set(buffer);    
  70.             LOG.info(“—内容: ” + value.toString());    
  71.             System.out.println(“—内容: ” + value.toString());  
  72.             pos += buffer.length;    
  73.             LOG.info(“end is : ” + end  + ” pos is : ” + pos);    
  74.             return true;    
  75.         }    
  76.         return false;    
  77.     }    
  78.     
  79.     public void close() throws IOException{    
  80.         if(fileIn != null) {    
  81.             fileIn.close();    
  82.         }    
  83.             
  84.     }    
  85. }   

澳门金沙30064在线网站 1

ContentInputFormat.java

  1. package org.apache.hadoop.streaming;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import org.apache.hadoop.fs.FileSystem;  
  6. import org.apache.hadoop.fs.Path;  
  7. import org.apache.hadoop.io.Text;  
  8. import org.apache.hadoop.io.compress.CompressionCodecFactory;  
  9. import org.apache.hadoop.mapred.FileSplit;  
  10. import org.apache.hadoop.mapred.JobConf;  
  11. import org.apache.hadoop.mapred.JobConfigurable;  
  12. import org.apache.hadoop.mapred.Reporter;  
  13. import org.apache.hadoop.mapred.InputSplit;  
  14. import org.apache.hadoop.mapred.RecordReader;  
  15. import org.apache.hadoop.mapred.FileInputFormat;  
  16.   
  17. public class ContentInputFormat extends FileInputFormat<Text,Text>{  
  18.     private long mySplitSize = 1024*1024;  
  19.     private CompressionCodecFactory compressionCodecs = null;    
  20.     public void configure(JobConf conf) {    
  21.         compressionCodecs = new CompressionCodecFactory(conf);    
  22.     }  
  23.       
  24.     /**  
  25.      * @brief isSplitable 不对文件进行切分,必须对文件整体进行处理  
  26.      *  
  27.      * @param fs  
  28.      * @param file  
  29.      *  
  30.      * @return false  
  31.      */    
  32.     protected boolean isSplitable(FileSystem fs, Path file) {    
  33.         return false;   
  34.     }    
  35.     
  36.     public RecordReader<Text,Text> getRecordReader(InputSplit genericSplit,    
  37.                             JobConf job, Reporter reporter) throws IOException{    
  38.         reporter.setStatus(genericSplit.toString());    
  39.         ContentRecordReder contentRecordReder = new ContentRecordReder(job,(FileSplit)genericSplit);  
  40.         return (RecordReader<Text, Text>) contentRecordReder;  
  41.     }  
  42.   

2.编译

  1. javac -classpath ~/hadoop-1.0.1/hadoop-core-1.0.1.jar:~/hadoop-1.0.1/lib/*:./con  
  2. tent-record-reader.jar ./*.java -Xlint:deprecation

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图
Copyright @ 2010-2019 澳门金沙30064在线网站 版权所有