Java源码示例:org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor

示例1
/**
 * Demonstrates assigning event-time timestamps and ascending watermarks to a small in-memory
 * stream with {@link AscendingTimestampExtractor}.
 *
 * <p>{@code assignTimestampsAndWatermarks} returns a new stream that carries the extracted
 * timestamps; the stream it is called on is left unchanged. The returned stream therefore has to
 * be used downstream, otherwise the extractor has no observable effect.
 */
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Use event time as the job's notion of time.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    List<Tuple2<String, Long>> collectionInput = new ArrayList<>();
    collectionInput.add(new Tuple2<>("first event", 1L));
    collectionInput.add(new Tuple2<>("second event", 2L));

    // Take timestamps from field f1 and generate ascending watermarks. Keep the returned stream:
    // the timestamps exist only on it, not on `text`.
    DataStream<Tuple2<String, Long>> text = env.fromCollection(collectionInput);
    DataStream<Tuple2<String, Long>> withTimestamps =
            text.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
                @Override
                public long extractAscendingTimestamp(Tuple2<String, Long> element) {
                    return element.f1;
                }
            });

    // Attach a sink so the timestamped stream is actually consumed.
    withTimestamps.print();

    env.execute();
}
 
示例2
/**
 * Runs a Siddhi CQL projection over {@link Event} POJOs from {@code RandomEventSource(5)} and
 * checks that the output file contains five lines.
 */
@Test
public void testUnboundedPojoStreamAndReturnPojo() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // assignTimestampsAndWatermarks returns a new stream; only that stream carries the
    // extracted timestamps, so it is the one handed to Siddhi below.
    DataStream<Event> input = env.addSource(new RandomEventSource(5))
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Event>() {
            @Override
            public long extractAscendingTimestamp(Event element) {
                return element.getTimestamp();
            }
        });

    // Register the stream under the name used in the query and project its four fields.
    DataStream<Event> output = SiddhiCEP
        .define("inputStream", input, "id", "name", "price", "timestamp")
        .cql("from inputStream select timestamp, id, name, price insert into  outputStream")
        .returns("outputStream", Event.class);

    String resultPath = tempFolder.newFile().toURI().toString();
    output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    env.execute();
    // Presumably RandomEventSource(5) emits five events, giving one output line each.
    assertEquals(5, getLineCount(resultPath));
}
 
示例3
/**
 * Runs a Siddhi CQL projection over {@link Event} POJOs from {@code RandomEventSource(5)} and
 * checks that the output file contains five lines.
 */
@Test
public void testUnboundedPojoStreamAndReturnPojo() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // assignTimestampsAndWatermarks returns a new stream; only that stream carries the
    // extracted timestamps, so it is the one handed to Siddhi below.
    DataStream<Event> input = env.addSource(new RandomEventSource(5))
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Event>() {
            @Override
            public long extractAscendingTimestamp(Event element) {
                return element.getTimestamp();
            }
        });

    // Register the stream under the name used in the query and project its four fields.
    DataStream<Event> output = SiddhiCEP
        .define("inputStream", input, "id", "name", "price", "timestamp")
        .cql("from inputStream select timestamp, id, name, price insert into  outputStream")
        .returns("outputStream", Event.class);

    String resultPath = tempFolder.newFile().toURI().toString();
    output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    env.execute();
    // Presumably RandomEventSource(5) emits five events, giving one output line each.
    assertEquals(5, getLineCount(resultPath));
}
 
示例4
/**
 * With a {@code FailingHandler}, in-order timestamps must pass and the out-of-order element fed
 * by {@code runInvalidTest} must raise an exception.
 */
@Test
public void testWithFailingHandler() {
	final AscendingTimestampExtractor<Long> failingExtractor = new LongExtractor()
			.withViolationHandler(new AscendingTimestampExtractor.FailingHandler());

	runValidTests(failingExtractor);

	boolean violationReported = false;
	try {
		runInvalidTest(failingExtractor);
	} catch (Exception expected) {
		violationReported = true;
	}
	if (!violationReported) {
		fail("should fail with an exception");
	}
}
 
