In the Hadoop source, `OutputFormat` is an abstract class, `public abstract class OutputFormat<K, V>`, that defines the output format for a job's reduce tasks:
https://github.com/apache/hadoop/blob/master/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputFormat.java
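For orientation, the class declares three abstract methods. The sketch below is a close paraphrase of the linked source (see the link for the full Javadoc):

```java
import java.io.IOException;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Close paraphrase of org.apache.hadoop.mapreduce.OutputFormat.
public abstract class OutputFormat<K, V> {

  // Returns the RecordWriter that writes the task's key/value pairs.
  public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
      throws IOException, InterruptedException;

  // Checks the output specification of the job, e.g. that the
  // output directory does not already exist.
  public abstract void checkOutputSpecs(JobContext context)
      throws IOException, InterruptedException;

  // Returns the OutputCommitter responsible for committing task output.
  public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context)
      throws IOException, InterruptedException;
}
```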
For a quick introduction, see the article "MapReduce快速入门系列(12) | MapReduce之OutputFormat" (a Chinese quick-start piece on OutputFormat).
The commonly used OutputFormat implementations can be browsed in the source tree:
https://github.com/apache/hadoop/tree/master/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output
1. Text output: TextOutputFormat, Hadoop's default output format
https://github.com/apache/hadoop/blob/master/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java
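Since TextOutputFormat is the default, a driver rarely needs to set it explicitly, but doing so (and overriding the key/value separator) looks roughly like the sketch below. The output path is a made-up placeholder, and the config key shown is the value of the `SEPERATOR` constant on Hadoop 2.x and later:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Minimal driver sketch; "/out/text" is a hypothetical output path.
Configuration conf = new Configuration();
// Separator written between key and value (default "\t"); this config
// key is what conf.get(SEPERATOR, "\t") reads in getRecordWriter below.
conf.set("mapreduce.output.textoutputformat.separator", ",");
Job job = Job.getInstance(conf, "text-output-example");
job.setOutputFormatClass(TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path("/out/text"));
```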
TextOutputFormat extends FileOutputFormat. FileOutputFormat is itself an abstract class, a subclass of OutputFormat; its source is at:
https://github.com/apache/hadoop/blob/release-2.6.0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
One important piece is `RecordWriter`, which defines two methods: `write` and `close`. In the old `org.apache.hadoop.mapred` API, linked below, it is an interface:
https://github.com/apache/hadoop/blob/master/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/RecordWriter.java
Source:
```java
public interface RecordWriter<K, V> {
  /**
   * Writes a key/value pair.
   *
   * @param key the key to write.
   * @param value the value to write.
   * @throws IOException
   */
  void write(K key, V value) throws IOException;

  /**
   * Close this `RecordWriter` to future operations.
   *
   * @param reporter facility to report progress.
   * @throws IOException
   */
  void close(Reporter reporter) throws IOException;
}
```
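Note that the snippet above is the old API. The new-API counterpart, `org.apache.hadoop.mapreduce.RecordWriter` (the class that `LineRecordWriter` below actually extends), is an abstract class with the same two operations, paraphrased here:

```java
// Paraphrased from org.apache.hadoop.mapreduce.RecordWriter (new API).
public abstract class RecordWriter<K, V> {

  // Writes a key/value pair to the output.
  public abstract void write(K key, V value)
      throws IOException, InterruptedException;

  // Closes this writer once all key/value pairs have been written.
  public abstract void close(TaskAttemptContext context)
      throws IOException, InterruptedException;
}
```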
TextOutputFormat supplies its writer through the nested class `LineRecordWriter<K, V>`; source:
```java
protected static class LineRecordWriter<K, V>
    extends RecordWriter<K, V> {
  private static final String utf8 = "UTF-8";
  private static final byte[] newline;
  static {
    try {
      newline = "\n".getBytes(utf8);
    } catch (UnsupportedEncodingException uee) {
      throw new IllegalArgumentException("can't find " + utf8 + " encoding");
    }
  }

  protected DataOutputStream out;
  private final byte[] keyValueSeparator;

  public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
    this.out = out;
    try {
      this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
    } catch (UnsupportedEncodingException uee) {
      throw new IllegalArgumentException("can't find " + utf8 + " encoding");
    }
  }

  public LineRecordWriter(DataOutputStream out) {
    this(out, "\t");
  }

  /**
   * Write the object to the byte stream, handling Text as a special
   * case.
   * @param o the object to print
   * @throws IOException if the write throws, we pass it on
   */
  private void writeObject(Object o) throws IOException {
    if (o instanceof Text) {
      Text to = (Text) o;
      out.write(to.getBytes(), 0, to.getLength());
    } else {
      out.write(o.toString().getBytes(utf8));
    }
  }

  public synchronized void write(K key, V value) throws IOException {
    boolean nullKey = key == null || key instanceof NullWritable;
    boolean nullValue = value == null || value instanceof NullWritable;
    if (nullKey && nullValue) {
      return;
    }
    if (!nullKey) {
      writeObject(key);
    }
    if (!(nullKey || nullValue)) {
      out.write(keyValueSeparator);
    }
    if (!nullValue) {
      writeObject(value);
    }
    out.write(newline);
  }

  public synchronized void close(TaskAttemptContext context) throws IOException {
    out.close();
  }
}
```
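Note how `write` treats `null` and `NullWritable` specially: a job can suppress the key (or the value) entirely. A common idiom in a reducer, shown here as a hypothetical snippet, is to emit a `NullWritable` key so only the value reaches the file:

```java
// Hypothetical reduce() body line: with a NullWritable key, LineRecordWriter
// writes only the value and the trailing newline (no key, no separator).
context.write(NullWritable.get(), new Text("value-only line"));
```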
The other key piece is the abstract `getRecordWriter` method. When you extend the `FileOutputFormat` abstract class you must implement this method, which obtains a `RecordWriter<K, V>` from the job:
```java
public abstract RecordWriter<K, V>
    getRecordWriter(TaskAttemptContext job
                    ) throws IOException, InterruptedException;
```
TextOutputFormat implements `getRecordWriter` as follows, using `LineRecordWriter`; source:
```java
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job
                                          ) throws IOException, InterruptedException {
  Configuration conf = job.getConfiguration();
  boolean isCompressed = getCompressOutput(job);
  String keyValueSeparator = conf.get(SEPERATOR, "\t");
  CompressionCodec codec = null;
  String extension = "";
  if (isCompressed) {
    Class<? extends CompressionCodec> codecClass =
      getOutputCompressorClass(job, GzipCodec.class);
    codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
    extension = codec.getDefaultExtension();
  }
  Path file = getDefaultWorkFile(job, extension);
  FileSystem fs = file.getFileSystem(conf);
  if (!isCompressed) {
    FSDataOutputStream fileOut = fs.create(file, false);
    return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
  } else {
    FSDataOutputStream fileOut = fs.create(file, false);
    return new LineRecordWriter<K, V>(new DataOutputStream
                                      (codec.createOutputStream(fileOut)),
                                      keyValueSeparator);
  }
}
```
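This is also the pattern to follow for a custom output format: extend `FileOutputFormat`, open a stream on `getDefaultWorkFile`, and return your own `RecordWriter`. A minimal illustrative sketch follows; the class name and output layout are made up, only the base classes and calls are real Hadoop API:

```java
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical custom format: writes "value|key" lines.
public class ReversedTextOutputFormat extends FileOutputFormat<Text, Text> {

  @Override
  public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job)
      throws IOException, InterruptedException {
    // Open the task's temporary output file, as TextOutputFormat does.
    Path file = getDefaultWorkFile(job, ".txt");
    FileSystem fs = file.getFileSystem(job.getConfiguration());
    final FSDataOutputStream out = fs.create(file, false);

    return new RecordWriter<Text, Text>() {
      @Override
      public void write(Text key, Text value) throws IOException {
        // Emit the value first, then the key, separated by '|'.
        out.write(value.getBytes(), 0, value.getLength());
        out.writeByte('|');
        out.write(key.getBytes(), 0, key.getLength());
        out.writeByte('\n');
      }

      @Override
      public void close(TaskAttemptContext context) throws IOException {
        out.close();
      }
    };
  }
}
```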
2. Binary output: SequenceFileOutputFormat
https://github.com/apache/hadoop/blob/master/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java
Like TextOutputFormat, SequenceFileOutputFormat extends FileOutputFormat.
One important piece is the `getSequenceWriter` method, which returns the `SequenceFile.Writer` used to write the binary file. `SequenceFile` itself lives in hadoop-common:
https://github.com/apache/hadoop/blob/master/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
Source:
```java
protected SequenceFile.Writer getSequenceWriter(TaskAttemptContext context,
                                                Class<?> keyClass,
                                                Class<?> valueClass)
    throws IOException {
  Configuration conf = context.getConfiguration();

  CompressionCodec codec = null;
  CompressionType compressionType = CompressionType.NONE;
  if (getCompressOutput(context)) {
    // find the kind of compression to do
    compressionType = getOutputCompressionType(context);
    // find the right codec
    Class<?> codecClass = getOutputCompressorClass(context, DefaultCodec.class);
    codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
  }
  // get the path of the temporary output file
  Path file = getDefaultWorkFile(context, "");
  FileSystem fs = file.getFileSystem(conf);
  return SequenceFile.createWriter(fs, conf, file,
                                   keyClass, valueClass,
                                   compressionType, codec, context);
}
```
The other important piece is its implementation of `getRecordWriter`, which wraps the `SequenceFile.Writer` in an anonymous `RecordWriter`; source:
```java
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context
                                          ) throws IOException, InterruptedException {
  final SequenceFile.Writer out = getSequenceWriter(context,
    context.getOutputKeyClass(), context.getOutputValueClass());

  return new RecordWriter<K, V>() {

    public void write(K key, V value) throws IOException {
      out.append(key, value);
    }

    public void close(TaskAttemptContext context) throws IOException {
      out.close();
    }
  };
}
```
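Wiring this into a job is symmetric to the text case. A minimal driver sketch, with a made-up output path:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

// Minimal driver sketch; "/out/seq" is a hypothetical output path.
Job job = Job.getInstance(new Configuration(), "sequence-file-output");
job.setOutputFormatClass(SequenceFileOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path("/out/seq"));
// Enable block compression for the SequenceFile output; this feeds the
// getCompressOutput / getOutputCompressionType checks in getSequenceWriter.
FileOutputFormat.setCompressOutput(job, true);
SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
```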
3. ParquetThriftOutputFormat, which writes Thrift objects to Parquet files; see the reference project adobe-research/spark-parquet-thrift-example:
https://github.com/adobe-research/spark-parquet-thrift-example/blob/master/src/main/scala/SparkParquetThriftApp.scala
Code (Scala, from the referenced project):
```scala
ParquetThriftOutputFormat.setThriftClass(job, classOf[SampleThriftObject])
ParquetOutputFormat.setWriteSupportClass(job, classOf[SampleThriftObject])
sc.parallelize(sampleData)
  .map(obj => (null, obj))
  .saveAsNewAPIHadoopFile(
    parquetStore,
    classOf[Void],
    classOf[SampleThriftObject],
    classOf[ParquetThriftOutputFormat[SampleThriftObject]],
    job.getConfiguration
  )
```

Note the `.map(obj => (null, obj))`: the Parquet output format ignores the key, which is why the key class passed to `saveAsNewAPIHadoopFile` is `Void`.
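The same format can be used from a plain MapReduce driver. A hypothetical sketch, assuming `SampleThriftObject` is a Thrift-generated class as in the referenced project; note the package of `ParquetThriftOutputFormat` varies across parquet-mr versions, and the output path is made up:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.parquet.hadoop.thrift.ParquetThriftOutputFormat;

// Hypothetical driver sketch; SampleThriftObject stands in for any
// Thrift-generated class, and "/out/parquet" is a made-up path.
Job job = Job.getInstance(new Configuration(), "thrift-to-parquet");
job.setOutputFormatClass(ParquetThriftOutputFormat.class);
ParquetThriftOutputFormat.setThriftClass(job, SampleThriftObject.class);
FileOutputFormat.setOutputPath(job, new Path("/out/parquet"));
```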
4. HiveIgnoreKeyTextOutputFormat, for text-format Hive tables
https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java
It implements the `HiveOutputFormat` interface, which extends `OutputFormat` and adds a `getHiveRecordWriter` method that Hive uses when writing table data:
https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java
In HiveIgnoreKeyTextOutputFormat the key is always null: Hive serializes the entire row into the value, so the writer ignores whatever key it is given. Source:
```java
@Override
public synchronized void write(K key, V value) throws IOException {
  this.mWriter.write(null, value);
}
```