- A+
背景
近期项目中有业务需求将内网的请求数据流入Kafka(公网),然后进行数据分析,分析结果再流入Kafka以供内网进行消费查询的场景,所以用到了阿里云消息队列Kafka。
阿里云的Kafka的网络类型分为两种:VPC接入和公网+VPC接入(本编文章使用的是公网+VPC接入类型)
VPC
VPC全称是Virtual Private Cloud,翻译成中文是虚拟私有云。但是在有些场合也被翻译成私有网络或者专有网络等。这里其实就有些让人迷惑,VPC究竟是指云还是网络?答案是,VPC即是一种云,也是一种网络模式
公网
公网就是广域网。是连接不同地区局域网或城域网计算机通信的远程网。通常跨接很大的物理范围,所覆盖的范围从几十公里到几千公里,它能连接多个地区、城市和国家,或横跨几个洲并能提供远距离通信,形成国际性的远程网络。广域网并不等同于互联网。
接入方式
VPC其实就是内网实例,只要你拥有VPC环境就可以通过简单的配置连接。
这里我们使用公网实例进行接入,公网接入需要配置SASL,所以本编文章在这里做个总结。
kafka实例
实例详情
接入说明
SASL
公网接入需要两个文件
1. 准备配置文件kafka_client_jaas.conf 也就是上图中的用户名和密码
//可以通过自己的实例用户名和密码进行配置
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="xxxxxxx"
password="xxxxxxx";
};
- 配置SSL根证书
下载根证书 密码 bt33
注意:这里需要说明的是,如果在本地进行开发调试的话,可以将这两个文件放在工程的根目录下,或者放在根目录下的config目录下,开发测试的时候会很方便找到这两个文件,如果部署到服务器上,需要将这两个文件单独拿出来进行引用,不需要打到jar包里。
演示示例
先来看下工程目录
工程其实很简单,就是配置一个kafka的工具类,进行实例接入,注意这里config目录一定要配置,否则在接入的时候会报 “没有找到这样的目录” 的错。
顺着工程的目录来说下各个文件的作用。
config 目录
- kafka.client.truststore.jks 根证书
- kafka_client_jaas.conf 阿里云kafka密钥管理
kafka.properties
接入kafka的配置文件,可根据配置信息接入kafka
## 接入点,通过控制台获取
bootstrap.servers=0.0.0.0:9093,0.0.0.1:9093,0.0.0.1:9093
## Topic,通过控制台创建
## ssl 根证书的路径,demo中有,请拷贝到自己的某个目录下,不能被打包到jar中
usessl=true
#服务器上的地址
ssl.truststore.location=config/kafka.client.truststore.jks
## sasl路径,demo中有,请拷贝到自己的某个目录下,不能被打包到jar中
#服务器上的地址
java.security.auth.login.config=config/kafka_client_jaas.conf
kafkaUtils
接入kafka后,需要使用的一些常用类,比如发送消息,接受消息等,可以根据自己的需求使用其中的方法,有基础的大家一看就会明白的,这里就不再赘述了。
package com.mengxi.demo;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
/**
* kafka工具类
*
* @author mengxi
*/
public class KafkaUtils {
public static void configureSasl() {
//如果用-D或者其它方式设置过,这里不再设置
if (null == System.getProperty("java.security.auth.login.config")) {
//请注意将XXX修改为自己的路径
//这个路径必须是一个文件系统可读的路径,不能被打包到jar中
System.setProperty("java.security.auth.login.config",
getKafkaProperties("kafka.properties").getProperty("java.security.auth.login.config"));
}
}
public static Properties getKafkaProperties(String prop) {
//获取配置文件kafka.properties的内容
Properties kafkaProperties = new Properties();
try {
kafkaProperties.load(KafkaUtils.class.getClassLoader().getResourceAsStream(prop));
} catch (Exception e) {
//没加载到文件,程序要考虑退出
e.printStackTrace();
}
return kafkaProperties;
}
public static KafkaProducer<String, String> getProducer(String url) {
//设置sasl文件的路径
KafkaUtils.configureSasl();
Properties kafkaProperties = KafkaUtils.getKafkaProperties(url);
Properties props = new Properties();
//设置接入点,请通过控制台获取对应Topic的接入点
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
//设置SSL根证书的路径,请记得将XXX修改为自己的路径
//与sasl路径类似,该文件也不能被打包到jar中
String usessl = kafkaProperties.getProperty("usessl");
if ("true".equals(usessl)) {
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
kafkaProperties.getProperty("ssl.truststore.location"));
//根证书store的密码,保持不变
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
//接入协议,目前支持使用SASL_SSL协议接入
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
//SASL鉴权方式,保持不变
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
} else {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
}
//Kafka消息的序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
//请求的最长等待时间
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
//设置客户端内部重试次数
props.put(ProducerConfig.RETRIES_CONFIG, 5);
//设置客户端内部重试间隔
props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);
//构造Producer对象,注意,该对象是线程安全的,一般来说,一个进程内一个Producer对象即可;
//如果想提高性能,可以多构造几个对象,但不要太多,最好不要超过5个
return new KafkaProducer<String, String>(props);
}
public static KafkaConsumer<String, String> getConsumer(String url, String groupId, String topic) {
//设置sasl文件的路径
KafkaUtils.configureSasl();
//加载kafka.properties
Properties kafkaProperties = KafkaUtils.getKafkaProperties(url);
Properties props = new Properties();
//设置接入点,请通过控制台获取对应Topic的接入点
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
//设置SSL根证书的路径,请记得将XXX修改为自己的路径
//与sasl路径类似,该文件也不能被打包到jar中
String usessl = kafkaProperties.getProperty("usessl");
if ("true".equals(usessl)) {
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
kafkaProperties.getProperty("ssl.truststore.location"));
//根证书store的密码,保持不变
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
//接入协议,目前支持使用SASL_SSL协议接入
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
//SASL鉴权方式,保持不变
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
} else {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
}
//两次poll之间的最大允许间隔
//可更加实际拉去数据和客户的版本等设置此值,默认30s
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
//每次poll的最大数量
//注意该值不要改得太大,如果poll太多数据,而不能在下次poll之前消费完,则会触发一次负载均衡,产生卡顿
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//消息的反序列化方式
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
//当前消费实例所属的消费组,请在控制台申请之后填写
//属于同一个组的消费实例,会负载消费消息
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
//构造消息对象,也即生成一个消费实例
KafkaConsumer consumer = new KafkaConsumer<String, String>(props);
//加载kafka.properties
//设置消费组订阅的Topic,可以订阅多个
//如果GROUP_ID_CONFIG是一样,则订阅的Topic也建议设置成一样
List<String> subscribedTopics = new ArrayList<String>();
//如果需要订阅多个Topic,则在这里add进去即可
//每个Topic需要先在控制台进行创建
subscribedTopics.add(topic);
consumer.subscribe(subscribedTopics);
return consumer;
}
public static KafkaConsumer<String, String> getVPCConsumer(String url, String groupId, String topic) {
Properties kafkaProperties = KafkaUtils.getKafkaProperties(url);
Properties props = new Properties();
//设置接入点,请通过控制台获取对应Topic的接入点
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
//可更加实际拉去数据和客户的版本等设置此值,默认30s
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
//每次poll的最大数量
//注意该值不要改得太大,如果poll太多数据,而不能在下次poll之前消费完,则会触发一次负载均衡,产生卡顿
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
//消息的反序列化方式
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
//当前消费实例所属的消费组,请在控制台申请之后填写
//属于同一个组的消费实例,会负载消费消息
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
//构造消息对象,也即生成一个消费实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(
props);
//设置消费组订阅的Topic,可以订阅多个
//如果GROUP_ID_CONFIG是一样,则订阅的Topic也建议设置成一样
List<String> subscribedTopics = new ArrayList<String>();
//如果需要订阅多个Topic,则在这里add进去即可
//每个Topic需要先在控制台进行创建
subscribedTopics.add(topic);
consumer.subscribe(subscribedTopics);
return consumer;
}
public static KafkaProducer<String, String> getVPCProducer(String url) {
//设置sasl文件的路径
Properties kafkaProperties = KafkaUtils.getKafkaProperties(url);
Properties props = new Properties();
//设置接入点,请通过控制台获取对应Topic的接入点
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
//Kafka消息的序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
//可更加实际拉去数据和客户的版本等设置此值,默认30s
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
//每次poll的最大数量
//注意该值不要改得太大,如果poll太多数据,而不能在下次poll之前消费完,则会触发一次负载均衡,产生卡顿
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
//构造Producer对象,注意,该对象是线程安全的,一般来说,一个进程内一个Producer对象即可;
//如果想提高性能,可以多构造几个对象,但不要太多,最好不要超过5个
return new KafkaProducer<String, String>(props);
}
}
KafkaUtilTest
最后,提供一个测试类,可以根据这个测试类去验证是否已经成功接入阿里云Kafka
package com.mengxi.demo;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Future;
/**
* @Description
* @Author mengxi
* @date 2020-06-23 15:06
* @Copyright 2019 Alibaba.com All right reserved.
*/
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {DemoApplication.class})
public class KafkaUtilTest {
@Test
public void kafkaTestSendMessage() {
String topic = "kafka-test";
KafkaProducer<String, String> producer = KafkaUtils.getProducer("kafka.properties");
try {
//批量获取 futures 可以加快速度, 但注意,批量不要太大
List<Future<RecordMetadata>> futures = new ArrayList<Future<RecordMetadata>>(128);
for (int i = 1; i < 5; i++) {
//发送消息,并获得一个Future对象
HashMap map = new HashMap();
map.put("id", "id" + i);
map.put("name", "name" + i);
map.put("address", "address" + i);
ProducerRecord<String, String> kafkaMessage = new ProducerRecord<String, String>(topic,
new JSONObject(map).toString());
Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
futures.add(metadataFuture);
}
producer.flush();
for (Future<RecordMetadata> future : futures) {
//同步获得Future对象的结果
try {
RecordMetadata recordMetadata = future.get();
System.out.println("Produce ok:" + recordMetadata.toString());
} catch (Throwable t) {
t.printStackTrace();
}
}
} catch (Exception e) {
//客户端内部重试之后,仍然发送失败,业务要应对此类错误
//参考常见报错: https://help.aliyun.com/document_detail/68168.html?spm=a2c4g.11186623.6.567.2OMgCB
System.out.println("error occurred");
e.printStackTrace();
}
}
@Test
public void consumer() {
String topic = "kafka-test";
String groupId = "input_url_consumer";
KafkaConsumer<String, String> consumer = KafkaUtils.getConsumer("kafka.properties", groupId, topic);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
List<ConsumerRecord<String, String>> list = new ArrayList<>();
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
list.add(record);
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
if (list != null && list.size() > 0) {
log.info("records info ----> " + list.get(0).toString());
}
log.info("Read {} records from Kafka. ", records.count());
}
}
}
验证
上述已经列出演示清单,最后一步我们进行验证。
运行测试类中的kafkaTestSendMessage
方法,向topic为kafka-test
中发送消息,最终运行的日志如下
2020-07-08 10:51:10.421 INFO 30305 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers = [121.40.96.141:9093, 116.62.171.70:9093, 47.111.67.160:9093]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 30000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 3000
request.timeout.ms = 30000
retries = 5
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = PLAIN
security.protocol = SASL_SSL
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = config/kafka.client.truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
timeout.ms = 30000
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2020-07-08 10:51:11.056 INFO 30305 --- [ main] o.a.k.c.s.authenticator.AbstractLogin : Successfully logged in.
2020-07-08 10:51:11.200 INFO 30305 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 0.10.2.2
2020-07-08 10:51:11.201 INFO 30305 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : cd80bc412b9b9701
2020-07-08 10:51:12.708 WARN 30305 --- o.a.k.clients.producer.internals.Sender : Got error produce response with correlation id 5 on topic-partition kafka-test-9, retrying (4 attempts left). Error: LEADER_NOT_AVAILABLE
Produce ok:kafka-test-9@0
Produce ok:kafka-test-3@0
Produce ok:kafka-test-6@0
Produce ok:kafka-test-0@0
可以看出来,最后的log打印表明此时已向topic中发送4条消息,我们去kafka中验证。
kafka topic
topic info
最后验证消费者方法 consumer
,我们在kafka页面中向kafka-test中发送消息,该方法就会接受到我们发送的消息并且进行消费,如下
查看方法consumer
打印的日志
2020-07-08 13:15:14.304 INFO 31187 --- [ main] com.mengxi.demo.KafkaUtilTest : Read 0 records from Kafka.
2020-07-08 13:15:15.309 INFO 31187 --- [ main] com.mengxi.demo.KafkaUtilTest : Read 0 records from Kafka.
2020-07-08 13:15:16.314 INFO 31187 --- [ main] com.mengxi.demo.KafkaUtilTest : Read 0 records from Kafka.
2020-07-08 13:15:17.319 INFO 31187 --- [ main] com.mengxi.demo.KafkaUtilTest : Read 0 records from Kafka.
2020-07-08 13:15:18.321 INFO 31187 --- [ main] com.mengxi.demo.KafkaUtilTest : Read 0 records from Kafka.
2020-07-08 13:15:19.325 INFO 31187 --- [ main] com.mengxi.demo.KafkaUtilTest : Read 0 records from Kafka.
2020-07-08 13:15:20.327 INFO 31187 --- [ main] com.mengxi.demo.KafkaUtilTest : Read 0 records from Kafka.
2020-07-08 13:15:21.329 INFO 31187 --- [ main] com.mengxi.demo.KafkaUtilTest : Read 0 records from Kafka.
2020-07-08 13:15:22.332 INFO 31187 --- [ main] com.mengxi.demo.KafkaUtilTest : Read 0 records from Kafka.
2020-07-08 13:15:23.336 INFO 31187 --- [ main] com.mengxi.demo.KafkaUtilTest : Read 0 records from Kafka.
2020-07-08 13:15:23.548 INFO 31187 --- [ main] com.mengxi.demo.KafkaUtilTest : records info ----> ConsumerRecord(topic = kafka-test, partition = 0, offset = 1, CreateTime = 1594185323014, checksum = 2904038765, serialized key size = 7, serialized value size = 11, key = message, value = mengxi test)
2020-07-08 13:15:23.549 INFO 31187 --- [ main] com.mengxi.demo.KafkaUtilTest : Read 1 records from Kafka.
2020-07-08 13:15:24.554 INFO 31187 --- [ main] com.mengxi.demo.KafkaUtilTest : Read 0 records from Kafka.
2020-07-08 13:15:25.555 INFO 31187 --- [ main] com.mengxi.demo.KafkaUtilTest : Read 0 records from Kafka.
2020-07-08 13:15:26.557 INFO 31187 --- [ main] com.mengxi.demo.KafkaUtilTest : Read 0 records from Kafka.
2020-07-08 13:15:27.561 INFO 31187 --- [ main] com.mengxi.demo.KafkaUtilTest : Read 0 records from Kafka.
2020-07-08 13:15:28.563 INFO 31187 --- [ main] com.mengxi.demo.KafkaUtilTest : Read 0 records from Kafka.
2020-07-08 13:15:29.566 INFO 31187 --- [ main] com.mengxi.demo.KafkaUtilTest : Read 0 records from Kafka.
2020-07-08 13:15:30.567 INFO 31187 --- [ main] com.mengxi.demo.KafkaUtilTest : Read 0 records from Kafka.
可以看到刚才我们在页面中发送的消息,已经接受到了。
records info ----> ConsumerRecord(topic = kafka-test, partition = 0, offset = 1, CreateTime = 1594185323014, checksum = 2904038765, serialized key size = 7, serialized value size = 11, key = message, value = mengxi test)
这条信息就是刚才发送的消息,我们可以取出当中的value值进行响应的数据处理。
以上就是SpringBoot集成阿里云Kafka组件(公网)版本的示例内容,大家可以去尝试一下,如有问题可留言。
- 我的微信
- 加好友一起交流!
-
- 微信公众号
- 关注公众号获取分享资源!
-