当前位置:首页 > 技术文章 > 正文内容

linux服务器开发之网关服务器的实现

arlanguage3个月前 (02-11)技术文章20

什么是网关服务器

初学linux服务器开发时,我们的服务器是很简单的,只需要一个程序完成与客户端的连接,接收客户端数据,数据处理,向客户端发送数据。
但是在处理量很大的情况下,一台机器不能满足我们的需求,此时我们应该怎么办。
我们可以将服务端的任务分摊到多台机器上完成,见下图

从图中可见,此时整个服务端主要分为了三部分。

网关服务器:负责连接客户端与逻辑服务器,在两者间完成数据转发,使用负载均衡算法保证每个逻辑服务器的工作量均衡,以及进行数据加密。

逻辑服务器:负责业务逻辑的处理,与网关服务器进行数据交互,同时与数据库服务器进行数据交互。

数据库服务器:数据存储与读取的具体执行者。

实现网关服务器需要考虑哪些问题

效率问题

当我们需要用到网关服务器来负载均衡时,我可以假定我们需要处理的客户端请求是很多的(当然,我这里只是为了学习,具体业务并不需要),也就是说我们需要高并发,高效处理。

因为网关服务器在客户端和逻辑服务器间相当于纽带的作用,所有的数据包都要从此经过,所以我们的网关服务器必须要保证可以高效的处理大量连接上的事件。

安全问题

如上所说,如果网关服务器被恶意发起连接,一旦挂掉,我们的全部服务都会终止,因此我们必须要对这种情况进行处理。同时,还有与客户端交互时的数据加密,这个事也是要交给网关服务器来进行的。逻辑服务器一般都会与网关服务器配置于同一个局域网,所以通常不需要考虑数据的加密。

对连接的标识

逻辑服务器和客户端都会连接在网关服务器上,而网关服务器需要对其sockfd进行标识,要知晓究竟谁是服务器,谁是客户端,而且要对客户端的连接加一条可检索属性(比如用户名).

为什么呢?因为对于客户端发送过来的数据,我们无论转到哪个逻辑服务器上都可以,而逻辑服务器返回的数据,我们需要知道要将该数据返回给哪个客户端,逻辑服务器并不能知道每个客户端的sockfd是多少。

下面我们着重聊聊效率问题:

多路复用

我们不会去为每个sockfd都分配一个线程去服务它,我们更需要有一个线程可以去监听所有的fd上的事件,如果发生,我们再去分配线程去处理他。这就是多路复用。

多路复用有select poll epoll,几乎凡是知道多路复用的人都知道epoll的高效。因为其底层红黑树,以及回调机制,是我们最好的选择(在大量连接,活跃量不高的情况下)。

而epoll分两种工作模式,LT和ET,LT模式下,epoll只是一个高效的poll,ET模式下会更高效。事实上众多的第三方库都使用的是LT模式,说白了就是性价比,LT已经很高效,而改用ET模式,除了效率会更高,也会给编写带来一些复杂性以及产生一些头疼的问题,而处理这些特殊情况也需要时间,处理方式不当的话反而还不如LT,所以,总而言之,性价比不高。(本人为了学习,此处使用的et模式)。

非阻塞

每个连接的sockfd,我们都有两种操作其的方式,阻塞和非阻塞,阻塞意味着我们此刻必须对sockfd进行等待,就是说我们不能去干别的事,这显然不可以。因此,在以高并发为目标的服务器程序里,非阻塞是我们唯一的选择。

并且,et模式下,必须非阻塞,不然会产生套接字饿死的情况。

非阻塞模式下,我们还需要一样东西,就是缓冲区,因为你并不能保证你接受到的数据就是完整的。

工作模式

这里使用的是多线程Reacter半同步半异步模式。

主线程负责监听以及接收新的连接,维护一个任务队列,其余线程从任务队列里获取任务并完成,同时也将新的任务添加进任务队列。

架构

总体分为以下部分

main.h

程序主线程:监听fd绑定、监听,epoll监听

Connection.h

客户端和逻辑服务器的连接的封装

实现对连接的操作:

HandleRead()读, HandleWrite()写, Worker()数据处理, shutdown()连接关闭,getData()从用户缓冲区获取数据,puttData()将数据写入用户缓冲区

