Java源码示例:org.apache.flink.cep.CEP
示例1
/**
 * Applies a warning pattern to a stream of weather measurements and maps every match to a warning.
 *
 * @param localWeatherDataDataStream measurements to search; keyed by {@code getStation().getWban()}
 *                                   before the pattern is applied
 * @param warningPattern             supplies the event pattern, the match-to-warning conversion and
 *                                   the class used to build the result type information
 * @param <TWarningType>             concrete warning type produced
 * @return stream of warnings created from the pattern matches
 */
private static <TWarningType extends IWarning> DataStream<TWarningType> toWarningStream(DataStream<LocalWeatherData> localWeatherDataDataStream, IWarningPattern<LocalWeatherData, TWarningType> warningPattern) {
    // Key extractor: the WBAN identifier of the station that produced the measurement.
    KeySelector<LocalWeatherData, String> byStationWban = new KeySelector<LocalWeatherData, String>() {
        @Override
        public String getKey(LocalWeatherData measurement) throws Exception {
            return measurement.getStation().getWban();
        }
    };

    // Delegates the conversion of a match into a warning to the pattern itself.
    PatternSelectFunction<LocalWeatherData, TWarningType> toWarning = new PatternSelectFunction<LocalWeatherData, TWarningType>() {
        @Override
        public TWarningType select(Map<String, List<LocalWeatherData>> matchedEvents) throws Exception {
            return warningPattern.create(matchedEvents);
        }
    };

    PatternStream<LocalWeatherData> matches = CEP.pattern(
            localWeatherDataDataStream.keyBy(byStationWban),
            warningPattern.getEventPattern());

    // The result type is generic, so the type information is passed explicitly.
    return matches.select(toWarning, new GenericTypeInfo<TWarningType>(warningPattern.getWarningTargetType()));
}
示例2
/**
 * Applies a warning pattern to a stream of weather measurements and maps every match to a warning.
 *
 * @param localWeatherDataDataStream measurements to search; keyed by {@code getStation().getWban()}
 *                                   before the pattern is applied
 * @param warningPattern             supplies the event pattern, the match-to-warning conversion and
 *                                   the class used to build the result type information
 * @param <TWarningType>             concrete warning type produced
 * @return stream of warnings created from the pattern matches
 */
private static <TWarningType extends IWarning> DataStream<TWarningType> toWarningStream(DataStream<LocalWeatherData> localWeatherDataDataStream, IWarningPattern<LocalWeatherData, TWarningType> warningPattern) {
    // Key extractor: the WBAN identifier of the station that produced the measurement.
    KeySelector<LocalWeatherData, String> byStationWban = new KeySelector<LocalWeatherData, String>() {
        @Override
        public String getKey(LocalWeatherData measurement) throws Exception {
            return measurement.getStation().getWban();
        }
    };

    // Delegates the conversion of a match into a warning to the pattern itself.
    PatternSelectFunction<LocalWeatherData, TWarningType> toWarning = new PatternSelectFunction<LocalWeatherData, TWarningType>() {
        @Override
        public TWarningType select(Map<String, List<LocalWeatherData>> matchedEvents) throws Exception {
            return warningPattern.create(matchedEvents);
        }
    };

    PatternStream<LocalWeatherData> matches = CEP.pattern(
            localWeatherDataDataStream.keyBy(byStationWban),
            warningPattern.getEventPattern());

    // The result type is generic, so the type information is passed explicitly.
    return matches.select(toWarning, new GenericTypeInfo<TWarningType>(warningPattern.getWarningTargetType()));
}
示例3
/**
 * Demonstrates an individual-pattern quantifier: reads text lines from a socket on
 * 127.0.0.1:9200, applies a pattern named "start" that accepts the string "a" and is
 * quantified with {@code times(5).optional()}, then logs each match and prints the first
 * event stored under "start".
 *
 * @param args command-line arguments, handed to {@code ExecutionEnvUtil.createParameterTool}
 * @throws Exception if building or executing the job fails
 */
