下面我将为你实现一个完整的 TCP 连接管理器，包含连接建立、维护、数据收发和连接关闭等功能。
核心设计
#include <array>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <deque>
#include <exception>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

#include <boost/asio.hpp>
using boost::asio::ip::tcp;
using namespace std;
// Lifecycle states a connection can be in.
enum class ConnectionState {
    DISCONNECTED = 0,   // socket is closed; no further I/O will happen
    CONNECTING = 1,     // connection attempt in progress
    CONNECTED = 2,      // socket is open and usable for reads and writes
    DISCONNECTING = 3   // shutdown has started but has not finished yet
};
// One established TCP connection.
//
// Owns the socket, runs an asynchronous read loop, and serialises outgoing
// messages through a write queue. Instances must be created with
// make_shared, because asynchronous handlers keep the object alive through
// shared_from_this().
//
// Threading: send() may be called from any thread; the payload is copied and
// the queue is only touched on the socket's executor. disconnect() operates
// on the socket directly, so call it from the thread running the io_context
// or after the io_context has stopped. The class assumes the io_context is
// run by a single thread (no strand is used).
class TCPConnection : public enable_shared_from_this<TCPConnection> {
public:
    using Pointer = shared_ptr<TCPConnection>;
    using DataCallback = function<void(Pointer, const char*, size_t)>;
    using StateCallback = function<void(Pointer, ConnectionState)>;

    // Takes ownership of an already-connected socket.
    //
    // No callback is invoked here: shared_from_this() is not usable until the
    // constructor has returned and a shared_ptr owns the object. The
    // CONNECTED notification is delivered by the first doRead() call instead.
    TCPConnection(tcp::socket socket, DataCallback dataCb, StateCallback stateCb)
        : socket_(std::move(socket)),
          executor_(socket_.get_executor()),
          state_(ConnectionState::CONNECTED),
          dataCallback_(std::move(dataCb)),
          stateCallback_(std::move(stateCb)) {
        // Cache the peer address now; it cannot be queried once the socket
        // has been closed, but callbacks still want to report it.
        boost::system::error_code ec;
        const tcp::endpoint peer = socket_.remote_endpoint(ec);
        if (!ec) {
            remoteEndpoint_ = peer;
        }
    }

    // Closes the socket without notifying anyone: by the time the destructor
    // runs no shared_ptr refers to this object, so a Pointer can no longer be
    // handed to the state callback.
    ~TCPConnection() {
        boost::system::error_code ignored;
        socket_.close(ignored);
    }

    tcp::socket& socket() { return socket_; }

    // Starts the connection: the first call reports CONNECTED through the
    // state callback and begins the asynchronous read loop. Later calls are
    // ignored so that only one read is ever outstanding.
    void doRead() {
        if (started_) {
            return;
        }
        started_ = true;
        notifyState(ConnectionState::CONNECTED);
        // The callback may have closed the connection already.
        if (state_.load() == ConnectionState::CONNECTED) {
            readNext();
        }
    }

    // Queues `length` bytes starting at `data` for transmission.
    //
    // The bytes are copied, so the caller's buffer may be reused as soon as
    // this returns. Null data, zero length, or a connection that is not
    // CONNECTED make the call a no-op.
    void send(const char* data, size_t length) {
        if (data == nullptr || length == 0 ||
            state_.load() != ConnectionState::CONNECTED) {
            return;
        }
        auto self = shared_from_this();
        // Hop onto the socket's executor so the queue is never modified
        // concurrently with the write completion handler.
        boost::asio::post(executor_,
            [this, self, payload = vector<char>(data, data + length)]() mutable {
                if (state_.load() != ConnectionState::CONNECTED) {
                    return;
                }
                const bool writeInProgress = !writeQueue_.empty();
                writeQueue_.push_back(std::move(payload));
                if (!writeInProgress) {
                    doWrite();
                }
            });
    }

