C++ TCP连接管理器实现

下面我将为你实现一个完整的TCP连接管理器,包含连接建立、维护、数据收发和连接关闭等功能。

核心设计

#include <iostream>
#include <vector>
#include <memory>
#include <unordered_map>
#include <mutex>
#include <thread>
#include <atomic>
#include <functional>
#include <boost/asio.hpp>

using boost::asio::ip::tcp;
using namespace std;

// 连接状态枚举
enum class ConnectionState {
    DISCONNECTED,
    CONNECTING,
    CONNECTED,
    DISCONNECTING
};

// TCP连接类
class TCPConnection : public enable_shared_from_this<TCPConnection> {
public:
    using Pointer = shared_ptr<TCPConnection>;
    using DataCallback = function<void(Pointer, const char*, size_t)>;
    using StateCallback = function<void(Pointer, ConnectionState)>;

    TCPConnection(tcp::socket socket, DataCallback dataCb, StateCallback stateCb)
        : socket_(move(socket)), dataCallback_(dataCb), stateCallback_(stateCb) {
        state_ = ConnectionState::CONNECTED;
        if (stateCallback_) {
            stateCallback_(shared_from_this(), state_);
        }
    }

    ~TCPConnection() {
        if (state_ != ConnectionState::DISCONNECTED) {
            disconnect();
        }
    }

    tcp::socket& socket() { return socket_; }

    void send(const char* data, size_t length) {
        if (state_ != ConnectionState::CONNECTED) {
            return;
        }

        bool write_in_progress = !writeQueue_.empty();
        writeQueue_.emplace(data, data + length);
        
        if (!write_in_progress) {
            doWrite();
        }
    }

    void disconnect() {
        if (state_ == ConnectionState::DISCONNECTED || state_ == ConnectionState::DISCONNECTING) {
            return;
        }

        state_ = ConnectionState::DISCONNECTING;
        if (stateCallback_) {
            stateCallback_(shared_from_this(), state_);
        }

        boost::system::error_code ec;
        socket_.shutdown(tcp::socket::shutdown_both, ec);
        socket_.close(ec);
        
        state_ = ConnectionState::DISCONNECTED;
        if (stateCallback_) {
            stateCallback_(shared_from_this(), state_);
        }
    }

    ConnectionState getState() const { return state_; }

    tcp::endpoint remoteEndpoint() const {
        return socket_.remote_endpoint();
    }

private:
    void doRead() {
        auto self(shared_from_this());
        socket_.async_read_some(boost::asio::buffer(readBuffer_),
            [this, self](boost::system::error_code ec, size_t length) {
                if (!ec && state_ == ConnectionState::CONNECTED) {
                    if (dataCallback_) {
                        dataCallback_(shared_from_this(), readBuffer_.data(), length);
                    }
                    doRead();
                } else {
                    if (ec && ec != boost::asio::error::operation_aborted) {
                        cerr << "Read error: " << ec.message() << endl;
                    }
                    disconnect();
                }
            });
    }

    void doWrite() {
        auto self(shared_from_this());
        boost::asio::async_write(socket_, 
            boost::asio::buffer(writeQueue_.front()),
            [this, self](boost::system::error_code ec, size_t /*length*/) {
                if (!ec) {
                    writeQueue_.pop();
                    if (!writeQueue_.empty()) {
                        doWrite();
                    }
                } else {
                    cerr << "Write error: " << ec.message() << endl;
                    disconnect();
                }
            });
    }

    tcp::socket socket_;
    queue<pair<const char*, const char*>> writeQueue_;
    char readBuffer_[8192];
    ConnectionState state_;
    DataCallback dataCallback_;
    StateCallback stateCallback_;
};

// TCP连接管理器
class TCPConnectionManager {
public:
    using ConnectionPtr = TCPConnection::Pointer;
    using ConnectionMap = unordered_map<tcp::endpoint, ConnectionPtr>;
    using NewConnectionCallback = function<void(ConnectionPtr)>;
    using DisconnectionCallback = function<void(ConnectionPtr)>;

    TCPConnectionManager(boost::asio::io_context& ioContext)
        : ioContext_(ioContext), acceptor_(ioContext) {}

    ~TCPConnectionManager() {
        stop();
    }

