C++集群聊天服务器 muduo+nginx+redis+mysql数据库连接池 笔记 (下)

这篇具有很好参考价值的文章主要介绍了C++集群聊天服务器 muduo+nginx+redis+mysql数据库连接池 笔记 (下)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

C++集群聊天服务器 网络模块+业务模块+CMake构建项目 笔记 (上)-CSDN博客https://blog.csdn.net/weixin_41987016/article/details/135991635?spm=1001.2014.3001.5501C++集群聊天服务器 数据模块+业务模块+CMake构建项目 笔记 (上)-CSDN博客https://blog.csdn.net/weixin_41987016/article/details/136007616?spm=1001.2014.3001.5501C++集群聊天服务器 nginx+redis安装 笔记 (中)-CSDN博客https://blog.csdn.net/weixin_41987016/article/details/136119985?spm=1001.2014.3001.5501基于C++11的数据库连接池【C++/数据库/多线程/MySQL】_c++ 数据库 句柄 连接池管理-CSDN博客https://blog.csdn.net/weixin_41987016/article/details/135719057?spm=1001.2014.3001.5501MysqlConn.h

#pragma once
#include <mysql/mysql.h>
#include <string>
#include <chrono>
using namespace std;
using namespace std::chrono;
class MysqlConn {
public:
    // 初始化数据库连接
    MysqlConn();
    // 释放数据库连接
    ~MysqlConn();
    // 连接数据库
    bool connect(string user, string passwd, string dbName, string ip, unsigned short port = 3306);
    // 更新数据库: select,update,delete
    bool update(string sql);
    // 查询数据库
    MYSQL_RES* query(string sql);
    // 遍历查询得到的结果集
    bool next();
    // 得到结果集中的字段值
    string value(int index);
    // 事务操作
    bool transaction();
    // 提交事务
    bool commit();
    // 事务回滚
    bool rollback();
    // 刷新起始的空闲时间点
    void refreshAliveTime();
    // 计算连接存活的总时长
    long long getAliveTime();
    // 获取连接
    MYSQL* getConnection();
private:
    void freeResult();
    MYSQL* m_conn = nullptr; // 数据库连接
    MYSQL_RES* m_result = nullptr;
    MYSQL_ROW m_row = nullptr;
    steady_clock::time_point m_aliveTime;
};

ConnPool.h

#pragma once
#include <queue>
#include <mutex>
#include <condition_variable>//条件变量
#include "MysqlConn.h"
using namespace std;
class ConnPool {
public:
    static ConnPool* getConnPool();// 获得单例对象
    ConnPool(const ConnPool& obj) = delete; // 删除拷贝构造函数
    ConnPool& operator=(const ConnPool& obj) = delete; // 删除拷贝赋值运算符重载函数
    shared_ptr<MysqlConn> getConn(); // 从连接池中取出一个连接
    ~ConnPool(); // 析构函数
private:
    ConnPool(); // 构造函数私有化
    bool parseJsonFile(); // 解析json格式文件
    void produceConn(); // 生产数据库连接
    void recycleConn(); // 销毁数据库连接
    void addConn(); // 添加数据库连接
 
    // 连接服务器所需信息
    string m_ip;            // 数据库服务器ip地址
    string m_user;          // 数据库服务器用户名
    string m_dbName;        // 数据库服务器的数据库名
    string m_passwd;        // 数据库服务器密码
    unsigned short m_port;  // 数据库服务器绑定的端口
 
    // 连接池信息
    queue<MysqlConn*> m_connQ;
    unsigned int m_maxSize; // 连接数上限值
    unsigned int m_minSize; // 连接数下限值
    int m_timeout; // 连接超时时长
    int m_maxIdleTime; // 最大的空闲时长
    mutex m_mutexQ; // 独占互斥锁
    condition_variable m_cond; // 条件变量
};

MysqlConn.cpp

#include "MysqlConn.h"
#include <muduo/base/Logging.h> 
// 初始化数据库连接
MysqlConn::MysqlConn() {
    m_conn = mysql_init(nullptr);
    mysql_set_character_set(m_conn, "GBK"); // 设置字符集
}
 
// 释放数据库连接
MysqlConn::~MysqlConn() {
    if (m_conn != nullptr) {
        mysql_close(m_conn);
    }
    freeResult();
}
 
// 连接数据库
bool MysqlConn::connect(string user, string passwd, string dbName, string ip, unsigned short port) {
    MYSQL* ptr = mysql_real_connect(m_conn, ip.c_str(), user.c_str(), passwd.c_str(), dbName.c_str(), port, nullptr, 0);
    return ptr != nullptr;
}
 
// 更新数据库:insert,update,delete
bool MysqlConn::update(string sql) {
    if (mysql_query(m_conn, sql.c_str())) {
        return false;
    }
    return true;
}
 
// 查询数据库
MYSQL_RES* MysqlConn::query(string sql) {
    if(mysql_query(m_conn, sql.c_str())) {
        LOG_INFO << __FILE__ << ":" << __LINE__ << ":"
            << sql <<"查询失败!";   
        return nullptr;
    }
    return mysql_use_result(m_conn);
}
 
// 遍历查询得到的结果集
bool MysqlConn::next() {
    if (m_result != nullptr) {
        m_row = mysql_fetch_row(m_result);
        if (m_row != nullptr) {
            return true;
        }
    }
    return false;
}
 
// 得到结果集中的字段值
string MysqlConn::value(int index) {
    int rowCount = mysql_num_fields(m_result);
    if (index >= rowCount || index < 0) {
        return string();
    }
    char* val = m_row[index];
    unsigned long length = mysql_fetch_lengths(m_result)[index];
    return string(val, length);
}
 
// 事务操作
bool MysqlConn::transaction() {
    return mysql_autocommit(m_conn, false);
}
 
// 提交事务
bool MysqlConn::commit() {
    return mysql_commit(m_conn);
}
 
// 事务回滚
bool MysqlConn::rollback() {
    return mysql_rollback(m_conn);
}
 
// 刷新起始的空闲时间点
void MysqlConn::refreshAliveTime() {
    // 这个时间戳就是某个数据库连接,它起始存活的时间点
    // 这个时间点通过时间类就可以得到了
    m_aliveTime = steady_clock::now();
}
 
// 计算连接存活的总时长
long long MysqlConn::getAliveTime() {
    nanoseconds duration = steady_clock::now() - m_aliveTime;
    milliseconds millsec = duration_cast<milliseconds>(duration);
    return millsec.count();
}

// 获取连接
MYSQL *MysqlConn::getConnection() {
    return m_conn;
}

void MysqlConn::freeResult() {
    if (m_result != nullptr) {
        mysql_free_result(m_result);
        m_result = nullptr;
    }
}

ConnPool.cpp

#include "ConnPool.h"
// #include <json/json.h>
// #include <json.h>
#include "json.hpp"
#include <fstream>
#include <thread>
#include <iostream>
// using namespace Json;
using json = nlohmann::json;
ConnPool* ConnPool::getConnPool() {
    static ConnPool pool;
    return &pool;
}
 
// 从连接池中取出一个连接
shared_ptr<MysqlConn> ConnPool::getConn() {
    unique_lock<mutex> locker(m_mutexQ);
    while (m_connQ.empty()) {
        if (cv_status::timeout == m_cond.wait_for(locker, chrono::milliseconds(m_timeout))) {
            if (m_connQ.empty()) {
                //return nullptr;
                continue;
            }
        }
    }
    shared_ptr<MysqlConn>connptr(m_connQ.front(), [this](MysqlConn* conn) {
        lock_guard<mutex>locker(m_mutexQ); // 自动管理加锁和解锁
        conn->refreshAliveTime();// 更新连接的起始的空闲时间点
        m_connQ.push(conn); // 回收数据库连接,此时它再次处于空闲状态
        });// 智能指针
    m_connQ.pop();
    m_cond.notify_one(); // 本意是唤醒生产者
    return connptr;
}
 
ConnPool::~ConnPool() {
    while (!m_connQ.empty()) {
        MysqlConn* conn = m_connQ.front();
        m_connQ.pop();
        delete conn;
    }
}
 
ConnPool::ConnPool() {
    // 加载配置文件
    if (!parseJsonFile()) {
        std::cout << "加载配置文件失败!!!" << std::endl;
        return;
    }
    for (int i = 0; i < m_minSize; ++i) {
        addConn();
    }
    thread producer(&ConnPool::produceConn, this);// 生产连接
    thread recycler(&ConnPool::recycleConn, this);// 销毁连接
    producer.detach();
    recycler.detach();
}
 
bool ConnPool::parseJsonFile() {
    ifstream ifs;
    ifs.open("/home/heheda/Linux/Chat/configuration/dbconf.json");
    if (!ifs.is_open()) {
        std::cout << "无法打开 dbconf.json 配置文件!";
        return false;
    }
    std::cout << "开始解析 dbconf.json 配置文件..." << std::endl;
    json data; // 创建一个空的JSON对象
    ifs>>data; // 将文件内容加载到JSON对象中
    m_ip = data["ip"];
    m_port = data["port"];
    m_user = data["userName"];
    m_passwd = data["password"];
    m_dbName = data["dbName"];
    m_minSize = data["minSize"];
    m_maxSize = data["maxSize"];
    m_maxIdleTime = data["maxIdleTime"];
    m_timeout = data["timeout"];
    /*
    ifstream ifs("dbconf.json");
    Reader rd;
    Value root;
    rd.parse(ifs, root);
    if (root.isObject()) {
        std::cout << "开始解析配置文件..." << std::endl;
        m_ip = root["ip"].asString();
        m_port = root["port"].asInt();
        m_user = root["userName"].asString();
        m_passwd = root["password"].asString();
        m_dbName = root["dbName"].asString();
        m_minSize = root["minSize"].asInt();
        m_maxSize = root["maxSize"].asInt();
        m_maxIdleTime = root["maxIdleTime"].asInt();
        m_timeout = root["timeout"].asInt();
        return true;  // 解析成功返回true,否则返回false。
    }
    return false;
    */
    return true;
}
 
void ConnPool::produceConn() {
    while (true) {  // 生产者线程不断生产连接,直到连接池达到最大值
        unique_lock<mutex> locker(m_mutexQ);  // 加锁,保证线程安全
        while (m_connQ.size() >= m_minSize) {
            m_cond.wait(locker);  // 等待消费者通知
        }
        addConn(); // 生产连接
        m_cond.notify_all();// 通知消费者(唤醒)
    }
}
 
// 回收数据库连接
void ConnPool::recycleConn() {
    while (true) {
        this_thread::sleep_for(chrono::milliseconds(500));// 每隔半秒钟检测一次
        lock_guard<mutex> locker(m_mutexQ);  // 加锁,保证线程安全
        while (m_connQ.size() > m_minSize) {  // 如果连接池中的连接数大于最小连接数,则回收连接
            MysqlConn* conn = m_connQ.front();  // 取出连接池中的连接
            if (conn->getAliveTime() >= m_maxIdleTime) {
                m_connQ.pop();  // 回收连接
                delete conn;  // 释放连接资源
            }
            else {
                break;  // 如果连接的空闲时间小于最大空闲时间,则跳出循环
            }
        }
    }
}
 
// 添加连接到连接池
void ConnPool::addConn() {
    MysqlConn* conn = new MysqlConn;
    conn->connect(m_user, m_passwd, m_dbName, m_ip, m_port);
    conn->refreshAliveTime();// 记录建立连接的时候的对应的时间戳
    m_connQ.push(conn);
}

dbconf.json

{
    "ip": "127.0.0.1",
    "port": 3306,
    "userName": "root",
    "password": "123456",
    "dbName": "chat",
    "minSize":100,  
    "maxSize":1024,
    "maxIdleTime":5000,
    "timeout":1000
}

C++集群聊天服务器 muduo+nginx+redis+mysql数据库连接池 笔记 (下),数据库,服务器,nginx,redis,muduo,数据库连接池

执行sql语句: 

create table user(
    id int not null auto_increment primary key,
    name varchar(50) not null unique,
    password varchar(50) not null,
    state enum('online','offline')  default 'offline'
);
  • user.hpp
#ifndef USER_H
#define USER_H

#include <string>
using namespace std;

