tencent cloud

文档反馈

使用流计算 Oceanus 接入 COS

最后更新时间:2022-05-31 15:42:01

    Oceanus 简介

    流计算 Oceanus 是大数据生态体系的实时化分析利器。只需几分钟,您就可以轻松构建网站点击流分析、电商精准推荐、物联网 IoT 等应用。流计算基于 Apache Flink 构建,提供全托管的云上服务,您无须关注基础设施的运维,并能便捷对接云上数据源,获得完善的配套支持。

    流计算 Oceanus 提供了便捷的控制台环境,方便用户编写 SQL 分析语句或者上传运行自定义 JAR 包,支持作业运维管理。基于 Flink 技术,流计算可以在 PB 级数据集上支持亚秒级的处理延时。

    目前 Oceanus 使用的是独享集群模式,用户可以在自己的集群中运行各类作业,并进行相关资源管理。本文将为您详细介绍如何使用 Oceanus 对接对象存储(Cloud Object Storage,COS)。

    准备工作

    创建 Oceanus 集群

    登录 Oceanus 控制台,创建一个 Oceanus 集群。

    创建 COS 存储桶

    1. 登录 COS 控制台
    2. 在左侧导航栏中,单击存储桶列表
    3. 单击创建存储桶,创建一个存储桶。具体可参见 创建存储桶 文档。
      说明:

      当写入 COS 时,Oceanus 作业所运行的地域必须和 COS 在同一个地域。

    实践步骤

    前往 Oceanus 控制台,创建一个 SQL 作业,集群选择与 COS 在相同地域的集群。

    1. 创建 Source

    CREATE TABLE `random_source` ( 
    f_sequence INT, 
    f_random INT, 
    f_random_str VARCHAR 
    ) WITH ( 
    'connector' = 'datagen', 
    'rows-per-second'='10',                  -- 每秒产生的数据条数
    'fields.f_sequence.kind'='random',       -- 随机数
    'fields.f_sequence.min'='1',             -- 随机数的最小值
    'fields.f_sequence.max'='10',            -- 随机数的最大值
    'fields.f_random.kind'='random',         -- 随机数
    'fields.f_random.min'='1',               -- 随机数的最小值
    'fields.f_random.max'='100',             -- 随机数的最大值
    'fields.f_random_str.length'='10'        -- 随机字符串的长度
    );
    
    说明:

    此处选用内置 connector datagen,请根据实际业务需求选择相应数据源。

    2. 创建 Sink

    -- 请将<存储桶名称>和<文件夹名称>替换成您实际的存储桶名称和文件夹名称
    CREATE TABLE `cos_sink` (
    f_sequence INT, 
    f_random INT, 
    f_random_str VARCHAR
    ) PARTITIONED BY (f_sequence) WITH (
      'connector' = 'filesystem',
      'path'='cosn://<存储桶名称>/<文件夹名称>/',                 --- 数据写入的目录路径
      'format' = 'json',                                       --- 数据写入的格式
      'sink.rolling-policy.file-size' = '128MB',               --- 文件最大的大小
      'sink.rolling-policy.rollover-interval' = '30 min',      --- 文件最大写入时间
      'sink.partition-commit.delay' = '1 s',                   --- 分区提交延迟
      'sink.partition-commit.policy.kind' = 'success-file'     --- 分区提交方式
    );
    
    说明:

    更多 Sink 的 WITH 参数,请参见Filesystem (HDFS/COS)文档。

    3. 业务逻辑

    INSERT INTO `cos_sink`
    SELECT * FROM `random_source`;
    
    注意:

    此处只做展示,无实际业务目的。

    4. 作业参数设置

    内置 Connector选择flink-connector-cos,在高级参数中对 COS 的地址进行如下配置:

    fs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosN
    fs.cosn.impl: org.apache.hadoop.fs.CosFileSystem
    fs.cosn.credentials.provider: org.apache.flink.fs.cos.OceanusCOSCredentialsProvider
    fs.cosn.bucket.region: <COS 所在地域>
    fs.cosn.userinfo.appid: <COS 所属用户的 appid>
    

    作业配置说明如下:

    • 请将<cos 所在地域="">替换为您实际的 COS 地域,例如:ap-guangzhou。
    • 请将<cos 所属用户的="" appid="">替换为您实际的 APPID,具体请进入 账号中心 查看。
    说明:

    具体的作业参数设置请参见Filesystem (HDFS/COS) 文档。

    5. 启动作业

    依次单击保存 > 语法检查 > 发布草稿,等待 SQL 作业启动后,即可前往相应 COS 目录中查看写入数据。

    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持