Accessing CKafka over Public Network

Last updated: 2020-07-21 17:08:59

    Operation Scenarios

    To access CKafka over the public network, add a public route in the CKafka Console, then configure SASL authentication and ACL rules so that clients can produce and consume messages in CKafka topics.

    Note:

    The Public Route feature is currently in beta. To try it out, submit a ticket to apply; we will process it and contact you within 5 business days.

    Prerequisites

    Directions

    Creating a public route

    1. Click the target instance ID in the Instance List of CKafka Console to enter the instance details page.
    2. Click Add a routing policy on Basic Info -> Access Mode and select policy info.
      • Route type: public domain name access
      • Access mode: only SASL_PLAINTEXT is supported currently
    3. Click Submit, and you will see the routing policy below the access mode.

    Creating a user

    1. Click Create on Instance List -> User Management.
    2. Enter the following information in the pop-up window:
      • User Name: can contain only letters, numbers, underscores, "-" and "."
      • Password: can contain only letters, numbers, underscores, "-" and "."
      • Confirm Password: enter the password again
    3. Click Submit, and you will see this new user in the user management list.

    Adding an ACL policy

    Manage ACL permissions (read and write) on an existing topic. Only users who have been granted permission can read from or write to the topic.

    1. Enter Instance List -> ACL Policy Management, and click Edit ACL Policy on the operation column of the target topic.
    2. Click Create to enter the Add ACL Policy page.
    3. Configure the user and IP in the Add ACL Policy window. If none is selected, all users/hosts are allowed by default.
    4. Click Submit, and you will see the policy in the policy list of the target topic.

    Production and consumption over public network

    After completing the console steps, you can access instance resources over the public network using the user name and password.

    Production

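    import java.util.Properties;
    import java.util.UUID;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.config.SaslConfigs;

    public class CKafkaProducerDemo {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Domain name for public access, i.e. the public routing address
            props.put("bootstrap.servers", "your_public_network_route_addr");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("request.timeout.ms", 10000);
            props.put("max.block.ms", 30000);
            // SASL_PLAINTEXT: SASL authentication over a connection without TLS
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            // PLAIN: the user name and password are sent to the broker as-is
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            // User name and password. Note: the user name is not the one shown on the console,
            // but the concatenation "instanceId#user name"
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"yourinstance#yourusername\" password=\"yourpassword\";");
            Producer<String, String> producer = new KafkaProducer<String, String>(props);
            for (int i = 0; i < 1000; i++) {
                Future<RecordMetadata> future = producer.send(new ProducerRecord<>("topic1", UUID.randomUUID().toString()));
                // get() blocks until the broker acknowledges the record
                System.out.println("produce offset:" + future.get().offset());
            }
            producer.close();
        }
    }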

    Consumption

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.config.SaslConfigs;

    public class CKafkaConsumerDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Domain name for public access
            props.put("bootstrap.servers", "your_public_network_route_addr");
            props.put("group.id", "yourconsumegroup");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            // User name and password. Note: the user name is not the one shown on the console,
            // but the concatenation "instanceId#user name"
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"yourinstance#yourusername\" password=\"yourpassword\";");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("foo", "bar"));
            while (true) {
                // poll(Duration) replaces the deprecated poll(long) in kafka-clients 2.0 and later
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        }
    }

    Note:

    Besides setting sasl.jaas.config through properties, you can also pass the JAAS configuration in using System.setProperty or the -D method.

    • System.setProperty("java.security.auth.login.config", "/etc/ckafka_client_jaas.conf");

    Contents of /etc/ckafka_client_jaas.conf:

        KafkaClient {
            org.apache.kafka.common.security.plain.PlainLoginModule required
            username="yourinstance#yourusername"
            password="yourpassword";
        };
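
The "instanceId#user name" rule and the console's character restrictions, enforced before the sasl.jaas.config string is built. This is an added example, not code from the original page or from Tencent; the class and method names are hypothetical, and applying the same character set to the instance ID is an assumption.

```java
import java.util.Properties;
import java.util.regex.Pattern;

// Added example; class and method names are hypothetical.
public class CkafkaSaslConfig {
    // Console rule for user name and password: letters, numbers, "_", "-" and ".".
    private static final Pattern CONSOLE_CHARSET = Pattern.compile("[A-Za-z0-9_.-]+");

    private static String checked(String field, String value) {
        // A quote, space or semicolon would end the quoted JAAS value early,
        // so anything outside the console character set is refused.
        if (value == null || !CONSOLE_CHARSET.matcher(value).matches()) {
            throw new IllegalArgumentException(field + " is empty or outside [A-Za-z0-9_.-]");
        }
        return value;
    }

    // The SASL user is "instanceId#userName", not the bare console user name.
    static String jaasConfig(String instanceId, String userName, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required"
                + " username=\"" + checked("instance ID", instanceId) // assumed to share the charset
                + "#" + checked("user name", userName) + "\""
                + " password=\"" + checked("password", password) + "\";";
    }

    static Properties saslProperties(String bootstrap, String instanceId,
                                     String userName, String password) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrap);
        props.put("security.protocol", "SASL_PLAINTEXT"); // only mode the public route supports
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", jaasConfig(instanceId, userName, password));
        return props;
    }

    public static void main(String[] args) {
        // Constructed sample values, not real credentials.
        Properties ok = saslProperties("your_public_network_route_addr",
                "yourinstance", "yourusername", "your-password_1");
        System.out.println("accepted, protocol=" + ok.get("security.protocol"));
        try {
            saslProperties("your_public_network_route_addr",
                    "yourinstance", "yourusername", "bad\"password");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The returned Properties can be passed to KafkaProducer or KafkaConsumer after adding the serializer and group settings shown above; in a deployment the three credential values would be read from the environment or a secret store rather than written as literals.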
    