// 匹配User表的ORM类
class User {
public:
    User(int id=-1, string name="", string password="", string state="offline") {
        m_id = id;
        m_name = name;
        m_password = password;
        m_state = state;
    }
    // 设置相应字段
    void setId(int id) { m_id = id; }
    void setName(string name) { m_name = name; }
    void setPwd(string pwd) { m_password = pwd; }   
    void setState(string state) { m_state = state; }
    
    // 获取相应字段
    int getId() const { return m_id; }
    string getName() const { return m_name; }
    string getPwd() const { return m_password; }
    string getState() const { return m_state; }
private:
    int m_id;            // 用户id
    string m_name;       // 用户名
    string m_password;   // 用户密码
    string m_state;      // 当前登录状态
};
#endif // USER_H

/*
数据层代码框架设计
数据库操作与业务代码进行分离,业务代码处理的都为对象,数据库层操作
具体SQL语句,因此我们定义相应的类,每一个类对应数据库中一张表,将
数据库读出来的字段提交给业务使用。
*/
  • usermodel.hpp
#ifndef USERMODEL_H
#define USERMODEL_H
#include "user.hpp"
#include "ConnPool.h"
// User表的数据操作类:针对表的增删改查
class UserModel {
public:
    // user表的增加方法
    bool insert(ConnPool* pool,User& user); 
    // 根据用户号码查询用户信息
    User query(ConnPool* pool,int id);
    // 更新用户的状态信息
    bool updateState(ConnPool* pool,User user);
    // 重置用户的状态信息
    void resetState(ConnPool* pool);
};

#endif // USERMODEL_H
  • usermodel.cpp
#include "usermodel.hpp"
#include "MysqlConn.h"
#include <iostream>
#include <memory>
// User表的增加方法
bool UserModel::insert(ConnPool* pool,User &user) {
    // 1.组装sql语句
    char sql[1024] = {0};
    std::sprintf(sql,"insert into user(name,password,state) values('%s','%s', '%s')",
         user.getName().c_str(), user.getPwd().c_str(), user.getState().c_str());
    // 2.执行sql语句,进行处理
    shared_ptr<MysqlConn> conn = pool->getConn();
    if(conn->update(sql)) {
        // 获取插入成功的用户数据生成的主键id
        // id为自增键,设置回去user对象添加新生成的用户id
        user.setId(mysql_insert_id(conn->getConnection()));
        return true;
    }
    return false;
}

// 根据用户号码查询用户信息
User UserModel::query(ConnPool* pool,int id) {
    // 1.组装sql语句
    char sql[1024] = {0};
    sprintf(sql,"select * from user where id = %d", id);
    // 2.执行sql语句
    shared_ptr<MysqlConn> conn = pool->getConn();
    // 查询id对应的数据
    MYSQL_RES* res = conn->query(sql);
    if(res != nullptr) { // 查询成功
        MYSQL_ROW row = mysql_fetch_row(res);// 获取行数据
        if(row != nullptr) {
            User user;
            user.setId(atoi(row[0]));
            user.setName(row[1]);
            user.setPwd(row[2]);
            user.setState(row[3]);
            // 释放res动态开辟的资源
            mysql_free_result(res);
            return user;// 返回user对应的信息
        }
    }
    return User(); // 未找到,返回默认的user对象
}

// 更新用户的状态信息
bool UserModel::updateState(ConnPool* pool,User user) {
    // 1.组装sql语句
    char sql[1024] = {0};
    sprintf(sql,"update user set state = '%s' where id = %d",
         user.getState().c_str(), user.getId());
    // 2.执行sql语句
    shared_ptr<MysqlConn> conn = pool->getConn();
    if(conn->update(sql)) {
        return true;
    }
    return false;
}

// 重置用户的状态信息
void UserModel::resetState(ConnPool* pool) {
    // 1.组装sql语句
    char sql[1024] = "update user set state = 'offline' where state = 'online'";
    // 2.执行sql语句,进行相应处理
    shared_ptr<MysqlConn> conn = pool->getConn();
    conn->update(sql);
}

C++集群聊天服务器 muduo+nginx+redis+mysql数据库连接池 笔记 (下),数据库,服务器,nginx,redis,muduo,数据库连接池

  • 执行sql语句:  
create table friend(
    userid int not null,
    friendid int not null
);
alter table friend 
add constraint pk_friend primary key(userid,friendid);
  • friendmodel.hpp
#ifndef FRIENDMODEL_H
#define FRIENDMODEL_H

#include "user.hpp"
#include "ConnPool.h"
#include <vector>
using namespace std;

// Friend用户表的数据操作类:针对类的增删改查(维护好友信息的操作接口方法)
class FriendModel {
public:
    // 添加好友关系
    void insert(ConnPool* pool,int userid, int friendid);
    // 返回用户好友列表:返回用户好友id,名称,登录状态信息 
    vector<User> query(ConnPool* pool,int userid);
};

#endif // FRIENDMODEL_H
  • friendmodel.cpp
#include "friendmodel.hpp"
// 添加好友关系
void FriendModel::insert(ConnPool* pool,int userid, int friendid) {
    // 1.组装sql语句
    char sql[1024] = {0};
    sprintf(sql, "insert into friend values (%d, %d)", userid, friendid);
    // 2.执行sql语句
    shared_ptr<MysqlConn> conn = pool->getConn();
    conn->update(sql);
}

//返回用户好友列表:返回用户好友id、名称、登录状态信息
vector<User> FriendModel::query(ConnPool* pool,int userid) {
    // 1.组装sql语句
    char sql[1024] = {0};
    // sprintf(sql, "select a.id, a.name, a.state from user a inner join friend b on b.friendid = a.id where b.userid = %d", userid);      
    sprintf(sql, "select a.id, a.name, a.state from user a inner join friend b on b.userid = a.id where b.friendid = %d \
            union (select a.id, a.name, a.state from user a inner join friend b on b.friendid = a.id where b.userid = %d \
            or b.friendid = %d and a.id!=%d)",userid,userid,userid,userid);     
    // 2.发送SQL语句,进行相应处理
    vector<User> vec;
    shared_ptr<MysqlConn> conn = pool->getConn();
    MYSQL_RES * res = conn->query(sql);
    if(res != nullptr) {
        // 把userid用户的所有离线消息放入vec中返回
        MYSQL_ROW row;
        //将userid好友的详细信息返回
        while((row = mysql_fetch_row(res)) != nullptr) {
            User user;
            user.setId(atoi(row[0])); // id
            user.setName(row[1]);     // name
            user.setState(row[2]);    // state
            vec.push_back(user);
        }
        mysql_free_result(res);       // 释放资源
        return vec;
    }
    return vec;
}

// select a.id,a.name,a.state from user a inner join 
// friend b on b.friendid = a.id 
// where b.userid = %d

C++集群聊天服务器 muduo+nginx+redis+mysql数据库连接池 笔记 (下),数据库,服务器,nginx,redis,muduo,数据库连接池

  • 执行sql语句:   
create table offlinemessage(
    userid int not null primary key,
    message varchar(500) not null
);
  • offlinemessage.hpp
#ifndef OFFLINEMESSAGEMODEL_H
#define OFFLINEMESSAGEMODEL_H
#include <string>
#include <vector>
#include "ConnPool.h"
using namespace std;

// 离线消息表的数据操作类:针对表的增删改查(提供离线消息表的操作接口方法)
class OfflineMsgModel {
public:
    // 存储用户的离线消息
    void insert(ConnPool* pool,int userid, string msg);
    // 删除用户的离线消息
    void remove(ConnPool* pool,int userid);
    // 查询用户的离线消息:离线消息可能有多个
    vector<string> query(ConnPool* pool,int userid);
};

#endif // OFFLINEMESSAGEMODEL_H
  • offlinemessage.cpp
#include "offlinemessagemodel.hpp"
// 存储用户的离线消息
void OfflineMsgModel::insert(ConnPool* pool,int userid, string msg) {
    // 1.组装sql语句
    char sql[1024] = {0};
    sprintf(sql, "insert into offlinemessage values(%d, '%s')", userid, msg.c_str());
    // 2.执行sql语句
    shared_ptr<MysqlConn> conn = pool->getConn();
    conn->update(sql);
}

// 删除用户的离线消息
void OfflineMsgModel::remove(ConnPool* pool,int userid) {
    // 1.组装sql语句
    char sql[1024] = {0};
    sprintf(sql, "delete from offlinemessage where userid = %d", userid);
    // 2.执行sql语句
    shared_ptr<MysqlConn> conn = pool->getConn();
    conn->update(sql);
}

// 查询用户的离线消息:离线消息可能有多个
vector<string> OfflineMsgModel::query(ConnPool* pool,int userid) {
    // 1.组装sql语句
    char sql[1024] = {0};
    sprintf(sql, "select message from offlinemessage where userid = %d", userid);
    // 2.执行sql语句
    vector<string> vec;// 存储离线消息,离线消息可能有多条
    shared_ptr<MysqlConn> conn = pool->getConn();
    MYSQL_RES *res = conn->query(sql);
    if(res != nullptr) {
        // 把userid用户的所有离线消息放入vec中返回
        MYSQL_ROW row;
        while((row = mysql_fetch_row(res)) != nullptr) { //循环查找离线消息
            vec.push_back(row[0]);
        }
        mysql_free_result(res);
        return vec;
    }
    return vec;
}

C++集群聊天服务器 muduo+nginx+redis+mysql数据库连接池 笔记 (下),数据库,服务器,nginx,redis,muduo,数据库连接池

  •  执行sql语句:  
create table allgroup(
    id int not null auto_increment primary key,
    groupname varchar(50) not null,
    groupdesc varchar(200) default ''
);
  • group.hpp
#ifndef GROUP_H
#define GROUP_H
#include <vector>
#include <string>
using namespace std;
#include "groupuser.hpp"
// User表的ORM类
// Group群组表的映射类:映射表的相应字段
class Group{
public:
    Group(int id=-1,string name="",string desc="") 
        : m_id(id)
        ,m_name(name)
        ,m_desc(desc) {
        
    }

    void setId(int id) { m_id = id; }
    void setName(string name) { m_name = name; }
    void setDesc(string desc) { m_desc = desc; }
    
    int getId() const { return m_id; }
    string getName() const { return m_name; }
    string getDesc() const { return m_desc; }
    vector<GroupUser> &getUsers()  { return m_users; }

private:
    int m_id;                 // 群组id
    string m_name;            // 群组名称
    string m_desc;            // 群组功能描述
    vector<GroupUser> m_users;// 存储组成员
};

#endif // GROUP_H

C++集群聊天服务器 muduo+nginx+redis+mysql数据库连接池 笔记 (下),数据库,服务器,nginx,redis,muduo,数据库连接池

  • 执行sql语句:    
create table groupuser(
    groupid int not null,
    userid int not null,
    grouprole enum('creator','normal') default 'normal'
);
alter table groupuser
add constraint pk_friend primary key(groupid,userid);
  • groupuser.hpp
#ifndef GROUPUSER_H
#define GROUPUSER_H
#include <string>
#include "user.hpp"
using namespace std;

// 群组用户,多了一个role角色信息,从User类直接继承,复用User的其他信息
// GroupUser群组员表的映射类:映射表的相应字段
class GroupUser : public User {
public:
    void setRole(string role) { m_role = role; }
    string getRole() { return m_role; }
private:
    string m_role;
};

#endif // GROUPUSER_H
  • groupmodel.hpp
#ifndef GROUPMODEL_H
#define GROUPMODEL_H

#include "group.hpp"
#include <string>
#include <vector>
using namespace std;
#include "ConnPool.h"
// 群组表的数据操作类:维护数组信息的操作接口方法
class GroupModel {
public:
    // 创建数组
    bool createGroup(ConnPool* pool,Group &group);
    // 加入群组
    void joinGroup(ConnPool* pool,int userid, int groupid, string role);
    // 查询用户所在群组信息
    vector<Group> queryGroups(ConnPool* pool,int userid);
    // 根据指定的groupid查询群组用户id列表,除userid自己,主要用户群聊业务给群组其他成员群发消息
    vector<int> queryGroupUsers(ConnPool* pool,int userid, int groupid);
};

#endif // GROUPMODEL_H
  • groupmodel.cpp
