分布式异步任务处理组件(七)

这篇具有很好参考价值的文章主要介绍了分布式异步任务处理组件(七)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

分布式异步任务处理组件底层网络通信模型的设计--如图:

分布式异步任务处理组件(七),分布式文章来源地址https://www.toymoban.com/news/detail-625297.html

  1. 使用Java原生NIO来实现TCP通信模型
  2. 普通节点维护一个网络IO线程,负责和主节点的网络数据通信连接--这里的网络数据是指组件通信协议之下的直接面对字节流的数据读写,上层会有另一个线程负责网络通信协议的实现;---也就是说维护一个selector线程,负责处理socketchannel的IO事件;
  3. Leader节点网络通信层有多个线程--一个selector线程负责接受其他节点的连接请求,然后为每个连接建立一个线程并分配单独的selector来处理各自连接上的IO事件--如此设计的原因是各节点的状态严格依赖与主节点的心跳和其他通信,防止主节点线程阻塞导致心跳失败;从而引发节点下线带来的大量同步工作--后续会聊到;
  4. 各节点网络通信线程之上会有一个线程专门负责组件的网络通信协议,就是将网络传输的字节流解码成组件的通信协议包,因为NIO的buffer是数据块,所以首先通过读写队列将字节转化为字节流,通过协议转化为网络通信命令包,同时解决粘包半包等问题;
  5. 网络通信线程和协议实现线程之间通过读写两个队列来实现(网络IO线程的读队列就是协议线程的写队列,反过来一样,所以这里读写队列是相对的;),为了保证性能,避免重复创建对象和对象回收,设计了ByteBuffer缓存机制和异步读写队列数据结构--详细结构如图--分布式异步任务处理组件(七),分布式
  6. 说一下三个队列--读写队列和缓存队列,用来实现IO通信线程和协议通信线程之间的数据通信--两个线程基本上会轮训处理网络IO事件,和上层协议事件,基本过程如下--
    1. 从网络IO线程角度出发--
      1. 当产生可读事件时,网络IO线程会从缓存队列中获取一个空的ByteBuffer,这里设计为当没有可用的缓存Buffer对象时会新建一个--具体在队列实现里讲,可能会产生写扩张现象,后期性能优化时考虑加入回收机制;
      2. 将socket缓冲区中的网络数据read进Buffer中,然后将Buffer对象入队到IO写队列中;
      3. 然后检查IO读队列不为空时,对IO读队列出队,获取要发送的数据Buffer对象,发送到其他节点中;
  7. 异步多线程队列,支持两个线程同时出队入队操作;原理和代码贴下来,基本实现:
package org.example.web.buffer;

import org.example.web.api.SocketBufferQueue;

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

