【软件架构】流水线设计模式

这篇具有很好参考价值的文章主要介绍了【软件架构】流水线设计模式。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

流水线模式

流水线模式是一种软件设计模式,它提供了构建和执行一系列操作的能力。

【软件架构】流水线设计模式

此模式最好与插件模式结合使用,以便在应用程序启动时动态构建流水线。

顺序

流水线的最基本实现是一个简单的操作序列。

    public interface IOperation<T>
    {
        void Invoke(T data);
    }

可以调用操作的接口来处理数据。

public class Pipeline<T> : IOperation<T>
    {
        private readonly List<IOperation<T>> operations = new List<IOperation<T>>();

        // add operation at the end of the pipeline
        public void Register(IOperation<T> operation)
        {
            operations.Add(operation);
        }

        // invoke every operations
        public void Invoke(T data)
        {
            foreach (var operation in operations) operation.Invoke(data);
        }
    }

流水线一个一个地处理每个操作。流水线类还实现了 IOperation 接口,因此它们可以组合在一起

public class ReverseOperation : IOperation<string>
{
    public void Invoke(string data) => Console.WriteLine($" The string is reversed : {data.Reverse()}");
}

该操作可以写在专用中。

public class Operation<T> : IOperation<T>
{
    private readonly Action<T> action;

    public Operation(Action<T> action)
    {
        this.action = action;
    }

    public void Invoke(T data) => action(data);
}

或者使用包装器从lambda自动创建操作。

// build
var pipeline = new Pipeline<string>();

// lambda
pipeline.Register(new Operation<string>(str =>
{
    Console.WriteLine($"The string {str} contains {str.Length} characters.");
}));

// class
pipeline.Register(new ReverseOperation());

// execute
pipeline.Invoke("apple");

应在调用流水线之前注册流水线操作。

断路器

您要添加到流水线中的第一个功能是添加断路器

public interface IOperation<T>
{
    bool Invoke(T data);
}

每个操作都会返回结果:失败成功

class Pipeline<T> : IOperation<T>
{
    // invoke every operations
    public bool Invoke(T data)
    {
        foreach (var operation in operations)
        {
            if (!operation.Invoke(data))
            {
                Console.WriteLine("Aborting pipeline..");
                return false;
            }
        }

        return true;
    }
}

如果操作失败,流水线执行应该停止

异步

另一个要求可能是拥有一个可以处理异步操作的流水线。

public interface IOperation<T>
{
    void SetNext(IOperation<T> next);
    void Invoke(T data);
}

在完成数据处理后,每个操作现在都必须调用流水线中的下一个操作。

class Pipeline<T> : IOperation<T>
{
    // not the best
    private readonly List<IOperation<T>> operations = new List<IOperation<T>>();
    private readonly IOperation<T> terminate;

    public Pipeline()
    {
        terminate = new Operation<T>(data => {}); // not the best
    }

    void IOperation<T>.SetNext(IOperation<T> next)
    {
        terminate.SetNext(next);
    }

    // append an operation at the end of the pipeline
    public void RegisterOperation(IOperation<T> operation)
    {
        // when the operation is finished, it will call terminate
        operation.SetNext(terminate);
        if (operations.Any())
        {
            // link the last registered operation with the newly registered one
            operations.Last().SetNext(operation);
        }
        operations.Add(operation);
    }

    public void Invoke(T data)
    {
        var operation = operations.Any() ? operations.First() : terminate;
        operation.Invoke(data);
    }
}

流水线稍微复杂一点,因为它需要在注册新操作时设置一个操作。另一种解决方案是使用构建器

public class WriteOperation : IOperation<string>
{
    private IOperation<string> next;

    public void SetNext(IOperation<string> next)
    {
        this.next = next;
    }

    public void Invoke(string data)
    {
        Task.Run(() =>
        {
            Console.WriteLine("Writing data to the disk...");
            Thread.Sleep(100); // just kidding !
            Console.WriteLine("Data successfully written to the disk !");
            next?.Invoke(data);
        });
    }
}

此操作是异步的,将在专用线程中运行,当时间到时,它将调用下一个操作以继续流水线。

class Operation<T> : IOperation<T>
{
    private readonly Func<T, bool> action;
    private IOperation<T> next;