#include "groupmodel.hpp"
#include <iostream>
// 创建群组
bool GroupModel::createGroup(ConnPool* pool,Group &group) {
    // 1.组装sql语句
    char sql[1024] = {0};
    sprintf(sql,"insert into allgroup(groupname,groupdesc) values('%s','%s')"
          ,group.getName().c_str(),group.getDesc().c_str());
    // 2.执行sql语句
    shared_ptr<MysqlConn> conn = pool->getConn();
    if(conn->update(sql)) {
        // 获取到自增id
        group.setId(mysql_insert_id(conn->getConnection()));
        return true;
    }
    return false;
}

// 加入群组:即给群组员groupuser表添加一组信息
void GroupModel::joinGroup(ConnPool* pool,int userid, int groupid, string role) {
    // 1.组装sql语句
    char sql[1024] = {0};
    sprintf(sql,"insert into groupuser values(%d,%d,'%s')",
            groupid,userid,role.c_str());
    // 2.执行sqls语句
    shared_ptr<MysqlConn> conn = pool->getConn();
    conn->update(sql);
}

// 查询用户所在群组信息:群信息以及组员信息
vector<Group> GroupModel::queryGroups(ConnPool* pool,int userid) {
    /*
    1.先根据userid在groupuser表中查询出该用户所属的群组信息
    2.在根据群组信息,查询属于该群组的所有用户的userid,并且和user表
    进行多表联合查询,查出用户的详细信息
    */
    char sql[1024] = {0};
    sprintf(sql,"select a.id,a.groupname,a.groupdesc from allgroup a inner join \
            groupuser b on a.id = b.groupid where b.userid = %d",userid);

    vector<Group> groupVec;
    shared_ptr<MysqlConn> conn = pool->getConn();
    MYSQL_RES *res = conn->query(sql);
    if(res != nullptr) {
        MYSQL_ROW row;
        // 查出userid所有的群组信息
        while((row = mysql_fetch_row(res)) != nullptr) {
            std::cout<<"group row[0]: "<<row[0]<<" row[1]: "<<row[1]<<" row[2]: "<<row[2]<<std::endl;
            Group group;
            group.setId(atoi(row[0]));
            group.setName(row[1]);
            group.setDesc(row[2]);
            groupVec.push_back(group);
        }
        mysql_free_result(res);
    }

    // 查询群组的用户信息
    for(Group& group:groupVec) {
        sprintf(sql,"select a.id,a.name,a.state,b.grouprole from user a \
                inner join groupuser b on b.userid = a.id where b.groupid=%d",group.getId());
    
        MYSQL_RES *res = conn->query(sql);
        if(res != nullptr) {
            MYSQL_ROW row;
            while((row = mysql_fetch_row(res)) != nullptr) {
                std::cout<<"group user row[0]: "<<row[0]<<" row[1]: "<<row[1]<<" row[2]: "<<row[2]<<" row[3]: "<<row[3]<<std::endl; 
                GroupUser user;
                user.setId(atoi(row[0]));
                user.setName(row[1]);
                user.setState(row[2]);
                user.setRole(row[3]);
                group.getUsers().push_back(user);
            }
            mysql_free_result(res);
        }
    }
    return groupVec;
}

//查询用户所在群组信息:群信息以及组员信息
// vector<Group> GroupModel::queryGroups(ConnPool* pool,int userid)
// {
//     /*
//     1、先根据userid在groupuser表中查询出该用户所属的群组详细信息
//     2、再根据群组信息,查询属于该群组的所有用户的userid,并且和user表进行多表联合查询出用户的详细信息
//     */

//     //1、组装SQL语句
//     char sql[1024] = {0};
//     sprintf(sql, "select a.id,a.groupname,a.groupdesc from allgroup a inner join \
//             groupuser b on a.id = b.groupid where b.userid=%d", userid);
    
//     //2、发送SQL语句,进行相应处理
//     vector<Group> groupVec;
//     // MySQL mysql;
//     shared_ptr<MysqlConn> conn = pool->getConn();
//     MYSQL_RES *res = conn->query(sql);
//     if (res != nullptr)
//     {
//         MYSQL_ROW row;
//         //查出userid所有的群信息
//         while ((row = mysql_fetch_row(res)) != nullptr)
//         {
//             Group group;
//             group.setId(atoi(row[0]));
//             group.setName(row[1]);
//             group.setDesc(row[2]);
//             groupVec.push_back(group);
//         }
//         mysql_free_result(res);
//     }

//     //查询群组的用户信息
//     for (Group &group : groupVec)
//     {
//         sprintf(sql, "select a.id,a.name,a.state,b.grouprole from user a \
//             inner join groupuser b on b.userid = a.id where b.groupid=%d", group.getId());
        
//         MYSQL_RES *res = conn->query(sql);
//         if (res != nullptr)
//         {
//             MYSQL_ROW row;
//             while ((row = mysql_fetch_row(res)) != nullptr)
//             {
//                 GroupUser user;
//                 user.setId(atoi(row[0]));
//                 user.setName(row[1]);
//                 user.setState(row[2]);
//                 user.setRole(row[3]);
//                 group.getUsers().push_back(user);
//             }
//             mysql_free_result(res);
//         }
//     }
// }

// 根据指定的groupid查询群组用户id列表,除userid自己,主要用户群聊业务给群组其他成员群发消息
vector<int> GroupModel::queryGroupUsers(ConnPool* pool,int userid, int groupid) {
    char sql[1024]={0};
    sprintf(sql,"select userid from groupuser \
    where groupid = %d and userid!=%d",groupid,userid);
    vector<int> idVec;
    shared_ptr<MysqlConn> conn = pool->getConn();
    MYSQL_RES *res = conn->query(sql);
    if(res != nullptr) {
        MYSQL_ROW row;
        while((row = mysql_fetch_row(res)) != nullptr) {
            idVec.push_back(atoi(row[0]));
        }
        mysql_free_result(res);
    }
    return idVec;
}

redis.hpp

#ifndef REDIS_H
#define REDIS_H

#include <hiredis/hiredis.h>
#include <thread>
#include <functional>
using namespace std;

class Redis {
public:
    Redis();
    ~Redis();
    // 连接redis服务器
    bool connect();
    // 向redis指定的通道channel发布消息
    bool publish(int channel,string message);
    // 向redis指定的通道subscribe订阅消息
    bool subscribe(int channel);
    // 向redis指定的通道unsubscribe取消订阅消息
    bool unsubscribe(int channel);
    // 在独立线程中接收订阅通道中的消息
    void observer_channel_message();
    // 初始化向业务层上报通道消息的回调对象
    void init_notify_handler(function<void(int,string)> fn);
private:
    // hiredis同步上下文对象,负责publish消息:相当于我们客户端一个redis-cli跟连接相关的所有信息,需要两个上下文处理
    redisContext* m_publish_context;
    // hiredis同步上下文对象,负责subscribe消息
    redisContext* m_subscribe_context;
    // 回调操作,收到订阅的消息,给service层上报:主要上报通道号、数据
    function<void(int,string)>m_notify_message_handler;
};
#endif

redis.cpp

#include <iostream>
using namespace std;
#include "redis.hpp"
//构造函数:初始化两个上下文指针
Redis::Redis() 
    : m_publish_context(nullptr)
    , m_subscribe_context(nullptr)
{
}

//析构函数:释放两个上下文指针占用资源
Redis::~Redis() {
    if (m_publish_context != nullptr) {
        redisFree(m_publish_context);
        // m_publish_context = nullptr;
    }

    if (m_subscribe_context != nullptr) {
        redisFree(m_subscribe_context);
        // m_subscribe_context = nullptr;
    }
}

//连接redis服务器
bool Redis::connect() {
    //负责publish发布消息的上下文连接
    m_publish_context = redisConnect("127.0.0.1", 6379);
    if (nullptr == m_publish_context) {
        cerr << "connect redis failed!" << endl;
        return false;
    }

    //负责subscribe订阅消息的上下文连接
    m_subscribe_context = redisConnect("127.0.0.1", 6379);
    if (nullptr == m_subscribe_context) {
        cerr << "connect redis failes!" << endl;
        return false;
    }

    //在单独的线程中监听通道上的事件,有消息给业务层上报 让线程阻塞去监听
    thread t([&](){
        observer_channel_message();
    });
    t.detach();

    cout << "connect redis-server success!" << endl;

    return true;
}

//向redis指定的通道channel publish发布消息:调用redisCommand发送命令即可
bool Redis::publish(int channel, string message) {
    redisReply *reply = (redisReply *)redisCommand(m_publish_context, "PUBLISH %d %s", channel, message.c_str()); //相当于给channel通道发送消息
    if (nullptr == reply) {
        cerr << "publish command failed!" << endl;
        return false;
    }
    freeReplyObject(reply);
    return true;
}

/* 为什么发布消息使用redisCommand函数即可,而订阅消息却不使用?
redisCommand本身会先调用redisAppendCommand将要发送的命令缓存到本地,再调用redisBufferWrite将命令发送到redis服务器上,再调用redisReply以阻塞的方式等待命令的执行。
subscribe会以阻塞的方式等待发送消息,线程是有限,每次订阅一个线程会导致线程阻塞住,这肯定是不行的。
publish一执行马上会回复,不会阻塞当前线程,因此调用redisCommand函数。
*/

//向redis指定的通道subscribe订阅消息:
bool Redis::subscribe(int channel) {
    // SUBSCRIBE命令本身会造成线程阻塞等待通道里面发生消息,这里只做订阅通道,不接收通道消息
    // 通道消息的接收专门在observer_channel_message函数中的独立线程中进行
    // 只负责发送命令,不阻塞接收redis server响应消息,否则和notifyMsg线程抢占响应资源
    if (REDIS_ERR == redisAppendCommand(this->m_subscribe_context, "SUBSCRIBE %d", channel)) { //组装命令写入本地缓存
        cerr << "subscribe command failed!" << endl;
        return false;
    }
    
    // redisBufferWrite可以循环发送缓冲区,直到缓冲区数据发送完毕(done被置为1)
    int done = 0;
    while (!done) {
        if (REDIS_ERR == redisBufferWrite(this->m_subscribe_context, &done)) { //将本地缓存发送到redis服务器上
            cerr << "subscribe command failed!" << endl;
            return false;
        }
    }
    // redisGetReply

    return true;
}

//向redis指定的通道unsubscribe取消订阅消息,与subscrible一样
bool Redis::unsubscribe(int channel) {
    if (REDIS_ERR == redisAppendCommand(this->m_subscribe_context, "UNSUBSCRIBE %d", channel)) {
        cerr << "unsubscribe command failed!" << endl;
        return false;
    }
    // redisBufferWrite可以循环发送缓冲区,直到缓冲区数据发送完毕(done被置为1)
    int done = 0;
    while (!done) {
        if (REDIS_ERR == redisBufferWrite(this->m_subscribe_context, &done)) {
            cerr << "unsubscribe command failed!" << endl;
            return false;
        }
    }
    return true;
}

//在独立线程中接收订阅通道中的消息:以循环阻塞的方式等待响应通道上发生消息
void Redis::observer_channel_message() {
    redisReply *reply = nullptr;
    while (REDIS_OK == redisGetReply(this->m_subscribe_context, (void**)&reply)) {
        //订阅收到的消息是一个带三元素的数,通道上发送消息会返回三个数据,数据下标为2
        if (reply != nullptr && reply->element[2] != nullptr && reply->element[2]->str != nullptr) {
            //给业务层上报通道上发送的消息:通道号、数据
            m_notify_message_handler(atoi(reply->element[1]->str), reply->element[2]->str);
        }
        freeReplyObject(reply);
    }
}

//初始化向业务层上报通道消息的回调对象
void Redis::init_notify_handler(function<void(int, string)> fn) {
    this->m_notify_message_handler = fn;
}

chatserver.hpp

#ifndef CHATSERVER_H
#define CHATSERVER_H

#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
using namespace muduo;
using namespace muduo::net;