示例5
/**
 * With an {@code IgnoringHandler}, both the in-order and the out-of-order sequences must be
 * processed without any assertion failing.
 */
@Test
public void testWithIgnoringHandler() {
	final AscendingTimestampExtractor<Long> ignoringExtractor =
			new LongExtractor().withViolationHandler(new AscendingTimestampExtractor.IgnoringHandler());

	runValidTests(ignoringExtractor);
	runInvalidTest(ignoringExtractor);
}
 
示例6
/**
 * With a {@code LoggingHandler}, both the in-order and the out-of-order sequences must be
 * processed without any assertion failing.
 */
@Test
public void testWithLoggingHandler() {
	final AscendingTimestampExtractor<Long> loggingExtractor =
			new LongExtractor().withViolationHandler(new AscendingTimestampExtractor.LoggingHandler());

	runValidTests(loggingExtractor);
	runInvalidTest(loggingExtractor);
}
 
示例7
/**
 * Without an explicit violation handler, both the in-order and the out-of-order sequences must
 * be processed without any assertion failing.
 */
@Test
public void testWithDefaultHandler() {
	final AscendingTimestampExtractor<Long> defaultExtractor = new LongExtractor();

	runValidTests(defaultExtractor);
	runInvalidTest(defaultExtractor);
}
 
示例8
/**
 * Checks the watermark at the extremes of the {@code long} range: it starts at
 * {@code Long.MIN_VALUE} and, after an element stamped {@code Long.MAX_VALUE}, is one below it.
 */
@Test
public void testInitialAndFinalWatermark() {
	final AscendingTimestampExtractor<Long> extractor = new LongExtractor();
	final long initialWatermark = extractor.getCurrentWatermark().getTimestamp();
	assertEquals(Long.MIN_VALUE, initialWatermark);

	// Feed the smallest and then the largest possible timestamp.
	extractor.extractTimestamp(Long.MIN_VALUE, -1L);
	extractor.extractTimestamp(Long.MAX_VALUE, -1L);

	final long finalWatermark = extractor.getCurrentWatermark().getTimestamp();
	assertEquals(Long.MAX_VALUE - 1, finalWatermark);
}
 
示例9
/**
 * Feeds a non-decreasing sequence of elements to {@code extractor} and asserts that each
 * element's own value is returned as its timestamp, whatever the second argument is.
 */
private void runValidTests(AscendingTimestampExtractor<Long> extractor) {
	// Each row: {element value (also its expected timestamp), second extractTimestamp argument}.
	final long[][] inputs = {
			{13L, -1L},
			{13L, 0L},
			{14L, 0L},
			{20L, 0L},
			{20L, 0L},
			{20L, 0L},
			{500L, 0L},
			{Long.MAX_VALUE - 1, 99999L},
	};
	for (long[] input : inputs) {
		assertEquals(input[0], extractor.extractTimestamp(input[0], input[1]));
	}
}
 
示例10
/**
 * Feeds timestamp 1000 twice and then 999, which goes backwards. When the handler does not
 * throw, each element's own value must still come back as its timestamp.
 */
private void runInvalidTest(AscendingTimestampExtractor<Long> extractor) {
	final long previous = 100;

	// Repeating the same timestamp does not break the ascending order.
	for (int repeat = 0; repeat < 2; repeat++) {
		assertEquals(1000L, extractor.extractTimestamp(1000L, previous));
	}

	// 999 < 1000: this element violates the ascending-timestamp contract.
	assertEquals(999L, extractor.extractTimestamp(999L, previous));
}
 