    public Operation(Func<T, bool> action)
    {
        this.action = action;
    }

    public Operation(Action<T> action)
    {
        this.action = data =>
        {
            action(data);
            return true;
        };
    }

    public void SetNext(IOperation<T> next)
    {
        this.next = next;
    }

    public void Invoke(T data)
    {
        if (action(data)) next.Invoke(data);
    }
}

通用操作既可以与简单操作一起使用,也可以在使用函数时使用内置断路器

// the main pipeline
var pipeline = new Pipeline<string>();
pipeline.RegisterOperation(new Operation<string>(data =>
{
    Console.WriteLine($"Everyone likes {data} !");
    return true;
}));
pipeline.RegisterOperation(new WriteOperation());
pipeline.RegisterOperation(new Operation<string>(data =>
{
    if (data == "banana")
    {
        Console.WriteLine("This banana made the pipeline abort...");
        return false;
    }

    return true;
}));
pipeline.RegisterOperation(new Operation<string>(data => Console.WriteLine("This operation should not be called !")));

// a verbose pipeline to wrap the main pipeline
var verbose = new Pipeline<string>();
verbose.RegisterOperation(new Operation<string>(data => Console.WriteLine("Beginning of the pipeline...")));
verbose.RegisterOperation(pipeline);
verbose.RegisterOperation(new Operation<string>(data => Console.WriteLine("End of the pipeline...")));
verbose.Invoke("banana");

Console.WriteLine("The pipeline is asynchronous, so we should have more messages after this one : ");

这个简单的例子使用了我们实现的几个特性

【软件架构】流水线设计模式
现在你知道如何让你的流水线异步了
【软件架构】流水线设计模式

如果对之前的操作有另一个回调**,那就更好了,这样您就可以让结果逆流通过流水线。

插入

使用流水线设计模式的主要原因通常是需要能够添加插件,这些插件可以将操作附加到现有流水线或在其中挂钩操作。

public class Pipeline: IOperation
{
    // exposes operations for hooking
    public readonly LinkedList<IOperation> Operations = new LinkedList<IOperation>();

    // add operation at the end of the pipeline
    public void Register(IOperation operation)
    {
        Operations.AddLast(operation);
    }

    // invoke every operations
    public void Invoke()
    {
        foreach (var operation in Operations) operation.Invoke();
    }
}

流水线确实很基础,但这一次,操作暴露了

class Application
{
    internal abstract class Plugin
    {
        public abstract void Initialize(Application application);
    }

    private readonly List<Plugin> plugins = new List<Plugin>();
    public readonly Pipeline Pipeline = new Pipeline();

    public void RegisterPlugin(Plugin plugin)
    {
        plugins.Add(plugin);
    }

    public void Initialize()
    {
        Pipeline.Register(new Operation(() => Console.WriteLine("Step 1")));
        Pipeline.Register(new Operation(() => Console.WriteLine("Step 2")));
        Pipeline.Register(new Operation(() => Console.WriteLine("Step 3")));

        foreach (var plugin in plugins) plugin.Initialize(this);
    }

    public void Run()
    {
        Pipeline.Invoke();
    }
}

让我们来看一个带有流水线的简单应用程序,它只会在控制台中显示 3 个步骤。此应用程序还支持插件来修改流水线。

class HookPlugin : Application.Plugin
{
    public override void Initialize(Application application)
    {
        var operations = application.Pipeline.Operations;
        operations.AddAfter(operations.First, new Operation(() => Console.WriteLine("I really want to be second !")));
    }
}

第一个插件将在流水线的第二个插槽中挂接另一个操作。

class LatePlugin : Application.Plugin
{
    public override void Initialize(Application application)
    {
        application.Pipeline.Register(new Operation(() => Console.WriteLine("Sorry guys, I am late...")));
    }
}

第二个插件将在流水线末尾附加一个新操作。

var application = new Application();
application.RegisterPlugin(new HookPlugin());
application.RegisterPlugin(new LatePlugin());
application.Initialize();
application.Run();

应用程序和插件放在一起,我们可以调用流水线。
【软件架构】流水线设计模式

另一个有用的功能是能够在与单个项目相同的流水线中处理批处理数据。

