Parameter | Description | Default Value | Value Range |
endpoint | Request domain name. | - | Supported regions: Beijing, Shanghai, Guangzhou, Nanjing, Hong Kong (China), Tokyo, Eastern United States, Singapore, and Frankfurt. |
access_key_id | - | - | |
access_key | - | - | |
region | Topic's region. For example, ap-beijing, ap-guangzhou, ap-shanghai. For more details, see Regions and Access Domains. | - | Supported regions: Beijing, Shanghai, Guangzhou, Nanjing, Hong Kong (China), Tokyo, Eastern United States, Singapore, and Frankfurt. |
logset_id | Logset ID. Only one logset is supported. | - | - |
topic_ids | Log topic ID. For multiple topics, use , to separate. | - | - |
consumer_group_name | Consumer Group Name | - | - |
internal | Whether to access over the private network. Private network: TRUE; public network: FALSE. | FALSE | TRUE/FALSE |
consumer_name | Consumer name. Within the same consumer group, consumer names must be unique. | - | A string consisting of 0-9, a-z, A-Z, '-', '_', '.'. |
heartbeat_interval | The interval of heartbeats. If consumers fail to report a heartbeat for two intervals, they will be considered offline. | 20 | 0-30 minutes |
data_fetch_interval | The interval of consumer data pulling. Cannot be less than 1 second. | 2 | - |
offset_start_time | The start time for data pulling. The string type of unix Timestamp , with second-level precision. For example, 1711607794. It can also be directly configured as "begin" and "end". begin: The earliest data within the log topic lifetime. end: The latest data within the log topic lifetime. | "end" | "begin"/"end"/unix Timestamp |
max_fetch_log_group_size | The data size for a consumer in a single pulling. Defaults to 2 M and up to 10 M. | 2097152 | 2M - 10M |
offset_end_time | The end time for data pulling. Supports a string-type unix Timestamp , with second-level precision. For example, 1711607794. Not filling this field represents continuous pulling. | - | - |
# -*- coding: utf-8 -*-
"""Sample: consume CLS log data with a consumer group running two consumers.

Reads connection settings from environment variables, starts two consumers
in the same consumer group, waits for data to arrive, prints the consumer
group status, then shuts the workers down and deletes the group.
"""
import logging
import os
import time
import traceback
from threading import RLock

from tencentcloud.log.consumer import *
from tencentcloud.log.logclient import YunApiLogClient

