楼主: 柯仁
21 0

从零构建高性能KV存储服务器:架构设计与实现细节 [推广有奖]

  • 0关注
  • 0粉丝

等待验证会员

学前班

80%

还不是VIP/贵宾

-

威望
0
论坛币
0 个
通用积分
0
学术水平
0 点
热心指数
0 点
信用等级
0 点
经验
30 点
帖子
2
精华
0
在线时间
0 小时
注册时间
2018-2-1
最后登录
2018-2-2

楼主
柯仁 发表于 2025-11-22 07:10:46 |AI写论文

+2 论坛币
k人 参与回答

经管之家送您一份

应届毕业生专属福利!

求职就业群
赵安豆老师微信:zhaoandou666

经管之家联合CDA

送您一个全额奖学金名额~ !

感谢您参与论坛问题回答

经管之家送您两个论坛币!

+2 论坛币

一、项目背景

KV存储是一种常见的数据存储模型,广泛应用于缓存、配置管理、计数器、队列以及分布式锁等场景。例如,Redis 就是典型的 KV 存储系统,常用于网络服务中的高速缓存。

本项目实现了一个轻量级的远程 KV 存储服务,功能类似于 Redis,支持基本的数据读写操作,并具备良好的扩展性与高性能处理能力。

开发环境与技术栈

编程环境:
部署于 2G 内存、2 核 CPU 的 Linux 云服务器,通过 Xshell 与 VSCode 进行远程连接开发,使用 Makefile 构建整个项目。

技术选型:
C/C++ 基础语言,结合 C++11 特性(如智能指针、lambda 表达式、function 函数包装器),采用 Socket 网络编程,基于 epoll 的 ET 模式配合 Reactor 设计模式,整体架构中引入策略模式,底层涉及数组、哈希表、红黑树和链表等多种基础数据结构。

二、项目设计思路

项目遵循高内聚、低耦合的设计原则,将系统划分为多个独立模块,便于维护与扩展。

1. 网络模块

负责管理客户端的并发连接,接收来自客户端的请求并转交给协议解析模块,同时将处理结果回传给客户端。该模块基于 epoll + ET(边缘触发)模式,结合 Reactor 模式实现高效的 I/O 多路复用机制。每个文件描述符(fd)均注册了对应的回调函数,以便在事件就绪时快速响应。

2. 协议与数据解析模块

该模块依据自定义通信协议对接收到的原始数据进行解析,验证请求合法性,对符合格式的数据进行反序列化,生成标准请求对象后传递给数据存储引擎模块。当存储层返回结果后,再将其封装并通过网络模块发送回客户端。

3. 数据存储引擎模块

根据上层解析模块传来的指令执行具体操作。当前支持多种数据结构作为底层存储引擎,包括数组、哈希表、红黑树及 LRU 缓存机制。通过策略模式,在服务启动时可动态选择合适的存储方案以适应不同应用场景。

支持的核心命令如下:

  • SET key value:插入或更新键值对,若 key 已存在则覆盖原值
  • GET key:获取指定 key 对应的 value,若不存在返回 NO EXIST
  • DEL key:删除指定 key 及其对应 value,key 不存在时返回 NO EXIST
  • MOD key value:修改已有 key 的 value,key 不存在则返回 NO EXIST
  • SIZE:返回当前存储中所有键值对的数量

三、核心模块详解

3.1 网络模块

a) Socket.hpp

对原始 Socket 接口进行了封装,提升代码可读性和复用性。主要封装了 socket、bind、listen 和 accept 等系统调用。

由于项目采用 epoll 的 ET 模式,因此在创建监听套接字 listenfd 时需设置为非阻塞模式(SOCK_NONBLOCK),避免因频繁唤醒导致性能下降。此外,ET 模式下必须正确处理 EAGAIN 或 EINTR 错误码,故 accept 操作需返回错误状态供上层判断。

#pragma once
#include <iostream>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>

#include <cstring>

const int gbacklog = 128;
class mySocket
{
public:
    // 1.构建tcp socketfd
    static int creatSockfd()
    {
        // 创建socketfd
        int sockfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
        return sockfd;
    }

    // 2.bind绑定端口
    static void Bind(int sockfd, int port)
    {
        struct sockaddr_in serveraddr;
        memset(&serveraddr, 0, sizeof(serveraddr));
        // 设置地址的信息(协议,ip,端口)
        serveraddr.sin_family = AF_INET;
        serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); // 绑定任意网卡ip,通常我们访问某一个IP地址是这个服务器的公网网卡IP地址
        serveraddr.sin_port = htons(port);              // 注意端口16位,2字节需要使用htons。不可使用htonl
        if (bind(sockfd, (const sockaddr *)(&serveraddr), sizeof(serveraddr)) < 0)
        {
            perror("sock bind err");
            exit(-1);
        }
        std::cout << "sock bind success" << std::endl;
    }

    // 3. listen监听,让打开的sock这个"文件"去监听来自网络的请求。用于获取新的网络连接
    static void Listen(int sockfd, int maxaccept)
    {
        if (listen(sockfd, maxaccept) == -1)
        {
            perror("sock listen err");
            exit(-1);
        }
        std::cout << "sock listen success" << std::endl;
    }

    // 4 accept创建sockfd用于传输数据
    static int Accept(int listenfd, std::string &clientIp, uint16_t &clientPort, int &err)
    {
        // 获取新fd用于通信
        struct sockaddr_in clientaddr;
        memset(&clientaddr, 0, sizeof(clientaddr));

        socklen_t len = sizeof(clientaddr);
        // std::cout << "accept start  " << listenfd << std::endl;
        int sockfd = accept(listenfd, (struct sockaddr *)&clientaddr, &len);

        // 需要处理错误 EAGAIN 和 EINTER
        err = errno;

        clientIp = inet_ntoa(clientaddr.sin_addr);
        clientPort = ntohs(clientaddr.sin_port);
        return sockfd;
    }
};

