2020年5月26日星期二

Kafka消息送达语义说明

Kafka消息送达语义说明


Kafka消息送达语义说明

0.11 版本之前保证的语义是:至少一次

 

至少一次的解释

可以做到消息不丢失--> 可以做到发送成功的消息一定可以被消费到。

不能做到消息不重复。

## 发送成功的消息,表示业务逻辑认为此消息已发送成功,即send方法已执行完成。 

 

丢消息场景

异步发送端:

a:send之后,等待发送的时候down(消息在缓冲区中),导致消息丢失。

b:send时,缓冲区已满,导致消息丢弃。

 

同步(异步)发送端:

a:有限的重试 

服务端Leader down时,因为有与zk的超时timeout,导致在timeout之后才会进行切换, 如果重试次数 * 重试间隔 < zk session timeout + 切换耗时,则消息会丢失。

 

b:ack != -1 

ack = 0 的场景,不需服务端确认,发送后,Leader down,导致消息丢失。 

ack = 1 的场景,只需要Leader确认,Leader收到消息后,未同步到Replica之前,Leader down,导致消息丢失。 

 

 

服务端:

a:min.insync.replicas < 2 且 unclean.leader.election.enable = true 

min.insync.replicas < 2 的场景下,如果副本均落后Leader,在Leader down时,根据脏选开关,会选择落后的副本作为新的Leader,则落后的数据会丢失。 

 

消费端:

a:自动offset提交 

消息处理失败,但是offset也提交了,对业务来说消息丢失。

 

b:先手动提交offset,后处理消息 

先提交offset,后处理消息,但是处理逻辑失败,对业务来说消息丢失。

 

不丢失数据的方法

发送端:

a:同步发送 

b:retries = Long.MAX_VALUE

c:acks = -1

 

服务端:

a:replication factor = 3

b:min.insync.replicas = 2

c:unclean.leader.election.enable = false

 

消费端:

a:auto.commit.enable = false

b:仅当消息处理成功之后再提交offset

 

消息重复的场景 

发送端:

a:发送端重试 

发送端发送消息后,服务端实际已接收,但是客户端因为网络或其他原因未收到确认响应, 再进行消息重试发送,导致消息重复。 

 

消费端:

a:auto.commit.enable = true

消息处理后,为进行自动offset提交之前,consumer down,恢复后从上个提交点开始消费导致消息处理重复。 

 

消息不重复的方法

发送端 --> 做不到。

消费端 --> auto.commit.enable = false,并且等消息处理完成再提交offset。

 

0.11之后版本保证的语义是:恰好一次 

 

恰好一次的解释

可以做到消息不丢失 --> 可以做到发送成功的消息一定可以被消费到。

可以做到消息不重复。 

 

发送端做不到消息不重复的解决办法 

方法:

给Producer编号,且给每条消息编号,服务端保存Producer编号与此Producer最后一条消息的映射,服务端校验当前收到的消息是否与保存的最后一条消息编号相同,如果相同则拒绝。

 

配置:

enable.idempotence = true --> 

retries = Integer.MAX_VALUE 

max.in.flight.requests.per.connection = 1 

acks = -1 

 

过程:

a:Producer发送InitProducerIdRequest获取ProducerIdAndEpoch producerId 是Broker从zk分段获取后递增分配的,保证唯一,epoch为事务所用,暂时不提。 

 

b:Producer发送消息(消息中加入ProducerIdAndEpoch,消息序列号(0))。 

 

c:服务端收到消息,内存中维护 ProducerIdAndEpoch与此消息的映射,数据落盘, 同步到副本。 

 

d:Producer发送消息(消息中加入ProducerIdAndEpoch,消息序列号(1))。 

 

e:服务端收到消息,判断内存中的映射是否存在,如不存在,更新映射,数据落盘, 同步到副本,如存在,返回消息重复异常。 

判断是否重复的条件:

isFromClient == true && 

batch.producerIdAndEpoch == producerIdAndEpoch && 

batch.baseSequence == firstSeq && 

batch.lastSequence == lastSeq 

 

几个问题:

a:重试的场景中,ProducerIdAndEpoch映射的会不会不是上次重试的消息。 

max.in.flight.requests.per.connection代表了同时只能有一条(批)消

息在发送,retries 保证了必须发送成功才会进行下一条的发送,所以不会有映射成其他消息 的情况。 

 

b:有了映射缓存,其他副本没成功复制怎么办 acks = -1 保证了其他副本必须复制成功。 

 

c:服务端启动或新选举为Leader的时候,缓存内容为空怎么判断重复 会先构造缓存内容再提供服务 初始化时加载.snapshot文件,append消息时更新缓存,退出时,写入.snapshot文件。 

 

事务消息

 

事务消息语义

保证发送消息不重复。

保证多条消息(发往不同服务端)发送的原子性,即同时提交或同时回滚。

保证 “消费-处理-发送”逻辑的原子性,及要么全部成功,要么整体回滚。 

 

整体原理

Producer新增 transactionalId、producerId、epoch标识,可标记事务。 

 

新增transaction coordinator,协调处理事务的开启、提交、终止,记录事务日志 ;

transaction coordinator依附在Broker上,基于Broker提供服务;