    // 启动服务器监听
    bool startServer(unsigned short port) {
        try {
            tcp::endpoint endpoint(tcp::v4(), port);
            acceptor_.open(endpoint.protocol());
            acceptor_.set_option(tcp::acceptor::reuse_address(true));
            acceptor_.bind(endpoint);
            acceptor_.listen();
            
            startAccept();
            return true;
        } catch (const exception& e) {
            cerr << "Server start failed: " << e.what() << endl;
            return false;
        }
    }

    // 连接到远程服务器
    void connectToServer(const string& host, unsigned short port) {
        tcp::resolver resolver(ioContext_);
        auto self(shared_from_this());
        
        resolver.async_resolve(host, to_string(port),
            [this, self, host, port](boost::system::error_code ec, tcp::resolver::results_type endpoints) {
                if (!ec) {
                    for (auto& endpoint : endpoints) {
                        doConnect(endpoint);
                        break; // 只连接第一个解析的地址
                    }
                } else {
                    cerr << "Resolve failed: " << ec.message() << endl;
                }
            });
    }

    // 停止所有连接
    void stop() {
        lock_guard<mutex> lock(connectionsMutex_);
        for (auto& pair : connections_) {
            pair.second->disconnect();
        }
        connections_.clear();
        
        if (acceptor_.is_open()) {
            boost::system::error_code ec;
            acceptor_.close(ec);
        }
    }

    // 断开指定连接
    void disconnect(const tcp::endpoint& endpoint) {
        lock_guard<mutex> lock(connectionsMutex_);
        auto it = connections_.find(endpoint);
        if (it != connections_.end()) {
            it->second->disconnect();
            connections_.erase(it);
        }
    }

    // 发送数据到指定连接
    void sendTo(const tcp::endpoint& endpoint, const char* data, size_t length) {
        lock_guard<mutex> lock(connectionsMutex_);
        auto it = connections_.find(endpoint);
        if (it != connections_.end()) {
            it->second->send(data, length);
        }
    }

    // 广播数据到所有连接
    void broadcast(const char* data, size_t length) {
        lock_guard<mutex> lock(connectionsMutex_);
        for (auto& pair : connections_) {
            pair.second->send(data, length);
        }
    }

    // 设置回调函数
    void setNewConnectionCallback(NewConnectionCallback cb) {
        newConnectionCallback_ = cb;
    }

    void setDisconnectionCallback(DisconnectionCallback cb) {
        disconnectionCallback_ = cb;
    }

    // 获取连接数量
    size_t getConnectionCount() const {
        lock_guard<mutex> lock(connectionsMutex_);
        return connections_.size();
    }

private:
    void startAccept() {
        acceptor_.async_accept(
            [this](boost::system::error_code ec, tcp::socket socket) {
                if (!ec) {
                    tcp::endpoint endpoint = socket.remote_endpoint();
                    auto connection = make_shared<TCPConnection>(
                        move(socket),
                        [this](ConnectionPtr conn, const char* data, size_t length) {
                            if (dataCallback_) dataCallback_(conn, data, length);
                        },
                        [this](ConnectionPtr conn, ConnectionState state) {
                            if (state == ConnectionState::CONNECTED) {
                                onNewConnection(conn);
                            } else if (state == ConnectionState::DISCONNECTED) {
                                onConnectionDisconnected(conn);
                            }
                        });
                    
                    {
                        lock_guard<mutex> lock(connectionsMutex_);
                        connections_[connection->remoteEndpoint()] = connection;
                    }
                    
                    connection->doRead();
                } else {
                    cerr << "Accept failed: " << ec.message() << endl;
                }
                
                startAccept();
            });
    }

    void doConnect(const tcp::endpoint& endpoint) {
        try {
            tcp::socket socket(ioContext_);
            boost::system::error_code ec;
            socket.connect(endpoint, ec);
            
            if (!ec) {
                auto connection = make_shared<TCPConnection>(
                    move(socket),
                    [this](ConnectionPtr conn, const char* data, size_t length) {
                        if (dataCallback_) dataCallback_(conn, data, length);
                    },
                    [this](ConnectionPtr conn, ConnectionState state) {
                        if (state == ConnectionState::CONNECTED) {
                            onNewConnection(conn);
                        } else if (state == ConnectionState::DISCONNECTED) {
                            onConnectionDisconnected(conn);
                        }
                    });
                
                {
                    lock_guard<mutex> lock(connectionsMutex_);
                    connections_[connection->remoteEndpoint()] = connection;
                }
                
                connection->doRead();
            } else {
                cerr << "Connect failed: " << ec.message() << endl;
            }
        } catch (const exception& e) {
            cerr << "Connect exception: " << e.what() << endl;
        }
    }

