Multiple File Output in Hadoop
Keywords: Hadoop, output
With the old mapred package, Hadoop supports multiple file output directly:
static class newMultipleTextOutputFormat extends MultipleTextOutputFormat<Text, Text> {
    @Override
    protected String generateFileNameForKeyValue(Text key, Text content, String name) {
        // the second tab-separated field of the key is used directly as the file name
        return key.toString().split("\t")[1];
    }
}
Finally, in the driver, set conf.setOutputFormat(newMultipleTextOutputFormat.class);.
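For context, the old-API driver wiring might look like the following minimal sketch (the driver class name, paths, and key/value types here are illustrative assumptions, not from the original post):

// Old org.apache.hadoop.mapred API; FileInputFormat and FileOutputFormat
// below are the mapred versions, not the newer mapreduce.lib ones.
JobConf conf = new JobConf(MyDriver.class); // MyDriver: hypothetical driver class
conf.setJobName("multi-file-output");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setOutputFormat(newMultipleTextOutputFormat.class); // the subclass defined above
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);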
With the new mapreduce package, the problem is solved as follows (reposted from http://blog.csdn.net/inkfish):
Hadoop's default output format is TextOutputFormat, whose output file names cannot be customized. Hadoop 0.19.x ships org.apache.hadoop.mapred.lib.MultipleOutputFormat, which can write multiple output files with custom names, but from Hadoop 0.20.x onward every class in the package containing MultipleOutputFormat is marked deprecated, so code that still uses it may break in a future Hadoop release. In this article we implement a simple MultipleOutputFormat ourselves and modify the WordCount example that ships with Hadoop to test it.
Environment:
  Ubuntu 8.04 Server 32bit
  Hadoop 0.20.1
  JDK 1.6.0_16-b01
  Eclipse 3.5
The code consists of three classes:
1. LineRecordWriter:
  An implementation of RecordWriter that serializes a <Key, Value> pair into one line of text. In Hadoop this class exists as a protected nested class of TextOutputFormat, so ordinary programs cannot use it. Here LineRecordWriter is simply extracted from TextOutputFormat and turned into a standalone public class.
package inkfish.hadoop.study;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/** Writes each <key, value> pair as "key<separator>value\n"; extracted from TextOutputFormat. */
public class LineRecordWriter<K, V> extends RecordWriter<K, V> {
    private static final String utf8 = "UTF-8";
    private static final byte[] newline;
    static {
        try {
            newline = "\n".getBytes(utf8);
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
        }
    }

    protected DataOutputStream out;
    private final byte[] keyValueSeparator;

    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
        this.out = out;
        try {
            this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
        }
    }

    public LineRecordWriter(DataOutputStream out) {
        this(out, "\t"); // tab is the default key/value separator
    }

    /** Text is written raw (it is already UTF-8 bytes); anything else goes through toString(). */
    private void writeObject(Object o) throws IOException {
        if (o instanceof Text) {
            Text to = (Text) o;
            out.write(to.getBytes(), 0, to.getLength());
        } else {
            out.write(o.toString().getBytes(utf8));
        }
    }

    public synchronized void write(K key, V value) throws IOException {
        boolean nullKey = key == null || key instanceof NullWritable;
        boolean nullValue = value == null || value instanceof NullWritable;
        if (nullKey && nullValue) {
            return; // nothing to write
        }
        if (!nullKey) {
            writeObject(key);
        }
        if (!(nullKey || nullValue)) {
            out.write(keyValueSeparator); // separator only when both sides are present
        }
        if (!nullValue) {
            writeObject(value);
        }
        out.write(newline);
    }

    public synchronized void close(TaskAttemptContext context) throws IOException {
        out.close();
    }
}
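As a quick sanity check outside MapReduce, the class can be driven against an in-memory stream. A minimal sketch (this demo class and its placement are illustrative, not part of the original article):

package inkfish.hadoop.study;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class LineRecordWriterDemo {
    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        LineRecordWriter<Text, IntWritable> writer =
                new LineRecordWriter<Text, IntWritable>(new DataOutputStream(bytes), ",");
        writer.write(new Text("apple"), new IntWritable(3));
        writer.write(new Text("banana"), new IntWritable(1));
        writer.close(null); // safe here: this implementation ignores the context argument
        System.out.print(bytes.toString("UTF-8")); // prints "apple,3" and "banana,1" lines
    }
}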
2. MultipleOutputFormat:
  An abstract class modeled mainly on org.apache.hadoop.mapred.lib.MultipleOutputFormat. The only method a subclass must implement is String generateFileNameForKeyValue(K key, V value, Configuration conf), which derives the output file name (including the extension) from the key, the value, and the job configuration.
package inkfish.hadoop.study;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

/** An OutputFormat that routes each record to the file named by generateFileNameForKeyValue(). */
public abstract class MultipleOutputFormat<K extends WritableComparable<?>, V extends Writable>
        extends FileOutputFormat<K, V> {

    private MultiRecordWriter writer = null;

    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException,
            InterruptedException {
        if (writer == null) {
            writer = new MultiRecordWriter(job, getTaskOutputPath(job));
        }
        return writer;
    }

    /** The task's work directory; files written here are promoted by the committer on success. */
    private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(conf);
        if (committer instanceof FileOutputCommitter) {
            workPath = ((FileOutputCommitter) committer).getWorkPath();
        } else {
            Path outputPath = super.getOutputPath(conf);
            if (outputPath == null) {
                throw new IOException("Undefined job output-path");
            }
            workPath = outputPath;
        }
        return workPath;
    }

    /** Subclasses decide the output file name (with extension) from the key, value, and conf. */
    protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf);

    public class MultiRecordWriter extends RecordWriter<K, V> {
        /** One cached RecordWriter per file name, so each file is opened only once. */
        private HashMap<String, RecordWriter<K, V>> recordWriters = null;
        private TaskAttemptContext job = null;
        private Path workPath = null;

        public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
            super();
            this.job = job;
            this.workPath = workPath;
            recordWriters = new HashMap<String, RecordWriter<K, V>>();
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
            while (values.hasNext()) {
                values.next().close(context);
            }
            this.recordWriters.clear();
        }

        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());
            RecordWriter<K, V> rw = this.recordWriters.get(baseName);
            if (rw == null) {
                rw = getBaseRecordWriter(job, baseName);
                this.recordWriters.put(baseName, rw);
            }
            rw.write(key, value);
        }

        private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)
                throws IOException, InterruptedException {
            Configuration conf = job.getConfiguration();
            boolean isCompressed = getCompressOutput(job);
            String keyValueSeparator = ",";
            RecordWriter<K, V> recordWriter = null;
            if (isCompressed) {
                Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
                        GzipCodec.class);
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                Path file = new Path(workPath, baseName + codec.getDefaultExtension());
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(new DataOutputStream(
                        codec.createOutputStream(fileOut)), keyValueSeparator);
            } else {
                Path file = new Path(workPath, baseName);
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
            }
            return recordWriter;
        }
    }
}
3. WordCount:
Essentially the stock WordCount example from the Hadoop distribution; the main addition is a static nested class, AlphabetOutputFormat, which extends MultipleOutputFormat. The naming rule: a word starting with an English letter is saved to "<first letter>.txt", and everything else goes to "other.txt". Only AlphabetOutputFormat is listed here; a sketch of the driver wiring follows the class.
public static class AlphabetOutputFormat extends MultipleOutputFormat<Text, IntWritable> {
    @Override
    protected String generateFileNameForKeyValue(Text key, IntWritable value, Configuration conf) {
        char c = key.toString().toLowerCase().charAt(0);
        if (c >= 'a' && c <= 'z') {
            return c + ".txt"; // words starting with a letter go to "<first letter>.txt"
        }
        return "other.txt"; // everything else shares one file
    }
}
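For completeness, a minimal sketch of the driver's main(), assuming the stock WordCount mapper and reducer from the Hadoop 0.20 examples (TokenizerMapper and IntSumReducer) are kept unchanged; the only line that differs from the shipped example is the output format:

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);   // stock WordCount mapper (assumed unchanged)
    job.setCombinerClass(IntSumReducer.class);   // stock WordCount combiner/reducer
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setOutputFormatClass(AlphabetOutputFormat.class); // route output through our format
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

After a successful run, the output directory contains files such as a.txt, b.txt, ..., other.txt instead of the usual part-r-00000 files.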