Java源码示例:net.openhft.compiler.CompilerUtils
示例1
/**
 * Compiles the {@code Vanilla14Fixed} test resource and expects instantiating it to fail with an
 * {@code AvroRuntimeException} whose message contains "Not a Specific class".
 * The test is skipped unless the detected runtime avro version equals {@code AVRO_1_7}.
 */
@Test
public void testVanilla14FixedClassesIncompatibleWithAvro17() throws Exception {
    AvroVersion detected = AvroCompatibilityHelper.getRuntimeAvroVersion();
    boolean runningUnder17 = detected.equals(AvroVersion.AVRO_1_7);
    if (!runningUnder17) {
        throw new SkipException("class only supported under avro 1.7. runtime version detected as " + detected);
    }

    String vanillaSource = TestUtil.load("Vanilla14Fixed");
    Class fixedClass =
        CompilerUtils.CACHED_COMPILER.loadFromJava("com.acme.generatedby14.Vanilla14Fixed", vanillaSource);
    try {
        fixedClass.newInstance();
        Assert.fail("expecting an exception");
    } catch (AvroRuntimeException expected) {
        // fails to find SCHEMA$
        String message = expected.getMessage();
        Assert.assertTrue(message.contains("Not a Specific class"));
    }
}
示例2
/**
 * Expects compilation of the {@code Vanilla14Fixed} test resource to fail with a
 * {@code ClassNotFoundException}, and the captured compiler output to mention
 * "is not abstract and does not override".
 * The test is skipped unless the runtime avro version is later than {@code AVRO_1_7}.
 */
@Test
public void testVanilla14FixedClassesIncompatibleWithModernAvro() throws Exception {
    AvroVersion detected = AvroCompatibilityHelper.getRuntimeAvroVersion();
    if (!detected.laterThan(AvroVersion.AVRO_1_7)) {
        throw new SkipException("class only supported under modern avro. runtime version detected as " + detected);
    }

    String vanillaSource = TestUtil.load("Vanilla14Fixed");
    // compiler diagnostics are captured in memory so they can be inspected afterwards
    StringWriter captured = new StringWriter();
    PrintWriter compilerOutput = new PrintWriter(captured);
    try {
        CompilerUtils.CACHED_COMPILER.loadFromJava(
            getClass().getClassLoader(),
            "com.acme.generatedby14.Vanilla14Fixed",
            vanillaSource,
            compilerOutput);
        Assert.fail("compilation expected to fail");
    } catch (ClassNotFoundException ignored) {
        //expected
    }

    // doesnt implement Externalizable
    Assert.assertTrue(captured.toString().contains("is not abstract and does not override"));
}
示例3
/**
 * Compiles code generated from {@code ENUM_CLASS_JSON} (targeting {@code AVRO_1_4}) and checks that
 * the resulting class exposes a static {@code SCHEMA$} field with the expected name, namespace and doc.
 */
@Test
public void testAddSchemaSupportToEnum() throws Exception {
    Schema inputSchema = AvroCompatibilityHelper.parse(ENUM_CLASS_JSON);
    Collection<AvroGeneratedSourceCode> generated =
        _factory.compile(Collections.singletonList(inputSchema), AvroVersion.AVRO_1_4);
    // NOTE(review): if Assert is TestNG's, its signature is (actual, expected) - confirm argument order
    Assert.assertEquals(1, generated.size());

    AvroGeneratedSourceCode onlySource = generated.iterator().next();
    Class enumClass = CompilerUtils.CACHED_COMPILER.loadFromJava("com.dot.BobSmith", onlySource.getContents());
    Assert.assertNotNull(enumClass);

    Field schemaHolder = enumClass.getField("SCHEMA$");
    Assert.assertNotNull(schemaHolder);
    // the field is read without an instance, i.e. as a static
    Schema embedded = (Schema) schemaHolder.get(null);
    Assert.assertNotNull(embedded);
    Assert.assertEquals("BobSmith", embedded.getName());
    Assert.assertEquals("com.dot", embedded.getNamespace());
    Assert.assertTrue(embedded.getDoc().contains("Bob Smith Store"));
}
示例4
/**
 * Same check as the namespaced enum case, but for {@code ENUM_CLASS_NO_NAMESPACE_JSON}: the class is
 * loaded by its simple name and the embedded schema is expected to have a {@code null} namespace.
 */
