Java源码示例:org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
示例1
/**
 * Minimal example: assigns event-time timestamps and ascending watermarks
 * to a two-element collection stream.
 *
 * <p>Fix: {@code assignTimestampsAndWatermarks} does not mutate its receiver —
 * it returns a NEW stream. The original code discarded that return value, so
 * the timestamped stream was lost; it is now captured so downstream operators
 * can consume it.
 */
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Use event time as the stream time characteristic.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // Two (name, timestamp) elements as input.
    List<Tuple2<String, Long>> collectionInput = new ArrayList<>();
    collectionInput.add(new Tuple2<>("first event", 1L));
    collectionInput.add(new Tuple2<>("second event", 2L));

    DataStream<Tuple2<String, Long>> text = env.fromCollection(collectionInput);
    // Assign timestamps and watermarks with the ascending extractor; keep the
    // returned stream — this is the stream that actually carries timestamps.
    DataStream<Tuple2<String, Long>> withTimestamps =
            text.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
                @Override
                public long extractAscendingTimestamp(Tuple2<String, Long> element) {
                    // f1 holds the event timestamp.
                    return element.f1;
                }
            });
    env.execute();
}
示例2
/**
 * End-to-end test: five random events flow through a Siddhi CQL pass-through
 * query and are written to a temp file; exactly five lines must come out.
 *
 * <p>Fix: the original code discarded the stream returned by
 * {@code assignTimestampsAndWatermarks} and fed the un-timestamped source
 * into the query. The timestamped stream is now the one passed to Siddhi.
 */
@Test
public void testUnboundedPojoStreamAndReturnPojo() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Event> source = env.addSource(new RandomEventSource(5));
    // assignTimestampsAndWatermarks returns a new stream — it must be consumed.
    DataStream<Event> input = source.assignTimestampsAndWatermarks(
            new AscendingTimestampExtractor<Event>() {
                @Override
                public long extractAscendingTimestamp(Event element) {
                    return element.getTimestamp();
                }
            });
    DataStream<Event> output = SiddhiCEP
            .define("inputStream", input, "id", "name", "price", "timestamp")
            .cql("from inputStream select timestamp, id, name, price insert into outputStream")
            .returns("outputStream", Event.class);
    String resultPath = tempFolder.newFile().toURI().toString();
    output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    env.execute();
    // One output line per input event.
    assertEquals(5, getLineCount(resultPath));
}
示例3
/**
 * End-to-end test: five random events flow through a Siddhi CQL pass-through
 * query and are written to a temp file; exactly five lines must come out.
 *
 * <p>Fix: the original code discarded the stream returned by
 * {@code assignTimestampsAndWatermarks} and fed the un-timestamped source
 * into the query. The timestamped stream is now the one passed to Siddhi.
 */
