Java源码示例:org.apache.flink.cep.CEP

示例1
/**
 * Applies a warning pattern to a stream of weather measurements and maps every
 * pattern match to a warning object.
 *
 * <p>The measurements are keyed by the WBAN identifier of their station before the
 * pattern is applied. Each match is handed to {@code warningPattern.create(...)} to
 * build the warning, and the result type is declared explicitly through a
 * {@code GenericTypeInfo} built from {@code warningPattern.getWarningTargetType()}.
 *
 * @param localWeatherDataDataStream the weather measurements to inspect
 * @param warningPattern supplies the event pattern, the match-to-warning conversion
 *                       and the target warning class
 * @param <TWarningType> the concrete warning type that is produced
 * @return the stream obtained by selecting over the pattern matches
 */
private static <TWarningType extends IWarning> DataStream<TWarningType> toWarningStream(DataStream<LocalWeatherData> localWeatherDataDataStream, IWarningPattern<LocalWeatherData, TWarningType> warningPattern) {
    // Key the measurements by the WBAN of the station they belong to.
    KeySelector<LocalWeatherData, String> stationKey = new KeySelector<LocalWeatherData, String>() {
        @Override
        public String getKey(LocalWeatherData measurement) throws Exception {
            return measurement.getStation().getWban();
        }
    };

    PatternStream<LocalWeatherData> matches = CEP.pattern(
            localWeatherDataDataStream.keyBy(stationKey),
            warningPattern.getEventPattern());

    // Delegates the construction of the warning to the pattern itself.
    PatternSelectFunction<LocalWeatherData, TWarningType> toWarning = new PatternSelectFunction<LocalWeatherData, TWarningType>() {
        @Override
        public TWarningType select(Map<String, List<LocalWeatherData>> matchedEvents) throws Exception {
            return warningPattern.create(matchedEvents);
        }
    };

    return matches.select(
            toWarning,
            new GenericTypeInfo<TWarningType>(warningPattern.getWarningTargetType()));
}
 
示例2
/**
 * Turns a stream of local weather data into a stream of warnings by matching the
 * given warning pattern against it.
 *
 * <p>Before matching, the stream is keyed by {@code getStation().getWban()}. The
 * warning for a match is created by {@code warningPattern.create(...)}; the output
 * type information is a {@code GenericTypeInfo} for the class returned by
 * {@code warningPattern.getWarningTargetType()}.
 *
 * @param localWeatherDataDataStream the incoming weather measurements
 * @param warningPattern the pattern definition together with its warning factory
 * @param <TWarningType> the type of warning emitted
 * @return the stream of warnings selected from the pattern matches
 */
private static <TWarningType extends IWarning> DataStream<TWarningType> toWarningStream(DataStream<LocalWeatherData> localWeatherDataDataStream, IWarningPattern<LocalWeatherData, TWarningType> warningPattern) {
    // Key selector: the WBAN identifier of the measurement's station.
    KeySelector<LocalWeatherData, String> byWban = new KeySelector<LocalWeatherData, String>() {
        @Override
        public String getKey(LocalWeatherData data) throws Exception {
            return data.getStation().getWban();
        }
    };

    PatternStream<LocalWeatherData> patternMatches = CEP.pattern(
            localWeatherDataDataStream.keyBy(byWban),
            warningPattern.getEventPattern());

    // The pattern object knows how to build its own warning from a match.
    PatternSelectFunction<LocalWeatherData, TWarningType> warningFactory = new PatternSelectFunction<LocalWeatherData, TWarningType>() {
        @Override
        public TWarningType select(Map<String, List<LocalWeatherData>> match) throws Exception {
            return warningPattern.create(match);
        }
    };

    return patternMatches.select(
            warningFactory,
            new GenericTypeInfo<TWarningType>(warningPattern.getWarningTargetType()));
}
 
示例3
/**
 * Runs a small CEP job over text read from a socket on 127.0.0.1:9200.
 *
 * <p>The pattern has a single stage named "start" that accepts the string "a"; the
 * stage is quantified with {@code times(5)} and marked {@code optional()}. For every
 * match the full match map is logged and the first event recorded under "start" is
 * printed.
 *
 * @param args command-line arguments, forwarded to {@code ExecutionEnvUtil.createParameterTool}
 * @throws Exception if building or executing the job fails
 */
