Consumption Principles

Last updated: 2024-04-23 11:35:52

    Prerequisites

    1. Cloud Log Service is activated, a logset and a log topic have been created, and log data has been collected successfully.
    2. Sub-accounts/collaborators require authorization from the root account. For authorization steps, see CAM-Based Permission Management; for the authorization policy to copy, see CLS Access Policy Templates.

    Consumption Process within a Consumer Group

    When data is consumed within a consumer group, the server manages the consumption tasks of all consumers in the group. It automatically balances these tasks according to the ratio of topic partitions to consumers, and it records the consumption progress of each partition in the topic so that different consumers consume data without duplication. Consumption within a consumer group proceeds as follows:
    1. Create a consumer group.
    2. Every consumer periodically sends heartbeats to the server.
    3. The consumer group automatically assigns topic partitions to consumers according to the load balancing situation of the topic partitions.
    4. Consumers retrieve the partition offsets and consume the data according to the list of allocated partitions.
    5. Consumers periodically update their consumption progress for each partition to the consumer group, facilitating the next round of task allocation by the group.
    6. Repeat steps 2 through 5 until consumption is completed.
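The partition balancing in step 3 can be sketched as follows. This is a minimal illustration, not the CLS server's actual algorithm: partitions are spread as evenly as possible across consumers, so when a consumer joins or goes offline (misses two heartbeats), the group simply recomputes the assignment.

```python
def assign_partitions(partitions, consumers):
    """Round-robin partitions over consumers; returns {consumer: [partitions]}."""
    assignment = {c: [] for c in consumers}
    for i, partition in enumerate(partitions):
        # The i-th partition goes to the (i mod N)-th consumer.
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment

# Six partitions shared by two consumers: three each.
plan = assign_partitions(list(range(6)), ["consumer-A", "consumer-B"])
print(plan)  # {'consumer-A': [0, 2, 4], 'consumer-B': [1, 3, 5]}
```

Because each consumer reports its per-partition progress back to the group (step 5), a re-assignment lets the new owner of a partition resume from the saved offset rather than from the beginning.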

    Consumption Parameter Description

    | Parameter | Description | Default Value | Value Range |
    | --- | --- | --- | --- |
    | endpoint | Request domain, i.e., the API domain name shown on the topic's Log Upload tab. | - | Supported regions: Beijing, Shanghai, Guangzhou, Nanjing, Hong Kong (China), Tokyo, Eastern United States, Singapore, and Frankfurt. |
    | access_key_id | Your SecretId. To obtain it, go to CAM. | - | - |
    | access_key | Your SecretKey. To obtain it, go to CAM. | - | - |
    | region | Region of the log topic, e.g., ap-beijing, ap-guangzhou, ap-shanghai. For details, see Regions and Access Domains. | - | Supported regions: Beijing, Shanghai, Guangzhou, Nanjing, Hong Kong (China), Tokyo, Eastern United States, Singapore, and Frankfurt. |
    | logset_id | Logset ID. Only one logset is supported. | - | - |
    | topic_ids | Log topic ID. Separate multiple topic IDs with commas (,). | - | - |
    | consumer_group_name | Consumer group name. | - | - |
    | internal | TRUE: access over the private network; FALSE: access over the public network. Note: for private/public network read traffic fees, see Product Pricing. | FALSE | TRUE/FALSE |
    | consumer_name | Consumer name. Consumer names must be unique within the same consumer group. | - | A string consisting of 0-9, a-z, A-Z, '-', '_', and '.'. |
    | heartbeat_interval | Heartbeat interval. A consumer that fails to report a heartbeat for two consecutive intervals is considered offline. | 20 | 0-30 minutes |
    | data_fetch_interval | Interval at which a consumer pulls data. Cannot be less than 1 second. | 2 | - |
    | offset_start_time | Start time for data pulling: a string-type UNIX timestamp with second-level precision, e.g., 1711607794. It can also be set to "begin" or "end". begin: the earliest data within the log topic's lifecycle; end: the latest data within the log topic's lifecycle. | "end" | "begin"/"end"/UNIX timestamp |
    | max_fetch_log_group_size | Data size of a single pull per consumer. Defaults to 2 MB; up to 10 MB. | 2097152 | 2 MB - 10 MB |
    | offset_end_time | End time for data pulling: a string-type UNIX timestamp with second-level precision, e.g., 1711607794. If left unspecified, data is pulled continuously. | - | - |
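Two of the parameters above accept more than one kind of value, which is easy to get wrong. The helper below is illustrative only (the function names are ours, and the checks are assumptions rather than the SDK's actual validation): offset_start_time accepts "begin", "end", or a second-level UNIX timestamp string, and max_fetch_log_group_size must stay within 2 MB - 10 MB.

```python
def validate_offset_start_time(value):
    """Accept "begin", "end", or a second-level UNIX timestamp string."""
    if value in ("begin", "end"):
        return value
    if value.isdigit():  # e.g. "1711607794"
        return value
    raise ValueError('offset_start_time must be "begin", "end", or a UNIX timestamp string')

def validate_max_fetch_size(size_bytes):
    """Enforce the documented 2 MB - 10 MB bounds on a single pull."""
    if not 2 * 1024 * 1024 <= size_bytes <= 10 * 1024 * 1024:
        raise ValueError("max_fetch_log_group_size must be between 2 MB and 10 MB")
    return size_bytes
```

For example, validate_max_fetch_size(10485760) passes, matching the 10 MB value used in the demo below, while a value such as 1024 would be rejected.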

    Consumption Demo (Python)

    # -*- coding: utf-8 -*-

    import logging
    import os
    import time
    from threading import RLock
    from tencentcloud.log.consumer import *
    from tencentcloud.log.logclient import YunApiLogClient

    root = logging.getLogger()
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(
        fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'))
    root.setLevel(logging.INFO)
    root.addHandler(handler)

    logger = logging.getLogger(__name__)


    class SampleConsumer(ConsumerProcessorBase):
        # Time at which the consumption offset was last saved
        last_check_time = 0
        # Consumed log data
        log_results = []
        lock = RLock()

        def initialize(self, topic_id):
            self.topic_id = topic_id

        def process(self, log_groups, offset_tracker):
            # Access the consumed log data
            for log_group in log_groups:
                for log in log_group.logs:
                    item = dict()
                    item['time'] = log.time
                    item['filename'] = log_group.filename
                    item['source'] = log_group.source
                    for content in log.contents:
                        item[content.key] = content.value
                    with SampleConsumer.lock:
                        SampleConsumer.log_results.append(item)

            # Save the offset; force a save at most once every 3 seconds
            current_time = time.time()
            if current_time - self.last_check_time > 3:
                try:
                    self.last_check_time = current_time
                    offset_tracker.save_offset(True)
                except Exception:
                    import traceback
                    traceback.print_exc()
            else:
                try:
                    offset_tracker.save_offset(False)
                except Exception:
                    import traceback
                    traceback.print_exc()
            return None

        def shutdown(self, offset_tracker):
            try:
                offset_tracker.save_offset(True)
            except Exception:
                import traceback
                traceback.print_exc()


    def sleep_until(seconds, exit_condition=None, expect_error=False):
        if not exit_condition:
            time.sleep(seconds)
            return
        s = time.time()
        while time.time() - s < seconds:
            try:
                if exit_condition():
                    break
            except Exception:
                if expect_error:
                    continue
            time.sleep(1)


    # Create a consumer group
    def sample_consumer_group():
        # Load options from environment variables
        # Request domain name
        endpoint = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_ENDPOINT', '')
        # User's SecretId
        access_key_id = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_ACCESSID', '')
        # User's SecretKey
        access_key = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_ACCESSKEY', '')
        # Logset ID
        logset_id = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_LOGSET_ID', '')
        # List of log topic IDs (multiple topics supported)
        topic_ids = ['topic_id_1', 'topic_id_2']
        # Consumer group (consumer group names must be unique under the same logset)
        consumer_group = 'consumer-group-1'
        # Consumer 1
        consumer_name1 = "consumer-group-1-A"
        # Consumer 2
        consumer_name2 = "consumer-group-1-B"
        # Region
        region = "ap-guangzhou"

        assert endpoint and access_key_id and access_key and logset_id, ValueError(
            "endpoint/access_id/access_key and logset_id cannot be empty")

        # Create a client for accessing cloud APIs
        client = YunApiLogClient(access_key_id, access_key, region=region)
        # Initialize the consumption result list
        SampleConsumer.log_results = []

        try:
            # Create two consumer configurations
            option1 = LogHubConfig(endpoint, access_key_id, access_key, region, logset_id, topic_ids, consumer_group,
                                   consumer_name1, heartbeat_interval=3, data_fetch_interval=1,
                                   offset_start_time="end", max_fetch_log_group_size=10485760)
            option2 = LogHubConfig(endpoint, access_key_id, access_key, region, logset_id, topic_ids, consumer_group,
                                   consumer_name2, heartbeat_interval=3, data_fetch_interval=1,
                                   offset_start_time="end", max_fetch_log_group_size=10485760)

            print("*** start to consume data...")
            # Create and start consumer 1
            client_worker1 = ConsumerWorker(SampleConsumer, consumer_option=option1)
            client_worker1.start()
            # Create and start consumer 2
            client_worker2 = ConsumerWorker(SampleConsumer, consumer_option=option2)
            client_worker2.start()

            # Wait up to 120 seconds for data to arrive
            sleep_until(120, lambda: len(SampleConsumer.log_results) > 0)

            print("*** consumer group status ***")
            # Print consumer group information: group name, consumed log topics, and consumer heartbeat timeout
            ret = client.list_consumer_group(logset_id, topic_ids)
            ret.log_print()

            print("*** stopping workers")
            # Shut down the consumers
            client_worker1.shutdown()
            client_worker2.shutdown()

            print("*** delete consumer group")
            # Delete the consumer group
            client.delete_consumer_group(logset_id, consumer_group)
        except Exception as e:
            raise e

        # Print the consumed log data
        ret = str(SampleConsumer.log_results)
        print("*** get content:")
        print(ret)


    if __name__ == '__main__':
        sample_consumer_group()
    
    