示例11
/**
 * Processes events per key in one-second tumbling event-time windows.
 *
 * <p>{@link AscendingTimestampExtractor} expects timestamps that never decrease, so the input is
 * added in timestamp order. Out-of-order input is reported as a monotony violation and may let
 * elements arrive behind an already advanced watermark, which can drop them from their window.
 */
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // Input records: (name, key, event-time timestamp in milliseconds).
    Tuple3<String, Long, Long> a1 = new Tuple3<>("first event", 1L, 1111L);
    Tuple3<String, Long, Long> a2 = new Tuple3<>("second event", 1L, 1112L);
    Tuple3<String, Long, Long> a3 = new Tuple3<>("third event", 1L, 20121L);
    Tuple3<String, Long, Long> b1 = new Tuple3<>("first event", 2L, 1111L);
    Tuple3<String, Long, Long> b2 = new Tuple3<>("second event", 2L, 1112L);
    Tuple3<String, Long, Long> b3 = new Tuple3<>("third event", 2L, 30111L);

    // Add in ascending timestamp order: 1111, 1111, 1112, 1112, 20121, 30111.
    List<Tuple3<String, Long, Long>> data = new ArrayList<>();
    data.add(a1);
    data.add(b1);
    data.add(a2);
    data.add(b2);
    data.add(a3);
    data.add(b3);
    DataStreamSource<Tuple3<String, Long, Long>> input = env.fromCollection(data);

    input.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<String, Long, Long>>() {
        @Override
        public long extractAscendingTimestamp(Tuple3<String, Long, Long> element) {
            return element.f2;
        }
    })
            .keyBy(x -> x.f1)
            // Window size equals the slide, so this is a plain tumbling window.
            .timeWindow(Time.seconds(1))
            .process(new MyProcessWindowFunction())
            .print();

    // Example output (the "N>" prefix is the printing subtask and depends on parallelism):
    // 3> window: TimeWindow{start=1000, end=2000}word count: 4
    // 4> window: TimeWindow{start=1000, end=2000}word count: 4
    // 3> window: TimeWindow{start=20000, end=21000}word count: 2
    // 4> window: TimeWindow{start=30000, end=31000}word count: 2

    env.execute();
}
 
示例12
/**
 * Runs a two-level aggregation into a JDBC upsert sink and checks the rows written to
 * {@code OUTPUT_TABLE1}.
 */
@Test
public void testUpsert() throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.getConfig().enableObjectReuse();
	env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
	StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

	// Event time is taken from the first tuple field.
	AscendingTimestampExtractor<Tuple3<Integer, Long, String>> idAsTimestamp =
			new AscendingTimestampExtractor<Tuple3<Integer, Long, String>>() {
				@Override
				public long extractAscendingTimestamp(Tuple3<Integer, Long, String> element) {
					return element.f0;
				}
			};
	Table t = tEnv.fromDataStream(
			get3TupleDataStream(env).assignTimestampsAndWatermarks(idAsTimestamp), "id, num, text");
	tEnv.registerTable("T", t);

	// Sink definition: connection options plus a three-column schema.
	String[] fields = {"cnt", "lencnt", "cTag"};
	JDBCOptions sinkOptions = JDBCOptions.builder()
			.setDBUrl(DB_URL)
			.setTableName(OUTPUT_TABLE1)
			.build();
	TableSchema sinkSchema = TableSchema.builder()
			.fields(fields, new DataType[] {BIGINT(), BIGINT(), INT()})
			.build();
	tEnv.registerTableSink("upsertSink", JDBCUpsertTableSink.builder()
			.setOptions(sinkOptions)
			.setTableSchema(sinkSchema)
			.build());

	// Inner query counts ids per (len, cTag); outer query counts lengths per (cnt, cTag).
	String upsertQuery = "INSERT INTO upsertSink SELECT cnt, COUNT(len) AS lencnt, cTag FROM" +
			" (SELECT len, COUNT(id) as cnt, cTag FROM" +
			" (SELECT id, CHAR_LENGTH(text) AS len, (CASE WHEN id > 0 THEN 1 ELSE 0 END) cTag FROM T)" +
			" GROUP BY len, cTag)" +
			" GROUP BY cnt, cTag";
	tEnv.sqlUpdate(upsertQuery);
	env.execute();

	Row[] expectedRows = {
			Row.of(1, 5, 1),
			Row.of(7, 1, 1),
			Row.of(9, 1, 1)
	};
	check(expectedRows, DB_URL, OUTPUT_TABLE1, fields);
}
 
示例13
/**
 * With a {@code FailingHandler}, in-order timestamps must pass and the out-of-order element fed
 * by {@code runInvalidTest} must raise an exception.
 */
