Flink CEP: Finding Users with Consecutive Login Failures Within 10 Seconds
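1. What is CEP?
Flink CEP (Flink Complex Event Processing) is a programming model for complex event processing built on top of DataStream. You can think of it as regular-expression matching over an unbounded stream: for the items in the stream (called events), it provides a way to match combinations of them.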

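In the figure above, different shapes stand for events with different attributes in a DataStream. Once a Pattern is defined as a circle followed by a triangle, the events in the original DataStream that fit the pattern can be filtered out quickly. For example, many websites need to block users who log in maliciously: if a user enters the wrong password three times in a row, the account is locked. In this scenario, the login attempts of all users form an unbounded DataStream, and "three consecutive login failures" is the Pattern. The job of the CEP programming model is to find all the data in the login stream that matches this Pattern. The same result can be achieved with the DataStream APIs introduced earlier, but it takes considerably more work; CEP offers a much simpler and more flexible way to do it.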
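To make that comparison concrete, here is a minimal plain-Java sketch of the bookkeeping a hand-written detector needs for the "three consecutive failures" rule, with no Flink involved. The class name ConsecutiveFailureDetector, the per-user map, and the reading of the 10-second limit as "first and third failure less than 10,000 ms apart" are assumptions made for illustration; none of them comes from the code in section 2.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hand-rolled detector: three consecutive failed logins per user inside a time limit. */
class ConsecutiveFailureDetector {
    private static final int REQUIRED_FAILURES = 3;
    private final long windowMillis;
    // Per user: timestamps of the current run of consecutive failures (at most three kept).
    private final Map<String, Deque<Long>> failureRuns = new HashMap<>();

    ConsecutiveFailureDetector(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /**
     * Feeds one login event; events are assumed to arrive in timestamp order.
     *
     * @param loginRes 0 = success, 1 = failure
     * @return true if this event completes three consecutive failures inside the window
     */
    boolean onEvent(String userId, int loginRes, long loginTime) {
        Deque<Long> run = failureRuns.computeIfAbsent(userId, k -> new ArrayDeque<>());
        if (loginRes != 1) {
            run.clear(); // a successful login breaks the run
            return false;
        }
        run.addLast(loginTime);
        if (run.size() > REQUIRED_FAILURES) {
            run.removeFirst(); // keep only the latest three failures
        }
        return run.size() == REQUIRED_FAILURES
                && run.peekLast() - run.peekFirst() < windowMillis;
    }

    public static void main(String[] args) {
        ConsecutiveFailureDetector detector = new ConsecutiveFailureDetector(10_000L);
        long[] times = {1_000L, 2_000L, 3_000L};
        for (long t : times) {
            System.out.println("t=" + t + " matched=" + detector.onEvent("u1", 1, t));
        }
    }
}
```

Even this simplified version has to manage per-user state by hand and ignores out-of-order events, which is the kind of work a CEP Pattern takes over.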
2. Implementation
2.1 Maven dependencies:
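<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.roy</groupId>
    <artifactId>FlinkDemo</artifactId>
    <version>1.0</version>
    <properties>
        <flink.version>1.12.5</flink.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <!-- The source also lists a property with value 2.12.1; its name is not recoverable and nothing below references it. -->
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.8.3-10.0</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.14</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals><goal>single</goal></goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>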
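2.2 Basic flow
// Event is the input event type and Alert the result type (placeholders)
// 1. Obtain the source event stream
DataStream<Event> input = ...;
// 2. Define the pattern
Pattern<Event, ?> pattern = ...;
// 3. Obtain the pattern stream
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
// 4. Turn the matches in the pattern stream into a result stream
DataStream<Alert> resultStream = patternStream.process(
    new PatternProcessFunction<Event, Alert>() {
        @Override
        public void processMatch(
                Map<String, List<Event>> pattern,
                Context ctx,
                Collector<Alert> out) throws Exception {
            // Build a result from the matched events and emit it with out.collect(...)
        }
    });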
2.3 Complete code
Note: before running the code, start the nc -lk socket server described in section 2.4.
package com.roy.flink.project.userlogin;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.List;
import java.util.Map;
/**
 * Finds users with consecutive login failures within ten seconds,
 * using Flink CEP for fast pattern matching.
 */
public class MyUserLoginAna {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Interval (ms) at which BoundedOutOfOrdernessWatermarks periodically emits watermarks
        env.getConfig().setAutoWatermarkInterval(1000L);
        // Testing with a socket source
        env.setParallelism(1);
        // 1. Obtain the source event stream (replace 10.86.97.206 with your own host)
        final DataStreamSource<String> dataStreamSource = env.socketTextStream("10.86.97.206", 7777);
        final SingleOutputStreamOperator<UserLoginRecord> userLoginRecordStream = dataStreamSource
                .map(new MapFunction<String, UserLoginRecord>() {
                    @Override
                    public UserLoginRecord map(String s) throws Exception {
                        // Each input line has the form userId,loginRes,loginTime
                        final String[] splitVal = s.split(",");
                        return new UserLoginRecord(splitVal[0], Integer.parseInt(splitVal[1]), Long.parseLong(splitVal[2]));
                    }
                })
                .assignTimestampsAndWatermarks(
                        // Meant for out-of-order streams: late events have to be waited for,
                        // so a fixed maximum delay (1 second here) must be set
                        WatermarkStrategy.<UserLoginRecord>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                                .withTimestampAssigner((SerializableTimestampAssigner<UserLoginRecord>) (element, recordTimestamp) -> element.getLoginTime())
                );
        // 2. Define the pattern with Flink CEP
        // 2.1: three login failures within 10 seconds (not necessarily consecutive)
        // final Pattern<UserLoginRecord, ?> pattern = Pattern.<UserLoginRecord>begin("start").where(new SimpleCondition<UserLoginRecord>() {
        //     @Override
        //     public boolean filter(UserLoginRecord userLoginRecord) throws Exception {
        //         return 1 == userLoginRecord.getLoginRes();
        //     }
        // }).times(3).within(Time.seconds(10));
        // 2.2: three consecutive login failures. next() requires strict contiguity;
        // for relaxed (non-consecutive) matching use followedBy()
        final Pattern<UserLoginRecord, ?> pattern = Pattern.<UserLoginRecord>begin("one").where(new SimpleCondition<UserLoginRecord>() {
            @Override
            public boolean filter(UserLoginRecord value) throws Exception {
                return 1 == value.getLoginRes();
            }
        }).next("two").where(new SimpleCondition<UserLoginRecord>() {
            @Override
            public boolean filter(UserLoginRecord value) throws Exception {
                return 1 == value.getLoginRes();
            }
        }).next("three").where(new SimpleCondition<UserLoginRecord>() {
            @Override
            public boolean filter(UserLoginRecord value) throws Exception {
                return 1 == value.getLoginRes();
            }
        }).within(Time.seconds(10));
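        // 3. Obtain the pattern stream
        // Note: the stream is not keyed here, so failures from different users can complete one match.
        // Passing userLoginRecordStream.keyBy(UserLoginRecord::getUserId) instead matches per user.
        final PatternStream<UserLoginRecord> badUser = CEP.pattern(userLoginRecordStream, pattern);
        final MyProcessFunction myProcessFunction = new MyProcessFunction();
        // 4. Turn the matches in the pattern stream into a result stream
        final SingleOutputStreamOperator<UserLoginRecord> badUserStream = badUser.process(myProcessFunction);
        badUserStream.print("badUser");
        env.execute("UserLoginAna");
    } // main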
    public static class MyProcessFunction extends PatternProcessFunction<UserLoginRecord, UserLoginRecord> {
        @Override
        public void processMatch(Map<String, List<UserLoginRecord>> match, Context ctx, Collector<UserLoginRecord> out) throws Exception {
            // For 2.1 (three failures, not necessarily consecutive)
            // final List<UserLoginRecord> records = match.get("start");
            // for (UserLoginRecord record : records) {
            //     out.collect(record);
            // }
            // For 2.2 (three consecutive failures): emit the third failed login
            final List<UserLoginRecord> records = match.get("three");
            for (UserLoginRecord record : records) {
                out.collect(record);
            }
        } // processMatch
    } // MyProcessFunction
}
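The difference between variants 2.1 and 2.2 is contiguity: next() only matches when nothing sits between the failures, while times(3) by default tolerates other events in between. The plain-Java sketch below shows the two rules on a list of loginRes values. ContiguitySketch and both method names are hypothetical, and the 10-second limit is deliberately left out to keep the comparison short.

```java
import java.util.Arrays;
import java.util.List;

/** Compares strict and relaxed contiguity on a sequence of login results (1 = failure). */
class ContiguitySketch {

    /** Strict contiguity, as with next(): `count` failures with no other event in between. */
    static boolean hasStrictRun(List<Integer> loginResults, int count) {
        int run = 0;
        for (int res : loginResults) {
            run = (res == 1) ? run + 1 : 0; // any non-failure resets the run
            if (run >= count) {
                return true;
            }
        }
        return false;
    }

    /** Relaxed contiguity, as with times(n): non-matching events in between are skipped. */
    static boolean hasRelaxedRun(List<Integer> loginResults, int count) {
        int failures = 0;
        for (int res : loginResults) {
            if (res == 1) {
                failures++;
            }
            if (failures >= count) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // failure, success, failure, failure
        List<Integer> events = Arrays.asList(1, 0, 1, 1);
        System.out.println("strict:  " + hasStrictRun(events, 3));
        System.out.println("relaxed: " + hasRelaxedRun(events, 3));
    }
}
```

For the sequence failure, success, failure, failure, only the relaxed rule reports a match, which is why the two variants can print different results for the same input.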
The UserLoginRecord class is as follows:
public class UserLoginRecord {
    private String userId;
    private int loginRes; // 0 = success, 1 = failure
    private long loginTime;

    public UserLoginRecord() {
    }

    public UserLoginRecord(String userId, int loginRes, long loginTime) {
        this.userId = userId;
        this.loginRes = loginRes;
        this.loginTime = loginTime;
    }

    @Override
    public String toString() {
        return "UserLoginRecord{" +
                "userId='" + userId + '\'' +
                ", loginRes=" + loginRes +
                ", loginTime=" + loginTime +
                '}';
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public int getLoginRes() {
        return loginRes;
    }

    public void setLoginRes(int loginRes) {
        this.loginRes = loginRes;
    }

    public long getLoginTime() {
        return loginTime;
    }

    public void setLoginTime(long loginTime) {
        this.loginTime = loginTime;
    }
}
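The map function in 2.3 builds these records by splitting each socket line on commas and assumes every line is well formed; a blank or mistyped line would throw and, with default settings, fail the job. A minimal sketch of a more defensive parser in plain Java follows. LoginLineParser and its nested LoginEvent holder are hypothetical names used only to keep the example self-contained, and treating loginTime as milliseconds is an assumption.

```java
import java.util.Optional;

/** Parses lines of the form userId,loginRes,loginTime without throwing on bad input. */
class LoginLineParser {

    /** Minimal stand-in for UserLoginRecord. */
    static final class LoginEvent {
        final String userId;
        final int loginRes;   // 0 = success, 1 = failure
        final long loginTime; // event time, assumed to be in milliseconds

        LoginEvent(String userId, int loginRes, long loginTime) {
            this.userId = userId;
            this.loginRes = loginRes;
            this.loginTime = loginTime;
        }
    }

    /** Returns the parsed event, or Optional.empty() if the line is malformed. */
    static Optional<LoginEvent> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        String[] parts = line.trim().split(",");
        if (parts.length != 3) {
            return Optional.empty();
        }
        try {
            String userId = parts[0].trim();
            int loginRes = Integer.parseInt(parts[1].trim());
            long loginTime = Long.parseLong(parts[2].trim());
            if (userId.isEmpty() || (loginRes != 0 && loginRes != 1)) {
                return Optional.empty();
            }
            return Optional.of(new LoginEvent(userId, loginRes, loginTime));
        } catch (NumberFormatException e) {
            return Optional.empty(); // non-numeric loginRes or loginTime
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("u1,1,1000").isPresent());
        System.out.println(parse("not a record").isPresent());
    }
}
```

In the job itself, one way to use such a parser would be a flatMap that emits a record only when the Optional is present, so that bad lines are dropped instead of stopping the stream.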
2.4 Simulating the socket server with nc -lk
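Where nc is not available, a small Java program can play the same role. The sketch below listens on port 7777 (the port passed to socketTextStream in 2.3) and writes a few test lines in the userId,loginRes,loginTime format to the first client that connects. The class name LoginSocketServer and the sample lines are made up for illustration, and loginTime is assumed to be event time in milliseconds.

```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Stand-in for "nc -lk 7777": serves a few hypothetical login lines to one client. */
class LoginSocketServer {

    /** Three failures for one user, one second apart, followed by a later successful login. */
    static List<String> sampleLines(String userId, long startMillis) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            lines.add(userId + ",1," + (startMillis + i * 1000L));
        }
        // A later event, so that event time can advance past the three failures.
        lines.add(userId + ",0," + (startMillis + 5000L));
        return lines;
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        try (ServerSocket server = new ServerSocket(7777);
             Socket client = server.accept(); // blocks until the Flink job connects
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            for (String line : sampleLines("user1", 1_000L)) {
                out.print(line + "\n"); // socketTextStream splits records on newlines
                out.flush();
                Thread.sleep(1000L);
            }
        }
    }
}
```

As with nc, the server has to be running before the Flink job starts, because the job connects to it as a client.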

2.5 IDEA console output
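When events are typed in by hand, a match may show up in the console later than expected. The job runs on event time, and Flink CEP in that mode evaluates events only after the watermark has passed their timestamps. With forBoundedOutOfOrderness(Duration.ofSeconds(1)), the watermark trails the largest timestamp seen so far by the bound plus one millisecond. The sketch below reproduces that arithmetic in plain Java; WatermarkSketch is a hypothetical helper, not a Flink class, and the timestamps in main are invented.

```java
/** Mimics the watermark arithmetic of a bounded-out-of-orderness strategy. */
class WatermarkSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    WatermarkSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Start high enough that the subtraction in currentWatermark() cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    /** Records an event timestamp; late events never move the maximum backwards. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Largest timestamp seen, minus the allowed delay, minus one millisecond. */
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    /** True once the watermark has reached the given event timestamp. */
    boolean hasPassed(long eventTimestamp) {
        return currentWatermark() >= eventTimestamp;
    }

    public static void main(String[] args) {
        WatermarkSketch watermark = new WatermarkSketch(1000L);
        long[] failures = {1_000L, 2_000L, 3_000L};
        for (long t : failures) {
            watermark.onEvent(t);
        }
        System.out.println("third failure passed: " + watermark.hasPassed(3_000L));
        watermark.onEvent(6_000L); // any later login event
        System.out.println("third failure passed: " + watermark.hasPassed(3_000L));
    }
}
```

Under these assumptions, the third failure alone is not enough to trigger output: a later event (or the end of the stream) has to push the watermark past it, and the watermark itself is only emitted once per auto-watermark interval, which the job sets to 1000 ms.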

This article was collected from the web and does not represent the views of 协通编程. If you repost it, please credit the source: https://net2asp.com/450189ad1c.html
