Hadoop MapReduce多输出
FileOutputFormat及其子类产生的文件放在输出目录下。每个reducer一个文件并且文件由分区号命名:part-r-00000,part-r-00001,等等。有时可能要对输出的文件名进行控制或让每个reducer输出多个文件。MapReduce为此提供了MultipleOutputFormat类。
MultipleOutputFormat类可以将数据写到多个文件,这些文件的名称源于输出的键和值或者任意字符串。这允许每个reducer(或者只有map作业的mapper)创建多个文件。采用name-r-nnnnn形式的文件名用于map输出,name-r-nnnnn形式的文件名用于reduce输出,其中name是由程序设定的任意名字,nnnnn是一个指名块号的整数(从0开始)。块号保证从不同块(mapper或者reducer)写的输出在相同名字情况下不会冲突。
1. 重定义输出文件名
我们可以对输出的文件名进行控制。考虑这样一个需求:按男女性别来区分度假订单数据。这需要运行一个作业,作业的输出是男女各一个文件,此文件包含男女性别的所有数据记录。
这个需求可以使用MultipleOutputs来实现:
package com.sjf.open.test; import java.io.IOException; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapred.JobPriority; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import com.sjf.open.utils.ConfigUtil; /** * Created by xiaosi on 16-11-7. */ public class VacationOrderBySex extends Configured implements Tool { public static void main(String[] args) throws Exception { int status = ToolRunner.run(new VacationOrderBySex(), args); System.exit(status); } public static class VacationOrderBySexMapper extends Mapper<LongWritable, Text, Text, Text> { public String fInputPath = ""; @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); fInputPath = ((FileSplit) context.getInputSplit()).getPath().toString(); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); if(fInputPath.contains("vacation_hot_country_order")){ String[] params = line.split("\t"); String sex = params[2]; if(StringUtils.isBlank(sex)){ return; } context.write(new Text(sex.toLowerCase()), value); } } } public static class VacationOrderBySexReducer extends Reducer<Text, Text, NullWritable, Text> { private MultipleOutputs<NullWritable, Text> multipleOutputs; @Override protected void setup(Context context) throws IOException, InterruptedException { multipleOutputs = new MultipleOutputs<NullWritable, Text>(context); } @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text value : values) { multipleOutputs.write(NullWritable.get(), value, key.toString()); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { multipleOutputs.close(); } } @Override public int run(String[] args) throws Exception { if (args.length != 2) { System.err.println("./run <input> <output>"); System.exit(1); } String inputPath = args[0]; String outputPath = args[1]; int numReduceTasks = 16; Configuration conf = this.getConf(); conf.setBoolean("mapred.output.compress", true); conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class); Job job = Job.getInstance(conf); job.setJobName("vacation_order_by_jifeng.si"); job.setJarByClass(VacationOrderBySex.class); job.setMapperClass(VacationOrderBySexMapper.class); job.setReducerClass(VacationOrderBySexReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, new Path(outputPath)); job.setNumReduceTasks(numReduceTasks); boolean success = job.waitForCompletion(true); return success "htmlcode">-rw-r--r-- 3 wirelessdev wirelessdev 0 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/_SUCCESS -rw-r--r-- 3 wirelessdev wirelessdev 88574 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/f-r-00005.gz -rw-r--r-- 3 wirelessdev wirelessdev 60965 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/m-r-00012.gz -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00000.gz -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00001.gz -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00002.gz -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00003.gz -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00004.gz -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00005.gz -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00006.gz -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00007.gz -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00008.gz我们可以看到在输出文件中不仅有我们想要的输出文件类型,还有part-r-nnnnn形式的文件,但是文件内没有信息,这是程序默认的输出文件。所以我们在指定输出文件名称时(name-r-nnnnn),不要指定name为part,因为它已经被使用为默认值了。
2. 多目录输出
在MultipleOutputs的write()方法中指定的基本路径相对于输出路径进行解释,因为它可以包含文件路径分隔符(/),创建任意深度的子目录。例如,我们改动上面的需求:按男女性别来区分度假订单数据,不同性别数据位于不同子目录(例如:sex=f/part-r-00000)。
public static class VacationOrderBySexReducer extends Reducer<Text, Text, NullWritable, Text> { private MultipleOutputs<NullWritable, Text> multipleOutputs; @Override protected void setup(Context context) throws IOException, InterruptedException { multipleOutputs = new MultipleOutputs<NullWritable, Text>(context); } @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text value : values) { String basePath = String.format("sex=%s/part", key.toString()); multipleOutputs.write(NullWritable.get(), value, basePath); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { multipleOutputs.close(); } }后产生的输出名称的形式为sex=f/part-r-nnnnn或者sex=m/part-r-nnnnn:
-rw-r--r-- 3 wirelessdev wirelessdev 0 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/_SUCCESS -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00000.gz -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00001.gz -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00002.gz -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00003.gz -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00004.gz -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00005.gz -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00006.gz -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00007.gz drwxr-xr-x - wirelessdev wirelessdev 0 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/sex=f drwxr-xr-x - wirelessdev wirelessdev 0 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/sex=m"htmlcode">
Configuration conf = this.getConf(); Job job = Job.getInstance(conf); LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);再次检查一下我们的输出文件(第一个例子):
sudo -uwirelessdev hadoop fs -ls tmp/data_group/order/vacation_hot_country_order_by_sex/ Found 3 items -rw-r--r-- 3 wirelessdev wirelessdev 0 2016-12-06 13:36 tmp/data_group/order/vacation_hot_country_order_by_sex/_SUCCESS -rw-r--r-- 3 wirelessdev wirelessdev 88574 2016-12-06 13:36 tmp/data_group/order/vacation_hot_country_order_by_sex/f-r-00005.gz -rw-r--r-- 3 wirelessdev wirelessdev 60965 2016-12-06 13:36 tmp/data_group/order/vacation_hot_country_order_by_sex/m-r-00012.gz感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!
免责声明:本站文章均来自网站采集或用户投稿,网站不提供任何软件下载或自行开发的软件! 如有用户或公司发现本站内容信息存在侵权行为,请邮件告知! 858582#qq.com
《魔兽世界》大逃杀!60人新游玩模式《强袭风暴》3月21日上线
暴雪近日发布了《魔兽世界》10.2.6 更新内容,新游玩模式《强袭风暴》即将于3月21 日在亚服上线,届时玩家将前往阿拉希高地展开一场 60 人大逃杀对战。
艾泽拉斯的冒险者已经征服了艾泽拉斯的大地及遥远的彼岸。他们在对抗世界上最致命的敌人时展现出过人的手腕,并且成功阻止终结宇宙等级的威胁。当他们在为即将于《魔兽世界》资料片《地心之战》中来袭的萨拉塔斯势力做战斗准备时,他们还需要在熟悉的阿拉希高地面对一个全新的敌人──那就是彼此。在《巨龙崛起》10.2.6 更新的《强袭风暴》中,玩家将会进入一个全新的海盗主题大逃杀式限时活动,其中包含极高的风险和史诗级的奖励。
《强袭风暴》不是普通的战场,作为一个独立于主游戏之外的活动,玩家可以用大逃杀的风格来体验《魔兽世界》,不分职业、不分装备(除了你在赛局中捡到的),光是技巧和战略的强弱之分就能决定出谁才是能坚持到最后的赢家。本次活动将会开放单人和双人模式,玩家在加入海盗主题的预赛大厅区域前,可以从强袭风暴角色画面新增好友。游玩游戏将可以累计名望轨迹,《巨龙崛起》和《魔兽世界:巫妖王之怒 经典版》的玩家都可以获得奖励。
更新日志
- 雨林唱片《赏》新曲+精选集SACD版[ISO][2.3G]
- 罗大佑与OK男女合唱团.1995-再会吧!素兰【音乐工厂】【WAV+CUE】
- 草蜢.1993-宝贝对不起(国)【宝丽金】【WAV+CUE】
- 杨培安.2009-抒·情(EP)【擎天娱乐】【WAV+CUE】
- 周慧敏《EndlessDream》[WAV+CUE]
- 彭芳《纯色角3》2007[WAV+CUE]
- 江志丰2008-今生为你[豪记][WAV+CUE]
- 罗大佑1994《恋曲2000》音乐工厂[WAV+CUE][1G]
- 群星《一首歌一个故事》赵英俊某些作品重唱企划[FLAC分轨][1G]
- 群星《网易云英文歌曲播放量TOP100》[MP3][1G]
- 方大同.2024-梦想家TheDreamer【赋音乐】【FLAC分轨】
- 李慧珍.2007-爱死了【华谊兄弟】【WAV+CUE】
- 王大文.2019-国际太空站【环球】【FLAC分轨】
- 群星《2022超好听的十倍音质网络歌曲(163)》U盘音乐[WAV分轨][1.1G]
- 童丽《啼笑姻缘》头版限量编号24K金碟[低速原抓WAV+CUE][1.1G]