@Test
public void testWithFailingHandler() {
	final AscendingTimestampExtractor<Long> failingExtractor = new LongExtractor()
			.withViolationHandler(new AscendingTimestampExtractor.FailingHandler());

	runValidTests(failingExtractor);

	boolean violationReported = false;
	try {
		runInvalidTest(failingExtractor);
	} catch (Exception expected) {
		violationReported = true;
	}
	if (!violationReported) {
		fail("should fail with an exception");
	}
}
 
示例14
/**
 * With an {@code IgnoringHandler}, both the in-order and the out-of-order sequences must be
 * processed without any assertion failing.
 */
@Test
public void testWithIgnoringHandler() {
	final AscendingTimestampExtractor<Long> ignoringExtractor =
			new LongExtractor().withViolationHandler(new AscendingTimestampExtractor.IgnoringHandler());

	runValidTests(ignoringExtractor);
	runInvalidTest(ignoringExtractor);
}
 
示例15
/**
 * With a {@code LoggingHandler}, both the in-order and the out-of-order sequences must be
 * processed without any assertion failing.
 */
@Test
public void testWithLoggingHandler() {
	final AscendingTimestampExtractor<Long> loggingExtractor =
			new LongExtractor().withViolationHandler(new AscendingTimestampExtractor.LoggingHandler());

	runValidTests(loggingExtractor);
	runInvalidTest(loggingExtractor);
}
 
示例16
/**
 * Without an explicit violation handler, both the in-order and the out-of-order sequences must
 * be processed without any assertion failing.
 */
@Test
public void testWithDefaultHandler() {
	final AscendingTimestampExtractor<Long> defaultExtractor = new LongExtractor();

	runValidTests(defaultExtractor);
	runInvalidTest(defaultExtractor);
}
 
示例17
/**
 * Checks the watermark at the extremes of the {@code long} range: it starts at
 * {@code Long.MIN_VALUE} and, after an element stamped {@code Long.MAX_VALUE}, is one below it.
 */
@Test
public void testInitialAndFinalWatermark() {
	final AscendingTimestampExtractor<Long> extractor = new LongExtractor();
	final long initialWatermark = extractor.getCurrentWatermark().getTimestamp();
	assertEquals(Long.MIN_VALUE, initialWatermark);

	// Feed the smallest and then the largest possible timestamp.
	extractor.extractTimestamp(Long.MIN_VALUE, -1L);
	extractor.extractTimestamp(Long.MAX_VALUE, -1L);

	final long finalWatermark = extractor.getCurrentWatermark().getTimestamp();
	assertEquals(Long.MAX_VALUE - 1, finalWatermark);
}
 
示例18
/**
 * Feeds a non-decreasing sequence of elements to {@code extractor} and asserts that each
 * element's own value is returned as its timestamp, whatever the second argument is.
 */
private void runValidTests(AscendingTimestampExtractor<Long> extractor) {
	// Each row: {element value (also its expected timestamp), second extractTimestamp argument}.
	final long[][] inputs = {
			{13L, -1L},
			{13L, 0L},
			{14L, 0L},
			{20L, 0L},
			{20L, 0L},
			{20L, 0L},
			{500L, 0L},
			{Long.MAX_VALUE - 1, 99999L},
	};
	for (long[] input : inputs) {
		assertEquals(input[0], extractor.extractTimestamp(input[0], input[1]));
	}
}
 
示例19
/**
 * Feeds timestamp 1000 twice and then 999, which goes backwards. When the handler does not
 * throw, each element's own value must still come back as its timestamp.
 */
private void runInvalidTest(AscendingTimestampExtractor<Long> extractor) {
	final long previous = 100;

	// Repeating the same timestamp does not break the ascending order.
	for (int repeat = 0; repeat < 2; repeat++) {
		assertEquals(1000L, extractor.extractTimestamp(1000L, previous));
	}

	// 999 < 1000: this element violates the ascending-timestamp contract.
	assertEquals(999L, extractor.extractTimestamp(999L, previous));
}
 