b) connItem.hpp

用于管理每一个连接的 fd 及其关联的读写回调函数,同时维护每个连接的输入输出缓冲区(inBuffer/outBuffer)。该结构体充当了网络层与协议解析层之间的桥梁。

当某个 fd 被 epoll 检测到就绪后,可通过查找对应 connItem 实例,调用已注册的 reader、sender 或 accepter 方法进行处理。

#pragma once
#include <string>
#include <functional>

// 管理网络连接信息和缓冲区的结构体
class tcpServer;
struct connItem;
using func_t = std::function<void(connItem *)>; // 使用函数包装器,当然也可以使用函数指针
// typedef void (*func_t)(connItem *);

struct connItem
{
    // 构造函数
    connItem(int sockfd = -1, tcpServer *tcsvptr = nullptr)
        : _sockfd(sockfd), _tcsvptr(tcsvptr) {}

    // 用于注册该连接的对应的读写异常回调方法
    void Register(func_t recver = nullptr, func_t sender = nullptr, func_t execpter = nullptr)
    {
        _recver = recver;
        _sender = sender;
        _execpter = execpter;
    }

    // 文件描述符和读写缓冲区
    std::string _inbuffer;
    std::string _outbuffer;

    // 这个连接对应的读写异常方法
    func_t _recver;
    func_t _sender;
    func_t _execpter;

    // 执行服务器的回调指针
    tcpServer *_tcsvptr;

    int _sockfd;
};

c) TcpServer.hpp

TcpServer 是整个服务器的核心控制类,包含以下关键成员变量:

// 如果要同时监听多个端口,就需要维护每一个sockfd与对应端口的信息
    std::unordered_map<int, int> _listensock_fds;

    int _epfd;             // epollfd
    epoll_event *_revents; // 返回事件的列表

    std::unordered_map<int, connItem *> _connlist{}; // 用于快速查找fd和对于的连接结构体conn
    func_t _service = kvstoreTask;                   // 处理kv请求和响应的函数
  • _listensock_fds:哈希表结构,保存监听套接字与其绑定端口号的映射关系。支持多端口监听,解决单端口连接数上限或负载过高的问题。
  • _epfd_revents:epoll 所必需的句柄与事件数组。其中 _revents 使用 vector 实现更优,可实现动态扩容。
  • _connfdlist:管理所有活动连接的 fd 与对应 connItem 对象的映射,便于事件触发时快速定位连接实例并执行相应逻辑。
  • kvstoreTask:作为与解析模块交互的回调接口,接收到完整数据后调用此函数进入后续处理流程。

初始化相关函数包括 addListenPort 与 init:

// 增加监听的端口
    void addListenPort(int port)
    {
        // 服务器初始化,在creat中已经设置为非阻塞了
        int listensock = mySocket::creatSockfd();
        // 设置端口复用,保证服务器退出后能够快速bind
        int opt = 1;
        setsockopt(listensock, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt));

        mySocket::Bind(listensock, port);
        mySocket::Listen(listensock, gbacklog);

        // 将listensock 与 对应端口添加到哈希表中
        _listensock_fds.insert(std::make_pair(listensock, port));
    }

    // 初始化到监听即可
    void init()
    {
        // epoll初始化,返回事件集合初始化
        _epfd = epoll_create(1);
        _revents = new struct epoll_event[fdnums];

        // 遍历哈希表,将所有的listenfd 与 port进行 关心
        if (_listensock_fds.empty())
        {
            std::cerr << "没有设置监听端口!" << std::endl;
            exit(-1);
        }

        for (auto &kv : _listensock_fds)
        {
            AddConnList(kv.first, EPOLLIN | EPOLLET, [this](connItem *conn)
                        { this->Accepter(conn); }, nullptr, nullptr);
        }
    }
  • addListenPort:用于添加新的监听端口。注意启用 SO_REUSEPORT 选项,允许服务重启时即使处于 TIME_WAIT 状态也能立即绑定端口。
  • init:完成 epoll 实例的初始化,并为监听套接字注册 accept 回调函数。同时设置 EPOLLET 标志以启用边缘触发模式。

事件派发主循环:

// 事件派发器
    void Dispatcher()
    {
        printf("Dispatcher start\n");

        while (true)
        {
            int n = epoll_wait(_epfd, _revents, fdnums, -1);

            // 遍历就绪队列,epoll只会返回真的就绪的事件。不会返回无效事件,减少遍历
            for (int i = 0; i < n; i++)
            {
                int connfd = _revents[i].data.fd;
                uint32_t events = _revents[i].events;

                // 哈希表中该连接没有删除
                if (_connlist.count(connfd))
                {
                    // 不要使用if    elseif     else,因为同一个事件有可能读写事件都就绪了
                    if ((events & EPOLLIN) && _connlist[connfd]->_recver != nullptr) // 回调执行fd对应读事件
                        _connlist[connfd]->_recver(_connlist[connfd]);

                    if ((events & EPOLLOUT) && _connlist[connfd]->_sender != nullptr) // 回调执行fd对应写事件
                        _connlist[connfd]->_sender(_connlist[connfd]);
                }
            }
        }
        printf("Dispatcher over\n");
    }

这是服务器的主运行循环,持续调用 epoll_wait 获取就绪事件。对于每一个返回的 fd,先在哈希表中查找是否存在对应连接记录;若存在,则调用其 connItem 中预设的方法——Accepter 处理新连接,Recver 负责接收数据,Sender 发送响应数据。