public static void main(String[] args) throws Exception {
    final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setGlobalJobParameters(parameterTool);
    env.setParallelism(1);

    DataStreamSource<String> lines = env.socketTextStream("127.0.0.1", 9200);

    // Accepts only the literal "a"; null-safe because the constant is the receiver.
    SimpleCondition<String> isLetterA = new SimpleCondition<String>() {
        @Override
        public boolean filter(String value) throws Exception {
            return "a".equals(value);
        }
    };

    Pattern<String, String> fiveTimesOptional = Pattern.<String>begin("start")
            .where(isLetterA)
            .times(5)
            .optional();

    // Logs the whole match, then emits the first event captured under "start".
    PatternSelectFunction<String, String> firstOfStart = new PatternSelectFunction<String, String>() {
        @Override
        public String select(Map<String, List<String>> match) throws Exception {
            log.info(match.toString());
            return match.get("start").get(0);
        }
    };

    PatternStream<String> matches = CEP.pattern(lines, fiveTimesOptional);
    matches.select(firstOfStart).print();

    env.execute("flink learning cep Individual Pattern Quantifier");
}
示例4
/**
 * Turns a textual pattern expression into a CEP pattern stream over {@code dataStream}.
 *
 * <p>The expression is lexed and parsed with the generated pattern-language lexer/parser,
 * the resulting parse tree is walked with a {@code FlinkCepPatternLanguageListener}, and the
 * pattern collected by that listener is applied to the stream.
 *
 * @param expression              pattern expression text; read through a
 *                                {@code CaseInsensitiveInputStream}
 * @param dataStream              events the compiled pattern is applied to
 * @param strictEventTypeMatching forwarded unchanged to the listener's constructor
 * @return pattern stream produced by {@code CEP.pattern} for the compiled pattern
 */
private static PatternStream<Event> compile(String expression, DataStream<Event> dataStream, boolean strictEventTypeMatching) {
    // Lexing and parsing: text -> tokens -> parse tree rooted at patternExpression.
    PatternLanguageLexer lexer = new PatternLanguageLexer(new CaseInsensitiveInputStream(expression));
    PatternLanguageParser parser = new PatternLanguageParser(new CommonTokenStream(lexer));
    ParseTree tree = parser.patternExpression();

    // The listener assembles the Flink pattern while the tree is being walked.
    FlinkCepPatternLanguageListener patternBuilder = new FlinkCepPatternLanguageListener(strictEventTypeMatching);
    new ParseTreeWalker().walk(patternBuilder, tree);

    return CEP.pattern(dataStream, patternBuilder.getPattern());
}
示例5
/**
 * Demonstrates an individual-pattern quantifier: reads text lines from a socket on
 * 127.0.0.1:9200, applies a pattern named "start" that accepts the string "a" and is
 * quantified with {@code times(5).optional()}, then logs each match and prints the first
 * event stored under "start".
 *
 * @param args command-line arguments, handed to {@code ExecutionEnvUtil.createParameterTool}
 * @throws Exception if building or executing the job fails
 */
