背景

近期项目中有业务需求将内网的请求数据流入Kafka(公网),然后进行数据分析,分析结果再流入Kafka以供内网进行消费查询的场景,所以用到了阿里云消息队列Kafka。

阿里云的Kafka的网络类型分为两种:VPC接入和公网+VPC接入(本编文章使用的是公网+VPC接入类型)

VPC

VPC全称是Virtual Private Cloud,翻译成中文是虚拟私有云。但是在有些场合也被翻译成私有网络或者专有网络等。这里其实就有些让人迷惑,VPC究竟是指云还是网络?答案是,VPC即是一种云,也是一种网络模式

公网

公网就是广域网。是连接不同地区局域网或城域网计算机通信的远程网。通常跨接很大的物理范围,所覆盖的范围从几十公里到几千公里,它能连接多个地区、城市和国家,或横跨几个洲并能提供远距离通信,形成国际性的远程网络。广域网并不等同于互联网。

接入方式

VPC其实就是内网实例,只要你拥有VPC环境就可以通过简单的配置连接。

阿里云VPC接入示例

阿里云公网接入示例

这里我们使用公网实例进行接入,公网接入需要配置SASL,所以本编文章在这里做个总结。

kafka实例

实例详情

接入说明

SASL

公网接入需要两个文件
1. 准备配置文件kafka_client_jaas.conf 也就是上图中的用户名和密码

//可以通过自己的实例用户名和密码进行配置
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="xxxxxxx"
  password="xxxxxxx";
};
  1. 配置SSL根证书

下载根证书 密码 bt33

注意:这里需要说明的是,如果在本地进行开发调试的话,可以将这两个文件放在工程的根目录下,或者放在根目录下的config目录下,开发测试的时候会很方便找到这两个文件,如果部署到服务器上,需要将这两个文件单独拿出来进行引用,不需要打到jar包里。

演示示例

先来看下工程目录

工程其实很简单,就是配置一个kafka的工具类,进行实例接入,注意这里config目录一定要配置,否则在接入的时候会报 “没有找到这样的目录” 的错。
顺着工程的目录来说下各个文件的作用。

config 目录

  1. kafka.client.truststore.jks 根证书
  2. kafka_client_jaas.conf 阿里云kafka密钥管理

kafka.properties

接入kafka的配置文件,可根据配置信息接入kafka

## 接入点,通过控制台获取
bootstrap.servers=0.0.0.0:9093,0.0.0.1:9093,0.0.0.1:9093
## Topic,通过控制台创建
## ssl 根证书的路径,demo中有,请拷贝到自己的某个目录下,不能被打包到jar中
usessl=true
#服务器上的地址
ssl.truststore.location=config/kafka.client.truststore.jks
## sasl路径,demo中有,请拷贝到自己的某个目录下,不能被打包到jar中
#服务器上的地址
java.security.auth.login.config=config/kafka_client_jaas.conf

kafkaUtils

接入kafka后,需要使用的一些常用类,比如发送消息,接受消息等,可以根据自己的需求使用其中的方法,有基础的大家一看就会明白的,这里就不再赘述了。

package com.mengxi.demo;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * kafka工具类
 *
 * @author mengxi
 */
public class KafkaUtils {

    public static void configureSasl() {
        //如果用-D或者其它方式设置过,这里不再设置
        if (null == System.getProperty("java.security.auth.login.config")) {
            //请注意将XXX修改为自己的路径
            //这个路径必须是一个文件系统可读的路径,不能被打包到jar中
            System.setProperty("java.security.auth.login.config",
                getKafkaProperties("kafka.properties").getProperty("java.security.auth.login.config"));
        }
    }

    public static Properties getKafkaProperties(String prop) {
        //获取配置文件kafka.properties的内容
        Properties kafkaProperties = new Properties();
        try {
            kafkaProperties.load(KafkaUtils.class.getClassLoader().getResourceAsStream(prop));
        } catch (Exception e) {
            //没加载到文件,程序要考虑退出
            e.printStackTrace();
        }
        return kafkaProperties;
    }

