libevent(十一)bufferevent filter zlib 压缩通信(一)

wuchangjian · 2021-11-03 18:16:36 · 编程学习

在这里插入图片描述
main.cpp

#include <event2/event.h>
#include <event2/listener.h>
#include <string.h>
#ifndef _WIN32
#include <signal.h>
#endif
#include <iostream>
using namespace std;

// Entry points implemented in zlib_server.cpp and zlib_client.cpp.
void Server(event_base* base);
void Client(event_base* base);

/// Program entry: prepares the platform socket layer, creates the libevent
/// base, registers the server and the client on it, and runs the event loop.
/// @return 0 after the loop exits normally, 1 if initialisation fails.
int main()
{
#ifdef _WIN32
	// Initialise the Winsock library; nothing below can work without it.
	WSADATA wsa;
	if (WSAStartup(MAKEWORD(2, 2), &wsa) != 0)
		return 1;
#else
	// Ignore SIGPIPE so that writing to an already-closed socket does not
	// terminate the process.
	if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
		return 1;
#endif

	std::cout << "test server!\n";

	// Create the libevent context. Without it neither side can be set up,
	// so bail out instead of handing a null base to Server()/Client().
	event_base* base = event_base_new();
	if (!base)
	{
		cout << "event_base_new failed!" << endl;
#ifdef _WIN32
		WSACleanup();
#endif
		return 1;
	}
	cout << "event_base_new success!" << endl;

	Server(base);
	Client(base);

	// Dispatch events until no more are pending, then release the base.
	event_base_dispatch(base);
	event_base_free(base);

#ifdef _WIN32
	WSACleanup();
#endif
	return 0;
}

zlib_server.cpp

#include <event2/event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <string.h>
#ifndef _WIN32
#include <signal.h>
#endif
#include <iostream>
#include <string>
using namespace std;
#define SPORT 5001


/// Per-connection server state shared by the read and event callbacks.
struct Status
{
	// Becomes true once the file-name handshake has been completed.
	bool start{ false };
	// Destination file for the received payload; null until it is opened.
	FILE* fp{ nullptr };
};


/// Input filter for the server-side bufferevent: a pass-through that moves up
/// to 1023 bytes per call from the source buffer to the destination buffer.
/// @param s     source buffer (raw bytes from the underlying bufferevent)
/// @param d     destination buffer (what the filtering bufferevent exposes)
/// @param limit unused by this pass-through implementation
/// @param mode  unused by this pass-through implementation
/// @param arg   unused by this pass-through implementation
/// @return BEV_OK when the chunk was moved (including a zero-byte pass),
///         BEV_ERROR if the source could not be drained or the destination
///         could not be extended.
bufferevent_filter_result filter_in(evbuffer* s, evbuffer* d, 
	ev_ssize_t limit, bufferevent_flush_mode mode, void* arg) 
{
	(void)limit;
	(void)mode;
	(void)arg;

	char data[1024] = { 0 };
	const int len = evbuffer_remove(s, data, sizeof(data) - 1);
	// evbuffer_remove reports failure with -1; forwarding that to
	// evbuffer_add would turn it into an enormous size_t length.
	if (len < 0)
		return BEV_ERROR;

	if (evbuffer_add(d, data, static_cast<size_t>(len)) != 0)
		return BEV_ERROR;
	return BEV_OK;
}


/// Read callback of the server-side filtering bufferevent.
///
/// The first invocation treats the incoming bytes as the file name, opens
/// "out\<name>" for writing and answers "OK". Every later invocation appends
/// all buffered bytes to that file.
/// @param bev the filtering bufferevent that has readable data
/// @param arg the connection's Status object
static void read_cb(bufferevent* bev, void* arg)
{
	Status* status = (Status*)arg;
	if (!status->start)
	{
		// 001: receive the file name
		char data[1024] = { 0 };
		bufferevent_read(bev, data, sizeof(data) - 1);
		const string name = data;

		// The name comes from the peer: refuse anything that could escape the
		// output directory (path separators, drive prefixes, dot entries).
		if (name.empty() || name == "." || name == ".." ||
			name.find_first_of("/\\:") != string::npos)
		{
			cout << "server rejected file name " << name << endl;
			bufferevent_free(bev);
			delete status;
			return;
		}

		// NOTE(review): the prefix uses a Windows separator; on POSIX systems
		// this produces a file literally named "out\<name>" — confirm intent.
		string out = "out\\";
		out += name;

		// Open the destination file
		status->fp = fopen(out.c_str(), "wb");
		if (!status->fp)
		{
			cout << "server open " << out << " failed!" << endl;
			// No "OK" will ever be sent, so close the connection rather than
			// leaving the client waiting for a reply.
			bufferevent_free(bev);
			delete status;
			return;
		}

		// 002: acknowledge with OK
		bufferevent_write(bev, "OK", 2);
		status->start = true;
		return;
	}

	if (!status->fp)
		return;

	// Drain everything currently buffered into the file, then flush once.
	char data[1024];
	size_t len = 0;
	while ((len = bufferevent_read(bev, data, sizeof(data))) > 0)
	{
		if (fwrite(data, 1, len, status->fp) != len)
			cout << "server write to file failed!" << endl;
	}
	fflush(status->fp);
}

