通过充分利用C++20的强大功能,演示了在分布式系统关键组件中构建现代且高效的方法。文章深入探讨了使用C++20协程在复杂编程环境中构建Raft服务器时所遇到的挑战和解决方案。
本文介绍了如何在C++20中实现Raft服务器一致性模块,而无需使用任何额外的库。文章分为三个主要部分:
Raft算法的全面概述
Raft服务器开发的详细说明
基于协程的自定义网络库的描述
该实现利用了C++20的强大功能,特别是协程,以提供一种有效且现代的方法来构建分布式系统的关键组件。这篇文章不仅展示了在复杂编程环境中使用C++20协程的实际应用和好处,还深入探讨了从零开始构建共识模块(如Raft服务器)时遇到的挑战和解决方案。Raft服务器和网络库的存储库(miniraft-cpp和coroio)可供进一步探索和实际应用。
引言
在深入研究Raft算法的复杂性之前,让我们考虑一个现实世界的例子。我们的目标是开发一个网络键值存储(K/V)系统。在C++中,可以通过使用unordered_map<string, string>轻松实现这一目标。然而,在实际应用中,对容错存储系统的需求增加了复杂性。一种看似简单的方法可能涉及部署三台(或更多)机器,每台机器托管此服务的副本。期望是用户管理数据复制和一致性。然而,这种方法可能导致不可预测的行为。例如,可能使用特定键更新数据,然后稍后检索较旧版本。
用户真正想要的是一个分布式系统,可能分布在多台机器上,运行得就像单台主机系统一样流畅。为了满足这一需求,通常在K/V存储(或任何类似的服务,以下简称为“状态机”)的前面放置一个共识模块。此配置确保所有用户与状态机的交互都通过共识模块进行,而不是直接访问。在这个背景下,让我们看看如何使用Raft算法作为示例来实现这样一个共识模块。
Raft概述
在Raft算法中,有一个奇数个被称为节点的参与者。每个节点保留自己的记录日志。有一个节点是领导者,其他的是追随者。用户将所有请求(读取和写入)直接发送给领导者。当收到更改状态机的写入请求时,领导者首先记录它,然后将其转发给追随者,追随者也记录它。一旦大多数节点成功响应,领导者将认为此条目已提交,将其应用于状态机,并通知用户成功。
在Raft中,“Term”是一个关键概念,它只能增长。Term在系统发生变化时(例如领导变更)会发生变化。Raft的日志具有特定的结构,每个条目包括一个Term和一个Payload。Term指的是写入初始条目的领导者。Payload表示要对状态机进行的更改。Raft确保具有相同索引和Term的两个条目是相同的。Raft日志不是仅追加的,可能会被截断。例如,在下面的场景中,领导者S1在崩溃之前复制了两个条目。S2接管并开始复制条目,S1的日志与S2和S3的不同。因此,S1日志的最后一个条目将被移除并替换为新的条目。
Raft RPC API
让我们来研究一下Raft RPC。值得注意的是,Raft API相当简单,只有两个调用。我们将首先查看领导者选举API。值得注意的是,Raft确保每个任期只能有一个领导者。还可能存在没有领导者的任期,例如选举失败的情况。为确保只发生一次选举,每个节点将其选票保存在一个称为VotedFor的持久变量中。选举RPC称为RequestVote,具有三个参数:Term、LastLogIndex和LastLogTerm。响应包含Term和VoteGranted。值得注意的是,每个请求都包含Term,在Raft中,只有当节点的Terms是兼容的时,它们才能有效通信。
当节点发起选举时,它向其他节点发送RequestVote请求并收集它们的选票。如果大多数响应是积极的,节点将晋升为领导者角色。
现在让我们看一下AppendEntries请求。它接受参数,如Term、PrevLogIndex、PrevLogTerm和Entries,响应包含Term和Success。如果请求中的Entries字段为空,则它充当心跳。
当收到AppendEntries请求时,追随者检查Term的PrevLogIndex。如果它与PrevLogTerm匹配,追随者将Entries添加到其日志,从PrevLogIndex + 1开始(如果存在PrevLogIndex之后的条目,则将其删除):
AppendEntries请求被接收的流程
如果Terms不匹配,追随者返回Success=false。在这种情况下,领导者会重试发送请求,将PrevLogIndex减少一个。
领导者重试发送请求,将PrevLogIndex减少一个
当节点收到RequestVote请求时,它将其LastTerm和LastLogIndex对比到最近的日志条目。如果这对是小于或等于请求者的值,节点返回VoteGranted=true。
Raft中的状态转换
Raft的状态转换如下。每个节点开始时都处于Follower状态。如果Follower在设定的超时内未收到AppendEntries,它会增加其Term并移动到Candidate状态,触发选举。节点可以从Candidate状态转到Leader状态,如果它赢得选举,或者如果它收到AppendEntries请求,则返回到Follower状态。候选者还可以在超时期内未转换为Follower或Leader时,重新变为Candidate。如果处于任何状态的节点收到具有比当前Term大的RPC请求,则它将转到Follower状态。
提交
现在让我们考虑一个示例,演示了Raft并不像看起来那么简单。我从Diego Ongaro的博士论文中选取了这个例子。在第2个任期中,S1是领导者,在崩溃之前复制了两个条目。随后,S5在第3个任期中领导,添加了一个条目,然后崩溃。接下来,S2在第4个任期中接管领导,复制了来自第2个任期的条目,为第4个任期添加了自己的条目,然后崩溃。这导致了两种可能的结果:S5重新获得领导权并截断第2个任期的条目,或者S1重新获得领导权并提交第2个任期的条目。第2个任期的条目只有在它们被新领导者的后续条目覆盖后才会被安全提交。
Raft算法在动态且经常不可预测的情况下的操作方式
这个例子展示了Raft算法在动态且经常不可预测的情况下的操作方式。事件的顺序,包括多个领导者和崩溃,展示了在分布式系统中维护一致状态的复杂性。这种复杂性不会立即显现出来,但在涉及领导者更改和系统故障的情况下变得重要。该示例强调了在处理这种复杂性时稳健和深思熟虑方法的重要性,而这正是Raft试图解决的问题。
额外资料
为了进一步学习和更深入地了解Raft,我推荐以下材料:原始的Raft论文,非常适合实现。Diego Ongaro的博士论文提供了更深入的见解。Maxim Babenko的讲座则更加详细深入。
Raft实现
现在让我们转向Raft服务器的实现,我认为它在很大程度上受益于C++20协程。在我的实现中,持久状态存储在内存中。然而,在实际场景中,它应该保存到磁盘上。稍后我会详细介绍MessageHolder。它的功能类似于shared_ptr,但专门设计用于处理Raft消息,确保对这些通信的高效管理和处理。
struct TState { uint64_t CurrentTerm = 1; uint32_t VotedFor = 0; std::vector<TMessageHolder<TLogEntry>> Log; };
在Volatile状态中,我使用了"L"表示"leader"或"F"表示"follower"来标记条目,以澄清它们的用途。CommitIndex表示最后一个被提交的日志条目。相反,LastApplied是应用于状态机的最新日志条目,它始终小于或等于CommitIndex。NextIndex很重要,因为它标识要发送给同行的下一个日志条目。类似地,MatchIndex跟踪最后一个发现匹配的日志条目。Votes部分包含投票给我的同行的ID。超时管理是一个重要方面:HeartbeatDue和RpcDue管理领导者的超时,而ElectionDue处理追随者的超时。
using TTime = std::chrono::time_point<std::chrono::steady_clock>; struct TVolatileState { uint64_t CommitIndex = 0; // L,F uint64_t LastApplied = 0; // L,F std::unordered_map<uint32_t, uint64_t> NextIndex; // L std::unordered_map<uint32_t, uint64_t> MatchIndex; // L std::unordered_set<uint32_t> Votes; // C std::unordered_map<uint32_t, TTime> HeartbeatDue; // L std::unordered_map<uint32_t, TTime> RpcDue; // L TTime ElectionDue; // F };
Raft API
我的Raft算法实现有两个类。第一个是INode,表示一个同行节点。这个类包括两个方法:Send,将出站消息存储在内部缓冲区中,和Drain,处理实际的消息分发。Raft是第二个类,它管理当前节点的状态。它还包括两个方法:Process,处理传入的连接,和ProcessTimeout,必须定期调用以处理超时,例如领导者选举超时。使用这些类的用户应根据需要使用Process、ProcessTimeout和Drain方法。INode的Send方法在Raft类内部被调用,确保消息处理和状态管理在Raft框架中无缝集成。
struct INode { virtual ~INode() = default; virtual void Send(TMessageHolder<TMessage> message) = 0; virtual void Drain() = 0; }; class TRaft { public: TRaft(uint32_t node, const std::unordered_map<uint32_t, std::shared_ptr<INode>>& nodes); void Process(TTime now, TMessageHolder<TMessage> message, const std::shared_ptr<INode>& replyTo = {}); void ProcessTimeout(TTime now); };
Raft消息
现在让我们看看如何发送和读取Raft消息。我没有使用序列化库,而是以TLV格式读取和发送原始结构。这是消息头的样子:
struct TMessage { uint32_t Type; uint32_t Len; char Value[0]; };
为了更方便,我引入了第二级头:
struct TMessageEx: public TMessage { uint32_t Src = 0; uint32_t Dst = 0; uint64_t Term = 0; };
这包括每个消息中发送者和接收者的ID。除了LogEntry之外,所有消息都继承自TMessageEx。LogEntry和AppendEntries的实现如下:
struct TLogEntry: public TMessage { static constexpr EMessageType MessageType = EMessageType::LOG_ENTRY; uint64_t Term = 1; char Data[0]; }; struct TAppendEntriesRequest: public TMessageEx { static constexpr EMessageType MessageType = EMessageType::APPEND_ENTRIES_REQUEST; uint64_t PrevLogIndex = 0; uint64_t PrevLogTerm = 0; uint32_t Nentries = 0; };
为了简化消息处理,我使用了一个名为MessageHolder的类,类似于shared_ptr:
template<typename T> requires std::derived_from<T, TMessage> struct TMessageHolder { T* Mes; std::shared_ptr<char[]> RawData; uint32_t PayloadSize; std::shared_ptr<TMessageHolder<TMessage>[]> Payload; template<typename U> requires std::derived_from<U, T> TMessageHolder<U> Cast() {...} template<typename U> requires std::derived_from<U, T> auto Maybe() { ... } };
这个类包括一个包含消息本身的char数组。它还可能包括一个Payload(仅用于AppendEntry),以及用于安全地将基本类型消息转换为特定类型消息的方法(Maybe方法)和不安全的转换方法(Cast方法)。这是使用MessageHolder的典型例子:
void SomeFunction(TMessageHolder<TMessage> message) { auto maybeAppendEntries = message.Maybe<TAppendEntriesRequest>(); if (maybeAppendEntries) { auto appendEntries = maybeAppendEntries.Cast(); } // 如果我们确定 auto appendEntries = message.Cast<TAppendEntriesRequest>(); // 使用重载的operator-> auto term = appendEntries->Term; auto nentries = appendEntries->Nentries; // ... }
以及在Candidate状态处理程序中的实际示例:
void TRaft::Candidate(TTime now, TMessageHolder<TMessage> message) { if (auto maybeResponseVote = message.Maybe<TRequestVoteResponse>()) { OnRequestVote(std::move(maybeResponseVote.Cast())); } else if (auto maybeRequestVote = message.Maybe<TRequestVoteRequest>()) { OnRequestVote(now, std::move(maybeRequestVote.Cast())); } else if (auto maybeAppendEntries = message.Maybe<TAppendEntriesRequest>()) { OnAppendEntries(now, std::move(maybeAppendEntries.Cast())); } }
这种设计方法提高了Raft实现中消息处理的效率和灵活性。
Raft 服务器
让我们讨论一下 Raft 服务器的实现。Raft 服务器将为网络交互设置协程。首先,我们将看一下处理消息读写的协程。这些协程使用的原语稍后在文章中讨论,同时还会对网络库进行分析。写协程负责将消息写入套接字,而读协程稍微复杂一些。为了读取,它必须首先检索 Type 和 Len 变量,然后分配一个长度为 Len 字节的数组,最后读取剩余的消息。这种结构有助于在 Raft 服务器内部有效而高效地管理网络通信。
template<typename TSocket> TValueTask<void> TMessageWriter<TSocket>::Write(TMessageHolder<TMessage> message) { co_await TByteWriter(Socket).Write(message.Mes, message->Len); auto payload = std::move(message.Payload); for (uint32_t i = 0; i < message.PayloadSize; ++i) { co_await Write(std::move(payload[i])); } co_return; } template<typename TSocket> TValueTask<TMessageHolder<TMessage>> TMessageReader<TSocket>::Read() { decltype(TMessage::Type) type; decltype(TMessage::Len) len; auto s = co_await Socket.ReadSome(&type, sizeof(type)); if (s != sizeof(type)) { /* throw */ } s = co_await Socket.ReadSome(&len, sizeof(len)); if (s != sizeof(len)) { /* throw */} auto mes = NewHoldedMessage<TMessage>(type, len); co_await TByteReader(Socket).Read(mes->Value, len - sizeof(TMessage)); auto maybeAppendEntries = mes.Maybe<TAppendEntriesRequest>(); if (maybeAppendEntries) { auto appendEntries = maybeAppendEntries.Cast(); auto nentries = appendEntries->Nentries; mes.InitPayload(nentries); for (uint32_t i = 0; i < nentries; i++) mes.Payload[i] = co_await Read(); } co_return mes; }
要启动 Raft 服务器,创建 RaftServer 类的实例并调用 Serve 方法。Serve 方法启动两个协程。Idle 协程负责定期处理超时,而 InboundServe 管理传入的连接。
class TRaftServer { public: void Serve() { Idle(); InboundServe(); } private: TVoidTask InboundServe(); TVoidTask InboundConnection(TSocket socket); TVoidTask Idle(); }
通过 accept 调用接收传入的连接。然后启动 InboundConnection 协程,该协程读取传入的消息并将其转发给 Raft 实例进行处理。这个配置确保了 Raft 服务器能够高效处理内部超时和外部通信。
TVoidTask InboundServe() { while (true) { auto client = co_await Socket.Accept(); InboundConnection(std::move(client)); } co_return; } TVoidTask InboundConnection(TSocket socket) { while (true) { auto mes = co_await TMessageReader(client->Sock()).Read(); Raft->Process(std::chrono::steady_clock::now(), std::move(mes), client); Raft->ProcessTimeout(std::chrono::steady_clock::now()); DrainNodes(); } co_return; }
Idle 协程的工作方式如下:每隔 sleep 秒调用一次 ProcessTimeout 方法。值得注意的是,这个协程使用了异步的 sleep。这种设计使得 Raft 服务器能够有效地管理对时间敏感的操作,而不阻塞其他进程,提高服务器的整体响应性和性能。
while (true) { Raft->ProcessTimeout(std::chrono::steady_clock::now()); DrainNodes(); auto t1 = std::chrono::steady_clock::now(); if (t1 > t0 + dt) { DebugPrint(); t0 = t1; } co_await Poller.Sleep(t1 + sleep); }
为发送出站消息创建的协程设计得非常简单。它在循环中重复发送所有累积的消息到套接字。在发生错误时,它启动另一个协程,负责连接(通过 connect 函数)。这种结构确保了出站消息的顺利和高效处理,同时通过错误处理和连接管理保持了鲁棒性。
try { while (!Messages.empty()) { auto tosend = std::move(Messages); Messages.clear(); for (auto&& m : tosend) { co_await TMessageWriter(Socket).Write(std::move(m)); } } } catch (const std::exception& ex) { Connect(); } co_return;
有了 Raft 服务器的实现,这些例子展示了协程是如何极大地简化开发的。虽然我还没有深入研究 Raft 的实现(相信我,它比 Raft 服务器复杂得多),但总体算法不仅简单,而且设计紧凑。
接下来,我们将看一些 Raft 服务器的例子。之后,我将描述我从头开始为 Raft 服务器创建的网络库。这个库对于在 Raft 框架内实现高效的网络通信至关重要。
以下是启动三个节点的 Raft 集群的示例。每个实例都接收自己的 ID 作为参数,以及
其他实例的地址和 ID。在这种情况下,客户端只与领导者通信。它发送随机字符串,同时保持固定数量的在途消息并等待它们的提交。这个配置描述了客户端在多节点 Raft 环境中与领导者之间的交互,演示了算法处理分布式数据和共识的能力。
$ ./server --id 1 --node 127.0.0.1:8001:1 --node 127.0.0.1:8002:2 --node 127.0.0.1:8003:3 ... Candidate, Term: 2, Index: 0, CommitIndex: 0, ... Leader, Term: 3, Index: 1080175, CommitIndex: 1080175, Delay: 2:0 3:0 MatchIndex: 2:1080175 3:1080175 NextIndex: 2:1080176 3:1080176 .... $ ./server --id 2 --node 127.0.0.1:8001:1 --node 127.0.0.1:8002:2 --node 127.0.0.1:8003:3 ... $ ./server --id 3 --node 127.0.0.1:8001:1 --node 127.0.0.1:8002:2 --node 127.0.0.1:8003:3 ... Follower, Term: 3, Index: 1080175, CommitIndex: 1080175, ... $ dd if=/dev/urandom | base64 | pv -l | ./client --node 127.0.0.1:8001:1 >log1 198k 0:00:03 [159.2k/s] [ <=>
我测量了 3 节点和 5 节点集群配置下的提交延迟。正如预期的那样,5 节点设置的延迟更高:
3 节点
50 百分位数(中位数):292,872 ns
80 百分位数:407,561 ns
90 百分位数:569,164 ns
99 百分位数:40,279,001 ns
5 节点
50 百分位数(中位数):425,194 ns
80 百分位数:672,541 ns
90 百分位数:1,027,669 ns
99 百分位数:38,578,749 ns
I/O库
现在让我们来看一下我从头开始创建并在Raft服务器实现中使用的I/O库。我从cppreference.com的以下示例开始,这是一个回显服务器的实现:
task<> tcp_echo_server() { char data[1024]; while (true) { std::size_t n = co_await socket.async_read_some(buffer(data)); co_await async_write(socket, buffer(data, n)); } }
我的库需要事件循环、套接字原语以及像`read_some`/`write_some`(在我的库中称为`ReadSome`/`WriteSome`)这样的方法,以及更高级的包装器,比如`async_write`/`async_read`(在我的库中称为`TByteReader`/`TByteWriter`)。
为了实现套接字的`ReadSome`方法,我需要创建一个`Awaitable`,如下所示:
auto ReadSome(char* buf, size_t size) { struct TAwaitable { bool await_ready() { return false; /*总是挂起*/ } void await_suspend(std::coroutine_handle<> h) { poller->AddRead(fd, h); } int await_resume() { return read(fd, b, s); } TSelect* poller; int fd; char* b; size_t s; }; return TAwaitable{Poller_, Fd_, buf, size}; }
当调用`co_await`时,协程会暂停,因为`await_ready`返回`false`。在`await_suspend`中,我们捕获协程句柄,并将其与套接字句柄一起传递给轮询器。当套接字准备好时,轮询器调用协程句柄来重新启动协程。在恢复时,调用`await_resume`,它执行读取并将读取的字节数返回给协程。`WriteSome`、`Accept`和`Connect`方法的实现方式相似。
轮询器的设置如下:
struct TEvent { int Fd; int Type; // READ = 1, WRITE = 2; std::coroutine_handle<> Handle; }; class TSelect { void Poll() { for (const auto& ch : Events) { /* FD_SET(ReadFds); FD_SET(WriteFds);*/ } pselect(Size, ReadFds, WriteFds, nullptr, ts, nullptr); for (int k = 0; k < Size; ++k) { if (FD_ISSET(k, WriteFds)) { Events[k].Handle.resume(); } // ... } } std::vector<TEvent> Events; // ... };
我保持了一组成对的(套接字描述符,协程句柄)数组,用于初始化轮询器后端的结构(在本例中为select)。当与就绪套接字相对应的协程唤醒时,将调用`resume`。
在主函数中应用如下:
TSimpleTask task(TSelect& poller) { TSocket socket(0, poller); char buffer[1024]; while (true) { auto readSize = co_await socket.ReadSome(buffer, sizeof(buffer)); } } int main() { TSelect poller; task(poller); while (true) { poller.Poll(); } }
我们启动了一个协程(或多个协程),该协程通过`co_await`进入睡眠模式,然后控制被传递到一个无限循环中,该循环调用轮询器机制。如果在轮询器中套接字变为就绪,那么相应的协程将被触发并执行,直到下一个`co_await`。
为了读写Raft消息,我需要创建对`ReadSome`/`WriteSome`进行高级封装的函数,类似于:
TValueTask<T> Read() { T res; size_t size = sizeof(T); char* p = reinterpret_cast<char*>(&res); while (size != 0) { auto readSize = co_await Socket.ReadSome(p, size); p += readSize; size -= readSize; } co_return res; } // 用法 T t = co_await Read<T>();
为了实现这些,我需要创建一个同时充当`Awaitable`的协程。协程由一对协程句柄和promise组成。协程句柄用于从外部管理协程,而promise用于内部管理。协程句柄可以包含`Awaitable`方法,允许使用`co_await`等待协程的结果。promise可以
用于存储`co_return`返回的结果,并唤醒调用协程。
在`coroutine_handle`中,在`await_suspend`方法中,我们存储了调用协程的协程句柄。其值将保存在promise中:
template<typename T> struct TValueTask : std::coroutine_handle<> { bool await_ready() { return !!this->promise().Value; } void await_suspend(std::coroutine_handle<> caller) { this->promise().Caller = caller; } T await_resume() { return *this->promise().Value; } using promise_type = TValuePromise<T>; };
在promise本身中,`return_value`方法将存储返回的值。使用可等待对象唤醒调用协程,在`final_suspend`中返回该对象。这是因为编译器在`co_return`后会调用`final_suspend`上的`co_await`。
template<typename T> struct TValuePromise { void return_value(const T& t) { Value = t; } std::suspend_never initial_suspend() { return {}; } // 在这里恢复调用者 TFinalSuspendContinuation<T> final_suspend() noexcept; std::optional<T> Value; std::coroutine_handle<> Caller = std::noop_coroutine(); };
在`await_suspend`中,调用协程可以被返回,并将自动唤醒。需要注意的是,被调用的协程现在将处于睡眠状态,它的协程句柄必须在销毁时通过`destroy`进行销毁,以避免内存泄漏。例如,可以在`TValueTask`的析构函数中完成此操作。
template<typename T> struct TFinalSuspendContinuation { bool await_ready() noexcept { return false; } std::coroutine_handle<> await_suspend( std::coroutine_handle<TValuePromise<T>> h) noexcept { return h.promise().Caller; } void await_resume() noexcept { } };
随着库描述的完成,我将其移植到libevent基准测试中,以确保其性能。该基准测试生成一个包含N个Unix管道的链,每个管道都连接到下一个。然后,它初始化对链的100次写入操作,这将持续到总共进行1000次写入调用。下面的图像展示了该基准测试的运行时,作为我的库(coroio)与libevent的不同后端的函数。
我的库 (coroio) 与 libevent 的各种后端的 Benchmark 运行时作为 N 的函数
结论
总的来说,本文描述了使用C++20协程实现Raft服务器的过程,强调了这一现代C++特性提供的便利和效率。自定义的I/O库是此实现的关键,因为它有效地处理了异步I/O操作。该库的性能通过与libevent基准测试的验证得到了证实,展示了其竞争力。文章来源:https://www.toymoban.com/diary/apps/705.html
对于那些对学习或使用这些工具感兴趣的人,I/O库可在coroio处找到,Raft库可在miniraft-cpp处找到(在文章开头提供链接)。这两个存储库提供了使用C++20协程构建强大、高性能分布式系统的详细信息。文章来源地址https://www.toymoban.com/diary/apps/705.html
到此这篇关于使用C++20协程实现Raft一致性算法 | Raft算法C++20实战的文章就介绍到这了,更多相关内容可以在右上角搜索或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!