Java源码示例:org.apache.flink.cep.PatternStream
示例1
private void executeTest(String pattern, List<Event> data) throws Exception {
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> eventDataStream = streamExecutionEnvironment.fromCollection(data);
PatternStream<Event> patternStream = Dsl.compile(pattern, eventDataStream);
patternStream.select(new PatternSelectFunction<Event, Event>() {
private static final long serialVersionUID = 7242171752905668044L;
@Override
public Event select(Map<String, List<Event>> map) {
results.putAll(map);
return null;
}
});
streamExecutionEnvironment.execute("test");
}
示例2
static void executeTest(String pattern, List<Event> data) throws Exception {
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> eventDataStream = streamExecutionEnvironment.fromCollection(data);
PatternStream<Event> patternStream = Dsl.compile(pattern, eventDataStream);
patternStream.select(new PatternSelectFunction<Event, Event>() {
private static final long serialVersionUID = 7242171752905668044L;
@Override
public Event select(Map<String, List<Event>> map) {
results.add(map);
return null;
}
});
streamExecutionEnvironment.execute("test");
}
示例3
private static <TWarningType extends IWarning> DataStream<TWarningType> toWarningStream(DataStream<LocalWeatherData> localWeatherDataDataStream, IWarningPattern<LocalWeatherData, TWarningType> warningPattern) {
PatternStream<LocalWeatherData> tempPatternStream = CEP.pattern(
localWeatherDataDataStream.keyBy(new KeySelector<LocalWeatherData, String>() {
@Override
public String getKey(LocalWeatherData localWeatherData) throws Exception {
return localWeatherData.getStation().getWban();
}
}),
warningPattern.getEventPattern());
DataStream<TWarningType> warnings = tempPatternStream.select(new PatternSelectFunction<LocalWeatherData, TWarningType>() {
@Override
public TWarningType select(Map<String, List<LocalWeatherData>> map) throws Exception {
return warningPattern.create(map);
}
}, new GenericTypeInfo<TWarningType>(warningPattern.getWarningTargetType()));
return warnings;
}
示例4
private static <TWarningType extends IWarning> DataStream<TWarningType> toWarningStream(DataStream<LocalWeatherData> localWeatherDataDataStream, IWarningPattern<LocalWeatherData, TWarningType> warningPattern) {
PatternStream<LocalWeatherData> tempPatternStream = CEP.pattern(
localWeatherDataDataStream.keyBy(new KeySelector<LocalWeatherData, String>() {
@Override
public String getKey(LocalWeatherData localWeatherData) throws Exception {
return localWeatherData.getStation().getWban();
}
}),
warningPattern.getEventPattern());
DataStream<TWarningType> warnings = tempPatternStream.select(new PatternSelectFunction<LocalWeatherData, TWarningType>() {
@Override
public TWarningType select(Map<String, List<LocalWeatherData>> map) throws Exception {
return warningPattern.create(map);
}
}, new GenericTypeInfo<TWarningType>(warningPattern.getWarningTargetType()));
return warnings;
}
示例5
private static PatternStream<Event> compile(String expression, DataStream<Event> dataStream, boolean strictEventTypeMatching) {
CaseInsensitiveInputStream inputStream = new CaseInsensitiveInputStream(expression);
PatternLanguageLexer lexer = new PatternLanguageLexer(inputStream);
CommonTokenStream tokens = new CommonTokenStream(lexer);
PatternLanguageParser parser = new PatternLanguageParser(tokens);
ParseTree parseTree = parser.patternExpression();
ParseTreeWalker parseTreeWalker = new ParseTreeWalker();
FlinkCepPatternLanguageListener listener = new FlinkCepPatternLanguageListener(strictEventTypeMatching);
parseTreeWalker.walk(listener, parseTree);
return CEP.pattern(dataStream, listener.getPattern());
}
示例6
@Test
public void shouldEvaluateNextPattern() throws Exception {
DataStream<Event> dataStream = null;
PatternStream<Event> patternStream = Dsl.compile("A(attribute='testabc') B(attribute=30)", dataStream);
assertThat(patternStream.getPattern(), is(notNullValue()));
assertThat(patternStream.getPattern().getName(), is("B"));
assertThat(patternStream.getPattern().getPrevious(), is(notNullValue()));
assertThat(patternStream.getPattern().getPrevious().getName(), is("A"));
}
示例7
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStream<Event> eventStream = env.addSource(new OutOfOrderEventSource())
.assignTimestampsAndWatermarks(new TimestampsAndWatermarks());
Pattern<Event, ?> matchEverything =
Pattern.<Event>begin("any")
.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) throws Exception {
return true;
}
});
PatternStream<Event> patternStream = CEP.pattern(eventStream, matchEverything);
OutputTag<Event> lateDataOutputTag = new OutputTag<Event>("late-events"){};
SingleOutputStreamOperator<Event> sorted = patternStream
.sideOutputLateData(lateDataOutputTag)
.select(new PatternSelectFunction<Event, Event>() {
@Override
public Event select(Map<String, List<Event>> map) throws Exception {
return map.get("any").get(0);
}
});
sorted.print();
sorted
.getSideOutput(lateDataOutputTag)
.map(e -> new Tuple2<>(e, "LATE"))
.returns(Types.TUPLE(TypeInformation.of(Event.class), Types.STRING))
.print();
env.execute();
}
示例8
public static void main(String[] args) throws Exception {
ParameterTool params = ParameterTool.fromArgs(args);
final String input = params.get("input", ExerciseBase.pathToRideData);
final int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(ExerciseBase.parallelism);
// CheckpointedTaxiRideSource delivers events in order
DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new CheckpointedTaxiRideSource(input, servingSpeedFactor)));
DataStream<TaxiRide> keyedRides = rides
.keyBy("rideId");
// A complete taxi ride has a START event followed by an END event
// This pattern is incomplete ...
Pattern<TaxiRide, TaxiRide> completedRides = Pattern.<TaxiRide>begin("start");
// We want to find rides that have NOT been completed within 120 minutes.
// This pattern matches rides that ARE completed.
// Below we will ignore rides that match this pattern, and emit those that timeout.
PatternStream<TaxiRide> patternStream = CEP.pattern(keyedRides, completedRides.within(Time.minutes(120)));
OutputTag<TaxiRide> timedout = new OutputTag<TaxiRide>("timedout"){};
SingleOutputStreamOperator<TaxiRide> longRides = patternStream.flatSelect(
timedout,
new TaxiRideTimedOut<TaxiRide>(),
new FlatSelectNothing<TaxiRide>()
);
printOrTest(longRides.getSideOutput(timedout));
throw new MissingSolutionException();
// env.execute("Long Taxi Rides (CEP)");
}
示例9
public PatternStream<Event> compile(String expression, DataStream<Event> dataStream) {
return Dsl.compile(expression, dataStream, strictEventTypeMatching);
}
示例10
/**
* Returns a pattern stream based on a data stream and a pattern to search for
* @param expression The expression representing the pattern which should be evaluated, refer to the projects documentation on syntax and features
* @param dataStream The data stream which should be evaluated
* @return The pattern stream providing the found patterns
*/
public static PatternStream<Event> compile(String expression, DataStream<Event> dataStream) {
return Dsl.compile(expression, dataStream, false);
}