c++多线程按行读取同一个每行长度不规则文件

这篇具有很好参考价值的文章主要介绍了c++多线程按行读取同一个每行长度不规则文件。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

对于非常大的比如上百G的大文件读取,单线程读是非常非常慢的,需要考虑用多线程读,多个线程读同一个文件时不用加锁的,每个线程打开一个独立的文件句柄

多线程读同一个文件实现思路

思路1

  1. 先打开一个文件句柄,获取整个文件大小file_size
  2. 确定要采用线程读取的部分大小read_size和多线程的个数thread_num,算出平均每个线程要读取的大小为read_size/thread_num=each_size
  3. 计算出每个线程读取的位置start_pos和它下一个线程的读取位置next_pos
  4. 对于每个线程来说,读取时的情况可以有如下种情况:
  • start_pos等于0(整个文件都采用多线程读取),这种情况下直接用getline读取,直到读取某一行后读取指针位置超过next_pos
  • start_pos>0, 读取当前位置所在的字符,如果字符恰好为\n,则直接用getline读取,直到读取某一行后读取指针位置超过next_pos
  • start_pos>0, 读取当前位置所在的字符,如果字符不为\n,则先用getline读取一行,假设读取这行后新的位置为cur_pos,如果cur_pos >= next_pos则这个线程直接退出,不读取任何数据,因为这个线程的下一个线程会和它读取同一行,这一行的内容应该有下一个线程读取; 如果cur_pos < next_pos则当前读取的这一行直接丢弃(因为这一行交给了上一个线程来读取), 直接从下一行开始用getline读取,直到读取某一行后读取指针位置超过next_pos
  • 最后代码还要计算剩下的部分,因为文件大小read_size不一定能整除线程个数thread_num,剩下的部分应该全部交给主线程来读

这个思路实现起来容易出bug,需要保证每一个线程至少能读取一个完整的行

源码实现

可能有bug,但是功能基本实现

#include "spdlog/sinks/basic_file_sink.h"
#include "spdlog/spdlog.h"
#include <chrono>
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include <thread>
#include <vector>
using namespace std;

void init_log()
{
    try
    {
        auto new_logger = spdlog::basic_logger_mt("new_default_logger", "test.log", true);
        spdlog::set_default_logger(new_logger);
        spdlog::info("new logger log start");
    }
    catch (const spdlog::spdlog_ex &ex)
    {
        std::cout << "Log init failed: " << ex.what() << std::endl;
    }
}

void thread_read_file(int tid, const string &file_path, std::streampos start_pos, std::streampos next_pos, int each_size)
{
    ifstream file(file_path.c_str(), ios::in);
    if (!file.good())
    {
        file.close();
        spdlog::info("线程{} 打开文件{}失败", tid, file_path);
        return;
    }

    file.seekg(start_pos, ios::beg);
    //
    string text;
    if (start_pos != 0)
    {
        char cur_ch = 0;
        // spdlog::info("读取前{}", file.tellg());
        file.read(&cur_ch, 1); //会让指针向后移动一个字节
        // spdlog::info("读取后{}", file.tellg());
        if (start_pos == 115)
        {
            spdlog::info("tid={},115={}", tid, cur_ch);
        }
        if (cur_ch != '\n')
        {
            getline(file, text);
            spdlog::info("线程{},跳过{}", tid, text);
            if (file.tellg() >= next_pos)
            {
                /*
                1. 如果线程起始位置不为换行符,则要跳过本行,本行内容交给上一个线程读取,如果跳过本行后的读取位置(一定是换行符)>=下一个线程的起始位置,
                如果位置等于下一个线程起始位置,说明下个线程起始位置是换行符,下一行内容应该由下一个线程读取;如果位置>下一个线程起始位置,同样本行内容由上一个线程
                读取,下一行内容也不用本线程读取,可能是下一个线程读取
                 */
                spdlog::info("线程{} start_pos={},next_pos={},each_size={} 起始位置不是\\n,读取一行后的指针位置{}>=next_pos,不需要读取内容",
                             tid, start_pos, next_pos, each_size, file.tellg());
                file.close();
                return;
            }
        }
        else
        {
            file.seekg(-1, ios::cur);
        }
        // spdlog::info("线程{} cur_ch={}", tid, cur_ch);
    }

    std::streampos cur_pos = file.tellg();
    while (cur_pos < next_pos && getline(file, text))
    {
        /*
        1. cur_pos始终指向每一行的行尾,如果cur_pos=next_pos则说明next_pos是行尾,则接下来的一行应该由
        下一个线程读,所以这里是cur_pos < next_pos,而不是cur_pos <= next_pos
         */
        int cur_line_len = file.tellg() - cur_pos;
        spdlog::info("线程{} start_pos={},next_pos={},each_size={},本行开始pos={},本行结束pos={},本行读长={},text={}",
                     tid, start_pos, next_pos, each_size, cur_pos, file.tellg(), cur_line_len, text);
        cur_pos = file.tellg();
    }
    spdlog::info("线程{} start_pos={},next_pos={},each_size={},结束时cur_pos={},总共区间长度为{}\n", tid, start_pos, next_pos, each_size, cur_pos, cur_pos - start_pos);
    file.close();
    return;
}

