共计 4551 个字符,预计需要花费 12 分钟才能阅读完成。
本篇文章给大家分享的是有关如何实现 Kafka 的入门,丸趣 TV 小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着丸趣 TV 小编一起来看看吧。
一、入门 1. 简介
Kafka is a distributed, partitioned, replicated commit log service。它提供了类似于 JMS 的特性,但是在设计实现上完全不同,此外它并不是 JMS 规范的实现。kafka 对消息保存时根据 Topic 进行归类,发送消息者成为 Producer, 消息接受者成为 Consumer, 此外 kafka 集群有多个 kafka 实例组成,每个实例 (server) 成为 broker。无论是 kafka 集群,还是 producer 和 consumer 都依赖于 zookeeper 来保证系统可用性集群保存一些 meta 信息。
下面这张图描述更准确。
主要特性:
1)消息持久化
要从大数据中获取真正的价值,那么不能丢失任何信息。Apache Kafka 设计上是时间复杂度 O(1) 的磁盘结构,它提供了常量时间的性能,即使是存储海量的信息(TB 级)。
2)高吞吐
记住大数据,Kafka 的设计是工作在标准硬件之上,支持每秒数百万的消息。
3)分布式
Kafka 明确支持在 Kafka 服务器上的消息分区,以及在消费机器集群上的分发消费,维护每个分区的排序语义。
4)多客户端支持
Kafka 系统支持与来自不同平台(如 java、.NET、PHP、Ruby 或 Python 等)的客户端相集成。
5)实时
生产者线程产生的消息对消费者线程应该立即可见,此特性对基于事件的系统(比如 CEP 系统)是至关重要的。
2. 概念 Topics/logs
一个 Topic 可以认为是一类消息,每个 topic 将被分成多个 partition(区), 每个 partition 在存储层面是 append log 文件。任何发布到此 partition 的消息都会被直接追加到 log 文件的尾部,每条消息在文件中的位置称为 offset(偏移量),offset 为一个 long 型数字,它是唯一标记一条消息。它唯一的标记一条消息。kafka 并没有提供其他额外的索引机制来存储 offset,因为在 kafka 中几乎不允许对消息进行“随机读写”。
kafka 和 JMS 实现 (activeMQ) 不同的是: 即使消息被消费, 消息仍然不会被立即删除. 日志文件将会根据 broker 中的配置要求, 保留一定的时间之后删除; 比如 log 文件保留 2 天, 那么两天后, 文件会被清除, 无论其中的消息是否被消费.kafka 通过这种简单的手段, 来释放磁盘空间, 以及减少消息消费之后对文件内容改动的磁盘 IO 开支.
对于 consumer 而言, 它需要保存消费消息的 offset, 对于 offset 的保存和使用, 有 consumer 来控制; 当 consumer 正常消费消息时,offset 将会 线性 的向前驱动, 即消息将依次顺序被消费. 事实上 consumer 可以使用任意顺序消费消息, 它只需要将 offset 重置为任意值..(offset 将会保存在 zookeeper 中, 参见下文)
kafka 集群几乎不需要维护任何 consumer 和 producer 状态信息, 这些信息有 zookeeper 保存; 因此 producer 和 consumer 的客户端实现非常轻量级, 它们可以随意离开, 而不会对集群造成额外的影响.
partitions 的设计目的有多个. 最根本原因是 kafka 基于文件存储. 通过分区, 可以将日志内容分散到多个 server 上, 来避免文件尺寸达到单机磁盘的上限, 每个 partiton 都会被当前 server(kafka 实例)保存; 可以将一个 topic 切分多任意多个 partitions, 来消息保存 / 消费的效率. 此外越多的 partitions 意味着可以容纳更多的 consumer, 有效提升并发消费的能力.(具体原理参见下文).
Distribution
一个 Topic 的多个 partitions, 被分布在 kafka 集群中的多个 server 上; 每个 server(kafka 实例)负责 partitions 中消息的读写操作; 此外 kafka 还可以配置 partitions 需要备份的个数(replicas), 每个 partition 将会被备份到多台机器上, 以提高可用性.
基于 replicated 方案, 那么就意味着需要对多个备份进行调度; 每个 partition 都有一个 server 为 leader leader 负责所有的读写操作, 如果 leader 失效, 那么将会有其他 follower 来接管(成为新的 leader);follower 只是单调的和 leader 跟进, 同步消息即可.. 由此可见作为 leader 的 server 承载了全部的请求压力, 因此从集群的整体考虑, 有多少个 partitions 就意味着有多少个 leader ,kafka 会将 leader 均衡的分散在每个实例上, 来确保整体的性能稳定.
Producers
Producer 将消息发布到指定的 Topic 中, 同时 Producer 也能决定将此消息归属于哪个 partition; 比如基于 round-robin 方式或者通过其他的一些算法等.
Consumers
本质上 kafka 只支持 Topic. 每个 consumer 属于一个 consumer group; 反过来说, 每个 group 中可以有多个 consumer. 发送到 Topic 的消息, 只会被订阅此 Topic 的每个 group 中的一个 consumer 消费.
如果所有的 consumer 都具有相同的 group, 这种情况和 queue 模式很像; 消息将会在 consumers 之间负载均衡.
如果所有的 consumer 都具有不同的 group, 那这就是 发布 - 订阅 消息将会广播给所有的消费者.
在 kafka 中, 一个 partition 中的消息只会被 group 中的一个 consumer 消费; 每个 group 中 consumer 消息消费互相独立; 我们可以认为一个 group 是一个 订阅 者, 一个 Topic 中的每个 partions, 只会被一个 订阅者 中的一个 consumer 消费, 不过一个 consumer 可以消费多个 partitions 中的消息.kafka 只能保证一个 partition 中的消息被某个 consumer 消费时, 消息是顺序的. 事实上, 从 Topic 角度来说, 消息仍不是有序的.
kafka 的设计原理决定, 对于一个 topic, 同一个 group 中不能有多于 partitions 个数的 consumer 同时消费, 否则将意味着某些 consumer 将无法得到消息.
Guarantees
1) 发送到 partitions 中的消息将会按照它接收的顺序追加到日志中
2) 对于消费者而言, 它们消费消息的顺序和日志中消息顺序一致.
3) 如果 Topic 的 replication factor 为 N, 那么允许 N - 1 个 kafka 实例失效.
3. 适用场景 1、Messaging
对于一些常规的消息系统,kafka 是个不错的选择;partitons/replication 和容错, 可以使 kafka 具有良好的扩展性和性能优势. 不过到目前为止, 我们应该很清楚认识到,kafka 并没有提供 JMS 中的 事务性 消息传输担保(消息确认机制) 消息分组 等企业级特性;kafka 只能使用作为 常规 的消息系统, 在一定程度上, 尚未确保消息的发送与接收绝对可靠(比如, 消息重发, 消息发送丢失等)
2、Websit activity tracking
kafka 可以作为 网站活性跟踪 的最佳工具; 可以将网页 / 用户操作等信息发送到 kafka 中. 并实时监控, 或者离线统计分析等
3、Metrics
Kafka 通常被用于可操作的监控数据。这包括从分布式应用程序来的聚合统计用来生产集中的运营数据提要。
4、Log Aggregation
kafka 的特性决定它非常适合作为 日志收集中心 application 可以将操作日志 批量 异步 的发送到 kafka 集群中, 而不是保存在本地或者 DB 中;kafka 可以批量提交消息 / 压缩消息等, 这对 producer 端而言, 几乎感觉不到性能的开支. 此时 consumer 端可以使 hadoop 等其他系统化的存储和分析系统.
4. 命令
1. 启动 Server
Kafka 依赖 ZK 服务
nohup bin/kafka-server-start.sh config/server.properties
2. 创建 Topic
bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic page_visits
3. 查看命令
bin/kafka-topics.sh –list –zookeeper localhost:2181
4. 发送消息
bin/kafka-console-producer.sh –broker-list localhost:9092 –topic page_visits
5. 消费消息
bin/kafka-console-consumer.sh –zookeeper localhost:2181 –topic page_visits –from-beginning
6. 多 Broker 方式
bin/kafka-server-start.sh config/server-1.properties
bin/kafka-server-start.sh config/server-2.properties
bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 3 –partitions 1 –topic visits
bin/kafka-topics.sh –describe –zookeeper localhost:2181 –topic visits
bin/kafka-console-producer.sh –broker-list localhost:9092 –topic visits
my message test1
my message test2
bin/kafka-console-consumer.sh –zookeeper localhost:2181 –from-beginning –topic visits
7. 停止服务
pkill -9 -f config/server.properties
8. 删除无用的 topic
bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand –topic visits –zookeeper sjxt-hd02:2181,sjxt-hd03:2181,sjxt-hd04:2181
beta in 0.8.1
bin/kafka-topics.sh --zookeeper zk_host:port --delete --topic my_topic_name
以上就是如何实现 Kafka 的入门,丸趣 TV 小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注丸趣 TV 行业资讯频道。