class BatchPipeline<T> : IOperation<T[]>
{
    private readonly Pipeline<T> pipeline;

    public BatchPipeline(Pipeline<T> pipeline)
    {
        this.pipeline = pipeline;
    }

    // invoke each operation on each item
    public bool Invoke(T[] data)
    {
        // wrap items
        var items = data.Select(item => new Result<T>(item)).ToArray();

        foreach (var operation in pipeline.Operations)
        {
            // detects when every operation failed
            var failed = true;

            foreach (var item in items)
            {
                if(!item.Success) continue;
                if (!operation.Invoke(item.Data)) item.Fail();
                else failed = false;
            }

            // circuit breaker
            if (failed) return false;
            Console.WriteLine("----------------------");
        }

        return true;
    }
}

批处理流水线包装流水线并调用每个项目的每个操作。

class Result<T>
{
    public readonly T Data;
    public bool Success { get; private set; } = true;
    public void Fail() => Success = false;

    public Result(T data)
    {
        Data = data;
    }
}

每个项目都被包裹起来,所以我们可以记住断路器的结果。

public class CheckMagnitudeOperation : IOperation<int>
{
    private readonly int threshold;

    public CheckMagnitudeOperation(int magnitude)
    {
        threshold = (int) Math.Pow(10, magnitude);
    }

    public bool Invoke(int data)
    {
        if (data < threshold)
        {
            Console.WriteLine($"{data} < {threshold} -> ko");
            return false;
        }

        Console.WriteLine($"{data} >= {threshold} -> ok");
        return true;
    }
}

此操作检查整数是否具有所需的数量级

// base pipeline
var pipeline = new Pipeline<int>();
pipeline.Register(new CheckMagnitudeOperation(1));
pipeline.Register(new CheckMagnitudeOperation(2));
pipeline.Register(new CheckMagnitudeOperation(3));
pipeline.Register(new CheckMagnitudeOperation(4));

// batch pipeline
var batch = new BatchPipeline<int>(pipeline);
batch.Invoke(new []{ 12, 345, 6789 });

流水线将检查一批整数数量级。
【软件架构】流水线设计模式流水线只为没有失败的项目调用下一个操作。

高性能流水线

流水线设计模式也可以指更具体和以性能为导向的软件架构。

【软件架构】流水线设计模式
一些项目使用流水线通过在专用线程中运行流水线的每个操作来优化大量数据的处理。

abstract class Processor<T> : IOperation<T>
{
    private readonly BlockingCollection<T> queue = new BlockingCollection<T>();

    public IOperation<T> Next { private get; set; }
    public IOperation<T> Terminate { private get; set; }
    void IOperation<T>.Invoke(T data) => queue.Add(data);

    protected Processor()
    {
        Task.Run(Run);
    }

    private void Run()
    {
        Console.WriteLine($"Thread {GetType().Name} Started !");
        while (true)
        {
            var data = queue.Take();
            var operation = Process(data) ? Next : Terminate;
            operation.Invoke(data);
            Sleep(); // hack to have random output ;)
        }
    }

    protected abstract bool Pr

每个线程都将从充当缓冲区的并发队列中消费生成数据

public interface IOperation<T>
{
    IOperation<T> Next { set; }
    IOperation<T> Terminate { set; }
    void Invoke(T data);
}

这一次,我们将使用带有断路器的异步操作。

class Operation<T> : IOperation<T>
{
    private readonly Func<T, bool> action;

    public Operation(Func<T, bool> action)
    {
        this.action = action;
    }

    public IOperation<T> Next { private get; set; }
    public IOperation<T> Terminate { private get; set; }

    public void Invoke(T data)
    {
        var operation = action(data) ? Next : Terminate;
        operation?.Invoke(data);
    }
}

如果操作成功,它应该调用next,如果失败则终止

class Pipeline<T> : IOperation<T>
{
    private readonly List<IOperation<T>> operations = new List<IOperation<T>>();

    public IOperation<T> Next { private get; set; }
    public IOperation<T> Terminate { private get; set; }

    private readonly IOperation<T> success;
    private readonly IOperation<T> fail;

    public Pipeline()
    {
        success = new Operation<T>(Success);
        fail = new Operation<T>(Fail);
    }

