前文介绍了 kafka 的相关特性和原理,这一节我们将学习怎么在springboot中使用kafka;
首先导入依赖
1 | <dependency> |
然后启动项添加注解 @EnableScheduling
,@EnableKafka
。第一个注解是用来添加springboot定时任务以方便测试,第二个注解是装配kafka 配置。
接下来我们要在 application 的配置文件:
1 | ## 生产者配置 |
注册一个 AdminClient
:
1 |
|
这里因为是demo,我就将生产者和消费者写在一个程序里面了。
先测试一个简单的收发消息:
1 |
|
这里我调用了kafkaTemplate.send
方法发送消息,第一个参数是消息的主题,第二个参数是消息.
这里我并没有先创建主题,直接往主题里面发消息了,框架会给你直接创建一个默认的主题.
我们也可以直接创建一个主题:
1 |
|
当然像 rabbitMQ 的api 那样,spring boot 还非常贴心的准备了 topic 建造者类:
1 |
|
还可以通过 AdminClient 创建主题:
1 |
|
第一个参数是主题名称,第二个参数是分区数,第三个分区是副本数(包括leader).
我们可以通过 AdminClient
查看 主题信息:
1 | public String getTopic() throws ExecutionException, InterruptedException { |
ListTopicsResult
的方法返回值都是 Future
类型的,这意味这它是异步的,使用的时候需要注意这一点.
和rabbitMQ 类似,kafka 给我们准备了一个默认主题:
1 |
|
这条消息会被发送到名为 topic.quick.default
的主题当中去.
我们要注意 kafkaTemplate.send
它的返回值是ListenableFuture
,从名字我们就能知道它实际上是一个异步的方法,
我们可以通过 ListenableFuture.addCallback
方法去指定回调函数:
1 |
|
我们也可以通过 ListenableFuture.get
方法让它阻塞:
1 | // @Scheduled(cron = "*/15 * * * * ?") |
kafka 事务消息
Spring-kafka自动注册的KafkaTemplate实例是不具有事务消息发送能力的。需要配置属性:
1 | spring.kafka.producer.acks=-1 |
当激活事务时 kafkaTemplate 就只能发送事务消息了,发送非事务的消息会报异常。
发送事务消息的方法有两种,一种是通过 kafkaTemplate.executeInTransaction 实现,一种是通过 spring的注解 @Transactional
来实现,代码示例:
1 |
|
消费者Ack
消费者消息消息可以自动确认,也可以通过手动确认,开启手动首先需要关闭自动提交,然后设置下consumer的消费模式:
1 | false = |
配置完成之后我们需要对消费者监听器做一点小改动:
1 |
|
如你所见,我们可以通过 Acknowledgment.acknowledge()
来手动的确认消息的消费,不确认就不算消费成功,监听器会再次收到这个消息。
对于某些业务场景这个功能还是很必要的,比如消费消息的同时导致写库异常,数据库回滚,那么消息也不应该被ack。
消费者监听器生命周期控制
消费者监听器有三个生命周期:启动、停止、继续;如果我们想控制消费者监听器生命周期,需要修改@KafkaListener
的 autoStartup
属性为false,
并给监听器 id 属性赋值
然后通过KafkaListenerEndpointRegistry
控制id 对应的监听器的启动停止继续:
1 | import org.springframework.stereotype.Service; |
通过观察窗口输出就能看到,生产者生产了20条数据后消费者监听器才开始启动消费。
消息转发
kafka 消费者可以将消费到的消息转发到指定的主题中去,比如一条消息需要经过多次流转加工才能走完整个业务流程,需要多个consumer来配合完成。
转发代码示例如下:
1 |
|
生产者获取消费者响应
结合 @sendTo注解
和 ReplyingKafkaTemplate
类 生产者可以获取消费者消费消息的结果;
因为 ReplyingKafkaTemplate 是kafkaTemplate 的一个子类,当你往spring 容器注册 这个bean,
kafkaTemplate 的自动装配就会关闭,但是kafkaTemplate 是必须的,因此你需要把这两个bean 都手动注册上。
配置示例:
1 |
|
生产者接收消费者返回值(这俩最好不要开到一个应用中,否则会很容易生产者超时,观察不到返回的结果):
1 |
|