@Test
public void testUnboundedPojoStreamAndReturnPojo() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Event> source = env.addSource(new RandomEventSource(5));
    // assignTimestampsAndWatermarks returns a new stream — it must be consumed.
    DataStream<Event> input = source.assignTimestampsAndWatermarks(
            new AscendingTimestampExtractor<Event>() {
                @Override
                public long extractAscendingTimestamp(Event element) {
                    return element.getTimestamp();
                }
            });
    DataStream<Event> output = SiddhiCEP
            .define("inputStream", input, "id", "name", "price", "timestamp")
            .cql("from inputStream select timestamp, id, name, price insert into outputStream")
            .returns("outputStream", Event.class);
    String resultPath = tempFolder.newFile().toURI().toString();
    output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    env.execute();
    // One output line per input event.
    assertEquals(5, getLineCount(resultPath));
}
示例4
/** Verifies that a FailingHandler turns an ascending-order violation into an exception. */
@Test
public void testWithFailingHandler() {
    final AscendingTimestampExtractor<Long> failFast = new LongExtractor()
            .withViolationHandler(new AscendingTimestampExtractor.FailingHandler());

    // Monotonically non-decreasing timestamps must always pass.
    runValidTests(failFast);

    // A decreasing timestamp must be reported via an exception.
    boolean threw = false;
    try {
        runInvalidTest(failFast);
    } catch (Exception expected) {
        threw = true;
    }
    if (!threw) {
        fail("should fail with an exception");
    }
}
示例5
/** Verifies that the IgnoringHandler silently tolerates out-of-order timestamps. */
@Test
public void testWithIgnoringHandler() {
    final AscendingTimestampExtractor<Long> lenient = new LongExtractor()
            .withViolationHandler(new AscendingTimestampExtractor.IgnoringHandler());
    runValidTests(lenient);
    // The violation inside runInvalidTest must be swallowed, not rethrown.
    runInvalidTest(lenient);
}
示例6
/** The LoggingHandler only logs ordering violations; the test must complete without throwing. */
@Test
public void testWithLoggingHandler() {
    final AscendingTimestampExtractor<Long> logging = new LongExtractor()
            .withViolationHandler(new AscendingTimestampExtractor.LoggingHandler());
    runValidTests(logging);
    // The out-of-order element is logged, not thrown.
    runInvalidTest(logging);
}
示例7
/** With no explicit violation handler, out-of-order timestamps must not raise an exception. */
@Test
public void testWithDefaultHandler() {
    final AscendingTimestampExtractor<Long> plain = new LongExtractor();
    runValidTests(plain);
    runInvalidTest(plain);
}
示例8
/**
 * Checks the watermark lifecycle: before any element is seen the watermark is
 * Long.MIN_VALUE; after observing Long.MAX_VALUE it becomes Long.MAX_VALUE - 1
 * (the largest seen timestamp minus one).
 */
@Test
public void testInitialAndFinalWatermark() {
AscendingTimestampExtractor<Long> extractor = new LongExtractor();
// No element seen yet -> initial watermark is the smallest possible value.
assertEquals(Long.MIN_VALUE, extractor.getCurrentWatermark().getTimestamp());
extractor.extractTimestamp(Long.MIN_VALUE, -1L);
extractor.extractTimestamp(Long.MAX_VALUE, -1L);
// The watermark trails the largest extracted timestamp by exactly one.
assertEquals(Long.MAX_VALUE - 1, extractor.getCurrentWatermark().getTimestamp());
}
示例9
/**
 * Drives the extractor with a non-decreasing timestamp sequence; every call
 * must return the element's own value unchanged. The second argument is the
 * previous-element timestamp hint passed through by the test.
 */
private void runValidTests(AscendingTimestampExtractor<Long> extractor) {
assertEquals(13L, extractor.extractTimestamp(13L, -1L));
assertEquals(13L, extractor.extractTimestamp(13L, 0L));
assertEquals(14L, extractor.extractTimestamp(14L, 0L));
// Repeated timestamps still count as "ascending" (non-strict ordering).
assertEquals(20L, extractor.extractTimestamp(20L, 0L));
assertEquals(20L, extractor.extractTimestamp(20L, 0L));
assertEquals(20L, extractor.extractTimestamp(20L, 0L));
assertEquals(500L, extractor.extractTimestamp(500L, 0L));
// Largest representable valid timestamp.
assertEquals(Long.MAX_VALUE - 1, extractor.extractTimestamp(Long.MAX_VALUE - 1, 99999L));
}
示例10
/**
 * Feeds the extractor a timestamp that goes backwards (1000 -> 999).
 * The extracted timestamp is always the element's own value; how the
 * ordering violation is reported depends on the configured handler.
 */
