Advanced Job Parameters

Last updated: 2023-12-26 17:49:27

    Overview

    You can set additional custom Flink parameters in Job parameters > Advanced parameters to tune a job. For example, you can set the job restart policy, adjust the SQL mini-batch settings, disable asynchronous checkpointing, set the minimum checkpoint interval, and adjust the cache size of the RocksDB state backend.
    Custom advanced parameters must follow YAML syntax and be written in the "key: value" format (note the space after the colon). After modifying job parameters, you need to republish and start the job for the new parameters to take effect. For details on the parameters in Flink v1.11, see Configuration.
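
    As a rough illustration of the "key: value" format, the fragment below sketches the tuning items mentioned above. The option names come from the open-source Flink 1.11 configuration reference, and execution.checkpointing.min-pause is assumed to be what the text calls the minimum checkpoint interval. The values are placeholders, and it is an assumption that Stream Compute Service accepts each of these options, so check them against the use limits before applying them.

    ```yaml
    # Illustrative values only; tune them for your own job.
    # SQL mini-batch: buffer input records to reduce state access.
    table.exec.mini-batch.enabled: true
    table.exec.mini-batch.allow-latency: 5 s
    table.exec.mini-batch.size: 5000
    # Disable asynchronous snapshots where the state backend makes this configurable.
    state.backend.async: false
    # Minimum pause between the end of one checkpoint and the start of the next.
    execution.checkpointing.min-pause: 60 s
    # RocksDB block cache size; in open-source Flink this may be overridden
    # when RocksDB uses managed memory.
    state.backend.rocksdb.block.cache-size: 64mb
    ```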

    Example

    Setting the state backend of a job

    The RocksDB state backend is used by default in Stream Compute Service. It supports larger state, but its throughput and performance are lower than those of the memory-based FileSystem state backend.
    If your job state is small and you need low latency and high throughput, switch to the FileSystem state backend with the following setting:
    state.backend: filesystem

    Setting job restart policy and threshold

    By default, a Flink job can be restarted internally a maximum of five times after a crash. An internal restart is a hot restart that happens while the JobManager is still active and takes about 15s. If the job crashes again after the number of restarts reaches this threshold, the JobManager exits, and the job goes through a longer cold recovery (about 3-5 minutes). If checkpointing is not enabled for the job, much of its state and data may be lost.
    To adjust the number of internal restarts allowed for the job, configure the following parameters. This example allows a maximum of 100 internal restarts. Set these parameters with caution:
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 100
    restart-strategy.fixed-delay.delay: 5 s
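
    If a fixed number of attempts is too coarse, open-source Flink also offers a failure-rate restart strategy, which fails the job only when restarts cluster within a time window. The sketch below uses option names from the Flink configuration reference. The values are arbitrary, and whether Stream Compute Service honors this strategy is an assumption.

    ```yaml
    restart-strategy: failure-rate
    # Allow at most 3 restarts within any 5-minute window before the job is failed.
    restart-strategy.failure-rate.max-failures-per-interval: 3
    restart-strategy.failure-rate.failure-rate-interval: 5 min
    # Wait 10 seconds between restart attempts.
    restart-strategy.failure-rate.delay: 10 s
    ```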

    Setting the JVM overhead percentage

    The JVM overhead fraction defaults to 10% in Flink. The RocksDB state backend needs more memory in this area, and if usage exceeds the limit, the container management system may kill the JVM. To reduce this risk and keep jobs that use the RocksDB state backend stable, we recommend increasing the value of this parameter as appropriate.
    Note
    Increasing the value of this parameter lowers the percentage of heap memory available to the JVM, making the job more prone to out-of-memory (OOM) errors in the heap. Please proceed with caution.
    taskmanager.memory.jvm-overhead.fraction: 0.3
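
    In open-source Flink, the overhead derived from this fraction is also clamped between taskmanager.memory.jvm-overhead.min (192 MB by default) and taskmanager.memory.jvm-overhead.max (1 GB by default). For example, with a 4 GB TaskManager process, a fraction of 0.3 would give about 1.2 GB, which the default maximum caps at 1 GB. The following sketch raises the cap so that the larger fraction can take effect. The 2gb value is illustrative, and it is an assumption that Stream Compute Service lets you override this bound.

    ```yaml
    taskmanager.memory.jvm-overhead.fraction: 0.3
    # Raise the upper bound (default 1gb in open-source Flink) so the fraction is not capped.
    taskmanager.memory.jvm-overhead.max: 2gb
    ```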

    Setting the checkpoint policy to at-least-once

    The default checkpoint policy is exactly-once in Stream Compute Service. This policy ensures exact state consistency after a crashed job is recovered, but it can sometimes cause high latency.
    If your job can tolerate some duplicate data being processed when a crashed job is recovered (which makes results inaccurate for a short period), you can change the Flink checkpoint policy to at-least-once for better checkpoint performance, especially when the state is huge and multiple streams have different rates.
    execution.checkpointing.mode: AT_LEAST_ONCE

    Disabling operator chaining

    By default, Flink chains operators with the same parallelism together in the execution graph where possible, which avoids extra serialization and deserialization of data passed between upstream and downstream operators. To view the data inflow and outflow of each operator when troubleshooting, disable operator chaining.
    Note
    Disabling this feature can significantly reduce the job's running efficiency. Please proceed with caution.
    pipeline.operator-chaining: false

    Setting the checkpoint timeout of a job

    The checkpoint timeout defaults to 20 minutes (1,200s) in Stream Compute Service.
    If your job state is large, you can set a longer timeout with the following parameter:
    execution.checkpointing.timeout: 3000s
    You can also reduce the timeout:
    execution.checkpointing.timeout: 1000s
    For a SQL job, you also need to add the following statement on the job's editing page, with the value set to the configured timeout. For details, see Flink Configuration Options.
    set CHECKPOINT_TIMEOUT= '1000 s';

    Setting the checkpoint storage policy

    In Stream Compute Service, three checkpoint storage policies are available to Flink jobs: DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION, and RETAIN_ON_SUCCESS. The default policy is DELETE_ON_CANCELLATION, which is used if this parameter is not set.
    The following table compares these policies.
    Checkpoint Storage Policy
        Checkpoint Clearing
    DELETE_ON_CANCELLATION (default)
        1. Create a checkpoint when the job is stopped, and delete the old checkpoint (the job cannot be recovered from the old one).
        2. Create no checkpoint when the job is stopped, and delete the old checkpoint (the job cannot be recovered from a checkpoint).
    RETAIN_ON_CANCELLATION
        1. Create a checkpoint when the job is stopped, and delete the old checkpoint (the job cannot be recovered from the old one).
        2. Create no checkpoint when the job is stopped, and do not delete the old checkpoint (the job can be recovered from a checkpoint).
    RETAIN_ON_SUCCESS
        1. Create a checkpoint when the job is stopped, and do not delete the old checkpoint (the job can be recovered from a checkpoint).
        2. Create no checkpoint when the job is stopped, and do not delete the old checkpoint (the job can be recovered from a checkpoint).
    You can set the checkpoint storage policy of a job in Job parameters > Advanced parameters. After changing the setting, you need to restart the job to apply the policy.
    execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_SUCCESS
    Note
    For a JAR or Python Flink job, we recommend that you do not explicitly set the checkpoint storage policy in the JAR package, because settings made there override those in advanced parameters.

    Setting more options

    Flink provides many other options. For a complete list, see the Flink documentation.
    Note
    Not all options are supported in Stream Compute Service. Before making any adjustment, please carefully read the following use limits and fully understand the relevant issues and risks to avoid unstable job running, failure to start a job, or other events due to inappropriate parameter adjustments.

    Use limits

    The following parameters are set by the Stream Compute Service system and cannot be modified. Please do not pass them in through advanced parameters.
    Non-customizable Parameters
    kubernetes.container.image
    kubernetes.jobmanager.cpu
    taskmanager.cpu.cores
    kubernetes.taskmanager.cpu
    jobmanager.heap.size
    jobmanager.heap.mb
    jobmanager.memory.process.size
    taskmanager.heap.size
    taskmanager.heap.mb
    taskmanager.memory.process.size
    taskmanager.numberOfTaskSlots
    env.java.opts (you can instead customize two separate parameters: env.java.opts.taskmanager and env.java.opts.jobmanager)
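
    As a sketch of the per-component alternative, JVM options can be set separately for the TaskManagers and the JobManager. The two keys are the ones named above. The flag values are standard HotSpot options chosen only for illustration and are not a recommendation from this document.

    ```yaml
    # Applied to TaskManager JVMs only.
    env.java.opts.taskmanager: -XX:+UseG1GC
    # Applied to the JobManager JVM only.
    env.java.opts.jobmanager: -XX:+HeapDumpOnOutOfMemoryError
    ```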
    