tencent cloud

数据湖计算

产品动态
产品简介
产品概述
产品优势
应用场景
购买指南
计费概述
退费说明
欠费说明
调整配置费用说明
快速入门
新用户开通全流程
DLC 数据导入指引
一分钟入门 DLC 数据分析
一分钟入门 DLC 权限管理
一分钟入门分区表
开启数据优化
跨源分析 EMR Hive 数据
标准引擎配置指引
配置数据访问策略
操作指南
控制台操作介绍
开发指南
运行环境
SparkJar 作业开发指南
PySpark 作业开发指南
查询性能优化指南
UDF 函数开发指南
系统约束
客户端访问
JDBC 访问
TDLC 命令行工具访问
第三方软件联动
Python 访问
实践教程
通过 Power BI 访问 DLC 数据操作指南
建表实践
使用 Apache Airflow 调度 DLC 引擎提交任务
StarRocks 直接查询 DLC 内部存储
Spark 计算成本优化实践
DATA + AI
使用 DLC 分析 CLS 日志
使用角色 SSO 访问 DLC
资源级鉴权指南
在 DLC 中实现 TCHouse-D 读写操作
DLC 原生表
SQL 语法
SuperSQL 语法
标准 Spark 语法概览
标准 Presto 语法概览
保留字
API 文档
History
Introduction
API Category
Making API Requests
Data Table APIs
Task APIs
Metadata APIs
Service Configuration APIs
Permission Management APIs
Database APIs
Data Source Connection APIs
Data Optimization APIs
Data Engine APIs
Resource Group for the Standard Engine APIs
Data Types
Error Codes
通用类参考
错误码
配额与限制
第三方软件连接DLC操作指南
常见问题
权限类常见问题
引擎类常见问题
功能类常见问题
Spark 作业类常见问题
DLC 政策
隐私协议
数据处理和安全协议
服务等级协议
联系我们

Python 访问

PDF
聚焦模式
字号
最后更新时间: 2024-07-31 17:34:04
数据湖计算 DLC 提供符合 DBAPI 2.0 标准的工具。您可以通过 Python 连接到 DLC 的 Presto/Spark 引擎,可以方便地使用 SQL 的方式操作 DLC 库表。

环境准备

1. Python 3.9及以上版本。
2. 安装tencentcloud-dlc-connector。
pip install -i https://mirrors.tencent.com/pypi/simple/ tencentcloud-dlc-connector

使用示例

步骤一:连接到引擎

代码:
import tdlc_connector
import datetime
from tdlc_connector import constants

conn = tdlc_connector.connect(region="<REGION>",
secret_id="<SECRET_ID>",
secret_key="<SECRET_KEY>",
token=None,
endpoint=None,
catalog=constants.Catalog.DATALAKECATALOG,
engine="<ENGINE>",
engine_type=constants.EngineType.AUTO,
result_style=constants.ResultStyles.LIST,
download=False,
mode=constants.Mode.LASY,
database='',
config={},
callback=None,
callback_events=None,
)
参数说明:
参数
说明
region
引擎所在地域,如 ap-nanjing,ap-beijing,ap-guangzhou,ap-shanghai, ap-chengdu,ap-chongqing,na-siliconvalley,ap-singapore, ap-hongkong
secret_id
腾讯云 SecretID
secret_key
腾讯云 SecretKey
token
(可选)临时密钥 Token
endpoint
(可选)连接服务节点
engine
使用的引擎名称,例如 “test_python”
engine_type
(可选)引擎类型:对应引擎名称的引擎类型, 默认值 constants.EngineType.AUTO
例如:AUTO、PRESTO、SPARK、SPARK_BATCH
result_style
(可选)返回结果的格式,可选 LIST/DICT
download
(可选)是否直接下载数据 True/False,详见下载模式说明
mode
(可选)模式。支持 ALL/LASY/STREAM
database
(可选)默认数据库
config
(可选)提交到集群配置
callback
(可选)回调函数,函数签名 def cb(statement_id, status)
callback_events
(可选)回调触发事件,同callback配合使用,详见回调机制说明
driver_size
(可选)Driver 节点大小,默认值 constants.PodSize.SMALL (仅SPARK_BATCH 集群有效)
可选值:SMALL、MEDIUM、LARGE、XLARGE、M_SMALL、M_MEDIUM、M_LARGE、M_XLARGE
executor_size
(可选)Executor 节点大小,默认值 constants.PodSize.SMALL (仅SPARK_BATCH 集群有效) 可选值:SMALL、MEDIUM、LARGE、XLARGE、M_SMALL、M_MEDIUM、M_LARGE、M_XLARGE
executor_num
(可选)Executor 节点数量, 默认值1 (仅SPARK_BATCH 集群有效)
executor_max_num
(可选)Executor 最大节点数量,若不等于 executor_num, 则开启资源动态分配 (仅 SPARK_BATCH 集群有效)

