Apache Kafka, like any client-server application, offers access to its functionality through a well-defined set of APIs. These APIs are exposed via the Kafka wire protocol, a Kafka-specific binary protocol over TCP. The best way to interact with the Apache Kafka APIs is to make use of a client library that works with the Kafka wire protocol. The Apache Kafka project only officially supports a client library for Java, but in addition to that, Confluent officially supports client libraries for C/C++, C#, Go, and Python.
Unfortunately, some programming languages lack officially supported, production-grade client libraries for Kafka. However, HTTP is a widely available, universally supported protocol. For data access, DataHub exposes message sending APIs over the HTTP protocol to simplify client configurations.
This document describes how to send messages through the HTTP-based data access feature of DataHub and provides suggestions for real-world cases.
After the HTTP data access layer is enabled, an HTTP client on the public network can send messages directly to a CKafka instance through TencentCloud API as shown below:
You have created the target CKafka instance and topic.
For detailed directions, see Reporting over HTTP.
Add the SDK dependency to the pom.xml file for project configuration:
<dependency>
    <groupId>com.tencentcloudapi</groupId>
    <artifactId>tencentcloud-sdk-java</artifactId>
    <version>3.1.430</version>
</dependency>
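generateMsgFromUserAccess is used to assemble all messages to be sent.
import com.tencentcloudapi.ckafka.v20190819.CkafkaClient;
import com.tencentcloudapi.ckafka.v20190819.models.BatchContent;
import com.tencentcloudapi.ckafka.v20190819.models.SendMessageRequest;
import com.tencentcloudapi.ckafka.v20190819.models.SendMessageResponse;
import com.tencentcloudapi.common.Credential;
import com.tencentcloudapi.common.exception.TencentCloudSDKException;
import java.util.List;

// Assemble the batch of messages to send (LOGGER below is the application's own logger)
List<BatchContent> batchContentList = generateMsgFromUserAccess(userId);
// Here, `ap-xxx` is the region abbreviation of the corresponding TencentCloud API
CkafkaClient client = new CkafkaClient(
        new Credential("yourSecretId", "yourSecretKey"), "ap-xxx");
SendMessageRequest messageRequest = new SendMessageRequest();
// Access point ID of the data access task
messageRequest.setDataHubId("datahub-lzxxxxx6");
messageRequest.setMessage(batchContentList.toArray(BatchContent[]::new));
try {
    SendMessageResponse sendMessageResponse = client.SendMessage(messageRequest);
    // One MessageId is returned for each message that was accepted
    String[] messageId = sendMessageResponse.getMessageId();
    for (String s : messageId) {
        LOGGER.info(s);
    }
} catch (TencentCloudSDKException e) {
    LOGGER.error(e.getMessage());
}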
{
"Response": {
"MessageId": [
"datahub-lxxxxxx6:topicDev:4:2:1648185961342:1648185961398"
],
"RequestId": "3fq3na5r-xxxx-xxxx-xxxx-b2fiv0se7ded"
}
}
Each MessageId in the response has the following format:
"[datahubId]:[topic name]:[topic partition number]:[topic offset]:[time when the HTTP access layer received the message]:[time when the message was sent to Kafka]"
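Because the MessageId packs six colon-separated fields into one string, it can help to split it before logging or looking the message up in the console. The following is a minimal sketch, not part of the SDK: the class and field names (`MessageIdParser`, `ParsedMessageId`, and so on) are hypothetical, and it assumes that neither the DataHub ID nor the topic name contains a colon and that both timestamps are epoch milliseconds, as the sample value suggests.

```java
public final class MessageIdParser {

    /** Fields of a MessageId, in the order given by the documented format. */
    public static final class ParsedMessageId {
        public final String dataHubId;
        public final String topic;
        public final int partition;
        public final long offset;
        public final long receivedAtMillis;      // assumed epoch milliseconds
        public final long sentToKafkaAtMillis;   // assumed epoch milliseconds

        ParsedMessageId(String dataHubId, String topic, int partition, long offset,
                        long receivedAtMillis, long sentToKafkaAtMillis) {
            this.dataHubId = dataHubId;
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.receivedAtMillis = receivedAtMillis;
            this.sentToKafkaAtMillis = sentToKafkaAtMillis;
        }

        /** Time spent between the HTTP access layer and Kafka, under the millisecond assumption. */
        public long accessLayerLatencyMillis() {
            return sentToKafkaAtMillis - receivedAtMillis;
        }
    }

    private MessageIdParser() {
    }

    /** Splits a MessageId into its six fields; rejects anything with a different field count. */
    public static ParsedMessageId parse(String messageId) {
        String[] parts = messageId.split(":");
        if (parts.length != 6) {
            throw new IllegalArgumentException(
                    "Expected 6 colon-separated fields but got " + parts.length + ": " + messageId);
        }
        return new ParsedMessageId(
                parts[0],
                parts[1],
                Integer.parseInt(parts[2]),
                Long.parseLong(parts[3]),
                Long.parseLong(parts[4]),
                Long.parseLong(parts[5]));
    }
}
```

For the sample MessageId in the response above, `MessageIdParser.parse(...)` yields topic `topicDev`, partition 4, and offset 2, which are the values to enter when querying the message in the console.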
You can query messages sent at the HTTP access layer in the CKafka console. For detailed directions, see Querying Message. In this example, messages at offset 2 in partition 4 in the topicDev topic are queried as shown below:
If you find that the data access task affects your normal business, you can pause the task. Requests sent while the task is paused fail with an error like the following:
{
"Response": {
"Error": {
"Code": "FailedOperation",
"Message": "task status suspended [datahub-lxxxxxx6]"
},
"RequestId": "5f737a5b-xxxx-xxxx-xxxxx-b2fb703e7ded"
}
}
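A sender may want to stop retrying once it sees this error, since the task stays paused until it is resumed. The sketch below is one hypothetical way to recognize it from the `Code` and `Message` fields of the error shown above. The class and method names are illustrative only, and matching on the message text is an assumption based solely on this sample response, so it may need adjusting if the wording differs.

```java
public final class SendErrorClassifier {

    private SendErrorClassifier() {
    }

    /**
     * Returns true when the error looks like the "task suspended" response shown above.
     * Assumption: the error code is "FailedOperation" and the message contains
     * "task status suspended", as in the sample; neither is a documented contract.
     */
    public static boolean isTaskSuspended(String errorCode, String errorMessage) {
        return "FailedOperation".equals(errorCode)
                && errorMessage != null
                && errorMessage.contains("task status suspended");
    }
}
```

In the catch block of the sending code, the error code and message taken from the caught exception could be passed to `isTaskSuspended` to decide whether to stop sending and alert an operator instead of retrying.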