Java源码示例:net.openhft.compiler.CompilerUtils

示例1
/**
 * Runs only when the detected runtime Avro version equals {@code AVRO_1_7}; otherwise the test is skipped.
 * Compiles the "Vanilla14Fixed" source and expects instantiating the resulting class to throw an
 * {@code AvroRuntimeException} whose message contains "Not a Specific class".
 */
@Test
public void testVanilla14FixedClassesIncompatibleWithAvro17() throws Exception {
  AvroVersion detected = AvroCompatibilityHelper.getRuntimeAvroVersion();
  boolean runningUnder17 = detected.equals(AvroVersion.AVRO_1_7);
  if (!runningUnder17) {
    throw new SkipException("class only supported under avro 1.7. runtime version detected as " + detected);
  }

  String fixedSource = TestUtil.load("Vanilla14Fixed");
  Class<?> fixedClass =
      CompilerUtils.CACHED_COMPILER.loadFromJava("com.acme.generatedby14.Vanilla14Fixed", fixedSource);
  try {
    // Class.newInstance() lets the constructor's exception propagate unwrapped, which the catch below relies on
    fixedClass.newInstance();
    Assert.fail("expecting an exception");
  } catch (AvroRuntimeException expected) {
    String message = expected.getMessage();
    Assert.assertTrue(message.contains("Not a Specific class")); //fails to find SCHEMA$
  }
}
 
示例2
/**
 * Runs only when the detected runtime Avro version is later than {@code AVRO_1_7}; otherwise the test is skipped.
 * Attempts to compile the "Vanilla14Fixed" source, expects the load to fail with
 * {@link ClassNotFoundException}, and checks that the captured compiler output contains
 * "is not abstract and does not override".
 */
@Test
public void testVanilla14FixedClassesIncompatibleWithModernAvro() throws Exception {
  AvroVersion runtimeVersion = AvroCompatibilityHelper.getRuntimeAvroVersion();
  if (!runtimeVersion.laterThan(AvroVersion.AVRO_1_7)) {
    throw new SkipException("class only supported under modern avro. runtime version detected as " + runtimeVersion);
  }

  String sourceCode = TestUtil.load("Vanilla14Fixed");
  StringWriter sr = new StringWriter();
  PrintWriter compilerOutput = new PrintWriter(sr);
  try {
    // the returned class is irrelevant: reaching the next line at all means compilation did not fail
    CompilerUtils.CACHED_COMPILER.loadFromJava(getClass().getClassLoader(),
        "com.acme.generatedby14.Vanilla14Fixed", sourceCode, compilerOutput);
    Assert.fail("compilation expected to fail");
  } catch (ClassNotFoundException ignored) {
    //expected
  }
  // make sure everything written by the compiler has reached the StringWriter before reading it
  compilerOutput.flush();
  String errorMsg = sr.toString();
  // include the captured output so an unexpected compiler error is visible in the failure report
  Assert.assertTrue(errorMsg.contains("is not abstract and does not override"), //doesnt implement Externalizable
      "unexpected compiler output: " + errorMsg);
}
 
示例3
/**
 * Parses {@code ENUM_CLASS_JSON}, generates code for it via {@code _factory.compile} with
 * {@code AVRO_1_4}, compiles the single generated source as "com.dot.BobSmith" and checks the
 * name, namespace and doc of the schema held in its static {@code SCHEMA$} field.
 */
@Test
public void testAddSchemaSupportToEnum() throws Exception {
  Schema enumSchema = AvroCompatibilityHelper.parse(ENUM_CLASS_JSON);
  Collection<AvroGeneratedSourceCode> generated =
      _factory.compile(Collections.singletonList(enumSchema), AvroVersion.AVRO_1_4);
  Assert.assertEquals(1, generated.size());
  AvroGeneratedSourceCode onlySource = generated.iterator().next();

  Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava("com.dot.BobSmith", onlySource.getContents());
  Assert.assertNotNull(loaded);

  Field schemaHolder = loaded.getField("SCHEMA$");
  Assert.assertNotNull(schemaHolder);

  // static field, hence the null receiver
  Schema embedded = (Schema) schemaHolder.get(null);
  Assert.assertNotNull(embedded);
  Assert.assertEquals("BobSmith", embedded.getName());
  Assert.assertEquals("com.dot", embedded.getNamespace());
  Assert.assertTrue(embedded.getDoc().contains("Bob Smith Store"));
}
 
示例4
/**
 * Parses {@code ENUM_CLASS_NO_NAMESPACE_JSON}, generates code for it via {@code _factory.compile}
 * with {@code AVRO_1_4}, compiles the single generated source as "BobSmith" and checks that the
 * schema in its static {@code SCHEMA$} field has the expected name and doc and a null namespace.
 */
