Kafka originated at LinkedIn, entered the Apache Incubator in 2011, and graduated to a top-level Apache project in 2012. It is written in Scala and Java. Apache Kafka is a fast, scalable, high-throughput, fault-tolerant distributed publish-subscribe messaging system. With high throughput, built-in partitioning, replication, and fault tolerance, it is well suited to large-scale message-processing scenarios. For Kafka's basic concepts, the official documentation is the place to start. Full disclosure: I have not used Kafka in my own development work, so this article is based on my own reading and may contain inaccuracies. It only shows how to integrate Kafka with Spring Boot; I do not yet understand Kafka's headline features, such as high throughput and failover, well enough to cover them properly here, though I hope to write a dedicated Kafka series later. This article uses Kafka 2.0.0.
1. 项目结构
| pom.xml
| springboot-14-kafka.iml
|
+---kafka-consumer
| | kafka-consumer.iml
| | pom.xml
| |
| +---src
| | +---main
| | | +---java
| | | | \---com
| | | | \---zhuoli
| | | | \---service
| | | | \---springboot
| | | | \---kafka
| | | | \---consumer
| | | | | KafkaConsumerApplicationContext.java
| | | | |
| | | | +---common
| | | | | Listener.java
| | | | |
| | | | \---config
| | | | KafkaConsumerConfig.java
| | | |
| | | \---resources
| | | application.properties
| | |
| | \---test
| | \---java
\---kafka-producer
| kafka-producer.iml
| pom.xml
|
+---src
| +---main
| | +---java
| | | \---com
| | | \---zhuoli
| | | \---service
| | | \---springboot
| | | \---kafka
| | | \---producer
| | | | KafkaProducerApplicationContext.java
| | | |
| | | +---common
| | | | \---request
| | | | SendKafkaRequest.java
| | | |
| | | +---config
| | | | KafkaProducerConfig.java
| | | |
| | | +---controller
| | | | MessageController.java
| | | |
| | | \---service
| | | | MessageControllerService.java
| | | |
| | | \---impl
| | | MessageControllerServiceImpl.java
| | |
| | \---resources
| | application.properties
| |
| \---test
| \---java
A quick note on what each module does:
- kafka-producer: exposes an HTTP endpoint; each call sends a message to the Kafka broker.
- kafka-consumer: subscribes to the corresponding Kafka topic and reads the messages.
2. pom.xml
All the Maven configuration lives in the parent module's pom.xml, so it applies to every child module:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.zhuoli.service</groupId>
    <artifactId>springboot-14-kafka</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>kafka-producer</module>
        <module>kafka-consumer</module>
    </modules>
    <!-- Spring Boot starter parent: manages dependency versions -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.7.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.2</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>
3. kafka producer
3.1 application.properties
#kafka
kafka.producer.servers=115.47.149.48:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960
These properties just define the values consumed by the KafkaProducerConfig configuration class below.
3.2 KafkaProducerConfig configuration class
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${kafka.producer.servers}")
    private String servers;
    @Value("${kafka.producer.retries}")
    private int retries;
    @Value("${kafka.producer.batch.size}")
    private int batchSize;
    @Value("${kafka.producer.linger}")
    private int linger;
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
The configuration class mainly provides a KafkaTemplate bean. One point worth highlighting: the Kafka 2.0 documentation configures the producer's broker list like this:
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
// ProducerConfig.BOOTSTRAP_SERVERS_CONFIG is a constant, equivalent to:
props.put("bootstrap.servers", servers);
But the Kafka 0.8.0 documentation configured the producer's broker list like this:
props.put("metadata.broker.list", servers);
The official documentation clarifies that both settings configure the producer's broker list, but "metadata.broker.list" was deprecated after Kafka 0.8.0; newer clients uniformly use "bootstrap.servers", which the official documentation describes as follows:
A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,…. Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).
3.3 Message-producing service
import lombok.AllArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@AllArgsConstructor
public class MessageControllerServiceImpl implements MessageControllerService {

    private KafkaTemplate<String, String> kafkaTemplate;

    @Override
    public void sendMessage(String topic, String content) {
        // Asynchronously publish content to the given topic
        kafkaTemplate.send(topic, content);
    }
}
Calling KafkaTemplate's send method publishes content to the given topic. Note that send is asynchronous: it returns as soon as the record is handed to the producer's buffer.
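Since send() returns a ListenableFuture, a callback can be attached when you want explicit success/failure handling. The helper class below is a sketch of that pattern, not part of the original sample project; it assumes the ListenableFuture-based API of spring-kafka 1.x/2.x (3.x switched to CompletableFuture):

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;

// Hypothetical helper, shown only to illustrate the callback style
@Slf4j
public class CallbackSendExample {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public CallbackSendExample(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendWithCallback(String topic, String content) {
        // The callback fires once the broker acknowledges the record (or the send fails)
        kafkaTemplate.send(topic, content).addCallback(
                (SendResult<String, String> result) -> log.info(
                        "sent to topic {} partition {} offset {}",
                        result.getRecordMetadata().topic(),
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset()),
                ex -> log.error("send failed", ex));
    }
}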
3.4 Controller exposing the HTTP endpoint for producing messages
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

@RestController
@AllArgsConstructor
@RequestMapping("/kafka")
@Slf4j
public class MessageController {

    private MessageControllerService messageControllerService;
    private ObjectMapper objectMapper;

    @RequestMapping(value = "/send", method = RequestMethod.POST)
    public ResponseEntity sendKafka(@RequestBody SendKafkaRequest request) {
        try {
            request.setSendTime(new Date());
            // Serialize once, log it, then publish it to the "zhuoli" topic
            String payload = objectMapper.writeValueAsString(request);
            log.info("kafka message = {}", payload);
            messageControllerService.sendMessage("zhuoli", payload);
            log.info("message sent to kafka successfully.");
            return ResponseEntity.status(HttpStatus.OK).body("message send success");
        } catch (Exception e) {
            log.error("failed to send message to kafka", e);
            return ResponseEntity.status(HttpStatus.EXPECTATION_FAILED).body("exception");
        }
    }
}
The controller simply serializes the request into a JSON string via ObjectMapper, then hands it to MessageControllerService's sendMessage method, which publishes it to the topic.
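The SendKafkaRequest class is not listed above. Judging from the JSON that appears in the consumer log in section 5 (id, message, sendTime), a plausible reconstruction looks like this; the field names and types are assumptions inferred from that log, not the original source:

import lombok.Data;

import java.util.Date;

// Hypothetical reconstruction of SendKafkaRequest;
// fields inferred from the consumer log output in section 5
@Data
public class SendKafkaRequest {
    private Long id;
    private String message;
    private Date sendTime;
}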
4. kafka consumer
4.1 application.properties
#kafka
kafka.consumer.servers=115.47.149.48:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.group.id=zhuoli
kafka.consumer.concurrency=10
These serve the same purpose as the producer's application.properties: they supply the values used by the KafkaConsumerConfig configuration class.
4.2 KafkaConsumerConfig configuration class
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.consumer.servers}")
    private String servers;
    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;
    @Value("${kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.group.id}")
    private String groupId;
    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    private ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }
}
The configuration class mainly provides a KafkaListenerContainerFactory bean plus the custom Listener bean; the Listener subscribes to the topic and receives messages. As with the producer, how the consumer connects to the Kafka brokers changed substantially after Kafka 0.8.0. In Kafka 2.0 it is configured like this:
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
// ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG is a constant, equivalent to:
propsMap.put("bootstrap.servers", servers);
Here servers is the broker address list, in the form host1:port1,host2:port2,... In Kafka 0.8.0 the consumer was configured like this instead:
propsMap.put("zookeeper.connect", servers);
In other words, the old consumer connected through ZooKeeper. I initially wondered why the new consumer connects directly to the brokers rather than through ZooKeeper. The documentation explains that after Kafka 0.8.x, offset storage moved from ZooKeeper to the brokers: consumers commit offsets to Kafka and fetch them back from Kafka, so they no longer need to talk to ZooKeeper at all, and the new consumer requires no ZooKeeper configuration. A consumer does always need a connection to the Kafka cluster to send its requests; bootstrap.servers simply lists a few brokers of that cluster, through which the consumer discovers all the rest.
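To make "offsets live on the brokers" concrete, here is a minimal sketch using the plain kafka-clients 2.0 consumer API (separate from this project's Spring wiring), with auto-commit disabled so the commit is explicit:

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ManualCommitExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "115.47.149.48:9092");
        props.put("group.id", "zhuoli");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("zhuoli"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(r -> System.out.println(r.value()));
            // Offsets are committed to the brokers (the internal __consumer_offsets topic),
            // not to ZooKeeper as in the pre-0.9 consumer
            consumer.commitSync();
        }
    }
}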
4.3 Consuming messages
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;

import java.util.Optional;

@Slf4j
public class Listener {

    @KafkaListener(topics = {"zhuoli"})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            log.info("----------------- record =" + record);
            log.info("------------------ message =" + message);
        }
    }
}
The listener subscribes to the zhuoli topic and consumes its messages.
5. Testing
Before testing, be sure to edit your local hosts file: Kafka clients connect to brokers by the hostnames the brokers advertise, so without the hosts entries you will see an error like this:
ERROR 118864 --- [ad | producer-1] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload=''
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for zhuoli-0 due to 30014 ms has passed since batch creation plus linger time
On Windows, edit the hosts file as follows:
Open C:\Windows\System32\drivers\etc\hosts and add a line in the format "broker IP address  broker hostname", for example:
10.32.32.149 YZ-PTEST-APP-HADOOP-02
On macOS and Ubuntu the hosts file lives at /etc/hosts; edit it the same way.
Call the HTTP endpoint exposed by kafka-producer from Postman to send a message to Kafka.
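If you prefer the command line over Postman, an equivalent request would look roughly like this (assuming the producer runs on Spring Boot's default port 8080; the id and message values are illustrative):

curl -X POST http://localhost:8080/kafka/send \
     -H "Content-Type: application/json" \
     -d '{"id": 6, "message": "send by java client message6"}'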
The kafka-producer console log shows the message was sent successfully.
The kafka-consumer console log shows the message was consumed; the record and message contents are as follows:
----------------- record =ConsumerRecord(topic = zhuoli, partition = 0, offset = 3, CreateTime = 1536116121933, checksum = 2020089868, serialized key size = -1, serialized value size = 74, key = null, value = {"id":6,"message":"send by java client message6","sendTime":1536116121582})
------------------ message ={"id":6,"message":"send by java client message6","sendTime":1536116121582}
On the Kafka server, consume the zhuoli topic from the command line:
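The original post shows this step as a screenshot; the command is Kafka's stock console consumer, roughly:

bin/kafka-console-consumer.sh --bootstrap-server 115.47.149.48:9092 --topic zhuoli --from-beginning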
The output matches the application console, confirming the message was consumed successfully.
Sample code: Gitee – zhuoli – Spring Boot + Kafka integration example