Seatunnel实践总结及相关错误报告

大数据 2023-09-13 11:09:34
122阅读

写在前面:我也是新手。本文仅供个人使用和总结。可能存在一些片面的理解或错误。如果各位高手看到的话可以留言指正。我必须努力学习。同时,我也只是想给自己做个笔记,方便以后整理,所以才贴出来。如果能帮助刚开始使用Seatunnel 的朋友避免一些陷阱那就最好了。

1、语法模块(官方文档摘抄)

1.1 source 和 sink源

只列出我目前需要的。基本上所有的数据源都是兼容的。列出只是为了阐明需要下载的连接器jar 包。

来源:

配置单元、jdbc、maxcomputer、localfile、mysqlcdc、ossfile、rocketMQ、sftpfile、套接字

sink:console, datahub, mysql, sftpfile, hive, jdbc, localfile, maxcomputer, ossfile, socket, RocketMQ

1.2 source 和 sink 相关写法

# 固定环境变量,我使用本地模式

环境{

执行并行度=2

job.mode='流式传输'

检查点间隔=2000

} 插座

来源{

插座{

主机='1**.***.**.**1'

端口=9999

}

}

#最大计算

来源{

最大计算{

accessId='************************'

访问密钥='****************************'

端点='http://service.cn.maxcompute.aliyun.com/api'

项目='professional_test_dev'

表名='test_person__info'

partition_spec='pt=20230628'

split_row=10000

字段=[姓名、年龄、地址]

}

}

#oss文件

来源{

奥斯文件{

路径='/seatunnel/sink/age=20/'

Bucket='oss://you_bucket_name'

access_key='************************'

access_secret='****************************'

端点='oss-cn-shanghai.aliyuncs.com'

文件格式类型='文本'

#字段映射

read_columns=['姓名','年龄','地址','开始日期']

#是否根据路径解析分区,请注意,如果启用,sink端需要有一个额外的字段映射到分区字段。

parse_partition_from_path=true

# 跳过前几行(去掉行头功能)

跳过标题行数=1

#什么格式的字符串需要解析成日期

date_format='yyyy-MM-dd'

#字段分隔符

分隔符='\t'

# 读取所有列

模式{

字段{

名称=字符串

年龄=整数

地址=字符串

开始日期=日期

}

}

}

} 插座

下沉{

插座{

主机='1**.***.**.**1'

端口=8888

}

}

#OssFile

下沉{

奥斯文件{

路径='/座位隧道/水槽'

Bucket='oss://you_bucket_name'

access_key='************************'

access_secret='****************************'

端点='oss-cn-shanghai.aliyuncs.com'

文件格式类型='文本'

字段分隔符='\t'

行分隔符='\n'

sink_columns=['姓名','年龄']

}

}

#数据中心

下沉{

数据中心{

source_table_name='test_split_table'

端点='https://dh-cn-shanghai.aliyuncs.com'

accessId='='************************''

accessKey='********************************'

项目='you_project_name'

主题='test_seatunnel_socket'

超时=3000

重试次数=3

}

}

# mysql

下沉{

jdbc {

# url中的参数rewriteBatchedStatements=true是批量执行,否则mysql会一笔一笔的执行,导致性能低下。

url='jdbc:mysql://host/test?serverTimezone=GMT%2b8useSSL=falserewriteBatchedStatements=true'

驱动程序='com.mysql.cj.jdbc.Driver'

用户='root'

密码='123456'

Primary_keys=['姓名', '年龄']

# upsert功能,性能较低,仅在数据库没有upsert功能时启用,mysql启用报错Duplicateentry '*' for key 'PRIMARY'

# support_upsert_by_query_primary_key_exist=true

# 不需要写查询语句。根据generate_sink_sql=true参数,将根据以下数据库名和表名自动生成SQL语句。运行时会报错。未知

数据库='测试'

表='clmp_user'

# 如何在mysql中编写upsert

查询='''

插入clmp_user(name,age,address,date,pt) 值

(??)

关于重复密钥更新

名称=值(名称),

年龄=值(年龄),

地址=值(地址),

日期=值(日期),

pt=值(pt);

'''

连接超时

连接检查超时秒=100

# 重试失败提交的次数。如果你想只设置一次,则需要将其设置为0,否则可能会导致重复。

最大重试次数=0

# 对于批量写入,当缓冲记录数达到batch_size或者时间达到batch_interval_ms时,数据就会被flush到数据库

批量大小=10000

批次间隔毫秒=60000

# 精确一次,如果启用,必须定义xa_data_source_class_name,并且mysql版本必须大于等于8.0.29

# is_exactly_once=true

#xa_data_source_class_name='com.mysql.cj.jdbc.MysqlXADataSource'

#交易提交失败重试次数

最大提交尝试次数=10

# 交易开启后超时,默认值为-1(永不超时)。注意设置超时可能会影响once语义

transaction_timeout_sec=-1

# 启用自动交易提交,默认为true

自动提交=true

}

}