示例20
/**
 * Reads TaxiRide events from a CSV file and returns them as an event-time stream.
 *
 * <p>The file is read by a single source task so rows are produced sequentially. Each ride's
 * drop-off time becomes its event-time timestamp, and watermarks are generated on the assumption
 * that those timestamps are ascending.
 *
 * @param env The execution environment.
 * @param csvFile The path of the CSV file to read.
 * @return A DataStream of TaxiRide events with timestamps and watermarks assigned.
 */
public static DataStream<TaxiRide> getRides(StreamExecutionEnvironment env, String csvFile) {
    // No path yet: readFile below supplies it. Records are newline-separated, fields comma-separated.
    RowCsvInputFormat csvFormat = new RowCsvInputFormat(null, inputFieldTypes, "\n", ",");

    // Parse rows with a parallelism of 1 so the file is read sequentially.
    DataStream<Row> rows = env.readFile(csvFormat, csvFile)
            .returns(Types.ROW(inputFieldTypes))
            .setParallelism(1);

    DataStream<TaxiRide> rides = rows.map(new RideMapper());

    // Drop-off time is the event-time timestamp; watermarks follow it in ascending order.
    AscendingTimestampExtractor<TaxiRide> dropOffTimeExtractor = new AscendingTimestampExtractor<TaxiRide>() {
        @Override
        public long extractAscendingTimestamp(TaxiRide ride) {
            return ride.dropOffTime;
        }
    };
    return rides.assignTimestampsAndWatermarks(dropOffTimeExtractor);
}
 
示例21
/**
 * With a {@code FailingHandler}, in-order timestamps must pass and the out-of-order element fed
 * by {@code runInvalidTest} must raise an exception.
 */
@Test
public void testWithFailingHandler() {
	final AscendingTimestampExtractor<Long> failingExtractor = new LongExtractor()
			.withViolationHandler(new AscendingTimestampExtractor.FailingHandler());

	runValidTests(failingExtractor);

	boolean violationReported = false;
	try {
		runInvalidTest(failingExtractor);
	} catch (Exception expected) {
		violationReported = true;
	}
	if (!violationReported) {
		fail("should fail with an exception");
	}
}
 
示例22
/**
 * With an {@code IgnoringHandler}, both the in-order and the out-of-order sequences must be
 * processed without any assertion failing.
 */
@Test
public void testWithIgnoringHandler() {
	final AscendingTimestampExtractor<Long> ignoringExtractor =
			new LongExtractor().withViolationHandler(new AscendingTimestampExtractor.IgnoringHandler());

	runValidTests(ignoringExtractor);
	runInvalidTest(ignoringExtractor);
}
 
示例23
/**
 * With a {@code LoggingHandler}, both the in-order and the out-of-order sequences must be
 * processed without any assertion failing.
 */
@Test
public void testWithLoggingHandler() {
	final AscendingTimestampExtractor<Long> loggingExtractor =
			new LongExtractor().withViolationHandler(new AscendingTimestampExtractor.LoggingHandler());

	runValidTests(loggingExtractor);
	runInvalidTest(loggingExtractor);
}
 
示例24
/**
 * Without an explicit violation handler, both the in-order and the out-of-order sequences must
 * be processed without any assertion failing.
 */
@Test
public void testWithDefaultHandler() {
	final AscendingTimestampExtractor<Long> defaultExtractor = new LongExtractor();

	runValidTests(defaultExtractor);
	runInvalidTest(defaultExtractor);
}
 
示例25
/**
 * Checks the watermark at the extremes of the {@code long} range: it starts at
 * {@code Long.MIN_VALUE} and, after an element stamped {@code Long.MAX_VALUE}, is one below it.
 */
@Test
public void testInitialAndFinalWatermark() {
	final AscendingTimestampExtractor<Long> extractor = new LongExtractor();
	final long initialWatermark = extractor.getCurrentWatermark().getTimestamp();
	assertEquals(Long.MIN_VALUE, initialWatermark);

	// Feed the smallest and then the largest possible timestamp.
	extractor.extractTimestamp(Long.MIN_VALUE, -1L);
	extractor.extractTimestamp(Long.MAX_VALUE, -1L);

	final long finalWatermark = extractor.getCurrentWatermark().getTimestamp();
	assertEquals(Long.MAX_VALUE - 1, finalWatermark);
}
 
