(线程池)多线程使用场景--es数据批量导入、数据汇总、异步调用;如何控制某个方法允许并发访问线程的数量;对ThreadLocal的理解及实现原理、源码解析、ThreadLocal的内存泄露问题

这篇具有很好参考价值的文章主要介绍了(线程池)多线程使用场景--es数据批量导入、数据汇总、异步调用;如何控制某个方法允许并发访问线程的数量;对ThreadLocal的理解及实现原理、源码解析、ThreadLocal的内存泄露问题。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

线程池使用场景(CountDownLatch、Future)

CountDownLatch(闭锁/倒计时锁) 用来进行线程同步协作,等待所有线程完成倒计时(一个或者多个线程,等待其他多个线程完成某件事情之后才能执行)

  • 其中构造参数用来初始化等待计数值

  • await() 用来等待计数归零

  • countDown() 用来让计数 减一
    分布式es数据批量导入场景,java,数据库,hash,spring boot,spring cloud

public class CountDownLatchDemo {

    public static void main(String[] args) throws InterruptedException {
        //初始化了一个倒计时锁 参数为 3
        CountDownLatch latch = new CountDownLatch(3);

        new Thread(() -> {
            System.out.println(Thread.currentThread().getName()+"-begin...");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            //count--
            latch.countDown();
            System.out.println(Thread.currentThread().getName()+"-end..." +latch.getCount());
        }).start();
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName()+"-begin...");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            //count--
            latch.countDown();
            System.out.println(Thread.currentThread().getName()+"-end..." +latch.getCount());
        }).start();
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName()+"-begin...");
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            //count--
            latch.countDown();
            System.out.println(Thread.currentThread().getName()+"-end..." +latch.getCount());
        }).start();
        String name = Thread.currentThread().getName();
        System.out.println(name + "-waiting...");
        //等待其他线程完成
        latch.await();
        System.out.println(name + "-wait end...");
    }
    
}

多线程使用场景一:( es数据批量导入)

在我们项目上线之前,我们需要把数据库中的数据一次性的同步到es索引库中,但是当时的数据好像是1000万左右,一次性读取数据肯定不行(oom异常,内存溢出),当时我就想到可以使用线程池的方式导入,利用CountDownLatch来控制,就能避免一次性加载过多,防止内存溢出。

整体流程就是通过CountDownLatch+线程池配合去执行

分布式es数据批量导入场景,java,数据库,hash,spring boot,spring cloud
分布式es数据批量导入场景,java,数据库,hash,spring boot,spring cloud

多线程使用场景二(数据汇总)

在一个电商网站中,用户下单之后,需要查询数据,数据包含了三部分:订单信息、包含的商品、物流信息;这三块信息都在不同的微服务中进行实现的,我们如何完成这个业务呢?
分布式es数据批量导入场景,java,数据库,hash,spring boot,spring cloud
在实际开发的过程中,难免需要调用多个接口来汇总数据,如果所有接口(或部分接口)的没有依赖关系,就可以使用线程池+future来提升性能

多线程使用场景三(异步调用)

分布式es数据批量导入场景,java,数据库,hash,spring boot,spring cloud
在进行搜索的时候,需要保存用户的搜索记录,而搜索记录不能影响用户的正常搜索,我们通常会开启一个线程去执行历史记录的保存,在新开启的线程在执行的过程中,可以利用线程提交任务。

多线程使用的场景总结:

  • **批量导入:**使用了线程池+CountDownLatch批量把数据库中的数据导入到了ES(任意)中,避免OOM
  • **数据汇总:**调用多个接口来汇总数据,如果所有接口(或部分接口)的没有依赖关系,就可以使用线程池+future来提升性能
  • **异步线程(线程池)︰**为了避免下一级方法影响上一级方法(性能考虑),可使用异步线程调用下一个方法(不需要下一级方法返回值),可以提升方法响应时间

如何控制某个方法允许并发访问线程的数量

Semaphore [ˈsɛməˌfɔr] 信号量,是JUC包下的一个工具类,我们可以通过其限制执行的线程数量,达到限流的效果。
当一个线程执行时先通过其方法进行获取许可操作,获取到许可的线程继续执行业务逻辑,当线程执行完成后进行释放许可操作,未获取达到许可的线程进行等待或者直接结束。

