必看!S3File Sink Connector 使用文档

这篇具有很好参考价值的文章主要介绍了必看!S3File Sink Connector 使用文档。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

必看!S3File Sink Connector 使用文档

S3File 是一个用于管理 Amazon S3(Simple Storage Service)的 Python 模块。当前,Apache SeaTunnel 已经支持 S3File Sink Connector,为了更好地使用这个 Connector,有必要看一下这篇使用文档指南。

描述

将数据输出到 AWS S3 文件系统。

提示:

如果您使用的是 Spark/Flink,在使用此连接器之前,必须确保您的 Spark/Flink 集群已经集成了 Hadoop。Hadoop 2.x 版本已通过测试。

如果您使用的是 SeaTunnel Engine,它会在您下载和安装 SeaTunnel Engine 时自动集成 Hadoop JAR 包。您可以在 ${SEATUNNEL_HOME}/lib 目录下确认这个 JAR 包是否存在。

主要特性

默认情况下,我们使用 2PC 提交来确保 "仅一次语义"。

选项

名称 类型 必需 默认值 备注
path string -
bucket string -
fs.s3a.endpoint string -
fs.s3a.aws.credentials.provider string com.amazonaws.auth.InstanceProfileCredentialsProvider
access_key string - 仅在 fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider 时使用
access_secret string - 仅在 fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider 时使用
custom_filename boolean false 是否需要自定义文件名
file_name_expression string "${transactionId}" 仅在 custom_filename 为 true 时使用
filename_time_format string "yyyy.MM.dd" 仅在 custom_filename 为 true 时使用
file_format_type string "csv"
field_delimiter string '\001' 仅在 file_format 为 text 时使用
row_delimiter string "\n" 仅在 file_format 为 text 时使用
have_partition boolean false 是否需要处理分区
partition_by array - 仅在 have_partition 为 true 时使用
partition_dir_expression string "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" 仅在 have_partition 为 true 时使用
is_partition_field_write_in_file boolean false 仅在 have_partition 为 true 时使用
sink_columns array 当此参数为空时,将写入所有从 "Transform" 或 "Source" 获取的字段
is_enable_transaction boolean true
batch_size int 1000000
compress_codec string none
common-options object -
max_rows_in_memory int - 仅在 file_format 为 Excel 时使用
sheet_name string Sheet$ 仅在 file_format 为 Excel 时使用

path [string]

目标目录路径是必需的。

bucket [string]

S3 文件系统的bucket地址,例如:s3n://seatunnel-test,如果您使用的是 s3a 协议,此参数应为 s3a://seatunnel-test

fs.s3a.endpoint [string]

fs s3a 端点

fs.s3a.aws.credentials.provider [string]

认证 s3a 的方式。目前我们仅支持 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvidercom.amazonaws.auth.InstanceProfileCredentialsProvider

关于凭证提供程序的更多信息,您可以参考 Hadoop AWS 文档

access_key [string]

S3 文件系统的访问密钥。如果未设置此参数,请确认凭证提供程序链可以正确验证,可参考 hadoop-aws。

access_secret [string]

S3 文件系统的访问密钥。如果未设置此参数,请确认凭证提供程序链可以正确验证,可参考 hadoop-aws。

hadoop_s3_properties [map]

如果需要添加其他选项,可以在这里添加并参考此 链接

hadoop_s3_properties {
      "fs.s3a.buffer.dir" = "/data/st_test/s3a"
      "fs.s3a.fast.upload.buffer" = "disk"
   }

custom_filename [boolean]

是否自定义文件名。

file_name_expression [string]

仅在 custom_filenametrue 时使用

file_name_expression 描述了将创建到 path 中的文件表达式。我们可以在 file_name_expression 中添加变量 ${now} ${uuid},例如 test_${uuid}_${now}
${now} 代表当前时间,其格式可以通过指定选项 filename_time_format 来定义。

请注意,如果 is_enable_transactiontrue,我们将在文件名的开头自动添加${transactionId}_

filename_time_format [string]

仅在 custom_filenametrue 时使用

file_name_expression 参数中的格式为 xxxx-${now} 时,filename_time_format 可以指定路径的时间格式,默认值为 yyyy.MM.dd。常用的时间格式列于下表中:

符号 描述
y
M
d 月中的天数
H 一天中的小时 (0-23)
m 小时中的分钟
s 分钟中的秒数

file_format_type [string]

我们支持以下文件类型:

  • 文本 (text)
  • JSON
  • CSV
  • ORC
  • Parquet
  • Excel

请注意,最终文件名将以文件格式的后缀结尾,文本文件的后缀是 txt

field_delimiter [string]

数据行中列之间的分隔符。仅在 file_format 为 text 时需要。

row_delimiter [string]