示例26
/**
 * Feeds a non-decreasing sequence of elements to {@code extractor} and asserts that each
 * element's own value is returned as its timestamp, whatever the second argument is.
 */
private void runValidTests(AscendingTimestampExtractor<Long> extractor) {
	// Each row: {element value (also its expected timestamp), second extractTimestamp argument}.
	final long[][] inputs = {
			{13L, -1L},
			{13L, 0L},
			{14L, 0L},
			{20L, 0L},
			{20L, 0L},
			{20L, 0L},
			{500L, 0L},
			{Long.MAX_VALUE - 1, 99999L},
	};
	for (long[] input : inputs) {
		assertEquals(input[0], extractor.extractTimestamp(input[0], input[1]));
	}
}
 
示例27
/**
 * Feeds timestamp 1000 twice and then 999, which goes backwards. When the handler does not
 * throw, each element's own value must still come back as its timestamp.
 */
private void runInvalidTest(AscendingTimestampExtractor<Long> extractor) {
	final long previous = 100;

	// Repeating the same timestamp does not break the ascending order.
	for (int repeat = 0; repeat < 2; repeat++) {
		assertEquals(1000L, extractor.extractTimestamp(1000L, previous));
	}

	// 999 < 1000: this element violates the ascending-timestamp contract.
	assertEquals(999L, extractor.extractTimestamp(999L, previous));
}
 
示例28
/**
 * Checks that the timestamp extractor assigns each element's value as its timestamp and that the
 * resulting watermarks are forwarded downstream using the automatic watermark interval.
 */
@Test
public void testTimestampExtractorWithAutoInterval() throws Exception {
	final int numElements = 10;

	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

	env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
	env.getConfig().setAutoWatermarkInterval(10);
	env.setParallelism(1);
	env.getConfig().disableSysoutLogging();

	// Emits 1..numElements, calling latch.await() (latch is defined outside this method) after each one.
	DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
		@Override
		public void run(SourceContext<Integer> ctx) throws Exception {
			for (int value = 1; value <= numElements; value++) {
				ctx.collect(value);
				latch.await();
			}
		}

		@Override
		public void cancel() {}
	});

	// The element value itself is used as the event-time timestamp.
	DataStream<Integer> extractOp = source1.assignTimestampsAndWatermarks(
			new AscendingTimestampExtractor<Integer>() {
				@Override
				public long extractAscendingTimestamp(Integer element) {
					return element;
				}
			});

	extractOp
			.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
			.transform("Timestamp Check", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator());

	// The extractor must run with the same parallelism as its source.
	Assert.assertEquals(extractOp.getTransformation().getParallelism(), source1.getTransformation().getParallelism());

	env.execute();

	// The first numElements watermarks must be 0, 1, ..., numElements - 1.
	for (int expected = 0; expected < numElements; expected++) {
		Watermark actual = CustomOperator.finalWatermarks[0].get(expected);
		if (!actual.equals(new Watermark(expected))) {
			Assert.fail("Wrong watermark. Expected: " + expected + " Found: " + actual.getTimestamp()
					+ " All: " + CustomOperator.finalWatermarks[0]);
		}
	}

	// The input is finite, so the last watermark must be the maximum watermark.
	int lastIndex = CustomOperator.finalWatermarks[0].size() - 1;
	assertEquals(Watermark.MAX_WATERMARK, CustomOperator.finalWatermarks[0].get(lastIndex));
}
 
示例29
/**
 * Checks that the timestamp extractor assigns each element's value as its timestamp and that the
 * resulting watermarks are forwarded downstream using the automatic watermark interval.
 */