在每个方法的实现中,必须对 EAGAIN 和 EINTR 异常进行处理。当遇到 EAGAIN 时,应直接跳出当前流程;而若捕获到 EINTR,则需重新尝试操作,即执行 continue 操作。

在 Reader 方法中,完成数据读取后需要调用 _service(conn),将数据交由解析层进行后续处理。发送数据(send)完成后,需重新注册该连接对应的事件监听。读事件始终需要关注,而是否关注写事件则取决于输出缓冲区中是否存在待发送的数据。

// listenfd 触发EOILLIN执行
    void Accepter(connItem *conn)
    {
        // 1.获取新连接的fd,注意ET模式下,需要死循环一次性将所有数据读取完毕。否则会出现问题
        // printf("Accepter start\n");
        while (true)
        {
            std::string clientip;
            uint16_t clientport;

            int err = 0;
            int clientsock = mySocket::Accept(conn->_sockfd, clientip, clientport, err);

            // 2.构建新连接的信息表,让epoll关心该事件同时并通过哈希表进行管理。需要使用lambda进行处理类内回调函数
            if (clientsock > 0)
            {
                AddConnList(clientsock, EPOLLIN | EPOLLET, [this](connItem *conn)
                            { this->Reader(conn); }, [this](connItem *conn)
                            { this->Sender(conn); }, nullptr); // 这里暂时不处理异常事件
                // 这里可以给每一个连接客户端发送一份使用说明
                printf("Get a new link, info [%s:%d] clientsock[%d]\n", clientip.data(), clientport, clientsock);
            }
            else
            {
                // 处理EAGAIN等异常信号
                if (err == EAGAIN || err == EWOULDBLOCK)
                {
                    // 没有连接了
                    // printf("DEBUG Accepter EAGAIN 没有更多连接,此次获取连接结束\n");
                    break;
                }
                else if (err == EINTR)
                {
                    // printf("DEBUG Accepter EINTR 还有更多连接需要处理\n");
                    continue;
                }
                else
                {
                    // printf("ERRNO Accepter 建立连接失败\n");
                    break;
                }
            }
        }
        // printf("Accepter over\n");
    }

    // clientfd触发EOILLIN执行
    void Reader(connItem *conn)
    {
        char buffer[1024];
        while (true)
        {
            int count = recv(conn->_sockfd, buffer, sizeof(buffer) - 1, 0);
            if (count > 0)
            {
                buffer[count] = 0;
                conn->_inbuffer += buffer;
            }
            else if (count == 0)
            {
                // printf("DEBUG Reader recv over\n");
                RemoveConn(conn->_sockfd);
                return;
            }
            else
            {
                // 同理需要处理EAGAIN和EINTER
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                {
                    // 无数据可读
                    // printf("DEBUG Reader EAGAIN\n");
                    break;
                }
                else if (errno == EINTR)
                {
                    // printf("DEBUG  Reader EINTR\n");
                    continue;
                }
                else
                {
                    // printf("ERRNO Reader 建立连接失败\n");
                    //  关闭套接字和取消epoll关心,然后退出
                    RemoveConn(conn->_sockfd);
                    return;
                }
            }
        }

        // 接收数据之后,进行解析处理。这里目前只是简单处理,还能进一步优化
        // printf("处理客户端请求开始\n");
        _service(conn);
        // printf("处理客户端请求结束\n");
    }

    // clientfd触发EOILLOUT执行
    void Sender(connItem *conn)
    {
        while (true)
        {
            int count = send(conn->_sockfd, conn->_outbuffer.c_str(), conn->_outbuffer.size(), 0);

            if (count > 0)
            {
                // 清空发送的数据,可以进一步优化
                conn->_outbuffer.erase(0, count);
                // 数据发送完毕
                if (conn->_outbuffer.empty())
                {
                    // 此时不可以直接更改事件的关系,因为数据可能还在内核,没有发送到网络
                    // printf("DEBUG Senderr send over\n");
                    break;
                }
            }
            else if (count == 0)
            {
                // 没有数据发送
                RemoveConn(conn->_sockfd);
                return;
            }
            else
            {
                // 同理需要处理EAGAIN和EINTER
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                    break;
                else if (errno == EINTR)
                    continue;
                else
                {
                    // 关闭套接字和取消epoll关心,然后退出
                    RemoveConn(conn->_sockfd);
                    return;
                }
            }
        }

        // 到这里,事件结束了,数据才是真的发送出去了通重新关心该事件的读写

        if (conn->_outbuffer.empty())
            SetEvent(conn->_sockfd, EPOLLIN | EPOLLET, EVENT_MOD);
        else
            SetEvent(conn->_sockfd, EPOLLET | EPOLLIN | EPOLLOUT, EVENT_MOD);
    }

AddConnList:用于初始化连接对象 conn,注册其事件回调,并将其插入哈希表中以便统一管理。

SetEvent:通过 epoll_ctl 接口来添加、修改或删除指定文件描述符(fd)所关注的事件类型,如 EPOLLIN 或 EPOLLOUT。

RemoveConn:负责销毁与连接相关的资源,包括关闭文件描述符和释放内存(close 和 delete)。