void test_detach(const string &file_path)
{
    // for (int i = 0; i < 10; ++i)
    // {
    //     std::thread th(thread_read_file, i, file_path);
    //     th.detach();
    // }
}

void test_join(const string &file_path)
{
    //确定文件长度
    ifstream file(file_path.c_str(), ios::in);

    //把指针指到文件末尾求出文件大小
    int file_size = file.seekg(0, ios::end).tellg();
    file.close();

    int thread_nums = 50;                       //线程个数
    int each_size = file_size / thread_nums;    //平均每个线程读取的字节数
    std::streampos start_pos = 0, next_pos = 0; //每个线程读取位置的起始和下一个线程读取的起始位置
    vector<std::thread> vec_threads;            //线程列表
    spdlog::info("thread_nums={},each_size={},file_size={}", thread_nums, each_size, file_size);
    int t_id = 0; //线程id
    for (; t_id < thread_nums; ++t_id)
    {
        next_pos += each_size;
        std::thread th(thread_read_file, t_id, file_path, start_pos, next_pos, each_size);
        vec_threads.emplace_back(std::move(th)); // push_back() is also OK
        start_pos = next_pos;
    }
    if (file_size % thread_nums != 0)
    {
        thread_read_file(t_id, file_path, start_pos, file_size, each_size);
    }

    for (auto &it : vec_threads)
    {
        it.join();
    }
}
int main()
{
    init_log();
    string file_path = "./1.txt";
    // test_detach(file_path);
    // std::this_thread::sleep_for(std::chrono::seconds(1)); // wait for detached threads done
    test_join(file_path);
    return 0;
}

思路2

  • 整体思路和方法1一样,只是读取的时候不是按照位置来判断每个线程应该读取多少,而是统计每个线程读取的长度
  • 每次移动位置指针时,记录一下移动的位置,因为每个线程应该读取的平均长度已经提前计算,只要线程读取的数据超过了平均大小,或者读取到了文件末尾就结束

源码实现

没有bug,可以适应多个线程被分配到同一行的情况,但是每个线程读取的大小必须>0

#include <chrono>
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include <thread>
#include <vector>
using namespace std;

void thread_read_file(int tid, const string &file_path, int start_pos, int next_pos, int each_size)
{
    ifstream file(file_path.c_str(), ios::in);
    if (!file.good())
    {
        stringstream ss;
        ss << "Thread " << tid << " failed to open file: " << file_path << endl;
        cout << ss.str();
        return;
    }

    file.seekg(start_pos, ios::beg);
    //
    string text;
    stringstream ss;

    if (start_pos != 0)
    {
        char cur_ch;
        file.read(&cur_ch, 1);
        // ss << "Thread " << tid << ", cur_ch=" << cur_ch << endl;
        if (cur_ch != '\n')
        {
            getline(file, text);
        }
    }

    while (getline(file, text) && start_pos <= next_pos)
    {
        ss << "Thread " << tid << ", start_pos=" << start_pos << ";next_pos="
           << next_pos << ";each_size=" << each_size << ": " << text << endl;
        cout << ss.str();
        start_pos = file.tellg();
    }
    file.close();
    return;
}

void test_detach(const string &file_path)
{
    // for (int i = 0; i < 10; ++i)
    // {
    //     std::thread th(thread_read_file, i, file_path);
    //     th.detach();
    // }
}