private void runInvalidTest(AscendingTimestampExtractor<Long> extractor) {
    // Consistency fix: use long literals for the previous-timestamp argument,
    // matching the -1L/0L style used in runValidTests.
    assertEquals(1000L, extractor.extractTimestamp(1000L, 100L));
    assertEquals(1000L, extractor.extractTimestamp(1000L, 100L));
    // violation: 999 < 1000 triggers the extractor's violation handler
    assertEquals(999L, extractor.extractTimestamp(999L, 100L));
}
示例11
/**
 * Example: event-time windowing over a keyed tuple stream. Timestamps and
 * ascending watermarks come from the tuple's third field; the stream is keyed
 * by the second field and windowed with a 1s sliding window whose slide equals
 * its size (so windows do not overlap).
 */
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Use event time as the stream time characteristic.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Build the input data: (event name, key, event timestamp).
List<Tuple3<String, Long, Long>> data = new ArrayList<>();
Tuple3<String, Long, Long> a1 = new Tuple3<>("first event", 1L, 1111L);
Tuple3<String, Long, Long> a2 = new Tuple3<>("second event", 1L, 1112L);
// a3/b3 carry much later timestamps, so they land in separate windows.
Tuple3<String, Long, Long> a3 = new Tuple3<>("third event", 1L, 20121L);
Tuple3<String, Long, Long> b1 = new Tuple3<>("first event", 2L, 1111L);
Tuple3<String, Long, Long> b2 = new Tuple3<>("second event", 2L, 1112L);
Tuple3<String, Long, Long> b3 = new Tuple3<>("third event", 2L, 30111L);
data.add(a1);
data.add(a2);
data.add(a3);
data.add(b1);
data.add(b2);
data.add(b3);
DataStreamSource<Tuple3<String, Long, Long>> input = env.fromCollection(data);
// Extract the f2 field as the (ascending) event timestamp, key by f1,
// window, aggregate with MyProcessWindowFunction, and print the result.
input.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<String, Long, Long>>() {
@Override
public long extractAscendingTimestamp(Tuple3<String, Long, Long> element) {
return element.f2;
}
}).keyBy(x -> x.f1).timeWindow(Time.seconds(1), Time.seconds(1)).process(new MyProcessWindowFunction()).print();
// Expected output:
// 3> window: TimeWindow{start=1000, end=2000}word count: 4
// 4> window: TimeWindow{start=1000, end=2000}word count: 4
// 3> window: TimeWindow{start=20000, end=21000}word count: 2
// 4> window: TimeWindow{start=30000, end=31000}word count: 2
env.execute();
}
示例12
/**
 * Runs a two-level aggregation over a 3-tuple stream and upserts the result
 * into a JDBC table, then checks the table contents.
 *
 * <p>NOTE(review): the extractor uses the tuple's f0 (an Integer id) as the
 * event timestamp — presumably the test data's ids are ascending; confirm
 * against get3TupleDataStream.
 */
@Test
public void testUpsert() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// Register the timestamped tuple stream as table T with columns id, num, text.
Table t = tEnv.fromDataStream(get3TupleDataStream(env).assignTimestampsAndWatermarks(
new AscendingTimestampExtractor<Tuple3<Integer, Long, String>>() {
@Override
public long extractAscendingTimestamp(Tuple3<Integer, Long, String> element) {
return element.f0;
}}), "id, num, text");
tEnv.registerTable("T", t);
String[] fields = {"cnt", "lencnt", "cTag"};
// Upsert sink backed by a JDBC table (OUTPUT_TABLE1 at DB_URL).
tEnv.registerTableSink("upsertSink", JDBCUpsertTableSink.builder()
.setOptions(JDBCOptions.builder()
.setDBUrl(DB_URL)
.setTableName(OUTPUT_TABLE1)
.build())
.setTableSchema(TableSchema.builder().fields(
fields, new DataType[] {BIGINT(), BIGINT(), INT()}).build())
.build());
// Count ids per text length, then count lengths per id-count — a nested
// aggregation that produces retractions, exercising the upsert path.
tEnv.sqlUpdate("INSERT INTO upsertSink SELECT cnt, COUNT(len) AS lencnt, cTag FROM" +
" (SELECT len, COUNT(id) as cnt, cTag FROM" +
" (SELECT id, CHAR_LENGTH(text) AS len, (CASE WHEN id > 0 THEN 1 ELSE 0 END) cTag FROM T)" +
" GROUP BY len, cTag)" +
" GROUP BY cnt, cTag");
env.execute();
// Verify the final upserted rows in the database.
check(new Row[] {
Row.of(1, 5, 1),
Row.of(7, 1, 1),
Row.of(9, 1, 1)
}, DB_URL, OUTPUT_TABLE1, fields);
}
示例13
/** Verifies that a FailingHandler turns an ascending-order violation into an exception. */
@Test
public void testWithFailingHandler() {
    final AscendingTimestampExtractor<Long> failFast = new LongExtractor()
            .withViolationHandler(new AscendingTimestampExtractor.FailingHandler());

    // Monotonically non-decreasing timestamps must always pass.
    runValidTests(failFast);

    // A decreasing timestamp must be reported via an exception.
    boolean threw = false;
    try {
        runInvalidTest(failFast);
    } catch (Exception expected) {
        threw = true;
    }
    if (!threw) {
        fail("should fail with an exception");
    }
}
示例14
/** Verifies that the IgnoringHandler silently tolerates out-of-order timestamps. */
@Test
public void testWithIgnoringHandler() {
    final AscendingTimestampExtractor<Long> lenient = new LongExtractor()
            .withViolationHandler(new AscendingTimestampExtractor.IgnoringHandler());
    runValidTests(lenient);
    // The violation inside runInvalidTest must be swallowed, not rethrown.
    runInvalidTest(lenient);
}
示例15
/** The LoggingHandler only logs ordering violations; the test must complete without throwing. */
@Test
public void testWithLoggingHandler() {
    final AscendingTimestampExtractor<Long> logging = new LongExtractor()
            .withViolationHandler(new AscendingTimestampExtractor.LoggingHandler());
    runValidTests(logging);
    // The out-of-order element is logged, not thrown.
    runInvalidTest(logging);
}
示例16
/** With no explicit violation handler, out-of-order timestamps must not raise an exception. */
@Test
public void testWithDefaultHandler() {
    final AscendingTimestampExtractor<Long> plain = new LongExtractor();
    runValidTests(plain);
    runInvalidTest(plain);
}
示例17
/**
 * Checks the watermark lifecycle: before any element is seen the watermark is
 * Long.MIN_VALUE; after observing Long.MAX_VALUE it becomes Long.MAX_VALUE - 1
 * (the largest seen timestamp minus one).
 */