// 初始化连接信息和方法,注册到epoll关心列表中,并放入连接信息哈希表中
    void AddConnList(int sockfd, uint32_t event, func_t reader, func_t sender, func_t execpter)
    {
        // ET模式下,将fd设置为非阻塞
        int n = SetNonBlock(sockfd);
        if (n < 0)
        {
            printf("SetNonBlock 失败!\n");
            exit(-1);
        }

        // 1.构建连接信息,并注册方法
        connItem *conn = new connItem(sockfd, this);
        conn->Register(reader, sender, execpter);

        // 2.让epoll关心该事件
        SetEvent(sockfd, event, EVENT_ADD);
        // 3.放入连接信息哈希表中
        _connlist.insert(std::make_pair(sockfd, conn));
    }

    void SetEvent(int sockfd, uint32_t event, int flag)
    {
        struct epoll_event ev;
        ev.data.fd = sockfd;
        ev.events = event;

        if (flag == EVENT_ADD) // 新增
            epoll_ctl(_epfd, EPOLL_CTL_ADD, sockfd, &ev);
        else if (flag == EVENT_MOD) // 修改
            epoll_ctl(_epfd, EPOLL_CTL_MOD, sockfd, &ev);
        else if (flag == EVENT_DEL) // 删除
            epoll_ctl(_epfd, EPOLL_CTL_DEL, sockfd, 0);
    }
    void RemoveConn(int sockfd)
    {
        auto it = _connlist.find(sockfd);
        if (it != _connlist.end())
        {
            // 从epoll中删除
            SetEvent(sockfd, 0, EVENT_DEL);
            // 关闭socket
            close(sockfd);
            // 释放内存
            delete it->second;
            // 从哈希表删除
            _connlist.erase(it);
            // printf("连接关闭: fd=%d, 剩余连接数: %zu\n", sockfd, _connlist.size());
        }
    }

serverEntry.cc

该文件是整个服务器程序的入口点。如果未来希望切换至其他网络编程模型(例如协程或异步 I/O 框架),只需将项目的主入口替换为目标框架所提供的启动方式即可,无需大规模重构业务逻辑。

3.2 数据解析协议模块

a. Task.h / Task.cc

#pragma once


// 这里使用自定制协议
//  处理请求,发送响应
class connItem;
void kvstoreTask(connItem *conn);

#include "Task.h"
#include "Protocol.h"
#include "../reactorTcpServer/connItem.hpp" //解析模块与网络模块的交互
#include "../storage/commandExecutor.h"       //解析模块与存储模块的交互
// 这里使用自定制协议
//  处理请求,发送响应

void kvstoreTask(connItem *conn)
{
    printf("recv :%s\n", conn->_inbuffer.c_str());

    // 接收并解析数据,构建请求完整报文。对于非法数据,啥处理都不做
    std::string onePackage;
    while (ParseOnePackage(conn->_inbuffer, onePackage))
    {
        // TCP数据会有粘包,所以要解析出一个报文进行处理

        // 序列化建请求报文
        kvstoreRequest req;
        std::string ans;
        if (!req.deserialize(onePackage))
        {
            // 数据错误,解析失败
            ans = "ERROR please input:\nSET KEY VALUE\nGET KEY\nDEL KEY\nMOD KEY VALUE\nSIZE\n";
            // 发送错误应答,解析下一个报文
            conn->_outbuffer = ans;
            conn->_sender(conn);
            continue;
        }

        // 可以成功处理请求,解析层保证传入的数据是对的
        ans = globalExecutor.execute(req);
        // 根据请求,构建响应报文
        // printf("ans : %s\n", ans);

        // 发送响应
        conn->_outbuffer = ans;
        conn->_sender(conn);
    }
}

此部分是网络层、解析层与存储层之间交互的核心区域。网络层通过 kvstoreTask 提交接收到的数据包,解析层完成报文解析后,将封装好的请求提交给存储层的 globalExecutor 执行具体操作,最终将响应结果返回至网络层,由其发送回客户端。

b. protocol.h / protocol.cc

.h 文件内容

#pragma once
#include <string>

// 解析一份报文
bool ParseOnePackage(std::string &inbuffer, std::string &rcv_text);
// 判断报文是否合法
bool isValidCommand(const std::string &cmd);

// kvstroe请求报文
class kvstoreRequest
{
public:
    // 序列化请求报文
    void serialize();

    // 反序列化请求报文,将一个报文数据反序string列化
    bool deserialize(const std::string &onePacPage);

    // 判断字符串是否非法,最好在创建时候验证,减少检测消耗
    bool isValid();

public:
    const std::string &getop() const;
    const std::string &getkey() const;
    const std::string &getvalue() const;

private:
    std::string op;
    std::string key;
    std::string value;
};

// kvstroe响应报文
class kvstroeResponse
{
public:
    // 序列化请求报文
    void serialize();

    // 反序列化请求报文
    void deserialize();

private:
};

主要定义了请求与响应的结构体,包含序列化与反序列化的接口。同时提供了 ParseOnePackage 函数,用于从字节流中提取一个完整的数据包。由于本项目返回值较为简单(如 OK、ERROR、NO EXIST 等字符串),因此未对响应报文做复杂设计。

protocol.cc 实现细节

#include "Protocol.h"
#include <vector>
#include <iostream>
#include <sstream>

// 解析报文
// 将收到的数据inbuffer中解析为一个个报文
// client -> server
// set key value
// get key
// del key
// mody key value
bool ParseOnePackage(std::string &inbuffer, std::string &rcv_text)
{
#if 1
    // 接收数据为空,直接返回
    if (inbuffer.empty())
        return false;

    // 清空上一次的报文
    rcv_text.clear();

    // 开始解析报文,这里先直接简单处理
    rcv_text = inbuffer;
    inbuffer.clear();
    return true;
#else
    // 找到一个报文的\r\n
    auto pos = inbuffer.find("\r\n");
    if (pos == std::string::npos)
        return false;

    // 清空之前报文,获取一份新报文
    rcv_text.clear();
    rcv_text = inbuffer.substr(0, pos);

    // 删除缓冲区取出的数据
    inbuffer.erase(0, pos + 2);

    // 检测报文是否合法
    return isValidCommand(rcv_text);
#endif
}

