FileSystem

Last updated: 2023-11-08 16:00:41

    Overview

    The FileSystem connector allows for writing to common file systems such as HDFS and Tencent Cloud Object Storage.

    Versions

    Flink Version
    Description
    1.11
    Supported
    1.13
    Supported (supports common compression algorithms such as LZO and Snappy)
    1.14
    Supports writing to HDFS (does not support the LZO and Snappy compression algorithms)
    1.16
    Supports writing to HDFS (does not support the LZO and Snappy compression algorithms)

    Limits

    The FileSystem connector can be used as a data sink for append-only data streams. It cannot be used as a sink for upsert data currently. The following data formats are supported:
    CSV
    JSON
    Avro
    Parquet
    ORC
    Note
    To write data in Avro, Parquet, or ORC format, you need to manually upload a JAR package.

    Defining a table in DDL

    As a sink

    CREATE TABLE `hdfs_sink_table` (
    `id` INT,
    `name` STRING,
    `part1` INT,
    `part2` INT
    ) PARTITIONED BY (part1, part2) WITH (
    'connector' = 'filesystem',
    'path' = 'hdfs://HDFS10000/data/', -- or cosn://${bucketName}/path/to/store/data
    'format' = 'json',
    'sink.rolling-policy.file-size' = '1M',
    'sink.rolling-policy.rollover-interval' = '10 min',
    'sink.partition-commit.delay' = '1 s',
    'sink.partition-commit.policy.kind' = 'success-file'
    );

    WITH parameters

    Option
    Required
    Default Value
    Description
    path
    Yes
    -
    The path to which files are written.
    sink.rolling-policy.file-size
    No
    128MB
    The maximum file size. If a file exceeds this size, it will be closed. A new file will be opened and data will be written to this new file.
    sink.rolling-policy.rollover-interval
    No
    30min
    The maximum duration a file can stay open. After this duration elapses, the file will be closed. A new file will be opened and data will be written to the new file.
    sink.rolling-policy.check-interval
    No
    1min
    The file check interval. The FileSystem connector will check whether a file should be closed at this interval.
    sink.partition-commit.trigger
    No
    process-time
    The partition commit trigger. Valid values:
    process-time: A partition is committed after the configured delay elapses, measured from the system time at which the partition is created. No watermark is required.
    partition-time: The partition creation time is extracted from the partition values. This mode requires watermark generation; a partition is committed once the watermark passes the time extracted from the partition values plus the delay (see the sketch after this table).
    sink.partition-commit.delay
    No
    0s
    The time to wait before committing a partition. A partition will not be committed until the specified time elapses after its creation time.
    partition.time-extractor.kind
    No
    default
    The partition time extractor. This option is valid only if sink.partition-commit.trigger is partition-time. If you have your own time extractor, set this to custom.
    partition.time-extractor.class
    No
    -
    The partition time extractor class. This class should implement the PartitionTimeExtractor API.
    partition.time-extractor.timestamp-pattern
    No
    -
    The partition timestamp extraction pattern. The timestamp is in the yyyy-mm-dd hh:mm:ss format, with the placeholders replaced by the corresponding partition fields. By default, yyyy-mm-dd hh:mm:ss is extracted from the first field.
    If timestamps are extracted from a single partition field dt, you can set this to $dt.
    If timestamps are extracted from multiple partition fields, such as year, month, day, and hour, you can set this to $year-$month-$day $hour:00:00.
    If timestamps are extracted from two partition fields dt and hour, you can set this to $dt $hour:00:00.
    sink.partition-commit.policy.kind
    Yes
    -
    The partition committing policy. Valid values:
    success-file: When a partition is committed, a _SUCCESS file is generated in the partition's directory.
    custom: A custom committing policy.
    sink.partition-commit.policy.class
    No
    -
    The partition committing class. This class should implement PartitionCommitPolicy.
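    For example, the following is a minimal sketch (not taken from this document's examples) of a sink that commits partitions based on partition time. The table name, the partition fields dt and hh, and the path are hypothetical, and the source feeding the sink must define a watermark for the partition-time trigger to fire.

    CREATE TABLE `hdfs_partition_time_sink` (
    `id` INT,
    `name` STRING,
    `dt` STRING,
    `hh` STRING
    ) PARTITIONED BY (dt, hh) WITH (
    'connector' = 'filesystem',
    'path' = 'hdfs://HDFS10000/data/',
    'format' = 'json',
    -- Commit a partition once the watermark passes the partition time plus this delay.
    'sink.partition-commit.trigger' = 'partition-time',
    'sink.partition-commit.delay' = '1 h',
    -- Build the partition timestamp from the dt and hh partition fields.
    'partition.time-extractor.timestamp-pattern' = '$dt $hh:00:00',
    'sink.partition-commit.policy.kind' = 'success-file'
    );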

    HDFS configuration

    After creating a data directory in HDFS, you need to grant write access to the directory. With Stream Compute Service, the user writing data to HDFS is the Flink user. Before the configuration, log in to your EMR cluster and download the hdfs-site.xml file of the Hadoop cluster; it contains the parameter values needed for the configuration.
    The HDFS path is in the format hdfs://${dfs.nameservices}/${path}. You can find the value of ${dfs.nameservices} in hdfs-site.xml. ${path} is the path to which data will be written.
    If the target Hadoop cluster has only one master node, you only need to pass the HDFS path to path. You don't need to set advanced parameters.
    If the target Hadoop cluster is a high-availability cluster with two master nodes, after passing the HDFS path, you also need to specify the addresses and ports of the two master nodes by configuring advanced job parameters. Below is an example, followed by a sketch of the matching sink DDL. You can find the parameter values in hdfs-site.xml.
    fs.hdfs.dfs.nameservices: HDFS12345
    fs.hdfs.dfs.ha.namenodes.HDFS12345: nn2,nn1
    fs.hdfs.dfs.namenode.http-address.HDFS12345.nn1: 172.27.2.57:4008
    fs.hdfs.dfs.namenode.https-address.HDFS12345.nn1: 172.27.2.57:4009
    fs.hdfs.dfs.namenode.rpc-address.HDFS12345.nn1: 172.27.2.57:4007
    fs.hdfs.dfs.namenode.http-address.HDFS12345.nn2: 172.27.1.218:4008
    fs.hdfs.dfs.namenode.https-address.HDFS12345.nn2: 172.27.1.218:4009
    fs.hdfs.dfs.namenode.rpc-address.HDFS12345.nn2: 172.27.1.218:4007
    fs.hdfs.dfs.client.failover.proxy.provider.HDFS12345: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
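    As a hedged illustration (the table name and directory are placeholders), a sink writing to this high-availability cluster would reference the nameservice from fs.hdfs.dfs.nameservices in its path:

    CREATE TABLE `hdfs_ha_sink_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    'connector' = 'filesystem',
    -- HDFS12345 matches the fs.hdfs.dfs.nameservices value configured above.
    'path' = 'hdfs://HDFS12345/flink/data/',
    'format' = 'json'
    );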
    Note
    By default, Flink jobs access HDFS as the Flink user. If the Flink user does not have permission to write to HDFS, you can use advanced job parameters to set the accessing user to a user that has write permission or to the super-user hadoop.
    containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
    containerized.master.env.HADOOP_USER_NAME: hadoop

    COS configuration

    Note: If you write data to COS, the job must run in the same region as the COS bucket.
    Use advanced job parameters to specify the COS region. With Stream Compute Service, the user writing data to COS is the Flink user. You need to configure the following parameters. For the values of regions, see COS - Regions and Access Endpoints. A sample sink DDL for COS appears at the end of this section.
    fs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosN
    fs.cosn.impl: org.apache.hadoop.fs.CosFileSystem
    fs.cosn.credentials.provider: org.apache.flink.fs.cos.OceanusCOSCredentialsProvider
    fs.cosn.bucket.region: The region of the COS bucket.
    fs.cosn.userinfo.appid: The AppID of the account that owns the COS bucket.
    For JAR jobs, to write data to COS, you need to select the built-in connector flink-connector-cos.
    Note
    If you use Flink 1.16, you can skip this step because flink-connector-cos is built into the image.
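    As a hedged sketch (the bucket name and directory below are placeholders), a sink writing to COS differs from the HDFS example only in its path, which uses the cosn:// scheme:

    CREATE TABLE `cos_sink_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    'connector' = 'filesystem',
    -- Replace ${bucketName} and the directory with your own bucket and target path.
    'path' = 'cosn://${bucketName}/path/to/store/data',
    'format' = 'json'
    );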

    Metadata acceleration-enabled COS buckets and Tencent Cloud HDFS

    1. Open the authorization settings. Go to Compute resources > Cluster info > More and select Authorizations.
    2. Grant permission to a metadata acceleration-enabled COS bucket or CHDFS.
    3. Download dependencies.
    On the Dependencies page of the Stream Compute Service console, upload the JAR package you downloaded. For details, see Managing Dependencies.
    The Flink connector JAR to download depends on the storage type; metadata acceleration-enabled COS buckets and CHDFS each have a corresponding connector.
    Then configure the following advanced job parameters for your storage type.
    Metadata acceleration-enabled COS buckets
    fs.cosn.trsf.fs.AbstractFileSystem.ofs.impl: com.qcloud.chdfs.fs.CHDFSDelegateFSAdapter
    fs.cosn.trsf.fs.ofs.impl: com.qcloud.chdfs.fs.CHDFSHadoopFileSystemAdapter
    fs.cosn.trsf.fs.ofs.tmp.cache.dir: /tmp/chdfs/
    fs.cosn.trsf.fs.ofs.user.appid: The AppID of the account that owns the COS bucket.
    fs.cosn.trsf.fs.ofs.bucket.region: The region of the COS bucket.
    fs.cosn.trsf.fs.ofs.upload.flush.flag: true
    containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
    containerized.master.env.HADOOP_USER_NAME: hadoop
    
    fs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosN
    fs.cosn.impl: org.apache.hadoop.fs.CosFileSystem
    fs.cosn.credentials.provider: org.apache.flink.fs.cos.OceanusCOSCredentialsProvider
    fs.cosn.bucket.region: The region of the COS bucket.
    fs.cosn.userinfo.appid: The AppID of the account that owns the COS bucket.
    Note
    Flink 1.16 supports metadata acceleration-enabled buckets by default. You don't need to download JAR packages. Just configure the following two options. The other parameters will be configured automatically.
    containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
    containerized.master.env.HADOOP_USER_NAME: hadoop
    CHDFS
    fs.AbstractFileSystem.ofs.impl: com.qcloud.chdfs.fs.CHDFSDelegateFSAdapter
    fs.ofs.impl: com.qcloud.chdfs.fs.CHDFSHadoopFileSystemAdapter
    fs.ofs.tmp.cache.dir: /tmp/chdfs/
    fs.ofs.upload.flush.flag: true
    fs.ofs.user.appid: The AppID of the account that owns the file system.
    fs.ofs.bucket.region: The region of the file system.
    containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
    containerized.master.env.HADOOP_USER_NAME: hadoop

    Manually uploading a JAR package

    1. Download the required JAR package. Download addresses for different Flink versions: Flink 1.11, Flink 1.13, Flink 1.14
    2. On the Dependencies page of the Stream Compute Service console, upload the JAR package you downloaded. For details, see Managing Dependencies.
    3. Go to the debug page of the job and click Job parameters. Find Referenced package and click Add package. Select the JAR package you uploaded in step 2, and click Confirm.
    4. Publish the job.

    HDFS Kerberos authentication

    1. Log in to the cluster master node to get the files krb5.conf, emr.keytab, core-site.xml, and hdfs-site.xml in the following paths.
    /etc/krb5.conf
    /var/krb5kdc/emr.keytab
    /usr/local/service/hadoop/etc/hadoop/core-site.xml
    /usr/local/service/hadoop/etc/hadoop/hdfs-site.xml
    2. Package the files into a JAR file.
    jar cvf hdfs-xxx.jar krb5.conf emr.keytab core-site.xml hdfs-site.xml
    3. Check the JAR structure (open it with vim hdfs-xxx.jar). Make sure the JAR file includes the following files and that they sit at the top level of the archive.
    META-INF/
    META-INF/MANIFEST.MF
    emr.keytab
    krb5.conf
    hdfs-site.xml
    core-site.xml
    4. Upload the JAR file to the Dependencies page of the Stream Compute Service console, and reference the package when configuring job parameters.
    5. Get Kerberos principals to configure advanced job parameters.
    klist -kt /var/krb5kdc/emr.keytab
    
    # The output is as follows (use the first): hadoop/172.28.28.51@EMR-OQPO48B9
    KVNO Timestamp Principal
    ---- ------------------- ------------------------------------------------------
    2 08/09/2021 15:34:40 hadoop/172.28.28.51@EMR-OQPO48B9
    2 08/09/2021 15:34:40 HTTP/172.28.28.51@EMR-OQPO48B9
    2 08/09/2021 15:34:40 hadoop/VM-28-51-centos@EMR-OQPO48B9
    2 08/09/2021 15:34:40 HTTP/VM-28-51-centos@EMR-OQPO48B9
    containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
    containerized.master.env.HADOOP_USER_NAME: hadoop
    security.kerberos.login.principal: hadoop/172.28.28.51@EMR-OQPO48B9
    security.kerberos.login.keytab: emr.keytab
    security.kerberos.login.conf: krb5.conf
    If you use Flink 1.13, you need to add the following configurations to advanced parameters. The parameter values should be the same as those in hdfs-site.xml.
    fs.hdfs.dfs.nameservices: HDFS17995
    fs.hdfs.dfs.ha.namenodes.HDFS17995: nn2,nn1
    fs.hdfs.dfs.namenode.http-address.HDFS17995.nn1: 172.28.28.214:4008
    fs.hdfs.dfs.namenode.https-address.HDFS17995.nn1: 172.28.28.214:4009
    fs.hdfs.dfs.namenode.rpc-address.HDFS17995.nn1: 172.28.28.214:4007
    fs.hdfs.dfs.namenode.http-address.HDFS17995.nn2: 172.28.28.224:4008
    fs.hdfs.dfs.namenode.https-address.HDFS17995.nn2: 172.28.28.224:4009
    fs.hdfs.dfs.namenode.rpc-address.HDFS17995.nn2: 172.28.28.224:4007
    fs.hdfs.dfs.client.failover.proxy.provider.HDFS17995: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
    fs.hdfs.hadoop.security.authentication: kerberos
    Note
    Kerberos authentication is not supported for historical Stream Compute Service clusters. To support the feature, please contact us to upgrade the cluster management service.

    Example

    
    CREATE TABLE datagen_source_table (
    id INT,
    name STRING,
    part1 INT,
    part2 INT
    ) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1', -- The number of records generated per second.
    'fields.part1.min' = '1',
    'fields.part1.max' = '2',
    'fields.part2.min' = '1',
    'fields.part2.max' = '2'
    );

    CREATE TABLE hdfs_sink_table (
    id INT,
    name STRING,
    part1 INT,
    part2 INT
    ) PARTITIONED BY (part1, part2) WITH (
    'connector' = 'filesystem',
    'path' = 'hdfs://HDFS10000/data/',
    'format' = 'json',
    'sink.rolling-policy.file-size' = '1M',
    'sink.rolling-policy.rollover-interval' = '10 min',
    'sink.partition-commit.delay' = '1 s',
    'sink.partition-commit.policy.kind' = 'success-file'
    );
    
    INSERT INTO hdfs_sink_table SELECT id, name, part1, part2 FROM datagen_source_table;

    About compressible-fs

    The compressible-fs connector can only be used with Flink 1.13.
    It supports writing data in the CSV and JSON formats. Other formats such as Avro, Parquet, and ORC have built-in compression.
    It supports the LzopCodec and OceanusSnappyCodec compression algorithms.
    It supports writing to HDFS and COS in the same way as the FileSystem connector.

    As a sink

    CREATE TABLE `hdfs_sink_table` (
    `id` INT,
    `name` STRING,
    `part1` INT,
    `part2` INT
    ) PARTITIONED BY (part1, part2) WITH (
    'connector' = 'compressible-fs',
    'hadoop.compression.codec' = 'LzopCodec',
    'path' = 'hdfs://HDFS10000/data/',
    'format' = 'json',
    'sink.rolling-policy.file-size' = '1M',
    'sink.rolling-policy.rollover-interval' = '10 min',
    'sink.partition-commit.delay' = '1 s',
    'sink.partition-commit.policy.kind' = 'success-file'
    );

    WITH parameters

    In addition to the parameters supported by the filesystem connector above, compressible-fs supports the following three options (a combined example follows this table):
    Option
    Required
    Default Value
    Description
    hadoop.compression.codec
    No
    -
    The compression algorithm to use. Valid values include LzopCodec and OceanusSnappyCodec. If this is not specified, data is written in the default file format. OceanusSnappyCodec is provided instead of SnappyCodec due to Snappy library version restrictions; it behaves the same as SnappyCodec.
    filename.suffix
    No
    -
    The suffix appended to the names of the files written. If this is not specified, a suffix is generated according to the compression algorithm used. If neither LZOP nor Snappy compression is used and this option is not set, the filename has no suffix.
    filepath.contain.partition-key
    No
    false
    Whether to include the partition field in the path when data is written into partition files. The partition field is not included by default. For example, if partitioning is based on the date dt=12 and hour ht=24, the default partition path will be 12/24 instead of dt=12/ht=24.
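    Below is a minimal sketch (not from this document) that combines these options. The suffix value, table name, and path are illustrative assumptions; the partition-path behavior in the comment follows the description above.

    CREATE TABLE `compressed_hdfs_sink_table` (
    `id` INT,
    `name` STRING,
    `dt` INT,
    `ht` INT
    ) PARTITIONED BY (dt, ht) WITH (
    'connector' = 'compressible-fs',
    'hadoop.compression.codec' = 'LzopCodec',
    -- Illustrative suffix; if omitted, a suffix is derived from the codec.
    'filename.suffix' = '.lzo',
    -- Write partition directories as dt=12/ht=24 instead of 12/24.
    'filepath.contain.partition-key' = 'true',
    'path' = 'hdfs://HDFS10000/data/',
    'format' = 'json',
    'sink.partition-commit.policy.kind' = 'success-file'
    );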
    