@Test
public void testAddSchemaSupportToEnumNoNamespace() throws Exception {
  Schema enumSchema = AvroCompatibilityHelper.parse(ENUM_CLASS_NO_NAMESPACE_JSON);
  Collection<AvroGeneratedSourceCode> generated =
      _factory.compile(Collections.singletonList(enumSchema), AvroVersion.AVRO_1_4);
  Assert.assertEquals(1, generated.size());
  AvroGeneratedSourceCode onlySource = generated.iterator().next();

  Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava("BobSmith", onlySource.getContents());
  Assert.assertNotNull(loaded);

  Field schemaHolder = loaded.getField("SCHEMA$");
  Assert.assertNotNull(schemaHolder);

  // static field, hence the null receiver
  Schema embedded = (Schema) schemaHolder.get(null);
  Assert.assertNotNull(embedded);
  Assert.assertEquals("BobSmith", embedded.getName());
  Assert.assertNull(embedded.getNamespace());
  Assert.assertTrue(embedded.getDoc().contains("Bob Smith Store"));
}
 
示例5
/**
 * Parses {@code FIXED_TYPE_SCHEMA_JSON}, generates code for it via {@code _factory.compile} with
 * {@code AVRO_1_4}, compiles the single generated source as "com.acme.Whatever", checks that the
 * class is a {@code SpecificFixed} and verifies name, namespace, fixed size and doc of the schema
 * held in its static {@code SCHEMA$} field.
 */
@Test
public void testAddSchemaSupportToFixedClass() throws Exception {
  Schema fixedSchema = AvroCompatibilityHelper.parse(FIXED_TYPE_SCHEMA_JSON);
  Collection<AvroGeneratedSourceCode> generated =
      _factory.compile(Collections.singletonList(fixedSchema), AvroVersion.AVRO_1_4);
  Assert.assertEquals(1, generated.size());
  AvroGeneratedSourceCode onlySource = generated.iterator().next();

  Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava("com.acme.Whatever", onlySource.getContents());
  Assert.assertNotNull(loaded);
  Assert.assertTrue(SpecificFixed.class.isAssignableFrom(loaded));

  Field schemaHolder = loaded.getField("SCHEMA$");
  Assert.assertNotNull(schemaHolder);

  // static field, hence the null receiver
  Schema embedded = (Schema) schemaHolder.get(null);
  Assert.assertNotNull(embedded);
  Assert.assertEquals("Whatever", embedded.getName());
  Assert.assertEquals("com.acme", embedded.getNamespace());
  Assert.assertEquals(42, embedded.getFixedSize());
  Assert.assertTrue(embedded.getDoc().contains("yadda yadda"));
}
 
示例6
/**
 * Parses {@code FIXED_TYPE_NO_NAMESPACE_SCHEMA_JSON}, generates code for it via
 * {@code _factory.compile} with {@code AVRO_1_4}, compiles the single generated source as
 * "Whatever", checks that the class is a {@code SpecificFixed} and verifies that the schema in
 * its static {@code SCHEMA$} field has the expected name, fixed size and doc and a null namespace.
 */
@Test
public void testAddSchemaSupportToFixedClassNoNamespace() throws Exception {
  Schema fixedSchema = AvroCompatibilityHelper.parse(FIXED_TYPE_NO_NAMESPACE_SCHEMA_JSON);
  Collection<AvroGeneratedSourceCode> generated =
      _factory.compile(Collections.singletonList(fixedSchema), AvroVersion.AVRO_1_4);
  Assert.assertEquals(1, generated.size());
  AvroGeneratedSourceCode onlySource = generated.iterator().next();

  Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava("Whatever", onlySource.getContents());
  Assert.assertNotNull(loaded);
  Assert.assertTrue(SpecificFixed.class.isAssignableFrom(loaded));

  Field schemaHolder = loaded.getField("SCHEMA$");
  Assert.assertNotNull(schemaHolder);

  // static field, hence the null receiver
  Schema embedded = (Schema) schemaHolder.get(null);
  Assert.assertNotNull(embedded);
  Assert.assertEquals("Whatever", embedded.getName());
  Assert.assertNull(embedded.getNamespace());
  Assert.assertEquals(42, embedded.getFixedSize());
  Assert.assertTrue(embedded.getDoc().contains("w00t"));
}
 
示例7
/**
 * Generates code for "PerfectlyNormalEnum.avsc" via {@code runNativeCodegen}, passes it through
 * {@code CodeTransformations.transformEnumClass} with the earliest/latest version bounds, and
 * checks that the result loads as a class assignable to {@link Enum}.
 */
