c++生产者消费者者模式学习笔记-2内存积压
生产者消费者模式是并发编程的核心模式之一,核心是想要提高程序的运行效率。
这里记录一下自己的思考,使用通俗的语言,和以日志记录为例,解读生产者消费者模式,并实现生产者消费者模式。
将生产者消费者模式的核心内容划分为三个问题:阻塞问题、内存积压问题、cpu空转问题。
这里是第二章,内存积压问题。
速度不一致问题
异步解耦合解决了生产者和消费者相互阻塞的问题,能有效提高程序的运行效率。
此时消费者和生产者的分离,理想情况是两者速度一致,生产者生产一份数据,消费者就进行消费,程序处于一个动态平衡状态。
但是实际运行中,生产者和消费者的速度是不一致的,
当生产者生产速度大于消费者消费速度时,会导致数据积压,当数据过多时,还会导致内存溢出。
日志场景
在日志记录的案例中,生产者和消费者的速度是明显不一致的
因为日志生产在cpu和内存中,而日志消费在io硬盘中,cpu和内存的速度远大于io硬盘的速度。
代码实现
以第一章中的异步解耦合日志系统为例。
void async_log() { std::cout<<"异步解耦日志系统,不再相互阻塞"<<std::endl; std::ofstream log_file; std::string log_path="log2.txt"; // 缓存 std::deque<std::string> log_buffer; // 停止标志 std::atomic<bool> stop_flag(false); // 加锁,避免数据竞争 std::mutex log_mutex; // cpu内存处理数据 auto log_data_func=[&log_buffer,&stop_flag,&log_mutex](){ int count=1; std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now(); while(true) { // std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now(); //创建数据 std::string large_data(2048, 'x'); // 4KB 数据 // std::this_thread::sleep_for(std::chrono::milliseconds(100)); //时间记录 auto time = std::chrono::system_clock::now(); auto time_t = std::chrono::system_clock::to_time_t(time); std::string content_time=std::ctime(&time_t); // std::cout<<"生成日志:"<<content_time<<std::endl; std::string content="["+content_time+"] "+large_data; //写入缓存 std::unique_lock<std::mutex> lock(log_mutex); log_buffer.push_back(content); lock.unlock(); // std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now(); // std::cout<<"处理业务耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl<<std::endl; count++; if(count==10) { std::chrono::high_resolution_clock::time_point t_10 = std::chrono::high_resolution_clock::now(); std::cout<<"生成10条日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t_10-start).count()<<"ms"<<std::endl; } if (stop_flag) { break; } }; std::cout<<"生成日志结束"<<std::endl; std::cout<<"总共生成数据量:"<<count<<"条"<<std::endl; }; std::thread thread_log_data(log_data_func); // 日志写入磁盘 auto log_disk_func=[&log_buffer,&log_file,&log_path,&stop_flag,&log_mutex](){ //计时开始 std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now(); int count=1; while(true) { //判断退出循环 if (count>10) { break; } if(!log_buffer.empty()) { std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now(); //打开文件 log_file.open(log_path,std::ios::app); //写日志 std::unique_lock<std::mutex> lock(log_mutex); auto data=log_buffer.front(); log_buffer.pop_front(); lock.unlock(); log_file<<data<<std::endl; //关闭文件 log_file.close(); std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now(); // std::cout<<"写日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl; count++; } } //通知生产进程结束 stop_flag = true; //计时结束 std::chrono::high_resolution_clock::time_point end = std::chrono::high_resolution_clock::now(); std::cout<<"io耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(end-start).count()<<"ms"<<std::endl; }; std::thread thread_log_disk(log_disk_func); thread_log_data.join(); thread_log_disk.join(); } int main() { // 同步阻塞问题 std::cout<<"----------------"<<std::endl; async_log(); return 0; }运行结果:
---------------- 异步解耦日志系统,不再相互阻塞 生成10条日志耗时:46ms io耗时:3864ms 生成日志结束 总共生成数据量:600条结果分析:
可以看到,生成10条日志耗时46ms,而写日志耗时3864ms,说明io硬盘的写入速度远小于cpu和内存的速度。
当10条日志写入磁盘完成后,内存中生成的日志数据量已经达到了600条,说明内存中已经积压了大量的日志数据。
解决内存积压
内存积压的核心是生产者的速度大于消费者的速度,这个受算法的影响一般不能直接修改,但是次一级的原因是分配的共享内存没有进行限制。
所以,我们可以对共享内存进行限制,当共享内存达到一定大小的时候,生产者需要等待消费者消费完共享内存中的数据。
代码实现:
voidasync_log_buffer_size_question(){std::cout<<"异步解耦日志系统,io硬盘不再阻塞,缓存积压"<<std::endl;std::ofstream log_file;std::string log_path="log2.txt";// 缓存std::deque<std::string>log_buffer;// 资源上锁std::mutex log_mutex;// 停止标志std::atomic<bool>stop_flag(false);// 生成数据autolog_data_func=[&log_buffer,&log_mutex,&stop_flag](){intcount=1;std::chrono::high_resolution_clock::time_point start=std::chrono::high_resolution_clock::now();while(true){// std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();//创建数据std::stringlarge_data(2048,'x');// 4KB 数据//时间记录autotime=std::chrono::system_clock::now();autotime_t=std::chrono::system_clock::to_time_t(time);std::string content_time=std::ctime(&time_t);// std::cout<<"生成日志:"<<content_time<<std::endl;std::string content="["+content_time+"] "+large_data;//写入缓存std::unique_lock<std::mutex>lock(log_mutex);log_buffer.push_back(content);lock.unlock();// std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();// std::cout<<"处理业务耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl<<std::endl;count++;if(stop_flag){break;}if(count==10){std::chrono::high_resolution_clock::time_point t_10=std::chrono::high_resolution_clock::now();std::cout<<"生成10条日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t_10-start).count()<<"ms"<<std::endl;}};};std::threadthread_log_data(log_data_func);// 日志写入磁盘autolog_disk_func=[&log_buffer,&log_file,&log_path,&log_mutex,&stop_flag](){//计时开始std::chrono::high_resolution_clock::time_point start=std::chrono::high_resolution_clock::now();intcount=1;while(true){//判断退出循环if(count>10){break;}if(!log_buffer.empty()){// std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();//打开文件log_file.open(log_path,std::ios::app);//写日志std::unique_lock<std::mutex>lock(log_mutex);autodata=log_buffer.front();log_buffer.pop_front();lock.unlock();log_file<<data<<std::endl;//关闭文件log_file.close();// std::this_thread::sleep_for(std::chrono::milliseconds(100));std::chrono::high_resolution_clock::time_point t2=std::chrono::high_resolution_clock::now();// std::cout<<"写日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl;count++;}}//关闭生产者stop_flag=true;//计时结束std::chrono::high_resolution_clock::time_point end=std::chrono::high_resolution_clock::now();std::cout<<"io耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(end-start).count()<<"ms"<<std::endl;//缓存积压std::cout<<"缓存积压:"<<log_buffer.size()<<std::endl;};std::threadthread_log_disk(log_disk_func);thread_log_data.join();thread_log_disk.join();}voidasync_log_buffer_size_solution(){std::cout<<"异步解耦日志系统,io硬盘不再阻塞,缓存积压"<<std::endl;std::cout<<"限制缓存,缓存尺寸10"<<std::endl;std::ofstream log_file;std::string log_path="log2.txt";// 缓存尺寸intbuffer_size=10;// 缓存std::deque<std::string>log_buffer;// 资源上锁std::mutex log_mutex;// 停止标志std::atomic<bool>stop_flag(false);// cpu内存处理数据autolog_data_func=[&log_buffer,&log_mutex,&buffer_size,&stop_flag](){intcount=1;std::chrono::high_resolution_clock::time_point start=std::chrono::high_resolution_clock::now();while(true){if(log_buffer.size()<buffer_size){// std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();//创建数据std::stringlarge_data(2048,'x');// 4KB 数据//时间记录autotime=std::chrono::system_clock::now();autotime_t=std::chrono::system_clock::to_time_t(time);std::string content_time=std::ctime(&time_t);// std::cout<<"生成日志:"<<content_time<<std::endl;std::string content="["+content_time+"] "+large_data;//写入缓存std::unique_lock<std::mutex>lock(log_mutex);log_buffer.push_back(content);lock.unlock();// std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();// std::cout<<"处理业务耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl<<std::endl;count++;}if(stop_flag){break;}if(count==10){std::chrono::high_resolution_clock::time_point t_10=std::chrono::high_resolution_clock::now();std::cout<<"生成10条日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t_10-start).count()<<"ms"<<std::endl;}};};std::threadthread_log_data(log_data_func);// 日志写入磁盘autolog_disk_func=[&log_buffer,&log_file,&log_path,&log_mutex,&stop_flag](){//计时开始std::chrono::high_resolution_clock::time_point start=std::chrono::high_resolution_clock::now();intcount=1;while(true){//判断退出循环if(count>10){break;}if(!log_buffer.empty()){std::chrono::high_resolution_clock::time_point t1=std::chrono::high_resolution_clock::now();//打开文件log_file.open(log_path,std::ios::app);//写日志std::unique_lock<std::mutex>lock(log_mutex);autodata=log_buffer.front();log_buffer.pop_front();lock.unlock();log_file<<data<<std::endl;//关闭文件log_file.close();// std::this_thread::sleep_for(std::chrono::milliseconds(100));std::chrono::high_resolution_clock::time_point t2=std::chrono::high_resolution_clock::now();// std::cout<<"写日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl;count++;}}//通知生产者stop_flag=true;//计时结束std::chrono::high_resolution_clock::time_point end=std::chrono::high_resolution_clock::now();std::cout<<"io耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(end-start).count()<<"ms"<<std::endl;//缓存积压std::cout<<"缓存积压:"<<log_buffer.size()<<std::endl;};std::threadthread_log_disk(log_disk_func);thread_log_data.join();thread_log_disk.join();}intmain(){// 缓存积压问题std::cout<<"----------------"<<std::endl;async_log_buffer_size_question();std::cout<<"----------------"<<std::endl;async_log_buffer_size_solution();return0;}结果:
---------------- 异步解耦日志系统,异步解耦合,缓存积压 生成10条日志耗时:587ms io耗时:3489ms 缓存积压:514 ---------------- 异步解耦日志系统,异步解耦合,缓存积压解决 限制缓存,缓存尺寸10 生成10条日志耗时:46ms io耗时:3122ms 缓存积压:10结果分析:
限制共享内存和没限制内存,两者记录相同数量的日志,消耗时间基本一致
没有限制内存,会生成514条数据在缓冲区,每条数据2kB,内存占用1MB左右,如果生成时间拉长,会造成更大的内存占用,直到内存溢出。
限制内存,最多生成10条数据在缓冲区,每条数据2kB,内存占用20kB左右,不会出现内存占用堆积的问题。
结论:
在异步解耦日志系统中,通过限制共享内存的大小,成功防止内存大量占用。
