spring kakfa如何集成

99次阅读
没有评论

共计 3789 个字符,预计需要花费 10 分钟才能阅读完成。

丸趣 TV 小编给大家分享一下 spring kakfa 如何集成,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

一、生产端

1.1 kafka-producer.xml 配置说明

!-- spring 的属性加载器,加载多个 properties 文件中的属性  ,  如果只有一个 properties 文件则用 context / 就行了,用了这个加载器过后不用在其他 xml 中再使用了 -- 
 bean id= propertyConfigurer 
  >
 !--  定义 producer 的参数  -- 
 bean id= producerProperties   >

1.2 kafka-producer.properties 属性文件

bootstrap.servers=192.168.0.75:9092,192.168.0.75:9093,192.168.0.75:9094
group.id=testGroup
retries=1
batch.size=16384
linger.ms=1
buffer.memory=33554432
defaultTopic=topic-test

1.3 生产端接口封装说明:

1)类名:
com.rkhd.ienterprise.kafka.producer.KafkaProducerServer

2)方法:

/**
 *  发送信息(不分区) * @param data  要发送的数据
 * @return  返回一个 map。如果成功 code 为 0, 其他则为失败
 */
public Map String, Object  sendDefault(Object data);
/**
 *  发送信息 (不分区)
 * @param key  要发送的键
 * @param data  要发送的数据
 * @return  返回一个 map。如果成功 code 为 0, 其他则为失败
 */
public Map String, Object  sendDefault(Object key, Object data);
/**
 *  发送信息 (分区)
 * @param partitionNum  分区数 (大于 1), 请注意分区数是在 topic 创建的时候就指定了,不能改变了
 * @param key  要发送的键
 * @param data  要发送的数据
 * @return  返回一个 map。如果成功 code 为 0, 其他则为失败
 */
public Map String, Object  sendDefault(int partitionNum, Object key, Object data);
/**
 *  发送信息 (不分区)
 * @param topic  发送目的 topic 名称, 如果 topic 为 null 或者是为 , 则会使用 xml 中配置的 defaultTopic
 * @param data  要发送的数据
 * @return  返回一个 map。如果成功 code 为 0, 其他则为失败
 */
public Map String, Object  sendMessage(String topic, Object data);
/**
 *  发送信息 (不分区)
 * @param topic  发送目的 topic 名称, 如果 topic 为 null 或者是为 , 则会使用 xml 中配置的 defaultTopic
 * @param key  要发送的键
 * @param data  要发送的数据
 * @return  返回一个 map。如果成功 code 为 0, 其他则为失败
 * */
public Map String, Object  sendMessage(String topic, Object key, Object data);
/**
 *  发送信息 (分区)
 * @param topic  发送目的 topic 名称, 如果 topic 为 null 或者是为 , 则会使用 xml 中配置的 defaultTopic
 * @param partitionNum  分区数 (大于 1), 请注意分区数是在 topic 创建的时候就指定了,不能改变了
 * @param data  要发送的数据
 * @return  返回一个 map。如果成功 code 为 0, 其他则为失败
 */
public Map String, Object  sendMessage(String topic, Integer partitionNum, Object data);
/**
 *  发送信息 (分区)
 * @param topic  发送目的 topic 名称, 如果 topic 为 null 或者是为 , 则会使用 xml 中配置的 defaultTopic
 * @param key  要发送的键
 * @param value  要发送的数据
 * @param partitionNum  分区数 (大于 1), 请注意分区数是在 topic 创建的时候就指定了,不能改变了
 * @return  返回一个 map。如果成功 code 为 0, 其他则为失败
 * */
public Map String, Object  sendMessage(String topic, int partitionNum, Object key, Object value);

二、消费端

2.1 kafka-consumer.xml 配置说明

!--  定义 consumer 的参数  -- 
 bean id= consumerProperties   >
 
property name= ackCount  value= 90 / -- 
  !-- property name= ackMode  value= TIME / 
  property name= ackTime  value= 5000 / -- 
 /bean 
 !--  创建单实例 KafkaMessageListenerContainer--
!-- bean id= messageListenerContainer_trade   >
 !--  创建多实例 ConcurrentMessageListenerContainer--
bean id= messageListenerContainer   >

2.2 kafka-consumer.properties 属性文件

bootstrap.servers=192.168.0.75:9092,192.168.0.75:9093,192.168.0.75:9094
group.id=testGroup
enable.auto.commit=false
auto.commit.interval.ms=1000
session.timeout.ms=15000
topicName=ahao-test

2.3 消费端接口封装说明

1)类名:com.rkhd.ienterprise.mq.client.consumer.client.KafkaConsumerClient

2)对外提供抽象方法(根据不同的业务实现):

public abstract void onConsumer(ConsumerRecord String, String  record);

3)实现说明:各业务线通过继承该类实现该抽象方法;

三、Kafka 技术概览

3.1 Kafka 的特性

高吞吐量、低延迟:kafka 每秒可以处理几十万条消息,它的延迟最低只有几毫秒

可扩展性:kafka 集群支持热扩展

持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失

容错性:允许集群中节点失败(若副本数量为 n, 则允许 n - 1 个节点失败)

高并发:支持数千个客户端同时读写

3.2 Kafka 架构组件 

       Kafka 中发布订阅的对象是 topic。我们可以为每类数据创建一个 topic,把向 topic 发布消息的客户端称作 producer,从 topic 订阅消息的客户端称作 consumer。Producers 和                consumers 可以同时从多个 topic 读写数据。一个 kafka 集群由一个或多个 broker 服务器组成,它负责持久化和备份具体的 kafka 消息。

topic:消息存放的目录即主题

Producer:生产消息到 topic 的一方

Consumer:订阅 topic 消费消息的一方

Broker:Kafka 的服务实例就是一个 broker

3.3 kafka 应用场景

日志收集:一个公司可以用 Kafka 可以收集各种服务的 log,通过 kafka 以统一接口服务的方式开放给各种 consumer,例如 hadoop、Hbase、Solr 等。

消息系统:解耦和生产者和消费者、缓存消息等。

用户活动跟踪:Kafka 经常被用来记录 web 用户或者 app 用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到 kafka 的 topic 中,然后订阅者通过订阅这些 topic 来做实时的监控分析,或者装载到 hadoop、数据仓库中做离线分析和挖掘。

运营指标:Kafka 也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。

流式处理:比如 spark streaming 和 storm

事件源

以上是“spring kakfa 如何集成”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注丸趣 TV 行业资讯频道!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计3789字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)