@Test
public void testTransformAvro19Enum() throws Exception {
  String schemaJson = TestUtil.load("PerfectlyNormalEnum.avsc");
  Schema enumSchema = AvroCompatibilityHelper.parse(schemaJson);
  String generated = runNativeCodegen(enumSchema);

  String rewritten =
      CodeTransformations.transformEnumClass(generated, AvroVersion.earliest(), AvroVersion.latest());

  Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(enumSchema.getFullName(), rewritten);
  Assert.assertNotNull(loaded);
  Assert.assertTrue(Enum.class.isAssignableFrom(loaded));
}
 
示例8
/**
 * Generates code for "PerfectlyNormalRecord.avsc" via {@code runNativeCodegen}, passes it through
 * {@code CodeTransformations.transformParseCalls} with {@code AVRO_1_9} and the earliest/latest
 * version bounds, and checks that the result loads as a non-null class.
 */
@Test
public void testTransformAvro19parseCalls() throws Exception {
  String schemaJson = TestUtil.load("PerfectlyNormalRecord.avsc");
  Schema recordSchema = AvroCompatibilityHelper.parse(schemaJson);
  String generated = runNativeCodegen(recordSchema);

  String rewritten = CodeTransformations.transformParseCalls(
      generated, AvroVersion.AVRO_1_9, AvroVersion.earliest(), AvroVersion.latest());

  Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(recordSchema.getFullName(), rewritten);
  Assert.assertNotNull(loaded);
}
 
示例9
/**
 * Generates code for "HugeRecord.avsc" via {@code runNativeCodegen}, passes it through
 * {@code CodeTransformations.transformParseCalls} with {@code AVRO_1_9} and the earliest/latest
 * version bounds, and checks that the result loads as a non-null class.
 */
@Test
public void testTransformAvro19HugeRecord() throws Exception {
  String schemaJson = TestUtil.load("HugeRecord.avsc");
  Schema recordSchema = AvroCompatibilityHelper.parse(schemaJson);
  String generated = runNativeCodegen(recordSchema);

  String rewritten = CodeTransformations.transformParseCalls(
      generated, AvroVersion.AVRO_1_9, AvroVersion.earliest(), AvroVersion.latest());

  Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(recordSchema.getFullName(), rewritten);
  Assert.assertNotNull(loaded);
}
 
示例10
/**
 * Generates code for "PerfectlyNormalRecord.avsc" via {@code runNativeCodegen}, passes it through
 * {@code CodeTransformations.removeBuilderSupport} with the earliest/latest version bounds, and
 * checks that the result loads as a non-null class.
 */
@Test
public void testRemoveAvro19Builder() throws Exception {
  String schemaJson = TestUtil.load("PerfectlyNormalRecord.avsc");
  Schema recordSchema = AvroCompatibilityHelper.parse(schemaJson);
  String generated = runNativeCodegen(recordSchema);

  String rewritten =
      CodeTransformations.removeBuilderSupport(generated, AvroVersion.earliest(), AvroVersion.latest());

  Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(recordSchema.getFullName(), rewritten);
  Assert.assertNotNull(loaded);
}
 
示例11
/**
 * Generates code for "PerfectlyNormalRecord.avsc" via {@code runNativeCodegen}, passes it through
 * {@code CodeTransformations.transformExternalizableSupport} with the earliest/latest version
 * bounds, and checks that the result loads as a non-null class.
 */
@Test
public void testTransformAvro19Externalizable() throws Exception {
  String schemaJson = TestUtil.load("PerfectlyNormalRecord.avsc");
  Schema recordSchema = AvroCompatibilityHelper.parse(schemaJson);
  String generated = runNativeCodegen(recordSchema);

  String rewritten = CodeTransformations.transformExternalizableSupport(
      generated, AvroVersion.earliest(), AvroVersion.latest());

  Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(recordSchema.getFullName(), rewritten);
  Assert.assertNotNull(loaded);
}
 
示例12
/**
 * Generates code for "PerfectlyNormalEnum.avsc" via {@code runNativeCodegen}, passes it through
 * {@code CodeTransformations.transformEnumClass} with the earliest/latest version bounds, and
 * checks that the result loads as a class assignable to {@link Enum}.
 */
@Test
public void testTransformAvro18Enum() throws Exception {
  String schemaJson = TestUtil.load("PerfectlyNormalEnum.avsc");
  Schema enumSchema = AvroCompatibilityHelper.parse(schemaJson);
  String generated = runNativeCodegen(enumSchema);

  String rewritten =
      CodeTransformations.transformEnumClass(generated, AvroVersion.earliest(), AvroVersion.latest());

  Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(enumSchema.getFullName(), rewritten);
  Assert.assertNotNull(loaded);
  Assert.assertTrue(Enum.class.isAssignableFrom(loaded));
}
 