文件中行之间的分隔符。仅在 file_format 为 text 时需要。

have_partition [boolean]

是否需要处理分区。

partition_by [array]

仅在 have_partitiontrue 时使用。

基于选定字段对分区数据进行分区。

partition_dir_expression [string]

仅在 have_partitiontrue 时使用。

如果指定了 partition_by,我们将根据分区信息生成相应的分区目录,并将最终文件放在分区目录中。

默认的 partition_dir_expression${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/k0 是第一个分区字段,v0 是第一个分区字段的值。

is_partition_field_write_in_file [boolean]

仅在 have_partitiontrue 时使用。

如果 is_partition_field_write_in_filetrue,分区字段及其值将写入数据文件中。

例如,如果您想要写入 Hive 数据文件,其值应为 false

sink_columns [array]

需要写入文件的哪些列,默认值为从 "Transform" 或 "Source" 获取的所有列。
字段的顺序决定了实际写入文件的顺序。

is_enable_transaction [boolean]

如果 is_enable_transaction 为 true,我们将确保在写入目标目录时数据不会丢失或重复。

请注意,如果 is_enable_transactiontrue,我们将在文件头部自动添加 ${transactionId}_

目前仅支持 true

batch_size [int]

文件中的最大行数。对于 SeaTunnel Engine,文件中的行数由 batch_sizecheckpoint.interval 共同决定。如果 checkpoint.interval 的值足够大,当文件中的行数大于 batch_size 时,写入器将写入文件。如果 checkpoint.interval 较小,则在新的检查点触发时,写入器将创建一个新文件。

compress_codec [string]

文件的压缩编解码器及其支持的详细信息如下:

  • txt: lzo none
  • JSON: lzo none
  • CSV: lzo none
  • ORC: lzo snappy lz4 zlib none
  • Parquet: lzo snappy lz4 gzip brotli zstd none

提示:Excel 类型不支持任何压缩格式。

常见选项

请参考 Sink Common Options 获取 Sink 插件的常见参数详细信息。

max_rows_in_memory [int]

当文件格式为 Excel 时,可以缓存在内存中的数据项的最大数量。

sheet_name [string]

工作簿的工作表名称。

示例

对于文本文件格式,具有 have_partitioncustom_filenamesink_columnscom.amazonaws.auth.InstanceProfileCredentialsProvider 的配置示例:

  S3File {
    bucket = "s3a://seatunnel-test"
    tmp_path = "/tmp/seatunnel"
    path="/seatunnel/text"
    fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
    fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
    file_format_type = "text"
    field_delimiter = "\t"
    row_delimiter = "\n"
    have_partition = true
    partition_by = ["age"]
    partition_dir_expression = "${k0}=${v0}"
    is_partition_field_write_in_file = true
    custom_filename = true
    file_name_expression = "${transactionId}_${now}"
    filename_time_format = "yyyy.MM.dd"
    sink_columns = ["name","age"]
    is_enable_transaction=true
    hadoop_s3_properties {
      "fs.s3a.buffer.dir" = "/data/st_test/s3a"
      "fs.s3a.fast.upload.buffer" = "disk"
    }
  }

对于 Parquet 文件格式,仅需用 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider进行配置:

  S3File {
    bucket = "s3a://seatunnel-test"
    tmp_path = "/tmp/seatunnel"
    path="/seatunnel/parquet"
    fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
    fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
    access_key = "xxxxxxxxxxxxxxxxx"
    secret_key = "xxxxxxxxxxxxxxxxx"
    file_format_type = "parquet"
    hadoop_s3_properties {
      "fs.s3a.buffer.dir" = "/data/st_test/s3a"
      "fs.s3a.fast.upload.buffer" = "disk"
    }
  }

对于 orc 文件仅需配置 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider

  S3File {
    bucket = "s3a://seatunnel-test"
    tmp_path = "/tmp/seatunnel"
    path="/seatunnel/orc"
    fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
    fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
    access_key = "xxxxxxxxxxxxxxxxx"
    secret_key = "xxxxxxxxxxxxxxxxx"
    file_format_type = "orc"
  }

更新日志

2.3.0-beta 2022-10-20

  • 添加 S3File Sink 连接器

2.3.0 2022-12-30

  • Bug修复
    • 修复了以下导致数据写入文件失败的错误:
      • 当上游字段为空时会抛出 NullPointerException
      • Sink 列映射失败
      • 从状态中恢复写入器时直接获取事务失败 (3258)
  • 功能
    • 支持 S3A 协议 (3632)
      • 允许用户添加额外的 Hadoop-S3 参数
      • 允许使用 S3A 协议
      • 解耦 Hadoop-AWS 依赖
    • 支持设置每个文件的批处理大小 (3625)
    • 设置 S3 AK 为可选项 (3688)

下一版本

  • [优化]支持文件压缩(3699)

本文由 白鲸开源 提供发布支持!文章来源地址https://www.toymoban.com/news/detail-710000.html

到了这里,关于必看!S3File Sink Connector 使用文档的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • AWS 的cloudfront是如何实现S3桶的file缓存的

    CloudFront介绍: CloudFront安全性: 下图说明了请求和响应如何流经 CloudFront 边缘站点和区域边缘缓存。 经过上面的介绍大家大概也了解了CloudFront,其实就是我们平时所说的DNS,那么如何使用CloudFront来缓存我们WEB页面的内容呢?接下来我们就开始带大家进行构建一下。 1. 创建

    2024年02月05日
    浏览(32)
  • 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(1) - File、Socket、console

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月01日
    浏览(38)
  • 使用Flink MySQL cdc分别sink到ES、Kafka、Hudi

    [flink-1.13.1-bin-scala_2.11.tgz](https://archive.apache.org/dist/flink/flink-1.13.1/flink-1.13.1-bin-scala_2.11.tgz) [hadoop-2.7.3.tar.gz](https://archive.apache.org/dist/hadoop/common/hadoop-2.7.3/hadoop-2.7.3.tar.gz) [flink-cdc-connectors](https://github.com/ververica/flink-cdc-connectors)(git clone源码编译) [hudi](https://github.com/apache/hudi)(git

    2024年02月03日
    浏览(38)
  • 0基础学习PyFlink——使用PyFlink的Sink将结果输出到外部系统

    在 《0基础学习PyFlink——使用PyFlink的SQL进行字数统计》一文中,我们直接执行了Select查询操作,在终端中直接看到了查询结果。 在生产环境,我们往往要将计算结果保存到外部系统中,比如Mysql等。这个时候我们就要使用Sink。 Sink用于将Reduce结果输出到外部系统。它也是通过

    2024年02月08日
    浏览(30)
  • Flink SQL Hive Connector使用场景

    目录 1.介绍 2.使用 2.1注册HiveCatalog 2.2Hive Read 2.2.1流读关键配置 2.2.2示例

    2024年02月06日
    浏览(32)
  • 【Flink-Kafka-To-RocketMQ】使用 Flink 自定义 Sink 消费 Kafka 数据写入 RocketMQ

    这里的 maven 依赖比较冗余,推荐大家都加上,后面陆续优化。 注意: 1、此程序中所有的相关配置都是通过 Mysql 读取的(生产环境中没有直接写死的,都是通过配置文件动态配置),大家实际测试过程中可以将相关配置信息写死。 2、此程序中 Kafka 涉及到了 Kerberos 认证操作

    2024年02月03日
    浏览(45)
  • 使用 SPL 高效实现 Flink SLS Connector 下推

    作者:潘伟龙(豁朗) 日志服务 SLS 是云原生观测与分析平台,为 Log、Metric、Trace 等数据提供大规模、低成本、实时的平台化服务,基于日志服务的便捷的数据接入能力,可以将系统日志、业务日志等接入 SLS 进行存储、分析;阿里云 Flink 是阿里云基于 Apache Flink 构建的大数

    2024年03月19日
    浏览(27)
  • 文档控件DevExpress Office File API v23.1新版亮点 - 支持.NET MAUI

    DevExpress Office File API是一个专为C#, VB.NET 和 ASP.NET等开发人员提供的非可视化.NET库。有了这个库,不用安装Microsoft Office,就可以完全自动处理Excel、Word等文档。开发人员使用一个非常易于操作的API就可以生成XLS, XLSx, DOC, DOCx, RTF, CSV 和 Snap Report等企业级文件。 DevExpress Office Fi

    2024年02月13日
    浏览(30)
  • Enterprise:使用 MySQL connector 同步 MySQL 数据到 Elasticsearch

    Elastic MySQL 连接器是 MySQL 数据源的连接器。它可以帮我们把 MySQL 里的数据同步到 Elasticsearch 中去。在今天的文章里,我来详细地描述如何一步一步地实现。 在下面的展示中,我将使用 Elastic Stack 8.8.2 来进行展示。 无缝集成:将 Elasticsearch 连接到 MongoDB Enterprise:使用 MySQL c

    2024年02月16日
    浏览(34)
  • s3cmd使用

    s3cmd 是一个用于管理 Amazon S3(Simple Storage Service)的命令行工具。它允许您通过命令行界面执行各种 S3 操作,例如上传文件、下载文件、复制文件、删除文件等。 以下是一些常见的 s3cmd 命令和用法示例: 安装 s3cmd 工具: 配置 s3cmd 工具: 这将引导您进行配置,包括提供 A

    2024年02月16日
    浏览(28)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包