    // Shuts down and closes the socket, reporting DISCONNECTING and then
    // DISCONNECTED through the state callback. Safe to call repeatedly; only
    // the first call has any effect.
    void disconnect() {
        ConnectionState current = state_.load();
        do {
            if (current == ConnectionState::DISCONNECTED ||
                current == ConnectionState::DISCONNECTING) {
                return;
            }
        } while (!state_.compare_exchange_weak(current, ConnectionState::DISCONNECTING));

        // Keep the object alive until the last callback has returned, even if
        // a callback drops the final external reference.
        const Pointer self = weak_from_this().lock();
        notifyState(ConnectionState::DISCONNECTING);

        boost::system::error_code ignored;
        socket_.shutdown(tcp::socket::shutdown_both, ignored);
        socket_.close(ignored);

        state_.store(ConnectionState::DISCONNECTED);
        notifyState(ConnectionState::DISCONNECTED);
    }

    ConnectionState getState() const { return state_.load(); }

    // Peer address captured at construction; remains valid after the socket
    // is closed. Default-constructed if the peer could not be determined.
    tcp::endpoint remoteEndpoint() const {
        return remoteEndpoint_;
    }

private:
    // Invokes the state callback if one is set and the object is owned by a
    // shared_ptr (it is not during destruction).
    void notifyState(ConnectionState state) {
        if (!stateCallback_) {
            return;
        }
        if (Pointer self = weak_from_this().lock()) {
            stateCallback_(self, state);
        }
    }

    // Issues one asynchronous read and re-arms itself on success.
    void readNext() {
        auto self = shared_from_this();
        socket_.async_read_some(boost::asio::buffer(readBuffer_),
            [this, self](boost::system::error_code ec, size_t length) {
                if (!ec && state_.load() == ConnectionState::CONNECTED) {
                    if (dataCallback_) {
                        dataCallback_(self, readBuffer_.data(), length);
                    }
                    readNext();
                    return;
                }
                // operation_aborted means we closed the socket ourselves.
                if (ec && ec != boost::asio::error::operation_aborted) {
                    cerr << "Read error: " << ec.message() << endl;
                }
                disconnect();
            });
    }

    // Writes the message at the front of the queue; the completion handler
    // pops it and continues with the next one.
    void doWrite() {
        auto self = shared_from_this();
        boost::asio::async_write(socket_,
            boost::asio::buffer(writeQueue_.front()),
            [this, self](boost::system::error_code ec, size_t /*length*/) {
                if (ec) {
                    writeQueue_.clear();
                    if (ec != boost::asio::error::operation_aborted) {
                        cerr << "Write error: " << ec.message() << endl;
                    }
                    disconnect();
                    return;
                }
                writeQueue_.pop_front();
                if (state_.load() != ConnectionState::CONNECTED) {
                    // Closed while this write was completing; drop the rest.
                    writeQueue_.clear();
                    return;
                }
                if (!writeQueue_.empty()) {
                    doWrite();
                }
            });
    }