@Test
public void testInitialAndFinalWatermark() {
AscendingTimestampExtractor<Long> extractor = new LongExtractor();
// No element seen yet -> initial watermark is the smallest possible value.
assertEquals(Long.MIN_VALUE, extractor.getCurrentWatermark().getTimestamp());
extractor.extractTimestamp(Long.MIN_VALUE, -1L);
extractor.extractTimestamp(Long.MAX_VALUE, -1L);
// The watermark trails the largest extracted timestamp by exactly one.
assertEquals(Long.MAX_VALUE - 1, extractor.getCurrentWatermark().getTimestamp());
}
示例18
/**
 * Drives the extractor with a non-decreasing timestamp sequence; every call
 * must return the element's own value unchanged. The second argument is the
 * previous-element timestamp hint passed through by the test.
 */
private void runValidTests(AscendingTimestampExtractor<Long> extractor) {
assertEquals(13L, extractor.extractTimestamp(13L, -1L));
assertEquals(13L, extractor.extractTimestamp(13L, 0L));
assertEquals(14L, extractor.extractTimestamp(14L, 0L));
// Repeated timestamps still count as "ascending" (non-strict ordering).
assertEquals(20L, extractor.extractTimestamp(20L, 0L));
assertEquals(20L, extractor.extractTimestamp(20L, 0L));
assertEquals(20L, extractor.extractTimestamp(20L, 0L));
assertEquals(500L, extractor.extractTimestamp(500L, 0L));
// Largest representable valid timestamp.
assertEquals(Long.MAX_VALUE - 1, extractor.extractTimestamp(Long.MAX_VALUE - 1, 99999L));
}
示例19
/**
 * Feeds the extractor a timestamp that goes backwards (1000 -> 999).
 * The extracted timestamp is always the element's own value; how the
 * ordering violation is reported depends on the configured handler.
 */
private void runInvalidTest(AscendingTimestampExtractor<Long> extractor) {
    // Consistency fix: use long literals for the previous-timestamp argument,
    // matching the -1L/0L style used in runValidTests.
    assertEquals(1000L, extractor.extractTimestamp(1000L, 100L));
    assertEquals(1000L, extractor.extractTimestamp(1000L, 100L));
    // violation: 999 < 1000 triggers the extractor's violation handler
    assertEquals(999L, extractor.extractTimestamp(999L, 100L));
}
示例20
/**
 * Reads TaxiRide events from a CSV file and returns them as an event-time
 * stream whose timestamps and ascending watermarks come from the drop-off time.
 *
 * @param env The execution environment.
 * @param csvFile The path of the CSV file to read.
 * @return A DataStream of TaxiRide events.
 */