Semaphore两个重要的方法

  • lsemaphore.acquire(): 请求一个信号量,这时候的信号量个数-1(一旦没有可使用的信号量,也即信号量个数变为负数时,再次请求的时候就会阻塞,直到其他线程释放了信号量)

  • lsemaphore.release(): 释放一个信号量,此时信号量个数+1

线程任务类:

public class SemaphoreCase {
    public static void main(String[] args) {
        // 1. 创建 semaphore 对象
        Semaphore semaphore = new Semaphore(3);
        // 2. 10个线程同时运行
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {

                try {
                    // 3. 获取许可
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    System.out.println("running...");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("end...");
                } finally {
                    // 4. 释放许可
                    semaphore.release();
                }
            }).start();
        }
    }

}

对ThreadLocal的理解

ThreadLocal是多线程中对于解决线程安全的一个操作类,它会为每个线程都分配一个独立的线程副本从而解决了变量并发访问冲突的问题。ThreadLocal 同时实现了线程内的资源共享

案例:使用JDBC操作数据库时,会将每一个线程的Connection放入各自的ThreadLocal中,从而保证每个线程都在各自的 Connection 上进行数据库的操作,避免A线程关闭了B线程的连接。
分布式es数据批量导入场景,java,数据库,hash,spring boot,spring cloud
三个主要方法:

  • set(value) 设置值

  • get() 获取值

  • remove() 清除值

public class ThreadLocalTest {
    static ThreadLocal<String> threadLocal = new ThreadLocal<>();

    public static void main(String[] args) {
        new Thread(() -> {
            String name = Thread.currentThread().getName();
            threadLocal.set("itcast");
            print(name);
            System.out.println(name + "-after remove : " + threadLocal.get());
        }, "t1").start();
        new Thread(() -> {
            String name = Thread.currentThread().getName();
            threadLocal.set("itheima");
            print(name);
            System.out.println(name + "-after remove : " + threadLocal.get());
        }, "t2").start();
    }

    static void print(String str) {
        //打印当前线程中本地内存中本地变量的值
        System.out.println(str + " :" + threadLocal.get());
        //清除本地内存中的本地变量
        threadLocal.remove();
    }

}

ThreadLocal的实现原理&源码解析

ThreadLocal本质来说就是一个线程内部存储类,从而让多个线程只操作自己内部的值,从而实现线程数据隔离
分布式es数据批量导入场景,java,数据库,hash,spring boot,spring cloud
在ThreadLocal中有一个内部类叫做ThreadLocalMap,类似于HashMap

ThreadLocalMap中有一个属性 table数组 ,这个是真正 存储数据 的位置

set方法
分布式es数据批量导入场景,java,数据库,hash,spring boot,spring cloud
get方法/remove方法
分布式es数据批量导入场景,java,数据库,hash,spring boot,spring cloud

ThreadLocal的内存泄露问题

Java对象中的四种引用类型:强引用、软引用、弱引用、虚引用

  • 强引用:最为普通的引用方式,表示一个对象处于有用且必须的状态,如果一个对象具有强引用,则GC并不会回收它。即便堆中内存不足了,宁可出现OOM,也不会对其进行回收
User user = new User();
  • 弱引用:表示一个对象处于可能有用且非必须的状态。在GC线程扫描内存区域时,一旦发现弱引用,就会回收到弱引用相关联的对象。对于弱引用的回收,无关内存区域是否足够,一旦发现则会被回收
User user = new User();
WeakReference weakReference = new WeakReference(user);

每一个Thread维护一个ThreadLocalMap,在ThreadLocalMap中的Entry对象继承WeakReference。其中key为使用弱引用的ThreadLocal实例,value为线程变量的副本。
分布式es数据批量导入场景,java,数据库,hash,spring boot,spring cloud

在使用ThreadLocal的时候,强烈建议:务必手动remove文章来源地址https://www.toymoban.com/news/detail-857700.html

