Kafka 和 RabbitMQ 对比

Kafka 和 RabbitMQ 对比 Recommended

7月 11, 2020
Database, Recommended, 系统设计

消息队列中间件(Message Queue Middleware,简称 MQ)在后端领域扮演很重要的角色,但是笔者之前日常多止步于使用,对机制原理研究很少。之前用 Python 写 Web 服务的时候,用到 RabbitMQ 的情况比较多,可能是因为有一个基于 Python 的消息队列的封装库 Celery 非常好用。后来写 Go 服务的时候,接入了公司整体的技术架构,Kafka 又用的比较多。这里接针对两种消息中间件专门对比一下各自的底层机制,方便在业务场景中做决策。

RabbitMQ #

RabbitMQ 服务器是用 Erlang 语言编写的开源软件,目前由 Pivotal 公司赞助,主要支持高级消息队列(AMQP) 0-9-1 版本,但可以通过插件支持 1.0 版本协议,同时 RabbitMQ 通过插件的形式也支持 MQTT 协议和 STOMP 协议。值得再说一句,RabbitMQ 有自带的后台管理界面。

AMQP #

高级消息队列协议(Advanced Message Queuing Protocol,AMQP)是一种二进制应用层协议,用于应对广泛的面向消息应用程序的支持。协议提供了消息流控制,保证的一个消息对象的传递过程1

RabbitMQ 的客户端也是以 AMQ 协议为规范通过 RPC 方式以帧(Frame)的数据结构进行通信的,帧的结构大致了解下:

  • 帧类型
  • 信道标号(channel🆔 )
  • 以字节为单位的帧大小
  • 帧有效载荷(payload)
  • 结束标记(ACSII 值 206)

AMQP 规范定义了五种类型的帧:协议头帧、方法帧、内容头帧、消息体帧及心跳帧。发布到 RabbitMQ 中的单个消息由三种帧类型组成:供 Basic.Publish RPC 调用的方法帧、消息头帧,以及一个或多个消息体帧。

Broker #

broker 意思是消息中间件的服务节点,可以将一个 RabbitMQ broker 看做一个 RabbitMQ 服务器。在 Kafka 中也有 broker 的概念,也是相同的意思,它们都是消息队列模型中介于生产者和消费者之间的的消息“经纪人”、“代理人”。

Producing #

生产者在第一次建立连接(connection)并开始一个信道(channel)的时候,会声明(declare)一个交换器并设置类型等选项,同时声明一个队列也进行相关配置,最后通过 BindingKey 将交换器和队列绑定起来。

这里的每个信道都会被指派一个唯一的 ID,信道是建立在 Connection 之上的虚拟连接,RabbitMQ 处理的每条AMQP 指令都是通过信道完成的,通过信道可以复用 TCP 连接,避免频繁的建立和销毁 TCP 连接。

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# 根据命令行的参数,发送消息
routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

## python emit_log_topic.py "kern.critical" "A critical kernel error"

RabbitMQ 提供了两种方式来确认生产者的消息已经被 RabbitMQ 收到:事务机制和发送确认(Publisher Confirm)机制,事务机制较为复杂,同时会严重降低消息吞吐量,发送方确认机制较为轻量。

Consuming #

消费者建立连接到 RabbitMQ Broker,建立一个连接(Connection),并开启一个信道(Channel)。消费者从队列里拉取消息进行消费,在收到消息后会进行消息确认(Ack),RabbitMQ 收到确认请求后会从队列中删除已被确认的消息。

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

## python receive_logs_topic.py "kern.*" "*.critical"

队列里的每条消息只会发送给订阅列表里的一个消费者(通过模运算得到),这种方式非常适合扩展消费者的规模。RabbitMQ 的消费模式分两种:推(Push/Consume)模式和拉(Pull/Get)模式。

拉模式 #

当你的应用程序使用拉模式(Basic.Get) 请求来获取消息时,每次它想要接收消息就必须发送一个新的请求,即使队列中存在多个消息。当发出一个 Basic.Get,如果你想要获取消息的队列中有一条消息正处于等待处理状态,RabbitMQ 就会回应一个 Basic.GetOk 的 RPC 响应。

推模式 #

