# flink-jobs-launcher **Repository Path**: snow3212/flink-jobs-launcher ## Basic Information - **Project Name**: flink-jobs-launcher - **Description**: flink-jobs应用程序启动器类库,玩转flink - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 11 - **Created**: 2025-05-18 - **Last Updated**: 2025-05-18 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # flink-jobs-launcher

maven

**重要提示:** [flink-jobs](https://gitee.com/tenmg/flink-jobs)项目中已提供了flink-jobs-clients作为该项目的替代,强烈建议用户更换使用,本项目将在flink-jobs 1.3+以后不再更新。 ## 介绍 flink-jobs-launcher是[flink-jobs](https://gitee.com/tenmg/flink-jobs)应用程序启动器类库,可用于启动flink-jobs或普通flink作业,通过flink-jobs-launcher可将Flink快速集成到现有基于Java实现的系统中,还可以通过XML格式的配置文件玩转Flink SQL。一个典型的flink-jobs-launcher部署架构如下: ![典型的flink-jobs-launcher部署架构](%E5%85%B8%E5%9E%8B%E6%9E%B6%E6%9E%84.png) ## 起步 1.1.3及以前版本仅支持CommandLineFlinkJobsLauncher,1.1.4版本开始支持并推荐使用RestClusterClientFlinkJobsLauncher,以下以Maven项目为例介绍RestClusterClientFlinkJobsLauncher的使用: ### 添加依赖 ``` cn.tenmg flink-jobs-launcher ${flink-jobs-launcher.version} ``` ### 添加配置 ``` # RPC configuration jobmanager.rpc.address= 192.168.100.27 jobmanager.rpc.port=6123 # The default jar that the flink-jobs-launcher submits for execution, it is recommended but not required. flink.jobs.default.jar=/yourpath/your-flink-jobs-app-1.0.0.jar # The default class that the flink-jobs-launcher submits for execution, it is not required. You can also specify the main class in jar #flink.jobs.default.class=yourpackage.App ``` ### 提交作业 调用XMLConfigLoader的load方法加载XML配置文件并提交给启动器执行: ``` FlinkJobs flinkJobs = XMLConfigLoader.getInstance().load(ClassUtils.getDefaultClassLoader().getResourceAsStream("flink-jobs.xml")); RestClusterClientFlinkJobsLauncher launcher = new RestClusterClientFlinkJobsLauncher(); FlinkJobsInfo flinkJobsInfo = launcher.launch(flinkJobs); System.out.println("Flink job launched: " + JSON.toJSONString(flinkJobsInfo));// 启动flink-jobs作业 ``` 或 ``` FlinkJobs flinkJobs = XMLConfigLoader.getInstance() .load("\r\n" + "\r\n" + ""); RestClusterClientFlinkJobsLauncher launcher = new RestClusterClientFlinkJobsLauncher(); System.out.println("Flink job launched: " + JSON.toJSONString(flinkJobsInfo));// 启动flink-jobs作业 ``` ### 停止作业 ``` System.out.println("Flink job of jobId: " + flinkJobsInfo.getJobId() + " stopped, savepoint path: " + launcher.stop(flinkJobsInfo.getJobId()));// 停止flink-jobs作业 ``` ## 快速入门 详见[https://gitee.com/tenmg/flink-jobs-launcher-quickstart](https://gitee.com/tenmg/flink-jobs-launcher-quickstart) ## 任务配置 ### `` flink-jobs是flink-jobs任务XML配置文件的根节点,需注意必须配置正确的命名空间,通常结构如下: ``` ``` 相关属性及说明: 属性 | 类型 | 必需 | 说明 ------------|----------------------|----|-------- jar | `String` | 否 | 运行的JAR包。可通过配置文件的`flink.jobs.default.jar`配置指定默认运行的JAR包。 class | `String` | 否 | 运行的主类。可通过配置文件的`flink.jobs.default.class`配置指定默认运行的主类。 serviceName | `String` | 否 | 运行的服务名称。该名称由用户定义并实现根据服务名称获取服务的方法,[flink-jobs](https://gitee.com/tenmg/flink-jobs)则在运行时调用并确定运行的实际服务。在运行SQL任务时,通常通过flink-jobs内的其他标签(如``)指定操作,而无需指定serviceName。 runtimeMode | `String` | 否 | 运行模式。可选值:"BATCH"/"STREAMING"/"AUTOMATIC",相关含义详见[Flink](https://flink.apache.org)官方文档。 #### `` Flink作业的个性化配置,格式为`k1=v1[,k2=v3…]`。例如:``表示自定义Flink SQL作业的名称为`customJobName`。具体配置项详见[Flink官方文档](https://flink.apache.org/)。 #### `` 运行选项配置,用于指定Flink程序的运行选项。 ##### ``或``。 #### `` 参数查找表配置。通常可用于SQL中,也可以在[flink-jobs](https://gitee.com/tenmg/flink-jobs)应用程序自定义的服务中通过arguments参数获取。 ##### `` 特定参数配置。 属性 | 类型 | 必需 | 说明 ------|----------|----|-------- name | `String` | 是 | 参数名。 value | `String` | 否 | 参数值。 #### `` 运行基于Beanshell的java代码的配置。 属性 | 类型 | 必需 | 说明 -------|-----------|----|-------- saveAs | `String` | 否 | 操作结果另存为一个新的变量的名称。变量的值是基于Beanshell的java代码的返回值(通过`return xxx;`表示)。 ##### `` 基于Beanshell的java代码使用的变量声明配置。 属性 | 类型 | 必需 | 说明 ------|--------|----|-------- name | `String` | 是 | Beanshell中使用的变量名称 value | `String` | 否 | 变量对应的值的名称。默认与name相同。[flink-jobs](https://gitee.com/tenmg/flink-jobs)会从参数查找表中查找名称为value值的参数值,如果指定参数存在且不是null,则该值作为该参数的值;否则,使用value值作为该变量的值。 ##### `` java代码。采用文本表示,如:`java code`或``。注意:使用泛型时,不能使用尖括号声明泛型。例如,使用Map不能使用“Map map = new HashMap();”,但可以使用“Map map = new HashMap();”。 #### `` 运行基于[DSL](https://gitee.com/tenmg/dsl)的SQL代码配置。 属性 | 类型 | 必需 | 说明 -----------|--------|----|-------- saveAs | `String` | 否 | 操作结果另存为一个新的变量的名称。变量的值是flink的`tableEnv.executeSql(statement);`的返回值。 dataSource | `String` | 否 | 使用的数据源名称。这里的数据源是在[flink-jobs](https://gitee.com/tenmg/flink-jobs)应用程序的配置文件中配置,并非在flink-jobs-launcher应用程序的配置文件中配置。详见[flink-jobs数据源配置](https://gitee.com/tenmg/flink-jobs#%E6%95%B0%E6%8D%AE%E6%BA%90%E9%85%8D%E7%BD%AE)。 catalog | `String` | 否 | 执行SQL使用的Flink SQL的catalog名称。 script | `String` | 否 | 基于[DSL](https://gitee.com/tenmg/dsl)的SQL脚本。采用文本表示,如:`SQL code`或``。由于Flink SQL不支持DELETE、UPDATE语句,因此如果配置的SQL脚本是DELETE或者UPDATE语句,该语句将在程序main函数中采用JDBC执行。 #### `` 运行基于[DSL](https://gitee.com/tenmg/dsl)的SQL查询代码配置。 属性 | 类型 | 必需 | 说明 -----------|--------|----|-------- saveAs | `String` | 否 | 查询结果另存为临时表的表名及操作结果另存为一个新的变量的名称。变量的值是Flink的`tableEnv.executeSql(statement);`的返回值。 catalog | `String` | 否 | 执行SQL使用的Flink SQL的catalog名称。 script | `String` | 否 | 基于[DSL](https://gitee.com/tenmg/dsl)的SQL脚本。采用文本表示,如:`SQL code`或``。 #### `` 运行基于[DSL](https://gitee.com/tenmg/dsl)的JDBC SQL代码配置。目标JDBC SQL代码是在[flink-jobs](https://gitee.com/tenmg/flink-jobs)应用程序的main函数中运行的。 属性 | 类型 | 必需 | 说明 -----------|----------|----|-------- saveAs | `String` | 否 | 执行结果另存为一个新的变量的名称。变量的值是执行JDBC指定方法的返回值。 dataSource | `String` | 是 | 使用的数据源名称。这里的数据源是在flink-jobs应用程序的配置文件中配置,并非在flink-jobs-launcher应用程序的配置文件中配置。详见[flink-jobs数据源配置](#%E6%95%B0%E6%8D%AE%E6%BA%90%E9%85%8D%E7%BD%AE)。 method | `String` | 否 | 调用的JDBC方法。默认是"executeLargeUpdate"。 script | `String` | 是 | 基于[DSL](https://gitee.com/tenmg/dsl)的SQL脚本。 #### `` 运行基于Flink SQL的流式任务实现数据同步,支持版本:1.1.2+,相关属性及说明如下: 属性 | 类型 | 必需 | 说明 -----------|----------------|----|-------- saveAs | `String` | 否 | 执行结果另存为一个新的变量的名称。变量的值是执行`INSERT`语句返回的`org.apache.flink.table.api.TableResult`对象。一般不使用。 from | `String` | 是 | 来源数据源名称。目前仅支持Kafka数据源。 topic | `String` | 否 | Kafka主题。也可在fromConfig中配置`topic=xxx`。 fromConfig | `String` | 否 | 来源配置。例如:`properties.group.id=flink-jobs`。 to | `String` | 是 | 目标数据源名称,目前仅支持JDBC数据源。 toConfig | `String` | 是 | 目标配置。例如:`sink.buffer-flush.max-rows = 0`。 table | `String` | 是 | 同步数据表名。 primaryKey | `String` | 否 | 主键,多个列名以“,”分隔。当开启智能模式时,会自动获取主键信息。 timestamp | `String` | 否 | 时间戳列名,多个列名使用“,”分隔。设置这个值后,创建源表和目标表时会添加这些列,并在数据同步时写入这些列。一般在flink-jobs应用程序中使用配置文件统一指定,而不是每个同步任务单独指定。 smart | `Boolean` | 否 | 是否开启智能模式。不设置时,根据全局配置确定是否开启智能模式,全局默认配置为`data.sync.smart=true`。 `` | `Element` | 否 | 同步数据列。当开启智能模式时,会自动获取列信息。 ##### `` 属性 | 类型 | 必需 | 说明 ---------|----------|----|-------- fromName | `String` | 是 | 来源列名。 fromType | `String` | 否 | 来源数据类型。如果缺省,则如果开启智能模式会自动获取目标数据类型作为来源数据类型,如果关闭智能模式则必填。 toName | `String` | 否 | 目标列名。默认为来源列名。 toType | `String` | 否 | 目标列数据类型。如果缺省,则如果开启智能模式会自动获取,如果关闭智能模式则默认为来源列数据类型。 strategy | `String` | 否 | 同步策略。可选值:both/from/to,both表示来源列和目标列均创建,from表示仅创建原来列,to表示仅创建目标列,默认为both。 script | `String` | 否 | 自定义脚本。通常是需要进行函数转换时使用。 ### XML配置示例 为了更好的理解flink-jobs的XML配置文件,以下提供几种常见场景的XML配置文件示例: #### 运行普通Flink程序 ``` ``` #### 运行自定义服务 以下为一个自定义服务任务XML配置文件: ``` ``` #### 运行批处理SQL 以下为一个简单订单量统计SQL批处理任务XML配置文件: ``` 2021-01-01 2021-07-01 = :beginDate and o.business_date < :endDate group by cast(to_date(o.business_date) as date) ]]> = :beginDate and stats_date < :endDate ]]> ``` #### 运行流处理SQL 以下为通过Debezium实现异构数据库同步任务XML配置文件: ``` ``` #### 运行数据同步任务 以下为通过Debezium实现异构数据库同步任务XML配置文件: ``` TO_TIMESTAMP(FROM_UNIXTIME(UPDATE_TIME/1000, 'yyyy-MM-dd HH:mm:ss')) ``` ## 配置文件 默认的配置文件为`flink-jobs-launcher.properties`(注意:需在`classpath`下),可通过`flink-jobs-launcher-context-loader.properties`配置文件的`config.location`修改配置文件路径和名称。 ### 通用配置 属性 | 类型 | 必需 | 说明 -------------------------|----------|----|-------- flink.jobs.default.jar | `String` | 否 | 启动时默认向Flink提交的JAR包。即当任务配置中没有指定`jar`时,会采用此配置。 flink.jobs.default.class | `String` | 否 | 启动时默认向Flink提交运行的主类。即当任务配置中没有指定`class`时,会采用此配置。 ### RestClusterClientFlinkJobsLauncher 属性 | 类型 | 必需 | 说明 -------------------------|----------|----|-------- jobmanager.rpc.servers | `String` | 否 | Flink集群远程调用地址,格式为`host1:port1,host2:port2,…`,其中端口号(`port`)可以缺省,缺省时所有端口号均为`jobmanager.rpc.port`的值。 jobmanager.rpc.address | `String` | 否 | Flink集群远程调用地址,只能配置一个主机地址,已不推荐使用。配置`jobmanager.rpc.servers`后,该配置失效。 jobmanager.rpc.port | `int` | 是 | Flink集群远程调用端口号,默认值为`6123`。 jobmanager.* | 略 | 略 | Flink集群的其他配置参见[Flink官网](https://flink.apache.org)对应版本的文档。 rest.* | 略 | 略 | Flink集群的其他配置参见[Flink官网](https://flink.apache.org)对应版本的文档。 ### CommandLineFlinkJobsLauncher 属性 | 类型 | 必需 | 说明 ----------------------------------------------------------------|----------|----|-------- commandline.flink.home | `String` | 否 | 如果访问flink命令行不需要添加目录,则无需配置。 commandline.launch.action | `String` | 否 | 默认值为`run`。 commandline.launch.temp_file_prefix | `String` | 否 | 运行产生的临时文件前缀,默认为`flink-jobs_`。 commandline.yarn.rest | `String` | 否 | YARN REST服务地址和端口,运行Flink On YARN时需要指定。 commandline.yarn.application_check_attempts | `String` | 否 | 默认值`60`,运行Flink On YARN时通过自动生成的唯一的Application Name检测Application ID的尝试次数。 commandline.yarn.time_millis_between_application_check_attempts | `String` | 否 | 默认值`3000`,运行Flink On YARN时通过自动生成的唯一的Application Name检测Application ID的两次尝试之间等待的毫秒数。 commandline.yarn.application_id_prefix | `String` | 否 | 默认值`application_`,运行Flink On YARN自动生成的唯一的Application Name的前缀。 ## 参与贡献 1. Fork 本仓库 2. 新建 Feat_xxx 分支 3. 提交代码 4. 新建 Pull Request ## 相关链接 flink-jobs开源地址:https://gitee.com/tenmg/flink-jobs DSL开源地址:https://gitee.com/tenmg/dsl Flink官网:https://flink.apache.org Debezuim官网:https://debezium.io