@Test
public void testAddSchemaSupportToEnumNoNamespace() throws Exception {
    Schema inputSchema = AvroCompatibilityHelper.parse(ENUM_CLASS_NO_NAMESPACE_JSON);
    Collection<AvroGeneratedSourceCode> generated =
        _factory.compile(Collections.singletonList(inputSchema), AvroVersion.AVRO_1_4);
    // NOTE(review): if Assert is TestNG's, its signature is (actual, expected) - confirm argument order
    Assert.assertEquals(1, generated.size());

    AvroGeneratedSourceCode onlySource = generated.iterator().next();
    Class enumClass = CompilerUtils.CACHED_COMPILER.loadFromJava("BobSmith", onlySource.getContents());
    Assert.assertNotNull(enumClass);

    Field schemaHolder = enumClass.getField("SCHEMA$");
    Assert.assertNotNull(schemaHolder);
    // the field is read without an instance, i.e. as a static
    Schema embedded = (Schema) schemaHolder.get(null);
    Assert.assertNotNull(embedded);
    Assert.assertEquals("BobSmith", embedded.getName());
    Assert.assertNull(embedded.getNamespace());
    Assert.assertTrue(embedded.getDoc().contains("Bob Smith Store"));
}
示例5
/**
 * Compiles code generated from {@code FIXED_TYPE_SCHEMA_JSON} (targeting {@code AVRO_1_4}) and checks
 * that the class is a {@code SpecificFixed} whose static {@code SCHEMA$} field carries the expected
 * name, namespace, fixed size and doc.
 */
@Test
public void testAddSchemaSupportToFixedClass() throws Exception {
    Schema inputSchema = AvroCompatibilityHelper.parse(FIXED_TYPE_SCHEMA_JSON);
    Collection<AvroGeneratedSourceCode> generated =
        _factory.compile(Collections.singletonList(inputSchema), AvroVersion.AVRO_1_4);
    // NOTE(review): if Assert is TestNG's, its signature is (actual, expected) - confirm argument order
    Assert.assertEquals(1, generated.size());

    AvroGeneratedSourceCode onlySource = generated.iterator().next();
    Class fixedClass = CompilerUtils.CACHED_COMPILER.loadFromJava("com.acme.Whatever", onlySource.getContents());
    Assert.assertNotNull(fixedClass);
    Assert.assertTrue(SpecificFixed.class.isAssignableFrom(fixedClass));

    Field schemaHolder = fixedClass.getField("SCHEMA$");
    Assert.assertNotNull(schemaHolder);
    // the field is read without an instance, i.e. as a static
    Schema embedded = (Schema) schemaHolder.get(null);
    Assert.assertNotNull(embedded);
    Assert.assertEquals("Whatever", embedded.getName());
    Assert.assertEquals("com.acme", embedded.getNamespace());
    Assert.assertEquals(42, embedded.getFixedSize());
    Assert.assertTrue(embedded.getDoc().contains("yadda yadda"));
}
示例6
/**
 * Same check as the namespaced fixed case, but for {@code FIXED_TYPE_NO_NAMESPACE_SCHEMA_JSON}: the
 * class is loaded by its simple name and the embedded schema is expected to have a {@code null} namespace.
 */
@Test
public void testAddSchemaSupportToFixedClassNoNamespace() throws Exception {
    Schema inputSchema = AvroCompatibilityHelper.parse(FIXED_TYPE_NO_NAMESPACE_SCHEMA_JSON);
    Collection<AvroGeneratedSourceCode> generated =
        _factory.compile(Collections.singletonList(inputSchema), AvroVersion.AVRO_1_4);
    // NOTE(review): if Assert is TestNG's, its signature is (actual, expected) - confirm argument order
    Assert.assertEquals(1, generated.size());

    AvroGeneratedSourceCode onlySource = generated.iterator().next();
    Class fixedClass = CompilerUtils.CACHED_COMPILER.loadFromJava("Whatever", onlySource.getContents());
    Assert.assertNotNull(fixedClass);
    Assert.assertTrue(SpecificFixed.class.isAssignableFrom(fixedClass));

    Field schemaHolder = fixedClass.getField("SCHEMA$");
    Assert.assertNotNull(schemaHolder);
    // the field is read without an instance, i.e. as a static
    Schema embedded = (Schema) schemaHolder.get(null);
    Assert.assertNotNull(embedded);
    Assert.assertEquals("Whatever", embedded.getName());
    Assert.assertNull(embedded.getNamespace());
    Assert.assertEquals(42, embedded.getFixedSize());
    Assert.assertTrue(embedded.getDoc().contains("w00t"));
}
示例7
/**
 * Passes the schema from {@code PerfectlyNormalEnum.avsc} through {@code runNativeCodegen} and
 * {@code CodeTransformations.transformEnumClass}, then checks the output compiles into an enum class.
 */