推模式通过使用 Basic.Consume RPC 命令来消费消息,你可以使用 RabbitMQ 注册你的应用程序,并告诉它在消费者可用时以异步方式向消费者发送消息。这通常被称为发布—订阅模式(publish-subscribe pattern,或 pub-sub)。与使用 Basic.Get 时与 RabbitMQ 创建的同步会话不同,使用 Basic.Consume 消费消息意味着你的应用程序会在消息可用时自动从 RabbitMQ 接收消息,直到客户端发出 Basic.Cancel 为止。

Routing #

生产者将消息发给交换器的时候,一般会指定一个 RoutingKey,用来指定消息的路由(routing)规则。

Binding #

RabbitMQ 通过绑定(binding)将交换器和队列关联起来,在绑定的时候一般会指定一个 BindingKey,这样 RabbitMQ 就可以知道如何正确地将消息路由到队列中。

BindingKey 和 RoutingKey 要一起使用才能最终生效,一些情况下,两者可以看做同一个东西,具体往下看。

Exchange #

所有的消息都通过生产者产生,它中间通过一个交换器(Exchange)将消息路由到一个或多个队列中,最后通过队列发给不同的消费者,交换器是将生产者的消息发到队列中的关键节点,是 RabbitMQ 中必须要理解的概念之一,交换器一共有 fanoutdirecttopicheaders 四种消息路由方式(exchange_type),下面进行一一介绍。

fanout #

它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。fanout 是一个典型的生产消费(Producer/Consumer)模式。

img

direct #

它会把消息路由到 BindingKey 和 RoutingKey 完全匹配的队列中。direct 是一个典型的发布订阅(Publish/Subscribe)模式。

topic #

因为 direct 中的 key 值都是确定的,在实际应用中可能一些场景就不太灵活。topic 在 direct 基础上通过 *# 的方式增加了模糊匹配的功能,并用 . 进行单词分隔,这样就可以实现把特定的一组消息区分开分别发送到指定队列。其中星号 * 会匹配下个 . 前的所有字符,井号 # 会匹配包括 . 在内的所有字符。

举例 #

(1)在发送消息时,RoutingKey 可以是 hello.foo.rabbit,同时指定了 BindingKey 是 *.*.rabbit,那么最终消息会进入到 Q2 中进行消费。

(2)如果指定了 RoutingKey 是 quick.orange.rabbit,消息就会同时进入 Q1 和 Q2,如上图。

(3)lazy.orange.male.rabbit 因为有四个单词则只会进入 Q2 。

headers #

headers 类型的交换器不依赖路由键的匹配规则来路由消息,而是根据发送的消息内容的 headers 属性进行匹配,headers 里是一个键值对,在交换器收到消息后会对键值对指定的队列进行匹配,如果能够匹配到就会将消息路由到该队列,一般用到这个类型的场景会比较少。

进阶特性 #

过期时间 #

目前有两种方法可以设置消息的 TTL(Time to Live)。第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。第二种方法是对消息本身进行单独设置,每条消息的 TTL 可以不同。如果两种方法一起使用,则消息的 TTL 以两者之间较小的那个数值为准。

死信队列 #

满足以下条件的消息会进入到死信队列(Dead-Letter-Queue,DLQ):

  • 消息被拒绝(Basic.Reject/Basic.Nack);
  • 消息过期;
  • 队列达到最大长度。

延迟队列 #

延迟队列主要是借助消息的 TTL(Time to Live)和死信 exchange(Dead Letter Exchanges)来实现。

定时任务 #

可以利用延迟队列实现定时任务:消费延迟队列的消息后重新将消息打入延迟队列。

持久化 #

RabbitMQ 的持久化(durable)分为三个部分:交换器的持久化、队列的持久化和消息的持久化,可以通过参数进行配置。这种持久化只是保证了系统的异常重启不会丢失数据,但是只要是被消费者确认的消息,RabbitMQ 都会进行删除。

分布式集群管理 #

RabbitMQ 可以通过 3 种方式实现分布式部署:Clustering、Federation 和 Shovel。这 3 种方式不是互斥的,可以根据需要选择其中的一种或者以几种方式的组合来达到分布式部署的目的。Federation 和 Shovel 可以为 RabbitMQ 的分布式部署提供更高的灵活性,但同时也提高了部署的复杂性。

不同于其他服务的集群方案,RabbitMQ 集群中节点之间没有主从节点之分。

