弱小和无知并不是生存的障碍,傲慢才是。
---- ---- 面试者
总结
消息中间件,主要就是通过消息的模式来进行数据的采集、业务解耦、流量削峰等。核心逻辑还是生产与消费的逻辑,只是在消息分发、存储、确认的逻辑上侧重点会不同。从网上找了一张指标对比图以供参考:
Kafka原理
消息生产
生产者生产消息时,指定Topic发送消息,如果Topic不存在会自动创建,发送消息同时也可以指定Partition(一个Topic可以有多个Partition,生产的消息会均匀的存入Partition,只保证一个Partition内消息有序,这样消费者指定Partition消费,就可以按顺序消费消息),消息生产之后不会立刻发送,会进行缓存,在消息量达到阈值和缓存时间到之后,就会发送消息。(缓存消息量和缓存时间可配置)
消息发送之后,会收到Broken的ack,ack有三种应答机制:
- 0代表producer往集群发送数据不需要等待集群的返回,不确保消息发送成功。
- 1代表producer往集群发送数据只需要确保leader收到就返回,不用等其他follower进行同步;
- all代表producer往集群发送数据后,在leader收到之后,还需要所有follower完成副本同步,在返回。
这里kafka使用zookeeper当做的注册中心和元数据管理,生产者发送消息是需要把消息发送到leader Partition的,故需要从服务层获取到元数据信息(topic、Partition存储地址信息),通过hash计算之后,把消息发送到指定的实例的Partition中,如果该实例无leader Partition,则会返回无 leader Partition异常,触发生产者重新加载元数据信息。
服务层
每一个Broker就是一个kafka实例,一个实例存储着多个Topic的Partition,其中同一个Topic的leader Partition不会在同一个实例,消息写入Topic时会根据key进行hash存入相应的Partition,由于持久化写文件,是顺序写磁盘,故性能非常高。kafka会为每一个消息生成一个唯一的有序的offset,客户端消费也是根据offset记录,从哪条消息开始消费。
其中每一个Partition在磁盘上对应一个目录,一个Partition由多个segment组成,每个segment以起始offset命名,包含.log、.index、.timeindex三个文件,.log文件存储消息数据,.index存储消息数据索引(稀疏索引),故客户端根据offset消费消息时,会基于二分法先找到对应segment下的.index文件,再根据.index文件(记录offset对应的消息在log文件的偏移量,由于是稀疏索引,故offset是有序,不连续的)二分法找到最近的起始offset,根据目标消息offset - 其实offset得到偏移量,然后读取消息。
消息消费
客户端一般通过用户组进行消费,一个用户组内可以有多个消费者,由于一个Partition只可以被相同用户组的唯一用户消费,故当Partition与用户组消费者数量不匹配时,就需要进行Partition分配,最好的适配是Partition与消费者个数一样,如果消费者多余Partition个数,则会有消费者空闲,如果消费者少于Partition,则会有一个消费者消费多个Partition。同时当前消费组的消费组新增或是掉线,都会引起消费组的Partition重分配,会影响消费性能。
由于消费者是通过pull的机制从Broker读取消息,故Broker会对消费者进行心跳检测,当在一定时间未进行心跳反馈或是超过规定时间后仍未对消费的消息进行ack响应(即使心跳正常),则Broker会认为消费者故障,踢出消费组。
分区Leader选举
既然是分布式多分区,存在leader Partition那么就会涉及leader选举,在创建topic时会选择第一个活跃的Partition作为leader,并且会维护一个ISR列表(与leader分区同步的副本)和OSR(与leader分区落后较多的非同步副本),如果leader副本宕机了,则会从ISR列表中选择第一个作为leader副本,除非配置允许从OSR选择leader(因为数据落后存在数据丢失),否则只有ISR列表中的分区具有选举资格。
零拷贝
在写入数据时主要是基于内存映射文件(MMAP),即进程调用mmap(),操作系统在虚拟内存中分配一段地址空间与磁盘文件关联,应用程序直接通过内存地址读写数据,读写数据会写入PageCache缓存,然后系统自动刷盘持久化到文件(也可以手动刷盘),这样就省掉了用户把存储在应用缓存的内容拷贝到pageCache系统缓存这一步。
在读取消息时主要用到了系统提供的sendfile技术,传统数据传输需要“磁盘 --> 内核缓冲区(DMA拷贝) --> 用户缓冲区(CPU拷贝) --> 内核Socket缓冲区(CPU拷贝) --> 网卡(DMA拷贝)”涉及2次CPU拷贝、2次DMA拷贝和4次上线文切换,效率较低。而消费者消费消息读取数据时,Broker直接通过sendfile从磁盘读取数据发送至网卡,无需经过用户空间,即“磁盘 --> 内核缓冲区(DMA拷贝) --> 网卡(DMA拷贝)”。
DMA拷贝:Direct Memory Access直接存储器访问,CPU在指定源地址、目标地址、传输数据量之后,启动DMA控制器进行数据传输,期间CPU可以处理其他事项,等待传输结束之后,CPU再投入。
用户态&内核态:用户态即进程,由于应用进程是不可以直接操作文件的,需要CPU将文件读取到内核缓存区,然后复制到用户态的进程缓冲区,进程修改完文件之后,再将用户态缓冲区的数据写入内核缓冲区,内核缓冲区再把数据刷到磁盘文件中。
RocketMQ原理
消息发送
生产者会从NameServer获取Broker、Topic、Message Queue等信息,在发送消息时通过负载均衡算法(轮询、随机等)选择消息队列发送消息。
- 普通消息:立即发送,Broker接收到之后,即可被消费。
- 事务消息:发送的消息是半事务状态,得生产者执行完本地事务后才会将消息设置为可消费,如果生产者未响应或是主动查询生产者本地事务失败,则丢弃消息。(如果生产者提交了本地事务但是Broker修改半事务消息失败,则会定时继续检查未修改状态的消息)
生产者发送消息之后,如果是普通消息针对发送之后的响应,也有三种可配方案:
- 同步发送:阻塞等待Broker确认,确保消息可靠但是延迟较高;
- 异步发送:通过回调处理Broker处理结果,提升吞吐量。
- 单向发送:不等待响应,存在消息丢失的可能,适用于日志类低可靠性需求场景。
服务层
Broker通过主从模式支持高可用,其中topic的Message queue 会存储在不同的主Broker里,主Broker负责工作,slave用于备份。Broker在收到消息之后,会将消息按顺序追加写到commitLog文件夹内(不区分topic和队列),每一个文件按照起始偏移量命名,一个文件默认1GB写满则新建,将消息写入commitLog之后,同时需要将消息在commitLog中的位置(物理偏移量、大小、Tag哈希值)写入对应Topic和Queue的ConsumeQueue文件。消费者拉取消息则通过Topic和QueueID找到对应的ConsumeQueue,集合用户的offset和tag进行消息读取。
数据刷盘支持同步刷盘和异步刷盘,同步刷盘即写入数据之后即刻将Page cache刷盘,异步刷盘即将数据写入page cache之后返回,由系统按期将page cache的数据刷盘。
消费者
消费者消费消息支持拉取模式和推送模式,RocketMQ保证消息至少被消费一次,如果用户消费消息之后ack失败,则消息会被重复消费,故消费者需要实现消息消费幂等。如果消费者ack返回RECONSUME_LATER或是超时未响应,触发重试机制16次之后(定时队列,定时重试,重试间隔逐渐拉大)还是失败,这会把消息存入死信队列,需要用户手动进行处理,支持死信队列消息重新消费。
- 消费模式
集群模式:同一消费组内的消费者分摊队列,每一个消息仅被一个消费者消费。
广播模式:组内每一个消费者都消费全量消息。
- 消息确认
成功消费后返回CONSUME_SUCCESS,失败返回RECONSUME_LATER(重试默认16次,仍然失败则放入死信队列),如果是顺序消息,失败阻塞当前队列直到重试成功。
- 偏移量管理
集群模式:偏移量由Broker存储,支持重置进度。
广播模式:偏移量由消费者本地存储。
RabbitMQ原理
生产者
生产消息将消息发送到Exchange,并且指定消息的路由键Routing key,同时可以指定消息属性如持久化delivery_mode=2,通过Publisher Confirm异步确认模式,针对Broker处理消息后进行回调,对返回的结果(basic.ack:已持久化,basic.nack:处理错误)进行处理。
服务端
交换机(exchange)首先收到生产者发送的消息,通过消息的路由Routing key + 交换机的类型,将消息添加到绑定的队列中,如果消息无匹配队列,可能进入Dead Letter Exchange 或被丢弃。交换机类型有以下几种:
- Direct 基于Routing key精确匹配队列;
- Topic 使用通配符匹配(如user.*.order);
- Fanout 广播到所有绑定的队列;
- Headers Exchange 基于消息头键值对匹配。
服务端收到消息之后,如果队列声明时有设置durable=true,则队列元数据会被持久化,需要同时设置durable=true和delivery_mode=2,消息才会被持久化。在进行消息持久化时,会先将消息写入rabbitmq的内存缓冲区,然后异步将内存里的消息写入page cache(由系统定时持久化刷盘),也可以设置惰性队列,直接将消息写入page cache,消息被写入page cache之后,broker认为完成了持久化,便返回ack。如果消息是非持久化,会在消息存入缓存之后即可响应ack,消息在后续内存不足时,会被清理掉。
基于集群和镜像队列实现高可用。
消费者
一般采用消息推送的模式,一个队列将消息推送给客户端,如果客户端存在多个则轮询。消息消费之后,可以手动确认和自动确认,自动确认消息发出之后就会确认,不管消息消费处理是否异常,存在消息丢失可能,手动确认成功返回basic.ack,失败返回basic.nack或basic.reject,这样消息消费失败可以重新入队列或进入死信队列。