To access CKafka over public network, you can add public routes in CKafka Console and configure SASL authentication and ACL rules to access the production and consumption messages in CKafka topics.
You have created an instance.
Perform ACL permission management (including read and write) on the existing topic. Only users with permissions can perform read and write permission operations on the topic.
After operating on the console, you can access instance resources over public network using user name and password.
Properties props = new Properties();
//Domain name for public access, i.e. public routing address
props.put("bootstrap.servers", "your_public_network_route_addr");
props.put("acks", "all");
props.put("retries",0);
props.put("batch.size", 16384);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("request.timeout.ms", 10000);
props.put("max.block.ms", 30000);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
//User name and password. Note: use name is not the one on the console, but concatenated as the “instanceId#user name” instead
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"yourinstance#yourusername\" password=\"yourpassword\";");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 1000; i++) {
Future<RecordMetadata> future = producer.send(new ProducerRecord<>("topic1", UUID.randomUUID().toString()));
System.out.println("produce offset:" + future.get().offset());
}
producer.close();
Properties props = new Properties();
//Domain name for public access
props.put("bootstrap.servers", "your_public_network_route_addr");
props.put("group.id", "yourconsumegroup");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
//User name and password. Note: use name is not the one on the console, but concatenated as the “instanceId#user name” instead
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"yourinstance#yourusername\" password=\"yourpassword\";");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}
}
Note:
Except adding
sasl.jaas.config
configurations usingproperties
, you can also pass in usingSystem.setProperty
or-D
method.
- System.setProperty("java.security.auth.login.config", "/etc/ckafka_client_jaas.conf");
> KafkaClient {
> org.apache.kafka.common.security.plain.PlainLoginModule required
> username="yourinstance#yourusername"
> password="yourpassword";
> };
> ```
Was this page helpful?