1.3 SQL Functions

https://seatunnel.apache.org/docs/2.3.1/transform-v2/sql-functions

1.4 命令行参数

用法: Seatunnel.sh [选项]

命令

影响

--异步

异步运行作业。当作业提交后,客户端将退出(默认值:false)

-可以,--取消作业

通过JobId 取消作业

- 查看

是否检查配置(默认:false)

-cj,--结束工作

关闭客户端任务也会被关闭(默认:true)

-cn,--集群

集群名称

-c,--config

配置文件

--解密

解密配置文件。当同时指定--decrypt和--encrypt时,仅--encrypt生效(默认值:false)

-m、--master、-e、--部署模式

SeaTunnel作业提交master,支持[本地,集群](默认:集群)

- 加密

解密配置文件。当同时指定--decrypt和--encrypt时,仅--encrypt生效(默认值:false)

-h,--帮助

显示使用信息

-j, --工作ID

通过JobId获取作业状态

-l,--列表

列出作业状态(默认:false)

--指标

通过JobId获取作业指标

-n,--名称

SeaTunnel 作业名称(默认: SeaTunnel)

-r, --恢复

通过jobId恢复保存点

-s, --保存点

通过jobId 保存点作业

-i, --变量

变量替换,例如-i City=beijing,或-i date=20190318(默认值:[])

2、实践问题模块

2.1 快速开始案例存在问题

如果想要快速案例v2.batch.config.template 成功运行,需要将下面链接的两个jar 包添加到Seatunnel 安装路径/lib 目录下(版本取决于您的版本) hadoop,我的没有使用hadoop,所以直接和seatunnel版本一致)下载地址

Seatunnel-hadoop3-3.1.4-uber-2.3.2.jar

Seatunnel-hadoop3-3.1.4-uber-2.3.2-可选.jar

2.2 socket连接失败

写入文件时,所有字符串指示符都需要使用双引号。单引号只能用于引用变量。如果文件中某个指标的内容太长,需要分多行写入,则需要用三引号括起来。内容和变量不能用三引号括起来。如果必须引用,可以使用多个三引号拼接语句,中间穿插变量引用。启动任务时,使用-i为变量赋值(-i date=20230627);注意:引用变量在本地模式下不可用,仅flink 或Spark 引擎可用。

#示例:多行写入并引用变量

变量='''

你的字符串1

'''${you_var}'''你的字符串2'''

# 示例:在语句中应用变量

转换{

sql {

query='select * from user_view where city=''${city}'' and dt=''${date}'''

}

}

# 示例:变量启动赋值

/app/clmp/seatunnel-2.3.2/bin/seatunnel.sh --config /root/test_seatunnel/--driver-memory 4g -e local -i date=20230627

2.3 写入oss报错

# 主要有两个错误信息。

1. java.lang.RuntimeException: java.lang.ClassNotFoundException: 类org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem 未找到

2. org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.engine.server.checkpoint.CheckpointException: 聚合提交错误。

# 一开始感觉oss连接jar包不存在,后来排查发现是存在的。然后我在github上搜索了和我类似的问题。最终的解决办法是添加hadoop-aliyun-2.7.2.jar放到seatunnel-2.3.2/lib下,果然解决了。 github上的情况是类加载器有问题。

# 注意:如果是私有云,除了放置jar包外,还需要关闭cname,否则会出现新的错误SignatureDoesNotMatch。如何关闭cname可以咨询阿里云驻地或者提交工单解决。这里的解决方案是Ping不添加http://的oss地址。返回的结果中有一个IP。使用这个IP代替原来的oss地址

2.4 使用jdbc连接mysql报错

Caused by: org.apache.seatunnel.api.common.PrepareFailException: ErrorCode:[API-01], ErrorDescription: [配置项验证失败] - PluginName: jdbc, PluginType: source, Message: com.mysql .cj.jdbc.exceptions.CommunicationsException: 通信链路故障

最后一次成功发送到服务器的数据包是在0 毫秒前。驱动程序尚未收到来自服务器的任何数据包。

这件事情是由很多原因导致的。我目前遇到两种情况:

# 1、使用jdbc连接数据库时,必须自己提供数据库驱动,并复制到$SEATNUNNEL_HOME/plugins/jdbc/lib/目录下才能使用。例如,如果您使用MySQL,则应下载并复制MySQL-connector-java-xxx.jar到$SEATNUNNEL_HOME/plugins/jdbc/lib/,然后正常放置。