/// Event callback of the server-side filtering bufferevent.
///
/// On end-of-stream or a connection error it closes the output file, releases
/// the connection's Status object and frees the bufferevent.
/// @param bev    the filtering bufferevent reporting the event
/// @param events bitmask of BEV_EVENT_* flags
/// @param arg    the connection's Status object
static void event_cb(bufferevent* bev, short events, void* arg)
{
	cout << "server event_cb " << events << endl;
	Status* status = (Status*)arg;

	if (events & BEV_EVENT_EOF)
		cout << "server event BEV_EVENT_EOF success!" << endl;

	// An error ends the connection just like EOF does; handling only EOF
	// would leak the file handle and the bufferevent on a broken connection.
	if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR))
	{
		if (status)
		{
			if (status->fp)
			{
				fclose(status->fp);
				status->fp = 0;
			}
			// Status was allocated with new for this connection only.
			delete status;
		}
		bufferevent_free(bev);
	}
}


static void listen_cb(struct evconnlistener* e, evutil_socket_t s, struct sockaddr* a, int socklen, void* arg)
{
	cout << "listen_cb" << endl;
	event_base* base = (event_base*)arg;

	//1 创建一个bufferevent 用来通信
	bufferevent* bev = bufferevent_socket_new(base, s, BEV_OPT_CLOSE_ON_FREE);
	Status* status = new Status();

	//2 添加过滤 并设置输入回调
	bufferevent* bev_filter = bufferevent_filter_new(bev,
		filter_in,               // 输入过滤函数
		0,                       // 输出过滤
		BEV_OPT_CLOSE_ON_FREE,   // 关闭filter同时管理bufferevent
		0,                       // 清理回调
		status                   // 传递参数
	);

	//3 设置回调 读取 事件(处理连接断开) 
	bufferevent_setcb(bev_filter, read_cb, 0, event_cb, status);
	bufferevent_enable(bev_filter, EV_READ | EV_WRITE);
}


/// Starts the server side: binds a listening socket on SPORT (all IPv4
/// addresses) and registers listen_cb on the given event base.
/// @param base libevent context to attach the listener to; a null base is
///             reported and ignored.
void Server(event_base* base)
{
	cout << "----begin Server----" << endl;
	if (!base)
	{
		cout << "server: event_base is null!" << endl;
		return;
	}

	// Listening address: socket + bind + listen are done by the listener
	sockaddr_in sin;
	memset(&sin, 0, sizeof(sin));
	sin.sin_family = AF_INET;
	sin.sin_port = htons(SPORT);

	evconnlistener* ev = evconnlistener_new_bind(base,    // libevent context
		listen_cb,                                        // called for each accepted connection
		base,                                             // argument handed to listen_cb
		LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE,        // address reuse; close the socket with the listener
		10,                                               // backlog, as for listen()
		(sockaddr*)&sin,                                  // address and port to bind
		sizeof(sin)
	);
	// A null result (for example when the port is already in use) would
	// otherwise go unnoticed and the server would simply never accept.
	if (!ev)
		cout << "server evconnlistener_new_bind failed on port " << SPORT << endl;
	// NOTE(review): the listener is never freed; it presumably is meant to
	// live for the whole run of the event loop — confirm.
}

zlib_client.cpp

