CountDownLatch 和 CyclicBarrier:如何让多线程步调一致?
原始对账系统
- 对账系统的业务简化后:
- 首先用户通过在线商城下单,会生成电子订单,保存在订单库;
- 之后物流会生成派送单给用户发货,派送单保存在派送单库。
- 为了防止漏派送或者重复派送,对账系统每天还会校验是否存在异常订单。
- 目前对账系统的处理逻辑是首先查询订单,然后查询派送单,之后对比订单和派送单,将差异写入差异库。
- 对账系统的核心代码如下,就是在一个单线程里面循环查询订单、派送单,然后执行对账,最后将写入差异库。
while (存在未对账订单) { // 查询未对账订单 pos = getPOrders(); // 查询派送单 dos = getDOrders(); // 执⾏对账操作 diff = check(pos, dos); // 差异写⼊差异库 save(diff); }
利用并行优化对账系统
- 对于串行化的系统,优化性能首先想到的是能否利用多线程并行处理。
- 查询未对账订单 getPOrders() 和查询派送单 getDOrders() 是否可以并行处理呢?
- 显然是可以的,因为这两个操作并没有先后顺序的依赖。
while () { // 查询未对账订单 Thread t1 = new Thread(() -> { pos = getPOrders(); }); t1.start(); // 查询派送单 Thread t2 = new Thread(()->{ dos = getDOrders(); }); t2.start(); // 等待 t1、t2 结束 t1.join(); t2.join(); // 执⾏对账操作 diff = check(pos, dos); // 差异写⼊差异库 save(diff); }
用 CountDownLatch 实现线程等待
- while 循环里面每次都会创建新的线程,而创建线程可是个耗时的操作。所以最好是创建出来的线程能够循环利用,线程池就能解决这个问题。
Executor executor = Executors.newFixedThreadPool(2); while (存在未对账订单) { // 计数器初始化为 2 CountDownLatch latch = new CountDownLatch(2); // 查询未对账订单 executor.execute(() -> { pos = getPOrders(); latch.countDown(); }); // 查询派送单 executor.execute(()-> { dos = getDOrders(); latch.countDown(); }); // 等待两个查询操作结束 latch.await(); // 执⾏对账操作 diff = check(pos, dos); // 差异写⼊差异库 save(diff); }
- 我们将 getPOrders() 和 getDOrders() 这两个查询操作并行了,但这两个查询操作和对账操作 check()、save() 之间还是串行的。很显然,这两个查询操作和对账操作也是可以并行的,也就是说,在执行对账操作的时候,可以同时去执行下一轮的查询操作。
- 针对对账这个项目,我设计了两个队列,并且两个队列的元素之间还有对应关系。
- 订单查询操作将订单查询结果插入订单队列,派送单查询操作将派送单插入派送单队列,这两个队列的元素之间是有对应关系的。
- 两个队列的好处是,对账操作可以每次从订单队列出一个元素,从派送单队列出一个元素,然后对这两个元素执行对账操作,这样数据一定不会乱掉。
- ⼀个线程 T1 执行订单的查询工作,一个线程 T2 执行派送单的查询工作,当线程 T1 和 T2 都各自生产完 1 条数据的时候,通知线程 T3 执行对账操作。
- 线程 T1 和线程 T2 只有都生产完 1 条数据的时候,才能一起向下执行,也就是说,线程 T1 和线程 T2 要互相等待,步调要一致。
- 同时当线程 T1和 T2 都生产完一条数据的时候,还要能够通知线程 T3 执行对账操作。
用 CyclicBarrier 实现线程同步
- 我们首先创建了一个计数器初始值为 2 的 CyclicBarrier,你需要注意的是创建 CyclicBarrier 的时候,我们还传入了一个回调函数,当计数器减到 0 的时候,会调用这个回调函数。
- 线程 T1 负责查询订单,当查出一条时,调用 barrier.await() 来将计数器减 1,同时等待计数器变成 0;
- 线程 T2 负责查询派送单,当查出一条时,也调用 barrier.await() 来将计数器减 1,同时等待计数器变成 0;
- 当 T1 和 T2 都调用 barrier.await() 的时候,计数器会减到 0,此时 T1 和 T2 就可以执行下⼀条语句了,同时会调用 barrier 的回调函数来执行对账操作。
- CyclicBarrier 的计数器有自动重置的功能,当减到 0 的时候,会自动重置你设置的初始值。
// 订单队列 Vector<P> pos; // 派送单队列 Vector<D> pos; // 执⾏回调的线程池 Executor executor = Executors.newFixedThreadPool(1); final CyclicBarrier barrier = new CyclicBarrier(2, () -> { executor.execute(() -> check()); }); void check() { P p = pos.remove(0); D d = dos.remove(0); // 执⾏对账操作 diff = check(p, d); // 差异写⼊差异库 save(diff); } void checkAll() { // 循环查询订单库 Thread t1 = new Thread(() -> { while (存在未对账订单) { // 查询订单库 pos.add(getPOrders()); // 等待 barrier.await(); } }); t1.start(); // 循环查询运单库 Thread T2 = new Thread(()->{ while(存在未对账订单){ // 查询运单库 dos.add(getDOrders()); // 等待 barrier.await(); } }); T2.start(); }
文章来源地址https://www.toymoban.com/news/detail-476429.html
文章来源:https://www.toymoban.com/news/detail-476429.html
到了这里,关于《Java并发编程实战》课程笔记(十二)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!