Kafka #

Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,最初由 LinkedIn 开发,由 Scala 和 Java 编写2。通常来说不将 Kafka 定义为一种消息中间件的实现,更强调是一个分布式的流式系统。

Kafka 整体的架构如上图,非常简单易懂,Kafka 集群中由各个主题(Topic)组成,每个主题可以配置成多个分区,一个消费者可以消费一个或多个分区(Partition)的消息,另外每个Kafka 集群还需要一个 Zookeeper 或者 KRaft 进行元数据的存储。

Broker #

一个独立的 Kafka 服务器被称为 broker,多个 broker 组成了一个集群(Cluster)。broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker 为消费者提供服务,对读取分区的请求作出响应,返回已经提交到磁盘上的消息。根据特定的硬件及其性能特征,单个 broker 可以轻松处理数千个分区以及每秒百万级的消息量。

Topic #

主题(Topic)对应了一种特定业务类型的消息,生产者在生产消息时,需要指定往哪个 Topic 发送消息。

Partition #

每个主题对应了若干个分区(Partition),在创建主题时可以指定数量,分区越多,队列里消息并行通过的能力越强,可以理解为单向 2 车道和单向 4 车道的区别,分区的数量一般根据主题的吞吐量和消费者的吞吐量得出。

kafka 的消息是一个个键值对,在生产者发送消息时,可以指定消息的 key 值,相同 key 的消息会被分发到同一个分区,key 值也可以为空,此时消息会被均匀的分配到每个分区。Kafka 可以保证同一个分区的消息是顺序的,但是存在多个消费者的情况下,不能保证消息能够被顺序的执行完成,这对一些顺序敏感的业务会有影响。

副本机制 #

除了分区(Partition)机制,每个分区还可以有多个副本。在 Kafka 集群中存在 leader 和 follower 的节点关系,并通过 Zookeeper 实现了选举机制来维护集群的成员关系。消息数据也会通过 leader 到 follower 的方式进行复制。

复制功能是 Kafka 架构的核心,在 Kafka 的文档里,Kafka 把自己描述成一个分布式的、可分区的、可复制的提交日志服务。复制之所以这么关键,是因为它可以在个别节点失效时仍能保证 Kafka 的可用性和持久性。备份的份数设置的越多,复制的压力会越大,但是灾备的能力越强,第一个 leader 挂了可以上第二个,第二个挂了上第三个。

每个分区都有一个 ISR(in-sync Replica)列表,用于维护所有同步的、可用的副本。首领副本必然是同步副本,而对于跟随者副本来说,它需要满足以下条件才能被认为是同步副本:

  • 与 Zookeeper 之间有一个活跃的会话,即必须定时向 Zookeeper 发送心跳;
  • 在规定的时间内从首领副本那里低延迟地获取过消息。

Producer 可以通过配置 acks 这个参数来指定必须有多少个副本都到消息,生产者才会认为消息写入成功:

  • acks=0 :消息发送出去就认为已经成功了,不会等待任何来自服务器的响应;
  • acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应;
  • acks=-1 :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。这种方式性能最慢,但可靠性最强。

Consumer #

消费者(Consumer)是消费者组(Consumer Group)的一部分,负责订阅 Kafka 中的主题,订阅的主题下的每个分区只能分配给组下的一个 consumer,可以通过横向拓展消费者的数量提高消费能力,消费者可以是一个线程,也可以是同一台机器的另一个进程,也可以是单独的一台机器。

Kafka 中的消费是基于拉模式的,是一个不断轮询(poll 方法)的过程,每次都会从分区读取一个记录列表,每个记录都包含了记录所属主题的信息、记录所在分区的信息、记录在分区里的偏移量,以及记录的键值对。

当新增或删除一个消费者时,会触发再均衡触发器对分区的所有权进行一个再均衡的重新分配,一般情况下消费者的数量不需要超过分区的数量,超过了分区数量的多余消费者也会处于闲置状态,也就是说一个分区只会对应一个消费者。

每个分区都有一个偏移量,对于消息在分区中的位置,我们将 offset 称为“偏移量”,对于消费者消费到的位置,将 offset 称为“位移”,有时候也会更明确地称之为“消费位移”。消费位移存储在 Kafka 内部的主题 __consumer_offsets 中,这里把将消费位移存储起来(持久化)的动作称为“提交”,消费者在消费完消息之后需要执行消费位移的提交。