    // append an operation at the end of the pipeline
    public void RegisterOperation(IOperation<T> operation)
    {
        // when the operation is finished, it will call either call success or fail
        operation.Next = success;
        operation.Terminate = fail;

        if (operations.Any())
        {
            // link the last registered operation with the newly registered one
            operations.Last().Next = operation;
        }
        operations.Add(operation);
    }

    public void Invoke(T data)
    {
        var operation = operations.Any() ? operations.First() : fail;
        operation.Invoke(data);
    }

    private bool Success(T data)
    {
        Continue(data);
        return true;
    }

    private bool Fail(T data)
    {
        // we decide to bypass the circuit breaker
        Continue(data);
        return false;
    }

    private void Continue(T data)
    {
        Next?.Invoke(data);
    }
}

流水线被设计成绕过断路器。Success或Fail,它总是会继续流水线序列并调用下一个操作。

// build
var pipeline = new Pipeline<Order>();
pipeline.RegisterOperation(new CreateOrderProcessor());
pipeline.RegisterOperation(new PriceOrderProcessor());
pipeline.RegisterOperation(new PaymentOrderProcessor(User.Users.ToDictionary(user => user.Id, user => user.InitialBalance)));
pipeline.RegisterOperation(new DeliverOrderProcessor());

var monitor = new Pipeline<Order>();
monitor.RegisterOperation(pipeline);
monitor.RegisterOperation(new Operation<Order>(order =>
{
    var report = order.Status == OrderStatus.Delivered ? "Success" : "Fail";
    Console.WriteLine($"[IMPORTANT] Order {order.OrderId} Finished Processing : {report}");
    return true;
}));

// process
foreach (var product in GetOrders()) monitor.Invoke(product);

这个场景有点复杂,所以我不会解释一切。这个想法是让不同的线程处理传入的订单。订单处理完成后,我们会检查订单的状态。

class CreateOrderProcessor : Processor<Order>
{
    private readonly List<Order> orders = new List<Order>();

    protected override bool Process(Order order)
    {
        order.OrderId = orders.Count;
        order.CreationTime = DateTime.UtcNow;
        order.Status = OrderStatus.Created;
        orders.Add(order);
        Console.WriteLine($"Create Order {order.OrderId}");
        return true;
    }
}

每个订单处理器都隔离在一个专用线程中,因此您可以优化存储数据的方式并在不使用锁的情况下直接访问内存。

class PaymentOrderProcessor : Processor<Order>
{
    protected override bool Process(Order order)
    {
        var balance = GetBalance(order.UserId);
        var expected = balance - order.TotalPrice;

        if (expected >= 0)
        {
            Console.WriteLine($"Payment User {order.UserId} Order {order.OrderId} : {order.TotalPrice} USD | Balance {balance} -> {expected}");
            SetBalance(order.UserId, expected);
            order.Status = OrderStatus.Payed;
            return true;
        }
        else
        {
            Console.WriteLine($"Insufficient Balance : User {order.UserId} Balance {balance} USD | Order {order.OrderId} : {order.TotalPrice} USD");
            order.Status = OrderStatus.Canceled;
            return false;
        }
    }
}

支付订单处理器是唯一可以访问用户余额的线程。它可以获取或更新任何余额,而无需担心并发问题。

【软件架构】流水线设计模式

流水线正在尽可能快地处理操作序列。

结论

流水线设计模式有很多非常不同的实现方式,从简单的命令链到更复杂的工作流文章来源地址https://www.toymoban.com/news/detail-499977.html

到了这里,关于【软件架构】流水线设计模式的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Verilog流水线设计——Pipeline

    在工程师实际开发过程中,可能会经常遇到这样的需求:数据从数据源端不断地持续输入FPGA,FPGA需要对数据进行处理,最后将处理好的数据输出至客户端。 在数据处理过程中,可能需要一系列的处理步骤。比如常规的信号进行处理步骤有(这里的处理步骤只是举个例子):

    2024年02月08日
    浏览(51)
  • 【CPU设计实战】简单流水线CPU设计