示例13
/**
 * Generates code for "HugeRecord.avsc" via {@code runNativeCodegen}, passes it through
 * {@code CodeTransformations.transformParseCalls} with {@code AVRO_1_8} and the earliest/latest
 * version bounds, and checks that the result loads as a non-null class.
 */
@Test
public void testTransformAvro18HugeRecord() throws Exception {
  String schemaJson = TestUtil.load("HugeRecord.avsc");
  Schema recordSchema = AvroCompatibilityHelper.parse(schemaJson);
  String generated = runNativeCodegen(recordSchema);

  String rewritten = CodeTransformations.transformParseCalls(
      generated, AvroVersion.AVRO_1_8, AvroVersion.earliest(), AvroVersion.latest());

  Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(recordSchema.getFullName(), rewritten);
  Assert.assertNotNull(loaded);
}
 
示例14
/**
 * Generates code for "PerfectlyNormalEnum.avsc" via {@code runNativeCodegen}, passes it through
 * {@code CodeTransformations.transformEnumClass} with the earliest/latest version bounds, and
 * checks that the result loads as a class assignable to {@link Enum}.
 */
@Test
public void testTransformAvro15Enum() throws Exception {
  String schemaJson = TestUtil.load("PerfectlyNormalEnum.avsc");
  Schema enumSchema = AvroCompatibilityHelper.parse(schemaJson);
  String generated = runNativeCodegen(enumSchema);

  String rewritten =
      CodeTransformations.transformEnumClass(generated, AvroVersion.earliest(), AvroVersion.latest());

  Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(enumSchema.getFullName(), rewritten);
  Assert.assertNotNull(loaded);
  Assert.assertTrue(Enum.class.isAssignableFrom(loaded));
}
 
示例15
/**
 * Generates code for "HugeRecord.avsc" via {@code runNativeCodegen}, passes it through
 * {@code CodeTransformations.transformParseCalls} with {@code AVRO_1_5} and the earliest/latest
 * version bounds, and checks that the result loads as a non-null class.
 */
@Test
public void testTransformAvro15HugeRecord() throws Exception {
  String schemaJson = TestUtil.load("HugeRecord.avsc");
  Schema recordSchema = AvroCompatibilityHelper.parse(schemaJson);
  String generated = runNativeCodegen(recordSchema);

  String rewritten = CodeTransformations.transformParseCalls(
      generated, AvroVersion.AVRO_1_5, AvroVersion.earliest(), AvroVersion.latest());

  Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(recordSchema.getFullName(), rewritten);
  Assert.assertNotNull(loaded);
}
 
示例16
/**
 * Generates code for "RecordWithMultilineDoc.avsc" via {@code runNativeCodegen}, passes it through
 * {@code CodeTransformations.transformParseCalls} with {@code AVRO_1_5} and the earliest/latest
 * version bounds, and checks that the result loads as a non-null class.
 */
@Test
public void testTransformAvro15RecordWithMultilineDoc() throws Exception {
  String schemaJson = TestUtil.load("RecordWithMultilineDoc.avsc");
  Schema recordSchema = AvroCompatibilityHelper.parse(schemaJson);
  String generated = runNativeCodegen(recordSchema);

  String rewritten = CodeTransformations.transformParseCalls(
      generated, AvroVersion.AVRO_1_5, AvroVersion.earliest(), AvroVersion.latest());

  Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(recordSchema.getFullName(), rewritten);
  Assert.assertNotNull(loaded);
}
 
示例17
/**
 * Generates code for "PerfectlyNormalEnum.avsc" via {@code runNativeCodegen}, passes it through
 * {@code CodeTransformations.transformEnumClass} with the earliest/latest version bounds, and
 * checks that the result loads as a class assignable to {@link Enum}.
 */
@Test
public void testTransformAvro16Enum() throws Exception {
  String schemaJson = TestUtil.load("PerfectlyNormalEnum.avsc");
  Schema enumSchema = AvroCompatibilityHelper.parse(schemaJson);
  String generated = runNativeCodegen(enumSchema);

  String rewritten =
      CodeTransformations.transformEnumClass(generated, AvroVersion.earliest(), AvroVersion.latest());

  Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(enumSchema.getFullName(), rewritten);
  Assert.assertNotNull(loaded);
  Assert.assertTrue(Enum.class.isAssignableFrom(loaded));
}
 
示例18
/**
 * Generates code for "HugeRecord.avsc" via {@code runNativeCodegen}, passes it through
 * {@code CodeTransformations.transformParseCalls} with {@code AVRO_1_6} and the earliest/latest
 * version bounds, and checks that the result loads as a non-null class.
 */
