第14章 多播与广播
14.1 多播
多播(Multicast)方式的数据传输是基于 UDP 完成的。因此 ,与 UDP 服务器端/客户端的实现方式非常接近。区别在于,UDP 数据传输以单一目标进行,而多播数据同时传递到加入(注册)特定组的大量主机。换言之,采用多播方式时,可以同时向多个主机传递数据。
14.1.1 多播的数据传输方式及流量方面的优点
多播的数据特点可整理如下:
- 多播服务器端针对特定多播组,只发送 1 次数据
- 即使只发送 1 次数据,但该组内的所有客户端都会接收数据
- 多播组数可以在 IP 地址范围内任意增加
- 加入特定组即可接收发往该多播组的数据
多播组是 D 类IP地址(224.0.0.0~239.255.255.255),**「加入多播组」**可以理解为通过程序完成如下声明:
在 D 类IP地址中,我希望接收发往目标 239.234.218.234 的多播数据
多播是基于 UDP 完成的,也就是说,多播数据包的格式与 UDP 数据包相同。只是与一般的 UDP 数据包不同。向网络传递 1 个多播数据包时,路由器将复制该数据包并传递到多个主机。像这样,多播需要借助路由器完成。如图所示:
若通过TCP或UDP向1000个主机发送文件,则共需要传递1000次。即便将10台主机合为1个网络,使99%的传输路径相同的情况下也是如此。但此时若使用多播方式传输文件,则只需发送1次 (一个区域只需要发送一个数据包,然后由该区域的路由器转发给其他主机
)。这时由1000台主机构成的网络中的路由器负责复制文件并传递到主机。就因为这种特性,多播主要用于“多媒体数据的实时传输”。
另外,理论上可以完成多播通信,但是不少路由器并不支持多播,或即便支持也因网络拥堵问题故意阻断多播。因此,为了在不支持多播的路由器中完成多播通信,也会使用隧道(Tunneling)技术(多播程序员不考虑)。
14.1.2 路由(Routing)和 TTL(Time to Live,生存时间),以及加入组的办法
接下来讨论多播相关编程方法。为了传递多播数据包,必须设置TTL。TTL 是Time to Live的简写,是决定”数据包传递距离“的主要因素。TTL用整数表示,并且每经过1个路由器就减1。TTL变为0时,该数据包无法再被传递,只能销毁。因此,TTL的值设置过大将影响网络流量。当然,设置过小也会无法传递到目标,需要引起注意。
接下来是 TTL 的设置方法。TTL 是可以通过第9章的套接字可选项完成的。与设置 TTL 相关的协议层为IPPROTO_IP
,选项名为 IP_MULTICAST_TTL
。因此,可以用如下代码把 TTL 设置为 64:
int send_sock;
int time_live = 64;
...
send_sock = socket(PF_INET, SOCK_DGRAM, 0);
setsockopt(send_sock, IPPROTO_IP, IP_MULTICAST_TTL, (void*)&time_live, sizeof(time_live);
...
加入多播组也通过设置设置套接字可选项来完成。加入多播组相关的协议层为 IPPROTO_IP
,选项名为IP_ADD_MEMBERSHIP
。可通过如下代码加入多播组:
int recv_sock;
struct ip_mreq join_adr;
...
recv_sock = socket(PF_INET, SOCK_DGRAM, 0);
...
join_adr.imr_multiaddr.s_addr = "多播组地址信息";
join_adr.imr_interface.s_addr = "加入多播组的主机地址信息";
setsockopt(recv_sock, IPPROTO_IP, IP_ADD_MEMBERSHIP. (void*)&join_adr, sizeof(join_adr);
...
ip_mreq
结构体如下所示:
struct ip_mreq {
struct in_addr imr_multiaddr;
struct in_addr imr_interface;
}
第3章讲过in_addr
结构体,因此只介绍结构体成员。首先,第一个成员imr_multiaddr
中写入加入的组IP地址。第二个成员imr_interface
是加入该组的套接字所属主机的IP地址,也可使用INADDR_ANY
。
14.1.3 实现多播 Sender 和 Receiver
多播中用“发送者”(以下称为Sender )和“接受者”(以下称为Receiver)替代服务器端和客户端。顾名思义,此处的Sender是多播数据的发送主体,Receiver是需要多播组加入过程的数据接收主体。下面讨论即将给出的示例,该示例的运行场景如下:
- Sender : 向 AAA 组广播(Broadcasting)文件中保存的新闻信息
- Receiver : 接收传递到 AAA 组的新闻信息。
接下来是代码示例,Sender比Receiver简单,因为Receiver需要经过加入组的过程,Sender只需创建UDP套接字,并向多播地址发送数据。
// news_sender.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>
#define TTL 64
#define BUF_SIZE 30
void error_handling(char* message);
int main(int argc, char* argv[]) {
int send_sock;
struct sockaddr_in mul_adr;
int time_live = TTL;
FILE *fp;
char buf[BUF_SIZE];
if (argc != 3) {
printf ("Usage : %s <GroupIP> <PORT> \n", argv[0]);
exit(1);
}
send_sock = socket(PF_INET, SOCK_DGRAM, 0);
if (send_sock == -1) error_handling("sock() error");
memset(&mul_adr, 0, sizeof(mul_adr));
mul_adr.sin_family = AF_INET;
mul_adr.sin_addr.s_addr = inet_addr(argv[1]); // Multicast IP
mul_adr.sin_port = htons(atoi(argv[2])); // Multicast Port
setsockopt(send_sock, IPPROTO_IP, IP_MULTICAST_TTL, (void*)&time_live, sizeof(time_live));
if ((fp = fopen("news.txt", "r")) == NULL)
error_handling("fopen() error");
while (!feof(fp)) { // Broadcasting 判断文件是否结束
fgets(buf, BUF_SIZE, fp);
sendto(send_sock, buf, strlen(buf), 0, (struct sockaddr*)&mul_adr, sizeof(mul_adr));
sleep(2);
}
fclose(fp);
close(send_sock);
return 0;
}
void error_handling(char* message) {
fputs(message, stderr);
fputc('\n', stderr);
exit(1);
}
设置传输数据的目标地址信息。重要的是,必须将IP地址设置为多播地址。实际传输数据的区域。基于UDP套接字传输数据,因此需要利用sendto函数。另外,第40行的sleep
函数调用主要是为了给传输数据提供一定的时间间隔而添加的,没有其他特殊意义。
下面是Receiver
的程序:
// news_sender.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>
#define BUF_SIZE 30
void error_handling(char* message);
int main(int argc, char* argv[]) {
int recv_sock;
struct sockaddr_in adr;
int str_len;
char buf[BUF_SIZE];
struct ip_mreq join_adr;
if (argc != 3) {
printf("Usage : %s <GroupIp> <IP> \n", argv[0]);
exit(1);
}
recv_sock = socket(PF_INET, SOCK_DGRAM, 0);
if (recv_sock = -1) error_handling("sock() error");
memset(&adr, 0, sizeof(adr));
adr.sin_family = AF_INET;
adr.sin_addr.s_addr = inet_addr(argv[1]);
adr.sin_port = htons(atoi(argv[2]));
if (bind(recv_sock, (struct sockaddr*)&adr, sizeof(adr)) == -1)
error_handling("bind() error");
join_adr.imr_multiaddr.s_addr = inet_addr(argv[1]);
join_adr.imr_interface.s_addr = htonl(INADDR_ANY);
setsockopt(recv_sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, (void*)&join_adr, sizeof(join_adr));
while (1) {
str_len = recvfrom(recv_sock, buf, BUF_SIZE - 1, 0, NULL, 0);
if (str_len < 0)
break;
buf[str_len] = 0;
fputs(buf, stdout);
}
close (recv_sock);
return 0;
}
void error_handling(char* message) {
fputs(message, stderr);
fputc('\n', stderr);
exit(1);
}
初始化结构体ip_mreg
变量。第26行初始化多播组地址,第27行初始化待加入主机的IP地址。(使用INADDR_ANY
)
利用套接字选项IP_ADD_MEMBERSHIP
加入多播组。至此完成了接收第26行指定的多播组数据的所有准备。
通过调用recvfrom函数接收多播数据。如果不需要知道传输数据的主机地址信息,可以向recvfrom函数的第五个和第六个参数分别传递NULL和0。
编译运行:文章来源:https://www.toymoban.com/news/detail-437888.html
gcc news_sender.c -o sender
gcc news_receiver.c -o receiver
./sender 224.1.1.2 9190
./receiver 224.1.1.2 9190
结果:这里跑不通,原因也暂时还没找到。
14.2 广播
广播(Broadcast)在「一次性向多个主机发送数据」这一点上与多播类似,但传输数据的范围有区别。多播即使在跨越不同网络的情况下,只要加入多播组就能接受数据。相反,广播只能向同一网络中的主机传输数据。
14.2.1 广播的理解及实现方法
广播是向同一网络中的所有主机传输数据的方法。与多播相同,广播也是通过 UDP 来完成的。根据传输数据时使用的IP地址形式,广播分为以下两种:
- 直接广播(Directed Broadcast)
- 本地广播(Local Broadcast)
二者在实现上的差别主要在于IP地址。直接广播的IP地址中除了网络地址外,其余主机地址全部设置成1。例如,希望向网络地址 192.12.34 中的所有主机传输数据时,可以向 192.12.34.255 传输。换言之,可以采取直接广播的方式向特定区域内所有主机传输数据。
数据通信中使用的IP地址是与 UDP 示例的唯一区别。默认生成的套接字会阻止广播,因此,只需通过如下代码更改默认设置。
int send_sock;
int bcast = 1; // 对变量进行初始化以将SO_BROADCAST选项信息改为1。
...
send_sock=socket(PF_INET,SOCK_DGRAM,0);
...
setsockopt(send_sock, SOL_SOCKET, SO_BROADCAST, (void*)&bcast, sizeof(bcast));
...
调用setsockopt函数,将SO_BROADCAST
选项设置为bcast变量中的值1。这意味着可以进行数据广播。当然,上述套接字选项只需在Sender中更改,Receiver的实现不需要该过程。
14.2.2 实现广播数据的Sender和Receiver
下面首先实现广播的Sender。
// news_sender_brd.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#define TTL 64
#define BUF_SIZE 30
void error_handling(char *message);
int main(int argc, char *argv[])
{
int send_sock;
struct sockaddr_in broad_adr;
int so_brd = 1;
FILE *fp;
char buf[BUF_SIZE];
if (argc != 3)
{
printf("Usage : %s <GroupIP> <PORT>\n", argv[0]);
exit(1);
}
send_sock = socket(PF_INET, SOCK_DGRAM, 0); //创建 UDP 套接字
memset(&broad_adr, 0, sizeof(broad_adr));
broad_adr.sin_family = AF_INET;
broad_adr.sin_addr.s_addr = inet_addr(argv[1]); //必须将IP地址设置为多播地址
broad_adr.sin_port = htons(atoi(argv[2]));
//指定套接字中 TTL 的信息
setsockopt(send_sock, SOL_SOCKET, SO_BROADCAST, (void *)&so_brd, sizeof(so_brd));
if ((fp = fopen("news.txt", "r")) == NULL)
error_handling("fopen() error");
while (!feof(fp)) //如果文件没结束就返回0
{
fgets(buf, BUF_SIZE, fp);
sendto(send_sock, buf, strlen(buf), 0, (struct sockaddr *)&broad_adr, sizeof(broad_adr));
sleep(2);
}
fclose(fp);
close(send_sock);
return 0;
}
void error_handling(char *message)
{
fputs(message, stderr);
fputc('\n', stderr);
exit(1);
}
与多播不同的是,广播的Receiver中就不需要setsockopt了。
// news_receiver_brd.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#define BUF_SIZE 30
void error_handling(char *message);
int main(int argc, char* argv[]) {
int recv_sock;
struct sockaddr_in adr;
int str_len;
char buf[BUF_SIZE];
if (argc != 2) {
printf("Usage : %s <Port> \n", argv[0]);
exit(1);
}
recv_sock = socket(PF_INET, SOCK_DGRAM, 0);
memset(&adr, 0, sizeof(adr));
adr.sin_family = AF_INET;
adr.sin_addr.s_addr = htonl(INADDR_ANY);
adr.sin_port = htons(atoi(argv[1]));
if (bind(recv_sock, (struct sockaddr*)&adr, sizeof(adr)) == -1)
error_handling("bind() error");
while (1) {
str_len = recvfrom(recv_sock, buf, BUF_SIZE - 1, 0, NULL, 0);
if (str_len < 0) break;
buf[str_len] = 0;
fputs(buf, stdout);
}
close(recv_sock);
return 0;
}
void error_handling(char* message){
fputs(message, stderr);
fputc('\n', stderr);
exit(1);
}
编译运行:
gcc news_receiver_brd.c -o receiver
gcc news_sender_brd.c -o sender
./sender 255.255.255.255 9190
./receiver 9190
结果:
与多播的情况一样,不出来结果也不知道是为什么。先暂时放着,之后再来看看。
14.3 基于Windows的实现
暂略。
14.4 习题
第15章 套接字和标准I/O
我们之前采用的都是默认数据通信手段read & write函数以及各种系统I/O函数,这一章我们来学习使用C语言的标准I/O函数。
15.1 标准I/O的优点
15.1.1 标准I/O函数的两个优点
标准I/O函数的两大优点:
- 标准 I/O 函数具有良好的移植性
- 标准 I/O 函数可以利用缓冲提高性能
创建套接字时,操作系统会准备 I/O 缓冲。此缓冲在执行 TCP 协议时发挥着非常重要的作用。此时若使用标准 I/O 函数,将得到额外的缓冲支持。如下图:
从图中可以看到,使用标准I/O函数传输数据时,会经过两个缓冲。假设使用 fputs
函数进行传输字符串 「Hello」
时,首先将数据传递到标准 I/O 缓冲,然后将数据移动到套接字输出缓冲,最后将字符串发送到对方主机。
设置缓冲的主要目的是为了提高性能。从以下两点可以说明性能的提高:
- 传输的数据量
- 数据向输出缓冲移动的次数
比较 1 个字节的数据发送 10 次的情况和 10 个数据包发送 1 次的情况。发送数据时,数据包中含有头信息。头信与数据大小无关,是按照一定的格式填入的。假设头信息占 40 个字节,需要传输的数据量也存在较大区别:
- 1个字节 10次 40×10=400字节
- 10个字节 1次 40×1=40字节
另外,为了发送数据,向套接字输出缓冲移动数据也会消耗不少时间。但这同样与移动次数有关。1个字节数据共移动10次花费的时间将近10个字节数据移动1次花费时间的10倍。
15.1.2 标准I/O函数和系统函数之间的性能对比
接下来分别利用标准I/O函数和系统函数编写文件复制程序,检验缓冲提高性能的程度。首先是使用系统函数复制文件的示例:
// syscpy.c
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <time.h>
#define BUF_SIZE 3 // 用最短数组长度构成
int main (int argc, char* argv[]) {
int fd1, fd2; // 文件描述符
int len;
char buf[BUF_SIZE];
clock_t start, end;
double cpu_time_used;
fd1 = open("news.txt", O_RDONLY);
fd2 = open("cpy.txt", O_WRONLY | O_CREAT|O_TRUNC);
start = clock(); // 记录开始时间
while (len = read(fd1, buf, sizeof(buf)) > 0)
write(fd2, buf, len);
end = clock(); // 记录结束时间
cpu_time_used = ((double) (end - start)) / CLOCKS_PER_SEC; // 计算用时(秒)
printf("程序用时:%f 秒。\n", cpu_time_used);
close(fd1);
close(fd2);
return 0;
}
然后是标准I/O的复制:
// stdcpy.c
#include <stdio.h>
#include <time.h>
#define BUF_SIZE 3
int main(int argc, char* argv[]) {
FILE * fp1; // 保存在fp1中的是FILE结构体指针
FILE * fp2;
char buf[BUF_SIZE];
clock_t start, end;
double cpu_time_used;
fp1 = fopen("news.txt", "r");
fp2 = fopen("cpy.txt", "w");
start = clock(); // 记录开始时间
while (fgets(buf, BUF_SIZE, fp1) != NULL)
fputs(buf, fp2);
end = clock(); // 记录结束时间
cpu_time_used = ((double) (end - start)) / CLOCKS_PER_SEC; // 计算用时(秒)
printf("程序用时:%f 秒。\n", cpu_time_used);
fclose(fp1);
fclose(fp2);
return 0;
}
编译运行:
gcc syscpy.c -o syscpy
gcc stdcpy.c -o stdcpy
./syscpy
./stdcpy
结果:
根据结果来看,这速度已经不用多分析了。
15.1.3 标准I/O函数的几个缺点
标准 I/O 函数存在以下几个缺点:
- 不容易进行双向通信
- 有时可能频繁调用
fflush
函数 - 需要以
FILE
结构体指针的形式返回文件描述符
15.2 使用标准I/O函数
如前所述,创建套接字时返回文件描述符,而为了使用标准I/O函数,只能将其转换为FILE结构体指针。先介绍其转换方法。
15.2.1 利用fdopen函数转换为FILE结构体指针
函数原型如下:
#include <stdio.h>
FILE *fdopen(int fildes, const char * mode);
/*
成功时返回转换的 FILE 结构体指针,失败时返回 NULL
fildes : 需要转换的文件描述符
mode : 将要创建的 FILE 结构体指针的模式信息
*/
上述函数的第二个参数与fopen函数中的打开模式相同。常用的参数有读模式 “r” 和写模式 “w”。下面通过简单示例给出上述函数的使用方法。
#include <stdio.h>
#include <fcntl.h>
int main(void) {
FILE * fp;
int fd = open("data.dat", O_WRONLY | O_CREAT|O_TRUNC);
if (fd == -1) {
fputs("file open error", stdout);
return -1;
}
fp = fdopen(fd, "w");
fputs("Network C progamming \n", fp);
fclose(fp);
return 0;
}
调用fdopen函数将文件描述符转换为FILE指针,此时第二个参数传递了“w“,因此返回写模式的FILE指针。然后调用标准输出函数fputs。使用fclose关闭fp之后,不需要再使用close关闭文件描述符,没有意义了。
编译运行:
gcc desto.c -o desto
./desto
cat data.dat
结果:
此示例中需要注意的是,文件描述符转换为FILE指针,并可以通过该指针调用标准I/O函数。
15.2.2 利用fileno函数转换为文件描述符
接下来介绍与fdopen函数提供相反功能的函数,该函数在有些情况下非常有用。
#include <stdio.h>
int fileno(FILE * stream);
// 成功时返回转换后的文件描述符,失败时返回-1
下面是示例:
// todes.c
#include <stdio.h>
#include <fcntl.h>
int main(void) {
FILE * fp;
int fd = open("data.dat", O_WRONLY | O_CREAT|O_TRUNC);
if (fd == -1) {
fputs("file open error", stdout);
return -1;
}
printf("First file descriptor: %d \n", fd);
fp = fdopen(fd, "w");
fputs("TCP/IP SOCKET PROGRAMMING \n", fp);
printf("Second file descriptor: %d \n", fileno(fp));
fclose(fp);
return 0;
}
结果:
两次输出的文件描述符值相同,证明fileno函数正确转换了文件描述符。
15.3 基于套接字的标准I/O函数使用
把第四章的回声客户端的代码改成基于标准I/O函数进行数据交换的形式。
服务端的代码如下:
// echo_stdserv.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#define BUF_SIZE 30
void error_handling(char* message);
int main(int argc, char* argv[]) {
int serv_sock, clnt_sock;
struct sockaddr_in serv_adr, clnt_adr;
socklen_t clnt_adr_sz;
int str_len, i;
char message[BUF_SIZE];
FILE * readfp;
FILE * writefp;
if (argc != 2) {
printf("Usage : %s <Port> \n", argv[0]);
exit(1);
}
serv_sock = socket(PF_INET, SOCK_STREAM, 0);
if (serv_sock == -1) error_handling("sock() error");
memset(&serv_adr, 0, sizeof(serv_adr));
serv_adr.sin_family = AF_INET;
serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_adr.sin_port = htons(atoi(argv[1]));
if (bind(serv_sock, (struct sockaddr*)&serv_adr, sizeof(serv_adr)) == -1)
error_handling("bind() error");
if (listen(serv_sock, 5) == -1)
error_handling("listen() error");
clnt_adr_sz = sizeof(clnt_adr);
for(i = 0; i < 5; i++) {
clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_adr, &clnt_adr_sz);
if (clnt_sock == -1) error_handling("accept() error");
else printf("Connected client %d \n", i + 1);
readfp = fdopen(clnt_sock, "R");
writefp = fdopen(clnt_sock, "w");
while (!feof(readfp)) { // 这里就使用的是feof来判断是否结束了
fgets(message, BUF_SIZE, readfp);
fputs(message, writefp);
fflush(writefp);
}
fclose(readfp);
fclose(writefp);
}
close(serv_sock);
return 0;
}
void error_handling(char* message) {
fputs(message, stderr);
fputc('\n', stderr);
}
调用基于字符串的fgets、fputs函数提供服务,并在第50行调用fflush函数。标准IO函数为了提高性能,内部提供额外的缓冲。因此,若不调用fflush函数则无法保证立即将数据传输到客户端 (这里直接记下来即可)。接下来给出回声客户端代码。
// echo_stdclient.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#define BUF_SIZE 30
void error_handling(char* message);
int main(int argc, char* argv[]) {
int sock;
char message[BUF_SIZE];
int str_len;
struct sockaddr_in serv_adr;
FILE * readfp;
FILE * writefp;
if (argc != 3) {
printf("Usage : %s <IP> <Port> \n", argv[0]);
exit(1);
}
sock = socket(PF_INET, SOCK_STREAM, 0);
if (sock == -1) error_handling("sock() error");
memset(&serv_adr, 0, sizeof(serv_adr));
serv_adr.sin_family = AF_INET;
serv_adr.sin_addr.s_addr = inet_addr(argv[1]);
serv_adr.sin_port = htons(atoi(argv[2]));
if (connect(sock, (struct sockaddr*)&serv_adr, sizeof(serv_adr)) == -1)
error_handling("connect() error");
else puts("Connetced......");
readfp = fdopen(sock, "r");
writefp = fdopen(sock, "w");
while (1) {
fputs("Input message(Q to quit): ", stdout);
fgets(message, BUF_SIZE, stdin);
if (!strcmp(message, "q\n") || !strcmp(message, "Q\n"))
break;
fputs(message, writefp);
fflush(writefp);
fgets(message, BUF_SIZE, readfp);
printf("Message from server: %s", message);
}
fclose(readfp);
fclose(readfp);
return 0;
}
void error_handling(char* message) {
fputs(message, stderr);
fputc('\n', stderr);
exit(1);
}
这里收到message之后没有手动在末尾添加0,这是因为fgets会自己添加。
编译运行:
gcc echo_client.c -o estdclient
gcc echo_stdserv.c -o estdserv
./estdserv 9190
./estdclient 127.0.0.1 9190
结果:
结果和之前的回声服务器的结果是一样的。
以上就是标准IO函数在套接字编程中的应用方法,因为需要编写额外的代码,所以并不像想象中那么常用。但某些情况下也是非常有用的,而且可以再次复习标准IO函数。
15.4 习题
第16章 关于I/O流分离的其他内容
调用fopen函数打开文件后可以与文件交换数据,因此说调用fopen函数后创建了“流”( Stream )。此处的“流”是指“数据流动”,但通常可以比喻为“以数据收发为目的的一种桥梁”。我们一般将“流”理解为数据收发路径。
16.1 分离I/O流
「分离 I/O 流」是一种常用表达。有 I/O 工具可区分二者,无论采用哪种方法,都可以认为是分离了I/O 流。
16.1.1 2次I/O流分离
之前有两种分离方法:
- 第一种是第 10 章的**「TCP I/O 过程分离」。通过调用 fork 函数复制出一个文件描述符,以区分输入和输出中使用的文件描述符。虽然文件描述符本身不会根据输入和输出进行区分,但我们分开了2 个文件描述符的用途**,因此,这也属于「流」的分离。
- 第二种分离是在第 15 章。通过 2 次
fdopen
函数的调用,创建读模式 FILE 指针(FILE 结构体指针)和写模式 FILE 指针。换言之,我们分离了输入工具和输出工具,因此也可视为「流」的分离。下面是分离的理由。
16.1.2 分离流的好处
两次分离流的目的是不一样的,首先分析第10章“流”分离的目的:
- 通过分开输入过程(代码)和输出过程降低实现难度
- 与输入无关的输出操作可以提高速度
然后是第15章”流“分离的目的:
- 为了将 FILE 指针按读模式和写模式加以区分
- 可以通过区分读写模式降低实现难度
- 通过区分 I/O 缓冲提高缓冲性能
16.1.3 “流”分离带来的EOF问题
第 7 章介绍过 EOF 的传递方法和半关闭的必要性。有一个语句:
shutdown(sock, SHUT_WR);
当时讲过调用shutdown函数的基于半关闭的EOF传递方法。第10章还利用这些技术在echo_mpclient.c
示例中添加了半关闭相关代码。也就是说,第10章的“流”分离没有问题。但第15章的基于fdopen
函数的“流”则不同,我们还不知道在这种情况下如何进行半关闭。一个常见的错误就是我们直接使用是不是可以针对输出模式的FLIE指针调用fclose函数呢? 这样可以向对方传递EOF,变成可以接收数据但无法发送数据的半关闭状态呀。我们可以来验证一下,下面是服务器端的代码:
// sep_serv.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#define BUF_SIZE 1024
int main(int argc, char* argv[]) {
int serv_sock, clnt_sock;
FILE * readfp;
FILE * writefp;
struct sockaddr_in serv_adr, clnt_adr;
socklen_t clnt_adr_sz;
char buf[BUF_SIZE];
serv_sock = socket(PF_INET, SOCK_STREAM, 0);
memset(&serv_adr, 0, sizeof(serv_adr));
serv_adr.sin_family = AF_INET;
serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_adr.sin_port = htons(atoi(argv[1]));
bind(serv_sock, (struct sockaddr*)&serv_adr, sizeof(serv_adr));
listen(serv_sock, 5);
clnt_adr_sz = sizeof(clnt_adr);
clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_adr, &clnt_adr_sz);
readfp = fdopen(clnt_sock, "r");
writefp = fdopen(clnt_sock, "w");
fputs("FROM SERVER: HI~ client? \n", writefp);
fputs("I love all of the world \n", writefp);
fputs("You are awesome! \n", writefp);
fflush(writefp);
fclose(writefp);
fgets(buf, sizeof(buf), readfp);
fputs(buf, stdout);
fclose(readfp);
return 0;
}
首先通过writefp向客户端发送消息,然后调用fflush函数。调用fclose函数终止套接字时,对方主机将收到EOF。但是后面还有fgets函数接收客户端的消息,这一部分怎么理解呢?我们接着往后看。以下是客户端的代码:
// sep_clnt.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#define BUF_SIZE 1024
int main(int argc, char* argv[]) {
int sock;
FILE * readfp;
FILE * writefp;
struct sockaddr_in serv_adr;
char buf[BUF_SIZE];
if (argc != 3){
printf("Usage : %s <IP> <port>\n", argv[0]);
exit(1);
}
sock = socket(PF_INET, SOCK_STREAM, 0);
memset(&serv_adr, 0, sizeof(serv_adr));
serv_adr.sin_family = AF_INET;
serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_adr.sin_port = htons(atoi(argv[1]));
connect(sock, (struct sockaddr *)&serv_adr, sizeof(serv_adr));
readfp = fdopen(sock, "r");
writefp = fdopen(sock, "w");
while (1) {
if (fgets(buf, sizeof(buf), readfp) == NULL) break;
fputs(buf, stdout);
fflush(stdout);
}
fputs("FROM CLIENT: Thank you! \n", writefp);
fflush(writefp);
fclose(readfp);
fclose(writefp);
return 0;
}
当服务器端关闭后,会向客户端发送EOF,当客户端收到EOF之后,fgets为NULL,所以会退出循环,然后在收到EOF之后,再通过fputs向服务器端发送信息。这是我们猜想的结果,但是是这样的吗?
编译运行:
gcc sep_clnt.c -o clnt
gcc sep_serv.c -o serv
./serv 9190
./clnt 127.0.0.1 9190
结果:
服务器端:
客户端:
可以看到,结果和我们预想的不太一样,服务器端并没有收到最后的信息!!!
很容易判断其原因: sep_serv.c
示例的第38行调用的fclose函数完全终止了套接字,而不是半关闭。以上就是需要通过本章解决的问题。半关闭在多种情况下都非常有用,我们必须能够针对fdopen函数调用时生成的FILE指针进行半关闭操作。(这是需要学习的!)
16.2 文件描述符的复制和关闭
16.2.1 终止“流”时无法半关闭的原因
下图描述的是sep_serv.c
中2个FILE指针、文件描述符以及套接字之间的关系:
从图中可以明显看出,对任意一个FILE指针调用fclose函数都会关闭文件描述符,也就是会终止套接字,如图所示:
那么针对上述问题,如何实现半关闭呢?其实很简单,创建FILE指针前先复制文件描述符即可,示意图如下图所示:
这里补充一下一些知识:
- 套接字属于操作系统的范畴,第十章使用fork创建多进程的时候,套接字并没有被复制,复制的只是套接字的文件描述符。所以父进程和子进程是共享套接字的。也就是说一个套接字是可以有多个文件描述符的。
- 如果套接字对应有多个描述符,那就需要把所有的文件描述符都关闭,才能关闭套接字。
- 第十一章提到的管道也属于操作系统的范畴,所以进入子进程之后,需要先关闭父进程的相应文件描述符,才能保证最终完全关闭套接字。
所以我们这里是可以给套接字复制一份文件描述符的,需要把所有的文件描述符关闭,才能关闭套接字。这就为半关闭准备好了环境。
也就是说,针对写模式FILE指针调用fclose函数时,只能销毁与该FILE指针相关的文件描述符,无法销毁套接字,参考下图:
如图所示,调用fclose函数后还剩1个文件描述符,因此没有销毁套接字。那此时的状态是否为半关闭状态?不是!!!
上图讲过,只是准备好了半关闭环境。要进入真正的半关闭状态需要特殊处理。(还需要进行特殊的处理
)
仔细观察,还剩下一个文件描述符。而该文件描述符可以同时进行 I/O 。因此,不但没有发送 EOF ,而且仍然可以利用文件描述
符进行输出。(所以我们后面的改进就是需要人为发送EOF)
16.2.2 复制文件描述符
之前提到的文件描述符的复制与fork函数中进行的复制有所区别。调用fork函数时将复制整个进程,因此同一进程内不能同时有原件和副本。但此处讨论的复制并非针对整个进程,而是在同一进程内完成描述符的复制,如图所示:
复制完成后,两个文件描述符都可以访问文件,但是编号不同。”复制“具有如下含义:
“为了访问同一文件或套接字,创建另一个文件描述符。”
16.2.3 dup和dup2
复制文件描述符的方法,通过下列函数完成:
#include <unistd.h>
int dup(int fildes);
int dup2(int fildes, int fildes2);
/*
成功时返回复制的文件描述符,失败时返回 -1
fildes : 需要复制的文件描述符
fildes2 : 明确指定的文件描述符的整数值。
*/
下面通过示例来验证dup的功能:
// dup.c
#include <stdio.h>
#include <unistd.h>
int main(int argc, char* argv[]) {
int cfd1, cfd2;
char str1[] = "Hi~ \n";
char str2[] = "It's a nice day~ \n";
cfd1 = dup(1);
cfd2 = dup2(cfd2, 7);
printf("fd1 = %d, fd2 = %d \n", cfd1, cfd2);
write(cfd1, str1, sizeof(str1));
write(cfd2, str2, sizeof(str2));
close(cfd1);
close(cfd2);
write(1, str1, sizeof(str1));
close(1);
write(1, str2, sizeof(str2));
return 0;
}
将文件描述符1复制了两次,分别是3和7,然后进行测试,由于所有的都关闭了,最后一次write无法执行。
编译运行:
gcc dup.c -o dup
./dup
结果:
16.2.4 复制文件描述符后“流”的分离
下面更改sep_serv.c和sep_cInt.c示例,使其能够正常工作(只需更改sep_serv.c示例)。所谓“正常工作”是指,通过服务器端的半关闭状态接收客户端最后发送的字符串。当然,为了完成这一任务,服务器端需要同时发送EOF。下面给出示例:
// sep_serv2.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#define BUF_SIZE 1024
int main(int argc, char* argv[]) {
int serv_sock, clnt_sock;
FILE * readfp;
FILE * writefp;
struct sockaddr_in serv_adr, clnt_adr;
socklen_t clnt_adr_sz;
char buf[BUF_SIZE] = {0,};
serv_sock = socket(PF_INET, SOCK_STREAM, 0);
memset(&serv_adr, 0, sizeof(serv_adr));
serv_adr.sin_family = AF_INET;
serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_adr.sin_port = htons(atoi(argv[1]));
bind(serv_sock, (struct sockaddr*)&serv_adr, sizeof(serv_adr));
listen(serv_sock, 5);
clnt_adr_sz = sizeof(clnt_adr);
clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_adr, &clnt_adr_sz);
readfp = fdopen(clnt_sock, "r");
writefp = fdopen(dup(clnt_sock), "w"); // 使用dup复制FILE指针
fputs("FROM SERVER: HI~ client? \n", writefp);
fputs("I love all of the world \n", writefp);
fputs("You are awesome! \n", writefp);
fflush(writefp);
shutdown(fileno(writefp), SHUT_WR); // 使用shutdown函数发送EOF
fclose(writefp);
fgets(buf, sizeof(buf), readfp);
fputs(buf, stdout);
fclose(readfp);
return 0;
}
这个代码可以配合sep_clnt.c
使用,运行结果为:
服务器端收到了Thank you!
,说明确实是处于半关闭的状态,但是还能收到来自客户端的数据。
16.3 习题
第17章 优于select的epoll
17.1 epoll理解与应用
select 复用方法由来已久,因此,利用该技术后,无论如何优化程序性能也无法同时接入上百个客户端。这种 select 方式并不适合以 web 服务器端开发为主流的现代开发环境,所以需要学习 Linux 环境下的 epoll。
17.1.1 基于select的I/O复用技术速度慢的原因
第 12 章实现了基于 select 的 I/O 复用技术服务端,其中有不合理的设计如下:
- 调用 select 函数后常见的针对所有文件描述符的循环语句
- 每次调用 select 函数时都需要向该函数传递监视对象信息
上述两点可以从 echo_selectserv.c
得到确认,调用 select 函数后,并不是把发生变化的文件描述符单独集中在一起,而是通过作为监视对象的 fd_set
变量的变化,找出发生变化的文件描述符(54,56行),因此无法避免针对所有监视对象的循环语句。而且,作为监视对象的 fd_set
会发生变化,所以调用 select 函数前应该复制并保存原有信息,并在每次调用 select 函数时传递新的监视对象信息。
select的最大缺陷(也就是最消耗时间的点)不是针对所有对象的循环语句,而是每次调用select函数时向操作系统
传递监视对象信息。准确的说,select 是监视套接字变化的函数。而套接字是操作系统管理的,所以 select 函数要借助操作系统才能完成功能。select 函数的这一缺点可以通过如下方式弥补:
仅向操作系统传递一次监视对象,监视范围或内容发生变化时只通知发生变化的事项
这样就无需每次调用 select 函数时都想操作系统传递监视对象信息,但是前提操作系统支持这种处理方式。Linux 的支持方式是 epoll
,Windows 的支持方式是 IOCP
。
17.1.2 select也有优点
select 的兼容性比较高,这样就可以支持很多的操作系统,不受平台的限制,使用 select 函数满足以下两个条件其实还是很香的:
- 服务器接入者少
- 程序应该具有兼容性
17.1.3 实现epoll时必要的函数和结构体
epoll函数具有如下优点,这些优点正好就是select的缺点:
- 无需编写以监视状态变化为目的的针对所有文件描述符的循环语句
- 调用对应于 select 函数的
epoll_wait
函数时无需每次传递监视对象信息
下面介绍epoll服务器端实现中需要的3个函数:
- epoll_create:创建保存epoll文件描述符的空间
- epoll_ctl:向空间注册并注销文件描述符
- epoll_wait:与select函数类似,等待文件描述符发生变化
select 函数中为了保存监视对象的文件描述符,直接声明了 fd_set
变量,但 epoll
方式下的操作系统负责保存监视对象文件描述符,因此需要向操作系统请求创建保存文件描述符的空间,此时用的函数就是epoll_create
。(操作系统直接保存监视对象文件描述符)
此外,为了添加和删除监视对象文件描述符,select
方式中需要 FD_SET、FD_CLR 函数。但在 epoll
方式中,通过 epoll_ctl
函数请求操作系统完成。最后,select 方式下调用 select 函数等待文件描述符的变化,而 epoll
中调用 epoll_wait
函数。还有,select
方式中通过 fd_set
变量查看监视对象的状态变化,而 epoll
方式通过如下结构体 epoll_event
将发生变化的文件描述符单独集中在一起。
struct epoll_event {
__uint32_t events;
epoll_data_t data;
}
typedef union epoll_data {
void *ptr;
int fd;
__unit32_t u32;
__unit64_t u64;
} epoll_data_t;
声明足够大的 epoll_event
结构体数组后,传递给 epoll_wait
函数时,发生变化的文件描述符信息将被填入数组。因此,无需像 select 函数那样针对所有文件描述符进行循环。(数组中装的内容就是发生变化的文件描述符)接下来给出这些函数的详细说明。
17.1.4 epoll_create
epoll 是从 Linux 的 2.5.44 版内核开始引入的。通过以下命令可以查看 Linux 内核版本:
cat /proc/sys/kernel/osrelease
下面是epoll_create函数的定义:
#include<sys/epoll.h>
int epoll_create(int size);
// 成功时返回epoll文件描述符,失败时返回-1
// size: epoll实例的大小
调用 epoll_create
函数时创建的文件描述符保存空间称为**「epoll 例程」,但有些情况下名称不同,需要稍加注意。通过参数 size
传递的值决定 epoll
例程的大小,但该值只是向操作系统提出的建议**。换言之,size 并不用来决定 epoll 的大小,而仅供操作系统参考。(size值只是给操作系统的建议)
Linux 2.6.8之后的内核将完全忽略传入epoll_create函数的size参数,因为内核会根据情况调整epoll例程的大小。但撰写本书时 Linux版本未达到2.6.8,因此无法在忽略size参数的情况下编写程序。
epoll_create
函数创建的资源与套接字相同,也由操作系统管理。因此,该函数和创建套接字的情况相同,也会返回文件描述符,也就是说返回的文件描述符主要用于区分 epoll
例程。需要终止时,与其他文件描述符相同,也要调用 close 函数。
17.1.5 epoll_ctl
生成例程后,应在其内部注册监视对象文件描述符,此时使用 epoll_ctl
函数。
#include<sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);
/*
成功时返回 0 ,失败时返回 -1
epfd:用于注册监视对象的 epoll 例程的文件描述符
op:用于制定监视对象的添加、删除或更改等操作
fd:需要注册的监视对象文件描述符
event:监视对象的事件类型
*/
与其他 epoll
函数相比,该函数看起来有些复杂,但通过调用语句就很容易理解,假设按照如下形式调用 epoll_ctl
函数:
epoll_ctl(A, EPOLL_CTL_ADD, B, C);
第二个参数 EPOLL_CTL_ADD 意味着「添加」,上述语句有如下意义:
epoll
例程 A 中注册文件描述符 B ,主要目的是为了监视参数 C 中的事件
再介绍一个调用语句:
epoll_ctl(A, EPOLL_CTL_DEL, B, NULL);
上述语句中第二个参数意味这「删除」,有以下含义:
从
epoll
例程 A 中删除文件描述符 B
从上述示例中可以看出,从监视对象中删除时,不需要监视类型,因此向第四个参数可以传递为 NULL下面是第二个参数的含义:
- EPOLL_CTL_ADD:将文件描述符注册到
epoll
例程 - EPOLL_CTL_DEL:从
epoll
例程中删除文件描述符 - EPOLL_CTL_MOD:更改注册的文件描述符的关注事件发生情况
关于EPOLL_CTL_MOD常量稍后讲解,下面讲解一下epoll_ctl函数的第四个参数,其类型是epoll_event结构体指针。
之前说过,epoll_event
结构体用于保存发生事件的文件描述符结合。但也可以在 epoll
例程中注册文件描述符时,用于注册关注的事件。该函数中epoll_event
结构体的定义并不显眼,因此通过调用语句说明该结构体在epoll_ctl 函数中的应用。
struct epoll_event event;
......
event.events = EPOLLIN; // 发生需要读取数据的情况(事件)时
event.data.fd = sockfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &event);
......
上述代码将 sockfd
注册到 epoll
例程 epfd
中,并在需要读取数据的情况下产生相应事件。接下来给出epoll_event
的成员 events
中可以保存的常量及所指的事件类型。
- EPOLLIN:需要读取数据的情况
- EPOLLOUT:输出缓冲为空,可以立即发送数据的情况
- EPOLLPRI:收到 OOB 数据的情况
- EPOLLRDHUP:断开连接或半关闭的情况,这在边缘触发方式下非常有用
- EPOLLERR:发生错误的情况
- EPOLLET:以边缘触发的方式得到事件通知
- 发生一次事件后,相应文件描述符不再收到事件通知。因此需要向
epoll_ctl
函数的第二个参数传递 EPOLL_CTL_MOD ,再次设置事件。
可以通过位或运算同时传递上述多个参数,关于“边缘触发”稍后讲解,目前只需要记住EPOLLIN的情况即可。
17.1.6 epoll_wait
epoll_wait是与select所对应的函数,其定义如下:
#include<sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);
/*
成功时返回发生事件的文件描述符,失败时返回 -1
epfd : 表示事件发生监视范围的 epoll 例程的文件描述符
events : 保存发生事件的文件描述符集合的结构体地址值
maxevents : 第二个参数中可以保存的最大事件数
timeout : 以 1/1000 秒为单位的等待时间,传递 -1 时,一直等待直到发生事件
*/
该函数的调用方式如下。需要注意的是,第二个参数所指缓冲需要动态分配。
int event_cnt;
struct epoll_event* ep_events;
......
ep_events = malloc(sizeof(struct epoll_event)*EPOLL_SIZE); // EPOLL_SIZE是宏常量
......
event_cnt = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1);
......
调用函数后,返回发生事件的文件描述符,同时在第二个参数指向的缓冲中保存事件发生的文件描述符集合。因此,无需像select那样插入针对所有文件描述符的循环。(目前来说,epoll_event有两个作用,第一是注册的时候,第二个是调用epoll_wat的时候)
17.1.7 基于epoll的回声服务器端
下面是基于epoll的回声服务器端,注意和第12章的基于select的回声服务器端进行对比。
// echo_epollserv.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#define BUF_SIZE 100
#define EPOLL_SIZE 50
void error_handling(char* message);
int main(int argc, char* argv[]) {
int serv_sock, clnt_sock;
struct sockaddr_in serv_adr, clnt_adr;
socklen_t adr_sz;
int str_len, i;
char buf[BUF_SIZE];
struct epoll_event *ep_events;
struct epoll_event event;
int epfd, event_cnt;
if(argc != 2) {
printf("Usage : %s <Port> \n", argv[0]);
exit(1);
}
serv_sock = socket(PF_INET, SOCK_STREAM, 0);
memset(&serv_adr, 0, sizeof(serv_adr));
serv_adr.sin_family = AF_INET;
serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_adr.sin_port = htons(atoi(argv[1]));
if (bind(serv_sock, (struct sockaddr*)&serv_adr, sizeof(serv_adr)) == -1)
error_handling("sock() error");
if (listen(serv_sock, 5) == -1)
error_handling("listen() error");
epfd = epoll_create(EPOLL_SIZE);
ep_events = malloc(sizeof(struct epoll_event)*EPOLL_SIZE);
event.events = EPOLLIN;
event.data.fd = serv_sock;
epoll_ctl(epfd, EPOLL_CTL_ADD, serv_sock, &event);
while(1) {
event_cnt = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1);
if (event_cnt == -1) {
puts("epoll_wait() error");
break;
}
for (i = 0; i < event_cnt; i++) {
if (ep_events[i].data.fd == serv_sock) { // 如果是服务器端接收到了客户端的请求
adr_sz = sizeof(clnt_adr);
clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_adr, &adr_sz);
event.events = EPOLLIN;
event.data.fd = clnt_sock; // 将新来的客户端加到监听中
epoll_ctl(epfd, EPOLL_CTL_ADD, clnt_sock, &event);
printf("connect client: %d \n", clnt_sock);
}
else {
str_len = read(ep_events[i].data.fd, buf, BUF_SIZE);
if (str_len == 0) { // close request!
epoll_ctl(epfd, EPOLL_CTL_DEL, ep_events[i].data.fd, NULL);
close(ep_events[i].data.fd);
printf("closed client: %d \n", ep_events[i].data.fd);
}
else {
write(ep_events[i].data.fd, buf, str_len); // echo!
}
}
}
}
close(serv_sock);
close(epfd);
return 0;
}
void error_handling(char* message) {
fputs(message, stderr);
fputc('\n', stderr);
exit(1);
}
上述代码的整体逻辑和12章的select的逻辑基本是类似的,为了便于比较,我们把基于select的服务器端的代码贴在下面:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/select.h>
#define BUF_SIZE 30
void error_handling(char* message);
int main(int argc, char* argv[]) {
int serv_sock, clnt_sock;
struct sockaddr_in serv_adr, clnt_adr;
struct timeval timeout;
fd_set reads, cpy_reads;
socklen_t adr_sz;
int fd_max, str_len, fd_num, i;
char buf[BUF_SIZE];
if (argc != 2) {
printf("Usage : %s <Port> \n", argv[0]);
exit(1);
}
serv_sock = socket(PF_INET, SOCK_STREAM, 0);
if (serv_sock == -1) error_handling("socket error()");
memset(&serv_adr, 0, sizeof(serv_adr));
serv_adr.sin_family = AF_INET;
serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_adr.sin_port = htons(atoi(argv[1]));
if (bind(serv_sock, (struct sockaddr*)&serv_adr, sizeof(serv_adr)) == -1)
error_handling("bind() error");
if (listen(serv_sock, 5) == -1)
error_handling("listen() error");
FD_ZERO(&reads);
FD_SET(serv_sock, &reads);
fd_max = serv_sock;
while (1) {
cpy_reads = reads;
timeout.tv_sec = 5;
timeout.tv_usec = 5000;
if ((fd_num = select(fd_max + 1, &cpy_reads, 0, 0, &timeout)) == -1) // 监视失败
break;
if (fd_num == 0) continue; // 超时
for (i = 0; i < fd_max + 1; i++) {
if (FD_ISSET(i, &cpy_reads)) {
if (i == serv_sock) { // Connect request!
adr_sz = sizeof(clnt_adr);
clnt_sock = accept(serv_sock,(struct sockaddr*)&serv_adr, &adr_sz);
FD_SET(clnt_sock, &reads);
if (fd_max < clnt_sock) fd_max = clnt_sock;
printf("connected client: %d \n", clnt_sock);
}
else { // read message!
str_len = read(i, buf, BUF_SIZE);
if (str_len == 0) { // close request!
FD_CLR(i, &reads);
close(i);
printf("closed client: %d \n", i);
}
else {
write(i, buf, str_len); // echo!
}
}
}
}
}
close (serv_sock);
return 0;
}
void error_handling(char* message) {
fputs(message, stderr);
fputc('\n', stderr);
exit(1);
}
可以看出,二者的逻辑是很像的。但是基于epoll的服务器端不用每次都传递监视对象信息,只用传一次,后面只需要传改变的即可。(基于select每次都需要复制一份新的,这里就有明显的区别)然后基于select的是对所有的文件描述符都进行循环遍历,包括没有发生的,所以后面就需要多一个判断if (FD_ISSET(i, &cpy_reads))
,而基于epoll的对ep_events进行遍历,这里面都是发生的事件,并没有对没有发生的事件进行遍历。
编译运行:
gcc echo_epollserv.c -o serv
./serv 9190
上述的函数可以搭配任意一个回声客户端使用,运行的结果如下:
运行结果:
可以看到,运行结果和以前 select 实现的和 fork 实现的结果一样,都可以支持多客户端同时运行,但是epoll的效率比select高。
总结一下epoll的流程:
- epoll_create创建一个保存epoll文件描述符的空间,可以没有参数
- 动态分配内存,给将要监视的epoll_wait
- 利用epoll_ctl控制 添加 删除 监听事件
- 利用epoll_wait来获取改变的文件描述符,来执行程序
17.2 条件触发和边缘触发
学习 epoll
时要了解条件触发(Level Trigger)和边缘触发(Edge Trigger)。
17.2.1 条件触发和边缘触发的区别在于发生事件的时间点
条件触发的特性:
条件触发方式中,只要输入缓冲有数据就会一直通知该事件。
例如,服务器端输入缓冲收到 50 字节数据时,服务器端操作系统将通知该事件(注册到发生变化的文件描述符)。但是服务器端读取 20 字节后还剩下 30 字节的情况下,仍会注册事件。也就是说,条件触发方式中,只要输入缓冲中还剩有数据,就将以事件方式再次注册。
边缘触发的特性:
边缘触发中输入缓冲收到数据时仅注册 1 次该事件。即使输入缓冲中还留有数据,也不会再进行注册。
17.2.2 掌握条件触发的事件特性:
接下来通过代码了解条件触发的事件注册方式。下列代码是稍微修改之前的echo_epollserv.c
示例得到的。epoll默认以条件触发方式工作,因此可以通过该示例验证条件触发的特性。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#define BUF_SIZE 4
#define EPOLL_SIZE 50
void error_handling(char* message);
int main(int argc, char* argv[]) {
int serv_sock, clnt_sock;
struct sockaddr_in serv_adr, clnt_adr;
socklen_t adr_sz;
int str_len, i;
char buf[BUF_SIZE];
struct epoll_event *ep_events;
struct epoll_event event;
int epfd, event_cnt;
if(argc != 2) {
printf("Usage : %s <Port> \n", argv[0]);
exit(1);
}
serv_sock = socket(PF_INET, SOCK_STREAM, 0);
memset(&serv_adr, 0, sizeof(serv_adr));
serv_adr.sin_family = AF_INET;
serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_adr.sin_port = htons(atoi(argv[1]));
if (bind(serv_sock, (struct sockaddr*)&serv_adr, sizeof(serv_adr)) == -1)
error_handling("sock() error");
if (listen(serv_sock, 5) == -1)
error_handling("listen() error");
epfd = epoll_create(EPOLL_SIZE);
ep_events = malloc(sizeof(struct epoll_event)*EPOLL_SIZE);
event.events = EPOLLIN;
event.data.fd = serv_sock;
epoll_ctl(epfd, EPOLL_CTL_ADD, serv_sock, &event);
while(1) {
event_cnt = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1);
if (event_cnt == -1) {
puts("epoll_wait() error");
break;
}
puts("return epoll_wait");
for (i = 0; i < event_cnt; i++) {
if (ep_events[i].data.fd == serv_sock) { // 如果是服务器端接收到了客户端的请求
adr_sz = sizeof(clnt_adr);
clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_adr, &adr_sz);
event.events = EPOLLIN;
event.data.fd = clnt_sock; // 将新来的客户端加到监听中
epoll_ctl(epfd, EPOLL_CTL_ADD, clnt_sock, &event);
printf("connect client: %d \n", clnt_sock);
}
else {
str_len = read(ep_events[i].data.fd, buf, BUF_SIZE);
if (str_len == 0) { // close request!
epoll_ctl(epfd, EPOLL_CTL_DEL, ep_events[i].data.fd, NULL);
close(ep_events[i].data.fd);
printf("closed client: %d \n", ep_events[i].data.fd);
}
else {
write(ep_events[i].data.fd, buf, str_len); // echo!
}
}
}
}
close(serv_sock);
close(epfd);
return 0;
}
void error_handling(char* message) {
fputs(message, stderr);
fputc('\n', stderr);
exit(1);
}
上述示例与之前的代码的差异如下:
- 将调用read函数时使用的缓冲大小缩减为4个字节
- 插入验证epoll_wait函数调用次数的语句
减少缓冲大小是为了阻止服务器端一次性读取接收的数据。换之,调用read函数后,输入缓冲中仍有数据需要读取。而且会因此注册新的事件并从epoll_wait函数返回时将循环输出“returnepoll_wait”字符串。
编译运行:
gcc echo_EPLTserv.c -o serv
./serv 9190
运行结果:
下面的代码是修改后的边缘触发方式的代码,仅仅是把上面的代码改为:
event.events = EPOLLIN | EPOLLET;
```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#define BUF_SIZE 4
#define EPOLL_SIZE 50
void error_handling(char* message);
int main(int argc, char* argv[]) {
int serv_sock, clnt_sock;
struct sockaddr_in serv_adr, clnt_adr;
socklen_t adr_sz;
int str_len, i;
char buf[BUF_SIZE];
struct epoll_event *ep_events;
struct epoll_event event;
int epfd, event_cnt;
if(argc != 2) {
printf("Usage : %s <Port> \n", argv[0]);
exit(1);
}
serv_sock = socket(PF_INET, SOCK_STREAM, 0);
memset(&serv_adr, 0, sizeof(serv_adr));
serv_adr.sin_family = AF_INET;
serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_adr.sin_port = htons(atoi(argv[1]));
if (bind(serv_sock, (struct sockaddr*)&serv_adr, sizeof(serv_adr)) == -1)
error_handling("sock() error");
if (listen(serv_sock, 5) == -1)
error_handling("listen() error");
epfd = epoll_create(EPOLL_SIZE);
ep_events = malloc(sizeof(struct epoll_event)*EPOLL_SIZE);
event.events = EPOLLIN;
event.data.fd = serv_sock;
epoll_ctl(epfd, EPOLL_CTL_ADD, serv_sock, &event);
while(1) {
event_cnt = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1);
if (event_cnt == -1) {
puts("epoll_wait() error");
break;
}
puts("return epoll_wait");
for (i = 0; i < event_cnt; i++) {
if (ep_events[i].data.fd == serv_sock) { // 如果是服务器端接收到了客户端的请求
adr_sz = sizeof(clnt_adr);
clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_adr, &adr_sz);
event.events = EPOLLIN | EPOLLET;
event.data.fd = clnt_sock; // 将新来的客户端加到监听中
epoll_ctl(epfd, EPOLL_CTL_ADD, clnt_sock, &event);
printf("connect client: %d \n", clnt_sock);
}
else {
str_len = read(ep_events[i].data.fd, buf, BUF_SIZE);
if (str_len == 0) { // close request!
epoll_ctl(epfd, EPOLL_CTL_DEL, ep_events[i].data.fd, NULL);
close(ep_events[i].data.fd);
printf("closed client: %d \n", ep_events[i].data.fd);
}
else {
write(ep_events[i].data.fd, buf, str_len); // echo!
}
}
}
}
close(serv_sock);
close(epfd);
return 0;
}
void error_handling(char* message) {
fputs(message, stderr);
fputc('\n', stderr);
exit(1);
}
运行结果:
打印了一次,说明使用边缘触发的方式只会注册一次! 但是为什么回声客户端没有回答呢?
我们首先学习一下puts("return epoll_wait");
为什么能够检测两种触发方式?puts("return epoll_wait");
语句可以检测是条件触发还是边缘触发的原因是,当使用边缘触发时,在非阻塞模式下 epoll_wait
函数只会返回一次触发事件,因此在 for 循环内部只会处理一个事件,再次调用 epoll_wait
函数会阻塞直到有新的事件发生。我们这里使用的回声服务器是改进版本的,存在如下的代码:
while (recv_len < str_len) {
recv_cnt = read(sock, &message[recv_len], BUF_SIZE - 1);
if (recv_cnt == -1) error_handling("read() error");
recv_len += recv_cnt;
}
要一直等接收到发送的长度才行,所以会卡住,如果我们使用最开始版本的回声客户端,没有这个限制的话,就可以有回声。
由于BUF_SIZE是2,并且没有做字符末尾的处理,这里只能返回前两个,并且没有回车。
select模型是以条件触发的方式工作的。
17.2.3 边缘触发的服务器端必知的两点
下面讲解边缘触发服务器的实现方法,下面两点是必备的知识:
- 通过errno变量验证错误原因
- 为了完成非阻塞(Non-blocking)I/O ,更改了套接字特性
Linux 套接字相关函数一般通过 -1 通知发生了错误。虽然知道发生了错误,但仅凭这些内容无法得知产生错误的原因。因此,为了在发生错误的时候提供给额外的信息,Linux 声明了如下全局变量:
int errno;
为了访问该变量,需要引入 error.h
头文件,因此此头文件有上述变量的 extren
声明。另外,每种函数发生错误时,保存在 errno
变量中的值都不同。这里我们以下面的情况举例说明:
read 函数发现输入缓冲中没有数据可读时返回 -1,同时在
errno
中保存 EAGAIN 常量
稍后通过示例给出errno
的使用方法。下面讲解套接字改为非阻塞方式的方法。Linux提供更改或读取文件属性的如下方法:
#include <fcntl.h>
int fcntl(int filedes, int cmd, ...);
/*
成功时返回 cmd 参数相关值,失败时返回 -1
filedes : 属性更改目标的文件描述符
cmd : 表示函数调用目的
*/
从上述声明可以看出 fcntl 有可变参数的形式。如果向第二个参数传递 F_GETFL ,可以获得第一个参数所指的文件描述符属性(int 型)。反之,如果传递 F_SETFL ,可以更改文件描述符属性。若希望将文件(套接字)改为非阻塞模式,需要如下 2 条语句。
int flag = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flag | O_NONBLOCK);
通过第一条语句,获取之前设置的属性信息,通过第二条语句在此基础上添加非阻塞 O_NONBLOCK 标志。调用 read/write 函数时,无论是否存在数据,都会形成非阻塞文件(套接字)。fcntl
函数的适用范围很广,学习到一个就要学会总结一个。
17.2.4 实现边缘触发回声服务器端
之所以介绍读取错误原因的方法和非阻塞模式的套接字创建方法,原因在于二者都与边缘触发的服务器端实现有密切联系。首先说明为何需要通过errno确认错误原因。
边缘触发方式中,接收数据时仅注册1次该事件
所以,就因为这种特点,一旦发生输入相关事件,就应该读取输入缓冲中的全部数据。因此需要验证输入缓冲是否为空。
read 函数返回 -1,变量 errno 中的值变成 EAGAIN 时,说明没有数据可读。
既然如此,为何还需要将套接字变成非阻塞模式?边缘触发方式下,以阻塞方式工作的read& write
函数有可能引起服务器端的长时间停顿。因此,边缘触发方式中一定要采用非阻塞read &write
函数。接下来给出以边缘触发方式工作的回声服务器端示例。也正是说之前收的数据不完整,所以需要做相应的改进:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <errno.h>
#define BUF_SIZE 4 // 为了验证边缘触发的工作方式,将缓冲设置为4字节
#define EPOLL_SIZE 50
void setnonblockingmode(int fd);
void error_handling(char* message);
int main(int argc, char* argv[]) {
int serv_sock, clnt_sock;
struct sockaddr_in serv_adr, clnt_adr;
socklen_t adr_sz;
int str_len, i;
char buf[BUF_SIZE];
struct epoll_event *ep_events;
struct epoll_event event;
int epfd, event_cnt;
if(argc != 2) {
printf("Usage : %s <Port> \n", argv[0]);
exit(1);
}
serv_sock = socket(PF_INET, SOCK_STREAM, 0);
memset(&serv_adr, 0, sizeof(serv_adr));
serv_adr.sin_family = AF_INET;
serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_adr.sin_port = htons(atoi(argv[1]));
if (bind(serv_sock, (struct sockaddr*)&serv_adr, sizeof(serv_adr)) == -1)
error_handling("sock() error");
if (listen(serv_sock, 5) == -1)
error_handling("listen() error");
epfd = epoll_create(EPOLL_SIZE);
ep_events = malloc(sizeof(struct epoll_event)*EPOLL_SIZE);
setnonblockingmode(serv_sock);
event.events = EPOLLIN;
event.data.fd = serv_sock;
epoll_ctl(epfd, EPOLL_CTL_ADD, serv_sock, &event);
while(1) {
event_cnt = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1);
if (event_cnt == -1) {
puts("epoll_wait() error");
break;
}
puts("return epoll_wait"); // 为观察事件发生数而添加的输出字符串的语句
for (i = 0; i < event_cnt; i++) {
if (ep_events[i].data.fd == serv_sock) { // 如果是服务器端接收到了客户端的请求
adr_sz = sizeof(clnt_adr);
clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_adr, &adr_sz);
setnonblockingmode(clnt_sock); // 将accept函数创建的套接字改为非阻塞模式
event.events = EPOLLIN|EPOLLET; // 将套接字事件注册方式改为边缘触发
event.data.fd = clnt_sock; // 将新来的客户端加到监听中
epoll_ctl(epfd, EPOLL_CTL_ADD, clnt_sock, &event);
printf("connect client: %d \n", clnt_sock);
}
else {
while(1) { // 之前的条件触发回声服务器端中没有该while循环。边缘触发方式中,发生事件时需要读取输入缓冲中的所有数据,因此需要循环调用read函数
str_len = read(ep_events[i].data.fd, buf, BUF_SIZE);
if (str_len == 0) { // close request!
epoll_ctl(epfd, EPOLL_CTL_DEL, ep_events[i].data.fd, NULL);
close(ep_events[i].data.fd);
printf("closed client: %d \n", ep_events[i].data.fd);
break;
}
else if (str_len < 0) {
if(errno == EAGAIN) break; // read函数返回-1且errno值为EAGAIN时,意味着读取了输入缓冲中的全部数据,因此需要通过break语句跳出循环
}
else {
write(ep_events[i].data.fd, buf, str_len); // echo!
}
}
}
}
}
close(serv_sock);
close(epfd);
return 0;
}
void setnonblockingmode(int fd) {
int falg = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_GETFL, falg | O_NONBLOCK);
}
void error_handling(char* message) {
fputs(message, stderr);
fputc('\n', stderr);
exit(1);
}
可以配合改进版本的回声客户端使用。
编译运行:
gcc echo_EPETserv.c -o serv
./serv
运行结果:
可以看到,经过改进之后的代码只会注册一次,所以在服务器端只会打印一次。由于使用了while进行read,所以在客户端也可以收到发出去的数据。
17.2.5 条件触发和边缘触发孰优孰劣
边缘触发方式可以做到这点:
可以分离接收数据和处理数据的时间点!
可以结合下面的图更加深入地理解:
运行流程如下:
- 服务器端分别从 A B C 接收数据
- 服务器端按照 A B C 的顺序重新组合接收到的数据
- 组合的数据将发送给任意主机
为了完成这个过程,如果可以按照如下流程运行,服务端的实现并不难:
- 客户端按照 A B C 的顺序连接服务器,并且按照次序向服务器发送数据
- 需要接收数据的客户端应在客户端 A B C 之前连接到服务器端并等待
但是实际情况中可能是下面这样:
- 客户端 C 和 B 正在向服务器发送数据,但是 A 并没有连接到服务器
- 客户端 A B C 乱序发送数据
- 服务端已经接收到数据,但是要接收数据的目标客户端并没有连接到服务器端
因此,即使输入缓冲收到数据,服务器端也能决定读取和处理这些数据的时间点,这样就给服务器端的
实现带来很大灵活性。(这也就是边缘触发的优点)
此外,边缘触发更有可能带来高性能,但不能简单地认为“只要使用边缘触发就一定能提高速度”。
17.3 习题
第18章 多线程服务器端的实现
18.1 理解线程的概念
18.1.1 引入线程背景
第 10 章介绍了多进程服务端的实现方法。多进程模型与 select 和 epoll 相比的确有自身的优点,但同时也有问题。如前所述,创建(复制)进程的工作本身会给操作系统带来相当沉重的负担。而且,每个进程都具有独立的内存空间,所以进程间通信的实现难度也会随之提高。换言之,多进程的缺点可概括为:
- 创建进程的过程会带来一定的开销
- 为了完成进程间数据交换,需要特殊的 IPC 技术(管道技术)。
但是下面的缺点是最大的:
每秒少则数十次、多则数千次的‘上下文切换’是创建进程时的最大开销。
对于1个CPU(准确地说是CPU地运算设备核心)的系统也可以同时运行多个进程,这是因为系统将CPU时间分成多个微小的块后分配给了多个进程。为了分时使用CPU,需要**“上下文切换”**过程。下面了解一下“上下文切换”的概念。运行程序前需要将相应进程信息读入内存,如果运行进程A后需要紧接着运行进程B,就应该将进程A相关信息移出内存,并读入进程B相关信息。这就是上下文切换 (信息不断进出内存的过程就可以理解为所谓上下文切换)。但此时进程A的数据将被移动到硬盘,所以上下文切换需要很长时间。即使通过优化加快速度,也会存在一定的局限。
为了保持多进程的优点,同时在一定程度上克服其缺点,人们引入的线程(Thread)的概念。这是为了将进程的各种劣势降至最低程度(不是直接消除)而设立的一种**「轻量级进程」**。线程比进程具有如下优点:
- 线程的创建和上下文切换比进程的创建和上下文切换更快
- 线程间交换数据时无需特殊技术
18.1.2 线程和进程的差异
线程是为了解决:为了得到多条代码执行流而复制整个内存区域的负担太重。
每个进程的内存空间都由保存全局变量的「数据区」、向 malloc 等函数动态分配提供空间的堆(Heap)、函数运行时间使用的栈(Stack)构成。每个进程都有独立的这种空间,多个进程的内存结构如图所示:(每个进程拥有独立的区域)
但如果以获得多个代码执行流为主要目的,则不应该像上图那样完全分离内存结构,而只需分离栈区域。通过这种方式可以获得如下优势。
- 上下文切换切换时不需要切换数据区和堆
- 可以利用数据区和堆交换数据
实际上这就是线程。线程为了保持多条代码执行流而隔开了栈区域,其内存结构如下图所示:
如图所示,多个线程将共享数据区和堆。为了保持这种结构,线程将在进程内创建并运行。也就是说,进程和线程可以定义为:
- 进程:在操作系统构成单独执行流的单位
- 线程:在进程构成单独执行流的单位
操作系统、进程、线程三者之间的关系如下图所示:
下面通过代码进一步理解线程的含义。
18.2 线程创建及运行
POSIX是Portable Operating System Interface for Computer Environment(适用于计算机环境的可移植操作系统接口)的简写,是为了提高UNIX系列操作系统间的移植性而制定的API规范。下面要介绍的线程创建方法也是以POSIX标准为依据的。因此,它不仅适用于Linux,也适用于大部分UNIX系列的操作系统。
18.2.1 线程的创建和执行流程
线程具有单独的执行流,因此需要单独定义线程的main函数,还需要请求操作系统在单独的执行流中执行该函数,完成该功能的函数如下:
#include <pthread.h>
int pthread_create(pthread_t * restrict thread, const pthread_attr_t * restrict attr
void * (*start_routine)(void *), void * restrict arg);
/*
成功时返回 0 ,失败时返回 -1
thread : 保存新创建线程 ID 的变量地址值。线程与进程相同,也需要用于区分不同线程的 ID
attr : 用于传递线程属性的参数,传递 NULL 时,创建默认属性的线程
start_routine : 相当于线程 main 函数的、在单独执行流中执行的函数地址值(函数指针)
arg : 通过第三个参数传递的调用函数时包含传递参数信息的变量地址值
*/
要想理解好上述函数的参数,需要熟练掌握restrict关键字和函数指针相关语法。但如果只关注使用方法(当然以后要掌握restrict和函数指针),那么该函数的使用比想象中要简单。下面通过简单示例了解该函数的功能。
// thread1.c
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
void* thread_main(void* arg);
int main(int argc, char* argv[]) {
pthread_t t_id;
int thread_param = 5;
// 请求创建一个线程,从thread_main调用开始,在单独的执行流中运行,同时传递参数
if (pthread_create(&t_id, NULL, thread_main, (void*)&thread_param) != 0) {
puts("pthread_create() error");
return -1;
}
sleep(2); // 延迟进程终止时间
puts("end of main");
return 0;
}
void* thread_main(void *arg) { // 传入的参数是pthread_create的第四个
int i;
int cnt = *((int*)arg);
for (i = 0; i < cnt; i++) {
sleep(1);
puts("running thread");
}
return NULL;
}
编译运行:
gcc thread1.c -o tr1 -lpthread // 线程相关代码编译时需要添加 -lpthread 选项声明需要连接到线程库
./tr1
运行结果:
上述程序的执行如图所示:
可以看出,程序在主进程没有结束时,生成的线程每隔一秒输出一次 running thread ,但是如果主进程没有等待十秒,而是直接结束,这样也会强制结束线程,不论线程有没有运行完毕。(如果将sleep(10)
改为sleep(2)
就会很快结束)
那是否意味着主进程必须每次都 sleep 来等待线程执行完毕?并不需要,可以通过以下函数解决。
sleep的方法存在很大的问题,比如使用sleep可能会影响程序的正常执行流,而且也无法计算线程具体运行的时间,所以使用sleep时不合理且不方便的。
下面的方法可以解决这个问题,还可以同时了解线程ID的用法。
#include <pthread.h>
int pthread_join(pthread_t thread, void ** status);
// 成功时返回0,失败时返回其他值
// thread : 该参数值 ID 的线程终止后才会从该函数返回
// status : 保存线程的 main 函数返回值的指针的变量地址值
简言之,调用该函数的进程(或线程)将进入等待状态,直到第一个参数为ID的线程终止为止。而且可以得到线程的main函数返回值,所以该函数比较有用。下面通过示例了解该函数的功能。
// thread2.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
void *thread_main(void* arg);
int main(int argc, char* argv[]) {
pthread_t t_id;
int thread_param = 5;
void *thr_ret;
// 请求创建一个线程,从thread_main调用开始,在单独的执行流中运行,同时传递参数
if (pthread_create(&t_id, NULL, thread_main, (void*)&thread_param) != 0) {
puts("pthread_create() error");
return -1;
}
// main函数将的等待ID保存在t_id变量中的线程终止
if (pthread_join(t_id, &thr_ret) != 0) {
puts("pthread_join() error");
return -1;
}
printf("Thread return message : %s \n", (char *)thr_ret);
free(thr_ret);
return 0;
}
void* thread_main(void* arg) {
int i;
int cnt = *((int *)arg);
char* msg = (char *)malloc(sizeof(char)*50);
strcpy(msg, "Hello, I'm thread~ \n");
for (i = 0; i < cnt; i++) {
sleep(1);
puts("running thread");
}
return (void*)msg;
}
上述代码中,有几个地方需要解释一下:thread_main
的返回值怎么定?上面那个例子返回值没什么别的作用,所以返回值就是NULL。而这个例子的返回值为什么是(void*)msg
呢?这是由于这个地方的返回值将保存到pthread_join的第二个参数thr_ret中。需要注意的是,该返回值是thread_main函数内部动态分配的内存空间地址值,使用了malloc
关键字,最后要使用free
释放。
编译运行:
gcc thread2.c -o tr2 -lpthread
./tr2
运行结果:
可以看到,这个代码中在main里面没有使用sleep,也会等到线程执行完才结束运行。
下图可以更好理解这个例子。
18.2.2 可在临界区内调用的函数
之前的示例中只创建了1个线程,接下来的示例将开始创建多个线程。当然,无论创建多少线程,其创建方法没有区别。但关于线程的运行需要考虑**“多个线程同时调用函数时(执行时)可能产生问题”**。这类函数内部存在临界区(Critical Section ),也就是说,多个线程同时执行这部分代码时,可能引起问题。临界区中至少存在1条这类代码。
下面是关于临界区概念的一一些补充:
在同步的程序设计中,临界区块(Critical section)指的是一个访问共享资源(例如:共享设备或是共享存储器)的程序片段,而这些共享资源有无法同时被多个线程访问的特性。
当有线程进入临界区块时,其他线程或是进程必须等待(例如:bounded waiting 等待法),有一些同步的机制必须在临界区块的进入点与离开点实现,以确保这些共享资源是被异或的使用,例如:semaphore。只能被单一线程访问的设备,例如:打印机。
根据临界区是否引起问题,函数可以分为下面2类:
- 线程安全函数
- 非线程安全函数
线程安全函数被多个线程同时调用时也不会引发问题。反之,非线程安全函数被同时调用时会引发问题。但这并非关于有无临界区的讨论,线程安全的函数中同样可能存在临界区。只是在线程安全函数中,同时被多个线程调用时可通过一些措施避免问题。
幸运的是,大多数标准函数都是线程安全的函数。更幸运的是,我们不用自己区分线程安全的函数和非线程安全的函数(在Windows程序中同样如此)。因为这些平台在定义非线程安全函数的同时,提供了具有相同功能的线程安全的函数。比如,第8章介绍过的如下函数就不是线程安全的函数:
struct hostent * gethostbyname(const char * hostname);
同时提供线程安全的同一功能的函数:
struct hostent * gethostbyname_r(const char * name, struct hostent * result, char * buffer, int buflen. int * h_errnop);
在Linux中,线程安全函数的后缀名通常为r。可以看到,同样的功能,线程安全函数的定义和参数非常复杂,如果我们在多线程的开发中全部使用这种的话,代码的可读性很差,并且也会给程序员增加负担。可以通过下面的方法自动将gethostbyname函数调用改为gethostbyname_r函数调用!
声明头文件前定义_REENTRANT宏
另外,不用刻意添加define语句定义宏,可以在编译的时候添加-D_REENTRANT
选项定义宏
gcc -D_REENTRANT mythread.c -o mthread -lpthread
18.2.3 工作(Worker)线程模型
下面将介绍创建多个线程的情况。
将要介绍的示例将计算1到10的和,但并不是在main函数中进行累加运算,而是创建2个线程,其中一个线程计算1到5的和,另一个线程计算6到10的和,main函数只负责输出运算结果。这种方式的编程模型称为“工作线程(Worker thread)模型”。计算1到5之和的线程与计算6到10之和的线程将成为main线程管理的工作(Worker )。(main函数可以理解为主线程)下面是程序的流程图:
下面是示例,由于线程的代码执行流程可能会比较复杂,要会画上面这种图进行分析。上面这个图是一个一个线程运行的,并不是交替执行的。
// thread3.c
#include <stdio.h>
#include <pthread.h>
void* thread_summation(void* arg);
int sum = 0;
int main(int argc, char* argv[]) {
pthread_t id_t1, id_t2;
int range1[] = {1, 5};
int range2[] = {6, 10};
pthread_create(&id_t1, NULL, thread_summation, (void *)range1);
pthread_create(&id_t2, NULL, thread_summation, (void *)range2);
pthread_join(id_t1, NULL); // 这里不需要啥返回值,所以是NULL
pthread_join(id_t1, NULL);
printf("result: %d \n", sum);
return 0;
}
void* thread_summation(void* arg) {
int start = ((int*)arg)[0];
int end = ((int*)arg)[1];
while (start <= end) {
sum += start;
start++;
}
return NULL;
}
根据代码我们可以看到,2个线程都直接访问全局变量sum,这个从代码的角度看起来很理所当然,但是支持这个操作的前提是2个线程共享保存全局变量的数据区。
编译运行:
gcc thread3.c -D_REENTRANT -o tr3 -lpthread
// gcc -D_REENTRANT thread3.c -o tr3 -lpthread 把宏放在前面也可以
./tr3
运行结果:
运行结果是55,虽然正确,但示例本身存在问题。此处存在临界区相关问题,因此再介绍另一示例。该示例与上述示例相似,只是增加了发生临界区相关错误的可能性,即使在高配置系统环境下也容易验证产生的错误。
// thread.c
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#define NUM_THREAD 100
void* thread_inc(void* arg);
void* thread_des(void* arg);
long long num = 0;
int main(int argc, char* argv[]) {
pthread_t thread_id[NUM_THREAD];
int i;
printf("sizeof long long: %ld \n", sizeof(long long)); // 查看long long的大小
for (i = 0; i < NUM_THREAD; i++) {
if (i % 2) pthread_create(&(thread_id[i]), NULL, thread_inc, NULL);
else pthread_create(&(thread_id[i]), NULL, thread_des, NULL);
}
for (i = 0; i < NUM_THREAD; i++) pthread_join(thread_id[i], NULL);
printf("result: %lld \n", num);
return 0;
}
void* thread_inc(void* arg) {
int i;
for (i = 0; i < 50000000; i++) num += 1;
return NULL;
}
void* thread_des(void* arg) {
int i;
for (i = 0; i < 50000000; i++) num -= 1;
return NULL;
}
上述示例中共创建了100个线程,其中一半执行thread_inc函数中的代码,另一半则执行thread_des函数中的代码。理想的结果应该是0的,下面我们看看结果是不是如我们所想。
编译运行:
gcc -D_REENTRANT thread4.c -o tr4 -lpthread
运行结果:
运行结果并不是0!而且每次运行的结果均不同。虽然其原因尚不得而知,但可以肯定的是,这对于线程的应用是个大问题。
18.3 线程存在的问题和临界区
下面分析thread4.c
中存在的问题并给出解决方案。
18.3.1 多个线程访问同一变量的问题
上面的例子存在的问题就是:2个线程正在同时访问全局变量num。
不止全局变量,任何内存空间只要被同时访问都有可能发生问题。下面同时示例解释”同时访问“的含义,并说明为何会引起问题。一个线程访问num的时候还没有完成应有的操作,就被其他线程抢过去访问了,此时就会出现问题。
因此,线程访问变量num时应该阻止其他线程访问,知道该线程完成操作。这就是这就是同步(Synchronization)。
18.3.2 临界区位置
那么在刚才代码中的临界区位置是:
函数内同时运行多个线程时引发问题的多条语句构成的代码块
全局变量 num 不能视为临界区,因为他不是引起问题的语句,只是一个内存区域的声明。临界区通常位于由线程运行的函数内部,下面是刚才代码的两个 main 函数。
void *thread_inc(void *arg)
{
int i;
for (i = 0; i < 50000000; i++)
num += 1;//临界区
return NULL;
}
void *thread_des(void *arg)
{
int i;
for (i = 0; i < 50000000; i++)
num -= 1;//临界区
return NULL;
}
由上述代码可知,临界区并非 num 本身,而是访问 num 的两条语句,这两条语句可能由多个线程同时运行,也是引起这个问题的直接原因。产生问题的原因可以分为以下三种情况:
- 2 个线程同时执行
thread_inc
函数 - 2 个线程同时执行
thread_des
函数 - 2 个线程分别执行
thread_inc
和thread_des
函数
需要关注第三点,2条语句由不同的线程执行,也有可能构成临界区。前提是这2条语句访问同一内存空间。
补充一点:pthread_join是会阻塞线程的,如果存在多个线程,某一个线程使用了pthread_join,就会进入阻塞状态,直到该线程结束,才会开始其他线程。上面的例子中即使使用了pthread_join将当前的线程阻塞,但实际上线程的顺序和时间是不确定的,所以无法避免竞态条件的发生,针对这种情况,应该使用线程同步来解决。
18.4 线程同步
18.4.1 同步的两面性
线程同步用于解决线程访问顺序引发的问题。需要同步的情况可以从如下两方面考虑。
- 同时访问同一内存空间时发生的情况
- 需要指定访问同一内存空间的线程执行顺序的情况
下面是第二种情况的解释:假设有A、B两个线程,线程A负责向指定内存空间写入(保存)数据,线程B负责取走该数据。这种情况下**,线程A首先应该访问约定的内存空间并保存数据**。万一线程B先访问并取走数据,将导致错误结果。像这种需要控制执行顺序的情况也需要使用同步技术。
18.4.2 互斥量
互斥锁(英语:英语:Mutual exclusion,缩写 Mutex)是一种用于多线程编程中,防止两条线程同时对同一公共资源(比如全域变量)进行读写的机制。该目的通过将代码切片成一个一个的临界区域(critical section)达成。临界区域指的是一块对公共资源进行访问的代码,并非一种机制或是算法。一个程序、进程、线程可以拥有多个临界区域,但是并不一定会应用互斥锁。通俗的说就互斥量就是一把优秀的锁,当临界区被占据的时候就上锁,等占用完毕然后再放开。接下来介绍互斥量的创建及销毁函数。
#include <pthread.h>
int pthread_mutex_init(pthread_mutex_t * mutex, const pthread_mutexattr_t * attr);
int pthread_mutex_destory(pthread_mutex_t * mutex);
/*
成功时返回 0,失败时返回其他值
mutex : 创建互斥量时传递保存互斥量的变量地址值,销毁时传递需要销毁的互斥量地址
attr : 传递即将创建的互斥量属性,没有特别需要指定的属性时传递 NULL
*/
从上述函数声明中可以看出,为了创建相当于锁系统的互斥量,需要声明如下 pthread_mutex_t
型变量:
pthread_mutex_t mutex;
该变量的地址值传递给 pthread_mutex_init 函数,用来保存操作系统创建的互斥量(锁系统)。调用pthread_mutex_destroy
函数时同样需要该信息。如果不需要配置特殊的互斥量属性,则向第二个参数传递 NULL 时,可以利用 PTHREAD_MUTEX_INITIALIZER
进行如下声明:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
但是不推荐使用这个方法,还是老老实实使用pthread_mutex_init
进行互斥量的创建。接下来介绍利用互斥量锁住或释放临界区时使用的函数:
#include <pthread.h>
int pthread_mutex_lock(pthread_mutex_t * mutex);
int pthread_mutex_unlock(pthread_mutex_t * mutex);
// 成功时返回0,失败时返回其他值
函数名本身含有lock、unlock等词汇,很容易理解其含义。进入临界区前调用的函数就是pthread_mutex_lock。调用该函数时,发现有其他线程已进入临界区,则pthread_mutex_lock函数不会返回,直到里面的线程调用pthread_mutex_unlock函数退出临界区为止。也就是说,其他线程让出临界区之前,当前线程将一直处于阻塞状态 (lock发现其他线程在使用公共资源的时候,就把自己的线程阻塞,直到其他线程完成了对公共资源的操作)。接下来整理一下保护临界区的代码块编写方法。创建好互斥量的前提下,可以通过如下结构保护临界区。
pthread_mutex_lock(&mutex);
// 临界区的开始
// ......
// 临界区的结束
pthread_mutex_unlock(&mutex);
简言之,就是利用lock和unlock函数围住临界区的两端。此时互斥量相当于一把锁,阻止多个线程同时访问。还有一点需要注意,线程退出临界区时,如果忘了调用pthread_mutex_unlock函数,那么其他为了进入临界区而调用pthread_mutex_lock函数的线程就无法摆脱阻塞状态。这种情况称为**“死锁”( Dead-lock )**,需要格外注意。接下来利用互斥量解决示例thread4.c中遇到的问题。
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#define NUM_THREAD 100
void* thread_inc(void* arg);
void* thread_des(void* arg);
long long num = 0;
pthread_mutex_t mutex;
int main(int argc, char* argv[]) {
pthread_t thread_id[NUM_THREAD];
int i;
pthread_mutex_init(&mutex, NULL);
printf("sizeof long long: %ld \n", sizeof(long long)); // 查看long long的大小
for (i = 0; i < NUM_THREAD; i++) {
if (i % 2) pthread_create(&(thread_id[i]), NULL, thread_inc, NULL);
else pthread_create(&(thread_id[i]), NULL, thread_des, NULL);
}
for (i = 0; i < NUM_THREAD; i++) pthread_join(thread_id[i], NULL);
printf("result: %lld \n", num);
pthread_mutex_destroy(&mutex);
return 0;
}
void* thread_inc(void* arg) {
int i;
pthread_mutex_lock(&mutex);
for (i = 0; i < 50000000; i++) num += 1;
pthread_mutex_unlock(&mutex);
return NULL;
}
void* thread_des(void* arg) {
int i;
for (i = 0; i < 50000000; i++) {
pthread_mutex_lock(&mutex);
num -= 1;
pthread_mutex_unlock(&mutex);
}
return NULL;
}
编译运行:
gcc mutex.c -D_REENTRANT -o mutex -lpthread
./mutex
运行结果:
和我们想象的结果是一样的。
观察代码,我们可以发现,两个线程函数中锁的范围是不一样的。对于thread_inc:
void* thread_inc(void* arg) {
int i;
pthread_mutex_lock(&mutex);
for (i = 0; i < 50000000; i++) num += 1;
pthread_mutex_unlock(&mutex);
return NULL;
}
以上临界区划分范围较大,但这是考虑到如下优点所做的决定:
最大限度减少互斥量lock、unlock函数的调用次数。
但是对于thread_des,居然调用了50000000次lock和unlock。从时间的成本来看,好像临界区越大越好,但是对于thread_inc,当num的值增加到50000000的时间内都不允许其他线程访问,这样也存在不合理。所以临界区的大小,需要根据需求以及编程经验来确定。
18.4.3 信号量
下面介绍信号量。信号量与互斥量极为相似,在互斥量的基础上很容易理解信号量。此处只涉及利用**“二进制信号量”(只用0和1)完成“控制线程顺序”**为中心的同步方法。下面给出信号量创建及销毁方法。
#include <semaphore.h>
int sem_init(sem_t * sem, int pshared, unsigned int value);
int sem_destory(sem_t * sem);
/*
成功时返回 0 ,失败时返回其他值
sem : 创建信号量时保存信号量的变量地址值,销毁时传递需要销毁的信号量变量地址值
pshared : 传递其他值时,创建可由多个继承共享的信号量;传递 0 时,创建只允许 1 个进程内部
使用的信号量。需要完成同一进程的线程同步,故为0
value : 指定创建信号量的初始值
*/
上述函数的pshared参数超出了我们关注的范围,故默认向其传递0。稍后讲解通过value参数初始化的信号量值究竟是多少。接下来介绍信号量中相当于互斥量lock、unlock的函数。
#include <semaphore.h>
int sem_post(sem_t * sem);
int sem_wait(sem_t * sem);
// 成功时返回0,失败时返回其他值
// sem : 传递保存信号量读取值的变量地址值,传递给 sem_post 的信号量增1,传递给 sem_wait 时信号量减1
上述的sem_post相当于unlock,sem_wait相当于lock!!不要弄混了,具体的解释如下:
调用sem_init函数时,操作系统将创建信号量对象,此对象中记录着“信号量值”( Semaphore Value)整数。该值在调用sem_post函数时增1,调用sem_wait函数时减1。但信号量的值不能小于0,因此,在信号量为0的情况下调用sem_wait函数时,调用函数的线程将进入阻塞状态(因为函数未返回) (这里就跟lock的时候,如果其他线程已经lock了,自己的lock就不会返回,处于阻塞的状态)。当然,此时如果有其他线程调用sem_post函数,信号量的值将变为1,而原本阻塞的线程可以将该信号量重新减为0并跳出阻塞状态(这里就和unlock的机制是一样的)。
可以这么理解:信号量值就是能够同时访问某个共享资源的线程或进程的最大数量。如果是二进制信号量的话,实现的功能其实就和互斥量是一样的了。
可以通过如下形式同步临界区(假设信号量的初始值是1):
sem_wait(&sem); // 信号量变为0
// 临界区的开始
// ......
// 临界区的结束
sem_post(&sem); // 信号量变为1
上述代码结构中,调用sem_wait函数进入临界区的线程在调用sem_post函数前不允许其他线程进入临界区。信号量的值在0和1之间跳转,因此,具有这种特性的机制称为“二进制信号量”。接下来给出信号量相关示例。即将介绍的示例并非关于同时访问的同步,而是关于控制访问顺序的同步 (解决的是第二种)。该示例的场景如下:
线程A从用户输入得到值后存入全局变量num,此时线程B将取走该值并累加。该过程共进行5次,完成后输出总和并退出程序。
为了按照上述要求构建程序,应按照线程A、线程B的顺序访问变量num,且需要线程同步。接下来给出示例:
// semaphore.c
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
void * read(void * arg);
void * accu(void * arg);
static sem_t sem_one;
static sem_t sem_two;
static int num;
int main (int argc, char* argv[]) {
pthread_t id_t1, id_t2;
sem_init(&sem_one, 0, 0);
sem_init(&sem_two, 0, 1); // 两个信号量,一个信号量的值是0,另一个为1
pthread_create(&id_t1, NULL, read, NULL); // 不需要传参数,最后一个参数也是NULL
pthread_create(&id_t2, NULL, accu, NULL);
pthread_join(id_t1, NULL);
pthread_join(id_t2, NULL);
sem_destroy(&sem_one);
sem_destroy(&sem_two);
}
void* read(void* arg) {
int i;
for (i = 0; i < 5; i++) {
fputs("Inputs num: ", stdout);
sem_wait(&sem_two);
scanf("%d", &num);
sem_post(&sem_one);
}
return NULL;
}
void* accu(void* arg) {
int sum = 0, i;
for (i = 0; i < 5; i++) {
sem_wait(&sem_one);
sum += num;
sem_post(&sem_two);
}
printf("Result: %d \n", sum);
return NULL;
}
acc的信号量初始值设置为0,是为了防止最开始没有值的时候调用了acc,从而出现错误。利用信号量变量sem_two调用wait函数和post函数。这是为了防止在调用accu函数的线程还未取走数据的情况下,调用read函数的线程覆盖原值。利用信号量变量sem_one调用wait和post函数。这是为了防止调用read函数的线程写入新值前,accu函数取走(再取走旧值)数据。
编译运行:
gcc semaphore.c -D_REENTRANT -o sema -lpthread
./sema
运行结果:
下面在多线程的基础上编写服务器端。
18.5 线程的销毁和多线程并发服务器端的实现
线程的销毁同样也很重要,下面介绍线程的销毁:
18.5.1 销毁线程的3种方法
Linux线程并不是在首次调用的线程main函数返回时自动销毁,所以用如下2种方法之一加以明确。否则由线程创建的内存空间将一直存在。
- 调用pthread_join函数
- 调用pthread_detach函数
之前调用过pthread_join函数。调用该函数时,不仅会等待线程终止,还会引导线程销毁。但该函数的问题是,线程终止前,调用该函数的线程将进入阻塞状态。因此,通常通过如下函数调用引导线程销毁。
#include <pthread.h>
int pthread_detach(pthread_t thread);
// 成功时返回0,失败时返回其他值
// thread : 终止的同时需要销毁的线程 ID
调用上述函数不会引起线程终止或进入阻塞状态,可以通过该函数引导销毁线程创建的内存空间。调用该函数后不能再针对相应线程调用 pthread_join
函数。其他的方法和pthread_detach
结果上没有太大差异,我们重点掌握这个就可以了。
18.5.2 多线程并发服务器端的实现
下面介绍一个可以交换信息的简单的聊天程序:
// chat_serv.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/socket.h>
#include <pthread.h>
#include <arpa/inet.h>
#define BUF_SIZE 100
#define MAX_CLNT 256 // 最大客户数
void * handle_clnt(void* arg);
void send_msg(char* msg, int len);
void error_handling(char* msg);
int clnt_cnt = 0;
int clnt_socks[MAX_CLNT]; // 这两行管理接入的客户端套接字的变量和数组。访问这两个变量的代码将构成临界区
pthread_mutex_t mutex;
int main(int argc, char* argv[]) {
int serv_sock, clnt_sock;
struct sockaddr_in serv_adr, clnt_adr;
int clnt_adr_sz;
pthread_t t_id;
if (argc != 2) {
printf("Usage : %s <Port> \n", argv[0]);
exit(1);
}
pthread_mutex_init(&mutex, NULL);
serv_sock = socket(PF_INET, SOCK_STREAM, 0);
if (serv_sock == -1) error_handling("sock() error");
memset(&serv_adr, 0, sizeof(serv_adr));
serv_adr.sin_family = AF_INET;
serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_adr.sin_port = htons(atoi(argv[1]));
if (bind(serv_sock, (struct sockaddr*)&serv_adr, sizeof(serv_adr)) == -1)
error_handling("sock() error");
if (listen(serv_sock, 5) == -1)
error_handling("listen() error");
while (1) {
clnt_adr_sz = sizeof(clnt_adr);
clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_adr, &clnt_adr_sz);
pthread_mutex_lock(&mutex);
clnt_socks[clnt_cnt++] = clnt_sock; // main函数其实也看作一个线程,这里涉及到临界区,所以也要使用互斥锁。其作用是每当有新连接时,将相关信息写入变量clnt_cnt和clnt_socks中
pthread_mutex_unlock(&mutex);
pthread_create(&t_id, NULL, handle_clnt, (void*)&clnt_sock); // 创建线程向新接入的客户端提供服务,执行handle_clnt函数的内容
pthread_detach(t_id); // 从内存中销毁已经终止的线程
printf("Connected client IP : %s \n", inet_ntoa(clnt_adr.sin_addr));
}
close(serv_sock);
return 0;
}
void* handle_clnt(void* arg) {
int clnt_sock = *((int*)arg);
int str_len = 0, i;
char msg[BUF_SIZE];
while ((str_len = read(clnt_sock, msg, sizeof(msg))) != 0)
send_msg(msg, str_len);
pthread_mutex_lock(&mutex);
for (i = 0; i < clnt_cnt; i++) { // 删除该线程的东西,这里牵涉到二者的同步变动,所以也要加锁
if (clnt_sock == clnt_socks[i]) {
while(i++ < clnt_cnt - 1)
clnt_socks[i] = clnt_socks[i + 1];
break;
}
}
clnt_cnt--;
pthread_mutex_unlock(&mutex);
close(clnt_sock);
return NULL;
}
void send_msg(char* msg, int len) { // 该函数负责向所有连接的客户端发送消息
int i;
pthread_mutex_lock(&mutex);
for (i = 0; i < clnt_cnt; i++)
write(clnt_socks[i], msg, len);
pthread_mutex_unlock(&mutex);
}
void error_handling(char* msg) {
fputs(msg, stderr);
fputc('\n', stderr);
exit(1);
}
对于上述代码,我们需要掌握的是临界区的构成形式。上面的例子临界区有如下特点:
访问全局变量clnt_cnt和数组clnt_socks的代码将构成临界区
添加或删除客户端时,变量clnt_cnt和数组cInt_socks同时发生变化。因此,在如下情形中均会导致数据不一致,从而引发严重错误。
- 线程A从数组clnt_socks中删除套接字信息,同时线程B读取clnt_cnt变量
- 线程A读取变量cInt_cnt,同时线程B将套接字信息添加到clnt_socks数组
总之就是会导致二者不匹配。所以访问变量cInt_cnt和数组cInt _socks的代码应组织在一起并构成临界区。
在充分理解这个代码之前,我查找了一些前面没有说清楚的问题:
- 首先是pthread_join的理解,问题就是既然pthread_join会阻塞线程,那为什么还需要锁呢? 当一个线程被调用pthread_join函数等待时,其他线程仍然可以独立地运行和访问共享资源(受并发控制保护),所以是需要锁来控制共享资源只被一个线程使用的。
- 其次是pthread_detach的理解,我感觉这一块本书讲的并不是很好,本书仅仅讲的是使用这个函数进行线程的销毁与内存释放(当然这是其最后的一部分功能)。pthread_detach函数的主要作用是将线程设置为分离状态,并在该线程结束后自动从内存中销毁。在这种情况下,该线程的资源(如内存、CPU 时间等)都被操作系统回收,不再占用额外的系统资源。需要注意的是,在一个线程被设置为分离状态之后,该线程不再具有 joinable 属性,主线程也不能通过调用pthread_join来等待该线程结束。因此在线程相关的工作结束后,最好使用pthread_exit显式地让线程结束,而不是依赖于分离属性自动回收线程资源。
-
pthread_create(&t_id, NULL, handle_clnt, (void*)&clnt_sock);
这个t_id
虽然每次都是一个,但是当主线程接收到一个新客户端连接时,都会创建一个新的子进程来处理该客户端的请求,并将子进程的标识符返回给t_id
变量。所以说这个t_id是不一样的嘞。
下面就是while(1) 中代码的理解:首先服务器端使用accept会阻塞,一旦服务器端有请求,accept就会返回给clnt_sock。接下来就涉及到临界区了,由于涉及到clnt_cnt和serv_socks都是临界区的代码,所以即使在main中,也需要加锁。然后就是调用pthread_create创建线程,并使用pthread_detach将线程设置为分离状态,不接受主线程的管辖,最后打印IP信息。一次循环就执行完毕了。接下来accept函数会继续阻塞,直到下一个客户端发起请求,则会继续相同的事情。
handle_clnt
中不断接收来自客户端的数据,并将数据发给所有的客户端。下面的代码部分是当客户端发送q的时候,read就读不到东西了,代表这个线程也就结束了,这里是将这个线程的东西删除。但是数组没有删除的操作,只能替换。需要注意的是,clnt_cnt和clnt_socks需要同步改变,所以这里也需要加锁,防止混乱。
接下来介绍聊天客户端,客户端示例为了分离输入和输出过程而创建了线程。
// chat_clnt.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <pthread.h>
#define BUF_SIZE 100
#define NAME_SIZE 20
void *send_msg(void *arg);
void *recv_msg(void *arg);
void error_handling(char *msg);
char name[NAME_SIZE] = "[DEFAULT]";
char msg[BUF_SIZE];
int main(int argc, char *argv[])
{
int sock;
struct sockaddr_in serv_addr;
pthread_t snd_thread, rcv_thread;
void *thread_return;
if (argc != 4)
{
printf("Usage : %s <IP> <port> <name>\n", argv[0]);
exit(1);
}
sprintf(name, "[%s]", argv[3]);
sock = socket(PF_INET, SOCK_STREAM, 0);
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = inet_addr(argv[1]);
serv_addr.sin_port = htons(atoi(argv[2]));
if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) == -1)
error_handling("connect() error");
pthread_create(&snd_thread, NULL, send_msg, (void *)&sock); //创建发送消息线程
pthread_create(&rcv_thread, NULL, recv_msg, (void *)&sock); //创建接受消息线程
pthread_join(snd_thread, NULL);
pthread_join(rcv_thread, NULL);
close(sock);
return 0;
}
void *send_msg(void *arg) // 发送消息
{
int sock = *((int *)arg);
char name_msg[NAME_SIZE + BUF_SIZE];
while (1)
{
fgets(msg, BUF_SIZE, stdin);
if (!strcmp(msg, "q\n") || !strcmp(msg, "Q\n"))
{
close(sock);
exit(0);
}
sprintf(name_msg, "%s %s", name, msg);
write(sock, name_msg, strlen(name_msg));
}
return NULL;
}
void *recv_msg(void *arg) // 读取消息
{
int sock = *((int *)arg);
char name_msg[NAME_SIZE + BUF_SIZE];
int str_len;
while (1)
{
str_len = read(sock, name_msg, NAME_SIZE + BUF_SIZE - 1);
if (str_len == -1)
return (void *)-1;
name_msg[str_len] = 0;
fputs(name_msg, stdout);
}
return NULL;
}
void error_handling(char *msg)
{
fputs(msg, stderr);
fputc('\n', stderr);
exit(1);
}
编译运行:
gcc chat_server.c -D_REENTRANT -o cserv -lpthread
gcc chat_clnt.c -D_REENTRANT -o cclnt -lpthread
./cserv 9191
./cclnt 127.0.0.1 9191 lilei
./cclnt 127.0.0.1 9191 hanmeimei
运行结果:
为了能更清楚了解线程的运行,我在两个while循环中加了printf语句,来看调用的时机:
while (1)
{
printf("hello1 \n");
str_len = read(sock, name_msg, NAME_SIZE + BUF_SIZE - 1);
if (str_len == -1)
return (void *)-1;
name_msg[str_len] = 0;
fputs(name_msg, stdout);
}
while (1)
{
printf("hello2 \n");
str_len = read(sock, name_msg, NAME_SIZE + BUF_SIZE - 1);
if (str_len == -1)
return (void *)-1;
name_msg[str_len] = 0;
fputs(name_msg, stdout);
}
这里打印了hello1和hello2之后就阻塞了,send_msg是因为fgets阻塞了,recv_msg是因为没有读到来自于服务器的信息,所以二者都阻塞了。
输入一个good之后就可以解除两个阻塞了,然后通过recv_msg打印结果。之后就会进行新的阻塞。如果将recv_msg的read语句删除,那么将会一直打印hello2,但是此时的send_msg是阻塞的,这也证明了一个线程阻塞不会影响其他线程的工作
。
但是将pthread_join换成pthread_detach的话,程序会直接结束。按理来说将线程分离,也是可以正常运行的,这里还没想明白,先放这儿记录一下。
以上就是一个简单的基于多线程服务器的例子。
18.6 习题
第24章 制作HTTP服务器端
24.1 HTTP概要
本章以编写真实应用程序为目标,在所学理论知识的基础上,编写HTTP (Hypertext TransferProtocol,超文本传输协议)服务器端,即Web服务器端。
24.1.1 理解Web服务器端
Web服务器端是以HTTP协议为基础传输超文本的服务器端,从结果上来看,实现该协议相当于实现Web服务器端。
24.1.2 HTTP
下面详细讨论HTTP协议。虽然它相对简单,但要完全驾驭也并非易事。接下来只介绍编写Web服务器端时的必要内容。
无状态的Stateless协议
为了在网络环境下同时向大量客户提供服务,HTTP协议的请求及响应方式设计如图所示:
从上图可以看到,服务器端响应客户端请求后立即断开连接。换言之,服务器端不会维持客户端状态。即使同一客户端再次发送请求,服务器端也无法辨认出是原先那个,而会以相同方式处理新请求。因此,HTTP又称“无状态的Stateless协议”。
Cookie 和 Session
为了弥补 HTTP无法保持连接的缺点,Web 编程中通常会使用Cookie 和Session 技术。
请求消息(Request Message)的结构
下面介绍客户端向服务器端发送的请求消息的结构。Web服务器端需要解析并响应客户端请求,客户端和服务器端的数据请求方式标准如下图所示:
从图中可以看出,请求消息可以分为请求头、消息头、消息体 3 个部分。其中,请求行含有请求方式(请求目的)信息。典型的请求方式有 GET 和 POST ,GET 主要用于请求数据,POST 主要用于传输数据。为了降低复杂度,我们实现只能响应 GET 请求的 Web 服务器端,下面解释图中的请求行信息。其中「GET/index.html HTTP/1.1」 具有如下含义:
请求(GET)index.html文件,希望以1.1版本的HTTP协议进行通信
请求行只能通过 1 行(line)发送,因此,服务器端很容易从 HTTP 请求中提取第一行,并分别分析请求行中的信息。
请求行下面的消息头中包含发送请求的(将要接收响应信息的)浏览器信息、用户认证信息等关于HTTP消息的附加信息。最后的消息体中装有客户端向服务器端传输的数据,为了装入数据,需要以POST方式发送请求。但我们的目标是实现GET方式的服务器端,所以可以忽略这部分内容。另外,消息体和消息头之间以空行分开,因此不会发生边界问题。
响应消息(Response Message)的结构
下面介绍Web服务器端向客户端传递的响应信息的结构。从下图可以看到,该响应消息由状态行、头信息、消息体等3个部分构成。状态行中含有关于请求的状态信息,这是其与请求消息相比最为显著的区别。
从上图可以看到,第一个字符串状态行中含有关于客户端请求的处理结果。例如,客户端请求index.html文件时,表示index.html文件是否存在、服务器端是否发生问题而无法响应等不同情况的信息将写入状态行。图中的“HTTP/1.1 200 OK”具有如下含义:
我想用HTTP1.1版本进行响应,你的请求已正确处理(200 OK)
表示“客户端请求的执行结果”的数字称为状态码,典型的有以下几种:
- 200 OK : 成功处理了请求
- 404 Not Found : 请求的文件不存在
- 400 Bad Request : 请求方式错误,请检查
消息头中含有传输的数据类型和长度等信息。图中的消息头含有如下信息:
服务端名为SimpleWebServer,传输的数据类型为 text/html。数据长度不超过 2048 个字节。
最后插入一个空行后,通过消息体发送客户端请求的文件数据。以上就是实现 Web 服务端过程中必要的 HTTP 协议。
24.2 实现简单的Web服务器端
24.2.1 实现基于Windows的多线程Web服务器端
暂略
24.2.2 实现基于Linux的多线程Web服务器端
实现该服务器端使用标准I/O函数:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <pthread.h>
#define BUF_SIZE 1024
#define SMALL_BUF 100
void* request_handler(void* arg);
void send_data(FILE* fp, char* ct, char* file_name);
char* content_type(char* file);
void send_error(FILE* fp);
void error_handling(char* message);
int main(int argc, char* argv[]) {
int serv_sock, clnt_sock;
struct sockaddr_in serv_adr, clnt_adr;
int clnt_adr_sz;
char buf[BUF_SIZE];
pthread_t t_id;
if (argc != 2) {
printf("Usage : %s <Port>\n", argv[0]);
exit(1);
}
serv_sock = socket(PF_INET, SOCK_STREAM, 0);
if (serv_sock == -1) error_handling("sock() error");
memset(&serv_adr, 0, sizeof(serv_adr));
serv_adr.sin_family = AF_INET;
serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_adr.sin_port = htons(atoi(argv[1]));
if (bind(serv_sock, (struct sockaddr*)&serv_adr, sizeof(serv_adr)) == -1)
error_handling("bind() error");
if (listen(serv_sock, 20) == -1)
error_handling("listen() error");
while (1) {
clnt_adr_sz = sizeof(clnt_adr);
clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_adr, &clnt_adr_sz);
printf("Connection Request : %s:%d\n",
inet_ntoa(clnt_adr.sin_addr), ntohs(clnt_adr.sin_port));
pthread_create(&t_id, NULL, request_handler, &clnt_sock);
pthread_detach(t_id);
}
close(serv_sock);
return 0;
}
void* request_handler(void* arg) {
int clnt_sock = *((int*)arg);
char req_line[SMALL_BUF];
FILE* clnt_read;
FILE* clnt_write;
char method[10];
char ct[15];
char file_name[30];
// printf("hello world");
clnt_read = fdopen(clnt_sock, "r");
clnt_write = fdopen(dup(clnt_sock), "w"); // dup表示复制一个文件描述符,这里想用标准I/O函数来进行传输
fgets(req_line, SMALL_BUF, clnt_read); // 得到请求信息
if (strstr(req_line, "HTTP/") == NULL) { // strstr:查找字串
send_error(clnt_write);
fclose(clnt_read);
fclose(clnt_write);
return NULL;
}
int name_len = strlen(req_line);
for (int i = 0; i < name_len; i++) {
printf("%c", req_line[i]);
}
printf("\n");
strcpy(method, strtok(req_line, " /")); // 通过" /"分割req_line,并将其存在method中
strcpy(file_name, strtok(NULL, " /")); // 传递NULL,就是从刚刚的分割位置,继续向后找" /"进行分割,并将其存在file_name中
strcpy(ct, content_type(file_name)); // 文本的类型
if (strcmp(method, "GET") != 0) { // 如果不是请求消息
send_error(clnt_write);
fclose(clnt_read);
fclose(clnt_write);
return NULL;
}
fclose(clnt_read);
send_data(clnt_write, ct, file_name);
}
void send_data(FILE* fp, char* ct, char* file_name) {
char protocol[] = "HTTP/1.0 200 OK\r\n"; // 虽然在LInux中字符串的结尾是"\n",但是在HTTP协议中,行结尾应该使用"\r\n"作为标准
char server[] = "Server:Linux Web Server \r\n";
char cnt_len[] = "Content-length:2048\r\n";
char cnt_type[SMALL_BUF];
char buf[BUF_SIZE];
FILE* send_file;
sprintf(cnt_type, "Content-type:%s\r\n\r\n", ct); // 这里有两个\r\n前面一个是参数值和后面的数据进行分割,后面的是分割消息头部分和正文部分的,这部分之间通常有一行空行
send_file = fopen(file_name, "r");
if (send_file == NULL) {
printf("here\n");
send_error(fp);
return;
}
// 传输头信息
fputs(protocol, fp);
fputs(server, fp);
fputs(cnt_len, fp);
fputs(cnt_type, fp);
// 传输请求数据
while (fgets(buf, BUF_SIZE, send_file) != NULL) {
fputs(buf, fp);
fflush(fp);
}
fflush(fp);
fclose(fp);
}
char* content_type(char* file) {
char extension[SMALL_BUF];
char file_name[SMALL_BUF];
strcpy(file_name, file);
strtok(file_name, ".");
strcpy(extension, strtok(NULL, ".")); // 文件扩展名,这里是.html
if(strcmp(extension, "html") || strcmp(extension, "htm"))
return "text/html";
else
return "text/plain";
}
void send_error(FILE* fp) {
char protocol[] = "HTTP/1.0 400 Bad Request\r\n";
char server[] = "Server:Linux Web Server \r\n";
char cnt_len[] = "Content-length:2048\r\n";
char cnt_type[] = "Content-type:text/html\r\n\r\n";
char content[] = "<html><head><title>NETWORK</title></head>"
"<body><font size=+5><br>发生错误! 查看请求文件名和请求方式!"
"</font></body></html>";
fputs(protocol, fp);
fputs(server, fp);
fputs(cnt_len, fp);
fputs(cnt_type, fp);
fflush(fp);
}
void error_handling(char* message) {
fputs(message, stderr);
fputc('\n', stderr);
exit(1);
}
下面是代码的一些说明:主要是strtok
函数的说明:
char *strtok(char *str, const char *delim);
// 该函数返回被分解的第一个子字符串,如果没有可检索的字符串,则返回一个空指针
// str:要被分解成一组小字符串的字符串
// delim:包含分隔符的 C 字符串
注意是包含分隔符的C字符串,只要需要的包含在里面就可以了。如果str传入的值为NULL,那就从上一次分割的地方开始下一次分割。
上面的代码中:
strcpy(method, strtok(req_line, " /")); // 通过" /"分割req_line,并将其存在method中
strcpy(file_name, strtok(NULL, " /")); // 传递NULL,就是从刚刚的分割位置,继续向后找" /"进行分割,并将其存在file_name中
我们知道,req_line为请求第一行,为:"GET /index.html HTTP/1.1"
我们使用" /"分割,所以method就是GET,然后传入的参数是NULL,代表从这个点继续往后分割,遇到了空格就开始分割,因为空格包含在了” /“中,所以file_name就是index.html。
编译运行:
gcc webserv_linux.c -D_REENTRANT -o serv -lpthread
./web_serv 9190
结果:
但是存在一个问题,如果请求index2,不会打印错误信息,会一直转圈。这里我将源码也做了测试,还是会出现这样的问题,暂时没搞明白。文章来源地址https://www.toymoban.com/news/detail-437888.html
24.3 习题
到了这里,关于TCP/IP网络编程(三)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!