public static DataStream<TaxiRide> getRides(StreamExecutionEnvironment env, String csvFile) {
    // CSV format: newline-delimited records with comma-delimited fields.
    final RowCsvInputFormat csvFormat = new RowCsvInputFormat(
            null, // the input path is supplied via readFile below
            inputFieldTypes,
            "\n",
            ",");

    // Read the file with a single task so the record order is preserved.
    final DataStream<Row> rows = env
            .readFile(csvFormat, csvFile)
            .returns(Types.ROW(inputFieldTypes))
            .setParallelism(1);

    // Map rows to TaxiRide POJOs and stamp each ride with its drop-off time.
    return rows
            .map(new RideMapper())
            .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<TaxiRide>() {
                @Override
                public long extractAscendingTimestamp(TaxiRide ride) {
                    return ride.dropOffTime;
                }
            });
}
示例21
/** Verifies that a FailingHandler turns an ascending-order violation into an exception. */
@Test
public void testWithFailingHandler() {
    final AscendingTimestampExtractor<Long> failFast = new LongExtractor()
            .withViolationHandler(new AscendingTimestampExtractor.FailingHandler());

    // Monotonically non-decreasing timestamps must always pass.
    runValidTests(failFast);

    // A decreasing timestamp must be reported via an exception.
    boolean threw = false;
    try {
        runInvalidTest(failFast);
    } catch (Exception expected) {
        threw = true;
    }
    if (!threw) {
        fail("should fail with an exception");
    }
}
示例22
/** Verifies that the IgnoringHandler silently tolerates out-of-order timestamps. */
@Test
public void testWithIgnoringHandler() {
    final AscendingTimestampExtractor<Long> lenient = new LongExtractor()
            .withViolationHandler(new AscendingTimestampExtractor.IgnoringHandler());
    runValidTests(lenient);
    // The violation inside runInvalidTest must be swallowed, not rethrown.
    runInvalidTest(lenient);
}
示例23
/** The LoggingHandler only logs ordering violations; the test must complete without throwing. */
@Test
public void testWithLoggingHandler() {
    final AscendingTimestampExtractor<Long> logging = new LongExtractor()
            .withViolationHandler(new AscendingTimestampExtractor.LoggingHandler());
    runValidTests(logging);
    // The out-of-order element is logged, not thrown.
    runInvalidTest(logging);
}
示例24
/** With no explicit violation handler, out-of-order timestamps must not raise an exception. */
@Test
public void testWithDefaultHandler() {
    final AscendingTimestampExtractor<Long> plain = new LongExtractor();
    runValidTests(plain);
    runInvalidTest(plain);
}
示例25
/**
 * Checks the watermark lifecycle: before any element is seen the watermark is
 * Long.MIN_VALUE; after observing Long.MAX_VALUE it becomes Long.MAX_VALUE - 1
 * (the largest seen timestamp minus one).
 */
@Test
public void testInitialAndFinalWatermark() {
AscendingTimestampExtractor<Long> extractor = new LongExtractor();
// No element seen yet -> initial watermark is the smallest possible value.
assertEquals(Long.MIN_VALUE, extractor.getCurrentWatermark().getTimestamp());
extractor.extractTimestamp(Long.MIN_VALUE, -1L);
extractor.extractTimestamp(Long.MAX_VALUE, -1L);
// The watermark trails the largest extracted timestamp by exactly one.
assertEquals(Long.MAX_VALUE - 1, extractor.getCurrentWatermark().getTimestamp());
}
示例26
/**
 * Drives the extractor with a non-decreasing timestamp sequence; every call
 * must return the element's own value unchanged. The second argument is the
 * previous-element timestamp hint passed through by the test.
 */