@Test
public void testTransformAvro19Enum() throws Exception {
    Schema enumSchema = AvroCompatibilityHelper.parse(TestUtil.load("PerfectlyNormalEnum.avsc"));
    String generated = runNativeCodegen(enumSchema);
    String compatible =
        CodeTransformations.transformEnumClass(generated, AvroVersion.earliest(), AvroVersion.latest());

    Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(enumSchema.getFullName(), compatible);
    Assert.assertNotNull(loaded);
    Assert.assertTrue(Enum.class.isAssignableFrom(loaded));
}
示例8
/**
 * Passes the schema from {@code PerfectlyNormalRecord.avsc} through {@code runNativeCodegen} and
 * {@code CodeTransformations.transformParseCalls} (with {@code AVRO_1_9}), then checks the output compiles.
 */
@Test
public void testTransformAvro19parseCalls() throws Exception {
    Schema recordSchema = AvroCompatibilityHelper.parse(TestUtil.load("PerfectlyNormalRecord.avsc"));
    String generated = runNativeCodegen(recordSchema);
    String compatible = CodeTransformations.transformParseCalls(
        generated, AvroVersion.AVRO_1_9, AvroVersion.earliest(), AvroVersion.latest());

    Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(recordSchema.getFullName(), compatible);
    Assert.assertNotNull(loaded);
}
示例9
/**
 * Passes the schema from {@code HugeRecord.avsc} through {@code runNativeCodegen} and
 * {@code CodeTransformations.transformParseCalls} (with {@code AVRO_1_9}), then checks the output compiles.
 */
@Test
public void testTransformAvro19HugeRecord() throws Exception {
    Schema hugeSchema = AvroCompatibilityHelper.parse(TestUtil.load("HugeRecord.avsc"));
    String generated = runNativeCodegen(hugeSchema);
    String compatible = CodeTransformations.transformParseCalls(
        generated, AvroVersion.AVRO_1_9, AvroVersion.earliest(), AvroVersion.latest());

    Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(hugeSchema.getFullName(), compatible);
    Assert.assertNotNull(loaded);
}
示例10
/**
 * Passes the schema from {@code PerfectlyNormalRecord.avsc} through {@code runNativeCodegen} and
 * {@code CodeTransformations.removeBuilderSupport}, then checks the output compiles.
 */
@Test
public void testRemoveAvro19Builder() throws Exception {
    Schema recordSchema = AvroCompatibilityHelper.parse(TestUtil.load("PerfectlyNormalRecord.avsc"));
    String generated = runNativeCodegen(recordSchema);
    String withoutBuilder =
        CodeTransformations.removeBuilderSupport(generated, AvroVersion.earliest(), AvroVersion.latest());

    Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(recordSchema.getFullName(), withoutBuilder);
    Assert.assertNotNull(loaded);
}
示例11
/**
 * Passes the schema from {@code PerfectlyNormalRecord.avsc} through {@code runNativeCodegen} and
 * {@code CodeTransformations.transformExternalizableSupport}, then checks the output compiles.
 */
@Test
public void testTransformAvro19Externalizable() throws Exception {
    Schema recordSchema = AvroCompatibilityHelper.parse(TestUtil.load("PerfectlyNormalRecord.avsc"));
    String generated = runNativeCodegen(recordSchema);
    String compatible = CodeTransformations.transformExternalizableSupport(
        generated, AvroVersion.earliest(), AvroVersion.latest());

    Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(recordSchema.getFullName(), compatible);
    Assert.assertNotNull(loaded);
}
示例12
/**
 * Passes the schema from {@code PerfectlyNormalEnum.avsc} through {@code runNativeCodegen} and
 * {@code CodeTransformations.transformEnumClass}, then checks the output compiles into an enum class.
 */