    public static KafkaProducer<String, String> getProducer(String url) {
        //设置sasl文件的路径
        KafkaUtils.configureSasl();
        Properties kafkaProperties = KafkaUtils.getKafkaProperties(url);
        Properties props = new Properties();
        //设置接入点,请通过控制台获取对应Topic的接入点
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
        //设置SSL根证书的路径,请记得将XXX修改为自己的路径
        //与sasl路径类似,该文件也不能被打包到jar中
        String usessl = kafkaProperties.getProperty("usessl");
        if ("true".equals(usessl)) {
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
                kafkaProperties.getProperty("ssl.truststore.location"));
            //根证书store的密码,保持不变
            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
            //接入协议,目前支持使用SASL_SSL协议接入
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            //SASL鉴权方式,保持不变
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        } else {
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
        }
        //Kafka消息的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        //请求的最长等待时间
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        //设置客户端内部重试次数
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        //设置客户端内部重试间隔
        props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);
        //构造Producer对象,注意,该对象是线程安全的,一般来说,一个进程内一个Producer对象即可;
        //如果想提高性能,可以多构造几个对象,但不要太多,最好不要超过5个
        return new KafkaProducer<String, String>(props);
    }

    public static KafkaConsumer<String, String> getConsumer(String url, String groupId, String topic) {
        //设置sasl文件的路径
        KafkaUtils.configureSasl();
        //加载kafka.properties
        Properties kafkaProperties = KafkaUtils.getKafkaProperties(url);
        Properties props = new Properties();
        //设置接入点,请通过控制台获取对应Topic的接入点
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
        //设置SSL根证书的路径,请记得将XXX修改为自己的路径
        //与sasl路径类似,该文件也不能被打包到jar中
        String usessl = kafkaProperties.getProperty("usessl");
        if ("true".equals(usessl)) {
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
                kafkaProperties.getProperty("ssl.truststore.location"));
            //根证书store的密码,保持不变
            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
            //接入协议,目前支持使用SASL_SSL协议接入
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            //SASL鉴权方式,保持不变
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        } else {
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
        }
        //两次poll之间的最大允许间隔
        //可更加实际拉去数据和客户的版本等设置此值,默认30s
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        //每次poll的最大数量
        //注意该值不要改得太大,如果poll太多数据,而不能在下次poll之前消费完,则会触发一次负载均衡,产生卡顿
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //消息的反序列化方式
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        //当前消费实例所属的消费组,请在控制台申请之后填写
        //属于同一个组的消费实例,会负载消费消息
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //构造消息对象,也即生成一个消费实例
        KafkaConsumer consumer = new KafkaConsumer<String, String>(props);
        //加载kafka.properties
        //设置消费组订阅的Topic,可以订阅多个
        //如果GROUP_ID_CONFIG是一样,则订阅的Topic也建议设置成一样
        List<String> subscribedTopics = new ArrayList<String>();
        //如果需要订阅多个Topic,则在这里add进去即可
        //每个Topic需要先在控制台进行创建
        subscribedTopics.add(topic);
        consumer.subscribe(subscribedTopics);

        return consumer;
    }

    public static KafkaConsumer<String, String> getVPCConsumer(String url, String groupId, String topic) {
        Properties kafkaProperties = KafkaUtils.getKafkaProperties(url);

        Properties props = new Properties();
        //设置接入点,请通过控制台获取对应Topic的接入点
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));

        //可更加实际拉去数据和客户的版本等设置此值,默认30s
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        //每次poll的最大数量
        //注意该值不要改得太大,如果poll太多数据,而不能在下次poll之前消费完,则会触发一次负载均衡,产生卡顿
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        //消息的反序列化方式
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        //当前消费实例所属的消费组,请在控制台申请之后填写
        //属于同一个组的消费实例,会负载消费消息
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //构造消息对象,也即生成一个消费实例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(
            props);
        //设置消费组订阅的Topic,可以订阅多个
        //如果GROUP_ID_CONFIG是一样,则订阅的Topic也建议设置成一样
        List<String> subscribedTopics = new ArrayList<String>();
        //如果需要订阅多个Topic,则在这里add进去即可
        //每个Topic需要先在控制台进行创建
        subscribedTopics.add(topic);
        consumer.subscribe(subscribedTopics);
        return consumer;
    }

    public static KafkaProducer<String, String> getVPCProducer(String url) {
        //设置sasl文件的路径
        Properties kafkaProperties = KafkaUtils.getKafkaProperties(url);
        Properties props = new Properties();
        //设置接入点,请通过控制台获取对应Topic的接入点
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
        //Kafka消息的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        //可更加实际拉去数据和客户的版本等设置此值,默认30s
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        //每次poll的最大数量
        //注意该值不要改得太大,如果poll太多数据,而不能在下次poll之前消费完,则会触发一次负载均衡,产生卡顿
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        //构造Producer对象,注意,该对象是线程安全的,一般来说,一个进程内一个Producer对象即可;
        //如果想提高性能,可以多构造几个对象,但不要太多,最好不要超过5个
        return new KafkaProducer<String, String>(props);
    }
}

