Java source code examples: org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction
Example 1
@Test
public void testConfigurableMapper() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    JobConf conf = new JobConf();
    conf.set("my.filterPrefix", "Hello");

    DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env);
    DataSet<Tuple2<IntWritable, Text>> hellos = ds.
            flatMap(new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new ConfigurableMapper(), conf));

    String resultPath = tempFolder.newFile().toURI().toString();

    hellos.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    env.execute();

    String expected = "(2,Hello)\n" +
            "(3,Hello world)\n" +
            "(4,Hello world, how are you?)\n";

    compareResultsByLinesInMemory(expected, resultPath);
}
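The ConfigurableMapper used above is not reproduced on this page. As an illustration only, a configurable Hadoop Mapper of this kind could look roughly like the sketch below, assuming it reads the "my.filterPrefix" value in configure() and forwards only records whose text starts with that prefix; the class body is a hypothetical reconstruction, not the original test class. It uses the classic org.apache.hadoop.mapred API (Mapper, OutputCollector, Reporter, JobConf).

public static class ConfigurableMapper implements Mapper<IntWritable, Text, IntWritable, Text> {

    private String filterPrefix;

    @Override
    public void configure(JobConf job) {
        // Read the prefix that the test set via conf.set("my.filterPrefix", "Hello").
        this.filterPrefix = job.get("my.filterPrefix");
    }

    @Override
    public void map(IntWritable key, Text value, OutputCollector<IntWritable, Text> out, Reporter reporter)
            throws IOException {
        // Forward only records whose text starts with the configured prefix.
        if (value.toString().startsWith(filterPrefix)) {
            out.collect(key, value);
        }
    }

    @Override
    public void close() throws IOException {
        // Nothing to clean up.
    }
}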
Example 2
public static void main(String[] args) throws Exception {
    if (args.length < 2) {
        System.err.println("Usage: WordCount <input path> <result path>");
        return;
    }

    final String inputPath = args[0];
    final String outputPath = args[1];

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Set up the Hadoop Input Format
    HadoopInputFormat<LongWritable, Text> hadoopInputFormat =
            new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, new JobConf());
    TextInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(inputPath));

    // Create a Flink job with it
    DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);
    DataSet<Tuple2<Text, LongWritable>> words =
            text.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer()))
                .groupBy(0).reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(new Counter(), new Counter()));

    // Set up Hadoop Output Format
    HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat =
            new HadoopOutputFormat<Text, LongWritable>(new TextOutputFormat<Text, LongWritable>(), new JobConf());
    hadoopOutputFormat.getJobConf().set("mapred.textoutputformat.separator", " ");
    TextOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf(), new Path(outputPath));

    // Output & Execute
    words.output(hadoopOutputFormat).setParallelism(1);
    env.execute("Hadoop Compat WordCount");
}
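The Tokenizer and Counter classes referenced in this WordCount job are not shown here. A minimal sketch of what such a classic-API mapper and reducer could look like follows; the bodies are assumptions based on the usual word-count pattern (split each line into words, emit (word, 1), then sum the counts per word), not the original classes. Counter is used both as the combiner and the reducer in HadoopReduceCombineFunction, so it must be commutative and associative, which a simple sum is.

// Assumed imports: org.apache.hadoop.mapred.{Mapper, Reducer, JobConf, OutputCollector, Reporter}, java.util.Iterator.
public static final class Tokenizer implements Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> out, Reporter reporter)
            throws IOException {
        // Normalize the line and split it into words, emitting (word, 1) for each.
        String[] tokens = value.toString().toLowerCase().split("\\W+");
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Text(token), new LongWritable(1L));
            }
        }
    }

    @Override
    public void configure(JobConf job) { }

    @Override
    public void close() throws IOException { }
}

public static final class Counter implements Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    public void reduce(Text key, Iterator<LongWritable> values, OutputCollector<Text, LongWritable> out, Reporter reporter)
            throws IOException {
        // Sum all counts for the word and emit the total.
        long count = 0;
        while (values.hasNext()) {
            count += values.next().get();
        }
        out.collect(key, new LongWritable(count));
    }

    @Override
    public void configure(JobConf job) { }

    @Override
    public void close() throws IOException { }
}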
Example 3
@Test
public void testNonPassingMapper() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env);
    DataSet<Tuple2<IntWritable, Text>> nonPassingFlatMapDs = ds.
            flatMap(new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new NonPassingMapper()));

    String resultPath = tempFolder.newFile().toURI().toString();

    nonPassingFlatMapDs.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    env.execute();

    compareResultsByLinesInMemory("\n", resultPath);
}
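The NonPassingMapper itself is not listed on this page. Judging from the test's expectation (an effectively empty result), it passes none of the input records on. A minimal hypothetical version, not the original test class, could simply collect nothing:

public static class NonPassingMapper implements Mapper<IntWritable, Text, IntWritable, Text> {
    @Override
    public void map(IntWritable key, Text value, OutputCollector<IntWritable, Text> out, Reporter reporter)
            throws IOException {
        // Intentionally emits nothing, so the resulting DataSet is empty.
    }

    @Override
    public void configure(JobConf job) { }

    @Override
    public void close() throws IOException { }
}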
Example 4
@Test
public void testDataDuplicatingMapper() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env);
    DataSet<Tuple2<IntWritable, Text>> duplicatingFlatMapDs = ds.
            flatMap(new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new DuplicatingMapper()));

    String resultPath = tempFolder.newFile().toURI().toString();

    duplicatingFlatMapDs.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    env.execute();

    String expected = "(1,Hi)\n" + "(1,HI)\n" +
            "(2,Hello)\n" + "(2,HELLO)\n" +
            "(3,Hello world)\n" + "(3,HELLO WORLD)\n" +
            "(4,Hello world, how are you?)\n" + "(4,HELLO WORLD, HOW ARE YOU?)\n" +
            "(5,I am fine.)\n" + "(5,I AM FINE.)\n" +
            "(6,Luke Skywalker)\n" + "(6,LUKE SKYWALKER)\n" +
            "(7,Comment#1)\n" + "(7,COMMENT#1)\n" +
            "(8,Comment#2)\n" + "(8,COMMENT#2)\n" +
            "(9,Comment#3)\n" + "(9,COMMENT#3)\n" +
            "(10,Comment#4)\n" + "(10,COMMENT#4)\n" +
            "(11,Comment#5)\n" + "(11,COMMENT#5)\n" +
            "(12,Comment#6)\n" + "(12,COMMENT#6)\n" +
            "(13,Comment#7)\n" + "(13,COMMENT#7)\n" +
            "(14,Comment#8)\n" + "(14,COMMENT#8)\n" +
            "(15,Comment#9)\n" + "(15,COMMENT#9)\n" +
            "(16,Comment#10)\n" + "(16,COMMENT#10)\n" +
            "(17,Comment#11)\n" + "(17,COMMENT#11)\n" +
            "(18,Comment#12)\n" + "(18,COMMENT#12)\n" +
            "(19,Comment#13)\n" + "(19,COMMENT#13)\n" +
            "(20,Comment#14)\n" + "(20,COMMENT#14)\n" +
            "(21,Comment#15)\n" + "(21,COMMENT#15)\n";

    compareResultsByLinesInMemory(expected, resultPath);
}
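The DuplicatingMapper is likewise not shown. Based on the expected output, which contains each input record once unchanged and once upper-cased, a hypothetical implementation could look like the following sketch (not the original test class):

public static class DuplicatingMapper implements Mapper<IntWritable, Text, IntWritable, Text> {
    @Override
    public void map(IntWritable key, Text value, OutputCollector<IntWritable, Text> out, Reporter reporter)
            throws IOException {
        // Emit the record once as-is and once with the text upper-cased,
        // matching the (k,v) / (k,V) pairs in the expected result above.
        out.collect(key, value);
        out.collect(key, new Text(value.toString().toUpperCase()));
    }

    @Override
    public void configure(JobConf job) { }

    @Override
    public void close() throws IOException { }
}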