// 判断接收的报文是否合法
bool isValidCommand(const std::string &cmd)
{
    // 检测报文是否为空
    if (cmd.empty())
        return false;
    return cmd.find("SET ") == 0 ||
           cmd.find("GET ") == 0 ||
           cmd.find("DEL ") == 0 ||
           cmd.find("MOD ") == 0 ||
           cmd.find("SIZE") == 0;
}

//--------------------------------------------Request--------------------------------------------
//--------------------------------------------Request--------------------------------------------

// 序列化请求报文
void kvstoreRequest::serialize() {}

// 反序列化请求报文,将一个报文数据反序string列化
bool kvstoreRequest::deserialize(const std::string &onePacPage)
{
    std::vector<std::string> tokens;
    std::string token;

    // set key value
    std::stringstream ss(onePacPage);
    while (ss >> token)
        tokens.emplace_back(token);

    if (tokens.size() > 3)
        return false;

    op = tokens[0];
    key = tokens.size() > 1 ? tokens[1] : "";
    value = tokens.size() > 2 ? tokens[2] : "";

    // std::cout << "序列化数据为:" << op << " " << key << " " << value << std::endl;
    return isValid();
}

// 判断字符串是否非法,最好在创建时候验证,减少检测消耗
bool kvstoreRequest::isValid()
{
    if (op == "SIZE" && key.empty() && value.empty())
        return true;

    if (key.empty())
        return false;

    if (op == "SET" || op == "MOD")
        return !value.empty();
    else if (op == "GET" || op == "DEL")
        return value.empty(); // 防止 命令 GET key value 和 DEL key value

    return false;
}

const std::string &kvstoreRequest::getop() const { return op; }
const std::string &kvstoreRequest::getkey() const { return key; }
const std::string &kvstoreRequest::getvalue() const { return value; }

//--------------------------------------------Request--------------------------------------------
//--------------------------------------------Request--------------------------------------------

//--------------------------------------------Response--------------------------------------------
//--------------------------------------------Response--------------------------------------------

// 序列化响应报文
void kvstroeResponse::serialize()
{
}

// 反序列化响应报文
void kvstroeResponse::deserialize()
{
}

//--------------------------------------------Response--------------------------------------------
//--------------------------------------------Response--------------------------------------------

重点关注 ParseOnePackage 函数,其实现支持两种报文分割方式:一种为不作特殊处理,另一种使用分隔符 \r\n 来界定报文边界。采用分隔符可有效缓解 TCP 粘包问题,尽管更优方案是在每个报文前附加长度字段加标识符的方式。

请求的反序列化过程旨在从原始数据中提取出操作类型(op)、键(key)和值(value),然后提交至存储层执行。对于格式错误的请求,函数应返回 false,并由 kvstoreTask 进行相应处理。

3.3 数据存储模块

a. commandExecutor.h / commandExecutor.cc

该模块负责接收来自解析层的请求 req,并调度执行相应的命令,如 SET、DEL、GET、MOD、SIZE 等。

#pragma once
#include "../protocol/Protocol.h"
#include "kvStorages.h"
#include <unordered_map>
#include <functional>
#include <memory>

class commandExecutor
{
public:
    // 构造函数,用于注册方法
    commandExecutor();

    // 设置存储引擎
    void setStorage(std::unique_ptr<kvStorags> kvStoragePtr);

    // 执行函数,根据传入的数据执行方法表相应的方法
    std::string execute(const kvstoreRequest &req);

private:
    // 注册方法表
    void registerCommands();

private:
    std::unique_ptr<kvStorags> _kvStoragePtr;
    std::unordered_map<std::string, std::function<std::string(const kvstoreRequest &)>> _cmds;
};

// 声明全局的存储引擎和执行器
extern commandExecutor globalExecutor;

关键成员说明:

  • _kvStoragePtr:智能指针,指向存储基类,利用多态机制支持多种存储引擎的统一调用。
  • _cmds:哈希表结构,记录命令名称与其对应回调函数的映射关系,便于快速查找与执行。
  • setStorage:用于设置具体的存储引擎实例,通过基类指针实现运行时绑定,体现多态特性。
  • registerCommands:对各存储引擎中的基础操作进行封装注册,提供一致的外部访问接口,有利于扩展新引擎。
  • execute:接收解析后的请求对象,依据其中的操作码调用相应函数完成处理。

#include "commandExecutor.h"

// 定义全局存储引擎
commandExecutor globalExecutor;
// 构造函数,用于注册方法,默认使用哈希
commandExecutor::commandExecutor()
    : _kvStoragePtr(new HashStorage())
{
    registerCommands();
}

// 设置存储引擎
void commandExecutor::setStorage(std::unique_ptr<kvStorags> kvStoragePtr)
{
    // 注意 unique_ptr是独占智能指针,转移管理权必须使用 std::move
    _kvStoragePtr = std::move(kvStoragePtr);
}

void commandExecutor::registerCommands()
{
    // 注册方法列表
    _cmds["SET"] = [this](const kvstoreRequest &req)
    { return _kvStoragePtr->SET(req.getkey(), req.getvalue()) ? "OK" : "SET FAILED"; };

    _cmds["GET"] = [this](const kvstoreRequest &req)
    {
        const std::string &value = _kvStoragePtr->GET(req.getkey());
        return value.empty() ? "NO EXIST" : value;
    };

    _cmds["DEL"] = [this](const kvstoreRequest &req)
    { return _kvStoragePtr->DEL(req.getkey()) ? "OK" : "NO EXIST"; };

    _cmds["MOD"] = [this](const kvstoreRequest &req)
    { return _kvStoragePtr->MOD(req.getkey(), req.getvalue()) ? "OK" : "NO EXIST"; };

    _cmds["SIZE"] = [this](const kvstoreRequest &req)
    { return std::to_string(_kvStoragePtr->SIZE()); };
}

