Linux使用匿名管道实现进程池得以高效通信

🎬慕斯主页:修仙—别有洞天
♈️今日夜电波:Nonsense—Sabrina Carpenter
0:50━━━━━━️💟──────── 2:43
🔄 ◀️ ⏸ ▶️ ☰
💗关注👍点赞🙌收藏您的每一次鼓励都是对我莫大的支持😍
目录
思路梳理
匿名管道知识回忆
匿名管道实现进程池思路
池化技术怎么提高效率?
具体实现
进程以及管道的创建操作
分发任务操作
回收资源操作
解决上述所提到Bug
总体代码及代码效果
Makefile
Task.hpp
mulpipe.cpp
实现效果
思路梳理
匿名管道知识回忆
上一篇的文章中,详细介绍了管道的知识点,下面还是复习一下对于匿名管道相关的知识点。如下是匿名管道的一个示例图,这表现了两个“有血缘关系”的两个进程之间通过匿名管道进行通信的过程,我们通过控制struct files *fd_array[]中读或者写的struct files的开关来实现两个进程间的通信:

我们主要通过如下的接口创建匿名管道来实现以上的匿名管道通信:
#include 功能:创建一无名管道 原型 int pipe(int fd[2]); 参数 fd:文件描述符数组,其中fd[0]表示读端, fd[1]表示写端 返回值:成功返回0,失败返回错误代码
匿名管道实现进程池思路
说大白话(●—●):我们使用一个父进程创建很多的管道,再创建对应数量的进程,然后这些管道分别与其他的进程进行直接的连接。这样我们就提前创建和联系好了一定数量的进程。我们想让父进程向其中一个子进程发消息就可通过选择管道直接发消息,而如果不发消息其它进程只会等待。通过信息的发送,我们可以就通过选择管道从而让子进程分别执行对应的任务。大致的图解如下:

池化技术怎么提高效率?
池化技术通过资源共享和优化来提高效率,具体表现在以下几个方面:
- 资源复用:池化技术通过重用已创建的资源,减少了频繁创建和销毁资源的开销。例如,线程池中的线程可以被多个任务重复使用,这样可以避免每次任务执行时都创建新线程的开销。
- 减少等待时间:池化技术可以减少请求的等待时间。因为资源是预先分配好的,当有新的请求到来时,可以立即使用池中的资源,而不需要等待资源的创建过程。
- 提高响应速度:由于资源已经准备好,池化技术可以快速响应请求,提高了处理速度。这对于需要快速响应的系统来说尤其重要。
- 统一管理:池化技术提供了对资源的集中管理,这有助于监控系统资源的使用情况,及时回收不再使用的资源,避免资源浪费。
- 优化性能:在大数据处理等场景中,池化技术可以通过合并多个请求来减少数据处理的时间和空间复杂度,从而提高数据处理的性能。
具体实现
进程以及管道的创建操作
创建一个channel类用来来存储管道的读文件描述符ctrlfd以及进程描述符workerid,完成初始化操作以及后续的销毁操作。
static int number = 1;//标识对应的进程和管道
class channel
{
public:
channel(int fd, pid_t id) : ctrlfd(fd), workerid(id)
{
name = "channel-" + std::to_string(number++);
}
public:
int ctrlfd;
pid_t workerid;
std::string name;
};
再根据先描述在组织的原则,我们在主函数使用一个std::vector channels来管理上面的结构体,可以根据需求进行增删查改等等操作。在完成这些预备操作后,创建对应数量的进程以及管道。
特别注意:如下函数中的std::vector old;以及如下代码是为了解决父子进程继承而产生的一些bug,这个将在最后解释:
std::vector old;
if(!old.empty())
{
for(auto fd : old)
{
close(fd);
}
PrintFd(old);
}
old.push_back(pipefd[1]);
如下函数的操作为创建管道以及进程,可以先忽略上述所提到的解决bug的代码,通过循环创建对应的子进程、关闭父子进程对应的读写文件,并且存储到vector *c(也就是主函数的中channels),下面的work()函数为子进程要做的工作,可以理解了后面的分发任务操作再来理解。
const int num = 5;//全局定义创建管道以及进程数
void CreateChannels(std::vector *c)
{
std::vector old;
for (int i = 0; i push_back(channel(pipefd[1], id));
old.push_back(pipefd[1]);
// childid, pipefd[1]
}
}
因为前面我们已经重定向了管道的写入作为子进程的写入,接下来通过read就会读取对应的操作,需要注意的是:我们是通过一个int类型的变量来控制要完成的任务,后续再task.hpp中会定义对应要完成的任务。通过read的返回值来判断是要执行任务还是退出,我们在管道的知识中知道,read会等待write,也就是等待数据的输入。当写进程退出,读进程会跟着退出,我们根据以上特性来判断是要执行任务还是等待任务还是退出进程:
void Work()
{
while (true)
{
int code = 0;
ssize_t n = read(0, &code, sizeof(code));
if (n == sizeof(code))
{
if (!init.CheckSafe(code))
continue;
init.RunTask(code);
}
else if (n == 0)
{
break;
}
else
{
// do nothing
}
}
std::cout << "child quit" << std::endl;
}
分发任务操作
当我们创建好管道以及进程后,可以注意到接下来的都是没有经过 if (id == 0) 控制下的程序,也就是说接下来的程序都是父进程运行的程序,因此我们就可以让父进程来分配任务使得子进程完成任务。接下来,我们创建一个task.hpp来模拟创建的任务以及对应的封装、任务分发等等操作如下:
#pragma once
#include
#include
#include
#include
#include
#include
// using task_t = std::function;
typedef std::function task_t;
void Download()
{
std::cout << "我是一个下载任务"
<< " 处理者: " << getpid() << std::endl;
}
void PrintLog()
{
std::cout << "我是一个打印日志的任务"
<< " 处理者: " << getpid() << std::endl;
}
void PushVideoStream()
{
std::cout << "这是一个推送视频流的任务"
<< " 处理者: " << getpid() << std::endl;
}
// void ProcessExit()
// {
// exit(0);
// }
class Init
{
public:
// 任务码
const static int g_download_code = 0;
const static int g_printlog_code = 1;
const static int g_push_videostream_code = 2;
// 任务集合
std::vector tasks;
public:
Init()
{
tasks.push_back(Download);
tasks.push_back(PrintLog);
tasks.push_back(PushVideoStream);
srand(time(nullptr) ^ getpid());
}
bool CheckSafe(int code)
{
if (code >= 0 && code < tasks.size())
return true;
else
return false;
}
void RunTask(int code)
{
return tasks[code]();
}
int SelectTask()
{
return rand() % tasks.size();
}
std::string ToDesc(int code)
{
switch (code)
{
case g_download_code:
return "Download";
case g_printlog_code:
return "PrintLog";
case g_push_videostream_code:
return "PushVideoStream";
default:
return "Unknow";
}
}
};
Init init; // 定义对象
传入主函数用于管理的channels,flag用于控制进程执行完任务后是否需要退出(1表示要退出,0表示不退出),num为要执行任务的次数,需要注意的是num是要在flag为1的前提下才能有效的,如果不传则程序只会执行一次。在选择完任务以及进程后,通过write来向指定的管道写入。具体读写操作可看:Linux进程间通信(IPC)机制之一:管道(Pipes)详解:匿名管道的特性与情况
void SendCommand(const std::vector &c, bool flag, int num = -1)
{
int pos = 0;
while (true)
{
// 1. 选择任务
int command = init.SelectTask();
// 2. 选择信道(进程)
const auto &channel = c[pos++];
pos %= c.size();
// debug
std::cout << "send command " << init.ToDesc(command) << "[" << command << "]"
<< " in "
<< channel.name << " worker is : " << channel.workerid << std::endl;
// 3. 发送任务
write(channel.ctrlfd, &command, sizeof(command));
// 4. 判断是否要退出
if (!flag)
{
num--;
if (num <= 0)
break;
}
sleep(1);
}
std::cout << "SendCommand done..." << std::endl;
}
回收资源操作
通过close关闭对应的管道,waitpid等待子进程的结束。
const int num = 5;//全局定义创建管道以及进程数
void ReleaseChannels(std::vector c)
{
// version 2
// int num = c.size() - 1;
// for (; num >= 0; num--)
// {
// close(c[num].ctrlfd);
// waitpid(c[num].workerid, nullptr, 0);
// }
// version 1
for (const auto &channel : c)
{
close(channel.ctrlfd);
waitpid(channel.workerid, nullptr, 0);
}
// for (const auto &channel : c)
// {
// pid_t rid = waitpid(channel.workerid, nullptr, 0);
// if (rid == channel.workerid)
// {
// std::cout << "wait child: " << channel.workerid << " success" << std::endl;
// }
// }
}
解决上述所提到Bug
如下代码:
std::vector old;
if(!old.empty())
{
for(auto fd : old)
{
close(fd);
}
PrintFd(old);
}
old.push_back(pipefd[1]);
这是个什么Bug呢?如果不加上上这段代码,那么关闭进程及管道就必须从最后面生成的进程和管道向前关闭。为啥呢?这是因为当我们父进程依次创建子进程,其中的对于管道的读写操作struct file也被继承了下来,也就是说有着上一个生成的子进程会被下一个子进程的写操作struct file指向,而下下个生成的子进程会指向前面两个子进程,以此类推…因此,我们需要在创建的时候关闭新创建子进程对应的写操作。
具体的子进程继承写操作的struct file例子如下:
总体代码及代码效果
Makefile
processpool:mulpipe.cpp g++ -o $@ $^ -std=c++11.PHONY:cleanclean: rm -f processpool
Task.hpp
#pragma once#include #include #include #include #include #include // using task_t = std::function;typedef std::function task_t;void Download(){ std::cout << "我是一个下载任务" << " 处理者: " << getpid() << std::endl;}void PrintLog(){ std::cout << "我是一个打印日志的任务" << " 处理者: " << getpid() << std::endl;}void PushVideoStream(){ std::cout << "这是一个推送视频流的任务" << " 处理者: " << getpid() << std::endl;}// void ProcessExit()// {// exit(0);// }class Init{public: // 任务码 const static int g_download_code = 0; const static int g_printlog_code = 1; const static int g_push_videostream_code = 2; // 任务集合 std::vector tasks;public: Init() { tasks.push_back(Download); tasks.push_back(PrintLog); tasks.push_back(PushVideoStream); srand(time(nullptr) ^ getpid()); } bool CheckSafe(int code) { if (code >= 0 && code < tasks.size()) return true; else return false; } void RunTask(int code) { return tasks[code](); } int SelectTask() { return rand() % tasks.size(); } std::string ToDesc(int code) { switch (code) { case g_download_code: return "Download"; case g_printlog_code: return "PrintLog"; case g_push_videostream_code: return "PushVideoStream"; default: return "Unknow"; } }};Init init; // 定义对象mulpipe.cpp
#include #include #include #include #include #include #include #include "Task.hpp"const int num = 5;static int number = 1;class channel{public: channel(int fd, pid_t id) : ctrlfd(fd), workerid(id) { name = "channel-" + std::to_string(number++); }public: int ctrlfd; pid_t workerid; std::string name;};void Work(){ while (true) { int code = 0; ssize_t n = read(0, &code, sizeof(code)); if (n == sizeof(code)) { if (!init.CheckSafe(code)) continue; init.RunTask(code); } else if (n == 0) { break; } else { // do nothing } } std::cout << "child quit" << std::endl;}void PrintFd(const std::vector &fds){ std::cout << getpid() << " close fds: "; for(auto fd : fds) { std::cout << fd << " "; } std::cout << std::endl;}// 传参形式:// 1. 输入参数:const &// 2. 输出参数:*// 3. 输入输出参数:&void CreateChannels(std::vector *c){ // bug std::vector old; for (int i = 0; i push_back(channel(pipefd[1], id)); old.push_back(pipefd[1]); // childid, pipefd[1] }}void PrintDebug(const std::vector &c){ for (const auto &channel : c) { std::cout << channel.name << ", " << channel.ctrlfd << ", " << channel.workerid << std::endl; }}void SendCommand(const std::vector &c, bool flag, int num = -1){ int pos = 0; while (true) { // 1. 选择任务 int command = init.SelectTask(); // 2. 选择信道(进程) const auto &channel = c[pos++]; pos %= c.size(); // debug std::cout << "send command " << init.ToDesc(command) << "[" << command << "]" << " in " << channel.name << " worker is : " << channel.workerid << std::endl; // 3. 发送任务 write(channel.ctrlfd, &command, sizeof(command)); // 4. 判断是否要退出 if (!flag) { num--; if (num <= 0) break; } sleep(1); } std::cout << "SendCommand done..." << std::endl;}void ReleaseChannels(std::vector c){ // version 2 // int num = c.size() - 1; // for (; num >= 0; num--) // { // close(c[num].ctrlfd); // waitpid(c[num].workerid, nullptr, 0); // } // version 1 for (const auto &channel : c) { close(channel.ctrlfd); waitpid(channel.workerid, nullptr, 0); } // for (const auto &channel : c) // { // pid_t rid = waitpid(channel.workerid, nullptr, 0); // if (rid == channel.workerid) // { // std::cout << "wait child: " << channel.workerid << " success" << std::endl; // } // }}int main(){ std::vector channels; // 1. 创建信道,创建进程 CreateChannels(&channels); // 2. 开始发送任务 const bool g_always_loop = true; // SendCommand(channels, g_always_loop); SendCommand(channels, !g_always_loop, 10); // 3. 回收资源,想让子进程退出,并且释放管道,只要关闭写端 ReleaseChannels(channels); return 0;}实现效果

感谢你耐心的看到这里ღ( ´・ᴗ・` )比心,如有哪里有错误请踢一脚作者o(╥﹏╥)o!

给个三连再走嘛~
本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://net2asp.com/8531dd34f3.html