@Test
public void testTransformAvro18Enum() throws Exception {
    Schema enumSchema = AvroCompatibilityHelper.parse(TestUtil.load("PerfectlyNormalEnum.avsc"));
    String generated = runNativeCodegen(enumSchema);
    String compatible =
        CodeTransformations.transformEnumClass(generated, AvroVersion.earliest(), AvroVersion.latest());

    Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(enumSchema.getFullName(), compatible);
    Assert.assertNotNull(loaded);
    Assert.assertTrue(Enum.class.isAssignableFrom(loaded));
}
示例13
/**
 * Passes the schema from {@code HugeRecord.avsc} through {@code runNativeCodegen} and
 * {@code CodeTransformations.transformParseCalls} (with {@code AVRO_1_8}), then checks the output compiles.
 */
@Test
public void testTransformAvro18HugeRecord() throws Exception {
    Schema hugeSchema = AvroCompatibilityHelper.parse(TestUtil.load("HugeRecord.avsc"));
    String generated = runNativeCodegen(hugeSchema);
    String compatible = CodeTransformations.transformParseCalls(
        generated, AvroVersion.AVRO_1_8, AvroVersion.earliest(), AvroVersion.latest());

    Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(hugeSchema.getFullName(), compatible);
    Assert.assertNotNull(loaded);
}
示例14
/**
 * Passes the schema from {@code PerfectlyNormalEnum.avsc} through {@code runNativeCodegen} and
 * {@code CodeTransformations.transformEnumClass}, then checks the output compiles into an enum class.
 */
@Test
public void testTransformAvro15Enum() throws Exception {
    Schema enumSchema = AvroCompatibilityHelper.parse(TestUtil.load("PerfectlyNormalEnum.avsc"));
    String generated = runNativeCodegen(enumSchema);
    String compatible =
        CodeTransformations.transformEnumClass(generated, AvroVersion.earliest(), AvroVersion.latest());

    Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(enumSchema.getFullName(), compatible);
    Assert.assertNotNull(loaded);
    Assert.assertTrue(Enum.class.isAssignableFrom(loaded));
}
示例15
/**
 * Passes the schema from {@code HugeRecord.avsc} through {@code runNativeCodegen} and
 * {@code CodeTransformations.transformParseCalls} (with {@code AVRO_1_5}), then checks the output compiles.
 */
@Test
public void testTransformAvro15HugeRecord() throws Exception {
    Schema hugeSchema = AvroCompatibilityHelper.parse(TestUtil.load("HugeRecord.avsc"));
    String generated = runNativeCodegen(hugeSchema);
    String compatible = CodeTransformations.transformParseCalls(
        generated, AvroVersion.AVRO_1_5, AvroVersion.earliest(), AvroVersion.latest());

    Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(hugeSchema.getFullName(), compatible);
    Assert.assertNotNull(loaded);
}
示例16
/**
 * Passes the schema from {@code RecordWithMultilineDoc.avsc} through {@code runNativeCodegen} and
 * {@code CodeTransformations.transformParseCalls} (with {@code AVRO_1_5}), then checks the output compiles.
 */
@Test
public void testTransformAvro15RecordWithMultilineDoc() throws Exception {
    Schema docSchema = AvroCompatibilityHelper.parse(TestUtil.load("RecordWithMultilineDoc.avsc"));
    String generated = runNativeCodegen(docSchema);
    String compatible = CodeTransformations.transformParseCalls(
        generated, AvroVersion.AVRO_1_5, AvroVersion.earliest(), AvroVersion.latest());

    Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(docSchema.getFullName(), compatible);
    Assert.assertNotNull(loaded);
}
示例17
/**
 * Passes the schema from {@code PerfectlyNormalEnum.avsc} through {@code runNativeCodegen} and
 * {@code CodeTransformations.transformEnumClass}, then checks the output compiles into an enum class.
 */
@Test
public void testTransformAvro16Enum() throws Exception {
    Schema enumSchema = AvroCompatibilityHelper.parse(TestUtil.load("PerfectlyNormalEnum.avsc"));
    String generated = runNativeCodegen(enumSchema);
    String compatible =
        CodeTransformations.transformEnumClass(generated, AvroVersion.earliest(), AvroVersion.latest());

    Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(enumSchema.getFullName(), compatible);
    Assert.assertNotNull(loaded);
    Assert.assertTrue(Enum.class.isAssignableFrom(loaded));
}
示例18
/**
 * Passes the schema from {@code HugeRecord.avsc} through {@code runNativeCodegen} and
 * {@code CodeTransformations.transformParseCalls} (with {@code AVRO_1_6}), then checks the output compiles.
 */