// 执行函数,根据传入的数据执行相应的方法
std::string commandExecutor::execute(const kvstoreRequest &req)
{
    auto it = _cmds.find(req.getop());

    // 解析模块保证传输数据是有效的
    //  if (it == _cmds.end())
    //      return "cmd error please input SET GET MOD DEL";

    return it->second(req);
}

b. storage.h / storage.cc

这是实际的数据存储实现部分,采用策略模式进行设计。定义了一个抽象基类 storage,所有具体存储类型(如 array、hash、rbtree、lrucache)均继承自该基类,并重写虚函数以实现各自特有的数据操作逻辑。

#pragma once
#include <vector>
#include <list>
#include <map>
#include <unordered_map>

// 根据用户需求,选择不同的存储数据结构。采用策略模式
// 方案有 哈希 红黑树 数组 跳表 LRUCache
class kvStorags
{
public:
    // 策略模式,保证基类析构函数是虚函数
    virtual ~kvStorags() = default;

    // 四种方法的操作定义
    virtual bool SET(const std::string &key, const std::string &value) = 0;

    virtual std::string GET(const std::string &key) = 0;

    virtual bool DEL(const std::string &key) = 0;

    virtual bool MOD(const std::string &key, const std::string &value) = 0;

    virtual size_t SIZE() const = 0;
};

// 用于存储数据的类,还需要考虑线程安全的问题
class RBTreeStorage : public kvStorags
{
public:
    // 四种方法的操作定义
    virtual bool SET(const std::string &key, const std::string &value) override;

    virtual std::string GET(const std::string &key) override;

    virtual bool DEL(const std::string &key) override;

    virtual bool MOD(const std::string &key, const std::string &value) override;

    virtual size_t SIZE() const;

private:
    std::map<std::string, std::string> _storage{}; // 红黑树存储
};

// 用于存储数据的类,还需要考虑线程安全的问题
class HashStorage : public kvStorags
{
public:
    // 四种方法的操作定义
    virtual bool SET(const std::string &key, const std::string &value) override;

    virtual std::string GET(const std::string &key) override;

    virtual bool DEL(const std::string &key) override;

    virtual bool MOD(const std::string &key, const std::string &value) override;

    virtual size_t SIZE() const;
private:
    std::unordered_map<std::string, std::string> _storage{10000}; // 哈希存储,预分配空间减少哈希冲突;
};

// 用于存储数据的类,还需要考虑线程安全的问题
class ArrayStorage : public kvStorags
{
public:
    // 四种方法的操作定义
    virtual bool SET(const std::string &key, const std::string &value) override;

    virtual std::string GET(const std::string &key) override;

    virtual bool DEL(const std::string &key) override;

    virtual bool MOD(const std::string &key, const std::string &value) override;

    virtual size_t SIZE() const;
private:
    std::vector<std::pair<std::string, std::string>> _storage{10000};
};

// 用于存储数据的类,还需要考虑线程安全的问题
class LRUCacheStorage : public kvStorags
{
public:
    LRUCacheStorage() : _capacity(10000) {}
    // 四种方法的操作定义
    virtual bool SET(const std::string &key, const std::string &value) override;

    virtual std::string GET(const std::string &key) override;

    virtual bool DEL(const std::string &key) override;

    virtual bool MOD(const std::string &key, const std::string &value) override;

    virtual size_t SIZE() const;
private:
    // 需要一个迭代器
    using iter = std::list<std::pair<std::string, std::string>>::iterator;

    int _capacity;
    std::list<std::pair<std::string, std::string>> _LRUList;
    std::unordered_map<std::string, iter> _hashmap;
};

storage.cc 文件中包含了基本的增删改查操作实现,逻辑清晰,具体细节可参考代码仓库。

3.4 kvstore.cc

包含主函数 main,主要职责是配置默认存储引擎并启动服务器实例。

#include "storage/commandExecutor.h"
#include <cstring>
int serverEntry();

// 初始化存储引擎
void initEgineKvstore(const std::string &storage)
{
    if (storage == "array")
        globalExecutor.setStorage(std::make_unique<ArrayStorage>());
    else if (storage == "rbtree")
        globalExecutor.setStorage(std::make_unique<RBTreeStorage>());
    else if (storage == "lru")
        globalExecutor.setStorage(std::make_unique<LRUCacheStorage>());
    else
    {
        printf("未选择或者错误选择存储引擎, 默认使用hash\n");
    }
}

int main(int argc, char *argv[])
{
    std::string storageType = "hash";
    // 初始化存储引擎
    for (int i = 1; i < argc; ++i)
    {
        if ((strcmp(argv[i], "--storage") == 0) && i + 1 < argc)
            storageType = argv[++i];
        else if (strcmp(argv[i], "--help") == 0)
        {
            printf("\r\n%s [--storage hash|rbtree|array|lru] [--help]\r\n\r\n", argv[0]);
            printf("Default storage engine: hash\r\n\r\n");
            return 0;
        }
        else
        {
            printf("\r\nUnknown option\r\n\r\n");
            printf("KVStore Usage:\r\n%s [--storage hash|rbtree|array|lru] [--help]\r\n\r\n", argv[0]);
            return -1;
        }
    }
    printf("初始化存储引擎\n");
    initEgineKvstore(storageType);

    // 启动服务器,在这里可以选择不同的网络框架。
    // 如果想要使用协程网络框架,直接调用Task.h中的kvstoreTask,然后执行相应的交互即可
    printf("初始化服务器\n");
    serverEntry();
}