    CPU输入的、运算的、存储的、输出的数据都在组合逻辑和时序逻辑电路上流转,这些逻辑电路称为 数据通路 。 32位处理器用32位触发器来存放PC。PC的输出送到 虚实地址转换部件进行转换 ,PC的输入有两个,一个是复位值0xBFC00000,一个是复位撤销后每执行一条指令更新为当前

    2024年02月12日
    浏览(49)
  • 16位流水线CPU设计(部分)

    概括:本文介绍一个16位的流水线CPU设计与模拟过程,该流水线CPU由 IF 、 ID 、 EXE 、 MEM 和 WB 五个功能段组成,结构如下图所示。在各功能段之间分别设计了四个锁存段,即 IF_LATCH、ID_LATCH、EXE_LATCH和 WB_LATCH 。各基本模块使用 VHDL 硬件描述语言进行描述,各段的大部分功能模

    2024年02月03日
    浏览(39)
  • 6设计指令流水线-1【FPGA模型机课程设计】

    2023-5-25 09:25:05 以下内容源自《【FPGA模型机课程设计】》 仅供学习交流使用 0集中实践环节计划书【FPGA模型机课程设计】 2023-5-30 16:03:03 添加MEM_WB模块 2023-5-30 19:00:25 IF模块添加stall 2023-5-30 21:08:26 修改stall相关的处理 在id ctrl 流水寄存器中修改 因为原来没有理解stall[5:0]是什么

    2024年02月08日
    浏览(48)
  • 【【verilog典型电路设计之流水线结构】】

    下图是一个4位的乘法器结构,用verilog HDL 设计一个两级流水线加法器树4位乘法器 对于流水线结构 其实需要做的是在每级之间增加一个暂存的数据用来存储 我们得到的东西 我们一般来说会通过在每一级之间插入D触发器来保证数据的联通 通过在第一级和第二级,第二级和第

    2024年02月12日
    浏览(39)
  • FPGA中的流水线设计(含Verilog范例)

    在高速通信系统设计中,如何提高系统的工作速度至关重要,通常使用的方法有两种: 1. 并行方式设计:可减少模块间的延时; 2. 流水线设计:流水线设计如同生产线一样,将整个执行过程分解为若干个工作段,从流水线的起点连续输入,各操作段以重叠方式执行。使得运

    2023年04月21日
    浏览(34)
  • 南京观海微电子----Verilog流水线设计——Pipeline

    1.  前言 在工程师实际开发过程中,可能会经常遇到这样的需求:数据从数据源端不断地持续输入FPGA,FPGA需要对数据进行处理,最后将处理好的数据输出至客户端。 在数据处理过程中,可能需要一系列的处理步骤。比如常规的信号进行处理步骤有(这里的处理步骤只是举个

    2024年01月19日
    浏览(44)
  • 从零开始设计RISC-V处理器——五级流水线之数据通路的设计

    (一)从零开始设计RISC-V处理器——指令系统 (二)从零开始设计RISC-V处理器——单周期处理器的设计 (三)从零开始设计RISC-V处理器——单周期处理器的仿真 (四)从零开始设计RISC-V处理器——ALU的优化 (五)从零开始设计RISC-V处理器——五级流水线之数据通路的设计

    2024年02月08日
    浏览(53)
  • 【计组实验】基于Verilog的多周期非流水线MIPS处理器设计

    设计多周期非流水线MIPS处理器,包括: 完成多周期MIPS处理器的Verilog代码; 在Vivado软件上进行仿真; 编写MIPS代码验证MIPS处理器; 相关代码及资源的下载地址如下: 本实验的Vivado工程文件和实验文档:Multi-Cycle MIPS Processor.zip(272KB) QtSpim 9.1.23和Vivado 2019.2的安装包:QtSpim Viv

    2024年02月11日
    浏览(47)
  • 1.6流水线:流水线、流水线周期、流水线执行时间、流水线吞吐率、流水线加速比

    相关参数计算:流水线执行时间计算、流水线吞吐率、流水线加速比等。 流水线是指在程序执行时多条指令重叠进行操作的一种准并行处理实现技术。各种部件同时处理是针对不同指令而言的,它们可同时为多条指令的不同部分进行工作,以提高各部件的利用率和指令的平均

    2024年02月09日
    浏览(52)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包