public static void main(String[] args) throws Exception {
    final ParameterTool jobParameters = ExecutionEnvUtil.createParameterTool(args);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setGlobalJobParameters(jobParameters);
    env.setParallelism(1);

    DataStreamSource<String> lines = env.socketTextStream("127.0.0.1", 9200);

    // Accepts only the exact string "a".
    SimpleCondition<String> isLetterA = new SimpleCondition<String>() {
        @Override
        public boolean filter(String value) throws Exception {
            return "a".equals(value);
        }
    };

    Pattern<String, String> quantifiedPattern = Pattern.<String>begin("start")
            .where(isLetterA)
            .times(5)
            .optional();

    // Logs the whole match and forwards the first event stored under "start".
    PatternSelectFunction<String, String> firstOfStart = new PatternSelectFunction<String, String>() {
        @Override
        public String select(Map<String, List<String>> match) throws Exception {
            log.info(match.toString());
            return match.get("start").get(0);
        }
    };

    CEP.pattern(lines, quantifiedPattern)
            .select(firstOfStart)
            .print();

    env.execute("flink learning cep Individual Pattern Quantifier");
}
 
示例4
/**
 * Parses a textual pattern expression and applies the resulting CEP pattern to a stream.
 *
 * <p>The expression is lexed and parsed with the generated {@code PatternLanguageLexer}
 * and {@code PatternLanguageParser}; the parse tree is then walked with a
 * {@code FlinkCepPatternLanguageListener}, whose {@code getPattern()} result is handed
 * to {@code CEP.pattern}.
 *
 * @param expression the pattern expression source text
 * @param dataStream the stream the pattern is applied to
 * @param strictEventTypeMatching passed unchanged to the listener's constructor
 * @return the pattern stream for {@code dataStream} and the compiled pattern
 */
private static PatternStream<Event> compile(String expression, DataStream<Event> dataStream, boolean strictEventTypeMatching) {
    // Lex and parse the expression into a parse tree.
    PatternLanguageLexer lexer = new PatternLanguageLexer(new CaseInsensitiveInputStream(expression));
    PatternLanguageParser parser = new PatternLanguageParser(new CommonTokenStream(lexer));
    ParseTree expressionTree = parser.patternExpression();

    // Walk the tree; the listener accumulates the pattern as it goes.
    FlinkCepPatternLanguageListener patternBuilder = new FlinkCepPatternLanguageListener(strictEventTypeMatching);
    new ParseTreeWalker().walk(patternBuilder, expressionTree);

    return CEP.pattern(dataStream, patternBuilder.getPattern());
}
 
示例5
/**
 * Entry point of a CEP demo that reads strings from a socket (127.0.0.1, port 9200).
 *
 * <p>A single pattern stage "start" matches the string "a" and carries the quantifiers
 * {@code times(5)} and {@code optional()}. Each match is logged as a whole, and the
 * first element stored under "start" is printed to standard output.
 *
 * @param args program arguments used to build the global job parameters
 * @throws Exception if the job cannot be set up or executed
 */
public static void main(String[] args) throws Exception {
    final ParameterTool globalParams = ExecutionEnvUtil.createParameterTool(args);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setGlobalJobParameters(globalParams);
    env.setParallelism(1);

    DataStreamSource<String> socketInput = env.socketTextStream("127.0.0.1", 9200);

    // Condition of the "start" stage: the event must equal "a".
    SimpleCondition<String> equalsA = new SimpleCondition<String>() {
        @Override
        public boolean filter(String element) throws Exception {
            return "a".equals(element);
        }
    };

    Pattern<String, String> startPattern = Pattern.<String>begin("start")
            .where(equalsA)
            .times(5)
            .optional();

    // Emits the first event of the "start" stage after logging the full match.
    PatternSelectFunction<String, String> selectFirst = new PatternSelectFunction<String, String>() {
        @Override
        public String select(Map<String, List<String>> matched) throws Exception {
            log.info(matched.toString());
            return matched.get("start").get(0);
        }
    };

    CEP.pattern(socketInput, startPattern)
            .select(selectFirst)
            .print();

    env.execute("flink learning cep Individual Pattern Quantifier");
}
 