private void runValidTests(AscendingTimestampExtractor<Long> extractor) {
assertEquals(13L, extractor.extractTimestamp(13L, -1L));
assertEquals(13L, extractor.extractTimestamp(13L, 0L));
assertEquals(14L, extractor.extractTimestamp(14L, 0L));
// Repeated timestamps still count as "ascending" (non-strict ordering).
assertEquals(20L, extractor.extractTimestamp(20L, 0L));
assertEquals(20L, extractor.extractTimestamp(20L, 0L));
assertEquals(20L, extractor.extractTimestamp(20L, 0L));
assertEquals(500L, extractor.extractTimestamp(500L, 0L));
// Largest representable valid timestamp.
assertEquals(Long.MAX_VALUE - 1, extractor.extractTimestamp(Long.MAX_VALUE - 1, 99999L));
}
示例27
/**
 * Feeds the extractor a timestamp that goes backwards (1000 -> 999).
 * The extracted timestamp is always the element's own value; how the
 * ordering violation is reported depends on the configured handler.
 */
private void runInvalidTest(AscendingTimestampExtractor<Long> extractor) {
    // Consistency fix: use long literals for the previous-timestamp argument,
    // matching the -1L/0L style used in runValidTests.
    assertEquals(1000L, extractor.extractTimestamp(1000L, 100L));
    assertEquals(1000L, extractor.extractTimestamp(1000L, 100L));
    // violation: 999 < 1000 triggers the extractor's violation handler
    assertEquals(999L, extractor.extractTimestamp(999L, 100L));
}
示例28
/**
 * This tests whether timestamps are properly extracted in the timestamp
 * extractor and whether watermarks are also correctly forwarded from this with the auto watermark
 * interval.
 */
@Test
public void testTimestampExtractorWithAutoInterval() throws Exception {
final int numElements = 10;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Event time with periodic watermark emission every 10 ms, single-task pipeline.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(10);
env.setParallelism(1);
env.getConfig().disableSysoutLogging();
// Source emits the integers 1..numElements; each value doubles as its event timestamp.
DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
int index = 1;
while (index <= numElements) {
ctx.collect(index);
// NOTE(review): 'latch' is defined outside this snippet; it appears to pace
// emission so the periodic watermark interval can fire between elements — confirm.
latch.await();
index++;
}
}
@Override
public void cancel() {}
});
// Assign ascending timestamps: the element value itself is the timestamp.
DataStream<Integer> extractOp = source1.assignTimestampsAndWatermarks(
new AscendingTimestampExtractor<Integer>() {
@Override
public long extractAscendingTimestamp(Integer element) {
return element;
}
});
// CustomOperator records the watermarks it sees; TimestampCheckingOperator
// validates per-record timestamps (both defined elsewhere in the test class).
extractOp
.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
.transform("Timestamp Check",
BasicTypeInfo.INT_TYPE_INFO,
new TimestampCheckingOperator());
// verify that extractor picks up source parallelism
Assert.assertEquals(extractOp.getTransformation().getParallelism(), source1.getTransformation().getParallelism());
env.execute();
// verify that we get NUM_ELEMENTS watermarks
for (int j = 0; j < numElements; j++) {
if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j))) {
long wm = CustomOperator.finalWatermarks[0].get(j).getTimestamp();
Assert.fail("Wrong watermark. Expected: " + j + " Found: " + wm + " All: " + CustomOperator.finalWatermarks[0]);
}
}
// the input is finite, so it should have a MAX Watermark
assertEquals(Watermark.MAX_WATERMARK,
CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size() - 1));
}
示例29
/**
 * This tests whether timestamps are properly extracted in the timestamp
 * extractor and whether watermarks are also correctly forwarded from this with the auto watermark
 * interval.
 */