    tcp::socket socket_;
    tcp::socket::executor_type executor_;   // cached so send() never touches socket_
    tcp::endpoint remoteEndpoint_;
    // deque keeps references to queued buffers stable while a write is pending.
    deque<vector<char>> writeQueue_;
    array<char, 8192> readBuffer_{};
    atomic<ConnectionState> state_;
    bool started_ = false;
    DataCallback dataCallback_;
    StateCallback stateCallback_;
};
// TCP连接管理器
class TCPConnectionManager {
public:
using ConnectionPtr = TCPConnection::Pointer;
using ConnectionMap = unordered_map<tcp::endpoint, ConnectionPtr>;
using NewConnectionCallback = function<void(ConnectionPtr)>;
using DisconnectionCallback = function<void(ConnectionPtr)>;
TCPConnectionManager(boost::asio::io_context& ioContext)
: ioContext_(ioContext), acceptor_(ioContext) {}
~TCPConnectionManager() {
stop();
}
// 启动服务器监听
bool startServer(unsigned short port) {
try {
tcp::endpoint endpoint(tcp::v4(), port);
acceptor_.open(endpoint.protocol());
acceptor_.set_option(tcp::acceptor::reuse_address(true));
acceptor_.bind(endpoint);
acceptor_.listen();
startAccept();
return true;
} catch (const exception& e) {
cerr << "Server start failed: " << e.what() << endl;
return false;
}
}
// 连接到远程服务器
void connectToServer(const string& host, unsigned short port) {
tcp::resolver resolver(ioContext_);
auto self(shared_from_this());
resolver.async_resolve(host, to_string(port),
[this, self, host, port](boost::system::error_code ec, tcp::resolver::results_type endpoints) {
if (!ec) {
for (auto& endpoint : endpoints) {
doConnect(endpoint);
break; // 只连接第一个解析的地址
}
} else {
cerr << "Resolve failed: " << ec.message() << endl;
}
});
}
// 停止所有连接
void stop() {
lock_guard<mutex> lock(connectionsMutex_);
for (auto& pair : connections_) {
pair.second->disconnect();
}
connections_.clear();
if (acceptor_.is_open()) {
boost::system::error_code ec;
acceptor_.close(ec);
}
}
// 断开指定连接
void disconnect(const tcp::endpoint& endpoint) {
lock_guard<mutex> lock(connectionsMutex_);
auto it = connections_.find(endpoint);
if (it != connections_.end()) {
it->second->disconnect();
connections_.erase(it);
}
}
// 发送数据到指定连接
void sendTo(const tcp::endpoint& endpoint, const char* data, size_t length) {
lock_guard<mutex> lock(connectionsMutex_);
auto it = connections_.find(endpoint);
if (it != connections_.end()) {
it->second->send(data, length);
}
}
// 广播数据到所有连接
void broadcast(const char* data, size_t length) {
lock_guard<mutex> lock(connectionsMutex_);
for (auto& pair : connections_) {
pair.second->send(data, length);
}
}
// 设置回调函数
void setNewConnectionCallback(NewConnectionCallback cb) {
newConnectionCallback_ = cb;
}
void setDisconnectionCallback(DisconnectionCallback cb) {
disconnectionCallback_ = cb;
}
// 获取连接数量
size_t getConnectionCount() const {
lock_guard<mutex> lock(connectionsMutex_);
return connections_.size();
}
private:
void startAccept() {
acceptor_.async_accept(
[this](boost::system::error_code ec, tcp::socket socket) {
if (!ec) {
tcp::endpoint endpoint = socket.remote_endpoint();
auto connection = make_shared<TCPConnection>(
move(socket),
[this](ConnectionPtr conn, const char* data, size_t length) {
if (dataCallback_) dataCallback_(conn, data, length);
},
[this](ConnectionPtr conn, ConnectionState state) {
if (state == ConnectionState::CONNECTED) {
onNewConnection(conn);
} else if (state == ConnectionState::DISCONNECTED) {
onConnectionDisconnected(conn);
}
});
{
lock_guard<mutex> lock(connectionsMutex_);
connections_[connection->remoteEndpoint()] = connection;
}
connection->doRead();
} else {
cerr << "Accept failed: " << ec.message() << endl;
}
startAccept();
});
}
void doConnect(const tcp::endpoint& endpoint) {
try {
tcp::socket socket(ioContext_);
boost::system::error_code ec;
socket.connect(endpoint, ec);
if (!ec) {
auto connection = make_shared<TCPConnection>(
move(socket),
[this](ConnectionPtr conn, const char* data, size_t length) {
if (dataCallback_) dataCallback_(conn, data, length);
},
[this](ConnectionPtr conn, ConnectionState state) {
if (state == ConnectionState::CONNECTED) {
onNewConnection(conn);
} else if (state == ConnectionState::DISCONNECTED) {
onConnectionDisconnected(conn);
}
});
{
lock_guard<mutex> lock(connectionsMutex_);
connections_[connection->remoteEndpoint()] = connection;
}
connection->doRead();
} else {
cerr << "Connect failed: " << ec.message() << endl;
}
} catch (const exception& e) {
cerr << "Connect exception: " << e.what() << endl;
}
}
void onNewConnection(ConnectionPtr conn) {
if (newConnectionCallback_) {
newConnectionCallback_(conn);
}
}
void onConnectionDisconnected(ConnectionPtr conn) {
lock_guard<mutex> lock(connectionsMutex_);
auto it = connections_.find(conn->remoteEndpoint());
if (it != connections_.end() && it->second == conn) {
connections_.erase(it);
}
if (disconnectionCallback_) {
disconnectionCallback_(conn);
}
}
boost::asio::io_context& ioContext_;
tcp::acceptor acceptor_;
ConnectionMap connections_;
mutable mutex connectionsMutex_;
NewConnectionCallback newConnectionCallback_;
DisconnectionCallback disconnectionCallback_;
function<void(ConnectionPtr, const char*, size_t)> dataCallback_;
};
![图片[1]_C++ TCP连接管理器实现_知途无界](https://zhituwujie.com/wp-content/uploads/2025/09/d2b5ca33bd20250924101931.png)
使用示例
// Example program: starts a server on port 8080, prints connection events and
// received data, and shuts down when the user presses Enter.
int main() {
    try {
        boost::asio::io_context ioContext;
        // Create the connection manager.
        TCPConnectionManager manager(ioContext);
        // Install the callbacks before any I/O starts.
        manager.setNewConnectionCallback([](TCPConnection::Pointer conn) {
            cout << "New connection from: " << conn->remoteEndpoint() << endl;
        });
        manager.setDisconnectionCallback([](TCPConnection::Pointer conn) {
            cout << "Disconnected: " << conn->remoteEndpoint() << endl;
        });
        manager.dataCallback_ = [](TCPConnection::Pointer conn, const char* data, size_t length) {
            cout << "Received " << length << " bytes from " << conn->remoteEndpoint()
                 << ": " << string(data, length) << endl;
        };
        // Start listening on port 8080.
        if (manager.startServer(8080)) {
            cout << "Server started on port 8080" << endl;
        }
        // Or connect to a remote server instead:
        // manager.connectToServer("127.0.0.1", 8080);

        // Run the io_context on a worker thread. Exceptions must not escape
        // the thread function, otherwise std::terminate is called.
        thread ioThread([&ioContext]() {
            try {
                ioContext.run();
            } catch (const exception& e) {
                cerr << "Exception: " << e.what() << endl;
            }
        });
        // The main thread is free to do other work here, e.g. send data.
        this_thread::sleep_for(chrono::seconds(1));
        // Example: broadcast a message to every connection.
        // const string greeting = "Hello all clients!";
        // manager.broadcast(greeting.data(), greeting.size());

        // Wait for the user to request shutdown.
        cout << "Press Enter to exit..." << endl;
        cin.get();
        // Stop the event loop and wait for the worker first, so that closing
        // the connections below cannot race with handlers still running on
        // the I/O thread.
        ioContext.stop();
        ioThread.join();
        manager.stop();
    } catch (const exception& e) {
        cerr << "Exception: " << e.what() << endl;
    }
    return 0;
}
功能特点
- 连接管理:
- 支持同时作为服务器和客户端
- 自动管理连接的生命周期
- 线程安全的连接存储和访问
- 数据通信:
- 异步读写操作
- 支持发送和接收数据
- 支持广播和定向发送
- 事件回调:
- 新连接建立回调
- 连接断开回调
- 数据接收回调
- 错误处理:
- 完善的异常处理
- 连接状态管理
- 扩展性:
- 模块化设计,易于扩展
- 支持自定义回调函数
编译说明
此实现使用了 Boost.Asio 库，编译时需要链接 Boost 库：
g++ -std=c++17 tcp_manager.cpp -o tcp_manager -lboost_system -lpthread
这个 TCP 连接管理器提供了完整的网络通信功能，可以作为更复杂网络应用的基础框架。你可以根据具体需求进行扩展和定制。
© 版权声明
文中内容均来源于公开资料,受限于信息的时效性和复杂性,可能存在误差或遗漏。我们已尽力确保内容的准确性,但对于因信息变更或错误导致的任何后果,本站不承担任何责任。如需引用本文内容,请注明出处并尊重原作者的版权。
THE END

























暂无评论内容