四、项目测试与性能分析

测试环境为一台配置为 2核2G 内存的云服务器,操作系统为 CentOS 8。

项目构建完成后,需进行以下几项测试验证:

  1. 完整功能流程测试,确保各项指令行为正确:
    • SET name yzc → 返回 OK
    • SET key value → 返回 OK
    • SIZE → 返回 2
    • GET name → 返回 yzc
    • MOD name czy → 返回 OK
    • GET name → 返回 czy
    • DEL name → 返回 OK
    • GET name → 返回 NO EXIST
  2. 测量系统的平均 QPS(Queries Per Second)性能指标。
  3. 评估系统所能支持的最大并发连接数。

4.1 功能流程测试

首先进行基础功能验证:

结果显示,整个操作流程执行正常,各命令返回符合预期,功能完整可用。

4.2 QPS 性能测试

使用仓库中 Prestandatest 目录下的 qpsTest.cc 工具进行压测。建议选择 Hash 存储引擎,多次运行取平均值以提高准确性。

测试参数设定为:并发连接数 × 每个连接发送的请求数量(混合发送 SET 与 GET 请求)。

基础功能测试: 全部通过
=== 性能测试 ===
并发数: 100
每线程请求数: 5000 (SET+GET)
总请求数: 1000000
测试结果:
  总耗时: 29423 ms
  总请求: 1000000
  成功请求: 1000000
  失败请求: 0
  成功率: 100%
  QPS: 33987


基础功能测试: 全部通过
=== 性能测试 ===
并发数: 100
每线程请求数: 5000 (SET+GET)
总请求数: 1000000
测试结果:
  总耗时: 28154 ms
  总请求: 1000000
  成功请求: 1000000
  失败请求: 0
  成功率: 100%
  QPS: 35518.9


基础功能测试: 全部通过
=== 性能测试 ===
并发数: 200
每线程请求数: 2500 (SET+GET)
总请求数: 1000000
测试结果:
  总耗时: 29177 ms
  总请求: 1000000
  成功请求: 1000000
  失败请求: 0
  成功率: 100%
  QPS: 34273.6

=== 性能测试 ===
并发数: 50
每线程请求数: 10000 (SET+GET)
总请求数: 1000000
测试结果:
  总耗时: 27165 ms
  总请求: 1000000
  成功请求: 1000000
  失败请求: 0
  成功率: 100%
  QPS: 36812.1

=== 性能测试 ===
并发数: 100
每线程请求数: 50000 (SET+GET)
总请求数: 10000000
测试结果:
  总耗时: 423004 ms
  总请求: 10000000
  成功请求: 10000000
  失败请求: 0
  成功率: 100%
  QPS: 23640.4

从测试结果可以看出,当总请求量达到100万时,系统的QPS均值约为35000;而当请求总量上升至1000万时,QPS下降至20000以上。推测性能下降的主要原因为数据规模增大导致哈希冲突频率上升,进而使得每次插入与查询操作的开销增加。

测试汇总表

测试编号 并发数 每线程请求数 总请求数 总耗时(ms) QPS 成功率 备注
测试1 100 5,000 1,000,000 29,423 33,987 100% 基准测试
测试2 100 5,000 1,000,000 28,154 35,518 100% 基准测试
测试3 200 2,500 1,000,000 29,177 34,274 100% 基准测试
测试4 50 10,000 1,000,000 27,165 36,812 100% 最佳性能
测试5 100 50,000 10,000,000 423,004 23,640 100% 压力测试

4.3 最大并发连接测试

使用项目仓库中的 stressConnectionTest 工具进行测试。此前已多次执行相关实验,当前测试结果刚好接近服务器所能承受的最大负载状态。

启动连接压力测试...
存储引擎模式: Hash