记录事务日志是一个内部可根据Key压缩的Topic,可复制的,当 transaction coordinator down,有其他的接管继续事务处理。 

 

服务端只要接收到消息,就写入文件,不管后续是否是提交或者终止。 

 

新增control类型消息,标记事务是提交还是终止(全部消息以transactionalId为标识)。 

 

消费端使用缓存堆积消息,直到看到control类型消息,返回给应用或丢弃。 

 

多条消息发送的原子性

发送消息过程

a:初始化事务 

发送FindCoordinatorRequest,查找transaction coordinator。

发送InitProducerIdRequest到transaction coordinator,获取ProducerIdAndEpoch。

transaction coordinator 新增消息 transactionalId --> producerId 到事务日志。

transaction coordinator 恢复或终止此 transactionalId 之前的事务。

 

b:开启事务 

 

c:生产消息(可多次) 

发送AddPartitionsToTxnRequest到transaction coordinator,transaction coordinator增加 BEGIN的事务日志,日志包含此partition 发送消息到实际的Broker,Broker接收消息写入文件并复制到副本 

 

d:提交或终止事务 

Producer 发送 EndTxnRequest 到 transaction coordinator transaction coordinator 写入PREPARE_COMMIT或 PREPARE_ABORT到事务日志 transaction coordinator 依次向全部partition发送 COMMIT/ ABORT的control消息 transaction coordinator 写入 COMMIT/ ABORT到事务日志 Broker接收消息写入文件并复制到副本 

 

消费过程(READ_COMMITED) 

a:初始化、Rebalance

b:拉消息,如果不包含transactionalId,则返回给应用,如果包含,则放入缓存,直到拉到该transactionalId的control消息,如果该control消息是COMMIT,则相关消息返回应用;如果是ABORT,则删除这些消息

 

“消费-处理-发送”逻辑的原子性

offset提交:

offset提交除了写zk外,还提供一种写Broker的方式,即新增 consumercoordinator 接收offset 的提交请求,consumercoordinator 将 groupId-offset 写入内部Topic,此Topic可压缩,即旧的 groupId-offset 会被清除,只保留最新的。

 

处理过程 

a:初始化事务 

b:开启事务 

c:生产消息(可多次) 

发送AddPartitionsToTxnRequest到 transaction coordinator,transaction coordinator增加 BEGIN的事务日志,日志包含此partition 发送消息到实际的Broker,Broker接收消息写入文件并复制到副本。

发送 AddOffsetsToTxnRequest 到 transaction coordinator,写入事务日志。

发送 TxnOffsetCommitRequest 到 consumercoordinator,写入文件并复制到副本。

d:提交或终止事务 

Producer 发送 EndTxnRequest 到 transaction。

coordinator transaction coordinator 写入PREPARE_COMMIT或 PREPARE_ABORT到事务日志。

transaction coordinator 依次向全部partition发送 COMMIT/ ABORT的control消息。

transaction coordinator 向 consumercoordinator 发送 offset提交的control消息。

transaction coordinator 写入 COMMIT/ ABORT到事务日志。

Broker接收消息写入文件并复制到副本。

 

消息可见性 

offset可见性:

offset提交后,事务提交前,此offset虽然写入文件,但是 consumercoordinator 缓存不更新,直到收到事务提交的control消息才更新缓存,即事务提交前,外部不能查询到此 offset,只能查询到旧的offset。 

 

消息可见性:

a:read_uncommited

所有消息都可被拉取,也可返回给应用处理。 

b:read_commited

所有消息都可被拉取,但是只有判断已提交的可返回给应用处理。 

 

失败场景分析

生产端事务开启后,发消息前down: 

事务无任何动作,后续会被超时处理掉,不影响事务语义。

 

生产端事务开启后,发消息后,保存offset前down:

事务会被超时关闭,该消息没有对应的control消息,对外不可见,不影响事务语义。

 

生产端事务开启后,发消息后,保存offset后,提交事务前down:

事务会被超时关闭,该消息及offset没有对应的control消息,对外不可见,不影响事务语义。 

 

生产端事务提交事务后down:

后续事务动作与客户端无关,由服务端处理,会全部执行完成。 

 

transaction coordinator down: 

因为 transaction coordinator 实际为 Broker,Kafka既有机制保证选举新的 coordinator。

因为事务日志是同步的,即最少有三个节点共享事务状态,单个down不影响事务继续。 

 

consumercoordinator down:

事务提交前,offset信息是同步的,但是不生效。

 

事务提交后,offset信息是同步的,并可对外公开。 

 

broker leader down:

Kafka既有机制保证重新选举Leader,且消息同步。 

 

消费端 down:

read_uncommited的场景,出现消费到未提交事务的消息,且有可能重复。

read_commited的场景,不会消费到未提交事务的消息,其他消费者接管后,从之前位置继续。

 

最新稳定的消息位置(LSO) 

问题:穿插在一批事务消息中间的非事务消息,被消费端消费后,如何提交消费位置 

解答:通过LSO机制,即只能消费到最早的全部事务完成的消息位置 

 

事务日志状态

BEGIN 

PREPARE_COMMIT

PREPARE_ABORT 

COMMIT 

ABORT


没有评论:

发表评论