# flink-sql **Repository Path**: mdusa_java/flink-sql ## Basic Information - **Project Name**: flink-sql - **Description**: 解析并执行 flink sql 的后台。 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2022-10-29 - **Last Updated**: 2022-10-29 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # flink-sql-submit ## 介绍 直接编写sql语句到文件中,之后通过flink命令提交至yarn,以减少flink sql任务开发的工作量。 ## 软件架构 java语言实现,调用flink底层api。 ## 安装教程 1. 集群上已有hadoop(必须开启yarn)集群,以将flink应用程序提交至yarn。 2. 有flink客户端,无需修改配置。 3. 将项目打包(直接在idea中双击package即可),然后放到flink客户端节点。 4. 将其他需要的jar包放到flink lib目录下,比如:hadoop组件、hive连接器、kafka连接器、jsbc格式化等。 5. 也可通过dolphinScheduler等调度器,通过shell脚本命令来提交,以实现调度。 ## 提交命令 1. 示例 1. 流处理 ```shell /data/soft/flink-1.13.2/bin/flink run \ -t yarn-per-job \ -Djobmanager.memory.process.size=4G \ -Dtaskmanager.memory.process.size=4G \ -Dtaskmanager.numberOfTaskSlots=2 \ --parallelism=2 \ -Dpipeline.classpaths='file://data/data/yourself_lib;file://data/data/yourself_lib2' \ /data/data/data-max-1.0.jar \ --executionMode streaming \ --sqlFilePath sql2.sql ``` 2. 批处理 ```shell /data/soft/flink-1.13.2/bin/flink run \ -t yarn-per-job \ --detached \ -Djobmanager.memory.process.size=4G \ -Dtaskmanager.memory.process.size=4G \ -Dtaskmanager.numberOfTaskSlots=2 \ --parallelism=2 \ -Dpipeline.classpaths='file://data/data/yourself_lib;file://data/data/yourself_lib2' \ /data/data/data-max-1.0.jar \ --executionMode batch \ --sqlFilePath sql2.sql ``` 2. 参数说明 1. executionMode:执行模式:streaming、batch,分别对应流模式和批模式不区分大小写。 1. 读取hive表时,可用batch模式,其他的无边界流,使用流模式。 2. 实践中发现,在读写hive表时,也可以使用流模式,并且处理完批量数据之后,任务也会自动停止。 3. 不过对于批处理任务,建议使用batch模式,该模式对于批量处理任务,有一定的优化。 2. sqlFilePath:要读取的sql文件相对路径。 ## idea运行说明 ### 运行说明 1. 修改 pom 文件中的依赖版本为自己虚拟机或线上的版本。 2. 在 [sqlFile](sqlFile) 目录下编写自己的 sql 文件。 3. 直接运行 [SubmitClient](flink-sql-1.15/src/main/java/cn/com/SubmitClient.java) 类的 main 函数。 1. 运行之后,肯定会报错,因为该类运行需要设置一些必要的参数。运行之后,通过 idea 上面的菜单栏中运行的窗口进入类运行的编辑页面,图示如下:
![img.png](img/运行类参数编辑入口.png) 2. 在运行配置页面,输入主类需要的参数,具体参数含义,可参考上面的提交命令中的参数说明。
![img.png](img/运行类参数编辑页面.png) 1. streaming 模式
```shell --executionMode streaming --sqlFilePath sqlFile\kafka_to_hive.sql ``` 2. batch 模式
``` --executionMode batch --sqlFilePath sqlFile\hive_to_hive.sql ``` 1. --sqlFilePath ,只需要从 sqlFile 开始编写即可。 ### 操作 hdfs、hive 如果在 idea 中运行的 flink sql 任务需要做 checkpoint ,并持久化到 hdfs,或者是需要读写 hive ,则需要执行以下步骤。 1. 从 github 上下载该项目:https://github.com/cdarlint/winutils.git。
![img.png](img/winutils项目.png) 2. 配置 windows 环境变量。 1. 新建环境变量,名称为 HADOOP_HOME ,其路径为上面项目中的某个 hadoop 的绝对路径,版本要和虚拟机中 hadoop 版本一致。 2. 在 path 环境变量增加 %HADOOP_HOME%\bin 。 3. 最好重启一下电脑,然后打开 cmd 窗口进行验证。
![img.png](img/hadoop环境配置验证.png)
只要是出现了命令的相关信息,就说明配置成功了。 3. 从线上环境下载 core-site.xml、yarn-site.xml、hdfs-site.xml、hive-site.xml、mapred-site.xml这五个配置文件,然后放在本项目的主代码下的 resources 目录下(该位置不强制)。 4. 以上步骤做完之后,就可以去运行主类来进行 flink sql 测试了。