ThreadPool.h

线程池的封装

SyncQueue.h

任务队列的封装

实现队列的添加取出,以及同步加锁等处理

Buffer.h

用户缓存区的封装

BaseFunc.h

基本函数的封装:如 setNoBlocking(), addFd()…

Util.h

工具类

正确性测试结果

代码

main.cpp

//

// GataMain.cpp

// QuoridorServer

//

// Created by shiyi on 2016/12/2.

// Copyright ? 2016年 shiyi. All rights reserved.

//

#include

#include

#include

#include

#include

#include

#include

#include

#include

#include

#include

#include

#include

#include "Util.h"

#include "ThreadPool.h"

#include "Connection.h"

#include "BaseFunc.h"

static const char *IP = "10.105.44.34";

// static const char *IP = "127.0.0.1";

// static const char *IP = "182.254.243.29";

static const int PORT = 11111;

//处理的最大连接数

static const int USER_PROCESS = 655536;

//epoll能监听的最大事件

static const int MAX_EVENT_NUMBER = 10000;

//信号通信的管道

static int sigPipefd[2];

//信号回调函数

static void sigHandler(int sig)

{

int saveErrno = errno;

send(sigPipefd[1], (char*)&sig, 1, 0);

errno = saveErrno;

}

//添加信号回调

static void addSig(int sig, void(handler)(int), bool restart = true)

{

struct sigaction sa;

memset(&sa, 0, sizeof(sa));

sa.sa_handler = handler;

if(restart)

sa.sa_flags |= SA_RESTART;

sigfillset(&sa.sa_mask);

if(-1 == sigaction(sig, &sa, NULL))

Util::outError("sigaction");

}

static int setupSigPipe()

{

//新建epoll监听表和事件管道

int epollfd = epoll_create(USER_PROCESS);

if(epollfd == -1)

Util::outError("epoll_create");

int ret = socketpair(PF_UNIX, SOCK_STREAM, 0, sigPipefd);

assert(ret == 0);

//将写设置为非阻塞

setNoBlocking(sigPipefd[1]);

addFd(epollfd, sigPipefd[0], EPOLLIN | EPOLLET);

setNoBlocking(sigPipefd[0]);

//设置信号处理函数

addSig(SIGCHLD, sigHandler);

addSig(SIGTERM, sigHandler);

addSig(SIGINT, sigHandler);

addSig(SIGPIPE, sigHandler);

return epollfd;

}

int main()

{

int ret;

//构造协议地址结构

struct sockaddr_in address;

bzero(&address, sizeof(address));

address.sin_family = PF_INET;

inet_pton(PF_INET, IP, &address.sin_addr);

address.sin_port = htons(PORT);

int listenfd = socket(PF_INET, SOCK_STREAM, 0);

assert( listenfd >= 0 );

int opt = 1;

if(setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (void*)&opt, sizeof(int)) < 0)

{

perror("setsockopt");

exit(1);

}

ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));

if(ret == -1)

{

perror("bind");

}

if(listen(listenfd, 1000) < 0)

{

perror("listen");

exit(1);

}

Connection *users = new Connection[USER_PROCESS];

ThreadPool threadPool;

//统一事件源

int epollfd = setupSigPipe();

epoll_event events[MAX_EVENT_NUMBER];

// addFd(epollfd, listenfd, EPOLLIN | EPOLLET);

addFd(epollfd, listenfd, EPOLLIN);

// setNoBlocking(m_listenfd);

bool isRunning = true;

while(isRunning)

{

int num = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);

//如果错误原因不是被中断,则循环退出

if((num < 0) && (errno != EINTR))

{

Util::outError("epoll_wait failure");

break;

}