// 聊天服务器的主类
class ChatServer {
public:
    // 初始化聊天服务器对象
    ChatServer(EventLoop* loop,const InetAddress& listenAddr,const string& nameArg);
    // 启动服务
    void start();  
private:
    // 上报链接相关信息的回调函数:参数为连接信息
    void onConnection(const TcpConnectionPtr& conn);
    // 上报读写事件相关信息的回调函数:参数分别为连接/缓冲区/接收到数据的时间信息
    void onMessage(const TcpConnectionPtr& conn,Buffer* buffer,Timestamp time);
    TcpServer m_server; // 组合的muduo库,实现服务器功能的类对象
    EventLoop *m_loop;  // 指向事件循环的指针
};

#endif

chatserver.cpp

#include "chatserver.hpp"
#include "chatservice.hpp"
#include "json.hpp"
#include <functional>
#include <string>
#include <iostream>
using namespace std;
using namespace placeholders;
using json = nlohmann::json;

// 初始化聊天服务器对象
ChatServer::ChatServer(EventLoop *loop, const InetAddress &listenAddr, const string &nameArg)
    : m_server(loop, listenAddr, nameArg), m_loop(loop) {
    // 注册用户连接的创建和断开事件的回调
    m_server.setConnectionCallback(std::bind(&ChatServer::onConnection, this, _1));
    // 注册用户读写事件的回调
    m_server.setMessageCallback(std::bind(&ChatServer::onMessage, this, _1, _2, _3));
    // 设置服务器线程数量 1个I/O线程,3个工作线程
    m_server.setThreadNum(4);
}

// 启动服务,开启事件循环
void ChatServer::start() {
    m_server.start();
}

// 上报链接相关信息的回调函数:参数为连接信息
void ChatServer::onConnection(const TcpConnectionPtr &conn) {
    // 客户端断开连接,释放连接资源 muduo库会打印相应日志
    if(!conn->connected()) {
        ChatService::getInstance()->clientCloseException(conn);// 处理客户端异常关闭
        conn->shutdown();// 释放socket fd资源
    }
}

// 网络模块与业务模块解耦:不直接调用相应方法,业务发生变化此处代码也不需要改动
// 上报读写事件相关信息的回调函数:参数分别为连接/缓冲区/接收到数据的时间信息
void ChatServer::onMessage(const TcpConnectionPtr &conn, Buffer *buffer, Timestamp time) {
    // 将buffer缓冲区收到的数据存入字符串
    string buf = buffer->retrieveAllAsString();
    
    std::cout<<"buf: "<<buf.c_str()<<std::endl;
    // 数据的反序列化
    json js = json::parse(buf);
    // 达到的目的:完全解耦网络模块的代码和业务模块的代码
    // 通过js["msgid"] 获取 => 业务handler => conn js time
    auto msghandler = ChatService::getInstance()->getHandler(js["msgid"].get<int>());
    // 回调消息绑定好的事件处理器,来执行相应的业务处理
    msghandler(conn,js,time);
}

chatservice.hpp

#ifndef CHATSERVICE_H
#define CHATSERVICE_H

#include <muduo/net/TcpConnection.h>
#include <unordered_map>
#include <functional>
#include <mutex>

using namespace std;
using namespace muduo;
using namespace muduo::net;

#include "json.hpp"
using json = nlohmann::json;

#include "usermodel.hpp"
#include "offlinemessagemodel.hpp"
#include "friendmodel.hpp"
#include "groupmodel.hpp"

#include "redis.hpp"
#include "ConnPool.h"

// 表示处理消息的事件回调方法类型
using MsgHandler = std::function<void(const TcpConnectionPtr& conn,json& js,Timestamp)>;

// 聊天服务器业务类,设计为单例模式:给msgid映射事件回调(一个消息id映射一个事件处理)
class ChatService {
public:
    // 获取单例对象的接口函数
    static ChatService* getInstance();
    // 处理登录业务
    void login(const TcpConnectionPtr& conn,json& js,Timestamp time); 
    // 处理注册业务(register)
    void reg(const TcpConnectionPtr& conn,json& js,Timestamp time); 
    // 处理一对一聊天业务
    void oneChat(const TcpConnectionPtr& conn,json& js,Timestamp time);
    // 添加好友业务
    // void addFriend(const TcpConnectionPtr& conn,json& js,Timestamp time);
    // 添加好友业务请求
    void addFriendRequest(const TcpConnectionPtr& conn,json& js,Timestamp time);
    // 添加好友业务响应
    void addFriendResponse(const TcpConnectionPtr& conn,json& js,Timestamp time);

    // 获取消息msgid对应的处理器
    MsgHandler getHandler(int msgid);
    // 处理客户端异常退出
    void clientCloseException(const TcpConnectionPtr& conn);
    // 服务器异常,业务重置方法
    void reset();
    // 创建群组业务
    void createGroup(const TcpConnectionPtr& conn,json& js,Timestamp time); 
    // 加入群组业务
    void joinGroup(const TcpConnectionPtr& conn,json& js,Timestamp time);   
    // 群组聊天业务
    void groupChat(const TcpConnectionPtr& conn,json& js,Timestamp time); 
    // 处理注销业务
    void loginOut(const TcpConnectionPtr &conn, json &js, Timestamp time);
    // 从redis消息队列中获取订阅的消息:通道号 + 消息
    void handleRedisSubscribeMessage(int userid, string msg);

    ChatService(const ChatService&) = delete;
    ChatService& operator=(const ChatService&) = delete;

    ConnPool* getConnPool() const { return m_connPool;}
private:
    // 注册消息以及对应的Handler回调操作
    ChatService();
    // 存储消息id和其对应的业务处理方法
    unordered_map<int,MsgHandler> m_msgHandlerMap;
    // 存储在线用户的通信连接
    unordered_map<int,TcpConnectionPtr> m_userConnMap; // 消息处理器map表 每一个msgid对应一个业务处理方法
    // 定义互斥锁,保证m_userConnMap的线程安全
    mutex m_connMutex;
    // 数据操作类对象
    UserModel m_userModel;              // 存储在线用户的通信连接map表
    OfflineMsgModel m_offlineMsgModel;  // 离线消息表的数据操作类对象
    FriendModel m_friendModel;          // 好友表的数据操作类对象
    GroupModel m_groupModel;
    Redis m_redis;                      // redis操作对象
    ConnPool* m_connPool;                   // 数据库连接池
};

#endif // CHATSERVICE_H

/*
3.1 用户注册业务:
    我们业务层与数据层分离,需要操作数据层数据对象即可,因此需要在
    ChatService类中实例化一个数据操作类对象进行业务开发
    UserModel m_userModel;// 数据操作类对象

服务器注册业务流程:
1.客户端注册的消息过来后,网络模块将json数据反序列化后上报到注册业务中,
因为User表中id字段为自增的,state字段是默认的,因此注册业务只需要获取
name与password字段即可
2.实例化User表对应的对象user,将获取到的name与password设置进去,再向
UserModel数据操作类对象进行新用户user的注册
3.注册完成后,服务器返回相应json数据给客户端:若注册成功,返回注册响应消息
REG_MSG_ACK,错误标识errno(0:成功,1:失败),用户id等组装好的json数据;
若注册失败,返回注册响应消息REG_MSG_ACK,错误标识

3.2 用户登录业务
3.2.1 基础登录业务实现
用户登录:服务器反序列化数据后,依据id,密码字段后判断账号是否正确,依据是否
登陆成功给客户端返回响应消息
服务器登录业务流程:
1.服务器获取输入用户id,密码字段
2.查询id对应的数据,判断用户id与密码是否正确,分为以下三种情况返回相应json数据给客户端:
(1)若用户名/密码正确且未重复登录,及时更新登录状态为在线,,返回登录响应消息
   LOGIN_MSG_ACK,错误标识errno(0:成功,1:失败,2:重复登录),用户id,用户名等信息
(2)若用户名/密码正确但重复登录,返回登录响应消息、错误标识、错误提示信息;
(3)若用户不存在或密码错误,返回登录响应消息,错误标识,错误提示信息;

3.2.2 记录用户连接信息处理
用户连接信息处理:假设此时用户1向用户2发送消息(源id, 目的id,消息内容),
此时服务器收到用户1的数据了,要主动向用户2推送该条消息,那么如何知道用户2
是那条连接呢。因此我们需要专门处理下,用户一旦登录成功,就会建立一条连接,
我们便要将该条连接存储下来,方便后续消息收发的处理.

3.2.3 客户端异常退出处理
客户端异常退出处理:假设用户客户端直接通过Ctrl+C中断,并没有给服务器发送合法的json过来,
我们必须及时修改用户登录状态,否则后续再想登录时为"online"状态,便无法登录了。

客户端异常退出处理流程:
1.通过conn连接去m_userConnMap表中查找,删除conn键值对记录;
2.将conn连接对应用户数据库的状态从"online"改为"offline";

3.2.4 服务器异常退出处理
服务器异常退出处理:假设用户服务器直接通过Ctrl+C中断,并没有给客户端发送
合法的json过去,我们必须及时修改所有用户登录状态未"offline",否则后续再
想登录时为"online"状态,便无法登录了。
服务器异常退出处理流程:主动截获Ctcl+c信号(SIGINT),在信号处理函数中将
数据库中用户状态重置为"offline"。

3.3 点对点聊天业务
点对点聊天:源用户向目的用户发送消息,目的用户若在线则将消息发出,
目的用户若不在线将消息存储至离线消息表中,待目的用户上线后离线
消息发出

在进行点对点聊天业务处理前,需要提前处理好以下几点:
在EnMsgType中增加一个聊天消息类型,给客户端标识此时是一个聊天消息.
将点对点业务的消息id与对应的事件处理器提前在聊天服务器业务类的构造
函数里绑定好

服务器点对点聊天业务流程
1.源id向目的id发送消息时候,消息里会包含消息类型,源id,源用户名,
目的id,消息内容,服务器解析到这些数据后,先获取到目的id字段
2.找到id判断是否在线,若在线则服务器将源id的消息中转给目的id;若
不在线则将消息内容存入离线消息表中,待目的id上线后离线消息发出

3.4 离线消息业务
离线消息业务:当用户一旦登录成功,我们查询用户是否有离线消息要发送,
若有则发送相应数据,发送完后删除本次存储的离线数据,防止数据重复发送
在进行点对点聊天业务处理前,我们需要提前处理好以下几点:
1、建立与离线消息表的映射OfflineMsgModel类:我们数据库中有创建的
OfflineMessage离线消息表,因为我们数据层与业务层要分离开来,所以
这里与前面一样提供离线消息表的数据操作类,提供给业务层对应的操作接口。

服务器离线消息业务流程:
1.无论是一对一聊天,还是群聊,若接收方用户不在线,则将发送方消息先存储至离线消息表里
2.一旦接收方用户登录成功,检查该用户是否有离线消息(可能有多条),若有则服务器
将离线消息发送给接收方用户
3.服务器发送完成后删除本次存储的离线消息,保证接收方不会每次登录都收到重复的离线消息

3.5 添加好友业务
添加好友业务:源用户id、目的用户id发送给服务器,服务器在数据库中进行好友关系的添加。
添加完成用户登录后,服务器返回好友列表信息给用户,用户可以依据好友列表进行聊天,这里实现的比较简单,后续可扩充更细化的业务。
在进行添加好友业务处理前,我们需要提前处理好以下几点:
1、我们需要在消息类型EnMsgType中增加一个聊天消息类型,给客户端标识此时是一个添加好友消息:
2、将添加好友业务的消息id与对应的事件处理器提前在聊天服务器业务类的构造函数里绑定好。
3、建立好友表与类的映射FriendModel类:表中userid与friendid关系只需要存储一次即可,因此为联合主键。这里与前面一样提供好友表的数据操作类,提供给业务层对应的操作接口。

服务器添加好友业务流程:
1.服务器获取当前用户id,要添加好友的id;
2.业务层调用数据层接口往数据库中添加相应好友信息;
用户登录成功时,查询该用户的好友信息并返回

3.6 群组业务
群组业务:群组业务分为三块,群管理员创建群组,组员加入群组与群组聊天功能
在进行群组业务处理前,我们需要提前处理好以下几点:
1.我们需要在消息类型EnMsgType中增加不同的消息类型,创建群组,
加入群组、群组聊天三种类型消息,给客户端标识此时要做什么事情:

3.6.1 创建群组
服务器创建群组业务,业务流程:
1.服务器获取创建群的用户id,要创建群名称,群功能等信息
2.业务层创建数据层对象,调用数据层方法进行群组创建,创建成功保存群组创建人信息;

3.6.2 加入群组
服务器组员加入群组业务流程:
1、服务器获取要加入群用户的id、要加入的群组id;
2、业务层调用数据层方法将普通用户加入;

3.6.3 群组聊天
服务器群组聊天业务流程:
1、获取要发送消息的用户id、要发送的群组id;
2、查询该群组其它用户id;
3、查询同组用户id,若用户在线则发送消息;若用户不在线则存储离线消息;

3.7 注销业务
注销业务: 客户端用户正常退出,更新其在线状态。

在进行注销业务处理前,我们需要提前处理好以下几点:
1、我们需要在消息类型EnMsgType中增加一个注销业务类型,给客户端标识此时是一个注销业务消息:
2、将注销业务的消息id与对应的事件处理器提前在聊天服务器业务类的构造函数里绑定好。

服务器注销业务业务流程:
1、服务器获取要注销用户的id,删除其对应的连接。
2、更新用户状态信息,从在线更新为离线。


四 服务器支持跨服务器通信功能
redis主要业务流程:
1.用户登录成功后相应的服务器需要向redis上依据用户id订阅相应通道的消息
2.当服务器上用户之间跨服务器发送消息时,需要向通道上发送消息
3、redis接收到消息通知相应服务器进行处理
*/