# 2. SSL 连接原因; MySQL在高版本中需要指明是否进行SSL连接。默认情况下它是启用的。所以只需要在配置文件中的url后面添加useSSL=false即可,如:url='jdbc:mysql://local_host/test?serverTimezone=GMT%2b8useSSL=false'Caused by: org.apache.seatunnel.api.common。 PreparationFailException: ErrorCode:[API-01]、ErrorDescription:[配置项验证失败] - PluginName: jdbc、PluginType: 源、Message: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver

# 首先请注意,官网提供的驱动类名是com.mysql.cj.jdbc.Driver,但这应该是8以上的驱动包。我第一次使用mysql-connector-java-5.1.47.jar,它的驱动类名不是这个,而是com.mysql.jdbc.Driver。如果直接使用官方的会报上面的错误。

2.5 关于尝试连接impala的总结

Seatunnel本身没有impala作为source或sink,但是impala保留了hive和jdbc的连接方式,所以我准备尝试使用这两种方式进行连接。

2.5.1 使用jdbc连接

org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException:

ErrorCode: [JDBC-06],ErrorDescription: [找不到合适的方言工厂] - 找不到任何可以处理实现“org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory”的url '' 的jdbc 方言工厂在类路径中

可用工厂有:个

org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.db2.DB2DialectFactory

org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbDialectFactory

org.apache.seatunnel.connectors.seatunnel.jabc.internal.dialect.gbasea.GbaseBaDialectFactory

org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.greenplum.GreenplunDialectFactory

org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.My5qDialectFactory

org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleDialectFactory

org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.phoenix.PhoenixDialectFactory

org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresDialectFactory

org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.redshift.RedshiftDialectFactory

org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.saphana.SapHanaDialectFactory

org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.snowflake.SnowflakeDialectFactory

org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlite.SqliteDialectFactory

org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlServerDialectFactory

org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.tablestorel.TablestoreDialectFactory

org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.teradata.TeradataDialectFactory

org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.vertica.VerticaDialectFactory

# 根据错误报告和源代码,当前jdbc连接不包含impala数据源类型,因此在调用jdbc等连接器时,无法解析URL,无法创建类加载器。错误内容后面是当前可以使用jdbc 2.5.2 Connect using hive连接的所有数据源列表

2023-07-12 16:14:47,905 警告org.apache.hadoop.util.NativeCodeLoader - 无法为您的平台加载本机hadoop 库.在适用的情况下使用内置java 类

2023-07-1216:14:47,983 警告hive 元存储- setlugi() 不成功,可能导致: 新客户端与旧服务器通信。没有它继续。 org.apache.thrift。 TApplicationException: 方法名称无效:'set ugit'

在org.apache.thrift.TApplicationException.read (TApplicationException.java: 111) ~ [hive-exec-2.3.9.jar: 2.3.9]

在org.apache.thrift.TServiceClient.receiveBase (TServiceClient.java: 79) ~ [hive-exec-2.3.9.jar: 2.3.9]

2023-07-1216:14:51,089 错误org.apache.seatunnel.core.starter.SeaT

