tencent cloud

Last updated: 2023-11-08 15:55:14

    Overview

    The Pulsar SQL connector allows you to read data from or write data to Pulsar topics using simple SQL queries or Flink Table API.

    Versions

    The Flink Pulsar connector, based on StreamNative/Flink, is supported by Flink 1.13, 1.14, and 1.16 (see the table below). For information about the DataStream API, see the StreamNative document.

    | Flink Version | Description |
    | --- | --- |
    | 1.11 | Unsupported |
    | 1.13 | Supported |
    | 1.14 | Supported |
    | 1.16 | Supported |
    

    Limits

    Pulsar tables can be used as sources or sinks for tuple (append-only) streams and upsert streams.
    Pulsar tables cannot be used as dimension tables.

    Creating a Pulsar table

    The example below shows you how to create a Pulsar table:
    CREATE TABLE PulsarTable (
    `user_id` bigint,
    `item_id` bigint,
    `behavior` STRING,
    `publish_time` TIMESTAMP_LTZ(3) METADATA FROM 'publish_time' VIRTUAL
    ) WITH (
    'connector' = 'pulsar',
    'service-url' = 'pulsar://pulsar:6650',
    'admin-url' = 'http://pulsar:8080',
    -- 'pulsar.client.authPluginClassName' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
    -- 'pulsar.client.authParams' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.Cpsuc4hRw3yL231Kp8iQAQkYc2SvVTfeziAMcLxxxxx',
    'topics' = 'user_behavior',
    'format' = 'json',
    'source.subscription-name' = 'flink',
    'source.start.message-id' = 'earliest'
    );
    Note
    If authentication is enabled for your Pulsar cluster, uncomment the pulsar.client.authPluginClassName and pulsar.client.authParams options and use a token with admin permissions.

    About tokens

    The Flink Pulsar connector uses Pulsar admin APIs to detect partition changes and to create subscriptions, so if token authentication is enabled for your Pulsar cluster, you need a token with admin permissions. The token also needs permission to read from and write to the topic. You can check both with the commands below.
    # Checking admin permissions
    admin_url=http://172.28.28.46:8080,172.28.28.29:8080,172.28.28.105:8080
    token=eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJteS1zdXBlci11c2VyIn0.T-Bi__yGCs1lxEwdUcDKBDuJsWLO0X9LrePhnuXXXXX
    namespace=public/default
    pulsar-admin --admin-url ${admin_url} --auth-params token:${token} --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken topics list ${namespace}
    
    # Checking topic read and write permissions
    service_url=pulsar://172.28.28.46:6650,172.28.28.29:6650,172.28.28.105:6650
    token=eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJteS1zdXBlci11c2VyIn0.T-Bi__yGCs1lxEwdUcDKBDuJsWLO0X9LrePhnuXXXXX
    namespace=public/default
    topic=xxx
    subscription=yyy
    pulsar-client --url ${service_url} --auth-params token:${token} --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken consume -s ${subscription} -n 10 persistent://${namespace}/${topic} -p Earliest

    Connector options

    | Option | Required | Default Value | Data Type | Description |
    | --- | --- | --- | --- | --- |
    | connector | Yes | - | String | The connector to use. For Apache Pulsar, use 'pulsar' or 'upsert-pulsar'. |
    | admin-url | Yes | - | String | The Pulsar admin URL, such as http://my-broker.example.com:8080 or https://my-broker.example.com:8443. |
    | service-url | Yes | - | String | The URL of the Pulsar server. A Pulsar client connects to a Pulsar cluster through a Pulsar protocol URL, for example pulsar://localhost:6650. For multiple brokers: pulsar://localhost:6650,localhost:6651,localhost:6652. Production clusters are usually accessed via a domain name, such as pulsar://pulsar.us-west.example.com:6650. With TLS enabled: pulsar+ssl://pulsar.us-west.example.com:6651. |
    | topics | Yes | - | String | The names of the Pulsar topics to read from or write to. Separate multiple topics with ";", for example topic-1;topic-2. You can specify topics, partitions, or both, for example topic-a-partition-0;topic-a-partition-2;some-topic2. If both a topic and one of its partitions are specified, only the topic is used; for example, some-topic1;some-topic1-partition-0 is equivalent to some-topic1. |
    | pulsar.client.authPluginClassName | No | - | String | The authentication plugin class name. For token authentication, use org.apache.pulsar.client.impl.auth.AuthenticationToken. |
    | pulsar.client.authParams | No | - | String | The authentication parameters. For token authentication, the format is token:xxxx. |
    | explicit | No | true | Boolean | Whether the table is an explicit Flink table. This option is used by PulsarCatalog; for details, see the PulsarCatalog section below. |
    | key.fields | No | - | List<String> | The physical fields of the Flink table that map to the key of Pulsar messages. These fields are unrelated to primary keys. |
    | key.format | No | - | String | The format used to deserialize and serialize the key part of Pulsar messages. Valid values include 'csv', 'json', and 'avro'. For details, see Formats. |
    | format | No | - | String | The format used to deserialize and serialize the value part of Pulsar messages. Valid values include 'csv', 'json', and 'avro'. For details, see Formats. Specify at least one of format and value.format; if you specify both, format takes effect. |
    | value.format | No | - | String | The format used to deserialize and serialize the value part of Pulsar messages. Valid values include 'csv', 'json', and 'avro'. For details, see Formats. Specify at least one of format and value.format; if you specify both, format takes effect. |
    | sink.topic-routing-mode | No | round-robin | Enum | The topic routing policy. Valid values: round-robin and message-key-hash. To use a custom routing policy, set sink.custom-topic-router instead. |
    | sink.custom-topic-router | No | - | String | The fully qualified class name of a custom topic routing policy. If you set this option, do not set sink.topic-routing-mode. |
    | sink.message-delay-interval | No | 0 | Duration | The delay before messages are delivered to consumers, such as 10ms, 1s, or 1min. For details, see the Pulsar document Delayed message delivery. |
    | pulsar.sink.deliveryGuarantee | No | none | Enum | The message delivery guarantee of the Pulsar sink. Valid values: none, at-least-once, and exactly-once. To use exactly-once, transactions must be enabled on your Pulsar cluster. |
    | pulsar.sink.transactionTimeoutMillis | No | 10800000 | Long | The Pulsar transaction timeout in milliseconds. It must be longer than the checkpoint interval. The default value, 10800000, is 3 hours. |
    | pulsar.producer.batchingEnabled | No | false | Boolean | Whether to enable batch writes. |
    | pulsar.producer.batchingMaxMessages | No | 1000 | Int | The maximum number of Pulsar messages in one batch. |
    | source.start.message-id | No | - | String | The position from which the source starts consuming: earliest, latest, or a specific message ID in the format ledgerId:entryId:partitionId, such as 12:2:-1. |
    | source.start.publish-time | No | - | Long | The publish time (Unix timestamp in milliseconds) of the message from which the source starts consuming. |
    | source.subscription-name | No | flink-sql-connector-pulsar-<RANDOM> | String | The Pulsar subscription name. In the default value, <RANDOM> is five random letters. |
    | source.subscription-type | No | Exclusive | Enum | The Pulsar subscription type. Valid values: Exclusive and Shared. For details, see Subscription types. |
    | source.stop.at-message-id | No | - | String | The position at which the source stops consuming: earliest, latest, or a specific message ID in the format ledgerId:entryId:partitionId, such as 12:2:-1. The message at this position is not consumed. |
    | source.stop.at-publish-time | No | - | Long | The publish time (Unix timestamp in milliseconds) of the message at which the source stops consuming. |
    | source.stop.after-message-id | No | - | String | The ID of the last message the source consumes, in the format ledgerId:entryId:partitionId, such as 12:2:-1. This message is consumed as well. |
    | pulsar.source.partitionDiscoveryIntervalMs | No | 30000 | Long | The interval in milliseconds at which the Pulsar source checks for new partitions. A value of 0 or less disables partition discovery. |
    | pulsar.admin.requestRetries | No | 5 | Int | The number of retries when a call to the Pulsar admin RESTful API fails. |
    | pulsar.client.* | No | - | - | Any Pulsar client parameter. |
    | pulsar.admin.* | No | - | - | Any Pulsar admin parameter. |
    | pulsar.sink.* | No | - | - | Any Pulsar sink parameter. |
    | pulsar.producer.* | No | - | - | Any Pulsar producer API parameter. |
    | pulsar.source.* | No | - | - | Any Pulsar source parameter. |
    | pulsar.consumer.* | No | - | - | Any Pulsar consumer API parameter. |
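
    The topics rule above (a topic listed together with one of its partitions collapses to the topic) can be illustrated with a small Python sketch. It is not the connector's code: it assumes partitions follow the Pulsar naming convention <topic>-partition-<N>, and the helper name normalize_topics is hypothetical.

```python
import re

# Assumption: partitions of a partitioned topic are named "<topic>-partition-<N>".
PARTITION_NAME = re.compile(r"^(?P<parent>.+)-partition-\d+$")


def normalize_topics(option_value: str) -> list[str]:
    """Hypothetical helper: split a 'topics' value on ';' and drop redundant partitions."""
    entries = [item.strip() for item in option_value.split(";") if item.strip()]
    # Entries that name a whole topic rather than a single partition.
    whole_topics = {entry for entry in entries if not PARTITION_NAME.match(entry)}

    result: list[str] = []
    for entry in entries:
        match = PARTITION_NAME.match(entry)
        if match and match.group("parent") in whole_topics:
            continue  # the whole topic is listed, so this partition is redundant
        if entry not in result:
            result.append(entry)
    return result


if __name__ == "__main__":
    print(normalize_topics("some-topic1;some-topic1-partition-0"))
    # ['some-topic1']
    print(normalize_topics("topic-a-partition-0;topic-a-partition-2;some-topic2"))
    # ['topic-a-partition-0', 'topic-a-partition-2', 'some-topic2']
```

    Partitions whose parent topic is not listed are kept, which matches the second example in the topics description.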

    Available metadata

    | Metadata Key | Data Type | R/W | Description |
    | --- | --- | --- | --- |
    | topic | STRING NOT NULL | R | The topic name of a Pulsar message. |
    | message_size | INT NOT NULL | R | The size of a Pulsar message. |
    | producer_name | STRING NOT NULL | R | The name of the producer of a Pulsar message. |
    | message_id | BYTES NOT NULL | R | The ID of a Pulsar message. |
    | sequenceId | BIGINT NOT NULL | R | The sequence ID of a Pulsar message. |
    | publish_time | TIMESTAMP_LTZ(3) NOT NULL | R | The publish time of a Pulsar message. |
    | event_time | TIMESTAMP_LTZ(3) NOT NULL | R/W | The event time of a Pulsar message. |
    | properties | MAP<STRING, STRING> NOT NULL | R/W | The properties of a Pulsar message. |
    Note
    R/W indicates whether a metadata column is read-only (R) or readable and writable (R/W). Declare read-only metadata columns as VIRTUAL so that they are excluded from INSERT INTO operations.
    For a list of the fields in Pulsar messages, see the Pulsar document Messages.

    Data type mappings

    | Pulsar Schema | Flink Format |
    | --- | --- |
    | AVRO | avro |
    | JSON | json |
    | PROTOBUF | Not supported yet |
    | PROTOBUF_NATIVE | Not supported yet |
    | AUTO_CONSUME | Not supported yet |
    | AUTO_PUBLISH | Not supported yet |
    | NONE/BYTES | raw |
    | BOOLEAN | raw |
    | STRING | raw |
    | DOUBLE | raw |
    | FLOAT | raw |
    | INT8 | raw |
    | INT16 | raw |
    | INT32 | raw |
    | INT64 | raw |
    | LOCAL_DATE | Not supported yet |
    | LOCAL_TIME | Not supported yet |
    | LOCAL_DATE_TIME | Not supported yet |

    PulsarCatalog

    PulsarCatalog lets you use a Pulsar cluster to store the metadata of Flink tables.

    Explicit and native tables

    PulsarCatalog defines two types of tables: explicit tables and native tables.
    Explicit tables are tables created explicitly by the CREATE statement or using a table API. They work similarly to the tables in other SQL connectors. You can create an explicit table and read from or write to the table.
    Native tables are created automatically by PulsarCatalog. PulsarCatalog scans all the non-system topics in a Pulsar cluster and converts each topic into a Flink table. Such tables are not created using the CREATE statement.

    Explicit tables

    PulsarCatalog uses the schemaInfo field in the schema of a topic to store the metadata of an explicit table. For each explicit table, PulsarCatalog creates a place-holding topic. You can specify the tenant of the topic using the catalog-tenant option. The default tenant is __flink_catalog. Your Flink database maps to a namespace with the same name under this tenant. Then a topic named table_<FLINK_TABLE_NAME> is created, whose schema stores the metadata of the Flink table.
    For example, if you create a database testdb and a Flink table users, PulsarCatalog will create a topic table_users in the namespace testdb under the tenant __flink_catalog.
    The topic table_users is a place-holding topic because it has no producers or consumers; its schema is used only to store the metadata of the Flink table.
    To get the metadata of a topic, you can use the Pulsar admin command line tool:
    pulsar-admin schemas get persistent://<tenant>/<namespace>/<topic>
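
    The naming scheme described above can be summarized in a short Python sketch. It assumes the place-holding topic is persistent (as in the pulsar-admin command above) and uses a hypothetical helper name; it only builds the topic name and does not call Pulsar.

```python
DEFAULT_CATALOG_TENANT = "__flink_catalog"  # default value of catalog-tenant


def placeholder_topic(database: str, table: str, tenant: str = DEFAULT_CATALOG_TENANT) -> str:
    """Hypothetical helper: the topic whose schema stores an explicit table's metadata."""
    # tenant = catalog-tenant, namespace = Flink database, topic = "table_" + Flink table name
    return f"persistent://{tenant}/{database}/table_{table}"


if __name__ == "__main__":
    # The database testdb and table users from the example above.
    print(placeholder_topic("testdb", "users"))
    # persistent://__flink_catalog/testdb/table_users
```

    The printed name can be passed to pulsar-admin schemas get to inspect the stored table metadata.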

    Native tables

    Native tables do not have place-holding topics. PulsarCatalog maps the topic schema to the Flink table schema. For more information about Pulsar schema, see the Pulsar document Understand schema.

    | Pulsar Schema | Flink Data Type | Flink Format | Supported |
    | --- | --- | --- | --- |
    | AVRO | Determined by the Avro format | avro | Yes |
    | JSON | Determined by the JSON format | json | Yes |
    | PROTOBUF | Not supported yet | / | No |
    | PROTOBUF_NATIVE | Determined by the Protobuf definition | Not supported yet | No |
    | AUTO_CONSUME | Not supported yet | / | No |
    | AUTO_PUBLISH | Not supported yet | / | No |
    | NONE/BYTES | DataTypes.BYTES() | raw | Yes |
    | BOOLEAN | DataTypes.BOOLEAN() | raw | Yes |
    | LOCAL_DATE | DataTypes.DATE() | / | No |
    | LOCAL_TIME | DataTypes.TIME() | / | No |
    | LOCAL_DATE_TIME | DataTypes.TIMESTAMP(3) | / | No |
    | STRING | DataTypes.STRING() | raw | Yes |
    | DOUBLE | DataTypes.DOUBLE() | raw | Yes |
    | FLOAT | DataTypes.FLOAT() | raw | Yes |
    | INT8 | DataTypes.TINYINT() | raw | Yes |
    | INT16 | DataTypes.SMALLINT() | raw | Yes |
    | INT32 | DataTypes.INT() | raw | Yes |
    | INT64 | DataTypes.BIGINT() | raw | Yes |
    Note
    Although the Pulsar schema types LOCAL_DATE and LOCAL_TIME have corresponding Flink data types, Flink cannot parse data based on the two schema types, and automatic schema mapping will fail.

    Explicit table versus native table

    With native tables, you can read data from existing Pulsar topics. PulsarCatalog automatically reads the schema of a topic and determines the format to use for decoding and encoding. However, native tables do not support watermarks or primary keys, so you cannot perform time-window aggregations on them. A native table maps tenant/namespace to the Flink database and the topic name to the Flink table name.
    If you want full control of a table, create an explicit table, define watermarks, and specify the metadata fields and custom formats. This is similar to creating a Pulsar table in GenericInMemoryCatalog. You can bind an explicit table to a Pulsar topic, and each Pulsar topic can be bound to multiple Flink tables.
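
    As a sketch of the native-table naming rule, the Python helper below turns a topic name into the catalog.database.table identifier used in Flink SQL. It is hypothetical (not part of PulsarCatalog) and assumes the catalog is registered as pulsar, as in the examples in this document.

```python
def native_table_path(topic: str, catalog: str = "pulsar") -> str:
    """Hypothetical helper: the fully qualified Flink name of the native table for a topic."""
    # Accept both "persistent://tenant/namespace/topic" and "tenant/namespace/topic".
    name = topic.split("://", 1)[-1]
    tenant, namespace, table = name.split("/", 2)
    database = f"{tenant}/{namespace}"  # tenant/namespace becomes the Flink database
    return f"`{catalog}`.`{database}`.`{table}`"


if __name__ == "__main__":
    print(native_table_path("persistent://public/default/topic_source"))
    # `pulsar`.`public/default`.`topic_source`
```

    The backticks are needed because the database name contains a slash.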

    PulsarCatalog parameters

    | Key | Default | Type | Description | Required |
    | --- | --- | --- | --- | --- |
    | catalog-admin-url | "http://localhost:8080" | String | The Pulsar admin URL, such as http://my-broker.example.com:8080 or https://my-broker.example.com:8443. | Yes |
    | catalog-auth-params | - | String | The authentication parameters for accessing the Pulsar cluster. | No |
    | catalog-auth-plugin | - | String | The name of the authentication plugin for accessing the Pulsar cluster. | No |
    | catalog-service-url | "pulsar://localhost:6650" | String | The URL of the Pulsar server. A Pulsar client connects to a Pulsar cluster through a Pulsar protocol URL, for example pulsar://localhost:6650. For multiple brokers: pulsar://localhost:6650,localhost:6651,localhost:6652. Production clusters are usually accessed via a domain name, such as pulsar://pulsar.us-west.example.com:6650. With TLS enabled: pulsar+ssl://pulsar.us-west.example.com:6651. | Yes |
    | catalog-tenant | "__flink_catalog" | String | The Pulsar tenant that stores table metadata. | No |
    | default-database | "default_database" | String | The default database of PulsarCatalog. If a database with this name does not exist, it is created automatically. | No |

    PulsarCatalog example

    CREATE CATALOG pulsar WITH (
    'type' = 'pulsar-catalog',
    -- 'catalog-auth-plugin' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
    -- 'catalog-auth-params' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJteS1zdXBlci11c2VyIn0.T-Bi__yGCs1lxEwdUcDKBDuJsWLO0X9LrePhnuxxxxx',
    'catalog-admin-url' = '<ADMIN_URL>',
    'catalog-service-url' = '<SERVICE_URL>'
    );

    Full example

    Pulsar source and sink

    The example below creates a Pulsar source and a Pulsar sink with exactly-once delivery and a transaction timeout of 2 minutes. Note that the transaction timeout must be longer than the checkpoint interval.
    CREATE TABLE `pulsar_source` (
    `user_id` bigint,
    `item_id` bigint,
    `behavior` STRING
    ) WITH (
    'connector' = 'pulsar',
    'service-url' = 'pulsar://pulsar:6650',
    'admin-url' = 'http://pulsar:8080',
    -- 'pulsar.client.authPluginClassName' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
    -- 'pulsar.client.authParams' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.Cpsuc4hRw3yL231Kp8iQAQkYc2SvVTfeziAMcLxxxxx',
    'topics' = 'topic_source',
    'format' = 'json',
    'source.subscription-name' = 'flink',
    'source.start.message-id' = 'earliest'
    );
    
    CREATE TABLE `pulsar_sink` (
    `user_id` bigint,
    `item_id` bigint,
    `behavior` STRING
    ) WITH (
    'connector' = 'pulsar',
    'service-url' = 'pulsar://pulsar:6650',
    'admin-url' = 'http://pulsar:8080',
    -- 'pulsar.client.authPluginClassName' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
    -- 'pulsar.client.authParams' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.Cpsuc4hRw3yL231Kp8iQAQkYc2SvVTfeziAMcLxxxxx',
    'topics' = 'topic_sink',
    'format' = 'json',
    'pulsar.sink.deliveryGuarantee' = 'exactly-once',
    'pulsar.sink.transactionTimeoutMillis' = '120000'
    );
    
    INSERT INTO `pulsar_sink` SELECT * FROM `pulsar_source`;

    PulsarCatalog example

    Explicit table

    CREATE CATALOG `pulsar` WITH (
    'type' = 'pulsar-catalog',
    -- 'catalog-auth-plugin' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
    -- 'catalog-auth-params' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJteS1zdXBlci11c2VyIn0.T-Bi__yGCs1lxEwdUcDKBDuJsWLO0X9LrePhnuxxxxx',
    'catalog-admin-url' = 'http://pulsar:8080',
    'catalog-service-url' = 'pulsar://pulsar:6650'
    );
    INSERT INTO `pulsar`.`default_database`.`pulsar_sink` SELECT * FROM `pulsar`.`default_database`.`pulsar_source`;
    The pulsar_source and pulsar_sink tables in the example above are created with the following statements, which can be placed in the same SQL job before the INSERT INTO statement.
    CREATE TABLE IF NOT EXISTS `pulsar`.`default_database`.`pulsar_source` (
    `user_id` bigint,
    `item_id` bigint,
    `behavior` STRING
    ) WITH (
    'connector' = 'pulsar',
    'service-url' = 'pulsar://pulsar:6650',
    'admin-url' = 'http://pulsar:8080',
    -- 'pulsar.client.authPluginClassName' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
    -- 'pulsar.client.authParams' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.Cpsuc4hRw3yL231Kp8iQAQkYc2SvVTfeziAMcLxxxxx',
    'topics' = 'topic_source',
    'format' = 'json',
    'source.subscription-name' = 'flink',
    'source.start.message-id' = 'earliest'
    );
    
    CREATE TABLE IF NOT EXISTS `pulsar`.`default_database`.`pulsar_sink` (
    `user_id` bigint,
    `item_id` bigint,
    `behavior` STRING
    ) WITH (
    'connector' = 'pulsar',
    'service-url' = 'pulsar://pulsar:6650',
    'admin-url' = 'http://pulsar:8080',
    -- 'pulsar.client.authPluginClassName' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
    -- 'pulsar.client.authParams' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.Cpsuc4hRw3yL231Kp8iQAQkYc2SvVTfeziAMcLxxxxx',
    'topics' = 'topic_sink',
    'format' = 'json',
    'pulsar.sink.deliveryGuarantee' = 'exactly-once',
    'pulsar.sink.transactionTimeoutMillis' = '120000'
    );

    Native table

    1. Prepare a JSON file of the topic schema. Name it "schema.json".
    {
    "schema": "{\"type\":\"record\",\"name\":\"userBehavior\",\"namespace\":\"my.example\",\"fields\":[{\"name\":\"user_id\",\"type\":\"long\"},{\"name\":\"item_id\",\"type\":\"long\"},{\"name\":\"behavior\",\"type\":\"string\"}]}",
    "type": "JSON",
    "properties": {}
    }
    2. Use the Pulsar admin command line tool to configure the topic schema.
    # Configure the schema
    bin/pulsar-admin schemas upload -f ./schema.json topic_source
    bin/pulsar-admin schemas upload -f ./schema.json topic_sink
    
    # Check the schema
    bin/pulsar-admin schemas get topic_source
    bin/pulsar-admin schemas get topic_sink
    3. Run a job like the one below. The database name public/default follows the tenant/namespace format; public/default is the default tenant and namespace of a Pulsar cluster.
    CREATE CATALOG `pulsar` WITH (
    'type' = 'pulsar-catalog',
    -- 'catalog-auth-plugin' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
    -- 'catalog-auth-params' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJteS1zdXBlci11c2VyIn0.T-Bi__yGCs1lxEwdUcDKBDuJsWLO0X9LrePhnuxxxxx',
    'catalog-admin-url' = 'http://pulsar:8080',
    'catalog-service-url' = 'pulsar://pulsar:6650'
    );
    
    INSERT INTO `pulsar`.`public/default`.`topic_sink` SELECT * FROM `pulsar`.`public/default`.`topic_source`;
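
    Because the schema field in step 1 is a JSON document encoded as a string inside another JSON document, escaping its quotes by hand is error-prone. The Python sketch below generates an equivalent schema.json with the standard json module; the payload layout (schema, type, properties) follows step 1, and the names USER_BEHAVIOR_RECORD and build_schema_payload are illustrative.

```python
import json

# The record definition from step 1, as a Python dict.
USER_BEHAVIOR_RECORD = {
    "type": "record",
    "name": "userBehavior",
    "namespace": "my.example",
    "fields": [
        {"name": "user_id", "type": "long"},
        {"name": "item_id", "type": "long"},
        {"name": "behavior", "type": "string"},
    ],
}


def build_schema_payload(record: dict) -> dict:
    """Build the payload that 'pulsar-admin schemas upload -f' reads in step 2."""
    return {
        # Serialized to a compact string here; json.dumps escapes its quotes again below.
        "schema": json.dumps(record, separators=(",", ":")),
        "type": "JSON",
        "properties": {},
    }


if __name__ == "__main__":
    # Redirect the output to a file, for example: python make_schema.py > schema.json
    print(json.dumps(build_schema_payload(USER_BEHAVIOR_RECORD), indent=2))
```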

    FAQs

    What should I do if an error occurs saying that I haven't enabled transactions?

    java.lang.NullPointerException: You haven't enable transaction in Pulsar client.
    To enable transactions, see How to use transactions?.

    When messages are written to a Pulsar sink with the exactly-once guarantee, why can't the topic data be consumed after the job restarts due to an error?

    Cause: There may be uncommitted transactions from before the restart. Transactions in the OPEN state block consumers from reading data written to the topic after the restart. You can run pulsar-admin transactions slow-transactions -t 1s to list transactions in the OPEN state. Once the open transactions are committed or aborted, the data written after the restart can be read.
    Suggestion: Set an appropriate transaction timeout in the WITH parameters (the default Pulsar transaction timeout is 3 hours), so that stale transactions are aborted sooner. For example, 'pulsar.sink.transactionTimeoutMillis' = '120000' sets the timeout to 2 minutes. The transaction timeout must be longer than the checkpoint interval.
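
    The rule that the transaction timeout must be longer than the checkpoint interval can be checked before a job is submitted. The Python sketch below is a hypothetical pre-flight check, not a connector feature; the checkpoint interval is whatever you configure for the job (for example, through Flink's execution.checkpointing.interval).

```python
from datetime import timedelta

ONE_MS = timedelta(milliseconds=1)
DEFAULT_TRANSACTION_TIMEOUT_MS = 10_800_000  # default of pulsar.sink.transactionTimeoutMillis


def check_transaction_timeout(timeout_ms: int, checkpoint_interval: timedelta) -> None:
    """Raise ValueError unless the transaction timeout is longer than the checkpoint interval."""
    interval_ms = checkpoint_interval // ONE_MS  # exact integer milliseconds
    if timeout_ms <= interval_ms:
        raise ValueError(
            f"pulsar.sink.transactionTimeoutMillis ({timeout_ms}) must be longer than "
            f"the checkpoint interval ({interval_ms} ms)"
        )


if __name__ == "__main__":
    # '120000' (2 minutes) is valid with a 1-minute checkpoint interval.
    check_transaction_timeout(120_000, timedelta(minutes=1))
    try:
        check_transaction_timeout(120_000, timedelta(minutes=5))
    except ValueError as error:
        print(error)
```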

    Why does the Pulsar source fail to restore data from a checkpoint in the batch messaging scenario?

    If the error java.lang.IllegalArgumentException: We only support normal message id currently occurs, the messages were written to Pulsar with batching enabled. Pulsar sources currently cannot restore from a checkpoint that points to batched messages. In Stream Compute Service, batching is disabled by default for Pulsar sinks (pulsar.producer.batchingEnabled defaults to false).
    Caused by: java.lang.IllegalArgumentException: We only support normal message id currently.

    What is the relationship between the start of Pulsar source consumption and subscriptions?

    If the topic has no subscription with the configured name, the subscription is created at the position set by the source start options (such as source.start.message-id).
    If the subscription already exists, the source start options are ignored.
    If the job does not restore from a checkpoint, consumption starts from the subscription cursor. If the job restores from a checkpoint, consumption starts from the message following the message ID recorded in the checkpoint; this is done by resetting the subscription cursor (for details, see PulsarOrderedPartitionSplitReader#beforeCreatingConsumer).
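
    The precedence described above can be written as a small decision function. This Python sketch only models the documented behavior; it does not call the connector, and the function name and returned labels are hypothetical.

```python
from typing import Optional, Tuple


def resolve_start_position(
    subscription_exists: bool,
    checkpoint_message_id: Optional[str],
    configured_start: str,
) -> Tuple[str, Optional[str]]:
    """Return (where consumption starts, the message ID or option value involved)."""
    if checkpoint_message_id is not None:
        # Restoring from a checkpoint: the cursor is reset to just after the recorded ID.
        return ("after-checkpointed-message", checkpoint_message_id)
    if subscription_exists:
        # Existing subscription: the configured start position is ignored.
        return ("subscription-cursor", None)
    # New subscription: it is created at the configured start position.
    return ("configured-start", configured_start)


if __name__ == "__main__":
    print(resolve_start_position(False, None, "earliest"))
    print(resolve_start_position(True, None, "earliest"))
    print(resolve_start_position(True, "174:1:0", "earliest"))
```

    The checkpoint check comes first because a restored job always resumes from its recorded position, whatever the subscription or start options say.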

    Why can't I use the NonDurable subscription mode for the Pulsar source?

    PulsarSourceEnumerator#createSubscription first creates a Durable subscription.
    If PulsarPartitionSplitReaderBase#createPulsarConsumer then consumes data in the NonDurable mode under the same subscription name, the error "Durable subscription with the same name already exists" occurs. The subscription mode is controlled by the following option:

    | Option | Required | Default Value | Data Type | Description |
    | --- | --- | --- | --- | --- |
    | pulsar.consumer.subscriptionMode | No | Durable | Enum | The Pulsar subscription mode. Valid values: Durable and NonDurable. In the Durable mode, the cursor is durable: messages are retained and the current position is persisted, so if a broker restarts after a failure, it can recover the cursor from persistent storage (bookies) and consumption continues from the last consumed position. In the NonDurable mode, the cursor is lost once the broker stops and cannot be recovered, so consumption cannot continue from the last consumed position. For details, see Subscription modes. |

    About MessageId

    See Message Storage and ID Generation Rules. Message IDs can be compared; for example, 174:1:0 > 174:1:-1.
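
    To illustrate the comparison, the Python sketch below parses IDs in the ledgerId:entryId:partitionId form used by source.start.message-id and compares them field by field. The ordering (ledger ID, then entry ID, then partition index) is an assumption modeled on the Pulsar Java client's MessageIdImpl.compareTo; batch indexes are ignored, and the names MessageIdKey and parse_message_id are hypothetical.

```python
from typing import NamedTuple


class MessageIdKey(NamedTuple):
    """Hypothetical comparable view of a Pulsar message ID; tuples compare field by field."""

    ledger_id: int
    entry_id: int
    partition_index: int


def parse_message_id(text: str) -> MessageIdKey:
    """Parse 'ledgerId:entryId:partitionId', such as '12:2:-1'. Raises ValueError otherwise."""
    ledger_id, entry_id, partition_index = (int(part) for part in text.split(":"))
    return MessageIdKey(ledger_id, entry_id, partition_index)


if __name__ == "__main__":
    print(parse_message_id("174:1:0") > parse_message_id("174:1:-1"))  # True
```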

    Why does the Pulsar source fail to consume data according to publish-time?

    Cause: If the broker you connect to does not serve the namespace of the topic, the RESTful API that looks up a message ID by publish time returns HTTP 307 Temporary Redirect, and the Pulsar admin client used by the Flink connector reports HTTP 500 Server Error. As a result, the job fails to start. You can call this RESTful API directly to view the error:
    # 1662480195714 is a publish time in milliseconds.
    curl http://${adminUrl}:8080/admin/v2/persistent/public/default/${topic}/messageid/1662480195714
    Note
    source.stop.at-publish-time does not use this RESTful API, so it is not affected by this issue.
    Suggestion: You can try increasing pulsar.admin.requestRetries (the number of retries for RESTful APIs, which is 5 by default) to avoid this issue.
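
    The curl example above uses a publish time in milliseconds. The Python sketch below converts between such timestamps and UTC datetimes and rebuilds the lookup URL from that curl command. The helper names are hypothetical, and the host, tenant, namespace, and topic are placeholders for your own values.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
ONE_MS = timedelta(milliseconds=1)


def to_publish_time_ms(moment: datetime) -> int:
    """Convert a timezone-aware datetime to a Unix timestamp in milliseconds."""
    if moment.tzinfo is None:
        raise ValueError("use a timezone-aware datetime")
    return (moment - EPOCH) // ONE_MS  # integer arithmetic, no float rounding


def from_publish_time_ms(publish_time_ms: int) -> datetime:
    """Convert a Unix timestamp in milliseconds back to a UTC datetime."""
    return EPOCH + publish_time_ms * ONE_MS


def message_id_lookup_url(admin_host: str, tenant: str, namespace: str, topic: str,
                          publish_time_ms: int, port: int = 8080) -> str:
    """Build the admin REST URL used in the curl command above."""
    return (f"http://{admin_host}:{port}/admin/v2/persistent/"
            f"{tenant}/{namespace}/{topic}/messageid/{publish_time_ms}")


if __name__ == "__main__":
    print(from_publish_time_ms(1662480195714))  # 2022-09-06 16:03:15.714000+00:00
```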

    Why doesn't the job stop at the configured ending position for Pulsar source consumption?

    Solution: Disable automatic partition discovery by setting 'pulsar.source.partitionDiscoveryIntervalMs' = '0'.