示例6
/**
 * Reads temperature events from the Kafka topic "test" (broker localhost:9092,
 * consumer group "test") and prints an alert for every event matching the pattern.
 *
 * <p>The pattern has one stage, "first", that accepts events whose temperature is at
 * least 26.0, with a time constraint of 10 seconds. The alert text contains the
 * temperature and machine name of the matched event.
 *
 * @param args unused
 * @throws Exception if the job cannot be built or executed
 */
public static void main(String[] args) throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

	Properties kafkaProperties = new Properties();
	kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");
	kafkaProperties.setProperty("group.id", "test");

	FlinkKafkaConsumer09<TemperatureEvent> kafkaSource =
			new FlinkKafkaConsumer09<TemperatureEvent>("test", new EventDeserializationSchema(), kafkaProperties);
	DataStream<TemperatureEvent> temperatureReadings = env.addSource(kafkaSource);

	// Accepts readings at or above the 26.0 threshold.
	FilterFunction<TemperatureEvent> isTooWarm = new FilterFunction<TemperatureEvent>() {
		private static final long serialVersionUID = 1L;

		@Override
		public boolean filter(TemperatureEvent reading) {
			return reading.getTemperature() >= 26.0;
		}
	};

	Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent> begin("first")
			.subtype(TemperatureEvent.class)
			.where(isTooWarm)
			.within(Time.seconds(10));

	// Builds the alert message from the event captured by the "first" stage.
	PatternSelectFunction<TemperatureEvent, Alert> toAlert = new PatternSelectFunction<TemperatureEvent, Alert>() {
		private static final long serialVersionUID = 1L;

		@Override
		public Alert select(Map<String, TemperatureEvent> match) throws Exception {
			TemperatureEvent hotReading = match.get("first");
			return new Alert("Temperature Rise Detected:" + hotReading.getTemperature()
					+ " on machine name:" + hotReading.getMachineName());
		}
	};

	DataStream<Alert> alerts = CEP.pattern(temperatureReadings, warningPattern).select(toAlert);

	alerts.print();
	env.execute("CEP on Temperature Sensor");
}
 
示例7
/**
 * Runs the temperature-alert pattern over a fixed list of ten sample readings for
 * machine "xyz" and prints an alert for every match.
 *
 * <p>The single pattern stage "first" accepts readings with a temperature of at least
 * 26.0 and carries a 10 second time constraint. Each match yields an alert with the
 * fixed text "Temperature Rise Detected".
 *
 * @param args unused
 * @throws Exception if the job cannot be built or executed
 */
public static void main(String[] args) throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

	DataStream<TemperatureEvent> sampleReadings = env.fromElements(
			new TemperatureEvent("xyz", 22.0),
			new TemperatureEvent("xyz", 20.1),
			new TemperatureEvent("xyz", 21.1),
			new TemperatureEvent("xyz", 22.2),
			new TemperatureEvent("xyz", 29.1),
			new TemperatureEvent("xyz", 22.3),
			new TemperatureEvent("xyz", 22.1),
			new TemperatureEvent("xyz", 22.4),
			new TemperatureEvent("xyz", 22.7),
			new TemperatureEvent("xyz", 27.0));

	// Accepts readings at or above the 26.0 threshold.
	FilterFunction<TemperatureEvent> aboveThreshold = new FilterFunction<TemperatureEvent>() {
		private static final long serialVersionUID = 1L;

		@Override
		public boolean filter(TemperatureEvent reading) {
			return reading.getTemperature() >= 26.0;
		}
	};

	Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent> begin("first")
			.subtype(TemperatureEvent.class)
			.where(aboveThreshold)
			.within(Time.seconds(10));

	// The alert text is constant; the matched event itself is not inspected.
	PatternSelectFunction<TemperatureEvent, Alert> toAlert = new PatternSelectFunction<TemperatureEvent, Alert>() {
		private static final long serialVersionUID = 1L;

		@Override
		public Alert select(Map<String, TemperatureEvent> match) throws Exception {
			return new Alert("Temperature Rise Detected");
		}
	};

	DataStream<Alert> alerts = CEP.pattern(sampleReadings, warningPattern).select(toAlert);

	alerts.print();
	env.execute("CEP on Temperature Sensor");
}
 
