# ETC-bigdata **Repository Path**: XD_HD/etc-bigdata ## Basic Information - **Project Name**: ETC-bigdata - **Description**: 高速公路ETC入深圳-大数据分析 通过调用深圳市政府数据开放平台接口获取数据,利用大数据技术研究深圳市高速公路的运营情况,对数据进行汇总分析,探索对于高速收费站的优化方向。 感谢极客青年的深圳地铁大数据分析项目和尚硅谷的所有视频教学!!! - **Primary Language**: Java - **License**: MulanPSL-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 11 - **Forks**: 2 - **Created**: 2022-10-09 - **Last Updated**: 2025-10-20 ## Categories & Tags **Categories**: Uncategorized **Tags**: 大数据, Java, 数据分析 ## README # ETC-bigdata ## 介绍 高速公路ETC入深圳-大数据分析 通过调用深圳市政府数据开放平台接口获取数据,利用大数据技术研究深圳市高速公路的运营情况,对数据进行汇总分析,探索对于高速收费站的优化方向。 入门项目,没什么技术含量。。。。。。菜鸡痛哭D: 感谢极客青年的深圳地铁大数据分析项目和尚硅谷的所有视频教学!!! ## 软件架构 软件架构说明 ![输入图片说明](images/framework.jpg) ## 技术栈 1. java/scala 2. redis 3. Elasticsearch-7 4. hadoop3.2 5. hive 6. flume 7. kafka 8. flink ## 配置说明 最好是利用云服务器,之前嫖了一个月的阿里云,玩了两天redis就被黑客攻击了,于是作罢,回归虚拟机。 使用虚拟机进行配置环境,要求内存最好在32G,这样可以配置三台4G内存的linux服务器,同时还可以进行本机的其他活动。 经验:在自己模拟分布式系统时,服务器的存储大小要事先规划好,在master节点上,最好配置60G以上,其余节点可以在30G以上。有条件的话,master节点的内存上8G。我的虚拟机存储就不太够,导致Kafka总是崩溃。 环境配置选用原版Apache组件搭建大数据集群,再次感谢尚硅谷的教学视频。 ## 数据源 深圳市政府数据开放平台,感兴趣的同学可以找找其他数据源玩玩 https://opendata.sz.gov.cn/data/api/toApi ## 项目流程 ### 一、数据获取与清理 #### 1. 通过接口获取数据 HD-data/src/main/java/deal/D001_getDataFromHttp.java 通过深圳市政府数据开放平台,申请appKey,使用hutool工具直接爬取(非常好用)到本地文件, 根据已知共有178,396条数据,计算出每页爬取1000行,爬取179页 (页数和行数都是从一开始), 本次数据量较少 只有十万级别(小数据分析:D),下次弄个千万级别的。 #### 2.存放至Redis中 HD-data/src/main/java/deal/D002_saveDataToRedis.java 按行读取data-page.jsons文件,共178行,将每行以String的形式存放到redis db1中,命名为ETC- i #### 3. 数据清洗 HD-data/src/main/java/deal/D003_etlDataToRedis.java 读取redis中的ETC - i,(一行为一千条json)将数据分解,再以一个Set的形式存到redis db2 中(一行一个json),依据Set的天然去重,来处理重复数据,同时丢弃字段不足的数据(其实并没有) 接下来是最重要的,在json数据中,cksj出口时间与rksj入口时间都是不规范的日期字符串,要转换为yyyy-MM-dd HH:mm:ss的形式 同时深圳大数据对车牌进行了隐私处理,为了方便分析,数据好看一点,这里将车牌的后三位赋予随机值(***->483) 将处理后的数据放置到 redis db2 ETCData 中 #### 4. redisToEs HD-data/src/main/java/deal/D005_RedisToEs.java 从redis向Es写入的过程中,最好先建立一个索引模板,设定好哪些字段是需要倒排索引,哪些字段只需要keyword。尤为注意的是在日期字段上,Es只支持严格的时间字符或时间戳,因此在建立索引模板时要指定好format格式。 索引模板: 注意RKSJ 和 CKSJ ``` PUT _template/template_etc_data { "index_patterns": [ "etc_data*" ], "settings": { "number_of_shards": 1 }, "aliases": { "{index}-query": {}, "ETCData-query": {} }, "mappings": { "properties": { "XH": { "type": "keyword" }, "CKSJ": { "type": "date", "format": [ "yyyy-MM-dd HH:mm:ss" ] }, "CX": { "type": "keyword" }, "SFZRKMC": { "type": "text", "analyzer": "ik_smart" }, "RKSJ": { "format": [ "yyyy-MM-dd HH:mm:ss" ], "type": "date" }, "BZ": { "type": "text", "analyzer": "ik_smart" }, "SFZCKMC": { "type": "text", "analyzer": "ik_smart" }, "CP": { "type": "text", "analyzer": "ik_max_word" } } } } ``` 由于数据存放到redis set 中,每次取出来的数据日期不一定,所以最初的设想是按时间建立索引,将不同时间的索引都整体批量写入,但实际上,Es在一个时间段,只允许一个索引保持活跃,在这种情况下,只实现了单条写入。 ### 二、ES数据分析 将数据完全写入到Es中后,可以依赖kibana进行简单的数据图配置,让我们回到数据发生的那一天 因为我是按照高速etc入口时间进行划分索引的,所以能看到上高速最早的时间是2020-12-19,不过把整体数据都可视化后,就看不到那天的数据了,原因可能是19号数据量只有一个,太少了?观察不到了? 存在疑惑 ps:这位老哥从河北码头站出发,开着货车,在高速待了两天多,在广东罗田主线站下高速, 我们来看看整体的数据情况 整体时间轴的范围是2020-12-21 12:00 至 2020-12-23 12:00 这三天的数据 ![ETC三天数据](images/image-ESData1.png) 在分析的过程,发现只有22号的数据相对完整,我们来看看2020-12-22 这一天的数据 ![2020-12-22](images/image-ETCData-12-22.png) 发现入站的高峰时间集中在下午的16:40左右,共有2281车次,当然这些车都是从全国各地入的站 在22号当天,凌晨上高速的人比较少,但是当把时间轴延长到23号12:00 会发现23号凌晨上高速的人和22号下午的数量相差不多,对此我怀疑是深圳提供的数据不全面导致的。 不过管中窥豹,还是能发现15点-5点,登上高速路口的人较多,据生活经验,还是比较符合常识的,不少司机都喜欢晚上跑高速。 接下来我们将货车作为添加过滤条件,据我推测,可能深夜跑货车的人会比较多。 ![2020-12-22货车](images/image-ETCData-truck.png) 那么事实证明也的确如此,许多司机选择在20:20左右入场。 分析RJSK入站时间就到这里,还有很多可以深挖的业务。。。。。。 比较感兴趣的还是CKSJ,也就是司机们由全国各地到深圳的时间。值得注意的是,有部分数据的CKSJ是未知,因此不能启动Es的自动时间间隔功能。 在有确切CKSJ出口时间的数据中,我们还是能发现大多数司机都会在下午乃至深夜到达深圳,尤其以中午12点左右,车次是最少的。各个收费站每分钟只有个位的车次入站。 在后面的flink分析中,我怀疑是数据不全导致的落差。 ![输入图片说明](images/image-ETCData2.png) 其实深圳大数据平台给出的高速GPS数据也只是集中在2020-12-22 至 23 这两天内,十万级别的数据量,其实还是比较少的^_^ ### 三、数据仓库 在大数据领域肯定离不开数据仓库,虽然数据量特别少,而且字段内容也少,但我们还是按照数据仓库的规范进行数据分层。分层规划还是经典的维度模型。 ![维度模型分层规划](images/image-Dimension.png) 维度模型有两个概念,分别是对应业务过程的事实表,和对应业务发生环境的维度表。 设计事实表时,我们要**选择业务过程→声明粒度→确认维度→确认事实** 本次分析只有入站和出站两种业务,而且都差不多。。。。。。 在纠结能榨出什么,最后也只得到这样一个业务总线矩阵。 ![业务总线矩阵](images/image-matrix.png) **时间**,车型,收费站,**备注**都可以作为维度表来设计,但事实上,时间我们只有一天的数据是相对完好,备注更是只有深圳入和深圳出两个选项,因此我决定将这两项进行**维度退化**。 那么维度表就只剩车型和收费站两个表,收费站我准备采用地区表的设计方式,尽可能多的添加其地理位置字段(就是不知道能否成功)。 我不准备建立分区表,因为按日期分区数量太少了。 注意! etc原始数据格式并不是一致的,有部分数据的收费站出口名称是未知,那么它就没有出口时间这个字段,所以在ETCToSZ这个pojo类上,做了构造器的改动。 #### 1. 采集 fileToKafka 目前的设想是在虚拟机hadoop102上,执行1.1的采集程序,将json文件采集到本地,文件名为etcData.jsons,同时启用Flume对此文件进行监控,现在看来hadoop102已经不堪重负(想扩展存储就要删除之前的快照),flume采集要运行在hadoop103上了。 将其中的数据采集到Kafka中etc_topic这个topic中。为什么一定要加上Kafka呢? 一般使用 Flume + Kafka 来完成实时流式的日志处理,后面再连接上Flink/[Spark](https://so.csdn.net/so/search?q=Spark&spm=1001.2101.3001.7020) Streaming等流式实时处理技术,从而完成日志实时解析的目标。如果Flume直接对接实时计算框架,当数据采集速度大于数据处理速度,很容易发生数据堆积或者数据丢失,而kafka可以当做一个消息缓存队列,当数据从数据源到flume再到Kafka时,数据一方面可以同步到HDFS做离线计算,另一方面可以做实时计算,可实现数据多分发。 同时启用kafka-eagle 监控查看 topic。其实看不太懂,只能看出来kafka挂没挂掉,数据是否生产或消费,还有对于topic的修改不用记命令了。 ##### Flume conf 在Flume目录下的job目录创建 file_to_kafka.conf ``` # source channels 因为kafkachannels 不需要sink a1.sources = r1 a1.channels = c1 a1.sources.r1.type = TAILDIR a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /opt/module/applog/etcData.jsons a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.hd.flume.interceptor.ETLInterceptor$Builder #描述channel a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092 a1.channels.c1.kafka.topic = etc_topic a1.channels.c1.parseAsFlumeEvent = false #绑定source和channel以及sink和channel的关系 a1.sources.r1.channels = c1 ``` ##### Flume interceptor flume-interceptor/src/main/java/com/hd/flume/interceptor/ETLInterceptor.java 将不是标准Json的数据过滤(这是有风险的,请确定自己的数据全部符合格式)Maven配置:flume fastjson 打包插件 bug: 在进行采集的过程中,瞬间的采集速度过快,Flume的堆内存不够了,导致前面的数据全部丢失,总共178*1000条json数据,只保留到168-178条,浪费一下午的时间进行排查,还以为是处理流程的代码不对。 解决方式:修改flume bin目录下flume-ng的配置信息 把JAVA_OPTS 默认的20m调整为2048m(2G)或其他大小都可 ##### command ``` bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.logger=info,console ``` 这样就把数据采集到kafka中了。 #### 2.DIM层 ##### 收费站维度表 flink/src/main/java/app/DimTollgate.java 准备在redis存储各个收费站的省份,这样在制作进出站事实表的时候,与其关联就可以增加始发地和目的地省份的字段。当然这个省份还可以扩展到地区(华东,华南等)扩展到国家,星球,宇宙^^ 当然,收费站维度表在数据到来之前就是应该存在的,不过我没有,只有先跑一遍数据,将维度表填充。 ##### 车型维度表 在处理数据的过程直接将字段补充到事实表中,事实上,如果它足够复杂,是应该作为维度表写入到redis中的 #### 3. DWD层 flink/src/main/java/app/DWDInOutApp.java ``` 进站事实表 车牌 CP 要经过随机处理 车型(一型/二型) 拆分 车类(客车/货车) 拆分 收费站入口名称 SFZRKMC 原来就有 入口时间 RZSJ 原来就有 出发地(省份) 维度 收费站出口名称 SFZCKMC 出口时间 目的地(省份) 序号 XH 备注 BZ 深圳入/深圳出 维度退化 ``` ``` 出站事实表 车牌 CP 车型(一型/二型) 车类(客车/货车) 收费站入口名称 SFZRKMC 入站时间 RZSJ 出发地(省份) 序号 XH 备注 BZ ``` 将原始数据进行和之前一样的数据处理,将时间格式化,将车牌具体化,然后与收费站维度表进行关联,再根据备注的不同,分成入深圳数据和出深圳数据,分别存入Kafka不同的topic中,方便日后的取用。 事实上,基于DWD层与DIM层相关联,以时间或其他信息为跨度,可以汇总出DWS层中的相关数据,鉴于此类数据没那么多信息,DWS就暂且不做了。 ##### DWDToHive flink/src/main/java/app/DWDToHDFS.java 将DWD层的数据从Kafka存放到HDFS中,这样可以让Hive进行灵活的SQL查询,可以得出更多的信息。 首先要在Hive中建立表(通过location直接指定位置) ``` drop table if exists DWD_ETCInSZ; // 进站事实表 进入深圳 create external table DWD_ETCInSZ ( bz STRING COMMENT '备注', cksj STRING comment '出口时间', cp string comment '车牌', dest string comment '目的地省份', mode STRING comment '车类(客/货)', type String comment '车型(一型/二型)', rksj STRING COMMENT '入站时间', sfzckmc STRING COMMENT '收费站出口名称', sfzrkmc STRING COMMENT '收费站入口名称', xh STRING COMMENT '序号', `start` string comment '出发地省份' ) comment '出站事实表' ROW FORMAT serde 'org.apache.hadoop.hive.serde2.JsonSerDe' LOCATION '/warehouse/etc/DWD/ETC_INSZ'; ``` 然后编写代码读取Kafka中DWDIn这个topic的数据,利用StreamingFileSink存放到HDFS指定的路径中。 注意:使用 StreamingFileSink 时,必须设置checkpoint,不然文件无法合并,永远处于progress状态,无法读取里面的内容。 这样一来,我们可分析的指标就会更加灵活,比如我们可以分析大家都是从哪个省进入深圳的,即出发地车流量排行榜(省份),在创建收费站维度表时,用了取巧的办法,把所有收费站前面两个汉字作为它的省份了。 而事实上,存在一些信息包含不全的数据,在入深圳数据中,某些sfzrkmc居然是0,其省份名自然为空,这是数据采集出错还是有什么特殊的含义呢?由于不确定含义,没有将其过滤掉,有没有一种可能,就是这些人中途上的高速,没有经过收费站入口,直接从出口出去的呢? 那这样肯定是逃费了,扣他们分。 ``` select count(etc.xh) count, etc.`start` from DWD_ETCInSZ etc group by `start` order by count desc; ``` ``` 84821,广东 275,江西 162,广西 117,湖南 93,福建 48,浙江 47,湖北 31,安徽 19,河南 16,江苏 15,河北 11,贵州 11,上海 9,山东 9, 7,重庆 5,辽宁 5,四川 4,云南 4,陕西 3,甘肃 2,宁夏 2,内蒙 2,北京 1,吉林 1,山西 1,塘沽 1,空港 ``` 很明显,离深圳近的省份其车次相对较多,广东省省内的跑动肯定最为频繁。 同样的,我们还可以查询收费站入口流量排行榜,收费站出口流量排行榜。。。。。。 ``` select count(etc.xh) count,etc.sfzrkmc from DWD_ETCInSZ etc group by sfzrkmc order by count desc limit 15; ``` 收费站入口车流量排行榜 ``` 21247,广东大岭山站 5379,松山湖南 5137,广东罗田主线站 2319,广东白泥坑站 2278,广东常平站 1973,广东松山湖站 1752,广东清湖站 1747,广东莞樟路站 1734,广东太平站广深 1731,广东福民站 1727,广东虎门北站 1717,广东龙岗站水官 1602,广东东深路站 1559,广东荷坳站机荷东 1396,广东金龙站 ``` 收费站出口车流量排行榜 ``` 62295,广东罗田主线站 18292,广东水朗D站 5135,松山湖南 ``` 可见,深圳市政府其实只开放了这三个站的收费站信息。 当然,我们在查询的时候完全可以指定好时间是哪一天的,但是由于数据只有12-22的数据相对全面,其余日期的数据都是零零散散,所以直接查询了整个数据集。 #### 4. ADS层 ##### 1. 每小时收费站出口流量排行榜 flink/src/main/java/app/ADSInSZ.java 根据Flink进行实时计算,观察每小时深圳哪个收费站车流量比较大。事实上,深圳只提供了三个收费站的信息。分别是广东罗田主线站,广东水朗D站,松山湖南 而最为遗憾的是深圳政府提供的数据是部分有序,整体乱序的,在流式数据处理的过程中,如果想要开窗计算,数据可能迟到十分钟,甚至可能迟到数个小时,这就为开窗统计每小时收费站出口流量带来了麻烦。 ## 模拟实时运算 由于深圳市数据开放平台接口的爬取速度还是挺快的,于是每爬取一千条数据随机休息一会儿,模拟连续数据的注入。 1. 修改代码,将数据下载到虚拟机,文件名 ETCData 2. 利用Flume 监控ETCData,将数据发送到Kafka etc_topic 3. 利用Flink将数据分流,与redis中的维度表结合,发送到Kafka DWDIn DWDOut 4. 实时运算 Kafka还是很好用的。 这个简单的项目到此就告一段落。。。。。。 ## TODO: Flink 和 kafka 的精确一次消费 更多的业务挖掘 ## 特技 1. 使用 Readme\_XXX.md 来支持不同的语言,例如 Readme\_en.md, Readme\_zh.md 2. Gitee 官方博客 [blog.gitee.com](https://blog.gitee.com) 3. 你可以 [https://gitee.com/explore](https://gitee.com/explore) 这个地址来了解 Gitee 上的优秀开源项目 4. [GVP](https://gitee.com/gvp) 全称是 Gitee 最有价值开源项目,是综合评定出的优秀开源项目 5. Gitee 官方提供的使用手册 [https://gitee.com/help](https://gitee.com/help) 6. Gitee 封面人物是一档用来展示 Gitee 会员风采的栏目 [https://gitee.com/gitee-stars/](https://gitee.com/gitee-stars/)