Chapter 8: The FlinkCEP Mechanism and Code Exercises

wuchangjian, 2021-11-16 18:25:46, Programming Study

Flink's Complex Event Processing (CEP) Mechanism

1. Overview of the CEP Mechanism

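1.1 Introduction to CEP
  • CEP: complex event processing. FlinkCEP is the CEP library implemented on top of Flink. Its pattern matching works much like regular expressions, applied to a stream of events rather than a string of characters.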
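To make the regular-expression comparison concrete, here is a minimal plain-Java sketch that does not use FlinkCEP at all. It assumes, purely for illustration, that each login event has been encoded as one character (F for a failed login, S for a successful one), so that "two failures in a row" becomes an ordinary regex over a string. The class and method names are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class RegexAnalogy {
    /**
     * Returns the start index of every "two failures in a row" occurrence.
     * A lookahead is used so that overlapping occurrences are all reported.
     */
    static List<Integer> consecutiveFailures(String statuses) {
        Matcher matcher = Pattern.compile("(?=FF)").matcher(statuses);
        List<Integer> starts = new ArrayList<>();
        while (matcher.find()) {
            starts.add(matcher.start());
        }
        return starts;
    }

    public static void main(String[] args) {
        // F = failed login, S = successful login (hypothetical encoding)
        System.out.println(consecutiveFailures("FFFSF")); // [0, 1]
    }
}
```

A CEP pattern plays the role of the regex here, with the difference that it is evaluated continuously over an unbounded stream and can take event time into account.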
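1.2 What CEP Is Used For

(1) Detect relationships among multiple records in an unbounded stream and emit the complex events that satisfy the rules

(2) Let the business define complex pattern sequences to extract from the input stream

1.3 Usage Flow

(1) Define a pattern

(2) Apply the pattern to a data stream to obtain a pattern stream

(3) Retrieve the results from the pattern stream

  • Note: CEP is not bundled with core Flink, so add the dependency yourself before using it. The artifact suffix is the Scala binary version (for example 2.12), so ${scala.version} must resolve to that form: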
    <dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-cep_${scala.version}</artifactId>
     <version>${flink.version}</version>
    </dependency>

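2. CEP APIs

2.1 Pattern API
  • Pattern: the rule that defines how events are processed

(1) Individual patterns: each single pattern definition that makes up a complex rule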

// expecting 4 occurrences
start.times(4);

// expecting 0 or 4 occurrences
start.times(4).optional();

// expecting 2, 3 or 4 occurrences
start.times(2, 4);

(2) Combined patterns: several individual patterns chained together; this is the most commonly used form

// strict contiguity
Pattern<Event, ?> strict = start.next("middle").where(...);

// relaxed contiguity
Pattern<Event, ?> relaxed = start.followedBy("middle").where(...);

// non-deterministic relaxed contiguity
Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...);
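  • Notes on the related APIs
Strict contiguity: all matching events must appear strictly one after another, with no non-matching event in between. API: next()
Relaxed contiguity: non-matching events may appear between matching ones and are ignored. API: followedBy()
Non-deterministic relaxed contiguity: relaxes contiguity further by also allowing matches that skip over events that already matched. API: followedByAny()
Time constraint: the pattern must complete within the given time span to be a valid match. API: within()
notNext(): use when an event type must not directly follow another
notFollowedBy(): use when an event type must not appear anywhere between two other event types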
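The difference between the three contiguity modes is easiest to see on a small input. The sketch below is a plain-Java emulation over a list, not FlinkCEP itself. It assumes a two-step pattern "a then b" in which an event counts as a or b by its name prefix, and the class and method names are hypothetical. The input a, c, b1, b2 follows the example commonly used to explain these modes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ContiguityDemo {
    private static boolean isA(String event) { return event.startsWith("a"); }
    private static boolean isB(String event) { return event.startsWith("b"); }

    /** Emulates next(): b must be the event immediately after a. */
    static List<String> strict(List<String> events) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i + 1 < events.size(); i++) {
            if (isA(events.get(i)) && isB(events.get(i + 1))) {
                matches.add(events.get(i) + " " + events.get(i + 1));
            }
        }
        return matches;
    }

    /** Emulates followedBy(): skip non-matching events, stop at the first b. */
    static List<String> relaxed(List<String> events) {
        return scan(events, true);
    }

    /** Emulates followedByAny(): pair a with every later b. */
    static List<String> nonDeterministic(List<String> events) {
        return scan(events, false);
    }

    private static List<String> scan(List<String> events, boolean stopAtFirst) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!isA(events.get(i))) {
                continue;
            }
            for (int j = i + 1; j < events.size(); j++) {
                if (isB(events.get(j))) {
                    matches.add(events.get(i) + " " + events.get(j));
                    if (stopAtFirst) {
                        break;
                    }
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("a", "c", "b1", "b2");
        System.out.println(strict(events));           // []
        System.out.println(relaxed(events));          // [a b1]
        System.out.println(nonDeterministic(events)); // [a b1, a b2]
    }
}
```

With strict contiguity the intervening c kills the match; relaxed contiguity skips c and pairs a with b1 only; the non-deterministic variant additionally pairs a with b2.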

(3) Pattern groups: a combined pattern nested as a condition inside an individual pattern

Pattern<Event, ?> start = Pattern.begin(
    Pattern.<Event>begin("start").where(...).followedBy("start_middle").where(...)
);

// strict contiguity
Pattern<Event, ?> strict = start.next(
    Pattern.<Event>begin("next_start").where(...).followedBy("next_middle").where(...)
).times(3);

// relaxed contiguity
Pattern<Event, ?> relaxed = start.followedBy(
    Pattern.<Event>begin("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore();

// non-deterministic relaxed contiguity
Pattern<Event, ?> nonDetermin = start.followedByAny(
    Pattern.<Event>begin("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional();
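2.2 Parameters

(1) times: match a fixed number of occurrences, or a range as in times(2, 4)

(2) greedy: greedy mode; repeat as many times as possible

(3) oneOrMore: match one or more occurrences

(4) timesOrMore: match at least the given number of occurrences

(5) optional: the pattern either does not occur at all or occurs the specified number of times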
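Since CEP patterns resemble regular expressions, these parameters can be read as rough counterparts of regex quantifiers. The sketch below uses only java.util.regex, not FlinkCEP. The mapping in the comments is an analogy, and actual CEP semantics also depend on the contiguity mode and on time. The class and method names are hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class QuantifierAnalogy {
    /** True if the whole input matches the regex. */
    static boolean whole(String regex, String input) {
        return Pattern.matches(regex, input);
    }

    /** Returns the first substring matched by the regex, or null if none. */
    static String first(String regex, String input) {
        Matcher matcher = Pattern.compile(regex).matcher(input);
        return matcher.find() ? matcher.group() : null;
    }

    public static void main(String[] args) {
        System.out.println(whole("a{4}", "aaaa"));    // times(4): true
        System.out.println(whole("(a{4})?", ""));     // times(4).optional(): true
        System.out.println(whole("(a{4})?", "aa"));   // neither 0 nor 4: false
        System.out.println(whole("a{2,4}", "aaa"));   // times(2, 4): true
        System.out.println(whole("a+", "a"));         // oneOrMore(): true
        System.out.println(whole("a{2,}", "aaaaa"));  // timesOrMore(2): true
        System.out.println(first("a{2,4}", "aaaa"));  // greedy repetition: aaaa
        System.out.println(first("a{2,4}?", "aaaa")); // reluctant repetition: aa
    }
}
```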

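3. CEP in Practice

3.1 Requirements
  • If the same account fails to log in twice in a row within 5 seconds, treat it as a login problem
  • Data format: lz,2021-11-11 12:01:01,-1 (account, login time, status; -1 means the login failed)
3.2 Code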
package com.lihaiwei.text1.app;

import com.lihaiwei.text1.util.TimeUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.List;
import java.util.Map;

public class flink10state {
    public static void main(String[] args) throws Exception {

        // Build the execution environment, the entry point for starting the job; it holds the global parameters
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

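        // Checkpoint settings (they only take effect once checkpointing is enabled with env.enableCheckpointing(interval), which this listing does not call)
        // Minimum pause between two checkpoints, in milliseconds; the default is 0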
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

        // Number of checkpoint failures to tolerate before the whole job fails; the default is 0, meaning no checkpoint failure is tolerated
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);

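        // Checkpoints are used for failure recovery. By default they are deleted when the job is cancelled;
        // externalized checkpoints let you choose a cleanup policy:
        // DELETE_ON_CANCELLATION: the checkpoint state is deleted when the job is cancelled, so the job cannot be restored from a checkpoint afterwards
        // RETAIN_ON_CANCELLATION (more common): the checkpoint state is kept when the job is cancelled manually and has to be cleaned up by hand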
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Flink offers two consistency modes for state: EXACTLY_ONCE and AT_LEAST_ONCE
        // Set the checkpointing mode to EXACTLY_ONCE, which is also the default
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // Checkpoint timeout in milliseconds; a checkpoint that has not completed in time is abandoned (50 s here, the default is 10 minutes)
        env.getCheckpointConfig().setCheckpointTimeout(50000);

        // Maximum number of checkpoints that may run at the same time; the default of 1 avoids taking too many resources away from normal data processing
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);


        // Restart strategy so the job recovers automatically after a failure: at most 3 restart attempts, 10 s apart
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));


        //java,2022-11-11 09:10:10,15
        DataStream<String> ds = env.socketTextStream("192.168.6.104", 8888);

        // 3. Parse each input line into a tuple of (account, time, status)
        DataStream<Tuple3<String,String,Integer>> flatMap = ds.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple3<String, String, Integer>> out) throws Exception {
                String[] arr = value.split(",");
                out.collect(Tuple3.of(arr[0],arr[1],Integer.parseInt(arr[2])));
            }
        });
        // 4. Assign timestamps and watermarks (timestamps increase monotonically)
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> watermarkDS = flatMap.assignTimestampsAndWatermarks(WatermarkStrategy
                // For out-of-order data: generate watermarks with a 3-second delay
                //.<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))

                // For in-order data: timestamps increase monotonically, so the event time itself serves as the watermark
                .<Tuple3<String, String, Integer>>forMonotonousTimestamps()
                .withTimestampAssigner((event, timestamp) -> TimeUtil.strToDate(event.f1).getTime()));

        // 5. Key the stream by account
        KeyedStream<Tuple3<String, String, Integer>, String> keybyDS = watermarkDS.keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
            @Override
            public String getKey(Tuple3<String, String, Integer> value) throws Exception {
                return value.f0;
            }
        });

        // CEP 1: define the pattern, i.e. the monitoring rule
        Pattern<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>> cepPatten = Pattern
                .<Tuple3<String, String, Integer>>begin("firstbeginTime")
                // 6.1 Match the first failed login
                .where(new SimpleCondition<Tuple3<String, String, Integer>>() {
                    @Override
                    public boolean filter(Tuple3<String, String, Integer> value) throws Exception {
                        return value.f2 == -1;
                    }
                })
                // 6.2 Match the second failed login, which must directly follow the first
                .next("secondLoginTime")
                .where(new SimpleCondition<Tuple3<String, String, Integer>>() {
                    @Override
                    public boolean filter(Tuple3<String, String, Integer> value) throws Exception {
                        return value.f2 == -1;
                    }
                })
                // 6.3 Both events must occur within 5 s
                .within(Time.seconds(5));

        // CEP 2: apply the pattern to the keyed stream
        PatternStream<Tuple3<String, String, Integer>> pattenDS = CEP.pattern(keybyDS, cepPatten);

        // CEP 3: extract the results
        SingleOutputStreamOperator<Tuple3<String, String, String>> select = pattenDS.select(new PatternSelectFunction<Tuple3<String, String, Integer>, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> select(Map<String, List<Tuple3<String, String, Integer>>> map) throws Exception {
                Tuple3<String, String, Integer> first = map.get("firstbeginTime").get(0);
                Tuple3<String, String, Integer> second = map.get("secondLoginTime").get(0);
                return Tuple3.of(first.f0,first.f1,second.f1);
            }
        });

        select.print("Risky account");

        env.execute("watermark job");
    }
}
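The listing calls TimeUtil.strToDate(...).getTime(), but the TimeUtil class itself is not shown. The following is one possible stand-in, not the author's implementation. It assumes the timestamp format is yyyy-MM-dd HH:mm:ss (as in the sample data) and that the string should be interpreted in the JVM's default time zone. The class name TimeUtilSketch is hypothetical.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Date;

final class TimeUtilSketch {
    // Assumed format, taken from the sample data (e.g. 2022-11-11 12:01:01)
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Parses a timestamp string into a java.util.Date using the default time zone. */
    static Date strToDate(String text) {
        LocalDateTime local = LocalDateTime.parse(text, FORMAT);
        return Date.from(local.atZone(ZoneId.systemDefault()).toInstant());
    }

    public static void main(String[] args) {
        long first = strToDate("2022-11-11 12:01:01").getTime();
        long second = strToDate("2022-11-11 12:01:02").getTime();
        System.out.println(second - first); // 1000 (milliseconds)
    }
}
```

DateTimeFormatter is immutable and thread-safe, so sharing one instance in a static field is safe even when the timestamp assigner runs in parallel tasks.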
3.3 Test Results

(1) Test data

LL,2022-11-11 12:01:01,-1
LL,2022-11-11 12:01:02,-1
LL,2022-11-11 12:01:03,-1
LL,2022-11-11 12:01:04,-1
WW,2022-11-11 12:01:07,-1
WW,2022-11-11 12:01:19,-1
WW,2022-11-11 12:01:27,-1
WW,2022-11-11 12:01:35,1

(2) Output

[Figure: screenshot of the run output]
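As a cross-check on the test data, the rule can be emulated in plain Java without Flink. This is a sketch of the intended logic, not the output of the job above. It assumes that lines arrive in timestamp order, that the rule is applied per account, that "consecutive" means two adjacent events of the same account, and that the two events must be less than 5 seconds apart. The class name and the alert format are hypothetical.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class LoginFailureEmulation {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Returns one alert per pair of adjacent failed logins of the same account under 5 s apart. */
    static List<String> detect(List<String> lines) {
        Map<String, String[]> lastEventByAccount = new HashMap<>();
        List<String> alerts = new ArrayList<>();
        for (String line : lines) {
            String[] current = line.split(",");
            // put() returns the previous event of this account, or null if there was none
            String[] previous = lastEventByAccount.put(current[0], current);
            if (previous == null) {
                continue;
            }
            boolean bothFailed = "-1".equals(previous[2]) && "-1".equals(current[2]);
            long gapMillis = Duration.between(
                    LocalDateTime.parse(previous[1], FORMAT),
                    LocalDateTime.parse(current[1], FORMAT)).toMillis();
            if (bothFailed && gapMillis >= 0 && gapMillis < 5000) {
                alerts.add("(" + current[0] + "," + previous[1] + "," + current[1] + ")");
            }
        }
        return alerts;
    }

    public static void main(String[] args) {
        List<String> testData = Arrays.asList(
                "LL,2022-11-11 12:01:01,-1", "LL,2022-11-11 12:01:02,-1",
                "LL,2022-11-11 12:01:03,-1", "LL,2022-11-11 12:01:04,-1",
                "WW,2022-11-11 12:01:07,-1", "WW,2022-11-11 12:01:19,-1",
                "WW,2022-11-11 12:01:27,-1", "WW,2022-11-11 12:01:35,1");
        detect(testData).forEach(System.out::println);
    }
}
```

Under these assumptions the emulation flags three pairs for LL (01/02, 02/03, 03/04) and nothing for WW, whose failed logins are 12 s and 8 s apart and whose last event is a successful login.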