@Test
public void testTransformAvro16HugeRecord() throws Exception {
    Schema hugeSchema = AvroCompatibilityHelper.parse(TestUtil.load("HugeRecord.avsc"));
    String generated = runNativeCodegen(hugeSchema);
    String compatible = CodeTransformations.transformParseCalls(
        generated, AvroVersion.AVRO_1_6, AvroVersion.earliest(), AvroVersion.latest());

    Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(hugeSchema.getFullName(), compatible);
    Assert.assertNotNull(loaded);
}
示例19
/**
 * Passes the schema from {@code PerfectlyNormalEnum.avsc} through {@code runNativeCodegen} and
 * {@code CodeTransformations.transformEnumClass}, then checks the output compiles into an enum class.
 */
@Test
public void testTransformAvro14Enum() throws Exception {
    Schema enumSchema = AvroCompatibilityHelper.parse(TestUtil.load("PerfectlyNormalEnum.avsc"));
    String generated = runNativeCodegen(enumSchema);
    String compatible =
        CodeTransformations.transformEnumClass(generated, AvroVersion.earliest(), AvroVersion.latest());

    Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(enumSchema.getFullName(), compatible);
    Assert.assertNotNull(loaded);
    Assert.assertTrue(Enum.class.isAssignableFrom(loaded));
}
示例20
/**
 * Passes the schema from {@code HugeRecord.avsc} through {@code runNativeCodegen} and
 * {@code CodeTransformations.transformParseCalls} (with {@code AVRO_1_4}), then checks the output compiles.
 */
@Test
public void testTransformAvro14HugeRecord() throws Exception {
    Schema hugeSchema = AvroCompatibilityHelper.parse(TestUtil.load("HugeRecord.avsc"));
    String generated = runNativeCodegen(hugeSchema);
    String compatible = CodeTransformations.transformParseCalls(
        generated, AvroVersion.AVRO_1_4, AvroVersion.earliest(), AvroVersion.latest());

    Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(hugeSchema.getFullName(), compatible);
    Assert.assertNotNull(loaded);
}
示例21
/**
 * Passes the schema from {@code RecordWithMultilineDoc.avsc} through {@code runNativeCodegen} and
 * {@code CodeTransformations.transformParseCalls} (with {@code AVRO_1_4}), then checks the output compiles.
 */
@Test
public void testTransformAvro14RecordWithMultilineDoc() throws Exception {
    Schema docSchema = AvroCompatibilityHelper.parse(TestUtil.load("RecordWithMultilineDoc.avsc"));
    String generated = runNativeCodegen(docSchema);
    String compatible = CodeTransformations.transformParseCalls(
        generated, AvroVersion.AVRO_1_4, AvroVersion.earliest(), AvroVersion.latest());

    Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(docSchema.getFullName(), compatible);
    Assert.assertNotNull(loaded);
}
示例22
/**
 * Passes the schema from {@code PerfectlyNormalEnum.avsc} through {@code runNativeCodegen} and
 * {@code CodeTransformations.transformEnumClass}, then checks the output compiles into an enum class.
 */
@Test
public void testTransformAvro17Enum() throws Exception {
    Schema enumSchema = AvroCompatibilityHelper.parse(TestUtil.load("PerfectlyNormalEnum.avsc"));
    String generated = runNativeCodegen(enumSchema);
    String compatible =
        CodeTransformations.transformEnumClass(generated, AvroVersion.earliest(), AvroVersion.latest());

    Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(enumSchema.getFullName(), compatible);
    Assert.assertNotNull(loaded);
    Assert.assertTrue(Enum.class.isAssignableFrom(loaded));
}
示例23
/**
 * Passes the schema from {@code HugeRecord.avsc} through {@code runNativeCodegen} and
 * {@code CodeTransformations.transformParseCalls} (with {@code AVRO_1_7}), then checks the output compiles.
 */
