ChunJun (纯钧, formerly FlinkX): Framework Study Notes

Contents

I. Background
II. Concepts
III. Features
IV. How It Works
V. Quick Start
  1. Data sync job templates
    - kafka to kudu
    - mysql to hive
  2. Data sync execution commands
    - Legacy FlinkX command-line parameters
    - Legacy FlinkX execution commands
    - New ChunJun execution commands (noticeably fewer parameters, simpler to use)
VI. Integrating ChunJun with DolphinScheduler


I. Background

Today my manager suddenly asked whether DolphinScheduler could ingest tracking (event) data and write it into Kudu in real time. DataX is an offline ETL tool, so it clearly can't do that; that leaves Flink SQL or FlinkX. I had never heard of FlinkX before, so it was a brand-new topic for me and I dove straight in!!!

II. Concepts

ChunJun (纯钧, formerly FlinkX) is a stable, easy-to-use, efficient, batch-and-stream-unified data integration framework.

It started as a Flink-based distributed offline data synchronization framework widely used inside DTStack (袋鼠云), moving data efficiently between many heterogeneous data sources. Each data source is abstracted into a Reader plugin, and each data target into a Writer plugin.

Today it runs on the real-time compute engine Flink to provide both synchronization and computation across heterogeneous data sources, and it has been deployed and running stably at more than a thousand companies.

Official website: https://dtstack.github.io/chunjun/

Repositories:

chunjun: a Flink-based distributed data synchronization framework (GitHub: https://github.com/DTStack/chunjun)

肖友/flinkx - Gitee.com (Gitee mirror)

III. Features

ChunJun abstracts different databases into reader/source plugins, writer/sink plugins, and lookup (dimension table) plugins. It has the following characteristics:

- Built on the real-time compute engine Flink; jobs can be configured with JSON templates, and Flink SQL syntax is also supported.
- Runs distributed, with multiple submission modes: flink-standalone, yarn-session, yarn-per-job, and more.
- One-click Docker deployment; can also be deployed and run on Kubernetes.
- Supports many heterogeneous data sources: synchronization and computation across more than 20 sources, including MySQL, Oracle, SQLServer, Hive and Kudu.
- Easy to extend and highly flexible: a newly added connector plugin interoperates with existing plugins right away, and plugin developers don't need to care about other plugins' code.
- Supports not only full synchronization but also incremental synchronization and interval polling.
- Batch-stream unified: covers offline synchronization and computation as well as real-time scenarios.
- Supports dirty-data storage and provides metrics monitoring.
- Works with checkpoints to support resuming a transfer from where it stopped.
- Synchronizes not only DML data but also schema changes.
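To make the "configure jobs with JSON templates" point concrete, the outline below shows the general shape of a sync job file. The reader/writer parameter bodies are intentionally left empty here; complete, concrete examples follow in the Quick Start section.

```json
{
  "job": {
    "content": [
      {
        "reader": { "name": "streamreader", "parameter": {} },
        "writer": { "name": "kuduwriter", "parameter": {} }
      }
    ],
    "setting": {
      "speed": { "channel": 1 }
    }
  }
}
```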

IV. How It Works

Under the hood, FlinkX relies on Flink: every data synchronization job is translated into a StreamGraph and executed on the Flink cluster.

V. Quick Start

1. Data sync job templates

FlinkX is configured much like DataX: you describe the input and output in a JSON job file.

kafka to kudu

(The writer side of this sample targets Kudu; the reader here is streamreader, a mock source used for testing, rather than an actual Kafka reader. A sketch of a Kafka reader block follows the config.)

```json
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              { "name": "int_field", "type": "int" },
              { "name": "byte_field", "type": "byte" },
              { "name": "short_field", "type": "smallint" },
              { "name": "long_field", "type": "bigint" },
              { "name": "binary_field", "type": "binary" },
              { "name": "string_field", "type": "string" },
              { "name": "bool_field", "type": "boolean" },
              { "name": "float_field", "type": "float" },
              { "name": "double_field", "type": "double" }
            ],
            "sliceRecordCount": [ 100 ]
          }
        },
        "writer": {
          "name": "kuduwriter",
          "parameter": {
            "kerberos": {
              "keytab": "/Users/wtz/dtstack/conf/kerberos/eng-cdh/hive3.keytab",
              "principal": "hive/eng-cdh3@DTSTACK.COM",
              "krb5Conf": "/Users/wtz/dtstack/conf/kerberos/eng-cdh/krb5.conf"
            },
            "column": [
              { "name": "int_field", "type": "int32" },
              { "name": "byte_field", "type": "int8" },
              { "name": "short_field", "type": "int16" },
              { "name": "long_field", "type": "int64" },
              { "name": "binary_field", "type": "binary" },
              { "name": "string_field", "type": "string" },
              { "name": "bool_field", "type": "bool" },
              { "name": "float_field", "type": "float" },
              { "name": "double_field", "type": "double" }
            ],
            "masters": "eng-cdh1:7051",
            "table": "table_name",
            "flushMode": "manual_flush",
            "writeMode": "append",
            "batchSizeBytes": 1048576
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": 0
      },
      "errorLimit": {
        "record": 10000,
        "percentage": 100
      },
      "dirty": {
        "path": "/tmp",
        "hadoopConfig": {
          "fs.default.name": "hdfs://ns1",
          "dfs.nameservices": "ns1",
          "dfs.ha.namenodes.ns1": "nn1,nn2",
          "dfs.namenode.rpc-address.ns1.nn1": "node02:9000",
          "dfs.namenode.rpc-address.ns1.nn2": "node03:9000",
          "dfs.ha.automatic-failover.enabled": "true",
          "dfs.client.failover.proxy.provider.ns1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
          "fs.hdfs.impl.disable.cache": "true"
        }
      },
      "restore": {
        "isRestore": false,
        "isStream": false
      }
    }
  }
}
```
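To actually read from Kafka instead of the mock streamreader, the reader block would use ChunJun's Kafka connector. The snippet below is only a rough sketch: the plugin name (kafkareader) and every parameter name (topic, groupId, codec, consumerSettings) are assumptions and should be verified against the Kafka connector documentation for your ChunJun/FlinkX version. The kuduwriter block stays the same as above.

```json
{
  "reader": {
    "name": "kafkareader",
    "parameter": {
      "topic": "tracking_events",
      "groupId": "chunjun_kudu_sync",
      "codec": "json",
      "consumerSettings": {
        "bootstrap.servers": "kafka1:9092,kafka2:9092,kafka3:9092"
      }
    }
  }
}
```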

mysql to hive

```json
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "binlogreader",
          "parameter": {
            "username": "username",
            "password": "password",
            "cat": "insert,delete,update",
            "jdbcUrl": "jdbc:mysql://ip:3308/tudou?useSSL=false",
            "host": "ip",
            "port": 3308,
            "start": {},
            "table": [ "tudou.kudu" ],
            "splitUpdate": false,
            "pavingData": true
          }
        },
        "writer": {
          "name": "hivewriter",
          "parameter": {
            "jdbcUrl": "jdbc:hive2://ip:10000/tudou",
            "username": "",
            "password": "",
            "fileType": "text",
            "fieldDelimiter": ",",
            "writeMode": "overwrite",
            "compress": "",
            "charsetName": "UTF-8",
            "maxFileSize": 1073741824,
            "analyticalRules": "test_${schema}_${table}",
            "schema": "tudou",
            "tablesColumn": "{\"kudu\":[{\"comment\":\"\",\"type\":\"varchar\",\"key\":\"type\"},{\"comment\":\"\",\"type\":\"varchar\",\"key\":\"schema\"},{\"comment\":\"\",\"type\":\"varchar\",\"key\":\"table\"},{\"comment\":\"\",\"type\":\"bigint\",\"key\":\"ts\"},{\"part\":false,\"comment\":\"\",\"type\":\"INT\",\"key\":\"before_id\"},{\"comment\":\"\",\"type\":\"INT\",\"key\":\"after_id\",\"part\":false},{\"part\":false,\"comment\":\"\",\"type\":\"VARCHAR\",\"key\":\"before_name\"},{\"comment\":\"\",\"type\":\"VARCHAR\",\"key\":\"after_name\",\"part\":false},{\"part\":false,\"comment\":\"\",\"type\":\"INT\",\"key\":\"before_age\"},{\"comment\":\"\",\"type\":\"INT\",\"key\":\"after_age\",\"part\":false}]}",
            "partition": "pt",
            "partitionType": "MINUTE",
            "defaultFS": "hdfs://ns",
            "hadoopConfig": {
              "dfs.ha.namenodes.ns": "nn1,nn2",
              "fs.defaultFS": "hdfs://ns",
              "dfs.namenode.rpc-address.ns.nn2": "ip:9000",
              "dfs.client.failover.proxy.provider.ns": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
              "dfs.namenode.rpc-address.ns.nn1": "ip:9000",
              "dfs.nameservices": "ns",
              "fs.hdfs.impl.disable.cache": "true",
              "hadoop.user.name": "root",
              "fs.hdfs.impl": "org.apache.hadoop.hdfs.DistributedFileSystem"
            }
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}
```
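Since pavingData is true and splitUpdate is false, each binlog event is flattened into a single row whose columns follow the tablesColumn definition above: type, schema, table, ts, plus a before_*/after_* pair per source column. Purely as an illustration (the field values are made up), an UPDATE on tudou.kudu would arrive at the Hive writer roughly as:

```json
{
  "type": "UPDATE",
  "schema": "tudou",
  "table": "kudu",
  "ts": 1660000000000,
  "before_id": 1,
  "after_id": 1,
  "before_name": "old_name",
  "after_name": "new_name",
  "before_age": 20,
  "after_age": 21
}
```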

2. Data sync execution commands

Legacy FlinkX command-line parameters:

mode
- Description: execution mode, i.e. how the Flink cluster runs the job
  - local: local mode
  - standalone: a standalone Flink cluster
  - yarn: a Flink cluster on YARN; a Flink session must be started on YARN in advance, using the default name "Flink session cluster"
- Required: no
- Default: local

job
- Description: path of the data sync job description file; the file holds the job definition as a JSON string.
- Required: yes
- Default: none

pluginRoot
- Description: plugin root directory, i.e. the pluginRoot directory produced by the build.
- Required: yes
- Default: none

flinkconf
- Description: directory containing the Flink configuration files (not needed in local mode), e.g. /hadoop/flink-1.4.0/conf
- Required: no
- Default: none

yarnconf
- Description: directory containing the Hadoop configuration files (both HDFS and YARN; not needed in local mode), e.g. /hadoop/etc/hadoop
- Required: no
- Default: none

Legacy FlinkX execution commands:

Launch a data sync job in local mode:

```shell
bin/flinkx -mode local \
  -job /Users/softfly/company/flink-data-transfer/jobs/task_to_run.json \
  -pluginRoot /Users/softfly/company/flink-data-transfer/plugins \
  -confProp "{\"flink.checkpoint.interval\":60000,\"flink.checkpoint.stateBackend\":\"/flink_checkpoint/\"}" \
  -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*
```

Launch a data sync job in standalone mode:

```shell
bin/flinkx -mode standalone \
  -job /Users/softfly/company/flink-data-transfer/jobs/oracle_to_oracle.json \
  -pluginRoot /Users/softfly/company/flink-data-transfer/plugins \
  -flinkconf /hadoop/flink-1.4.0/conf \
  -confProp "{\"flink.checkpoint.interval\":60000,\"flink.checkpoint.stateBackend\":\"/flink_checkpoint/\"}" \
  -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*
```

Launch a data sync job in yarn mode:

```shell
bin/flinkx -mode yarn \
  -job /Users/softfly/company/flinkx/jobs/mysql_to_mysql.json \
  -pluginRoot /opt/dtstack/flinkplugin/syncplugin \
  -flinkconf /opt/dtstack/myconf/conf \
  -yarnconf /opt/dtstack/myconf/hadoop \
  -confProp "{\"flink.checkpoint.interval\":60000,\"flink.checkpoint.stateBackend\":\"/flink_checkpoint/\"}" \
  -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*
```

New ChunJun execution commands (noticeably fewer parameters, simpler and easier to use):

Launch a data sync job in local mode

Go to the chunjun-dist directory and run:

```shell
sh bin/chunjun-local.sh -job chunjun-examples/json/stream/stream.json
```

Launch a data sync job in standalone mode

1. Start the Flink standalone cluster

```shell
sh $FLINK_HOME/bin/start-cluster.sh
```

After a successful start the cluster listens on port 8081 by default; open port 8081 on the current machine to reach the standalone Flink web UI.

2. Submit the job

Go to the local chunjun-dist directory and run:

```shell
sh bin/chunjun-standalone.sh -job chunjun-examples/json/stream/stream.json
```

Launch a data sync job in YARN session mode

1. Start a YARN session

YARN session mode depends on both Flink and Hadoop. HADOOP_HOME and FLINK_HOME must be set on the submitting machine in advance, and chunjun-dist has to be shipped with the session via the yarn-session -t parameter:

```shell
cd $FLINK_HOME/bin
./yarn-session.sh -t $CHUNJUN_HOME -d
```

2. Submit the job

Look up the session's application id ($SESSION_APPLICATION_ID) in the YARN web UI, then go to the local chunjun-dist directory and run:

```shell
sh ./bin/chunjun-yarn-session.sh -job chunjun-examples/json/stream/stream.json -confProp {\"yarn.application.id\":\"SESSION_APPLICATION_ID\"}
```

VI. Integrating ChunJun with DolphinScheduler

The DolphinScheduler task toolbar did not originally support ChunJun, but a kind contributor merged the relevant code about a week before this post 🙏🙏🙏

May good things come their way!!! With the integration in place, you can prepare the JSON job file and simply drag a ChunJun task component onto the canvas to configure a real-time data sync task.

Source: https://github.com/apache/dolphinscheduler/tree/dev/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun

P.S. One of our developers has been a bit reckless lately and rm -rf'ed something, so our DolphinScheduler instance is down for the moment; I'll put together the installation walkthrough once it's back up 🌸