public static void main(String[] args) throws Exception {
    final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setGlobalJobParameters(parameterTool);
    env.setParallelism(1);

    DataStreamSource<String> lines = env.socketTextStream("127.0.0.1", 9200);

    // Accepts only the literal "a"; null-safe because the constant is the receiver.
    SimpleCondition<String> isLetterA = new SimpleCondition<String>() {
        @Override
        public boolean filter(String value) throws Exception {
            return "a".equals(value);
        }
    };

    Pattern<String, String> fiveTimesOptional = Pattern.<String>begin("start")
            .where(isLetterA)
            .times(5)
            .optional();

    // Logs the whole match, then emits the first event captured under "start".
    PatternSelectFunction<String, String> firstOfStart = new PatternSelectFunction<String, String>() {
        @Override
        public String select(Map<String, List<String>> match) throws Exception {
            log.info(match.toString());
            return match.get("start").get(0);
        }
    };

    PatternStream<String> matches = CEP.pattern(lines, fiveTimesOptional);
    matches.select(firstOfStart).print();

    env.execute("flink learning cep Individual Pattern Quantifier");
}
示例6
/**
 * Reads {@code TemperatureEvent}s from the Kafka topic "test" (broker localhost:9092,
 * consumer group "test"), matches every event whose temperature is at least 26.0 and
 * prints an {@code Alert} carrying the temperature and machine name of the matched event.
 *
 * @param args unused
 * @throws Exception if building or executing the job fails
 */
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
    kafkaProps.setProperty("group.id", "test");

    FlinkKafkaConsumer09<TemperatureEvent> kafkaSource =
            new FlinkKafkaConsumer09<TemperatureEvent>("test", new EventDeserializationSchema(), kafkaProps);
    DataStream<TemperatureEvent> inputEventStream = env.addSource(kafkaSource);

    // Single-step pattern "first": a reading of 26.0 or more, bounded by a 10 second window.
    Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent> begin("first")
            .subtype(TemperatureEvent.class)
            .where(new FilterFunction<TemperatureEvent>() {
                private static final long serialVersionUID = 1L;

                public boolean filter(TemperatureEvent value) {
                    return value.getTemperature() >= 26.0;
                }
            })
            .within(Time.seconds(10));

    PatternStream<TemperatureEvent> matches = CEP.pattern(inputEventStream, warningPattern);

    DataStream<Alert> patternStream = matches.select(new PatternSelectFunction<TemperatureEvent, Alert>() {
        private static final long serialVersionUID = 1L;

        public Alert select(Map<String, TemperatureEvent> event) throws Exception {
            // Build the alert text from the event captured under "first".
            TemperatureEvent first = event.get("first");
            String message = "Temperature Rise Detected:" + first.getTemperature()
                    + " on machine name:" + first.getMachineName();
            return new Alert(message);
        }
    });

    patternStream.print();
    env.execute("CEP on Temperature Sensor");
}
示例7
/**
 * Runs the temperature-warning pattern over a fixed, in-memory set of ten
 * {@code TemperatureEvent}s for machine "xyz" and prints an {@code Alert} for every event
 * whose temperature is at least 26.0.
 *
 * @param args unused
 * @throws Exception if building or executing the job fails
 */
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<TemperatureEvent> inputEventStream = env.fromElements(
            new TemperatureEvent("xyz", 22.0),
            new TemperatureEvent("xyz", 20.1),
            new TemperatureEvent("xyz", 21.1),
            new TemperatureEvent("xyz", 22.2),
            new TemperatureEvent("xyz", 29.1),
            new TemperatureEvent("xyz", 22.3),
            new TemperatureEvent("xyz", 22.1),
            new TemperatureEvent("xyz", 22.4),
            new TemperatureEvent("xyz", 22.7),
            new TemperatureEvent("xyz", 27.0));

    // Single-step pattern "first": a reading of 26.0 or more, bounded by a 10 second window.
    Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent> begin("first")
            .subtype(TemperatureEvent.class)
            .where(new FilterFunction<TemperatureEvent>() {
                private static final long serialVersionUID = 1L;

                public boolean filter(TemperatureEvent value) {
                    return value.getTemperature() >= 26.0;
                }
            })
            .within(Time.seconds(10));

    PatternStream<TemperatureEvent> matches = CEP.pattern(inputEventStream, warningPattern);

    // The alert text is fixed; the matched event itself is not inspected.
    DataStream<Alert> patternStream = matches.select(new PatternSelectFunction<TemperatureEvent, Alert>() {
        private static final long serialVersionUID = 1L;

        public Alert select(Map<String, TemperatureEvent> event) throws Exception {
            return new Alert("Temperature Rise Detected");
        }
    });

    patternStream.print();
    env.execute("CEP on Temperature Sensor");
}
示例8
/**
 * Passes an out-of-order event source through a match-everything CEP pattern in event time
 * and prints two outputs: the events emitted by the pattern, and the events diverted to the
 * "late-events" side output, each paired with the label "LATE".
 *
 * @param args unused
 * @throws Exception if building or executing the job fails
 */
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(1);

    DataStream<Event> eventStream = env
            .addSource(new OutOfOrderEventSource())
            .assignTimestampsAndWatermarks(new TimestampsAndWatermarks());

    // Condition that accepts every event, so each event forms a match on its own.
    SimpleCondition<Event> acceptAll = new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) throws Exception {
            return true;
        }
    };
    Pattern<Event, ?> matchEverything = Pattern.<Event>begin("any").where(acceptAll);

    PatternStream<Event> patternStream = CEP.pattern(eventStream, matchEverything);

    // Anonymous subclass ({}) keeps the generic element type available to the tag.
    OutputTag<Event> lateDataOutputTag = new OutputTag<Event>("late-events"){};

    // Emits the single event captured under "any".
    PatternSelectFunction<Event, Event> pickMatchedEvent = new PatternSelectFunction<Event, Event>() {
        @Override
        public Event select(Map<String, List<Event>> match) throws Exception {
            return match.get("any").get(0);
        }
    };

    SingleOutputStreamOperator<Event> sorted = patternStream
            .sideOutputLateData(lateDataOutputTag)
            .select(pickMatchedEvent);

    sorted.print();

    // The lambda's result type is erased, hence the explicit returns(...) hint.
    DataStream<Event> lateEvents = sorted.getSideOutput(lateDataOutputTag);
    lateEvents
            .map(e -> new Tuple2<>(e, "LATE"))
            .returns(Types.TUPLE(TypeInformation.of(Event.class), Types.STRING))
            .print();

    env.execute();
}
示例9
/**
 * Exercise skeleton for detecting long taxi rides with CEP.
 *
 * <p>Wires a ride source keyed by "rideId" into a (deliberately incomplete) pattern limited
 * to 120 minutes and routes timed-out partial matches to the "timedout" side output. The
 * method always ends by throwing {@code MissingSolutionException}; the job is not executed.
 *
 * @param args command-line arguments; "input" overrides {@code ExerciseBase.pathToRideData}
 * @throws Exception always, via {@code MissingSolutionException}, until the exercise is solved
 */