chatservice.cpp

#include "chatservice.hpp"
#include "public.hpp"
#include <muduo/base/Logging.h>
#include <vector>
#include <map>
#include <string>
#include <string.h>
#include <iostream>
using namespace std;
using namespace muduo;

// 获取单例对象的接口函数 线程安全的单例对象
ChatService* ChatService::getInstance() {
    static ChatService service;
    return &service;
}

// 构造函数:注册消息以及对应的Handler回调操作 实现网络模块与业务模块解耦的核心
// 将群组业务的消息id分别与对应的事件处理器提前在聊天服务器业务类的构造函数里绑定好
ChatService::ChatService() {
    m_msgHandlerMap.insert({LOGIN_MSG,std::bind(&ChatService::login, this, _1, _2, _3)});  
    m_msgHandlerMap.insert({REG_MSG,std::bind(&ChatService::reg, this, _1, _2, _3)});  
    m_msgHandlerMap.insert({ONE_CHAT_MSG,std::bind(&ChatService::oneChat, this, _1, _2, _3)});
    // m_msgHandlerMap.insert({ADD_FRIEND_MSG,std::bind(&ChatService::addFriend, this, _1, _2, _3)}); 
    m_msgHandlerMap.insert({ADD_FRIEND_REQ_MSG,std::bind(&ChatService::addFriendRequest, this, _1, _2, _3)});  
    m_msgHandlerMap.insert({ADD_FRIEND_MSG_ACK,std::bind(&ChatService::addFriendResponse, this, _1, _2, _3)});

    m_msgHandlerMap.insert({LOGIN_OUT_MSG, std::bind(&ChatService::loginOut, this, _1, _2, _3)});
    m_msgHandlerMap.insert({CREATE_GROUP_MSG, std::bind(&ChatService::createGroup, this, _1, _2, _3)});
    m_msgHandlerMap.insert({ADD_GROUP_MSG, std::bind(&ChatService::joinGroup, this, _1, _2, _3)});
    m_msgHandlerMap.insert({GROUP_CHAT_MSG, std::bind(&ChatService::groupChat, this, _1, _2, _3)});
    // 连接redis服务器
    if(m_redis.connect()) {
        // 设置上报消息的回调 
        m_redis.init_notify_handler(std::bind(&ChatService::handleRedisSubscribeMessage, this, _1, _2));  
    }
    // 初始化数据库
    m_connPool = ConnPool::getConnPool();
}

// 处理登录业务  user表:id password字段
void ChatService::login(const TcpConnectionPtr &conn, json &js, Timestamp time) {
    // 1.获取ids,password字段
    int id = js["id"].get<int>();
    string pwd = js["password"];

    // 传入用户id,返回相应数据
    ConnPool* connPool = this->getConnPool();
    User user = m_userModel.query(connPool,id);
    if(user.getId() == id && user.getPwd() == pwd) { // 登录成功
        if(user.getState() == "online") {
            //该用户已经登录,不允许重复登录
            json response;
            response["msgid"] = LOGIN_MSG_ACK;
            response["errno"] = 2; // 重复登录
            // response["errmsg"] = "该账号已经登录,请重新输入新账号";
            response["errmsg"] = "this account has logined, please input a new account";    
            conn->send(response.dump());
        }
        else{ // 用户未登录,此时登录成功
            // 登录成功,记录用户连接信息
            /*
            在用户登录成功时便将用户id与连接信息记录在一个map映射表里,方便后续查找与使用
            线程安全问题:上述我们虽然建立了用户id与连接的映射,但是在多线程环境下,不同的用户
            可能会在不同的工作线程中调用同一个业务,可能同时有多个用户上线,下线操作,因此要
            保证map表的线程安全
            */
            {
                lock_guard<mutex> lock(m_connMutex);
                m_userConnMap.insert({id, conn}); // 登录成功记录用户连接信息
            }
            // id用户登录成功后,向redis订阅channel(id)通道的事件
            m_redis.subscribe(id);

            // 登录成功,更新用户状态信息 state: offline => online
            user.setState("online");
            m_userModel.updateState(connPool,user); // 更新用户状态信息

            json response;
            response["msgid"] = LOGIN_MSG_ACK;
            response["errno"] = 0;
            response["id"] = user.getId();
            response["name"] = user.getName();
            
            // 查询该用户是否有离线消息
            vector<string> vec = m_offlineMsgModel.query(connPool,id);
            if(!vec.empty()) {
                response["offlinemsg"] = vec;// 查询到离线消息,发送给用户
                cout<<"查询到离线消息,发送给用户 :" <<response["offlinemsg"]<<endl;
                // 读取该用户的离线消息后,把该用户的所有离线消息删除掉
                m_offlineMsgModel.remove(connPool,id);
            }
            // 登录成功,查询该用户的好友信息并返回
            vector<User>userVec = m_friendModel.query(connPool,id);
            if(!userVec.empty()) {
                vector<string> vec2;
                for(User &user : userVec) {
                    json js;
                    js["id"] = user.getId();
                    js["name"] = user.getName();
                    js["state"] = user.getState();
                    vec2.push_back(js.dump());
                }
                response["friends"] = vec2;
            }

            vector<Group> groupVec = m_groupModel.queryGroups(connPool,id);
            if(groupVec.size() > 0) {
                // cout<<"................sdsdfasas................."<<endl;
                vector<string> vec3;
                for(Group& group:groupVec) {
                    vector<GroupUser> users = group.getUsers();
                    json js;
                    js["id"] = group.getId();
                    js["groupname"] = group.getName();
                    js["groupdesc"] = group.getDesc();

                    vector<string> userVec;
                    for(GroupUser& user:users) {
                        json js_tmp;
                        js_tmp["id"] = user.getId();
                        js_tmp["name"] = user.getName();
                        js_tmp["state"] = user.getState();
                        js_tmp["role"] = user.getRole();
                        userVec.push_back(js_tmp.dump());
                    }
                    js["users"] = userVec;
                    vec3.push_back(js.dump());
                    // cout<<"js.dump() = "<<js.dump()<<endl;
                }
                response["groups"] = vec3;
            }
            conn->send(response.dump());
        }
    }
    else {
        // 该用户不存在/用户存在但是密码错误,登录失败
        json response;
        response["msgid"] = LOGIN_MSG_ACK;
        response["errno"] = 1;
        // response["errmsg"] = "该用户不存在,您输入用户名或者密码可能错误!";
        response["errmsg"] = "This user does not exist, or the password you entered may be incorrect!"; 
        conn->send(response.dump());
    }
}

// 处理注册业务 user表:name password
void ChatService::reg(const TcpConnectionPtr &conn, json &js, Timestamp time) {
    // 1.获取name,password字段
    string name = js["name"];
    string pwd = js["password"];

    // 处理业务,操作的都是数据对象
    // 2.创建User对象,进行注册
    User user;
    user.setName(name);
    user.setPwd(pwd);
    // 新用户的插入
    ConnPool* connPool = this->getConnPool();
    bool state = m_userModel.insert(connPool,user);
    if(state) { // 注册成功
        json response;
        response["msgid"] = REG_MSG_ACK; // 注册响应消息
        response["errno"] = 0;           // 错误标识 0:成功 1:失败
        response["id"] = user.getId();
        conn->send(response.dump());
    }
    else { // 注册失败
        json response;
        response["msgid"] = REG_MSG_ACK;
        response["errno"] = 1;
        conn->send(response.dump());
    }
}

// 处理一对一聊天业务
void ChatService::oneChat(const TcpConnectionPtr &conn, json &js, Timestamp time) {
    // 1.先获取目的id
    int toid = js["toid"].get<int>();
    {
        lock_guard<mutex> lock(m_connMutex);
        auto it = m_userConnMap.find(toid);
        // 2.目的id在线 进行消息转发,服务器将源id发送的消息中转给目的id
        if(it != m_userConnMap.end()) {
            // toid在线,转发消息  服务器主动推送消息给toid用户
            it->second->send(js.dump());
            return;
        }
    }
    
    // 查询toid是否在线
    /*
     * A向B说话,在map表中未找到B,B可能不在本台服务器上但通过
     * 数据库查找在线,要发送的消息直接发送以B用户为id的通道上;
     * 也可能是离线状态,发送离线消息
     */

    cout<<"发送消息 :" <<js.dump()<<endl;

    ConnPool* connPool = this->getConnPool();
    User user = m_userModel.query(connPool,toid);
    if(user.getState() == "online") {
        m_redis.publish(toid, js.dump());
        return;
    }

    // 目的id不在线,将消息存储到离线消息里
    m_offlineMsgModel.insert(connPool,toid, js.dump());
}

// 添加好友业务 msgid id friendid
// void ChatService::addFriend(const TcpConnectionPtr &conn, json &js, Timestamp time) {
//     std::cout<<"添加好友业务 msgid id friendid"<<std::endl;
//     // 1.获取当前用户id,要添加好友id
//     int userid = js["id"].get<int>();
//     int friendid = js["friendid"].get<int>();
//     std::cout<<"打印当前用户id:"<<userid<<std::endl;
//     std::cout<<"打印要添加好友id:"<<friendid<<std::endl;
//     // 2.数据库中存储要添加好友的信息
//     ConnPool* connPool = this->getConnPool();
//     m_friendModel.insert(connPool,userid, friendid);
// }
// 添加好友业务请求
void ChatService::addFriendRequest(const TcpConnectionPtr &conn, json &js, Timestamp time) {
    int userid = js["id"].get<int>();
    int friendid = js["friendid"].get<int>();
    json response;
    response["msgid"] = ADD_FRIEND_REQ_MSG;

    string msgStr = "用户ID: "+to_string(userid)+" ,请求添加您为好友"+to_string(friendid);
    response["msg"] = msgStr;
    response["from"] = userid;
    response["toid"] = friendid;
    std::cout<<"来到这里了:"<<response.dump()<<std::endl;
    oneChat(conn,response,time);
}
 
// 添加好友业务 msgid id friendid
void ChatService::addFriendResponse(const TcpConnectionPtr &conn, json &js, Timestamp time) {
    int userid = js["id"].get<int>();
    int friendid = js["friendid"].get<int>();
    bool flag = js["flag"].get<bool>();
    json response;
    response["msgid"] = ADD_FRIEND_MSG_ACK;
    response["from"] = userid;
    response["toid"] = friendid;
    if(flag) {
        response["msg"] = "I very happy to make friends with you!!!";
        ConnPool* connPool = this->getConnPool();
        m_friendModel.insert(connPool,userid, friendid);
    }
    else{
        response["msg"] = "I am very sorry, you are not my friend!!!";
    }
    cout<<"response.dump() : "<<response.dump()<<endl;
    oneChat(conn,response,time);
}