@Test
public void testTimestampExtractorWithAutoInterval() throws Exception {
	final int numElements = 10;

	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

	env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
	env.getConfig().setAutoWatermarkInterval(10);
	env.setParallelism(1);
	env.getConfig().disableSysoutLogging();

	// Emits 1..numElements, calling latch.await() (latch is defined outside this method) after each one.
	DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
		@Override
		public void run(SourceContext<Integer> ctx) throws Exception {
			for (int value = 1; value <= numElements; value++) {
				ctx.collect(value);
				latch.await();
			}
		}

		@Override
		public void cancel() {}
	});

	// The element value itself is used as the event-time timestamp.
	DataStream<Integer> extractOp = source1.assignTimestampsAndWatermarks(
			new AscendingTimestampExtractor<Integer>() {
				@Override
				public long extractAscendingTimestamp(Integer element) {
					return element;
				}
			});

	extractOp
			.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
			.transform("Timestamp Check", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator());

	// The extractor must run with the same parallelism as its source.
	Assert.assertEquals(extractOp.getTransformation().getParallelism(), source1.getTransformation().getParallelism());

	env.execute();

	// The first numElements watermarks must be 0, 1, ..., numElements - 1.
	for (int expected = 0; expected < numElements; expected++) {
		Watermark actual = CustomOperator.finalWatermarks[0].get(expected);
		if (!actual.equals(new Watermark(expected))) {
			Assert.fail("Wrong watermark. Expected: " + expected + " Found: " + actual.getTimestamp()
					+ " All: " + CustomOperator.finalWatermarks[0]);
		}
	}

	// The input is finite, so the last watermark must be the maximum watermark.
	int lastIndex = CustomOperator.finalWatermarks[0].size() - 1;
	assertEquals(Watermark.MAX_WATERMARK, CustomOperator.finalWatermarks[0].get(lastIndex));
}
 
示例30
/**
 * Builds an event-time table from {@code SAMPLES}, creates a Pravega table sink from a connector
 * descriptor with watermark writing enabled, and inserts the table into that sink.
 * No assertions are made on the written data.
 */
@Test
public void testStreamTableSinkUsingDescriptorWithWatermark() throws Exception {
    // Create the Pravega stream this test writes to.
    Stream stream = Stream.of(setupUtils.getScope(), "testStreamTableSinkUsingDescriptorWithWatermark");
    this.setupUtils.createTestStream(stream.getStreamName(), 1);

    // Local event-time environment with parallelism 1.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(1);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
            // watermark is only supported in blink planner
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

    // Each record's own timestamp drives event time.
    DataStream<SampleRecordWithTimestamp> dataStream = env.fromCollection(SAMPLES)
            .map(SampleRecordWithTimestamp::new)
            .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SampleRecordWithTimestamp>() {
                @Override
                public long extractAscendingTimestamp(SampleRecordWithTimestamp record) {
                    return record.getTimestamp();
                }
            });
    Table table = tableEnv.fromDataStream(dataStream, "category, value, UserActionTime.rowtime");

    // NOTE(review): the builder returned here is not kept; this assumes the Pravega descriptor
    // retains the builder it hands out — confirm against the connector.
    Pravega pravega = new Pravega();
    pravega.tableSinkWriterBuilder()
            .withRoutingKeyField("category")
            .enableWatermark(true)
            .forStream(stream)
            .withPravegaConfig(setupUtils.getPravegaConfig());

    Schema schema = new Schema().schema(TableSchema.builder()
            .field("category", DataTypes.STRING())
            .field("value", DataTypes.INT())
            .field("timestamp", DataTypes.TIMESTAMP(3))
            .build());

    ConnectTableDescriptor desc = tableEnv.connect(pravega)
            .withFormat(new Json().failOnMissingField(true))
            .withSchema(schema)
            .inAppendMode();
    desc.createTemporaryTable("test");

    // Instantiate the sink from the descriptor's properties and register it in the current catalog.
    final Map<String, String> descriptorProperties = desc.toProperties();
    final TableSink<?> sink = TableFactoryService.find(StreamTableSinkFactory.class, descriptorProperties)
            .createStreamTableSink(descriptorProperties);

    String tablePath = tableEnv.getCurrentDatabase() + "." + "PravegaSink";
    ConnectorCatalogTable<?, ?> sinkTable = ConnectorCatalogTable.sink(sink, false);
    tableEnv.getCatalog(tableEnv.getCurrentCatalog()).get()
            .createTable(ObjectPath.fromString(tablePath), sinkTable, false);

    table.insertInto(tablePath);
    env.execute();
}