@Test
public void testTransformAvro16HugeRecord() throws Exception {
  String schemaJson = TestUtil.load("HugeRecord.avsc");
  Schema recordSchema = AvroCompatibilityHelper.parse(schemaJson);
  String generated = runNativeCodegen(recordSchema);

  String rewritten = CodeTransformations.transformParseCalls(
      generated, AvroVersion.AVRO_1_6, AvroVersion.earliest(), AvroVersion.latest());

  Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(recordSchema.getFullName(), rewritten);
  Assert.assertNotNull(loaded);
}
 
示例19
/**
 * Generates code for "PerfectlyNormalEnum.avsc" via {@code runNativeCodegen}, passes it through
 * {@code CodeTransformations.transformEnumClass} with the earliest/latest version bounds, and
 * checks that the result loads as a class assignable to {@link Enum}.
 */
@Test
public void testTransformAvro14Enum() throws Exception {
  String schemaJson = TestUtil.load("PerfectlyNormalEnum.avsc");
  Schema enumSchema = AvroCompatibilityHelper.parse(schemaJson);
  String generated = runNativeCodegen(enumSchema);

  String rewritten =
      CodeTransformations.transformEnumClass(generated, AvroVersion.earliest(), AvroVersion.latest());

  Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(enumSchema.getFullName(), rewritten);
  Assert.assertNotNull(loaded);
  Assert.assertTrue(Enum.class.isAssignableFrom(loaded));
}
 
示例20
/**
 * Generates code for "HugeRecord.avsc" via {@code runNativeCodegen}, passes it through
 * {@code CodeTransformations.transformParseCalls} with {@code AVRO_1_4} and the earliest/latest
 * version bounds, and checks that the result loads as a non-null class.
 */
@Test
public void testTransformAvro14HugeRecord() throws Exception {
  String schemaJson = TestUtil.load("HugeRecord.avsc");
  Schema recordSchema = AvroCompatibilityHelper.parse(schemaJson);
  String generated = runNativeCodegen(recordSchema);

  String rewritten = CodeTransformations.transformParseCalls(
      generated, AvroVersion.AVRO_1_4, AvroVersion.earliest(), AvroVersion.latest());

  Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(recordSchema.getFullName(), rewritten);
  Assert.assertNotNull(loaded);
}
 
示例21
/**
 * Generates code for "RecordWithMultilineDoc.avsc" via {@code runNativeCodegen}, passes it through
 * {@code CodeTransformations.transformParseCalls} with {@code AVRO_1_4} and the earliest/latest
 * version bounds, and checks that the result loads as a non-null class.
 */
@Test
public void testTransformAvro14RecordWithMultilineDoc() throws Exception {
  String schemaJson = TestUtil.load("RecordWithMultilineDoc.avsc");
  Schema recordSchema = AvroCompatibilityHelper.parse(schemaJson);
  String generated = runNativeCodegen(recordSchema);

  String rewritten = CodeTransformations.transformParseCalls(
      generated, AvroVersion.AVRO_1_4, AvroVersion.earliest(), AvroVersion.latest());

  Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(recordSchema.getFullName(), rewritten);
  Assert.assertNotNull(loaded);
}
 
示例22
/**
 * Generates code for "PerfectlyNormalEnum.avsc" via {@code runNativeCodegen}, passes it through
 * {@code CodeTransformations.transformEnumClass} with the earliest/latest version bounds, and
 * checks that the result loads as a class assignable to {@link Enum}.
 */
@Test
public void testTransformAvro17Enum() throws Exception {
  String schemaJson = TestUtil.load("PerfectlyNormalEnum.avsc");
  Schema enumSchema = AvroCompatibilityHelper.parse(schemaJson);
  String generated = runNativeCodegen(enumSchema);

  String rewritten =
      CodeTransformations.transformEnumClass(generated, AvroVersion.earliest(), AvroVersion.latest());

  Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(enumSchema.getFullName(), rewritten);
  Assert.assertNotNull(loaded);
  Assert.assertTrue(Enum.class.isAssignableFrom(loaded));
}
 
示例23
/**
 * Generates code for "HugeRecord.avsc" via {@code runNativeCodegen}, passes it through
 * {@code CodeTransformations.transformParseCalls} with {@code AVRO_1_7} and the earliest/latest
 * version bounds, and checks that the result loads as a non-null class.
 */
@Test
public void testTransformAvro17HugeRecord() throws Exception {
  String schemaJson = TestUtil.load("HugeRecord.avsc");
  Schema recordSchema = AvroCompatibilityHelper.parse(schemaJson);
  String generated = runNativeCodegen(recordSchema);

  String rewritten = CodeTransformations.transformParseCalls(
      generated, AvroVersion.AVRO_1_7, AvroVersion.earliest(), AvroVersion.latest());

  Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava(recordSchema.getFullName(), rewritten);
  Assert.assertNotNull(loaded);
}
 
