票商北汤1. 软件领域二次请求无法避免
我们生活的每一刻都是独特的,事件/行为可能不会以相同的方式再次发生。
在软件领域,同一操作请求并不总是只产生一次,这可能会引发一些问题:想象你月底发薪,公司的转账指令错误地触发了两次,这岂不是双倍的幸福。
为什么幂等性很重要?
网络不可靠:客户端超时后,可以安全地重试幂等的请求(如PUT, DELETE),而不必担心产生意外后果。
分布式系统:在微服务架构中,服务间的重试机制依赖于幂等性来确保数据一致性。
二次请求的来源能避免出现吗?怎么避免出现?
前端的频繁点击提交可以通过提交后置灰按钮/提交后切换页面/防误触来解决
客户端/中间服务器的重试动作则无法避免 -
[此处为图片1]
根据双将军理论,即使A/B将军不断确认收到对方的上一条信息,也无法确保对方与自己达成(同时攻击的共识)。
两将军问题是无解的,间歇性重试是一种工程解决方案。(还有散弹打鸟):我们一直发送相同的服务请求,直到确定收到它(虽然可能会多次收到),这就叫至少一次交付。
但是我们不希望被扣款两次,因此必须确保多次处理相同的请求不会改变最初的应用状态,这是幂等请求的关键。
除此之外,重试还可能带来重试风暴、资源雪崩等衍生问题。
2. 某些请求天然幂等,你无需做任何事情
想象你在银行开户。
public sealed class Account
{
public Guid Id { get; }
public decimal Balance { get; private set; }
public Account(Guid id, decimal balance)
{
if (id == default)
throw new InvalidOperationException("Account id must be provided");
if (balance < 0)
throw new InvalidOperationException("Balance cannot be negative");
Id = id;
Balance = balance;
}
// 取钱
public void Withdraw(decimal amount)
{
if (amount < 0)
throw new InvalidOperationException("Cannot withdraw negative amount");
if (amount > Balance)
throw new InvalidOperationException("Cannot withdraw more than existing balance");
Balance -= amount;
}
// 存钱
public void Deposit(decimal amount)
{
if (amount < 0)
throw new InvalidOperationException("Cannot deposit negative amount");
Balance += amount;
}
}
前端发起的开户请求OpenAccountRequest是幂等的,只需要在开户逻辑中检查数据表是否存在该AccountId。
你甚至可以在数据库中设置AccountId为唯一索引,使重试动作引发异常。
public async Task HandleAsync(OpenAccountRequest request, CancellationToken token = default)
{
var account = new Account(request.AccountId, request.Balance);
try
{
await _repository.InsertAsync(account, token);
}
catch (DuplicateKeyException)
{
// Ignore
}
}
对于存钱(Withdraw)和取钱(Deposit)则不行,如果因为网络原因而重试了两次存钱请求(deposit),岂不就是双倍的幸福。
3. 乐观锁的介入一定合理吗?
一种处理重复请求的方法是询问实体的状态,严格来说,这是解决更大背景(乐观锁)问题的方案。
首先我们知道,在高并发场景下,有一种称为乐观锁的并发控制机制,乐观地认为数据在操作时不会冲突,因此在操作前不加锁,在提交时检查数据是否被修改。
文章一开始提到:让前端在请求时带上需要保护的Balance,在更新时利用AccountId+原Balance来定位并更新账户。
// 下面的前端DTO需要带上账户余额,(二次请求也是这个值)。
public sealed class DepositToAccountRequest
{
public Guid AccountId { get; }
public decimal Amount { get; } // 处理金额
public decimal AccountBalance { get; }
public DepositToAccountRequest(Guid accountId, decimal amount, decimal accountBalance)
{
AccountId = accountId;
Amount = amount;
AccountBalance = accountBalance;
}
}
public async Task HandleAsync(DepositToAccountRequest request, CancellationToken token = default)
{
var account = await _repository.GetAsync(request.AccountId, token) ??
throw new EntityNotFoundException();
account.Deposit(request.Amount);
await _repository.UpdateAsync(account, request.AccountBalance, token);
}
public sealed class AccountRepository : IAccountRepository
{
//....
public async Task UpdateAsync(Account account, decimal expectedBalance, CancellationToken token = default)
{
var sql = "UPDATE Accounts SET Balance = @Balance WHERE Id = @Id AND Balance = @ExpectedBalance";
var sqlParams = new
{
Id = account.Id,
Balance = account.Balance, // 更新后的余额
ExpectedBalance = expectedBalance // 初始余额
};
await using var connection = new SqlConnection(_connectionString);
await connection.OpenAsync(token);
var rowsAffected = await connection.ExecuteAsync(sql, sqlParams);
if (rowsAffected == 0)
throw new InvalidStateException();
}
//....
}
读者肯定也注意到了:
① 这种方法不够灵活,如果涉及的不仅仅是Balance,那么SQL逻辑就需要调整;
② 此外,这种方法无法识别重复请求,无法判断是重复请求还是底层数据确实发生了变动。
设想你被触发了第二次取款请求,如果这时恰好有人向你的账户存入了一笔款项(数额正好等于你首次取款的金额),导致你的第二次取款请求成功了,这无疑会带来新的双重遗憾。
因此,文中提出了一个更为正式的解决方案:前端参与 + 状态版本控制
在前端DTO请求中加入AccountVersion,在每次更新时使用AccountId+原始AccountVersion来定位并更新状态版本。如果WHERE条件失败,说明实体状态已发生变化,应向前端返回错误信息,提示前端重新获取数据;如果WHERE条件成功,则表明状态版本未变,可以递增version,并反馈给前端。
public async Task UpdateAsync(Account account, int expectedVersion, CancellationToken token = default)
{
var sql = "UPDATE Accounts SET Balance = @Balance, Version = @Version WHERE Id = @Id AND Version = @ExpectedVersion";
var sqlParams = new
{
Id = account.Id,
Balance = account.Balance,
Version = account.Version,
ExpectedVersion = expectedVersion
};
await using var connection = new SqlConnection(_connectionString);
await connection.OpenAsync(token);
var rowsAffected = await connection.ExecuteAsync(sql, sqlParams);
if (rowsAffected === 0) {
throw new InvalidStateException();
}
image
提供一个Grafana 修改数据源的实例。
curl 'https://grafana-chinese.observe.dev.eks.gainetics.io/api/datasources/uid/tempo' \
-X 'PUT' \
-H 'content-type: application/json' \
--data-raw '{"id":2,"uid":"tempo","orgId":1,"name":"Tempo","type":"tempo","typeLogoUrl":"public/plugins/tempo/img/tempo_logo.svg","access":"proxy","url":"http://tempo:3200","user":"","database":"","basicAuth":false,"basicAuthUser":"","withCredentials":false,"isDefault":true,"jsonData":{"pdcInjected":false,"tracesToLogsV2":{"customQuery":false,"datasourceUid":"opensearch","filterBySpanID":true,"filterByTraceID":true,"spanEndTimeShift":"1m","spanStartTimeShift":"-1m","tags":[{"key":"beast","value":""}]}},"secureJsonFields":{},"version":18,"readOnly":false,"accessControl":{"alert.instances.external:read":true,"alert.instances.external:write":true,"alert.notifications.external:read":true,"alert.notifications.external:write":true,"alert.rules.external:read":true,"alert.rules.external:write":true,"datasources.id:read":true,"datasources:delete":true,"datasources:query":true,"datasources:read":true,"datasources:write":true},"apiVersion":""}'
其中包含一个版本号,每次前端试图更新时,都会携带此版本号以定位后端信息。
ds = &datasources.DataSource{
ID: cmd.ID,
OrgID: cmd.OrgID,
.....
Version: cmd.Version + 1,
.....
}
var updateSession *xorm.Session
if cmd.Version !== 0 {
// 允许cmd.version 大于 db.version 的原因是使得用户可以通过datasource.yaml文件强制
// 更新数据源,而无需确切知道数据库中数据源的具体版本。
updateSession = sess.Where("id=? and org_id=? and version < ?", ds.ID, ds.OrgID, ds.Version)
} else {
updateSession = sess.Where("id=? and org_id=?", ds.ID, ds.OrgID)
}
affected, err := updateSession.Update(ds)
if err != nil {
return err
}
这种积极的锁机制在处理幂等问题时存在一个小缺点,因为这种锁主要是为了应对并发控制,它解决了并发请求中的一部分重复请求问题,但同时也带来了负面影响,在高并发情况下,许多请求会被阻止(重试请求和并发请求均可能被拒绝),导致效率降低,不过数据一致性得到了保障,避免了重复操作的问题。
4. 使用数据库事务包围,方法更为简便且常见
你可以创建一张表来记录 requestId 的历史,确保每个 requestId 的唯一性。
通过事务处理:首先将 requestId 插入到历史记录表中,然后执行实际的请求操作,这样可以有效解决幂等问题,实现真正的幂等性,因为该事务能够准确识别并处理重复请求。
public sealed class AccountRepository : IAccountRepository
{
//....
public async Task UpdateAsync(Account account, Guid requestId, CancellationToken token = default)
{
var requestSql = "INSERT INTO RequestIds VALUES (@Id)";
var requestSqlParams = new
{
Id = requestId.ToString()
};
var accountSql = "UPDATE Accounts SET Balance = @Balance WHERE Id = @Id";
var accountSqlParams = new
{
Id = account.Id,
Balance = account.Balance
};
await using var connection = new SqlConnection(_connectionString);
await connection.OpenAsync(token);
await using var transaction = await connection.BeginTransactionAsync(token);
try
{
await connection.ExecuteAsync(requestSql, requestSqlParams);
}
catch (Exception e) when (IsDuplicateKeyException(e))
{
throw new DuplicateKeyException();
}
await connection.ExecuteAsync(accountSql, accountSqlParams);
await transaction.CommitAsync(token);
}
//....
}


雷达卡


京公网安备 11010802022788号