KafkaUtilTest

最后,提供一个测试类,可以根据这个测试类去验证是否已经成功接入阿里云Kafka

package com.mengxi.demo;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Future;

/**
 * @Description
 * @Author mengxi
 * @date 2020-06-23 15:06
 * @Copyright 2019 Alibaba.com All right reserved.
 */
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {DemoApplication.class})
public class KafkaUtilTest {

    @Test
    public void kafkaTestSendMessage() {
        String topic = "kafka-test";
        KafkaProducer<String, String> producer = KafkaUtils.getProducer("kafka.properties");
        try {
            //批量获取 futures 可以加快速度, 但注意,批量不要太大
            List<Future<RecordMetadata>> futures = new ArrayList<Future<RecordMetadata>>(128);
            for (int i = 1; i < 5; i++) {
                //发送消息,并获得一个Future对象
                HashMap map = new HashMap();
                map.put("id", "id" + i);
                map.put("name", "name" + i);
                map.put("address", "address" + i);
                ProducerRecord<String, String> kafkaMessage = new ProducerRecord<String, String>(topic,
                    new JSONObject(map).toString());
                Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
                futures.add(metadataFuture);
            }
            producer.flush();
            for (Future<RecordMetadata> future : futures) {
                //同步获得Future对象的结果
                try {
                    RecordMetadata recordMetadata = future.get();
                    System.out.println("Produce ok:" + recordMetadata.toString());
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        } catch (Exception e) {
            //客户端内部重试之后,仍然发送失败,业务要应对此类错误
            //参考常见报错: https://help.aliyun.com/document_detail/68168.html?spm=a2c4g.11186623.6.567.2OMgCB
            System.out.println("error occurred");
            e.printStackTrace();
        }
    }

    @Test
    public void consumer() {
        String topic = "kafka-test";
        String groupId = "input_url_consumer";
        KafkaConsumer<String, String> consumer = KafkaUtils.getConsumer("kafka.properties", groupId, topic);
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            List<ConsumerRecord<String, String>> list = new ArrayList<>();
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    list.add(record);
                }
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
            if (list != null && list.size() > 0) {
                log.info("records info ----> " + list.get(0).toString());
            }
            log.info("Read {} records from Kafka. ", records.count());
        }
    }
}
验证

上述已经列出演示清单,最后一步我们进行验证。

运行测试类中的kafkaTestSendMessage方法,向topic为kafka-test中发送消息,最终运行的日志如下

2020-07-08 10:51:10.421  INFO 30305 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    block.on.buffer.full = false
    bootstrap.servers = [121.40.96.141:9093, 116.62.171.70:9093, 47.111.67.160:9093]
    buffer.memory = 33554432
    client.id = 
    compression.type = none
    connections.max.idle.ms = 540000
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 30000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.fetch.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.ms = 3000
    request.timeout.ms = 30000
    retries = 5
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = PLAIN
    security.protocol = SASL_SSL
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = config/kafka.client.truststore.jks
    ssl.truststore.password = [hidden]
    ssl.truststore.type = JKS
    timeout.ms = 30000
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2020-07-08 10:51:11.056  INFO 30305 --- [           main] o.a.k.c.s.authenticator.AbstractLogin    : Successfully logged in.
2020-07-08 10:51:11.200  INFO 30305 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.10.2.2
2020-07-08 10:51:11.201  INFO 30305 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : cd80bc412b9b9701
2020-07-08 10:51:12.708  WARN 30305 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : Got error produce response with correlation id 5 on topic-partition kafka-test-9, retrying (4 attempts left). Error: LEADER_NOT_AVAILABLE
Produce ok:kafka-test-9@0
Produce ok:kafka-test-3@0
Produce ok:kafka-test-6@0
Produce ok:kafka-test-0@0