// 获取消息msgid对应的处理器
MsgHandler ChatService::getHandler(int msgid) {
    // 记录错误日志,msgid没有对应的事件处理回调
    auto it = m_msgHandlerMap.find(msgid);
    if(it == m_msgHandlerMap.end()) {
        // 返回一个默认的处理器,空操作
        return [=](const TcpConnectionPtr &conn, json &js, Timestamp) {
            LOG_ERROR << "msgid:" << msgid << " can not find handler!";
        };//msgid没有对应处理器,打印日志,返回一个默认处理器,空操作
    }
    else {
        return m_msgHandlerMap[msgid];
    }
}

// 处理客户端异常退出
void ChatService::clientCloseException(const TcpConnectionPtr &conn) {
    User user;
    {
        lock_guard<mutex> lock(m_connMutex);   
        // 1.从map表删除用户的连接信息
        for(auto it = m_userConnMap.begin();it!=m_userConnMap.end();++it) {
            if(it->second == conn) {
                // 从map表删除用户的链接信息
                user.setId(it->first);
                m_userConnMap.erase(it);
                break;
            }
        }
    }

    // 用户注销,相当于就是下线,在redis中取消订阅通道
    m_redis.unsubscribe(user.getId());

    // 2.更新用户的状态信息
    if(user.getId() != -1) {
        user.setState("offline");
        ConnPool* connPool = this->getConnPool();
        m_userModel.updateState(connPool,user);
    }
   
}

// 服务器异常,业务重置方法
void ChatService::reset() {
    // 把online状态的用户,设置成offline
    ConnPool* connPool = this->getConnPool();
    m_userModel.resetState(connPool);
}

// 创建群组业务
void ChatService::createGroup(const TcpConnectionPtr &conn, json &js, Timestamp time) {
    // 1.获取创建群的用户id,群名称,群功能
    int userid = js["id"].get<int>();
    string name = js["groupname"];
    string desc = js["groupdesc"];
    // 2.存储新创建的群组信息
    ConnPool* connPool = this->getConnPool();
    Group group(-1, name, desc);
    if(m_groupModel.createGroup(connPool,group)) {
        // 存储群组创建人信息
        m_groupModel.joinGroup(connPool,userid,group.getId(),"creator");
    }
}

// 加入群组业务
void ChatService::joinGroup(const TcpConnectionPtr &conn, json &js, Timestamp time) {
    int userid = js["id"].get<int>();
    int groupid = js["groupid"].get<int>();
    // 存储用户加入的群组信息
    ConnPool* connPool = this->getConnPool();
    m_groupModel.joinGroup(connPool,userid,groupid,"normal");
}

// 群组聊天业务
void ChatService::groupChat(const TcpConnectionPtr &conn, json &js, Timestamp time) {
    // 1.获取要发送消息的用户id,要发送的群组id
    int userid = js["id"].get<int>();
    int groupid = js["groupid"].get<int>();

    // 2.查询该群组其他的用户id
    ConnPool* connPool = this->getConnPool();
    vector<int> useridVec = m_groupModel.queryGroupUsers(connPool,userid, groupid);  
    
    // 3.进行用户查找
    /*
     * A向B说话,在map表中未找到B,B可能不在本台服务器上但通过数据库查找
     * 在线,要发送的消息直接发送以B用户为id的通道上;也可能是离线状态,
     * 发送离线消息
     */
    lock_guard<mutex> lock(m_connMutex);
    for(int id : useridVec) {
        auto it = m_userConnMap.find(id);
        // 用户在线,转发群消息
        if(it != m_userConnMap.end()) {
            // 转发群消息
            it->second->send(js.dump());
        }
        else {  // 用户不在线,存储离线消息 或 在其它服务器上登录的
            // 查询toid是否在线
            User user = m_userModel.query(connPool,id);
            if(user.getState() == "online") { // 在其他服务器上登录的
                m_redis.publish(id,js.dump());
            }else{
                // 存储离线群消息
                ConnPool* connPool = this->getConnPool();
                m_offlineMsgModel.insert(connPool,id, js.dump());
            }
        }
    }
}

//处理注销业务
void ChatService::loginOut(const TcpConnectionPtr &conn, json &js, Timestamp time)
{
    //1、获取要注销用户的id,删除对应连接
    int userid = js["id"].get<int>();
    // std::cout<<"获取要注销用户的id,删除对应连接: userid: "<<userid<<std::endl;
    {
        lock_guard<mutex> lock(m_connMutex);
        auto it = m_userConnMap.find(userid);
        if (it != m_userConnMap.end())
        {
            m_userConnMap.erase(it);
        }
    }

    // 用户注销,相当于就是下线,在redis中取消订阅通道
    m_redis.unsubscribe(userid);

    //2、更新用户状态信息
    User user(userid, "", "", "offline");
    ConnPool* connPool = this->getConnPool();
    m_userModel.updateState(connPool,user);
}

// 从redis消息队列中获取订阅的消息:通道号 + 消息
void ChatService::handleRedisSubscribeMessage(int userid, string msg) {
    lock_guard<mutex> lock(m_connMutex);
    auto it = m_userConnMap.find(userid);
    if (it != m_userConnMap.end()) {
        it->second->send(msg);
        return;
    }
    // 存储该用户的离线消息:在从通道取消息时,用户下线则发送离线消息
    ConnPool* connPool = this->getConnPool();
    m_offlineMsgModel.insert(connPool,userid, msg);
}

/*
服务器业务模块ChatService
服务器业务模块:客户端发送的业务数据,先到达服务器端网络模块,
网络模块进行事件分发到业务模块相应的业务处理器,最终通过数据
层访问底层数据模块

3.1 用户注册业务
用户注册:服务器将客户端收到的json反序列化后存储到数据库中,依据是否
注册成功给客户端返回响应消息
*/

src/server/main.cpp

#include "chatserver.hpp"
#include "chatservice.hpp"
#include <iostream>
#include <signal.h>
using namespace std;

// 处理服务器ctrl+c结束后,重置user的状态信息
void resetHandler(int) {
    ChatService::getInstance()->reset();
    exit(0);
}

int main(int argc, char** argv) {
    signal(SIGINT,resetHandler);
    
    // InetAddress addr("127.0.0.1", 6000);
    char* ip = argv[1];
    uint16_t port = atoi(argv[2]);
    InetAddress addr(ip, port);
    
    EventLoop loop;
    ChatServer server(&loop, addr, "ChatServer");
    server.start();
    loop.loop(); // 启动事件循环
    return 0;
}

src/client/main.cpp

#include "json.hpp"
#include <iostream>
#include <thread>
#include <string>
#include <vector>
#include <chrono>
#include <ctime>
#include <map>
#include <atomic> 

using namespace std;
using json = nlohmann::json;

#include <unistd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <semaphore.h>

#include "group.hpp"
#include "user.hpp"
#include "public.hpp"
// 登录成功客户端记录用户相关信息,后续无需再从服务器获取了
User g_currentUser;                   // 记录当前系统登录的用户信息
vector<User> g_currentUserFriendList; // 记录当前登录用户的好友列表信息
vector<Group> g_currentUserGroupList; // 记录当前登录用户的群组列表信息
bool isMainMenuRunning = false;       // 控制主菜单页面程序:主菜单页面是否正在进行
sem_t rwsem;                          // 用于读写线程之间的通信
atomic_bool g_isLoginSuccess{false};  // 记录登录状态是否成功

void mainMenu(int clientfd);

// 显示当前登录成功用户的基本信息:打印用户id、名称,显示其好友列表与群组列表
void showCurrentUserData()
{
    cout << "======================login user======================" << endl;
    cout << "current login user id:" << g_currentUser.getId() << " name:" << g_currentUser.getName() << endl;
    cout << "----------------------friend list---------------------" << endl;
    if (!g_currentUserFriendList.empty())
    {
        for (User &user : g_currentUserFriendList)
        {
            cout << user.getId() << " " << user.getName() << " " << user.getState() << endl;
        }
    }
    cout << "----------------------group list----------------------" << endl;
    if (!g_currentUserGroupList.empty())
    {
        for (Group &group : g_currentUserGroupList)
        {
            cout << group.getId() << " " << group.getName() << " " << group.getDesc() << endl;
            for (GroupUser &user : group.getUsers())
            {
                cout << user.getId() << " " << user.getName() << " " << user.getState() << " " << user.getRole() << endl;
            }
        }
    }
    cout << "======================login user======================" << endl;
}

// 处理登录响应逻辑
void doLoginResponse(json &responsejs)
{
    if (0 != responsejs["errno"].get<int>()) // 登录失败
    {
        cerr << responsejs["errmsg"] << endl;
        g_isLoginSuccess = false;
    }
    else // 登录成功
    {
        // 记录当前用户的id和name
        g_currentUser.setId(responsejs["id"].get<int>());
        g_currentUser.setName(responsejs["name"]);

        // 记录当前用户的好友列表信息
        if (responsejs.contains("friends"))
        {
            // 初始化
            g_currentUserFriendList.clear();

            vector<string> vec = responsejs["friends"];
            for (string &str : vec)
            {
                json js = json::parse(str);
                User user;
                user.setId(js["id"].get<int>());
                user.setName(js["name"]);
                user.setState(js["state"]);
                g_currentUserFriendList.push_back(user);
            }
        }

        // 记录当前用户的群组列表信息
        if (responsejs.contains("groups"))
        {
            // 初始化
            g_currentUserGroupList.clear();

            vector<string> vec1 = responsejs["groups"];
            // cout<<"vec1.size: "<<vec1.size()<<endl;
            for (string &groupstr : vec1)
            {
                // cout<<"groupstr: "<<groupstr<<endl;
                json grpjs = json::parse(groupstr);
                Group group;
                group.setId(grpjs["id"].get<int>());
                group.setName(grpjs["groupname"]);
                group.setDesc(grpjs["groupdesc"]);

                vector<string> vec2 = grpjs["users"];
                for (string &userstr : vec2)
                {
                    GroupUser user;
                    json js = json::parse(userstr);
                    user.setId(js["id"].get<int>());
                    user.setName(js["name"]);
                    user.setState(js["state"]);
                    user.setRole(js["role"]);
                    group.getUsers().push_back(user);
                }

                g_currentUserGroupList.push_back(group);
            }
        }

        // 显示登录用户的基本信息
        showCurrentUserData();

        // 显示当前用户的离线消息  个人聊天信息或者群组消息
        if (responsejs.contains("offlinemsg"))
        {
            vector<string> vec = responsejs["offlinemsg"];
            for (string &str : vec)
            {
                json js = json::parse(str);
                // time + [id] + name + " said: " + xxx
                if (ONE_CHAT_MSG == js["msgid"].get<int>())
                {
                    cout << js["time"].get<string>() << " [" << js["id"] << "]" << js["name"].get<string>()
                            << " said: " << js["msg"].get<string>() << endl;
                }
                else if(ADD_FRIEND_REQ_MSG == js["msgid"].get<int>()) {
                    cout << "offline msg: " << js["msg"].get<string>() << endl;
                    // cout << js["time"].get<string>() << " [" << js["id"] << "]" << js["name"].get<string>()
                    //         << " said: " << js["msg"].get<string>() << endl;
                }
                else if(GROUP_CHAT_MSG == js["msgid"].get<int>())
                {
                    cout << "groupmsg[" << js["groupid"] << "]:" << js["time"].get<string>() << " [" << js["id"] << "]" << js["name"].get<string>()
                            << " said: " << js["msg"].get<string>() << endl;
                }
            }
        }

        g_isLoginSuccess = true;
    }
}

// 处理注册响应逻辑
void doRegResponse(json &responsejs)
{
    if (0 != responsejs["errno"].get<int>()) // errno不为0,注册失败
    {
        cerr << "name is already exist, register error!" << endl;
    }
    else // errno为0,注册成功,返回userid
    {
        cout << "name register success, userid is " << responsejs["id"] << ", do not forget it!" << endl;
    }
}