public static void main(String[] args) throws Exception {
    ParameterTool params = ParameterTool.fromArgs(args);
    final String input = params.get("input", ExerciseBase.pathToRideData);

    // Replay speed-up: 10 minutes of events are served per second.
    final int servingSpeedFactor = 600;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(ExerciseBase.parallelism);

    // CheckpointedTaxiRideSource delivers its events in order.
    DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new CheckpointedTaxiRideSource(input, servingSpeedFactor)));

    DataStream<TaxiRide> keyedRides = rides.keyBy("rideId");

    // A complete taxi ride is a START event followed by an END event.
    // The pattern below is intentionally left incomplete ...
    Pattern<TaxiRide, TaxiRide> completedRides = Pattern.<TaxiRide>begin("start");

    // Goal: find rides that have NOT been completed within 120 minutes.
    // The pattern itself matches rides that ARE completed; matches are discarded
    // below and only the partial matches that time out are emitted.
    Pattern<TaxiRide, TaxiRide> completedWithinLimit = completedRides.within(Time.minutes(120));
    PatternStream<TaxiRide> patternStream = CEP.pattern(keyedRides, completedWithinLimit);

    OutputTag<TaxiRide> timedout = new OutputTag<TaxiRide>("timedout"){};

    SingleOutputStreamOperator<TaxiRide> longRides = patternStream.flatSelect(
            timedout,
            new TaxiRideTimedOut<TaxiRide>(),
            new FlatSelectNothing<TaxiRide>());

    printOrTest(longRides.getSideOutput(timedout));

    throw new MissingSolutionException();

    // env.execute("Long Taxi Rides (CEP)");
}