通信方式 #

不同于 AMQP 的帧,Kafka 自定义了一组基于 TCP 的二进制协议,只要遵守这组协议的格式,就可以与 Kafka 进行通信。在 Kafka 2.0 中有 43 种协议类型,每种协议都对应了请求和响应的结构。

每种类型的 Request 都包含相同结构的协议请求头(RequestHeader)和不同结构的协议请求体(RequestBody)。

每种类型的 Response 也包含相同结构的协议响应头(ResponseHeader)和不同结构的响应体(ResponseBody)。

时间轮 #

Kafka 可以通过时间轮可以实现延时操作、延时生产等功能。

Kafka中的时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList)。TimerTaskList 是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务(TimerTask)。

时间轮由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度(tickMs)。时间轮的时间格个数是固定的,可用 wheelSize 来表示,那么整个时间轮的总体时间跨度(interval)可以通过公式 tickMs × wheelSize 计算得出。时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间,currentTime 是 tickMs 的整数倍。currentTime 可以将整个时间轮划分为到期部分和未到期部分,currentTime 当前指向的时间格也属于到期部分,表示刚好到期,需要处理此时间格所对应的 TimerTaskList 中的所有任务。

若时间轮的 tickMs 为 1ms 且 wheelSize 等于 20,那么可以计算得出总体时间跨度 interval 为 20ms。此时如果有一个定时为 350ms 的任务该如何处理?Kafka 为此引入了层级时间轮的概念,当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中。如果遇到第二层的 interval 不满足时,会继续升级到第三层,以此类推。在将要执行任务时,又会根据剩余时间满足 interval 的情况下,将任务降级到下一层时间轮对应的时间格等待执行。

虽然时间轮机制可以将区间缩短至 1ms,但同时非常多的层级以及频繁的切换层级必然会有性能问题,故 Kafka 在实现延时队列方面在原生架构下就没有一个很好的解决方案,包括设置消息的过期时间也需要一些自定义的扩展才能实现。可见多层时间轮机制只是适合 Kafka 内部低延时功能的运行。

机制对比 #

一个是业务场景,另一个看公司的技术栈。

如果追求高性能,Kafka 绝对满足你的要求,但是如果业务量可预见的情况下不大,同时不想花太多时间在进阶功能的实现上,RabbitMQ 可能更适合一些。

RabbitMQ 架构会复杂一些,体现在性能上也会比 Kafka 差一点。Kafka 的概念相比 RabbitMQ 要简洁一点,架构上自然也不会那么复杂,同时也不需要实现 AMQ 协议。但是架构相对简单、性能强悍的同时,却也损失了 AMQP 强大的功能,比如延时队列、死信队列、消息路由等功能。另外由于通常会有多个生产者、多个队列、多个消费者的存在,显然两者都不能保证消息的绝对顺序性。

同时在数据的持久化上两个系统存在明显差异:RabbitMQ 会删除已经被消费确认的消息,Kafka 不会主动删除,只是会更新分区的偏移量,即可以将过去的数据重放,这也是决策的一个关键点。

RabbitMQ 因为实现了 AMQP 协议,那在对接一些其他支持了 AMQP 协议的服务会更方便一些。如果你使用 Java 技术栈,Kafka 的生态圈会好一些,如果使用 Django、Flask 这样的 Web 框架,Celery 会更好用一点,而 Celery 通常也是用 RabbitMQ 作为 Broker。

最后,还要看公司的技术栈,以及公司的运维擅长哪个系统,这样可以保证系统日常的稳定运行和及时的技术支持。

参考 #

Kafka documentation

RabbitMQ Tutorials

AMQP 文档

深入 RabbitMQ(偏底层)

RabbitMQ 实战指南(偏运维)

Kafka 权威指南(一般)

深入理解 Kafka:核心设计与实践原理(推荐)

本文共 6328 字,上次修改于 Jul 9, 2024,以 CC 署名-非商业性使用-禁止演绎 4.0 国际 协议进行许可。

相关文章

» Redis 的分布式锁使用注意

» Redis 实现布隆过滤器

» 使用 Typora 来编辑你的 Hugo 博客

» 学习钢琴半年的体会

» Redis 基础知识