public class AsynchronousQueue<T extends AbstractBuffer> implements SocketBufferQueue {
    //异步读写队列实现原理;
    /*
    * 当队列中的元素个数>1时,读线程和写线程可以同时进行,因为这时候不涉及操作共享变量
    *当队列中的元素个数<=1时,读写队列中只能有一个线程操作读或者写,因为此时会涉及队列头尾指针的操作;
    * 实现原理,写线程在获取写锁时可以正常做写操作:此时有两种情况--
    *     1,获取写锁之后队列为空,此时不会有读线程做读操作,只有获得写锁的该线程可以put,put完成之后将头尾指针同时指向改为以元素即可;此时队列元素个数为1;
    *     2,获取写锁之后队列中只有一个元素,这时也可以保证只有该线程在做写入,因为只有一个元素的情况下,读线程要读取该元素必须同时获得读锁和写锁;此时队列元素个数为2;
    *     3,读线程获取读锁之后有三种情况;size>1;size=1;size=0;
    *     4, 重点是保证不能多个线程同时进入队列元素为零的状态;就是读线程消费了最后一个元素,正好此时写线程在队列为空的时候写入,读写线程会同时操作头尾指针,造成错乱,所以在元素数量为1
    * 的时候就要进行同步操作;原理:
    *           1.读线程获取读锁之后如果size=1,此时不会先消费,而是试图获取写锁,防止此时有写线程同时操作,获取写锁之后再判断size是否为1,如果为1则做出队操作,然后释放写锁,如果为2则直接释放写锁--再进行出队操作;
    *           2,这里读线程获取读锁之后判断size=1,再获取读锁成功之后有两种情况--
    *                   1,有写线程在读线程之前获取到了写锁,则读线程获取到写锁的时候size>=2了(可能不止一个),
    *                   2,判断size=1之后直接获取到了写锁,此时就应该阻塞其他写线程做入队操作,等待自己完成出队操作之后再释放写锁;
    *     5,再说一下size怎么保证同步,
    *           1,在size<=1的时候严格保证线程同步操作,保证size;
    *           2,在size>1的时候,此时可以理解为队列同时在出队和入队,size在两个线程操作的时候先出队-1还是先入队+1其实是没有关系的,因为原子操作保证了最后结果是没有问题的就行;
    * */
    private AtomicInteger size;
    protected T head;
    protected T tail;
    private Object readLock;
    private Object writeLock;
    //这里考虑使用cas还是Synchronized


    AsynchronousQueue(){
        this.writeLock=new Object();
        this.readLock=new Object();
    }
    AsynchronousQueue(int initSize){
        this();
        this.size=new AtomicInteger(initSize);
    }
    //空队列初始化要创建一个node
    AsynchronousQueue(T node){
        this(1);
        this.head=node;
        this.tail=this.head;
    }
    public boolean offerFirstOne(T node){
        synchronized (this.writeLock){
            if(this.size.get()>0){
                return false;
            }
            this.head=this.tail=node;
            return this.size.compareAndSet(0,1);
        }
    }

    public boolean offer(T node){
        preOfferElement(node);
        synchronized (this.writeLock){
            if(this.size.get()==0){
                return this.offerFirstOne(node);
            }else{
                T temp=this.head;
                node.next=temp;
                temp.pre=node;
                this.head=node;
            }
            return this.size.incrementAndGet() > 1;
        }
    }
    private void preOfferElement(T bufferNode){
        bufferNode.next=null;
        bufferNode.pre=null;
    }
    public T pollLastOne(){
        return this.size.compareAndSet(1,0)?this.tail:null;
    }

    public T poll(){
        synchronized (this.readLock){
            if(this.size.get()==0){
                return null;
            }
            if(this.size.get()==1){
                synchronized (this.writeLock){
                    if(this.size()>1){
                        return this.getTailElement();
                    }
                    if(this.size()==1){
                        this.pollLastOne();
                    }
                }
            }
            return this.getTailElement();
        }
    }

    private T getTailElement(){
        if(this.size()>1){
            this.tail= (T) this.tail.pre;
            this.size.decrementAndGet();
            return (T) this.tail.next;
        }
        return null;
    }

    public int size(){
        return this.size.get();
    }
    public int increamentSize(){
        return this.size.incrementAndGet();
    }
    public int decrementSize(){
        return this.size.decrementAndGet();
    }
    private class BufferNode{
        private ByteBuffer buffer;
        private BufferNode pre;
        private BufferNode next;
        BufferNode(ByteBuffer byteBuffer){
            this.buffer=byteBuffer;
        }
        BufferNode(){
        }
    }
}