@Test
public void testTransformAvro17HugeRecord() throws Exception {
    Schema hugeSchema = AvroCompatibilityHelper.parse(TestUtil.load("HugeRecord.avsc"));
    String generated = runNativeCodegen(hugeSchema);
    String compatible = CodeTransformations.transformParseCalls(
        generated, AvroVersion.AVRO_1_7, AvroVersion.earliest(), AvroVersion.latest());

    Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(hugeSchema.getFullName(), compatible);
    Assert.assertNotNull(loaded);
}
示例24
/**
 * Generates the routes, logs the generated source at debug level, and compiles that source
 * into a new {@link Class} named {@code packageName + "." + className}.
 * Any exception raised along the way is logged at error level and swallowed.
 *
 * @return the compiled {@code Supplier<RoutingHandler>} class, or {@code null} if an exception was thrown
 */
public Class<? extends Supplier<RoutingHandler>> compileClass()
{
    Class<? extends Supplier<RoutingHandler>> compiled;
    try {
        this.generateRoutes();

        log.debug("\n\nGenerated Class Source:\n\n" + this.sourceString);

        compiled = CompilerUtils.CACHED_COMPILER.loadFromJava(packageName + "." + className, this.sourceString);
    } catch (Exception e) {
        log.error(e.getMessage(), e);
        compiled = null;
    }
    return compiled;
}
示例25
/**
 * Compiles {@code javaCode} under the name {@code className} and instantiates the result as a
 * {@code DynamicRunner}. Any exception is printed to stderr and swallowed.
 *
 * NOTE(review): the statement that would return the runner's table is commented out, so this
 * method currently returns {@code null} on every path - confirm whether that is intended.
 *
 * @param className fully qualified name of the class defined in {@code javaCode}
 * @param javaCode  java source to compile
 * @return always {@code null} as currently written
 */
public static Table getFlinkTableObj(String className, String javaCode){
    try {
        Class compiled = CompilerUtils.CACHED_COMPILER.loadFromJava(className, javaCode);
        Object instance = compiled.newInstance();
        DynamicRunner runner = (DynamicRunner) instance;
        //return runner.getTableObj();
    } catch (Exception e) {
        e.printStackTrace();
    }
    return null;
}
示例26
/**
 * Compiles {@code code} under the name {@code className} using the shared cached compiler.
 *
 * @param className fully qualified name of the class defined in {@code code}
 * @param code      java source to compile
 * @return the loaded class, or {@code null} if an exception was thrown (its stack trace is printed)
 */
public static Class compile(String className, String code) {
    try {
        return CompilerUtils.CACHED_COMPILER.loadFromJava(className, code);
    } catch (Exception e) {
        e.printStackTrace();
        return null;
    }
}
示例27
/**
 * Generates code for two modules in sequence. The first module's three classes are compiled with
 * the thread context class loader and checked to be resolvable through {@code Class.forName};
 * the second module is then generated with classpath lookup enabled and written to disk.
 */
@Test
public void testProjectWithClasspathDependencies() throws Exception {
    //first we code-gen, compile AND LOAD the 1st module
    generator.setInputs(new File(sourceRoot, "firstModule"));
    generator.setOutputFolder(null); //no need to write java source files to disk
    Collection<AvroGeneratedSourceCode> firstModule = generator.generateCode();
    Assert.assertEquals(firstModule.size(), 3);

    Class<?> enumClass = null;
    Class<?> fixedClass = null;
    Class<?> recordClass = null;
    ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
    for (AvroGeneratedSourceCode generated : firstModule) {
        String name = generated.getFullyQualifiedClassName();
        if (name.equals("com.acme.ClasspathEnum")) {
            enumClass = CompilerUtils.CACHED_COMPILER.loadFromJava(contextLoader, name, generated.getContents());
        } else if (name.equals("com.acme.ClasspathFixed")) {
            fixedClass = CompilerUtils.CACHED_COMPILER.loadFromJava(contextLoader, name, generated.getContents());
        } else if (name.equals("com.acme.ClasspathRecord")) {
            recordClass = CompilerUtils.CACHED_COMPILER.loadFromJava(contextLoader, name, generated.getContents());
        } else {
            throw new IllegalStateException("unhandled " + name);
        }
    }

    Assert.assertNotNull(enumClass);
    Assert.assertNotNull(fixedClass);
    Assert.assertNotNull(recordClass);

    //make sure each loaded class has the expected generated-avro supertype
    Assert.assertTrue(Enum.class.isAssignableFrom(enumClass));
    Assert.assertTrue(SpecificFixed.class.isAssignableFrom(fixedClass));
    Assert.assertTrue(SpecificRecord.class.isAssignableFrom(recordClass));

    //and that they are now resolvable by name
    Assert.assertEquals(Class.forName("com.acme.ClasspathEnum"), enumClass);
    Assert.assertEquals(Class.forName("com.acme.ClasspathFixed"), fixedClass);
    Assert.assertEquals(Class.forName("com.acme.ClasspathRecord"), recordClass);

    //now go code-gen and compile the 2nd module
    generator.setInputs(new File(sourceRoot, "secondModule"));
    generator.setAllowClasspathLookup(true);
    generator.setOutputFolder(outputRoot.toFile()); //these we want on disk
    Collection<AvroGeneratedSourceCode> secondModule = generator.generateCode();
    Assert.assertEquals(secondModule.size(), 6);

    //TODO - figure out how to make the below code pick up classes dynamically defined above
    //CompilerHelper.assertCompiles(outputRoot);
}
示例28
/**
 * Loads the {@code under19/SimpleRecord.java} test resource and checks that it compiles
 * as {@code under19.SimpleRecord}.
 */