示例24
/**
 * Calls {@code generateRoutes()}, logs the generated source at debug level and compiles it
 * into a new {@link Class} named {@code packageName + "." + className}.
 *
 * @return the compiled {@code Supplier<RoutingHandler>} class, or {@code null} if any step
 *         throws (the exception is logged at error level and not rethrown)
 */
public Class<? extends Supplier<RoutingHandler>> compileClass()
{
    Class<? extends Supplier<RoutingHandler>> compiled = null;

    try {
        this.generateRoutes();

        log.debug("\n\nGenerated Class Source:\n\n" + this.sourceString);

        final String qualifiedName = packageName + "." + className;
        compiled = CompilerUtils.CACHED_COMPILER.loadFromJava(qualifiedName, this.sourceString);
    } catch (Exception e) {
        // failure leaves the result null; callers receive null rather than an exception
        log.error(e.getMessage(), e);
    }

    return compiled;
}
 
示例25
/**
 * Compiles {@code javaCode} as {@code className}, instantiates the resulting class and casts the
 * instance to {@code DynamicRunner}. The instance is not used further: this method currently
 * returns {@code null} in every case.
 *
 * @param className fully qualified name passed to the compiler
 * @param javaCode  Java source passed to the compiler
 * @return always {@code null}; any exception is printed to standard error and swallowed
 */
public static Table getFlinkTableObj(String className, String javaCode){
	try {
		Class<?> compiled = CompilerUtils.CACHED_COMPILER.loadFromJava(className, javaCode);
		// instantiate and cast only; a disabled line in the original would have returned runner.getTableObj()
		DynamicRunner runner = (DynamicRunner) compiled.newInstance();
	} catch (Exception e) {
		e.printStackTrace();
	}

	return null;
}
 
示例26
/**
 * Compiles {@code code} as {@code className} through the cached compiler.
 *
 * @param className fully qualified name passed to the compiler
 * @param code      Java source passed to the compiler
 * @return the loaded class, or {@code null} if the compiler throws (the stack trace is printed)
 */
public static Class compile(String className, String code) {
    try {
        return CompilerUtils.CACHED_COMPILER.loadFromJava(className, code);
    } catch (Exception e) {
        e.printStackTrace();
        return null;
    }
}
 
示例27
/**
 * Generates code for the "firstModule" inputs, compiles and loads the three expected classes into
 * the thread context class loader, verifies their types and that {@link Class#forName(String)}
 * resolves to the same classes, then generates code for "secondModule" with classpath lookup
 * enabled and checks the number of generated sources.
 */
@Test
public void testProjectWithClasspathDependencies() throws Exception {
    // phase 1: code-gen, compile AND LOAD the first module (sources stay in memory, nothing written to disk)
    generator.setInputs(new File(sourceRoot, "firstModule"));
    generator.setOutputFolder(null);
    Collection<AvroGeneratedSourceCode> firstBatch = generator.generateCode();
    Assert.assertEquals(firstBatch.size(), 3);

    Class<?> loadedEnum = null;
    Class<?> loadedFixed = null;
    Class<?> loadedRecord = null;

    ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
    for (AvroGeneratedSourceCode generated : firstBatch) {
        String fqcn = generated.getFullyQualifiedClassName();
        switch (fqcn) {
            case "com.acme.ClasspathEnum":
                loadedEnum = CompilerUtils.CACHED_COMPILER.loadFromJava(contextLoader, fqcn, generated.getContents());
                break;
            case "com.acme.ClasspathFixed":
                loadedFixed = CompilerUtils.CACHED_COMPILER.loadFromJava(contextLoader, fqcn, generated.getContents());
                break;
            case "com.acme.ClasspathRecord":
                loadedRecord = CompilerUtils.CACHED_COMPILER.loadFromJava(contextLoader, fqcn, generated.getContents());
                break;
            default:
                throw new IllegalStateException("unhandled " + fqcn);
        }
    }
    Assert.assertNotNull(loadedEnum);
    Assert.assertNotNull(loadedFixed);
    Assert.assertNotNull(loadedRecord);

    // each loaded class must have the expected generated-avro supertype
    Assert.assertTrue(Enum.class.isAssignableFrom(loadedEnum));
    Assert.assertTrue(SpecificFixed.class.isAssignableFrom(loadedFixed));
    Assert.assertTrue(SpecificRecord.class.isAssignableFrom(loadedRecord));
    // and must now be resolvable by name
    Assert.assertEquals(Class.forName("com.acme.ClasspathEnum"), loadedEnum);
    Assert.assertEquals(Class.forName("com.acme.ClasspathFixed"), loadedFixed);
    Assert.assertEquals(Class.forName("com.acme.ClasspathRecord"), loadedRecord);

    // phase 2: code-gen the second module, this time writing sources to disk
    generator.setInputs(new File(sourceRoot, "secondModule"));
    generator.setAllowClasspathLookup(true);
    generator.setOutputFolder(outputRoot.toFile());

    Collection<AvroGeneratedSourceCode> secondBatch = generator.generateCode();
    Assert.assertEquals(secondBatch.size(), 6);
    //TODO - figure out how to make the below code pick up classes dynamically defined above
    //CompilerHelper.assertCompiles(outputRoot);
}
 
