首页 >> 知识 >> Flink 实现电子围栏进出区域监控

Flink 实现电子围栏进出区域监控

目录

需求

技术点

实现

checkpoint

 State Backends

Watermark

Broadcast State 模式

Keyed State

需求

 用户自定义电子围栏,记录车辆进出电子围栏的时间。

技术点 checkpointstateBackendwatermarkBroadcast State 模式数据流连接广播流state 实现 checkpoint

Flink中的每个方法或算子都能够是有状态的。状态化的方法在处理单个元素/事件的时候存储数据,让状态成为使各个类型的算子更加精细的重要部分。为了让状态容错,Flink需要为状态添加checkpoint(检查点)。Checkpoint 使得Flink能够恢复状态和在流中的位置,从而向应用提供和无故障执行时一样的语义。

默认情况下,checkpoint 是禁用的,通过 StreamExecutionEnvironment 的 enableCheckpointing(n) 来启用 checkpoint,里面的 n 是进行 checkpoint 的间隔,单位毫秒。

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment // 开启checkpoint 单位毫秒 并设置模式为精准一次 env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE) // Checkpoint 必须在一分钟内完成,否则就会被抛弃 env.getCheckpointConfig.setCheckpointTimeout(60000) // 以下是高级选项: // 设置模式为精确一次 (这是默认值) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // 确认 checkpoints 之间的时间会进行 500 ms env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500) // Checkpoint 必须在一分钟内完成,否则就会被抛弃 env.getCheckpointConfig.setCheckpointTimeout(60000) // 如果 task 的 checkpoint 发生错误,会阻止 task 失败,checkpoint 仅仅会被抛弃 env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false) // 同一时间只允许一个 checkpoint 进行 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)  State Backends

Flink 内置了以下这些开箱即用的 state backends :

MemoryStateBackendFsStateBackendRocksDBStateBackend

如果不设置,默认使用 MemoryStateBackend。

MemoryStateBackend 适用场景:

本地开发和调试。状态很小的 Job,例如:由每次只处理一条记录的函数(Map、FlatMap、Filter 等)构成的 Job。Kafka Consumer 仅仅需要非常小的状态。

FsStateBackend 适用场景:

状态比较大、窗口比较长、key/value 状态比较大的 Job。所有高可用的场景。

RocksDBStateBackend 的适用场景:

状态非常大、窗口非常长、key/value 状态非常大的 Job。所有高可用的场景。 Watermark

策略简介

为了使用事件时间语义,Flink 应用程序需要知道事件时间戳对应的字段,意味着数据流中的每个元素都需要拥有可分配的事件时间戳。其通常通过使用 

TimestampAssigner API 从元素中的某个字段去访问/提取时间戳。

时间戳的分配与 watermark 的生成是齐头并进的,其可以告诉 Flink 应用程序事件时间的进度。其可以通过指定 

WatermarkGenerator 来配置 watermark 的生成方式。

使用 Flink API 时需要设置一个同时包含 TimestampAssigner 和 WatermarkGenerator 的 WatermarkStrategy。

consumer.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)) .withTimestampAssigner(new SerializableTimestampAssigner[ObjectNode] { override def extractTimestamp(element: ObjectNode, recordTimestamp: Long): Long = element.get("value").get("gps_time").asLong() }))

Broadcast State 模式

在本例中,草莓视频在线观看APP定义两个流,一个包含车辆位置(KafkaStreamSource),包含车牌号、经纬度、时间戳属性。另一个包含规则,即用户自定义的电子围栏(Rule)

在位置数据流中,需要首先使用车牌号将流进行区分(keybay),这能确保相同车牌的数据会流转到相同的物理机。

val keyedKafkaSourceStream: DataStream[KafkaStreamSource] = env.addSource(consumer) .filter(_.get("value").get("vno").asText() != "") .map(obj => { KafkaStreamSource(obj.get("value").get("vno").asText(), obj.get("value").get("longitude").asDouble(), obj.get("value").get("latitude").asDouble, obj.get("value").get("gps_time").asLong()) }) .keyBy(_.vno)

对于规则流,它应该被广播到所有的下游task中,下游task应当存储这些规则并根据它寻找满足规则的电子围栏,使用MapStateDescriptor来描述并创建broadcast state 在下游的存储结构