可以看出来,最后的log打印表明此时已向topic中发送4条消息,我们去kafka中验证。

kafka topic

topic info

最后验证消费者方法 consumer ,我们在kafka页面中向kafka-test中发送消息,该方法就会接受到我们发送的消息并且进行消费,如下

查看方法consumer打印的日志

2020-07-08 13:15:14.304  INFO 31187 --- [           main] com.mengxi.demo.KafkaUtilTest            : Read 0 records from Kafka. 
2020-07-08 13:15:15.309  INFO 31187 --- [           main] com.mengxi.demo.KafkaUtilTest            : Read 0 records from Kafka. 
2020-07-08 13:15:16.314  INFO 31187 --- [           main] com.mengxi.demo.KafkaUtilTest            : Read 0 records from Kafka. 
2020-07-08 13:15:17.319  INFO 31187 --- [           main] com.mengxi.demo.KafkaUtilTest            : Read 0 records from Kafka. 
2020-07-08 13:15:18.321  INFO 31187 --- [           main] com.mengxi.demo.KafkaUtilTest            : Read 0 records from Kafka. 
2020-07-08 13:15:19.325  INFO 31187 --- [           main] com.mengxi.demo.KafkaUtilTest            : Read 0 records from Kafka. 
2020-07-08 13:15:20.327  INFO 31187 --- [           main] com.mengxi.demo.KafkaUtilTest            : Read 0 records from Kafka. 
2020-07-08 13:15:21.329  INFO 31187 --- [           main] com.mengxi.demo.KafkaUtilTest            : Read 0 records from Kafka. 
2020-07-08 13:15:22.332  INFO 31187 --- [           main] com.mengxi.demo.KafkaUtilTest            : Read 0 records from Kafka. 
2020-07-08 13:15:23.336  INFO 31187 --- [           main] com.mengxi.demo.KafkaUtilTest            : Read 0 records from Kafka. 
2020-07-08 13:15:23.548  INFO 31187 --- [           main] com.mengxi.demo.KafkaUtilTest            : records info ----> ConsumerRecord(topic = kafka-test, partition = 0, offset = 1, CreateTime = 1594185323014, checksum = 2904038765, serialized key size = 7, serialized value size = 11, key = message, value = mengxi test)
2020-07-08 13:15:23.549  INFO 31187 --- [           main] com.mengxi.demo.KafkaUtilTest            : Read 1 records from Kafka. 
2020-07-08 13:15:24.554  INFO 31187 --- [           main] com.mengxi.demo.KafkaUtilTest            : Read 0 records from Kafka. 
2020-07-08 13:15:25.555  INFO 31187 --- [           main] com.mengxi.demo.KafkaUtilTest            : Read 0 records from Kafka. 
2020-07-08 13:15:26.557  INFO 31187 --- [           main] com.mengxi.demo.KafkaUtilTest            : Read 0 records from Kafka. 
2020-07-08 13:15:27.561  INFO 31187 --- [           main] com.mengxi.demo.KafkaUtilTest            : Read 0 records from Kafka. 
2020-07-08 13:15:28.563  INFO 31187 --- [           main] com.mengxi.demo.KafkaUtilTest            : Read 0 records from Kafka. 
2020-07-08 13:15:29.566  INFO 31187 --- [           main] com.mengxi.demo.KafkaUtilTest            : Read 0 records from Kafka. 
2020-07-08 13:15:30.567  INFO 31187 --- [           main] com.mengxi.demo.KafkaUtilTest            : Read 0 records from Kafka. 

可以看到刚才我们在页面中发送的消息,已经接受到了。

records info ----> ConsumerRecord(topic = kafka-test, partition = 0, offset = 1, CreateTime = 1594185323014, checksum = 2904038765, serialized key size = 7, serialized value size = 11, key = message, value = mengxi test)

这条信息就是刚才发送的消息,我们可以取出当中的value值进行响应的数据处理。

以上就是SpringBoot集成阿里云Kafka组件(公网)版本的示例内容,大家可以去尝试一下,如有问题可留言。