示例28
/**
 * Loads the "under19/SimpleRecord.java" source and checks that it compiles and loads as
 * "under19.SimpleRecord".
 */
@Test
public void demonstrateAvro19CompatibleCode() throws Exception {
  String recordSource = TestUtil.load("under19/SimpleRecord.java");

  Class<?> loaded = CompilerUtils.CACHED_COMPILER.loadFromJava("under19.SimpleRecord", recordSource);

  Assert.assertNotNull(loaded);
}
 
示例29
/**
 * Builds and executes a Flink streaming job that reads Avro records from Kafka, applies a
 * Table API expression that is compiled at runtime, and writes the result to a Kafka Avro sink.
 *
 * <p>Steps, in order: populate a shared {@link Properties} object; register a
 * {@code Kafka010AvroTableSource} per source topic; scan the registered table; generate and
 * compile a {@code dynamic.FlinkScript} class whose {@code transTableObj} returns
 * {@code tbl.<transScript>}; run it; register the result schema for {@code targetTopic}; add the
 * output schema properties; write to a {@code Kafka09AvroTableSink}; execute the job.
 *
 * <p>Any exception raised from the scan onwards is printed to standard error and swallowed.
 *
 * @param KafkaServerHostPort    value stored under the {@code PK_KAFKA_HOST_PORT} key (with "_" replaced by ".")
 * @param SchemaRegistryHostPort value stored under the {@code PK_KAFKA_SCHEMA_REGISTRY_HOST_PORT} key
 *                               (with "_" replaced by "."); also passed to {@code addSchemaFromTableResult}
 * @param srcTopic               source topic name(s); split on "," to obtain the individual topics
 * @param targetTopic            topic used for the output schema subject and the sink
 * @param consumerGroupId        value stored under the {@code PK_KAFKA_CONSUMER_GROURP} key
 * @param sinkKeys               value stored under the {@code PK_FLINK_TABLE_SINK_KEYS} key
 * @param transScript            Java expression text appended after {@code "return tbl."} in the generated source
 */
public static void tcFlinkAvroTableAPI(String KafkaServerHostPort, String SchemaRegistryHostPort,
                                  String srcTopic, String targetTopic,
                                  String consumerGroupId, String sinkKeys, String transScript) {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    // One Properties object is shared by the schema-registry lookups, the table sources and the sink.
    Properties properties = new Properties();
    properties.setProperty(ConstantApp.PK_KAFKA_HOST_PORT.replace("_", "."), KafkaServerHostPort);
    properties.setProperty(ConstantApp.PK_KAFKA_CONSUMER_GROURP, consumerGroupId);
    properties.setProperty(ConstantApp.PK_KAFKA_SCHEMA_REGISTRY_HOST_PORT.replace("_", "."), SchemaRegistryHostPort);
    properties.setProperty(ConstantApp.PK_FLINK_TABLE_SINK_KEYS, sinkKeys);

    String[] srcTopicList = srcTopic.split(",");
    for (int i = 0; i < srcTopicList.length; i++) {
        // The input-schema properties are overwritten on every iteration, so after the loop they
        // describe the last topic in the list.
        properties.setProperty(ConstantApp.PK_SCHEMA_SUB_INPUT, srcTopicList[i]);
        properties.setProperty(ConstantApp.PK_SCHEMA_ID_INPUT, SchemaRegistryClient.getLatestSchemaIDFromProperty(properties, ConstantApp.PK_SCHEMA_SUB_INPUT) + "");
        properties.setProperty(ConstantApp.PK_SCHEMA_STR_INPUT, SchemaRegistryClient.getLatestSchemaFromProperty(properties, ConstantApp.PK_SCHEMA_SUB_INPUT).toString());
        // NOTE(review): the table is registered under the full, unsplit srcTopic string on every
        // iteration (not srcTopicList[i]); with more than one topic the same name is registered
        // repeatedly. Confirm whether multi-topic input is actually supported here.
        tableEnv.registerTableSource(srcTopic, new Kafka010AvroTableSource(srcTopicList[i], properties));
    }

    try {
        Table result;
        Table ingest = tableEnv.scan(srcTopic);
        String className = "dynamic.FlinkScript";
        String header = "package dynamic;\n" +
                "import org.apache.flink.table.api.Table;\n" +
                "import com.datafibers.util.*;\n";
        // NOTE(review): transScript is spliced verbatim into Java source that is then compiled and
        // executed in this JVM. If it can come from an untrusted caller this is arbitrary code
        // execution; confirm the trust boundary.
        // The generated method swallows any exception thrown by the expression and returns null.
        String javaCode = header +
                "public class FlinkScript implements DynamicRunner {\n" +
                "@Override \n" +
                "    public Table transTableObj(Table tbl) {\n" +
                "try {" +
                "return tbl." + transScript + ";\n" +
                "} catch (Exception e) {" +
                "};" +
                "return null;}}";
        // Compile the generated source, instantiate it and run the transformation on the scanned table.
        Class aClass = CompilerUtils.CACHED_COMPILER.loadFromJava(className, javaCode);
        DynamicRunner runner = (DynamicRunner) aClass.newInstance();
        result = runner.transTableObj(ingest);

        SchemaRegistryClient.addSchemaFromTableResult(SchemaRegistryHostPort, targetTopic, result);
        // Output-schema properties for the sink; looked up after the schema for targetTopic was added above.
        properties.setProperty(ConstantApp.PK_SCHEMA_SUB_OUTPUT, targetTopic);
        properties.setProperty(ConstantApp.PK_SCHEMA_ID_OUTPUT, SchemaRegistryClient.getLatestSchemaIDFromProperty(properties, ConstantApp.PK_SCHEMA_SUB_OUTPUT) + "");
        properties.setProperty(ConstantApp.PK_SCHEMA_STR_OUTPUT, SchemaRegistryClient.getLatestSchemaFromProperty(properties, ConstantApp.PK_SCHEMA_SUB_OUTPUT).toString());

        Kafka09AvroTableSink avro_sink =
                new Kafka09AvroTableSink(targetTopic, properties, new FlinkFixedPartitioner());
        result.writeToSink(avro_sink);
        env.execute("DF_FlinkTableAPI_Client_" + srcTopic + "-" + targetTopic);
    } catch (Exception e) {
        // Failures are only printed; the caller gets no signal that the job did not run.
        e.printStackTrace();
    }
}
 