示例8
/**
 * Runs an event-time CEP job whose pattern accepts every event, printing the matched
 * events and, separately, the events routed to the late-data side output.
 *
 * <p>The main output carries the first event of the "any" stage of each match. Events
 * delivered through the "late-events" side output are printed as tuples paired with
 * the string "LATE".
 *
 * @param args unused
 * @throws Exception if the job cannot be built or executed
 */
public static void main(String[] args) throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
	env.setParallelism(1);

	DataStream<Event> events = env
			.addSource(new OutOfOrderEventSource())
			.assignTimestampsAndWatermarks(new TimestampsAndWatermarks());

	// A condition that lets every event through.
	SimpleCondition<Event> acceptAll = new SimpleCondition<Event>() {
		@Override
		public boolean filter(Event candidate) throws Exception {
			return true;
		}
	};
	Pattern<Event, ?> anyEvent = Pattern.<Event>begin("any").where(acceptAll);

	PatternStream<Event> matches = CEP.pattern(events, anyEvent);

	// Anonymous subclass, as in the usual OutputTag idiom for keeping the element type.
	OutputTag<Event> lateTag = new OutputTag<Event>("late-events"){};

	// Forwards the single event captured by the "any" stage.
	PatternSelectFunction<Event, Event> firstOfAny = new PatternSelectFunction<Event, Event>() {
		@Override
		public Event select(Map<String, List<Event>> match) throws Exception {
			return match.get("any").get(0);
		}
	};

	SingleOutputStreamOperator<Event> matchedEvents = matches
			.sideOutputLateData(lateTag)
			.select(firstOfAny);

	matchedEvents.print();

	// Late events are tagged so they can be told apart in the printed output.
	matchedEvents
			.getSideOutput(lateTag)
			.map(e -> new Tuple2<>(e, "LATE"))
			.returns(Types.TUPLE(TypeInformation.of(Event.class), Types.STRING))
			.print();

	env.execute();
}
 
示例9
/**
 * Exercise skeleton for detecting long taxi rides with CEP.
 *
 * <p>Builds an event-time pipeline over taxi ride events keyed by "rideId", applies a
 * (still incomplete) pattern with a 120 minute time constraint, and routes timed-out
 * partial matches to the "timedout" side output. The method ends by throwing
 * {@code MissingSolutionException}; the job itself is never executed here.
 *
 * @param args command-line arguments; "input" optionally overrides
 *             {@code ExerciseBase.pathToRideData}
 * @throws Exception always, because of the trailing {@code MissingSolutionException},
 *                   unless an earlier step fails first
 */
public static void main(String[] args) throws Exception {
	ParameterTool parameters = ParameterTool.fromArgs(args);
	final String rideDataPath = parameters.get("input", ExerciseBase.pathToRideData);

	// events of 10 minutes are served in 1 second
	final int speedUp = 600;

	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
	env.setParallelism(ExerciseBase.parallelism);

	// CheckpointedTaxiRideSource delivers events in order
	DataStream<TaxiRide> rides = env.addSource(
			rideSourceOrTest(new CheckpointedTaxiRideSource(rideDataPath, speedUp)));

	DataStream<TaxiRide> ridesById = rides.keyBy("rideId");

	// A complete taxi ride has a START event followed by an END event
	// This pattern is incomplete ...
	Pattern<TaxiRide, TaxiRide> completedRides = Pattern.<TaxiRide>begin("start");

	// We want to find rides that have NOT been completed within 120 minutes.
	// This pattern matches rides that ARE completed.
	// Below we will ignore rides that match this pattern, and emit those that timeout.
	Pattern<TaxiRide, TaxiRide> completedWithinLimit = completedRides.within(Time.minutes(120));
	PatternStream<TaxiRide> rideMatches = CEP.pattern(ridesById, completedWithinLimit);

	OutputTag<TaxiRide> timedOutTag = new OutputTag<TaxiRide>("timedout"){};

	// Timed-out partial matches go to the side output; complete matches are dropped.
	SingleOutputStreamOperator<TaxiRide> longRides = rideMatches.flatSelect(
			timedOutTag,
			new TaxiRideTimedOut<TaxiRide>(),
			new FlatSelectNothing<TaxiRide>());

	printOrTest(longRides.getSideOutput(timedOutTag));

	// Marks the exercise as unsolved.
	throw new MissingSolutionException();

//	env.execute("Long Taxi Rides (CEP)");
}