    void onNewConnection(ConnectionPtr conn) {
        if (newConnectionCallback_) {
            newConnectionCallback_(conn);
        }
    }

    void onConnectionDisconnected(ConnectionPtr conn) {
        lock_guard<mutex> lock(connectionsMutex_);
        auto it = connections_.find(conn->remoteEndpoint());
        if (it != connections_.end() && it->second == conn) {
            connections_.erase(it);
        }
        
        if (disconnectionCallback_) {
            disconnectionCallback_(conn);
        }
    }

    boost::asio::io_context& ioContext_;
    tcp::acceptor acceptor_;
    ConnectionMap connections_;
    mutable mutex connectionsMutex_;
    NewConnectionCallback newConnectionCallback_;
    DisconnectionCallback disconnectionCallback_;
    function<void(ConnectionPtr, const char*, size_t)> dataCallback_;
};
图片[1]_C++ TCP连接管理器实现_知途无界

使用示例

int main() {
    try {
        boost::asio::io_context ioContext;
        
        // 创建连接管理器
        TCPConnectionManager manager(ioContext);
        
        // 设置回调
        manager.setNewConnectionCallback([](TCPConnection::Pointer conn) {
            cout << "New connection from: " << conn->remoteEndpoint() << endl;
        });
        
        manager.setDisconnectionCallback([](TCPConnection::Pointer conn) {
            cout << "Disconnected: " << conn->remoteEndpoint() << endl;
        });
        
        manager.dataCallback_ = [](TCPConnection::Pointer conn, const char* data, size_t length) {
            cout << "Received " << length << " bytes from " << conn->remoteEndpoint() 
                 << ": " << string(data, length) << endl;
        };
        
        // 启动服务器监听端口 8080
        if (manager.startServer(8080)) {
            cout << "Server started on port 8080" << endl;
        }
        
        // 或者连接到远程服务器
        // manager.connectToServer("127.0.0.1", 8080);
        
        // 启动一个线程运行io_context
        thread ioThread([&ioContext]() {
            ioContext.run();
        });
        
        // 主线程可以在这里做其他事情,比如发送数据
        this_thread::sleep_for(chrono::seconds(1));
        
        // 示例:广播消息给所有连接
        // manager.broadcast("Hello all clients!", 16);
        
        // 等待退出
        cout << "Press Enter to exit..." << endl;
        cin.get();
        
        // 停止管理器
        manager.stop();
        ioContext.stop();
        ioThread.join();
        
    } catch (const exception& e) {
        cerr << "Exception: " << e.what() << endl;
    }
    
    return 0;
}

功能特点

  1. 连接管理​:
    • 支持同时作为服务器和客户端
    • 自动管理连接的生命周期
    • 线程安全的连接存储和访问
  2. 数据通信​:
    • 异步读写操作
    • 支持发送和接收数据
    • 支持广播和定向发送
  3. 事件回调​:
    • 新连接建立回调
    • 连接断开回调
    • 数据接收回调
  4. 错误处理​:
    • 完善的异常处理
    • 连接状态管理
  5. 扩展性​:
    • 模块化设计,易于扩展
    • 支持自定义回调函数

编译说明

此实现使用了Boost.Asio库,编译时需要链接Boost库:

g++ -std=c++17 tcp_manager.cpp -o tcp_manager -lboost_system -lpthread

这个TCP连接管理器提供了完整的网络通信功能,可以作为更复杂网络应用的基础框架。你可以根据具体需求进行扩展和定制。

© 版权声明
THE END
喜欢就点个赞,支持一下吧!
点赞11 分享
评论 抢沙发
头像
欢迎您留下评论!
提交
头像

昵称

取消
昵称表情代码图片

    暂无评论内容