// 子线程,接收线程:接收用户的手动输入
void readTaskHandler(int clientfd)
{
    for (;;)
    {
        char buffer[1024] = {0};
        int len = recv(clientfd, buffer, 1024, 0);
        if (-1 == len || 0 == len)
        {
            close(clientfd);
            exit(-1);
        }

        // 接收数据,将网络发送过来的数据反序列化为json数据对象,如果是聊天信息则打印
        json js = json::parse(buffer);
        int msgtype = js["msgid"].get<int>();
        if (ONE_CHAT_MSG == msgtype) // 点对点聊天消息
        {
            cout << js["time"].get<string>() << " [" << js["id"] << "]" << js["name"].get<string>()
                 << " said: " << js["msg"].get<string>() << endl;
            continue;
        }

        if (GROUP_CHAT_MSG == msgtype) // 群消息
        {
            cout << "groupmsg[" << js["groupid"] << "]:" << js["time"].get<string>() << " [" << js["id"] << "]" << js["name"].get<string>()
                 << " said: " << js["msg"].get<string>() << endl;
            continue;
        }

        if (LOGIN_MSG_ACK == msgtype) // 处理登录响应消息
        {
            doLoginResponse(js);
            sem_post(&rwsem); // 子线程给主线程通知信号量
            continue;
        }

        if (REG_MSG_ACK == msgtype) // 处理注册响应消息
        {
            doRegResponse(js);
            sem_post(&rwsem); // 子线程给主线程通知信号量
            continue;
        }
    }
}

// 获取系统时间(聊天信息需要显示发送时间)
string getCurrentTime()
{
    auto tt = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
    struct tm *ptm = localtime(&tt);
    char date[60] = {0};
    sprintf(date, "%d-%02d-%02d %02d:%02d:%02d",
            (int)ptm->tm_year + 1900, (int)ptm->tm_mon + 1, (int)ptm->tm_mday,
            (int)ptm->tm_hour, (int)ptm->tm_min, (int)ptm->tm_sec);
    return std::string(date);
}
// 聊天客户端程序:main线程用作发送线程,子线程用作接收线程
int main(int argc, char **argv)
{
    if (argc < 3)
    {
        cerr << "command invalid ! example:./ChatClient 127.0.0.1 6000" << endl;
        exit(-1);
    }

    // 解析通过命令行参数传递的ip和port
    char *ip = argv[1];
    uint16_t port = atoi(argv[2]);

    // 创建client端的socket
    int clientfd = socket(AF_INET, SOCK_STREAM, 0);
    if (-1 == clientfd)
    {
        cerr << "socket create error" << endl;
        exit(-1);
    }

    // 填写client需要连接的server信息ip + port
    sockaddr_in server;
    memset(&server, 0, sizeof(sockaddr_in));

    server.sin_family = AF_INET;
    server.sin_port = htons(port);
    server.sin_addr.s_addr = inet_addr(ip);

    // client和server进行连接
    if (-1 == connect(clientfd, (sockaddr *)&server, sizeof(sockaddr_in)))
    {
        cerr << "connect server error" << endl;
        close(clientfd);
        exit(-1);
    }

    // 初始化读写线程通信用的信号量
    sem_init(&rwsem, 0, 0);

    // 连接服务器成功,启动接收子线程
    std::thread readTask(readTaskHandler, clientfd); // 底层为pthread_create()
    readTask.detach();                               // 底层为pthread_detach
    // main线程用于接收用户输入,负责发送数据
    for (;;)
    {
        // 显示首页面菜单:登录、注册、退出
        cout << "======================" << endl;
        cout << "1. login " << endl;
        cout << "2. register" << endl;
        cout << "3. quit" << endl;
        cout << "======================" << endl;
        cout << "please choice:";
        int choice = 0;
        cin >> choice;
        cin.get(); // 读掉缓冲区残留的回车

        switch (choice)
        {
        case 1: // login业务
        {
            // 获取用户的用户id和密码
            int id = 0;
            char pwd[50] = {0};
            cout << "userid:";
            cin >> id;
            cin.get(); // 读掉缓冲区残留的回车
            cout << "userpassword:";
            cin.getline(pwd, 50);

            // 组装json数据,将json数据对象序列化为字符串后通过网络发送给服务器
            json js;
            js["msgid"] = LOGIN_MSG;
            js["id"] = id;
            js["password"] = pwd;
            string request = js.dump();

            g_isLoginSuccess = false;

            int len = send(clientfd, request.c_str(), strlen(request.c_str()) + 1, 0);
            if (len == -1)
            {
                cerr << "send login msg error:" << request << endl;
            }

            sem_wait(&rwsem); // 等待信号量,由子线程处理完登录的响应消息后通知主线程

            if (g_isLoginSuccess) // 登录成功
            {
                // 进入聊天主菜单页面
                isMainMenuRunning = true;
                mainMenu(clientfd);
                
            }
        }
        break;
        case 2: // register业务
        {
            // 获取用户输入的用户名、密码
            char name[50] = {0};
            char pwd[50] = {0};
            cout << "username:";
            cin.getline(name, 50);
            cout << "userpassword:";
            cin.getline(pwd, 50);

            // 组装json数据,将json数据对象序列化为字符串后通过网络发送给服务器
            json js;
            js["msgid"] = REG_MSG;
            js["name"] = name;
            js["password"] = pwd;
            string request = js.dump();

            int len = send(clientfd, request.c_str(), strlen(request.c_str()) + 1, 0);
            if (len == -1) // 响应失败
            {
                cerr << "send res msg error:" << request << endl;
            }

            sem_wait(&rwsem); // 等待信号量,由子线程处理完登录的响应消息后通知主线程
        }
        break;
        case 3: // quit业务
        {
            close(clientfd);
            sem_destroy(&rwsem);
            exit(0);
        }
        default:
            cerr << "invalid input!" << endl;
            break;
        }
    }

    return 0;
}

//系统支持的客户端命令列表
unordered_map<string, string> commandMap = {
    {"help", "显示所有支持的命令,格式help"},
    {"chat", "一对一聊天,格式chat:friendid:message"},
    {"addfriend", "添加好友,格式addfriend:friendid"},
    {"ackaddfriend", "响应添加好友请求,格式ackaddfriend:friendid:true/false"},
    {"creategroup", "创建群组,格式creategroup:groupname:groupdesc"},
    {"addgroup", "加入群组,格式addgroup:groupid"},
    {"groupchat", "群聊,格式groupchat:groupid:message"},
    {"loginout","注销,格式loginout"}
};

//帮助信息:打印系统所支持的命令
void help(int fd = 0, string str = "")
{
    cout << "show command list >>> " << endl;
    for (auto &p : commandMap)
    {
        cout << p.first << " : " << p.second << endl;
    }
    cout << endl;
}

//一对一聊天:int接收sockfd,string接收用户发送的数据
void chat(int, string);

//添加好友请求:int接收sockfd,string接收用户发送的数据
void addfriend(int, string); 

//响应好友请求
void ackaddfriend(int, string); 

//创建群组:int接收sockfd,string接收用户发送的数据
void creategroup(int, string); 

//加入群组:int接收sockfd,string接收用户发送的数据
void addgroup(int, string); 

//群聊:int接收sockfd,string接收用户发送的数据
void groupchat(int, string); 

//注销:int接收sockfd,string接收用户发送的数据
void loginout(int, string); 

//注册系统支持的客户端命令处理
unordered_map<string, function<void(int,string)>> commandHandlerMap = {
    {"help", help},
    {"chat", chat},
    {"addfriend", addfriend},
    {"ackaddfriend",ackaddfriend},
    {"creategroup", creategroup},
    {"addgroup", addgroup},
    {"groupchat", groupchat},
    {"loginout", loginout}
};

// 登录成功后主聊天页面程序:设计符合开闭原则
void mainMenu(int clientfd)
{
    help();
    // cout<<"====================分割线===================="<<endl;
    // showCurrentUserData();
    char buffer[1024] = {0};
    for (;;)
    {
        cin.getline(buffer, 1024); //获取用户输入:命令分为两种,有冒号的业务与无冒号的业务
        string commandbuf(buffer);
        string command; //存储命令
        int idx = commandbuf.find(":");
        if (-1 == idx) //无冒号
        {
            command = commandbuf;
        }
        else //有冒号
        {
            command = commandbuf.substr(0, idx);
        }
        auto it = commandHandlerMap.find(command);
        if (it == commandHandlerMap.end()) //输入错误,未找到用户输入对应的业务
        {
            cerr << "invalid input command!" << endl;
            continue;
        }

        //调用响应命令的事件处理回调,mainMenu对修改封闭,添加新功能不需要修改该函数
        it->second(clientfd, commandbuf.substr(idx + 1,commandbuf.size() - idx)); //调用命令处理方法
        if(command == "loginout") {
            break;
        }
    }
}

// //添加好友:int接收sockfd,string接收用户发送的数据
// void addfriend(int clientfd, string str)
// {
//     int friendid = atoi(str.c_str());
//     json js;
//     js["msgid"] = ADD_FRIEND_MSG;
//     js["id"] = g_currentUser.getId();
//     std::cout<<"ADD_FRIEND_MSG: "<<ADD_FRIEND_MSG<<std::endl;
//     std::cout<<"g_currentUser.getId(): "<<g_currentUser.getId()<<std::endl;
//     js["friendid"] = friendid;
//     std::cout<<"friendid: "<<friendid<<std::endl;
//     string buffer = js.dump();
//     int len = send(clientfd, buffer.c_str(), strlen(buffer.c_str()) + 1, 0);
//     if (-1 == len)
//     {
//         cerr << "send addfriend msg error -> " << buffer << endl;
//     }
// }

//添加好友:int接收sockfd,string接收用户发送的数据
void addfriend(int clientfd, string str)
{
    int friendid = atoi(str.c_str());
    json js;
    js["msgid"] = ADD_FRIEND_REQ_MSG;
    js["id"] = g_currentUser.getId();
    // std::cout<<"ADD_FRIEND_REQ_MSG: "<<ADD_FRIEND_REQ_MSG<<std::endl;
    // std::cout<<"g_currentUser.getId(): "<<g_currentUser.getId()<<std::endl;
    js["friendid"] = friendid;
    // std::cout<<"friendid: "<<friendid<<std::endl;
    string buffer = js.dump();
    int len = send(clientfd, buffer.c_str(), strlen(buffer.c_str()) + 1, 0);
    if (-1 == len)
    {
        cerr << "send addfriend msg error -> " << buffer << endl;
    }
}

void ackaddfriend(int clientfd, string str) {
    int idx = str.find(":");
    if (-1 == idx)
    {
        cerr << "ackaddfriend command invalid!" << endl;
        return;
    }
    
    int friendid = atoi(str.substr(0, idx).c_str());
    bool flag = static_cast<bool>(str.substr(idx + 1, str.size() - idx).c_str());

    json js;
    js["msgid"] = ADD_FRIEND_MSG_ACK;
    js["id"] = g_currentUser.getId();
    js["friendid"] = friendid;
    js["flag"] = flag;
    string buffer = js.dump();

    int len = send(clientfd, buffer.c_str(), strlen(buffer.c_str()) + 1, 0);
    if (-1 == len)
    {
        cerr << "send ackaddfriend msg error -> " << buffer << endl;
    }
}

//一对一聊天:int接收sockfd,string接收用户发送的数据
void chat(int clientfd, string str)
{
    //解析用户输入的命令
    int idx = str.find(":"); //friendid:message
    if (-1 == idx)
    {
        cerr << "chat command invalid!" << endl;
        return;
    }

    int friendid = atoi(str.substr(0, idx).c_str());
    string message = str.substr(idx + 1, str.size() - idx);

    json js;
    js["msgid"] = ONE_CHAT_MSG;
    js["id"] = g_currentUser.getId();
    js["name"] = g_currentUser.getName();
    js["toid"] = friendid;
    js["msg"] = message;
    js["time"] = getCurrentTime();
    string buffer = js.dump();

    int len = send(clientfd, buffer.c_str(), strlen(buffer.c_str()) + 1, 0);
    if (-1 == len)
    {
        cerr << "send chat msg error -> " << buffer << endl;
    }
}


// 创建群组:int接收sockfd,string接收用户发送的数据
void creategroup(int clientfd, string str)
{
    int idx = str.find(":");
    if (-1 == idx)
    {
        cerr << "creategroup command invalid!" << endl;
        return;
    }

    string groupname = str.substr(0, idx);
    string groupdesc = str.substr(idx + 1, str.size() - idx);

    json js;
    js["msgid"] = CREATE_GROUP_MSG;
    js["id"] = g_currentUser.getId();
    js["groupname"] = groupname;
    js["groupdesc"] = groupdesc;
    string buffer = js.dump();

    int len = send(clientfd, buffer.c_str(), strlen(buffer.c_str()) + 1, 0);
    if (-1 == len)
    {
        cerr << "send creategroup msg error -> " << buffer << endl;
    }
}


