To access CKafka over a public network, you can add a public route in the CKafka console and configure SASL authentication and ACL rules. You can then produce and consume messages in CKafka topics through that route.
You have created an instance.
You can manage existing topics with ACLs (including read and write permissions) so that only authorized users can perform read and write operations on the topics.
Note: For details on SASL, ACL, and user access control, see User Access Control (User and ACL Policy Management).
After completing the above steps, you can access the resources of your instance over a public network with a user name and password.
java
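import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Future;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;

Properties props = new Properties();
// Domain name for public network access, i.e. the public route address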
props.put("bootstrap.servers", "your_public_network_route_addr");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("request.timeout.ms", 10000);
props.put("max.block.ms", 30000);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
// User name and password. Note: the user name is the instance ID and the console user name joined by "#", i.e. "instanceId#username".
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"yourinstance#yourusername\" password=\"yourpassword\";");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 1000; i++) {
    Future<RecordMetadata> future = producer.send(new ProducerRecord<>("topic1", UUID.randomUUID().toString()));
    // get() blocks until the send completes and throws if the send failed
    System.out.println("produce offset:" + future.get().offset());
}
producer.close();
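The user name format noted in the comment above ("instanceId#username") is easy to get wrong when the JAAS string is written by hand. The following is a minimal sketch, not part of the CKafka SDK, of a helper that assembles the SASL settings from their parts. The class name `CkafkaSaslProps` and its method names are hypothetical, and the sketch assumes the instance ID, user name, and password contain no double quotes. It uses the plain string keys `security.protocol` and `sasl.mechanism`, which are the values behind `CommonClientConfigs.SECURITY_PROTOCOL_CONFIG` and `SaslConfigs.SASL_MECHANISM`.

```java
import java.util.Properties;

// Hypothetical helper (illustration only): builds the SASL/PLAIN settings used in the examples.
final class CkafkaSaslProps {

    private CkafkaSaslProps() {}

    // CKafka expects the SASL user name as "<instanceId>#<console user name>".
    static String saslUserName(String instanceId, String userName) {
        return instanceId + "#" + userName;
    }

    // Builds the value for the "sasl.jaas.config" client property.
    // Assumption: none of the arguments contains a double quote.
    static String jaasConfig(String instanceId, String userName, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required"
                + " username=\"" + saslUserName(instanceId, userName) + "\""
                + " password=\"" + password + "\";";
    }

    // Returns the connection and SASL properties shared by producers and consumers.
    static Properties saslProps(String bootstrapServers, String instanceId,
                                String userName, String password) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", jaasConfig(instanceId, userName, password));
        return props;
    }
}
```

A producer or consumer could start from `CkafkaSaslProps.saslProps(...)` and then add its own serializer or deserializer settings, which keeps the credential format in one place.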
java
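import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;

Properties props = new Properties();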
//Domain name for public network access
props.put("bootstrap.servers", "your_public_network_route_addr");
props.put("group.id", "yourconsumegroup");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
// User name and password. Note: the user name is not the console user name alone; it is concatenated as "instanceId#username".
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"yourinstance#yourusername\" password=\"yourpassword\";");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("yourtopic"));
while (true) {
    // poll(long) is deprecated in Kafka clients 2.0 and later; use consumer.poll(Duration.ofMillis(100)) there.
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}
Note: Apart from setting sasl.jaas.config through properties, you can also pass in the JAAS configuration using System.setProperty or the -D JVM option.
- System.setProperty("java.security.auth.login.config", "/etc/ckafka_client_jaas.conf");
- The content of the ckafka_client_jaas.conf file is as follows:
java
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="yourinstance#yourusername"
    password="yourpassword";
};
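To show how the JAAS file approach fits together with the client settings, here is a minimal sketch under stated assumptions. The class `CkafkaJaasFileConfig` and its method are hypothetical names used only for illustration. The sketch sets the `java.security.auth.login.config` system property (a JVM-wide side effect) and deliberately leaves `sasl.jaas.config` unset, because in the Kafka client a `sasl.jaas.config` property takes precedence over the JAAS file.

```java
import java.util.Properties;

// Hypothetical helper (illustration only): configures SASL/PLAIN through an external JAAS file.
final class CkafkaJaasFileConfig {

    private CkafkaJaasFileConfig() {}

    // Points the JVM at the JAAS file and returns client properties without "sasl.jaas.config".
    // Call this before creating the KafkaProducer or KafkaConsumer.
    static Properties clientProps(String bootstrapServers, String jaasFilePath) {
        // Same effect as starting the JVM with -Djava.security.auth.login.config=<jaasFilePath>
        System.setProperty("java.security.auth.login.config", jaasFilePath);

        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        return props;
    }
}
```

For example, `CkafkaJaasFileConfig.clientProps("your_public_network_route_addr", "/etc/ckafka_client_jaas.conf")` returns properties to which the serializer or deserializer settings from the examples above can be added. With the -D option, the `System.setProperty` call is unnecessary.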