@Test
public void demonstrateAvro19CompatibleCode() throws Exception {
    String recordSource = TestUtil.load("under19/SimpleRecord.java");

    Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava("under19.SimpleRecord", recordSource);

    Assert.assertNotNull(loaded);
}
示例29
/**
 * Builds and executes a Flink streaming job that registers Kafka Avro table sources for the
 * given topic(s), applies a dynamically compiled Table API expression to the scanned table,
 * registers the result schema, and writes the result to a Kafka Avro table sink.
 * Any exception raised while compiling or running the job is printed to stderr and swallowed.
 *
 * SECURITY: {@code transScript} is spliced verbatim into java source that is compiled and
 * executed in this JVM. It must only ever come from a trusted caller.
 *
 * @param KafkaServerHostPort    kafka bootstrap host:port, stored under the kafka host/port property key
 * @param SchemaRegistryHostPort schema registry host:port
 * @param srcTopic               source topic; split on "," to obtain the individual topics
 * @param targetTopic            topic (and schema subject) the result is written to
 * @param consumerGroupId        kafka consumer group id
 * @param sinkKeys               value stored under the table-sink-keys property
 * @param transScript            Table API call chain appended after {@code "tbl."} in the generated class
 */
public static void tcFlinkAvroTableAPI(String KafkaServerHostPort, String SchemaRegistryHostPort,
                                       String srcTopic, String targetTopic,
                                       String consumerGroupId, String sinkKeys, String transScript) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    Properties properties = new Properties();
    properties.setProperty(ConstantApp.PK_KAFKA_HOST_PORT.replace("_", "."), KafkaServerHostPort);
    properties.setProperty(ConstantApp.PK_KAFKA_CONSUMER_GROURP, consumerGroupId);
    properties.setProperty(ConstantApp.PK_KAFKA_SCHEMA_REGISTRY_HOST_PORT.replace("_", "."), SchemaRegistryHostPort);
    properties.setProperty(ConstantApp.PK_FLINK_TABLE_SINK_KEYS, sinkKeys);

    for (String topic : srcTopic.split(",")) {
        properties.setProperty(ConstantApp.PK_SCHEMA_SUB_INPUT, topic);
        properties.setProperty(ConstantApp.PK_SCHEMA_ID_INPUT, SchemaRegistryClient.getLatestSchemaIDFromProperty(properties, ConstantApp.PK_SCHEMA_SUB_INPUT) + "");
        properties.setProperty(ConstantApp.PK_SCHEMA_STR_INPUT, SchemaRegistryClient.getLatestSchemaFromProperty(properties, ConstantApp.PK_SCHEMA_SUB_INPUT).toString());
        // NOTE(review): every topic is registered under the full, unsplit srcTopic name (the name
        // scanned below); with more than one topic the same name is registered repeatedly - confirm intent.
        tableEnv.registerTableSource(srcTopic, new Kafka010AvroTableSource(topic, properties));
    }

    try {
        Table result;
        Table ingest = tableEnv.scan(srcTopic);

        // CACHED_COMPILER caches loaded classes by class name, so a fixed name would make a later
        // call with a different script silently reuse the first compiled class. Deriving the name
        // from the script keeps the cache hit for identical scripts only.
        String scriptText = String.valueOf(transScript);
        String simpleName = "FlinkScript_" + Integer.toHexString(scriptText.hashCode()) + "_" + scriptText.length();
        String className = "dynamic." + simpleName;
        String header = "package dynamic;\n" +
                "import org.apache.flink.table.api.Table;\n" +
                "import com.datafibers.util.*;\n";
        String javaCode = header +
                "public class " + simpleName + " implements DynamicRunner {\n" +
                "@Override \n" +
                "    public Table transTableObj(Table tbl) {\n" +
                "try {" +
                "return tbl." + transScript + ";\n" +
                "} catch (Exception e) {" +
                "};" +
                "return null;}}";

        // Dynamic code generation
        Class aClass = CompilerUtils.CACHED_COMPILER.loadFromJava(className, javaCode);
        DynamicRunner runner = (DynamicRunner) aClass.newInstance();
        result = runner.transTableObj(ingest);

        SchemaRegistryClient.addSchemaFromTableResult(SchemaRegistryHostPort, targetTopic, result);

        // delivered properties for sink
        properties.setProperty(ConstantApp.PK_SCHEMA_SUB_OUTPUT, targetTopic);
        properties.setProperty(ConstantApp.PK_SCHEMA_ID_OUTPUT, SchemaRegistryClient.getLatestSchemaIDFromProperty(properties, ConstantApp.PK_SCHEMA_SUB_OUTPUT) + "");
        properties.setProperty(ConstantApp.PK_SCHEMA_STR_OUTPUT, SchemaRegistryClient.getLatestSchemaFromProperty(properties, ConstantApp.PK_SCHEMA_SUB_OUTPUT).toString());

        Kafka09AvroTableSink avro_sink =
                new Kafka09AvroTableSink(targetTopic, properties, new FlinkFixedPartitioner());
        result.writeToSink(avro_sink);
        env.execute("DF_FlinkTableAPI_Client_" + srcTopic + "-" + targetTopic);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
示例30
/**
 * Demo entry point: registers a CSV table source, compiles a {@code DynamicRunner} from an
 * in-memory source string, applies it to the scanned table, and writes the result to a CSV sink.
 * Exceptions from compilation or execution are printed to stderr and swallowed.
 *
 * @param args command line arguments (not read)
 */
public static void main(String[] args) {
    String dataSetScript = "flatMap(new FlinkUDF.LineSplitter()).groupBy(0).sum(1).print();\n";
    String tableScript = "select(\"name\");\n";

    String sourceHeader = "package dynamic;\n" +
            "import org.apache.flink.api.table.Table;\n" +
            "import com.datafibers.util.*;\n";

    // DataSet flavour of the generated class; built but not compiled below
    String dataSetSource = sourceHeader +
            "public class FlinkScript implements DynamicRunner {\n" +
            "@Override \n" +
            "    public void runTransform(DataSet<String> ds) {\n" +
            "try {" +
            "ds."+ dataSetScript +
            "} catch (Exception e) {" +
            "};" +
            "}}";

    // Table flavour of the generated class; this is the one compiled below
    String tableSource = sourceHeader +
            "public class FlinkScript implements DynamicRunner {\n" +
            "@Override \n" +
            "    public Table transTableObj(Table tbl) {\n" +
            "try {" +
            "return tbl."+ tableScript +
            "} catch (Exception e) {" +
            "};" +
            "return null;}}";

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    String[] columnNames = new String[] { "name", "id", "score", "comments" };
    TypeInformation<?>[] columnTypes = new TypeInformation<?>[] {
            Types.STRING(),
            Types.STRING(),
            Types.STRING(),
            Types.STRING()
    };
    CsvTableSource csvTableSource = new CsvTableSource(
            "/Users/will/Downloads/file.csv",
            columnNames,
            columnTypes); // lenient
    tableEnv.registerTableSource("mycsv", csvTableSource);

    TableSink sink = new CsvTableSink("/Users/will/Downloads/out.csv", "|");
    Table ingest = tableEnv.scan("mycsv");

    try {
        Class compiled = CompilerUtils.CACHED_COMPILER.loadFromJava("dynamic.FlinkScript", tableSource);
        DynamicRunner runner = (DynamicRunner) compiled.newInstance();
        //runner.runTransform(ds);
        Table transformed = runner.transTableObj(ingest);
        // write the result Table to the TableSink
        transformed.writeToSink(sink);
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}