到了这里,关于分布式异步任务处理组件(七)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Asynq: 基于Redis实现的Go生态分布式任务队列和异步处理库

    Asynq [1] 是一个Go实现的分布式任务队列和异步处理库,基于redis,类似Ruby的 sidekiq [2] 和Python的 celery [3] 。Go生态类似的还有 machinery [4] 和goworker 同时提供一个WebUI asynqmon [5] ,可以源码形式安装或使用Docker image, 还可以和Prometheus集成 docker run --rm --name asynqmon -p 8080:8080 hibiken/as

    2024年02月14日
    浏览(33)
  • 分布式异步任务框架celery

    Celery是一个基于消息中间件的分布式任务队列框架,专门用于处理异步任务。它允许生产者发送任务到消息队列,而消费者则负责处理这些任务。Celery的核心特性包括异步执行、实时操作支持以及强大的调度能力,使其每天可以处理数以百万计的任务。 在Celery中,任务是以

    2024年04月10日
    浏览(37)
  • celery分布式异步任务队列-4.4.7

    version 4.4.7 学习总结 python实现、开源、遵循BSD许可的分布式任务队列; 可以处理大量消息,简单、灵活、可靠的分布式系统,专注任务的 实时处理 和 定时调度 处理; 它是线程、进程分配任务的一种机制,官方仅做支持linux开发。 五大部分: task,任务 beat,定时调度管理器

    2024年02月07日
    浏览(31)
  • 分布式任务调度,定时任务的处理方案

    适用场景: Spring 定时任务是 Spring 框架提供的一种轻量级的任务调度方案,它的特点是简单易用、轻量级。Spring 定时任务的执行是在 单个节点 上进行的,如果需要分布式任务调度,需要自己实现相应的解决方案。 1.导入依赖版本自己控制 2.启动类加上@EnableScheduling 3.编写业

    2023年04月14日
    浏览(44)
  • 4.4 媒资管理模块 - 分布式任务处理介绍、视频处理技术方案

    视频转码是指的对视频文件的编码格式进行转换 视频上传成功需要对视频的格式进行转码处理,比如:avi转成mp4 一般做文件存储的服务都需要对文件进行处理,例如对视频进行转码处理,可能由于文件量较大需要使用多线程等技术进行高效处理 文件格式 :是指.mp4、.avi、

    2024年02月02日
    浏览(36)
  • Celery分布式异步框架

    \\\"\\\"\\\" 1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket) 2)celery服务为为其他项目服务提供异步解决任务需求的 注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的

    2024年02月11日
    浏览(29)
  • 分布式、锁、延时任务

    Redis分布式锁-这一篇全了解(Redission实现分布式锁完美方案) ls / / 下有哪些子节点 get /zookeeper 查看某个子节点内容 create /aa “test” delete /aa set /aa “test01” 模式 默认创建永久 create -e 创建临时 create -e /zz “hello zz” create -s 创建 有序节点 create -s -e 临时序列化节点 一次性的监

    2024年02月09日
    浏览(49)
  • 分布式定时任务

    本文引用了谷粒商城的课程 定时任务是我们系统里面经常要用到的一些功能。如每天的支付订单要与支付宝进行对账操作、每个月定期进行财务汇总、在服务空闲时定时统计当天所有信息数据等。 定时任务有个非常流行的框架Quartz和Java原生API的Timer类。Spring框架也可以支持

    2023年04月15日
    浏览(43)
  • 分布式任务调度系统分析

    首先,我们来思考一些几个业务场景: XX 信用卡中心,每月 28 日凌晨 1:00 到 3:00 需要完成全网用户当月的费用清单的生成 XX 电商平台,需要每天上午 9:00 开始向会员推送送优惠券使用提醒 XX 公司,需要定时执行 Python 脚本,清理掉某文件服务系统中无效的 tmp 文件 最开始,

    2023年04月22日
    浏览(47)
  • ray-分布式计算框架-集群与异步Job管理

    0. ray 简介 ray是开源分布式计算框架,为并行处理提供计算层,用于扩展AI与Python应用程序,是ML工作负载统一工具包 Ray AI Runtime ML应用程序库集 Ray Core 通用分布式计算库 Task -- Ray允许任意Python函数在单独的Python worker上运行,这些异步Python函数称为任务 Actor -- 从函数扩展到类

    2023年04月25日
    浏览(22)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包