@Test
public void testTimestampExtractorWithAutoInterval() throws Exception {
final int numElements = 10;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Event time with periodic watermark emission every 10 ms, single-task pipeline.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(10);
env.setParallelism(1);
env.getConfig().disableSysoutLogging();
// Source emits the integers 1..numElements; each value doubles as its event timestamp.
DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
int index = 1;
while (index <= numElements) {
ctx.collect(index);
// NOTE(review): 'latch' is defined outside this snippet; it appears to pace
// emission so the periodic watermark interval can fire between elements — confirm.
latch.await();
index++;
}
}
@Override
public void cancel() {}
});
// Assign ascending timestamps: the element value itself is the timestamp.
DataStream<Integer> extractOp = source1.assignTimestampsAndWatermarks(
new AscendingTimestampExtractor<Integer>() {
@Override
public long extractAscendingTimestamp(Integer element) {
return element;
}
});
// CustomOperator records the watermarks it sees; TimestampCheckingOperator
// validates per-record timestamps (both defined elsewhere in the test class).
extractOp
.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
.transform("Timestamp Check",
BasicTypeInfo.INT_TYPE_INFO,
new TimestampCheckingOperator());
// verify that extractor picks up source parallelism
Assert.assertEquals(extractOp.getTransformation().getParallelism(), source1.getTransformation().getParallelism());
env.execute();
// verify that we get NUM_ELEMENTS watermarks
for (int j = 0; j < numElements; j++) {
if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j))) {
long wm = CustomOperator.finalWatermarks[0].get(j).getTimestamp();
Assert.fail("Wrong watermark. Expected: " + j + " Found: " + wm + " All: " + CustomOperator.finalWatermarks[0]);
}
}
// the input is finite, so it should have a MAX Watermark
assertEquals(Watermark.MAX_WATERMARK,
CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size() - 1));
}
示例30
/**
 * Writes a timestamped sample stream to a Pravega table sink (blink planner)
 * with watermark propagation enabled, registered through the descriptor API.
 */
@Test
public void testStreamTableSinkUsingDescriptorWithWatermark() throws Exception {
// create a Pravega stream for test purposes
Stream stream = Stream.of(setupUtils.getScope(), "testStreamTableSinkUsingDescriptorWithWatermark");
this.setupUtils.createTestStream(stream.getStreamName(), 1);
// create a Flink Table environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
EnvironmentSettings.newInstance()
// watermark is only supported in blink planner
.useBlinkPlanner()
.inStreamingMode()
.build());
// Wrap the samples with timestamps and assign ascending event-time watermarks.
DataStream<SampleRecordWithTimestamp> dataStream = env.fromCollection(SAMPLES)
.map(SampleRecordWithTimestamp::new)
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SampleRecordWithTimestamp>() {
@Override
public long extractAscendingTimestamp(SampleRecordWithTimestamp sampleRecordWithTimestamp) {
return sampleRecordWithTimestamp.getTimestamp();
}
});
// Expose the stream's event time as the rowtime attribute UserActionTime.
Table table = tableEnv.fromDataStream(dataStream, "category, value, UserActionTime.rowtime");
// Configure the Pravega sink: route by category and propagate watermarks.
Pravega pravega = new Pravega();
pravega.tableSinkWriterBuilder()
.withRoutingKeyField("category")
.enableWatermark(true)
.forStream(stream)
.withPravegaConfig(setupUtils.getPravegaConfig());
TableSchema tableSchema = TableSchema.builder()
.field("category", DataTypes.STRING())
.field("value", DataTypes.INT())
.field("timestamp", DataTypes.TIMESTAMP(3))
.build();
Schema schema = new Schema().schema(tableSchema);
// Build the sink via the connector descriptor + JSON format, append-only mode.
ConnectTableDescriptor desc = tableEnv.connect(pravega)
.withFormat(new Json().failOnMissingField(true))
.withSchema(schema)
.inAppendMode();
desc.createTemporaryTable("test");
final Map<String, String> propertiesMap = desc.toProperties();
final TableSink<?> sink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
.createStreamTableSink(propertiesMap);
// Register the sink as a catalog table and write the source table into it.
String tablePath = tableEnv.getCurrentDatabase() + "." + "PravegaSink";
ConnectorCatalogTable<?, ?> connectorCatalogTable = ConnectorCatalogTable.sink(sink, false);
tableEnv.getCatalog(tableEnv.getCurrentCatalog()).get().createTable(
ObjectPath.fromString(tablePath),
connectorCatalogTable, false);
table.insertInto(tablePath);
env.execute();
}