到了这里,关于(线程池)多线程使用场景--es数据批量导入、数据汇总、异步调用;如何控制某个方法允许并发访问线程的数量;对ThreadLocal的理解及实现原理、源码解析、ThreadLocal的内存泄露问题的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【SpringBoot】springboot数据使用多线程批量入数据库

    springboot、mybatisPlus、mysql8 mysql8(部署在1核2G的服务器上,很卡,所以下面的数据条数用5000,太大怕不是要等到花儿都谢了 0.0) 共耗时:180121 ms 耗时时间:87217ms 耗时时间: 28235 可见时间从180秒,缩短到了28秒,但是@Transactional对于多线程是控制不了所有的事务的。 Spring实现

    2024年02月02日
    浏览(42)
  • StarRocks案例7:使用shell批量broker load导入hdfs数据

    近期需要进行补录数据,需要将hive的历史数据迁移到StarRocks,因为需要补录的数据较多,hive和StarRocks均使用的是分区表,两边的日期格式也不同,hive这边是 yyyymmdd格式,StarRocks这边是yyyy-mm-dd格式。 之前一直是使用DataX来从hive导入到StarRocks,因为DataX是单节点的,而hive和S

    2024年02月11日
    浏览(45)
  • 补充:es与mysql之间的数据同步 2 使用分页导入的方式把大量数据从mysql导入es

    本片文章只是对之前写的文章的补充, es与mysql之间的数据同步 http://t.csdn.cn/npHt4 补充一: 之前的文章对于交换机、队列、绑定,使用的是@bean, 而这里使用的是纯注解版 在消费方,声明交换机: 补充二: 之前的文章是直接使用es操作数据,新增和修改,这样做不是很合适

    2024年02月12日
    浏览(51)
  • 【二十四】springboot使用EasyExcel和线程池实现多线程导入Excel数据

      springboot篇章整体栏目:  【一】springboot整合swagger(超详细 【二】springboot整合swagger(自定义)(超详细) 【三】springboot整合token(超详细) 【四】springboot整合mybatis-plus(超详细)(上) 【五】springboot整合mybatis-plus(超详细)(下) 【六】springboot整合自定义全局异常

    2023年04月08日
    浏览(67)
  • 使用elasticdump实现es数据导入导出示例(持续更新中)

    Elasticdump是一个命令行工具,可用于将数据从Elasticsearch导出到JSON文件,以及将JSON文件导入到Elasticsearch中。以下是一个简单的示例,演示如何使用Elasticdump实现数据导入导出: 您可以使用npm命令在命令行中安装Elasticdump。(npm请自行安装。)例如,使用以下命令安装最新版本

    2023年04月11日
    浏览(44)
  • java中多线程去跑海量数据使用线程池批量ThreadPoolExecutor处理的方式和使用Fork/Join框架的方式那种效率高?

    在Java中,使用线程池(ThreadPoolExecutor)和使用Fork/Join框架来处理海量数据的效率取决于具体的应用场景和需求。下面是一些需要考虑的因素: 任务类型:如果任务是CPU密集型的,那么使用Fork/Join框架可能更高效,因为它可以自动进行任务分割和并行处理。如果任务是I/O密集

    2024年02月10日
    浏览(48)
  • 使用Logstash和JDBC将MySQL的数据导入到Elasticsearch(ES)的过程

    使用Logstash和JDBC将MySQL的数据导入到Elasticsearch(ES)的过程包含多个步骤。请注意,首先你需要准备好的JDBC驱动,Logstash实例,Elasticsearch实例,以及你希望导入的MySQL数据。 安装Logstash JDBC Input Plugin :Logstash包含大量插件,其中一个就是JDBC Input Plugin,可以用于从JDBC兼容的数据库

    2024年02月15日
    浏览(43)
  • 使用分页导入的方式把大量数据从mysql导入单点的es时报错:Connection refused: no further information

    我出现的问题: 意思是, 拒绝连接:没有进一步的信息 我的解决方案是:在yml文件中配置以下信息,问题就可以解决 但是,我水品有限,没有明白什么原因,还有这个配置文件中的内容也不是很清楚,如果有路过的大佬,原因耽误宝贵的时间,给小弟解释一下,小弟不胜感

    2024年02月11日
    浏览(76)
  • Elasticsearch 批量导入数据

    **Elasticsearch**是一款非常高效的全文检索引擎。 **Elasticsearch**可以非常方便地进行数据的多维分析,所以大数据分析领域也经常会见到它的身影,生产环境中绝大部分新产生的数据可以通过应用直接导入,但是历史或初始数据可能会需要单独处理,这种情况下可能遇到需要导

    2023年04月11日
    浏览(40)
  • 【大数据】Hive 中的批量数据导入

    在博客【大数据】Hive 表中插入多条数据 中,我简单介绍了几种向 Hive 表中插入数据的方法。然而更多的时候,我们并不是一条数据一条数据的插入,而是以批量导入的方式。在本文中,我将较为全面地介绍几种向 Hive 中批量导入数据的方法。 overwrite :表示覆盖表中已有数

    2024年02月11日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包