// 加入群组:int接收sockfd,string接收用户发送的数据
void addgroup(int clientfd, string str)
{
    int groupid = atoi(str.c_str());
    json js;
    js["msgid"] = ADD_GROUP_MSG;
    js["id"] = g_currentUser.getId();
    js["groupid"] = groupid;
    string buffer = js.dump();

    int len = send(clientfd, buffer.c_str(), strlen(buffer.c_str()) + 1, 0);
    if (-1 == len)
    {
        cerr << "send addgroup msg error -> " << buffer << endl;
    }
}


// 群聊:int接收sockfd,string接收用户发送的数据
void groupchat(int clientfd, string str)
{
    int idx = str.find(":");
    if (-1 == idx)
    {
        cerr << "groupchat command invalid!" << endl;
        return;
    }

    int groupid = atoi(str.substr(0, idx).c_str());
    string message = str.substr(idx + 1, str.size() - idx);

    json js;
    js["msgid"] = GROUP_CHAT_MSG;
    js["id"] = g_currentUser.getId();
    js["name"] = g_currentUser.getName();
    js["groupid"] = groupid;
    js["msg"] = message;
    js["time"] = getCurrentTime();
    string buffer = js.dump();

    int len = send(clientfd, buffer.c_str(), strlen(buffer.c_str()) + 1, 0);
    if (-1 == len)
    {
        cerr << "send groupchat msg error -> " << buffer << endl;
    }
}

// 注销:int接收sockfd,string接收用户发送的数据
void loginout(int clientfd, string str)
{
    json js;
    js["msgid"] = LOGIN_OUT_MSG;
    js["id"] = g_currentUser.getId();
    string buffer = js.dump();
    std::cout<<"注销:int接收sockfd,string接收用户发送的数据 buffer: "<<buffer<<std::endl;

    int len = send(clientfd, buffer.c_str(), strlen(buffer.c_str()) + 1, 0);
    if (-1 == len)
    {
        cerr << "send loginout msg error -> " << buffer << endl;
    }
    else
    {
        isMainMenuRunning = false;
    }
    cout<<"isMainMenuRunning: "<<isMainMenuRunning<<endl;
}

autobash.sh

set -x
rm -rf `pwd`/build/*
cmake -B build
cmake --build build

 CMakeLists.txt

cmake_minimum_required(VERSION 3.28.0)
project(chat)

# 配置编译选项
set(CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS} -g)

# 配置可执行文件生成路径
set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin)

# 配置头文件搜索路径
include_directories(${PROJECT_SOURCE_DIR}/include)
include_directories(${PROJECT_SOURCE_DIR}/include/server)
include_directories(${PROJECT_SOURCE_DIR}/include/server/db)
include_directories(${PROJECT_SOURCE_DIR}/include/server/model)
include_directories(${PROJECT_SOURCE_DIR}/include/server/redis)
include_directories(${PROJECT_SOURCE_DIR}/thirdparty)

# 加载子目录
add_subdirectory(src)

src/CMakeLists.txt

add_subdirectory(server)
add_subdirectory(client)

 src/server/CMakeLists.txt

# 定义了一个SRC_LIST变量 包含了该目录下所有的源文件
aux_source_directory(. SRC_LIST)
aux_source_directory(./db DB_LIST)
aux_source_directory(./model MODEL_LIST)
aux_source_directory(./redis REDIS_LIST)

# 指定生成可执行文件
add_executable(ChatServer ${SRC_LIST} ${DB_LIST} ${MODEL_LIST} ${REDIS_LIST})

# 指定可执行文件链接时需要依赖的库文件
target_link_libraries(ChatServer muduo_net muduo_base mysqlclient hiredis pthread)

 src/client/CMakeLists.txt

# 定义了一个SRC_LIST变量,包含了该目录下所有的源文件
aux_source_directory(. SRC_LIST)
# 指定生成可执行文件
add_executable(ChatClient ${SRC_LIST})
# 指定可执行文件链接时需要依赖的库文件
target_link_libraries(ChatClient pthread)

完整项目:

heheda102410/ChatServer: C++集群聊天服务器 nginx+redis+muduo+mysql数据库连接池 (github.com)https://github.com/heheda102410/ChatServerC++集群聊天服务器 muduo+nginx+redis+mysql数据库连接池 笔记 (下),数据库,服务器,nginx,redis,muduo,数据库连接池

heheda@linux:~/Linux/Chat/bin$ ./ChatClient 127.0.0.1 8888
======================
1. login 
2. register
3. quit
======================
please choice:1
userid:1
userpassword:1024
======================login user======================
current login user id:1 name:heheda
----------------------friend list---------------------
8 coco offline
9 daoji offline
2 Tom offline
3 Jerry offline
----------------------group list----------------------
3 1 C++ Chat Group
1 heheda online creator
2 Tom offline normal
3 Jerry offline normal
======================login user======================
groupmsg[3]:2024-02-15 19:19:29 [3]Jerry said: wwoahis
show command list >>> 
addgroup : 加入群组,格式addgroup:groupid
creategroup : 创建群组,格式creategroup:groupname:groupdesc
ackaddfriend : 响应添加好友请求,格式ackaddfriend:friendid:true/false
loginout : 注销,格式loginout
addfriend : 添加好友,格式addfriend:friendid
groupchat : 群聊,格式groupchat:groupid:message
chat : 一对一聊天,格式chat:friendid:message
help : 显示所有支持的命令,格式help
heheda@linux:~/Linux/Chat/bin$ ./ChatClient 127.0.0.1 8888
======================
1. login 
2. register
3. quit
======================
please choice:1
userid:2
userpassword:520
======================login user======================
current login user id:2 name:Tom
----------------------friend list---------------------
1 heheda online
8 coco offline
----------------------group list----------------------
3 1 C++ Chat Group
1 heheda online creator
2 Tom online normal
3 Jerry offline normal
======================login user======================
show command list >>> 
addgroup : 加入群组,格式addgroup:groupid
creategroup : 创建群组,格式creategroup:groupname:groupdesc
ackaddfriend : 响应添加好友请求,格式ackaddfriend:friendid:true/false
loginout : 注销,格式loginout
addfriend : 添加好友,格式addfriend:friendid
groupchat : 群聊,格式groupchat:groupid:message
chat : 一对一聊天,格式chat:friendid:message
help : 显示所有支持的命令,格式help

推荐文章: 

jsoncpp库和nlohmann-json库实现JSON与字符串类型转换_jsoncpp string转json-CSDN博客https://blog.csdn.net/gezongbo/article/details/132083993

ubuntu 垃圾清理的方式 - 简书 (jianshu.com)https://www.jianshu.com/p/5cba6a541eb9

linux机器报错: 设备上没有空间_fatal error: error closing /tmp/cczy5luu.s: 设备上没有空-CSDN博客https://blog.csdn.net/qq_45003354/article/details/135698562本地代码上传至github的两种方法_如何将本地代码上传到github-CSDN博客https://blog.csdn.net/weixin_62526435/article/details/128386541文章来源地址https://www.toymoban.com/news/detail-828016.html

到了这里,关于C++集群聊天服务器 muduo+nginx+redis+mysql数据库连接池 笔记 (下)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • C++集群聊天服务器 网络模块+业务模块+CMake构建项目 笔记 (上)

    跟着施磊老师做C++项目,施磊老师_腾讯课堂 (qq.com) 一、网络模块ChatServer chatserver.hpp chatserver.cpp json 里边会包含一个 msgid .由于客户端和服务器通信收发消息,需要判断这个消息是属于哪种业务的,就需要一个业务的标识,所以就用 msgid 来表示业务的标识.在 onMessage 函数中,并不想

    2024年02月20日
    浏览(51)
  • 基于ssm+shiro+redis+nginx tomcat服务器集群管理项目

    毕业设计——基于ssm+shiro+redis+nginx tomcat服务器集群管理项目 完整项目地址:https://download.csdn.net/download/lijunhcn/88430549 1.搭建一个最简洁,模块划分最明确的ssm+swargger+shiro+redis+nginx整合项目,采用maven作为构建工具,在有新项目开发时可以借助此demo快速构建项目 2.实现shiro的授

    2024年02月03日
    浏览(64)
  • 毕业设计——基于ssm+shiro+redis+nginx tomcat服务器集群管理项目

    毕业设计——基于ssm+shiro+redis+nginx tomcat服务器集群管理项目 完整项目地址:https://download.csdn.net/download/lijunhcn/88430549 1.搭建一个最简洁,模块划分最明确的ssm+swargger+shiro+redis+nginx整合项目,采用maven作为构建工具,在有新项目开发时可以借助此demo快速构建项目 2.实现shiro的授

    2024年02月04日
    浏览(59)
  • 一、C++项目:仿muduo库实现高性能高并发服务器

    仿mudou库one thread oneloop式并发服务器实现 仿muduo库One Thread One Loop式主从Reactor模型实现高并发服务器: 通过实现的高并发服务器组件,可以简洁快速的完成一个高性能的服务器搭建。并且,通过组件内提供的不同应用层协议支持,也可以快速完成一个高性能应用服务器的搭建

    2024年02月07日
    浏览(51)
  • Nginx(7)Nginx实现服务器端集群搭建

    前面课程已经将Nginx的大部分内容进行了讲解,我们都知道了Nginx在高并发场景和处理静态资源是非常高性能的,但是在实际项目中除了静态资源还有就是后台业务代码模块,一般后台业务都会被部署在Tomcat,weblogic或者是websphere等web服务器上。那么如何使用Nginx接收用户的请

    2024年02月09日
    浏览(54)
  • 概述、搭建Redis服务器、部署LNP+Redis、创建Redis集群、连接集群、集群工作原理

    Top 案例1:搭建redis服务器 案例2:常用命令限 案例3:部署LNP+Redis 案例4:创建redis集群 1.1 具体要求如下 在主机redis64运行redis服务 修改服务运行参数 ip 地址192.168.88.64 服务监听的端口6364 redis服务的连接密码为 tarenaplj 1.2 方案 准备1台新虚拟机,要求如表-1所示。   1.3 步骤 实

    2024年02月12日
    浏览(84)
  • 一台服务器上部署 Redis 伪集群

    哈喽大家好,我是咸鱼 今天这篇文章介绍如何在一台服务器(以 CentOS 7.9 为例)上通过 redis-trib.rb 工具搭建 Redis cluster (三主三从) redis-trib.rb 是一个基于 Ruby 编写的脚本,其功能涵盖了创建、管理以及维护 Redis 集群的各个方面 值得注意的是,随着时间的推移,一些较新版

    2024年02月11日
    浏览(87)
  • minio集群部署,4台服务器+1台nginx

    分布式Minio里所有的节点需要有同样的access秘钥和secret秘钥,即:用户名和密码 分布式Minio存放数据的磁盘目录必须是空目录 分布式Minio官方建议生产环境最少4个节点,因为有N个节点,得至少保证有N/2的节点才能可读,保证至少N/2+1的节点才能可写。这里只是作演示搭建,只

    2024年02月15日
    浏览(56)
  • docker服务器中redis-cluster集群配置(redis-5.0.7)

    因为需要使用到docker服务器下的redis-cluster集群环境,而以前redis3.2.8版本的redis搭配起来费事费力还没有成功,所以使用了较新一些的redis版本----redis-5.0.7。 默认:dockers已经安装成功 1.1下载tar包 1.2把进行安装 2.1 编写配置文件 #编写目录 mkdir -p /usr/local/docker-redis/redis-cluster #切

    2024年02月21日
    浏览(60)
  • Redis持久化说明及其单台Linux服务器搭建Redis集群架构

    说明:RDB快照主要以二进制文件的形式进行存储数据,主要以文件名dump.rdb进行存储,主要设置redis.conf里面设置’save 60 1000’命令可以开启, 表示在60秒内操作1000次进行一次备份数据。在客户端执行save(同步)和bgsave(异步操作)。 redis.conf 启动redis相关命令 说明:主要把文件生

    2024年02月10日
    浏览(59)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包