基于libuv封装网络事件库
基于libuv封装网络事件库
本文详细介绍了如何基于libuv库封装网络事件库。文章内容包括libuv的简介、主要特点、使用注意事项以及具体实现代码。文章结构清晰,内容详实,包含了事件循环、异步事件、TCP客户端和定时器等多个方面的实现细节,并提供了完整的代码示例。
一、简介
官网:libuv | Cross-platform asynchronous I/O
源码地址:GitHub - libuv/libuv: Cross-platform asynchronous I/O
libuv是一个跨平台的异步事件驱动库,采用非阻塞方式执行I/O操作、网络通信、定时器等。
主要特点:
- 跨平台支持:支持Windows、Linux、Mac。
- 事件循环:libuv有一个基于事件循环的模型,不停轮询事件,有事件产生就会进行相应的回调。
- 异步非阻塞IO:支持文件、socket、管道等多种异步回调
事件循环如下
二、使用
使用注意:
libuv不是线程安全的,需要在一个线程中创建uv_loop,初始化,run,并后面的所有基于该loop的操作都必须在该线程中。
每个loop需运行在独立线程中,建立多个线程,每个线程运行一个loop,即可实现多线程循环。
其中注意 uv_close(uv_handle_t* handle, uv_close_cb close_cb),内部是将对象添加到loop的closing_handing队列中,等到下次执行,回调后才会从loop队列中移除。 所以需要再close_cb中才能销毁handle。UV的close操作都是需要异步回调完成后才能销毁
下面分别介绍不同的事件类型。
event_loop
初始化、运行在同一个线程中,需封装一个event_loop提供接口,支持外部线程切换到该loop线程。
int uv_loop_init(uv_loop_t* loop);
int uv_loop_close(uv_loop_t* loop);
int uv_run(uv_loop_t*, uv_run_mode mode);
EventLoop::EventLoop()
{
printf("EventLoop::EventLoop() this[%p]\n", this);
event_recorder_ = std::make_shared<EventRecorder>();
loop_ = new uv_loop_t();
uv_loop_init(loop_);
async_ = std::make_shared<EventAsync>(this);
}
EventLoop::~EventLoop()
{
if(loop_ != nullptr) {
uv_loop_close(loop_);
delete loop_;
loop_ = nullptr;
}
printf("EventLoop::~EventLoop() this[%p]\n", this);
}
int32_t EventLoop::Loop()
{
async_->Start();
int32_t ret = uv_run(loop_, UV_RUN_DEFAULT);
printf("EventLoop::Loop() run end this[%p]\n", this);
event_recorder_->Dump();
return ret;
}
void EventLoop::Quit()
{
printf("EventLoop::Quit() begin this[%p]\n", this);
//uv_stop(loop_);
RunInLoop([this]() {
event_recorder_->Dump();
async_->Close(nullptr);
});
printf("EventLoop::Quit() end this[%p]\n", this);
}
void EventLoop::RunInLoop(const PendingCallback& cb)
{
async_->submit(cb);
}
Async事件
int uv_async_init(uv_loop_t*,
uv_async_t* async,
uv_async_cb async_cb);
nt uv_async_send(uv_async_t* async);
void uv_close(uv_handle_t* handle, uv_close_cb close_cb);
EventAsync::EventAsync(EventLoop* loop)
: loop_(loop)
{
}
EventAsync::~EventAsync()
{
}
int32_t EventAsync::Start()
{
uv_async_init(loop_->GetLoop(), &uv_async_, EventAsync::OnAsyncCb);
uv_async_.data = this;
EventRecord::Sptr record = std::make_shared<EventRecord>(EventRecord::ASYNC, &uv_async_);
loop_->GetEventRecorder()->AddRecord(record);
return 0;
}
// must be by call on loop thread
void EventAsync::Close(const AsyncCloseCallback& cb)
{
close_cb_ = cb;
uv_close((uv_handle_t*)&uv_async_, EventAsync::OnCloseCb);
return;
}
int32_t EventAsync::submit(const PendingCallback &cb)
{
{
std::lock_guard<std::mutex> lock(mutex_);
pending_cbs.push_back(cb);
}
uv_async_send(&uv_async_);
return 0;
}
void EventAsync::OnAsyncCb(uv_async_t* handle)
{
auto async = static_cast<EventAsync*>(handle->data);
if (async){
async->DoCallback();
}
return;
}
void EventAsync::DoCallback()
{
PendingCallbackList cbs;
{
std::lock_guard<std::mutex> lock(mutex_);
cbs.swap(pending_cbs);
}
while (!cbs.empty())
{
auto &func = cbs.front();
func();
cbs.pop_front();
}
return;
}
void EventAsync::OnCloseCb(uv_handle_t* handle)
{
auto async = static_cast<EventAsync*>(handle->data);
async->loop_->GetEventRecorder()->RemoveRecord(handle);
if (async) {
async->DoClose();
}
}
void EventAsync::DoClose()
{
if (close_cb_) {
close_cb_();
}
}
tcpclient
以tcpclient为例,在该loop线程中,创建一个uv_tcp_t对象进行tcp connect。
uv_tcp_init(uv_loop_t*, uv_tcp_t* handle);
uv_tcp_connect(uv_connect_t* req, uv_tcp_t* handle, const struct sockaddr* addr, uv_connect_cb cb);
uv_close(uv_handle_t* handle, uv_close_cb close_cb);
uv_con_.data = this;
uv_tcp_handle_ = std::make_shared<uv_tcp_t>();
int32_t ret = uv_tcp_init(loop_->GetLoop(), uv_tcp_handle_.get());
if(ret != 0) {
printf("TcpClient::Start uv_tcp_init failed, ret:%d\n", ret);
return -1;
}
uv_tcp_handle_->data = this;
ret = uv_tcp_connect(&uv_con_, uv_tcp_handle_.get(), peer_addr.GetSockAddr(), TcpClient::OnConnect);
if(ret != 0) {
printf("TcpClient::Start uv_tcp_connect failed, ret:%d\n", ret);
return -1;
}
timer
UV_EXTERN int uv_timer_init(uv_loop_t*, uv_timer_t* handle);
UV_EXTERN int uv_timer_start(uv_timer_t* handle,
uv_timer_cb cb,
uint64_t timeout,
uint64_t repeat);
UV_EXTERN int uv_timer_stop(uv_timer_t* handle);
UV_EXTERN int uv_timer_again(uv_timer_t* handle);
UV_EXTERN void uv_timer_set_repeat(uv_timer_t* handle, uint64_t repeat);
UV_EXTERN uint64_t uv_timer_get_repeat(const uv_timer_t* handle);
CTimer::CTimer(EventLoop::Var loop)
: loop_(loop)
{
}
CTimer::~CTimer()
{
}
int32_t CTimer::Start(int32_t timer_ms)
{
uv_timer_.data = this;
int32_t ret = uv_timer_init(loop_->GetLoop(), &uv_timer_);
if(0 != ret ) {
printf("uv_timer_init error ret=%d err:%s\n", ret, uv_strerror(ret));
return -1;
}
ret = uv_timer_start(&uv_timer_, CTimer::OnTimer, timer_ms, 0);
if(0 != ret ) {
printf("uv_timer_start error ret=%d err:%s\n", ret, uv_strerror(ret));
return -1;
}
EventRecord::Sptr record = std::make_shared<EventRecord>(EventRecord::TIMER, &uv_timer_);
loop_->GetEventRecorder()->AddRecord(record);
return 0;
}
void CTimer::Close(const CloseCompleteCallback& cb)
{
close_cb_ = cb;
uv_timer_stop(&uv_timer_);
if (uv_is_closing((uv_handle_t*)&uv_timer_) == 0) {
uv_close((uv_handle_t*)&uv_timer_, CTimer::OnClose);
} else {
HandleCloseComplete();
}
}
void CTimer::OnTimer(uv_timer_t* handle)
{
CTimer* timer = static_cast<CTimer*>(handle->data);
if(timer) {
if(timer->cb_) {
timer->cb_();
}
}
return;
}
void CTimer::OnClose(uv_handle_t* handle)
{
auto ptr = static_cast<CTimer*>(handle->data);
ptr->loop_->GetEventRecorder()->RemoveRecord(handle);
if(ptr) {
ptr->HandleCloseComplete();
}
}
void CTimer::HandleCloseComplete()
{
if(close_cb_) {
close_cb_();
}
}
三、参考代码
封装了event_loop_thread、tcpclient、定时器等操作。
可参考gitee中自己封装的代码
https://gitee.com/skyzqw/uv_warp
欢迎补充交流