// 一个 map descriptor,它描述了用于存储规则名称与规则本身的 map 存储结构val ruleStateDescriptor = new MapStateDescriptor[String, Rule]( "RulesBroadcastState", BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(new TypeHint[Rule] {}));// 广播流,广播规则并且创建 broadcast statevals ruleBroadcastStream: BroadcastStream[Rule] = env.addSource(new GetRule) .setParallelism(1) .broadcast(ruleStateDescriptor)

最终,使用电子围栏规则流来匹配车辆位置,需要:

将两个流连接起来完成草莓视频在线观看APP的处理逻辑

为了关联一个非广播流(keyed 或者 non-keyed)与一个广播流(BroadcastStream),草莓视频在线观看APP可以调用非广播流的方法 connect(),并将 BroadcastStream 当做参数传入。 

这个方法的返回参数是 broadcastConnectedStream,具有类型方法 process(),传入一个特殊的 CoProcessFunction 来书写草莓视频在线观看APP的模式识别逻辑。 具体传入 process() 的是哪个类型取决于非广播流的类型:

如果流是一个keyed流,那就是KeyedBroadcastProcessFunction 类型如果流是non-keyed, 那就是 BroadcastProcessFunction 类型

在本次例子中,位置数据是一个keyed stream,所以代码如下:

// 数据流与规则流连接处理逻辑keyedKafkaSourceStream .connect(ruleBroadcastStream) .process(new EfenceProcess)

EfenceProcess 类如下:

/** * 数据流连接规则流处理逻辑 */class EfenceProcess extends KeyedBroadcastProcessFunction[String, KafkaStreamSource, Rule, KafkaStreamSource] { private val log = LoggerFactory.getLogger(classOf[JSONKeyValueDeserializationSchema]) /** * 矩形电子围栏转换 * * @param coo * @return */ def polygonCoordinateConvert(coo: String): Array[Point] = { val array: JSONArray = JSON.parseArray(coo) val ab = new ArrayBuffer[Point]() for (arr { sql"INSERT INTO fence_log(fence_id, car_vno, in_time) VALUES (?, ?, ?)" .bind(efenceId, vno, new Timestamp(inTime * 1000l)) .updateAndReturnGeneratedKey() .apply() }) dbIdState.update(id) log.info(s"insert op, id is ${id}") } /** * 数据库更新语句 * * @param outTime * @param id */ def updateDB(outTime: Long, id: Long): Unit = { DB.localTx(implicit session => { sql"UPDATE fence_log SET out_time = ? WHERE id = ?" .bind(new Timestamp(outTime * 1000l), id) .update() .apply() }) log.info(s"update op, id is ${id}") } /** * 关闭临时电子围栏 * @param fenceId */ def closeEFence(fenceId: Long): Unit = { DB.localTx(implicit session => { sql"UPDATE fence SET `state` = ? WHERE id = ?" .bind("关闭", fenceId) .update() .apply() }) log.info(s"close temporary e-fence, current id is ${fenceId}") } // 规则流 private var ruleStateDescriptor: MapStateDescriptor[String, Rule] = _ // 电子围栏状态 private var efenceState: ValueState[Boolean] = _ // 数据库 id private var dbIdState: ValueState[Long] = _ // 数据库连接池 private var dataSource: DruidDataSource = _ // 变量初始化、数据库连接池配置 override def open(parameters: Configuration): Unit = { ruleStateDescriptor = new MapStateDescriptor[String, Rule]("RulesBroadcastState", BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(new TypeHint[Rule]() {})) efenceState = getRuntimeContext.getState( new ValueStateDescriptor[Boolean]("efence", createTypeInformation[Boolean]) ) dbIdState = getRuntimeContext.getState( new ValueStateDescriptor[Long]("id", createTypeInformation[Long]) ) // 数据路连接池 DBs.setup() } // 处理数据流的数据 override def processElement(kafkaSource: KafkaStreamSource, readOnlyContext: KeyedBroadcastProcessFunction[String, KafkaStreamSource, Rule, KafkaStreamSource]#ReadOnlyContext, collector: Collector[KafkaStreamSource]): Unit = { // 车辆是否在电子围栏内的状态 true 为在, false 为不在 val tmpState: Boolean = efenceState.value() // 记录插入数据后返回的id , 用于更新离开时间操作 val tmpId: Long = dbIdState.value() // 初始状态 假设不在电子围栏 val currentState = if (tmpState != null) { tmpState } else { false } // 初始化 id val currentId = if (tmpId != null) { tmpId } else { -1L } // 获取广播流状态中的电子围栏规则信息 val iterator: util.Iterator[Map.Entry[String, Rule]] = readOnlyContext.getBroadcastState(ruleStateDescriptor).immutableEntries().iterator() while (iterator.hasNext) { val rule: Rule = iterator.next().getValue val efenceType: String = rule.`type` // 数据流的车牌与规则流的车牌进行对应 if (kafkaSource.vno == rule.car_vno) { // 电子围栏类型判断 if (efenceType.equals("polygon")) { val coordinates: String = rule.coordinates val points: Array[Point] = polygonCoordinateConvert(coordinates) // 如果点位出现在电子围栏区域并且当前状态为false,即未曾出现在电子围栏中,判定为首次进入,进行数据库插入操作 if (EFence.isPtInPoly(kafkaSource.longitude, kafkaSource.latitude, points) && !currentState) { collector.collect(kafkaSource) efenceState.update(true) val efenceId: Long = rule.`fence_id` val vno: String = kafkaSource.vno val inTime: Long = kafkaSource.gps_time insertDB(efenceId, vno, inTime) } // 如果点位不在电子围栏区域并且当前状态为 true,即上一条数据在电子围栏中,判定首次离开,进行数据库更新操作 if (!EFence.isPtInPoly(kafkaSource.longitude, kafkaSource.latitude, points) && currentState) { collector.collect(kafkaSource) efenceState.clear() val outTime: Long = kafkaSource.gps_time updateDB(outTime, currentId) // 如果电子围栏的状态为 临时, 一进一出后关闭该电子围栏 if (rule.`state` == "临时") { closeEFence(rule.`fence_id`) } dbIdState.clear() } } if (efenceType.equals("circle")) { val center: String = rule.circle_center val radius: Int = rule.circle_radius val aPoint = new Point(kafkaSource.longitude, kafkaSource.latitude) val cPoint: Point = circleCoordinateConvert(center) // 如果点位出现在电子围栏区域并且当前状态为false,即未曾出现在电子围栏中,判定为首次进入,进行数据库插入操作 if (EFence.isPtInCircle(aPoint, cPoint, radius) && !currentState) { collector.collect(kafkaSource) efenceState.update(true) val efenceId: Long = rule.`fence_id` val vno: String = kafkaSource.vno val inTime: Long = kafkaSource.gps_time insertDB(efenceId, vno, inTime) } // 如果点位不在电子围栏区域并且当前状态为 true,即上一条数据在电子围栏中,判定首次离开,进行数据库更新操作 if (!EFence.isPtInCircle(aPoint, cPoint, radius) && currentState) { collector.collect(kafkaSource) efenceState.clear() val outTime: Long = kafkaSource.gps_time updateDB(outTime, currentId) // 如果电子围栏的状态为 临时, 一进一出后关闭该电子围栏 if (rule.`state` == "临时") { closeEFence(rule.`fence_id`) } dbIdState.clear() } } } } } // 处理广播流的数据 override def processBroadcastElement(rule: Rule, context: KeyedBroadcastProcessFunction[String, KafkaStreamSource, Rule, KafkaStreamSource]#Context, collector: Collector[KafkaStreamSource]): Unit = { context.getBroadcastState(ruleStateDescriptor).put("broadcast", rule) } override def close(): Unit = { dataSource.close() }}

广播流处理如下:

class GetRule extends RichSourceFunction[Rule] { // 数据库配置初始化 DBs.setup() var isRunning: Boolean = _ val duration = 1 * 60 * 1000L override def open(parameters: Configuration): Unit = { isRunning = true } override def run(sourceContext: SourceFunction.SourceContext[Rule]): Unit = { while (isRunning) { val rules: List[Rule] = DB readOnly { implicit session => sql"select * from fence_car" .map(rs => Rule(rs.string("car_vno"), rs.long("fence_id"), rs.string("type") , rs.string("circle_center"), rs.int("circle_radius"), rs.string("state"), rs.string("coordinates"))) .list() .apply() } for (rule
网站地图