示例30
/**
 * Demo entry point: registers a CSV table source, compiles a {@code dynamic.FlinkScript} class
 * from an in-memory source string, runs its {@code transTableObj} on the scanned table and writes
 * the result to a CSV sink before executing the job. Exceptions from the compile/run/execute
 * steps are printed to standard error and swallowed.
 *
 * @param args ignored
 */
public static void main(String[] args) {

	// DataSet-flavoured variant; built for illustration but not compiled or executed below
	String dataSetTransform = "flatMap(new FlinkUDF.LineSplitter()).groupBy(0).sum(1).print();\n";

	// Table-flavoured variant; this is the one that gets compiled
	String tableTransform = "select(\"name\");\n";

	String sourceHeader = "package dynamic;\n" +
			"import org.apache.flink.api.table.Table;\n" +
			"import com.datafibers.util.*;\n";

	String dataSetScript = sourceHeader +
			"public class FlinkScript implements DynamicRunner {\n" +
			"@Override \n" +
			"    public void runTransform(DataSet<String> ds) {\n" +
			"try {" +
			"ds." + dataSetTransform +
			"} catch (Exception e) {" +
			"};" +
			"}}";

	String tableScript = sourceHeader +
			"public class FlinkScript implements DynamicRunner {\n" +
			"@Override \n" +
			"    public Table transTableObj(Table tbl) {\n" +
			"try {" +
			"return tbl." + tableTransform +
			"} catch (Exception e) {" +
			"};" +
			"return null;}}";

	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

	// four string-typed columns
	String[] columnNames = new String[] { "name", "id", "score", "comments" };
	TypeInformation<?>[] columnTypes = new TypeInformation<?>[] {
			Types.STRING(),
			Types.STRING(),
			Types.STRING(),
			Types.STRING()
	};
	CsvTableSource csvTableSource = new CsvTableSource(
			"/Users/will/Downloads/file.csv", columnNames, columnTypes);

	tableEnv.registerTableSource("mycsv", csvTableSource);
	TableSink sink = new CsvTableSink("/Users/will/Downloads/out.csv", "|");
	Table ingest = tableEnv.scan("mycsv");

	try {
		Class<?> scriptClass = CompilerUtils.CACHED_COMPILER.loadFromJava("dynamic.FlinkScript", tableScript);
		DynamicRunner runner = (DynamicRunner) scriptClass.newInstance();
		Table result = runner.transTableObj(ingest);
		// send the transformed table to the CSV sink, then run the job
		result.writeToSink(sink);
		env.execute();
	} catch (Exception e) {
		e.printStackTrace();
	}
}