=== 真实连接压力测试 ===
目标最大连接数: 30000
测试持续时间: 100 秒
服务器: 127.0.0.1:8080
测试模式: 建立连接 + 持续数据交互 (70% GET, 30% SET)
工作线程数: 50
每线程连接数: 600
[0s] 连接数: 0 | 成功: 0 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[2s] 连接数: 9821 | 成功: 9821 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[4s] 连接数: 14533 | 成功: 14533 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[6s] 连接数: 15257 | 成功: 15257 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[8s] 连接数: 15915 | 成功: 15915 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[10s] 连接数: 16599 | 成功: 16599 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[12s] 连接数: 17253 | 成功: 17253 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[14s] 连接数: 17899 | 成功: 17899 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[16s] 连接数: 18513 | 成功: 18513 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[18s] 连接数: 19130 | 成功: 19130 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[20s] 连接数: 19734 | 成功: 19734 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[22s] 连接数: 20313 | 成功: 20313 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[24s] 连接数: 20870 | 成功: 20870 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[26s] 连接数: 21446 | 成功: 21446 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[28s] 连接数: 22007 | 成功: 22007 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[30s] 连接数: 22521 | 成功: 22521 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[32s] 连接数: 23032 | 成功: 23032 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[34s] 连接数: 23542 | 成功: 23542 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[36s] 连接数: 24044 | 成功: 24044 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[38s] 连接数: 24538 | 成功: 24538 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[40s] 连接数: 25046 | 成功: 25046 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[42s] 连接数: 25519 | 成功: 25519 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[44s] 连接数: 26009 | 成功: 26009 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[46s] 连接数: 26509 | 成功: 26509 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[48s] 连接数: 26963 | 成功: 26963 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[50s] 连接数: 27417 | 成功: 27417 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[52s] 连接数: 27862 | 成功: 27862 | 失败: 0 | 请求: 0 | 请求成功率: 0%
[54s] 连接数: 28232 | 成功: 28232 | 失败: 7 | 请求: 0 | 请求成功率: 0%
[56s] 连接数: 28232 | 成功: 28232 | 失败: 283 | 请求: 0 | 请求成功率: 0%
[58s] 连接数: 28232 | 成功: 28232 | 失败: 558 | 请求: 0 | 请求成功率: 0%
[60s] 连接数: 28232 | 成功: 28232 | 失败: 833 | 请求: 0 | 请求成功率: 0%
[62s] 连接数: 28232 | 成功: 28232 | 失败: 1102 | 请求: 0 | 请求成功率: 0%
[64s] 连接数: 28232 | 成功: 28232 | 失败: 1390 | 请求: 0 | 请求成功率: 0%
[66s] 连接数: 28232 | 成功: 28232 | 失败: 1666 | 请求: 9 | 请求成功率: 100%
[68s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 799 | 请求成功率: 100%
[70s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 1792 | 请求成功率: 100%
[72s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 2792 | 请求成功率: 100%
[74s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 3792 | 请求成功率: 100%
[76s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 4792 | 请求成功率: 100%
[78s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 5792 | 请求成功率: 100%
[80s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 6778 | 请求成功率: 100%
[82s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 7778 | 请求成功率: 100%
[84s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 8778 | 请求成功率: 100%
[86s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 9771 | 请求成功率: 100%
[88s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 10771 | 请求成功率: 100%
[90s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 11771 | 请求成功率: 100%
[92s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 12771 | 请求成功率: 100%
[94s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 13771 | 请求成功率: 100%
[96s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 14767 | 请求成功率: 100%
[98s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 15767 | 请求成功率: 100%
[100s] 连接数: 28232 | 成功: 28232 | 失败: 1768 | 请求: 16752 | 请求成功率: 100%

=== 压力测试结果 ===
总耗时: 102293 ms
最大并发连接数: 28232
失败连接数: 1768
连接成功率: 94.1067%
总请求数: 17752
请求成功率: 100%
平均连接建立速度: 275.992 连接/秒
平均请求QPS: 173.541 请求/秒

=== 结果分析 ===
???? 服务器表现良好 - 接近承载极限

在单个端口的情况下,系统能维持的最大并发连接数为28232。该限制可能源于操作系统层面的约束。每个TCP连接由一个四元组(源IP、源端口、目的IP、目的端口)唯一标识。由于客户端与服务端程序运行在同一台机器上,可用端口数量受到限制,从而影响了最大连接数。

所有成功建立的连接在稳定性方面表现良好。若需进一步提升最大连接数,可考虑以下几种优化方式:

  1. 增加服务器监听的端口数量。当前测试中仅使用了1个端口,实际部署时服务器已配置10个监听端口。
  2. 调整内核对文件句柄数的限制,通过命令 ulimit -n 提高进程可打开的文件描述符上限。
  3. 采用多个具有不同IP地址的客户端发起连接,以突破本地端口复用的瓶颈。

五、项目难点与解决方案

  • 网络模块如何高效管理连接并完成数据收发及协议交互?
    通过设计自定义连接对象 connItem,并利用哈希表维护 fd 与 conn* 的映射关系,实现了连接的快速查找与管理,有效支撑了网络层的数据通信和协议模块之间的协作。
  • 数据解析模块如何实现高效解析?如何应对TCP粘包问题?
    通过定义 Request 和 Response 结构体,实现数据的序列化与反序列化,提升了数据传递的可靠性。同时引入分隔符机制,准确切分报文边界,有效解决了TCP粘包问题。
  • 存储模块如何整合多种存储引擎?
    采用策略模式,定义统一基类,各具体存储引擎继承该基类。在命令执行器(cmd)中通过依赖注入和函数注册机制动态调用对应引擎的方法,实现了多引擎的灵活适配与扩展。

六、收获与反思

技术层面的提升
  • 网络编程:深入掌握了 epoll 的工作原理以及 Reactor 模式的设计思想。
  • 系统设计:逐步建立起从单机架构向分布式系统演进的整体思维框架。
  • C++ 进阶:熟练运用现代 C++ 特性,如 RAII 原则、智能指针等,增强了代码的安全性与可维护性。
  • 调试能力:积累了复杂系统中问题定位的经验,提升了日志分析与性能排查的能力。
工程实践中的体会
  • 模块化设计对于系统解耦和功能扩展至关重要。
  • 良好的文档规范与编码风格显著提高了团队协作效率和后期维护便利性。
存在的不足
  • 尚未实现数据持久化机制,存在断电丢数风险。
  • 监控指标体系不够全面,缺乏细粒度的性能追踪能力。
  • 配置管理灵活性有待提升,部分参数仍硬编码于代码中。
  • 网络模块仍有优化空间,例如零拷贝、连接池等技术可进一步引入。
  • 解析模块对请求与响应报文的处理逻辑尚不完善,边界情况覆盖不足。

本项目让我深刻体会到高并发系统设计的复杂性。通过亲手实现各个核心模块,我在网络编程、内存管理、并发控制等方面获得了扎实的实践经验。这不仅是一次技术能力的全面提升,更是一场工程思维方式的深度锤炼。

二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

关键词:服务器 高性能 Protocol continue Register

您需要登录后才可以回帖 登录 | 我要注册

本版微信群
jg-xs1
拉您进交流群
GMT+8, 2025-12-28 21:29