# Configure root logging so SDK/worker threads log with thread + location info.
root = logging.getLogger()
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(
    fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'))
root.setLevel(logging.INFO)
root.addHandler(handler)
logger = logging.getLogger(__name__)


class SampleConsumer(ConsumerProcessorBase):
    """Consumer processor that accumulates consumed logs into a shared list."""

    # Epoch time (seconds) of the last forced offset commit.
    last_check_time = 0
    # Consumed log records; shared across all consumer instances.
    log_results = []
    # Guards concurrent appends to log_results from multiple worker threads.
    lock = RLock()

    def initialize(self, topic_id):
        self.topic_id = topic_id

    def process(self, log_groups, offset_tracker):
        """Collect consumed log records and periodically persist the offset.

        Forces an offset commit (save_offset(True)) at most once every
        3 seconds; otherwise only records the offset locally (False).
        """
        for log_group in log_groups:
            for log in log_group.logs:
                item = dict()
                item['time'] = log.time
                item['filename'] = log_group.filename
                item['source'] = log_group.source
                for content in log.contents:
                    item[content.key] = content.value
                with SampleConsumer.lock:
                    SampleConsumer.log_results.append(item)

        current_time = time.time()
        if current_time - self.last_check_time > 3:
            try:
                self.last_check_time = current_time
                offset_tracker.save_offset(True)
            except Exception:
                traceback.print_exc()
        else:
            try:
                offset_tracker.save_offset(False)
            except Exception:
                traceback.print_exc()
        return None

    def shutdown(self, offset_tracker):
        """Persist the final offset when the consumer shuts down."""
        try:
            offset_tracker.save_offset(True)
        except Exception:
            traceback.print_exc()


def sleep_until(seconds, exit_condition=None, expect_error=False):
    """Sleep up to `seconds`, polling `exit_condition()` once per second.

    Returns early as soon as `exit_condition()` is truthy. If the condition
    raises and `expect_error` is True, the error is tolerated and polling
    continues; otherwise the exception propagates.
    """
    if not exit_condition:
        time.sleep(seconds)
        return
    start = time.time()
    while time.time() - start < seconds:
        try:
            if exit_condition():
                break
        except Exception:
            if not expect_error:
                raise
        # Always pause between polls (the original `continue` skipped the
        # sleep on expected errors, busy-looping at 100% CPU).
        time.sleep(1)


def sample_consumer_group():
    """Create a consumer group, consume with two consumers, then clean up."""
    # Load options from environment variables.
    # Request domain name.
    endpoint = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_ENDPOINT', '')
    # User's secret ID.
    access_key_id = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_ACCESSID', '')
    # User's secret key.
    access_key = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_ACCESSKEY', '')
    # Logset ID (only one logset is supported).
    logset_id = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_LOGSET_ID', '')
    # List of log topic IDs (multiple topics supported).
    topic_ids = ['topic_id_1', 'topic_id_2']
    # Consumer group name (must be unique within the same logset).
    consumer_group = 'consumer-group-1'
    # Two consumers; names must be unique within the consumer group.
    consumer_name1 = "consumer-group-1-A"
    consumer_name2 = "consumer-group-1-B"
    # Region, e.g. ap-guangzhou.
    region = "ap-guangzhou"

    # Validate explicitly: `assert` is stripped under `python -O`, and the
    # original `assert cond, ValueError(...)` never actually raised ValueError.
    if not (endpoint and access_key_id and access_key and logset_id):
        raise ValueError("endpoint/access_id/access_key and logset_id cannot be empty")

    # Client for calling the cloud APIs (consumer-group management).
    client = YunApiLogClient(access_key_id, access_key, region=region)
    # Reset the shared result list for this run.
    SampleConsumer.log_results = []

    try:
        # Two consumer configurations sharing the same consumer group.
        option1 = LogHubConfig(endpoint, access_key_id, access_key, region, logset_id, topic_ids,
                               consumer_group, consumer_name1, heartbeat_interval=3, data_fetch_interval=1,
                               offset_start_time="end", max_fetch_log_group_size=10485760)
        option2 = LogHubConfig(endpoint, access_key_id, access_key, region, logset_id, topic_ids,
                               consumer_group, consumer_name2, heartbeat_interval=3, data_fetch_interval=1,
                               offset_start_time="end", max_fetch_log_group_size=10485760)

        print("*** start to consume data...")
        # Start consumer 1.
        client_worker1 = ConsumerWorker(SampleConsumer, consumer_option=option1)
        client_worker1.start()
        # Start consumer 2.
        client_worker2 = ConsumerWorker(SampleConsumer, consumer_option=option2)
        client_worker2.start()

        # Wait up to 120s for at least one record to be consumed.
        sleep_until(120, lambda: len(SampleConsumer.log_results) > 0)

        print("*** consumer group status ***")
        # Print consumer group info: group name, topics consumed, heartbeat timeout.
        ret = client.list_consumer_group(logset_id, topic_ids)
        ret.log_print()

        print("*** stopping workers")
        # Shut down the consumers.
        client_worker1.shutdown()
        client_worker2.shutdown()

        print("*** delete consumer group")
        # Delete the consumer group.
        client.delete_consumer_group(logset_id, consumer_group)
    except Exception:
        # Bare `raise` preserves the original traceback
        # (the original `raise e` discards it on Python 2).
        raise

    # Print the consumed log data.
    ret = str(SampleConsumer.log_results)
    print("*** get content:")
    print(ret)


if __name__ == '__main__':
    sample_consumer_group()
Was this page helpful?