#include <event2/event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <string.h>
#ifndef _WIN32
#include <signal.h>
#endif
#include <iostream>
using namespace std;
#define FILEPATH "001.txt"

/// Client-side transfer state handed to the filter bufferevent's callbacks.
struct ClientStatus
{
	// File being sent; null when no file is open.
	FILE* fp{ nullptr };
	// Set once the whole file has been read and only flushing remains.
	bool end{ false };
};

/// Output filter for the client-side bufferevent: a pass-through that moves up
/// to 1024 bytes per call from the source buffer to the destination buffer.
/// @param s     source buffer (data written to the filtering bufferevent)
/// @param d     destination buffer (output of the underlying bufferevent)
/// @param limit unused by this pass-through implementation
/// @param mode  unused by this pass-through implementation
/// @param arg   unused by this pass-through implementation
/// @return BEV_OK when the chunk was moved, BEV_ERROR if the source could not
///         be drained or the destination could not be extended.
bufferevent_filter_result filter_out(evbuffer* s, evbuffer* d,
	ev_ssize_t limit, bufferevent_flush_mode mode, void* arg)
{
	(void)limit;
	(void)mode;
	(void)arg;

	char data[1024] = { 0 };
	const int len = evbuffer_remove(s, data, sizeof(data));
	// evbuffer_remove reports failure with -1; forwarding that to
	// evbuffer_add would turn it into an enormous size_t length.
	if (len < 0)
		return BEV_ERROR;

	// NOTE(review): a zero-byte pass deliberately still returns BEV_OK;
	// client_write_cb's end-of-file flush appears to rely on that to get the
	// write callback invoked again — confirm before changing it.
	if (evbuffer_add(d, data, static_cast<size_t>(len)) != 0)
		return BEV_ERROR;
	return BEV_OK;
}


/// Read callback of the client-side filtering bufferevent.
///
/// Expects the server's "OK" acknowledgement; on "OK" it triggers the write
/// callback so the file transfer starts, otherwise it tears the connection
/// down and releases the transfer state.
/// @param bev the filtering bufferevent that has readable data
/// @param arg the ClientStatus registered with the callbacks
static void client_read_cb(bufferevent* bev, void* arg)
{
	// 002: receive the server's OK reply
	char data[1024] = { 0 };
	const size_t len = bufferevent_read(bev, data, sizeof(data) - 1);
	if (strcmp(data, "OK") == 0)
	{
		cout << data << endl;

		// Start sending the file by invoking the write callback
		bufferevent_trigger(bev, EV_WRITE, 0);
	}
	else
	{
		// Unexpected reply: release the file and the state along with the
		// connection, otherwise both are leaked.
		ClientStatus* s = (ClientStatus*)arg;
		if (s)
		{
			// end == true means the write callback already closed the file.
			if (s->fp && !s->end)
				fclose(s->fp);
			delete s;
		}
		bufferevent_free(bev);
	}
	cout << "client_read_cb " << len << endl;
}


/// Write callback of the client-side filtering bufferevent.
///
/// While the file has data left it sends the next chunk of up to 1024 bytes.
/// Once the file is exhausted it closes the file and flushes; on later calls
/// it frees the connection as soon as the underlying output buffer is empty.
/// @param bev the filtering bufferevent
/// @param arg the ClientStatus registered with the callbacks
static void client_write_cb(bufferevent* bev, void* arg)
{

	cout << "client_write_cb" << endl;
	ClientStatus* s = (ClientStatus*)arg;

	// Transfer finished: decide whether resources can be released yet
	if (s->end)
	{
		// Inspect the output buffer of the bufferevent beneath the filter
		bufferevent* be = bufferevent_get_underlying(bev);
		evbuffer* evb = bufferevent_get_output(be);
		if (evbuffer_get_length(evb) == 0)
		{
			// Everything has been sent; freeing earlier would drop the
			// data still waiting in the buffer.
			bufferevent_free(bev);
			delete s;
			return;
		}
		// Data is still pending: push it out
		bufferevent_flush(bev, EV_WRITE, BEV_FINISHED);
		return;
	}

	FILE* fp = s->fp;
	if (!fp)return;

	// Read the next chunk of the file
	char data[1024] = { 0 };
	const size_t len = fread(data, 1, sizeof(data), fp);
	if (len == 0)
	{
		fclose(fp);
		// Do not leave a dangling handle behind in the shared state.
		s->fp = nullptr;
		s->end = true;
		// Flush whatever the filter still holds
		bufferevent_flush(bev, EV_WRITE, BEV_FINISHED);
		return;
	}

	// Send the chunk
	bufferevent_write(bev, data, len);
}