下载模式说明
序号
download
mode
说明
1
False
ALL
从接口获取所有数据,获取完成后才能 fetch 到数据
2
False
LASY
从接口获取数据,根据 fetch 的数据量延迟到服务端获取数据
3
False
STREAM
同 LASY 模式
4
True
ALL
从 COS 下载所有结果(需具备 COS 读权限)占用本地临时存储,数据量较大时推荐使用
5
True
LASY
从 COS 下载结果(需具备 COS 读权限),根据 fetch 数据量延迟下载文件
6
True
STREAM
实时的从 COS 读取结果流(需具备 COS 读权限),性能较慢,本地内存磁盘占用率极低

步骤二:执行 SQL

代码:
# 基本操作

cursor = conn.cursor()
count = cursor.execute("SELECT 1")
print(cursor.fetchone()) # 读取一行数据
for row in cursor.fetchall(): # 读取剩余多行数据
print(row)

# 使用参数 pyformat 格式

cursor.execute("SELECT * FROM dummy WHERE date < %s", datetime.datetime.now())
cursor.execute("SELECT * FROM dummy WHERE status in %s", (('SUCCESS', 'INIT', 'FAIL'),))
cursor.execute("SELECT * FROM dummy WHERE date < %(date)s AND status = %(status)s", {'date': datetime.datetime.now(), 'status': 'SUCCESS'})

# 使用BULK方式

cursor.executemany("INSERT INTO dummy VALUES(%s, %s)", [('张三', 18), ('李四', 20)])

基本操作流程

上述代码流程如下:
1. 通过conn.cursor() 创建了一个游标对象。
2. 通过cursor.execute("SELECT 1") 执行了一条 SQL 查询语句,并将结果赋值给变量count。
3. 通过调用cursor.fetchone() 方法读取了一行数据,并将其打印出来。
4. 在一个循环中调用了cursor.fetchall() 方法来读取剩余的多行数据,并逐行打印出来。

参数传递方式

支持两种不同的参数传递方式:
使用 pyformat 格式:在执行 SQL 语句时,可以使用%s作为占位符,并将实际参数以元组或字典形式传入。
使用 BULK 方式:通过调用executemany() 方法可以批量插入多个记录到数据库表中。

特性功能

回调机制使用说明

import tdlc_connector
import datetime
from tdlc_connector import constants


def tdlc_connector_callback(statement_id, state):
'''
parmas: statement_id 任务id
params: state 任务状态,枚举值 参考constants.TaskStatus
'''
print(statement_id, state)


conn = tdlc_connector.connect(region="<REGION>",
secret_id="<SECRET_ID>",
secret_key="<SECRET_KEY>",
engine="<ENGINE>",
engine_type=constants.EngineType.SPARK,
result_style=constants.ResultStyles.LIST,
callback=tdlc_connector_callback,
callback_events=[constants.CallbackEvent.ON_INIT, constants.CallbackEvent.ON_SUCCESS]
)

cursor = conn.cursor()
cursor.execute("SELECT 1")
cursor.fetchone()

# callback 函数会在任务初始化 和 任务成功时调用


提交任务到作业集群

当前已支持提交任务到 Spark 作业集群,具体请参考如下示例。
from tdlc_connector import constants

conn = tdlc_connector.connect(region="<REGION>",
secret_id="<SECRET_ID>",
secret_key="<SECRET_KEY>",
engine="<ENGINE>", # 请选择 spark 作业引擎
result_style=constants.ResultStyles.LIST,
driver_size=constants.PodSize.SMALL, # 选择 Driver 规格
executor_size=constants.PodSize.SMALL, # 选择 Executor 规格
executor_num=1, # 设置 Executor 数量
executor_max_num=1, # 设置 Executor 最大数量, 如果 不等于{executor_num},则开启动态资源分配
)
说明:
如需使用该特性,请将 connector 升级至 >= 1.1.0。

自动推断引擎类型

当前指定使用引擎后无需再指定引擎类型,connector 会自动推断,具体请参考如下示例。
from tdlc_connector import constants

conn = tdlc_connector.connect(region="<REGION>",
secret_id="<SECRET_ID>",
secret_key="<SECRET_KEY>",
engine="<ENGINE>",
engine_type=constants.EngineType.AUTO # 可设置成AUTO 或者 不传入该参数,驱动自动推断
)
说明:
如需使用该特性,请将 connector 升级至 >= 1.1.0。

空值转换

当前结果集基于CSV格式存储,引擎默认将空值转换成空字符串,如果需要区分空值,请指定空值表示符, 例如 “\\1” ,引擎查询结果会将空值转换成 “\\1”,同时驱动会将 “\\1”字段转换成 None,具体请参考如下示例。
from tdlc_connector import constants, formats

formats.FORMAT_STRING_NULL = '\\1'

conn = tdlc_connector.connect(region="<REGION>",
secret_id="<SECRET_ID>",
secret_key="<SECRET_KEY>",
engine="<ENGINE>",
result_style=constants.ResultStyles.LIST
)
说明:
空值转换目前仅支持 SparkSQL集群,请将 connector 升级至 >= 1.1.3。


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