博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spring Boot 中使用 kafka
阅读量:7097 次
发布时间:2019-06-28

本文共 4101 字,大约阅读时间需要 13 分钟。

Kafka 是一种高吞吐的分布式发布订阅消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息数据处理应用。Kafka 支持Java 及多种其它语言客户端,可与Hadoop、Storm、Spark等其它大数据工具结合使用。

准备

环境安装

测试用例

Github 代码

代码我已放到 Github ,导入spring-boot-kafka 项目

github

添加依赖

在项目中添加 kafka-clients 依赖

org.apache.kafka
kafka-clients
0.10.2.0
org.springframework.kafka
spring-kafka
复制代码

启用 kafka

@Configuration@EnableKafkapublic class KafkaConfiguration {}复制代码

消息生产者

@Componentpublic class MsgProducer {    private static final Logger log = LoggerFactory.getLogger(MsgProducer.class);    @Autowired    private KafkaTemplate
kafkaTemplate; public void sendMessage(String topicName, String jsonData) { log.info("向kafka推送数据:[{}]", jsonData); try { kafkaTemplate.send(topicName, jsonData); } catch (Exception e) { log.error("发送数据出错!!!{}{}", topicName, jsonData); log.error("发送数据出错=====>", e); } //消息发送的监听器,用于回调返回信息 kafkaTemplate.setProducerListener(new ProducerListener
() { @Override public void onSuccess(String topic, Integer partition, String key, String value, RecordMetadata recordMetadata) { } @Override public void onError(String topic, Integer partition, String key, String value, Exception exception) { } @Override public boolean isInterestedInSuccess() { log.info("数据发送完毕"); return false; } }); }}复制代码

消息消费者

@Componentpublic class MsgConsumer {    @KafkaListener(topics = {
"topic-1","topic-2"}) public void processMessage(String content) { System.out.println("消息被消费"+content); } }复制代码

参数配置

application.properties

#kafka# 指定kafka 代理地址,可以多个spring.kafka.bootstrap-servers=YZ-PTEST-APP-HADOOP-02:9092,YZ-PTEST-APP-HADOOP-04:9092# 指定listener 容器中的线程数,用于提高并发量spring.kafka.listener.concurrency=3# 每次批量发送消息的数量spring.kafka.producer.batch-size=1000# 指定默认消费者group idspring.kafka.consumer.group-id=myGroup# 指定默认topic idspring.kafka.template.default-topic=topic-1复制代码

启动服务

@SpringBootApplication@ComponentScan(value = {
"io.ymq.kafka"})public class Startup { public static void main(String[] args) { SpringApplication.run(Startup.class, args); }}复制代码

单元测试

import io.ymq.kafka.MsgProducer;import io.ymq.kafka.run.Startup;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;/** * 描述: 测试 kafka * * @author yanpenglei * @create 2017-10-16 18:45 **/@RunWith(SpringRunner.class)@SpringBootTest(classes = Startup.class)public class BaseTest {    @Autowired    private MsgProducer msgProducer;    @Test    public void test() throws Exception {        msgProducer.sendMessage("topic-1", "topic--------1");        msgProducer.sendMessage("topic-2", "topic--------2");    }}复制代码

消息生产者,响应

2017-10-17 15:54:44.814  INFO 2960 --- [           main] io.ymq.kafka.MsgProducer                 : 向kafka推送数据:[topic--------1]2017-10-17 15:54:44.860  INFO 2960 --- [           main] io.ymq.kafka.MsgProducer                 : 向kafka推送数据:[topic--------2]2017-10-17 15:54:44.878  INFO 2960 --- [ad | producer-1] io.ymq.kafka.MsgProducer                 : 数据发送完毕2017-10-17 15:54:44.878  INFO 2960 --- [ad | producer-1] io.ymq.kafka.MsgProducer                 : 数据发送完毕复制代码

消息消费者,响应

消息被消费topic--------1消息被消费topic--------2复制代码

代码我已放到 Github ,导入spring-boot-kafka 项目

github

遇到一些坑

[2017-10-16 19:20:08.340] - 14884 严重 [main] --- org.springframework.kafka.support.LoggingProducerListener: Exception thrown when sending a message with key='null' and payload='topic--------2' to topic topic-2:复制代码

经调试发现 kafka 连接是用的主机名,所以修改 hosts

C:\Windows\System32\drivers\etc\hosts10.32.32.149 YZ-PTEST-APP-HADOOP-0210.32.32.154 YZ-PTEST-APP-HADOOP-04复制代码

Contact

  • 作者:鹏磊
  • 出处:
  • Email:
  • 版权归作者所有,转载请注明出处
  • Wechat:关注公众号,搜云库,专注于开发技术的研究与知识分享

搜云库

你可能感兴趣的文章
数据交换
查看>>
DIY现榨鲜果汁大全,IT工作者的福利
查看>>
linux 历史
查看>>
人人需具备的基本信息素养视频课程【荷露叮咚】免费
查看>>
zabbix升级版本过程(7)
查看>>
linux驱动之ioctl
查看>>
我的友情链接
查看>>
jxl导出excel
查看>>
三大WEB服务器对比分析(apache,lighttpd,nginx)
查看>>
redhat nginx 安装
查看>>
WSE convert to ESE, or ESE to WSE.
查看>>
CentOs最小化安装之后安装桌面环境及中文支持
查看>>
加载定义界面
查看>>
SEO基础知识
查看>>
手机通讯录导入工具
查看>>
Java虚拟机精讲之内存分配与垃圾回收
查看>>
hadoop常见命令
查看>>
each用法(网络摘抄)
查看>>
菜鸟也可以自己封装万能ghost系统
查看>>
路由心生
查看>>