/// Event callback used both for the raw client socket bufferevent (arg is
/// null) and for the filtering bufferevent created here (arg is ClientStatus).
///
/// On BEV_EVENT_CONNECTED it opens FILEPATH, sends the file name, stacks the
/// output filter on the connection and installs the transfer callbacks.
/// On error or end-of-stream it releases the state and frees the bufferevent.
/// @param be     the bufferevent reporting the event
/// @param events bitmask of BEV_EVENT_* flags
/// @param arg    ClientStatus for the filtering bufferevent, otherwise null
static void client_event_cb(bufferevent* be, short events, void* arg)
{
	cout << "client_event_cb " << events << endl;

	// A failed connect or a dropped connection would otherwise leave the
	// bufferevent (and any open file) allocated forever.
	if (events & (BEV_EVENT_ERROR | BEV_EVENT_EOF))
	{
		cout << "client connection failed or closed" << endl;
		ClientStatus* s = (ClientStatus*)arg;
		if (s)
		{
			// end == true means the write callback already closed the file.
			if (s->fp && !s->end)
				fclose(s->fp);
			delete s;
		}
		bufferevent_free(be);
		return;
	}

	if (events & BEV_EVENT_CONNECTED)
	{
		cout << "client BEV_EVENT_CONNECTED" << endl;

		// Open the file first: without it there is nothing to announce.
		FILE* fp = fopen(FILEPATH, "rb");
		if (!fp)
		{
			cout << "open file " << FILEPATH << " failed!" << endl;
			bufferevent_free(be);
			return;
		}

		// 001: send the file name (written before the filter is attached,
		// so it goes out on the underlying bufferevent directly)
		bufferevent_write(be, FILEPATH, strlen(FILEPATH));

		// Create the output filter
		bufferevent* bev_filter = bufferevent_filter_new(be, 0, filter_out,
			BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS, 0, 0);
		if (!bev_filter)
		{
			cout << "client bufferevent_filter_new failed!" << endl;
			fclose(fp);
			bufferevent_free(be);
			return;
		}

		ClientStatus* s = new ClientStatus();
		s->fp = fp;

		// Install the read, write and event callbacks
		bufferevent_setcb(bev_filter, client_read_cb, client_write_cb, client_event_cb, s);
		bufferevent_enable(bev_filter, EV_READ | EV_WRITE);
	}
}


/// Starts the client side: creates a socket bufferevent on the given base and
/// begins connecting to 127.0.0.1:5001. The rest of the transfer is driven by
/// client_event_cb once the connection is established.
/// @param base libevent context to attach the connection to; a null base is
///             reported and ignored.
void Client(event_base* base)
{
	cout << "-----begin Client-----" << endl;
	if (!base)
	{
		cout << "client: event_base is null!" << endl;
		return;
	}

	// Server address
	sockaddr_in sin;
	memset(&sin, 0, sizeof(sin));
	sin.sin_family = AF_INET;
	sin.sin_port = htons(5001);
	if (evutil_inet_pton(AF_INET, "127.0.0.1", &sin.sin_addr.s_addr) != 1)
	{
		cout << "client evutil_inet_pton failed!" << endl;
		return;
	}

	bufferevent* bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);
	if (!bev)
	{
		cout << "client bufferevent_socket_new failed!" << endl;
		return;
	}

	// Only the event callback is bound here; it confirms the connection
	bufferevent_setcb(bev, 0, 0, client_event_cb, 0);
	bufferevent_enable(bev, EV_READ | EV_WRITE);

	if (bufferevent_socket_connect(bev, (sockaddr*)&sin, sizeof(sin)) != 0)
	{
		// NOTE(review): bev is deliberately not freed here because some
		// libevent versions may already have reported the failure through
		// the event callback — confirm against the version in use.
		cout << "client bufferevent_socket_connect failed!" << endl;
	}
}

不知道为什么，数据无法进入客户端的输出过滤回调函数（filter_out），程序运行到如下步骤就停住不动了，请各位大佬指点。
在这里插入图片描述

相关文章

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。