for(int i=0; i

{

int sockfd = events[i].data.fd;

//处理新的请求

if(sockfd == listenfd)

{

//连接新的请求

struct sockaddr_in clientAddr;

socklen_t clientLen = sizeof(clientAddr);

int connfd = accept(listenfd, (struct sockaddr*)&clientAddr, &clientLen);

if(connfd < 0)

{

Util::outError("accept");

break;

}

Util::outMsg("accept a new client : %d %s\n", connfd, inet_ntoa(clientAddr.sin_addr));

addFd(epollfd, connfd, EPOLLIN | EPOLLET | EPOLLONESHOT);

setNoBlocking(connfd);

//初始化客户端链接

users[connfd].init(epollfd, connfd, clientAddr);

}

//处理信号

else if((sockfd == sigPipefd[0]) && (events[i].events & EPOLLIN))

{

char sigMsg[1024];

int ret = recv(sockfd, sigMsg, sizeof(sigMsg), 0);

if(ret <= 0)

{

continue;

}

for(int j=0; j

{

//循环处理每个信号

switch(sigMsg[j])

{

case SIGCHLD:

{

break;

}

case SIGTERM:

case SIGINT:

{

//退出

Util::outMsg("程序退出\n");

isRunning = false;

break;

}

}

}

}

//处理读事件

else if(events[i].events & EPOLLIN)

{

//向任务队列添加读任务

threadPool.AddTask(std::bind(&Connection::HandleRead, users+sockfd));

}

//处理写事件

else if(events[i].events & EPOLLOUT)

{

// cout<<"hello"<

threadPool.AddTask(std::bind(&Connection::HandleWrite, users+sockfd));

}

}

}

delete[] users;

close(sigPipefd[0]);

close(sigPipefd[1]);

close(epollfd);

return 0;

}


由于篇幅有限,源码就不全部贴上来了,如果有需要可以私信我。

另外需要C/C++ Linux服务器开发学习资料私信“资料”(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享

扫描二维码推送至手机访问。

版权声明:本文由AR编程网发布,如需转载请注明出处。

本文链接:http://www.arlanguage.com/post/2448.html

标签: nginx bind
分享给朋友:

“linux服务器开发之网关服务器的实现” 的相关文章

Linux 安装 Alist 个人云盘 alpine linux安装

1. 简介云服务器有比较大的空间,想自己搭建一个个人云盘,做大文件的转存。其他百度云盘等的下载速度感人,不想充值会员。查询了一轮之后 Alist,Nextcloud,Cloudreve,ownCloud,Seafile等等看到了不少,但是很多都需要各种配置安装。要么是通过宝塔可以快速安装,但是我的服...

PHP日志记录

背景在生产环境中日志的重要性显而易见,能快速定位问题和程序的调优。在LNMP架构中怎么记录好程序中的错误日志。设置error_log记录PHP日志信息#将会向PHP报告发生的每个错误 error_reporting = E_ALL #关闭页面显示才能将错误回写到日志文件 display_err...

服务器排障nginx 499 错误地解决

问题描述:近期平台对外开放了数据查询接口,在数据量特别大时,返回结果时间可能会超过3秒,接口开放后,系统本身调用没有问题,其他第三方平台接入时,总会报链接超时问题;问题原因:查看tomcat日志无任何错误,一开始以为是tomcat接收参数最大限制问题,对tomcat做了一次整体优化,修改连接数、修改...

网站加载慢?让你的网站腾飞起来,LiteSpeed部署

说起LiteSpeed 还得说起我的小站,<开心洋葱网>,虽然流量不大,但是访问是真的慢,无奈经费有限,那就只能在服务器加速上动起心思来。之前一直听说 QUIC 访问网站的速度会让你感觉飞起来,那就搞下吧。我们先来看下 QUIC 、LiteSpeed都是些做什么的?QUIC(Quick...

路由虚拟服务器nginx转发400问题

背景:关于网络和硬件:旧服务基本是在公有云服务器或提供公网IP的服务器操作的。关于服务:后端jar服务运行,前端vue生成的包并通过nginx转发。以前的nginx配置文件:location / { root /data/project/web/; try...

基于.NetCore开发博客项目 StarBlog - (31) 发布和部署【领红包封面】

前言StarBlog 第一期规划的功能基本完成了,我想着在春节前应该可以把第一期的系列文章完结掉,那么在差缺补漏阶段就剩下开发项目的最后一个环节——部署了。?PS: 事实上,还有一个很重要但又经常被略过的测试环节我们没有提到,因为时间关系,第一期规划我没有写单元测试和集成测试,在开发中,测试环节是必...