
OutputFormat in MapReduce

In the Hadoop source code, OutputFormat is an abstract class, public abstract class OutputFormat<K, V>, which defines the output format of the reduce task; its source is at:

https://github.com/apache/hadoop/blob/master/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputFormat.java
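To see where an OutputFormat plugs into a job, here is a minimal driver sketch (a hypothetical example, not from the Hadoop source; input/output paths come from the command line):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class OutputFormatDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "outputformat-demo");
        job.setJarByClass(OutputFormatDemo.class);
        // No mapper/reducer set, so the identity Mapper and Reducer run;
        // with the default TextInputFormat they emit (LongWritable, Text).
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        // This is where the OutputFormat is chosen; TextOutputFormat is
        // the default, so this line only makes the choice explicit.
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}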

For background, see the article:

MapReduce快速入门系列(12) | MapReduce之OutputFormat

 

The source code of the commonly used OutputFormat implementations lives under:

https://github.com/apache/hadoop/tree/master/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output

1. Text output: TextOutputFormat, the class that implements Hadoop's default output format; its source:

https://github.com/apache/hadoop/blob/master/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java

TextOutputFormat extends FileOutputFormat. **FileOutputFormat** is itself an abstract class and a subclass of OutputFormat; its source is at:

https://github.com/apache/hadoop/blob/release-2.6.0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
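One practical consequence of FileOutputFormat's checkOutputSpecs is that a job fails with FileAlreadyExistsException if the output directory already exists. Continuing the driver sketch above (FileSystem here is org.apache.hadoop.fs.FileSystem), a common pre-flight step is, again only a sketch:

// Remove a stale output directory before submitting, since
// FileOutputFormat.checkOutputSpecs() rejects an existing one.
Path out = new Path(args[1]);
FileSystem fs = out.getFileSystem(job.getConfiguration());
if (fs.exists(out)) {
    fs.delete(out, true);  // recursive delete
}
FileOutputFormat.setOutputPath(job, out);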

One important piece is the RecordWriter interface, which declares two methods: write and close. (This is the old-API interface in org.apache.hadoop.mapred; the new API's org.apache.hadoop.mapreduce.RecordWriter is an abstract class with the same two methods, and it is what LineRecordWriter below extends.)

https://github.com/apache/hadoop/blob/master/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/RecordWriter.java

Source code:

public interface RecordWriter<K, V> {
  /**
   * Writes a key/value pair.
   *
   * @param key the key to write.
   * @param value the value to write.
   * @throws IOException
   */
  void write(K key, V value) throws IOException;

  /**
   * Close this RecordWriter to future operations.
   *
   * @param reporter facility to report progress.
   * @throws IOException
   */
  void close(Reporter reporter) throws IOException;
}

The TextOutputFormat implementation class contains the following implementation, **LineRecordWriter<K, V>**; source:

protected static class LineRecordWriter<K, V>
    extends RecordWriter<K, V> {
  private static final String utf8 = "UTF-8";
  private static final byte[] newline;
  static {
    try {
      newline = "\n".getBytes(utf8);
    } catch (UnsupportedEncodingException uee) {
      throw new IllegalArgumentException("can't find " + utf8 + " encoding");
    }
  }

  protected DataOutputStream out;
  private final byte[] keyValueSeparator;

  public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
    this.out = out;
    try {
      this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
    } catch (UnsupportedEncodingException uee) {
      throw new IllegalArgumentException("can't find " + utf8 + " encoding");
    }
  }

  public LineRecordWriter(DataOutputStream out) {
    this(out, "\t");
  }

  /**
   * Write the object to the byte stream, handling Text as a special
   * case.
   * @param o the object to print
   * @throws IOException if the write throws, we pass it on
   */
  private void writeObject(Object o) throws IOException {
    if (o instanceof Text) {
      Text to = (Text) o;
      out.write(to.getBytes(), 0, to.getLength());
    } else {
      out.write(o.toString().getBytes(utf8));
    }
  }

  public synchronized void write(K key, V value)
      throws IOException {

    boolean nullKey = key == null || key instanceof NullWritable;
    boolean nullValue = value == null || value instanceof NullWritable;
    if (nullKey && nullValue) {
      return;
    }
    if (!nullKey) {
      writeObject(key);
    }
    if (!(nullKey || nullValue)) {
      out.write(keyValueSeparator);
    }
    if (!nullValue) {
      writeObject(value);
    }
    out.write(newline);
  }

  public synchronized
  void close(TaskAttemptContext context) throws IOException {
    out.close();
  }
}
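As the write logic above shows, a null or NullWritable key or value is simply skipped, and each record is laid out as key + separator + value + newline. The separator defaults to "\t" and can be changed through the job configuration; the property name below is the one used by Hadoop 2.x and later (1.x releases used a mapred.* name):

// Emit comma-separated lines instead of the default tab-separated ones.
job.getConfiguration().set("mapreduce.output.textoutputformat.separator", ",");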

Another important piece is the getRecordWriter abstract method. A concrete subclass of the **FileOutputFormat abstract class** must implement it to obtain the RecordWriter<K, V> for the job:

public abstract RecordWriter<K, V> 
    getRecordWriter(TaskAttemptContext job
                    ) throws IOException, InterruptedException;

TextOutputFormat implements getRecordWriter as follows, using the LineRecordWriter shown above; source:

public RecordWriter<K, V> 
    getRecordWriter(TaskAttemptContext job
                    ) throws IOException, InterruptedException {
  Configuration conf = job.getConfiguration();
  boolean isCompressed = getCompressOutput(job);
  String keyValueSeparator = conf.get(SEPERATOR, "\t");
  CompressionCodec codec = null;
  String extension = "";
  if (isCompressed) {
    Class<? extends CompressionCodec> codecClass =
      getOutputCompressorClass(job, GzipCodec.class);
    codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
    extension = codec.getDefaultExtension();
  }
  Path file = getDefaultWorkFile(job, extension);
  FileSystem fs = file.getFileSystem(conf);
  if (!isCompressed) {
    FSDataOutputStream fileOut = fs.create(file, false);
    return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
  } else {
    FSDataOutputStream fileOut = fs.create(file, false);
    return new LineRecordWriter<K, V>(new DataOutputStream
                                      (codec.createOutputStream(fileOut)),
                                      keyValueSeparator);
  }
}
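A custom format follows the same pattern: subclass FileOutputFormat and return your own RecordWriter from getRecordWriter. Below is a minimal hypothetical sketch (no compression handling) that ignores the key and writes one value per line:

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ValueOnlyOutputFormat<K, V> extends FileOutputFormat<K, V> {

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        // getDefaultWorkFile gives the per-task temporary output path,
        // exactly as in TextOutputFormat above.
        Path file = getDefaultWorkFile(job, "");
        FileSystem fs = file.getFileSystem(job.getConfiguration());
        final FSDataOutputStream out = fs.create(file, false);
        return new RecordWriter<K, V>() {
            @Override
            public void write(K key, V value) throws IOException {
                if (value == null) {
                    return;  // nothing to write
                }
                // Ignore the key; write the value's string form plus a newline.
                out.write(value.toString().getBytes(StandardCharsets.UTF_8));
                out.write('\n');
            }

            @Override
            public void close(TaskAttemptContext context) throws IOException {
                out.close();
            }
        };
    }
}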

 

2. Binary output: SequenceFileOutputFormat

https://github.com/apache/hadoop/blob/master/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java

SequenceFileOutputFormat, like TextOutputFormat, extends FileOutputFormat.

One important method is getSequenceWriter, which returns a SequenceFile.Writer for the binary output file; SequenceFile's source is at:

https://github.com/apache/hadoop/blob/master/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java

Source code:

protected SequenceFile.Writer getSequenceWriter(TaskAttemptContext context,
    Class<?> keyClass, Class<?> valueClass)
    throws IOException {
  Configuration conf = context.getConfiguration();

  CompressionCodec codec = null;
  CompressionType compressionType = CompressionType.NONE;
  if (getCompressOutput(context)) {
    // find the kind of compression to do
    compressionType = getOutputCompressionType(context);
    // find the right codec
    Class<?> codecClass = getOutputCompressorClass(context,
                                                   DefaultCodec.class);
    codec = (CompressionCodec)
      ReflectionUtils.newInstance(codecClass, conf);
  }
  // get the path of the temporary output file
  Path file = getDefaultWorkFile(context, "");
  FileSystem fs = file.getFileSystem(conf);
  return SequenceFile.createWriter(fs, conf, file,
                                   keyClass,
                                   valueClass,
                                   compressionType,
                                   codec,
                                   context);
}

The other important piece is its implementation of getRecordWriter, which wraps the SequenceFile.Writer in an anonymous RecordWriter; source:

public RecordWriter<K, V> 
    getRecordWriter(TaskAttemptContext context
                    ) throws IOException, InterruptedException {
  final SequenceFile.Writer out = getSequenceWriter(context,
    context.getOutputKeyClass(), context.getOutputValueClass());

  return new RecordWriter<K, V>() {

    public void write(K key, V value)
        throws IOException {

      out.append(key, value);
    }

    public void close(TaskAttemptContext context) throws IOException {
      out.close();
    }
  };
}
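On the job side, compression for SequenceFileOutputFormat is driven by the static setters on FileOutputFormat and SequenceFileOutputFormat. A configuration sketch (the codec and compression type here are just example choices):

import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SeqFileJobConfig {
    // Configure a job (e.g. the driver sketch earlier) to write
    // block-compressed SequenceFiles with gzip.
    static void configure(Job job) {
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
    }
}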

  

3. ParquetThriftOutputFormat, used to write Thrift objects as Parquet files; see the reference project adobe-research/spark-parquet-thrift-example:

https://github.com/adobe-research/spark-parquet-thrift-example/blob/master/src/main/scala/SparkParquetThriftApp.scala

Code:

ParquetThriftOutputFormat.setThriftClass(job, classOf[SampleThriftObject])
ParquetOutputFormat.setWriteSupportClass(job, classOf[SampleThriftObject])
sc.parallelize(sampleData)
  .map(obj => (null, obj))
  .saveAsNewAPIHadoopFile(
    parquetStore,
    classOf[Void],
    classOf[SampleThriftObject],
    classOf[ParquetThriftOutputFormat[SampleThriftObject]],
    job.getConfiguration
  )

 

4. HiveIgnoreKeyTextOutputFormat, used for text-format Hive tables

https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java

It implements HiveOutputFormat:

https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java

HiveIgnoreKeyTextOutputFormat always passes a null key to the underlying writer: Hive serializes the whole row into the value, so the key carries no information. Source:

@Override
public synchronized void write(K key, V value) throws IOException {
  this.mWriter.write(null, value);
}