unne1 - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException:SeaTunneljobexecutedfailed at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute (ClientExecuteCommand. java: 188) at org. apache.seatunnel.core.starter.SeaTunnel.run (SeaTunnel.java:40) at org.apache. seatunnel.core.starter.seatunnel.SeaTunnelClient.main (SeaTunnelClient.java: 34) Caused by: org.apache.thrift.TApplicationException: Invalid method name: 'get_table' at org.apache.thrift.TApplicationException.read (TApplicationException.java: 111) at org.apache.thrift.TServiceClient.receiveBase (TServiceClient.java: 79) # 第一次使用报错信息是这样,提示新客户端访问老版服务,且后面还有一个报错为get_table这个方法不存在,更能确定时版本的问题。这里主要是因为用seatunnelengine,需要把seatunnel-hadoop3-3.1.4-uber.jar和hive-exec-2.3.9.jar放在$SEATUNNEL_HOME/lib/目录下。这个hadoop和hive的版本需要和impala中的相应版本对应。2.5.3 解决方式 按照源码该路径下(seatunnel-dev\seatunnel-dev\seatunnel-connectors-v2\connector-jdbc\src\main\java\org\apache\seatunnel\ connectors\seatunnel\jdbc\internal\dialect\mysql)的代码重写4个方法,编写一个hive的jdbc连接器。路径定义为hive-jdbc-connector \src\main\java\org\apache\seatunnel\connectors\seatunnel\jdbc\hive\internal\dialect\hive。之后单独打包,放入seatunnel_home /pligins/jdbc/lib目录下。之后再运行代码,可能会出现jar包冲突的问题,根据报错解决依赖的冲突后重新打包即可 # 冲突报错举例 TTransport 即为冲突的内容: Exception in thread "main" java.1ang.LinkageError:loader constraint violation: loader (instance of sun/misc/Launcher$AppClassLoader) previously initiated loading for a different type with name "org/apache/thrift/transport/TTransport"
2.6 oss - mysql同步数据报错
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: java.lang.IllegalArgumentException at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:207) at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:59) ... Caused by: java.lang.IllegalArgumentException at com.google.common.base.Preconditions.checkArgument(Preconditions.java:127) at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement. prepareStatement(FieldNamedPreparedStatement.java:639) # 查看源码 639行代码如下,本人对于JAVA不是很熟,所以前面报错得源码部分没看懂,没发现问题在哪 checkArgument(parameterMap.size() == fieldNames.length); # 这个说明和字段有关系,再看我写的配置文件,我使用了generate_sink_sql=true参数,可能是这个参数得原因,所以我去除后使用query参数代替,解决问题Caused by: org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException: ErrorCode:[COMMON-10], ErrorDescription:[Flush data operation that in sink connector failed] - java.sql.SQLException: Parameter index out of range (5 > number of parameters, which is 4). at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129) at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97) at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:89) at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:63) at com.mysql.cj.jdbc.ClientPreparedStatement.checkBounds(ClientPreparedStatement.java:1373) at com.mysql.cj.jdbc.ClientPreparedStatement.getCoreParameterIndex(ClientPreparedStatement.java:1386) at com.mysql.cj.jdbc.ClientPreparedStatement.setString(ClientPreparedStatement.java:1753) # 这个报错得原因很明确,就是注入SQL写得有问题。很多时候是因为使用引号,导致系统把字段识别成了字符串。所以写注入SQL时,字符类型得字段也不需要套引号。我这个报错得原因是少穿了参数,因为我OSS做为source,同时启用了parse_partition_from_path=true这个参数,这个时候是需要多一个字段来映射分区得,但是我没传,所以报错少一个字段
2.7 maven依赖缺失解决
# 正常情况maven正常下载就可以,但是有些依赖因为各种原因下载不下来,这个时候需要我们自己下载jar包并安装。jar包下载后,使用cmd进入命令行,切换至jar包所在目录下,执行命令 mvn install:install-file -Dfile=jindo-sdk-4.6.1.jar -DgroupId=com.aliyun.jindodata -DartifactId=jindosdk -Dversion=4.6.1 -Dpackaging=jar # 配置说明: Dfile jar包名称 DgroupId groupId DartifactId artifactId Dversion version 依赖下载下来直接放入本地maven仓库是不行的,java还是无法识别这个maven,必须执行命令安装
2.8 比较小的问题
2.4.1 我目前测试都是在虚拟机上编辑文件通过命令行启动本地模式执行,但是好几次因为大小写的问题出现报错,报错内容大致为某个必需指标未赋值,尤其是需要AK、SK的源,大小写很奇怪;这个问题我不是很确定,只是遇到过,但是没有时间继续在这上面深究,所以不一定正确,可以作为一个排查问题的思路 2.4.2 字段顺序需要对应,及source写的字段顺序如果为 fields=[name,age,address] 的话,sink的字段顺序也要按这个顺序写,不会自动按字段名称来对应

3、官方相关报错参考

链接:https://seatunnel.apache.org/docs/2.3.2/connector-v2/Error-Quick-Reference-Manual/ 格式类似下面这样,可以帮助精确报错原因,更快定位问题 代码 描述 解决方案 API-01 配置项验证失败 当用户遇到此错误代码时,通常是由于用户配置的连接器参数有问题,请检查连接器文档并更正参数 API-02 选项项验证失败 - API-03 目录初始化失败 当用户遇到此错误代码时,通常是因为连接器初始化目录失败,请检查连接器连接器选项是否正确 API-04 数据库不存在 当用户遇到此错误代码时,通常是因为您要访问的数据库不存在,请仔细检查数据库是否存在 API-05 表不存在 当用户遇到此错误代码时,通常是因为您要访问的表不存在,请仔细检查该表是否存在 API-06 工厂初始化失败 当用户遇到此错误代码时,通常是因为jar包依赖有问题,请检查您本地的SeaTunnel安装包是否完整 API-07 数据库已经存在 当用户遇到此错误代码时,表示您要创建的数据库已经存在,请删除数据库并重试 API-08 表已存在 当用户遇到此错误代码时,说明您要创建的表已经存在,请删除表后重试

4、目前本人正在解决的问题

1、maxcomputer to datahub 数据分字段插入,不安条数插入 2、使用本地模式 -i 传递参数一直失败,目前分区动态传入均报错 有知道的大佬也可以告知下,关于代码就是我1.1部分的代码,十分感谢!
the end
免责声明:本文不代表本站的观点和立场,如有侵权请联系本站删除!本站仅提供信息存储空间服务。