# Flink实时项目 **Repository Path**: redhacker1994/flink-real-time-project ## Basic Information - **Project Name**: Flink实时项目 - **Description**: Flink实时项目,实时处理数据 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 11 - **Created**: 2022-07-06 - **Last Updated**: 2022-07-06 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # Flink实时项目 实时推荐就是根据用户的自身属性结合当前的访问行为,经过实时的推荐算法计算,从而将用户可能喜欢的商品、新闻、视频等推送给用户。 这种系统一般是由一个用户画像批处理加一个用户行为分析的流处理组合而成。 ## flink数据采集 #### flink日志数据采集到ods层 ![输入图片说明](https://z3.ax1x.com/2021/04/18/cIINd0.png "在这里输入图片标题") #### flink业务数据采集ods层 ![输入图片说明](https://z3.ax1x.com/2021/04/18/cIIUoV.png "在这里输入图片标题") #### flink数据从ods层采集到dwd层 ![输入图片说明](https://z3.ax1x.com/2021/04/18/cIokkV.png "在这里输入图片标题") # 整体流程 ## 1:ods层数据采集 #### 日志数据采集 通过前端埋点获取用户行为日志,将数据发往nginx所在的节点,在ngnix中配置要,负载 均衡的所有节点以及要拦截的请求路径. 然后通过写的springboot日志采集服务,将分发到节点的数据进行采集后,将日志落盘, 通过springboot集成的kafkaTemplate类,将采集到的日志数据写入到ods层的kakfa主题. #### 业务数据采集 通过maxwell监控,mysql数据库中的binlog二进制文件信息的变化,将数据写入到kafka的ods层的topic中. 如果要读取数据库的历史表信息,需要加入初始化表命令。与cannal的区别也就是,是否可以读取历史表信息。 ## 2:dwd层数据采集 #### 日志数据处理 通过写的Flink应用,先从kafka数据,从ods日志主题消费数据,然后我们可以通过用户行为,将行为数据分为3种,页面日志,曝光日志,启动日志。我们在分流之前先对数据进行了简单 的清洗,对新旧用户进行了修复。通过定义了一个flink的状态变量,为时间,只过滤新用户的。判断是否是新用户,如果是第一次来的话,他的状态就是空,将现在的时间更新进状态,如果再来的话,状态不为空,那就把当前用户修改为老用户。 然后再对修正后的数据进行处理,来进行分流,启动日志和曝光数据写入侧输出流,页面数据写入主流,通过json获取对应的值,start,写入启动测输出流,除了启动日志全部都是页面日志,将页面日志写入到主流,再判断否是是曝光日志,然后写入到曝光侧输出流。 将对用的流写入到对应的kafka主题。为dwd层。 #### 业务数据处理 从kafka种读取主题为ods_db主题的数据,也是对读取的流做了一个简单的清洗,去除了读取的数据table为null,和data为null,的数据,从ods_db种读取数据后,我们需要将业务数据写入到kafka种,然后将维度数据写入到hbase种,为什么写入到hbase中,因为我们是后面进行维度关联时,是通过key查询来进行关联的,通过我们可以可以采用hbase和redis,因为它们两个都是采用键值对来储存的数据库,但是redis是基于内存的,我们需要把维度数据储存起来,所以就采用了将维度数据写入到hbase数据库。但是我们是不知道从ods主题读取的业务数据表那些是业务表,那些是维度表,所以我们在这里创建了以恶配置表,表中记录了,ods层过来的流中的表,那些是配置表,哪些是维度表,这个是需要我们手动去操作的。然后通过flink-cdc动态去监控这张表,作为配置流,将表中的配置信息向下广播,然后将信息存入到状态中,具体就是将两天流进行合流,然后进行处理,先处理广播流的,将广播流读拼接的key写入到状态中,也就是拼接一个key,为表名和操作类型,如果是写入到hbase的话,读取流中的表配置数据,在来进行建表的sql。因为hbase的语法繁琐,所以这里我们给hbase,套了一层,加了一层phoenix,使得她可以执行平常的sql语句,然后再处理主流业务数据,然后在在主流中拼接一个key,将key放入状态中去获取,若是不为空则是维度数据,然后根据配置表字段进行过滤,然后输入到侧输出流,事实表输入主流,将侧输出流维度数据写入到phoneix中,主流的事实数据写入到kafak中. ## 3:dwm层数据采集 在dwm中对dwd层的数据进行了,轻度的处理,为dws层做准备,在这里我们进行了,对日活的统计, 用户跳出统计,订单宽表,支付宽表。 #### 日活 uv 通过读取从日志数据分流后,写入到kafka的dwd_page页面主题,获取流,计算日活,将数据写入 dwm_unique_visit主题,日活的条件为last_pageid为null,并且今天只访问了一次,所以我们定义了一个状态描述器,时间为一天,如果是第一次来的话,状态是null,将他的时间戳放进到状态中,如果再里一条,状态不为空,则证明她是第二次来,不算日活了,过滤掉。将日活数据写入到kafka中 #### 用户跳出统计 用户跳出的条件是用户没有下一个页面,我们规定如果10s没有下个页面的跳转就属于跳出数据,也是从页面主题中读取数据,转变成流,我们这里使用了fink-cep进行处理,如果满足 lastpageId=null,并且超时了,我们规定是10s,就属于跳出数据,然后处理超时数据,然后将数据写入到dwm_jump主题。 #### 订单宽表 我们订单宽表是通过将kafka中的订单信息表和订单详情表进行关联,然后再关联写入的phoenix中的维度表,先从kafka中读取订单信息表和订单详情表,然后获取流,通过转换为他们创建对应的实体类,然后分组后合成一条流,输出为一张大的实体类,然后得到一条流,来进行和各个维度进行关联,我们这里采用异步查询,好处就是增加效率,异步查询的时候我们自定义了维度关联的函数,把需要关联的维度表放进去,这里我们查询的维度数据我们将它写到了redis缓存里,可以增加查询速率,就是当查询时,缓存中没有,就去phoenix中查询,然后将查询的数据写到redis中,然后这里我们再自定函数,采用了模版方法定义了父类的结构骨架,定义了两个方法,一个是获取key,另一个是join方法,方法我们在子类中进行了实现,在子类中我们通过对方法进行重写,可以获取id,也就是key,然后在join中传入的有两个参数,一个是宽表,一个是查询出来的json对象,然后将json对象中的一些订单宽表需要的属性set,进去,其他需要关联的这个一样,然后最后需要将数据写入到dwm层的kafka中. #### 支付宽表 支付宽表这里我们时将业务数据的支付订单信息和支付详细信息进行合成了一个宽表,从kafka中读取数据,得到流然后keyby后intervalJoin,然后写到宽表主题。 ## 4:dws层各个在主题统计 #### 访客主题统计 从dwd_page的page页面主题我们可以获取点击量,并且判断如果没有lastpageId,就是一次会话,从日活可以获取访客量,用户跳出可以获取用户跳出量,读取上面的数据,将其变成三条流,然后将其map转换格式,为访客实体类,然后将流中读取的数据set到实体类中,其他两条流也是若此,形式如下 将其每一条数据自己需要计算的度量值set为1,然后进行union成一条流,然后按照那几个维度数据进行分组开窗聚合,然后将数据写入到clickhouse数据库中,前提是再clickhouse中提前出创建表. #### 商品主题统计 商品主题统计我们需要曝光次数,加购购物车次数,下单次数,评论次数,退款次数,支付次数,所以我们需要从dwm层中拿到订单宽表和支付宽表,从dwd中拿到其他主题,然后获取各个的流,将各个流转换为商品实体类,然后各自的度量值set为1,也是进行union,然后分组通过id,开窗聚合, 也是异步查询,进行维度关联,之后再写入到clickhouse中. #### 地区主题 这里我们通过FlinkSql进行处理,创建流环境,表环境,从订单宽表中作为输入源的流,然后进行分组开窗聚合,计算各个地区的订单数量,金额,转换为数据流写入clickhouse #### 关键字主题