Previously, the company used an MR program to parse JSON and saved the results as text files under a Hive table's directory. As the data volume grew, the performance of text files could no longer keep up, so the code had to be changed to write the output in Parquet format.
Below is a demo that saves the results as Parquet.
Each line of the input text is split on spaces; the first column becomes id (int) and the second becomes name (string), and the rows are written directly to the target directory.
[root@kudu1 job]# cat test.txt
1 xiaoming
2 hanmeimei
3 kangkang
4 maria
5 yokiko
6 michael
The code below is map-only, with no reduce phase. The steps for writing the output as Parquet are:
Parquet's primitive types are INT64, INT32, BOOLEAN, BINARY, FLOAT, DOUBLE, INT96, and FIXED_LEN_BYTE_ARRAY. INT64 corresponds to Java's long, INT32 to int, and BINARY to String, though BINARY requires an encoding annotation. First, define the write schema:
String writeSchema = "message example {\n optional int32 id;\n optional binary name (UTF8);\n}";
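If more of the types listed above were needed, they go into the schema string the same way. A minimal sketch, purely for illustration; the create_time and price columns are hypothetical and not part of this demo:

// Hypothetical wider schema, only to show the type mapping: long -> int64, double -> double
String widerSchema = "message example {\n"
        + " optional int32 id;\n"
        + " optional int64 create_time;\n"   // Java long
        + " optional double price;\n"        // Java double
        + " optional binary name (UTF8);\n"  // Java String, with UTF8 annotation
        + "}";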
// Configure the MR job's Configuration
Configuration configuration = new Configuration(this.getConf());
configuration.set("mapreduce.input.fileinputformat.split.minsize", "2147483648");
configuration.set("parquet.example.schema", writeSchema);
Parquet is a columnar format; the values of one row across all columns are represented as a Group, so a Group can be thought of as one row. Each map call writes its record to the Parquet file as a Group.
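To see the idea that one row corresponds to one Group in isolation (outside of MapReduce), here is a minimal sketch reusing the writeSchema string above; MessageType comes from org.apache.parquet.schema, and the values are simply the first row of test.txt:

// One Group per input row
MessageType schema = MessageTypeParser.parseMessageType(writeSchema);
SimpleGroupFactory factory = new SimpleGroupFactory(schema);
Group row = factory.newGroup();
row.add("id", 1);            // int32 column
row.add("name", "xiaoming"); // binary (UTF8) column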
job.setMapOutputValueClass(Group.class); // the map output value type is Parquet's Group
LazyOutputFormat.setOutputFormatClass(job, ExampleOutputFormat.class);
MultipleOutputs.addNamedOutput(job, "output", ExampleOutputFormat.class, NullWritable.class, Group.class);
// Declare the factory field in the Mapper class
private SimpleGroupFactory factory;
// Create the SimpleGroupFactory in setup() from the schema stored in the Configuration
factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(context.getConfiguration()));
String[] strings = value.toString().split(" ");
// Create a Group for this row
Group group = factory.newGroup();
// Populate the Group's fields
group.add("id", Integer.valueOf(strings[0]));
group.add("name", strings[1]);
// Write it out through MultipleOutputs
mos.write("output", null, group);
The complete code follows:
package com.zixuan.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.example.ExampleOutputFormat;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageTypeParser;

import java.io.IOException;

public class MrSavedAsParquet extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new MrSavedAsParquet(), args);
        System.exit(ret);
    }

    public int run(String[] strings) throws Exception {
        String inputDir = "/user/hive/warehouse/inputdirTest";
        String outputDir = "/user/hive/warehouse/ods.db/usertest";

        // Define the Parquet write schema
        String writeSchema = "message example {\n optional int32 id;\n optional binary name (UTF8);\n}";

        // Configure the MR job's Configuration
        Configuration configuration = new Configuration(this.getConf());
        configuration.set("mapreduce.input.fileinputformat.split.minsize", "2147483648");
        configuration.set("parquet.example.schema", writeSchema);

        Job job = new Job(configuration, "UserTest");

        // Configure the Parquet output
        ExampleOutputFormat.setSchema(job, MessageTypeParser.parseMessageType(writeSchema));
        ExampleOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
        ExampleOutputFormat.setOutputPath(job, new Path(outputDir));

        // Basic job settings
        job.setJarByClass(MrSavedAsParquet.class);
        job.setMapperClass(MapTest.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Group.class); // the map output value type is Parquet's Group
        MultipleInputs.addInputPath(job, new Path(inputDir), TextInputFormat.class, MapTest.class);
        job.setNumReduceTasks(0);

        // Set the output format to Parquet
        LazyOutputFormat.setOutputFormatClass(job, ExampleOutputFormat.class);
        MultipleOutputs.addNamedOutput(job, "output", ExampleOutputFormat.class, NullWritable.class, Group.class);

        FileSystem fileSystem = FileSystem.get(configuration);
        if (!fileSystem.exists(new Path(inputDir))) {
            System.out.print("input path does not exist!");
            return 1;
        }

        int ret = job.waitForCompletion(true) ? 0 : 1;
        return ret;
    }

    public static class MapTest extends Mapper<LongWritable, Text, NullWritable, Group> {
        // Writer for named (multiple) outputs
        private MultipleOutputs<NullWritable, Group> mos;
        // Factory used to create Group objects
        private SimpleGroupFactory factory;

        // Initialization: create mos and the factory
        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            mos = new MultipleOutputs<NullWritable, Group>(context);
            factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(context.getConfiguration()));
        }

        /*
         * LongWritable key : the byte offset of the line within the file
         * Text value       : the content of the line
         * Context context  : job context, gives access to the Configuration
         */
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] strings = value.toString().split(" ");
            // Create a Group for this row
            Group group = factory.newGroup();
            // Populate the Group's fields
            group.add("id", Integer.valueOf(strings[0]));
            group.add("name", strings[1]);
            // Write it out through MultipleOutputs
            mos.write("output", null, group);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            mos.close();
        }
    }
}
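Packaging and submitting the job would look roughly like this; the jar name is a placeholder, while the class name comes from the code above:

# mr-saved-as-parquet.jar is a placeholder for whatever your build produces
[root@kudu1 job]# hadoop jar mr-saved-as-parquet.jar com.zixuan.hadoop.MrSavedAsParquet
# the Parquet part files should now sit under the output directory
[root@kudu1 job]# hdfs dfs -ls /user/hive/warehouse/ods.db/usertest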
The files are now under /user/hive/warehouse/ods.db/usertest. Create the Hive table ods.usertest:
CREATE TABLE ods.usertest ( id int, name string ) stored as parquet;
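In this demo the output directory is exactly the default warehouse location of ods.usertest, so the managed table above sees the files as soon as it is created. If the job wrote to some other directory, an external table pointing at that path would serve the same purpose; a sketch reusing this demo's path:

-- Same columns as above; only the LOCATION clause would change if the files lived elsewhere
CREATE EXTERNAL TABLE ods.usertest (
  id int,
  name string
)
STORED AS PARQUET
LOCATION '/user/hive/warehouse/ods.db/usertest';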
Query it:
hive> select * from ods.usertest;
OK
1       xiaoming
2       hanmeimei
3       kangkang
4       maria
5       yokiko
6       michael
Time taken: 0.053 seconds, Fetched: 6 row(s)