void test_join(const string &file_path)
{
    //确定文件长度
    ifstream file(file_path.c_str(), ios::in);

    //把指针指到文件末尾求出文件大小
    int file_size = file.seekg(0, ios::end).tellg();
    file.close();

    int thread_nums = 10;
    int each_size = file_size / thread_nums;
    int start_pos = 0, next_pos = 0;

    vector<std::thread> vec_threads;
    int t_id = 0;
    for (; t_id < thread_nums; ++t_id)
    {
        next_pos += each_size;
        std::thread th(thread_read_file, t_id, file_path, start_pos, next_pos, each_size);
        vec_threads.emplace_back(std::move(th)); // push_back() is also OK
        start_pos = next_pos;
    }
    if (file_size % thread_nums != 0)
    {
        thread_read_file(t_id, file_path, start_pos, next_pos, each_size);
    }

    for (auto &it : vec_threads)
    {
        it.join();
    }
}
int main()
{
    string file_path = "./1.txt";
    // test_detach(file_path);
    // std::this_thread::sleep_for(std::chrono::seconds(1)); // wait for detached threads done
    test_join(file_path);
    return 0;
}

运行结果

c++多线程按行读取同一个每行长度不规则文件

本文由博客一文多发平台 OpenWrite 发布!文章来源地址https://www.toymoban.com/news/detail-837610.html

到了这里,关于c++多线程按行读取同一个每行长度不规则文件的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 多个硬盘挂载到同一个目录

    同一目录无法重复挂载,后挂载的会覆盖之前挂载的磁盘。但是现在需要将4块磁盘并行挂载,该如何操作呢? 将2块磁盘合并到一个逻辑卷 进行挂载。 基本知识 基本概念PV(Physical Volume)- 物理卷物理卷在逻辑卷管理中处于最底层,它可以是实际物理硬盘上的分区,也可以是整

    2024年02月08日
    浏览(54)
  • CSS 沿着同一个方向旋转

    主要解决旋转360°后倒转的问题,沿着一个方向旋转,而不是倒回去重新开始。 效果  源码 在线示例 同方向旋转

    2024年02月15日
    浏览(30)
  • 【点击新增一个下拉框 与前一个内容一样 但不能选同一个值】

    主要是看下拉选择el-option的disabled,注意不要混淆 methods:

    2024年02月11日
    浏览(32)
  • 如何将微前端项目部署在同一台服务器同一个端口下

    作者:京东科技 高飞 本文旨在通过部署微前端项目的实践过程中沉淀出一套部署方案,现就一些重点步骤、碰到的问题做了一些总结。 因为线上部署主应用时需要用到子应用的线上可访问地址,因此部署顺序应该是先部署子应用,保证子应用能够线上可访问后,再将子应用

    2023年04月11日
    浏览(36)
  • 不同设备使用同一个Git账号

    想要在公司和家里的电脑上用同一个git账号来pull, push代码 第1种方法, 依次输入 第2种方法, 输入 用户名和邮箱与原设备保持一致 输入上面三句话之后会弹出提示是否在此电脑上创建.ssh文件夹,回车同意即可 接着会提示设置密码,不设置就连按两次回车 完成之后,在 “

    2024年04月10日
    浏览(32)
  • 不同版本.net引用同一个项目

    项目文件.csproj文件内容如下: 重点是:不能有其他的 netstandard2;net40;net45;net46;net6

    2024年02月10日
    浏览(32)
  • Unity批量给模型上同一个材质

    Unity批量给模型上同一个材质 第一步:先选择所有要上材质的模型:  第二步:将创建的材质拖到右侧属性栏:  

    2024年02月12日
    浏览(33)
  • vue 实现多个路由共用同一个页面组件

    这样的弊端是如果router-view里包含其他组件,切换其他组件会让其他组件也重新渲染。 这样的问题是导致切换路由会闪烁一下。因为切换后所有钩子函数都重新触发了。

    2024年02月07日
    浏览(35)
  • axios同一个接口,同时接收 文件 或者 数据

    1、前端代码  主要是: if (response.data instanceof Blob)  判断是否是 Blob 对象, 否则使用: response.data = JSON.parse(await (response.data).text()) 将后台返回的内容转成 文本 返回 主要是: responseType: \\\'blob\\\' 2、后端代码 需要注意的是: response.setHeader(\\\"Access-Control-Expose-Headers\\\", \\\"filename\\\"); 否则

    2024年02月12日
    浏览(36)
  • 【注意】当同一个Class的多个对象出现在同一段代码内的时候,极易出错。

    下面的 paymentQuery 方法坐落于我们lijianjin系统的LiJianJinPaymentQueryBizService。 这两个bankOrderFlow、orderFlow,... 乱花渐欲迷人眼的赶脚,但其实,这是像“shǐ”一样的代码。         在我们的zhongtai-channel-provider, 上周出现并紧急fix了一个bug。竟然也是 此种情况 引发的bug。   先上

    2024年04月28日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包