协程并发下数据汇总:除了互斥锁,还有其他方式吗?

这篇具有很好参考价值的文章主要介绍了协程并发下数据汇总:除了互斥锁,还有其他方式吗?。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1. 简介

本文介绍了在并发编程中数据汇总的问题,并探讨了在并发环境下使用互斥锁和通道两种方式来保证数据安全性的方法。

首先,通过一个实例,描述了一个并发拉取数据并汇总的案例,并使用互斥锁来确保线程安全。然后,讨论了互斥锁的一些缺点,引出了通道作为一种替代方案,并介绍了通道的基本使用和特性。接下来,通过实例演示了如何使用通道来实现并发下的数据汇总。

最后,引用了etcd中使用通道实现协程并发下数据汇总的例子,展示了通道在实际项目中的应用。

2. 问题引入

在请求处理过程中,经常需要通过RPC接口拉取数据。有时候,由于数据量较大,单个数据拉取操作可能会导致整个请求的处理时间较长。为了加快处理速度,我们通常考虑同时开启多个协程并发地拉取数据。一旦多个协程并发拉取数据后,主协程需要汇总这些协程拉取到的数据,然后再返回结果。在这个过程中,往往涉及对共享资源的并发访问,为了保证线程安全性,通常会使用互斥锁。下面通过一个简单的代码来展示该过程:

package main

import (
        "fmt"
        "sync"
        "time"
)

type Data struct {
        ID   int
        Name string
}

var (
        // 汇总结果
        dataList []Data
        // 互斥锁
        mutex    sync.Mutex
)

func fetchData(page int, wg *sync.WaitGroup) {
        // 模拟RPC接口拉取数据的耗时操作
        time.Sleep(time.Second)

        // 假设从RPC接口获取到了一批数据
        data := Data{
                ID:   page,
                Name: fmt.Sprintf("Data %d", page),
        }

        // 使用互斥锁保护共享数据的并发访问
        mutex.Lock()
        defer mutext.Unlock()
        dataList = append(dataList, data)

        wg.Done()
}

func main() {
        var wg sync.WaitGroup

        // 定义需要拉取的数据页数
        numPages := 10

        // 启动多个协程并发地拉取数据
        for i := 1; i <= numPages; i++ {
            wg.Add(1)
            go fetchData(i, &wg)
        }

        // 等待所有协程完成
        wg.Wait()

        // 打印拉取到的数据
        fmt.Println("Fetched data:")
        for _, data := range dataList {
            fmt.Printf("ID: %d, Name: %s\n", data.ID, data.Name)
        }
}

在上述示例中,我们定义了一个共享的dataList切片用于保存拉取到的数据。每个goroutine通过调用fetchData函数来模拟拉取数据的过程,并使用互斥锁mutex保护dataList的并发访问。主协程使用sync.WaitGroup等待所有协程完成数据拉取任务,然后打印出拉取到的数据。通过并发地拉取数据,并使用互斥锁保证线程安全,我们可以显著提高数据拉取的速度,并且确保数据的正确性和一致性。

回看上述实现,其实是涉及到了多个协程操作同一份数据,有可能导致线程安全的问题,然后这里是通过互斥锁来保证线程安全的。确实,使用互斥锁是可以保证线程安全的,但是也是存在一些缺点的,比如竞争和阻塞,两个协程同时竞争互斥锁时,只有一个协程能够获得锁,而其他协程则会被阻塞,这个就可能导致性能瓶颈,当然在这个场景下问题不大。其次就是代码的复杂性提高了,使用互斥锁需要仔细设计和管理,确保锁的正确获取和释放。这增加了代码的复杂性和维护成本,如果在代码中处理锁的方式不正确,可能会死锁,导致程序无法继续执行。

那我们其实就有疑问,在协程并发下数据汇总的场景,是否存在其他方式,不需要通过使用互斥锁,也能够保证线程安全呢? 其实还真有,Go语言中的channel非常适用于这种情况。通过使用通道,我们可以实现线程安全的数据共享和同步,而无需显式地使用互斥锁。下面我们来了解一下channel

3. channel的使用

3.1 channel的基本介绍

3.1.1 基本说明

channel在Go语言中是一种特殊的数据结构,用于协程之间的通信和同步。它类似于一个先进先出(FIFO)的队列,用于数据的传输和共享。在并发环境中,可以将数据发送到通道,也可以从通道中接收数据,而这两个操作都是线程安全的。

使用channel的优势在于它提供了内置的同步机制,无需显式地使用互斥锁来处理并发访问。

当一个协程向通道发送数据时,如果通道已满,发送操作会被阻塞,直到有其他协程从通道中接收数据释放空间。同样地,当一个协程从通道接收数据时,如果通道为空,接收操作也会被阻塞,直到有其他协程向通道发送数据。

同时,当多个协程同时访问通道时,Go运行时系统会自动处理协程之间的同步和并发访问的细节,保证数据的正确性和一致性。从而可以放心地在多个协程中使用通道进行数据的发送和接收操作,而不需要额外的锁或同步机制来保证线程安全。

因此,使用channel其实是可以避免常见的并发问题,如竞态条件和死锁,简化了并发编程的复杂性。

3.1.2 基本使用

通过上面对channel的基本介绍,我们已经对channel有了基本的了解,其实可以粗略理解其为一个并发安全的队列。下面来了解下channel的基本语法,从而能够开始使用channel

channel基本操作分为创建channel,发送数据到channel,接收channel中的数据,以及关闭channel。下面对其进行简单展示:

创建channel,使用make函数创建通道,通道的类型可以根据需要选择,例如intstring等:

ch := make(chan int)

发送数据到channel:使用<-操作符将数据发送到通道中

ch <- data

接收channel中的数据: 使用<-操作符从通道中接收数据

result := <-ch

关闭channel, 使用close函数关闭通道。关闭通道后,仍然可以从通道接收数据,但无法再向通道发送数据

close(ch)

通过上面channel的四个基本操作,便能够实现在不同协程间线程安全得传递数据。最后通过一个例子,完整得展示channel的基本使用。

package main

import "fmt"

func main() {
        ch := make(chan string) // 创建字符串通道
        defer close(ch)
        go func() {
                ch <- "hello, channel!" // 发送数据到通道
        }()

        result := <-ch // 从通道接收数据
        fmt.Println(result)
}

在这个示例中,我们创建了一个字符串通道ch。然后,在一个单独的协程中,我们向通道发送了字符串"hello, channel!"。最后,主协程从通道中接收数据,并将其打印出来。

通过使用通道,我们可以实现协程之间的数据传输和同步,确保数据的安全共享和线程安全性。通道的使用能够简化并发编程的复杂性,提供一种高效、可靠的方式来处理并发场景下的数据传递。

3.2 使用channel实现汇总数据

下面,我们使用channel来实现并发数据汇总,替换掉之前使用互斥锁来保证线程安全的实现:

package main

import (
        "fmt"
        "sync"
        "time"
)

type Data struct {
        ID   int
        Name string
}

func fetchData(page int, ch chan Data, wg *sync.WaitGroup) {
        // 模拟 RPC 接口拉取数据的耗时操作
        time.Sleep(time.Second)

        // 假设从 RPC 接口获取到了一批数据
        data := Data{
                ID:   page,
                Name: fmt.Sprintf("Data %d", page),
        }

        ch <- data // 将数据发送到通道

        wg.Done()
}

func main() {
        var wg sync.WaitGroup

        // 定义需要拉取的数据页数
        numPages := 10

        dataCh := make(chan Data, 10) // 创建用于接收数据的通道

        // 启动多个协程并发地拉取数据
        for i := 1; i <= numPages; i++ {
                wg.Add(1)
                go fetchData(i, dataCh, &wg)
        }

        go func() {
                wg.Wait()
                close(dataCh) // 关闭通道,表示数据已经全部发送完成
        }()

        // 从通道接收数据并汇总
        var dataList []Data
        for data := range dataCh {
            dataList = append(dataList, data)
        }

        // 打印拉取到的数据
        fmt.Println("Fetched data:")
        for _, data := range dataList {
                fmt.Printf("ID: %d, Name: %s\n", data.ID, data.Name)
        }
}

在修改后的代码中,我们创建了一个用于接收数据的 dataCh。每个协程通过将数据发送到该channel 来完成数据的汇总。主协程通过从channel接收数据,并将其添加到 dataList 中实现数据的汇总过程。这种方式不需要显式地加锁和解锁,并且避免了互斥锁带来的复杂性和性能问题。

通过使用channel,我们能够以一种更直观、更安全的方式实现协程之间的数据传递和同步。channel在并发编程中起到了关键的作用,简化了并发操作的管理和实现。同时,它提供了内置的同步机制,保证了数据的正确性和一致性,避免了死锁和竞态条件的问题。

3.3 总结

协程间的并发下汇总数据可以归类为协程间的数据传递这个场景。在这个场景中,多个协程并发地拉取数据,然后将数据汇总到一个共享的数据结构中。为了保证数据的正确性和一致性,需要使用某种机制来确保多个协程对共享数据的并发访问是安全的。

在原始的实现中,使用了互斥锁来保护共享数据的并发访问。互斥锁提供了互斥访问的机制,确保同一时间只有一个协程可以访问共享数据,从而避免了数据竞争和不一致性。这种方式在保证线程安全的同时,引入了锁的开销和复杂性。

而使用channel来实现协程间的安全数据传递可以更简洁和高效。每个协程可以将拉取到的数据通过channel发送到主协程,主协程通过接收channel中的数据来进行汇总。channel提供了并发安全的数据传递机制,协程之间的数据传输是同步和有序的。由于channel本身就提供了同步机制,不需要额外的锁和同步操作,能够更简洁地实现协程间的安全数据传递。

因此,如果需要在多个协程间实现数据传递,而且由此可能带来线程安全的问题,此时使用channel来实现是相对比较合适的。

4. 开源项目中的使用

假设我们需要对etcd进行性能测试,此时需要模拟大量并发请求,对etcd进行负载测试,并收集每个请求的执行时间、成功/失败状态等结果数据。然后主协程需要收集每一个请求的结果数据,并进行统计计算,生成相应的性能报告。基于此,能够计算出总请求数、请求成功率、平均执行时间、最慢/最快请求等统计信息,以及错误分布情况和慢速请求的详细信息。

从上面的讲述来看,其实我们可以大概想象出这个模型,多个协程并发执行,然后获取每个请求的结果数据。然后主协程需要收集汇总这些数据,基于此来生成性能报告。这个模型其实也就是我们上面所说的协程并发下的数据汇总,因此通过channel来实现协程间的数据传输,是非常合适的。

下面我们来看看etcd中对应的实现。etcd中存在一个report对象的实现,能够接受一系列的请求数据的结果,然后生成性能报告返回回去。结构体定义如下:

type report struct {
   results   chan Result
   stats Stats
}
func (r *report) Results() chan<- Result { return r.results }

// Result describes the timings for an operation.
type Result struct {
   Start  time.Time
   End    time.Time
   Err    error
}

func newReport(precision string) *report {
   r := &report{
      results:   make(chan Result, 16),
   }
   return r
}

Result结构体为单个测试的结果,而 report 结构体则用于整个测试过程的报告和统计信息。通过使用 results 通道,可以将每个测试的结果发送到 report 结构体中,以便进行统计和生成报告。

当进行性能压测时,首先通过newReport生成一个report对象,然后启动多个协程同时进行压测请求,每一个请求处理完成之后,便会生成一个处理结果,存储到Result对象当中。然后基于report对象的Results方法获取到对应的channel,将处理结果传输给主协程。

主协程便通过遍历report对象中的results变量对应的channel,汇总计算所有处理结果,基于此便能够生成压测结果和报告。下面来看其具体流程。

首先是创建一个report对象,然后启动多个协程来处理请求,将结果发送到report对象中的results对应的channel中。

// 这里NewReportSample方法,其实是对上面newReport方法的一个封装
r := NewReportSample("%f")
// 这里假设只有一个协程,模拟执行一系列的测试,并将测试结果发送到 Report 对象的 results 通道中。
go func() {
   start := time.Now()
   for i := 0; i < 5; i++ {
      // 不真实进行请求,只是简单获取执行结果,将测试结果进行传输
      end := start.Add(time.Second)
      r.Results() <- Result{Start: start, End: end}
      start = end
   }
   r.Results() <- Result{Start: start, End: start.Add(time.Second), Err: fmt.Errorf("oops")}
   // 假设所有压测请求都执行完成了
   close(r.Results())
}()
// 主协程 汇总所有的处理结果,然后生成压测报告
stats := <-r.Stats()

以上代码中,r 是通过 NewReportSample("%f") 创建的一个 Report 对象。然后,在一个单独的协程中,执行了一系列的测试,并将测试结果发送到 r.Results() 通道中。

这段代码的作用是模拟执行一系列的测试,并将测试结果发送到 Report 对象的 results 通道中。通过使用 r.Results() 方法返回的通道,可以将测试结果发送到报告对象中进行统计和处理。

接下来,主协程应该不断从 r.Results()方法返回的通道中读取数据,汇总所有的处理结果,从而生成压测报告。这个方法其实是被封装在r.Stas()方法中,具体如下:

func (r *report) Stats() <-chan Stats {
    // 创建一个channel
   donec := make(chan Stats, 1)
   // 启动一个协程来执行
   go func() {
      defer close(donec)
      r.processResults()
      s := r.stats.copy()
      if r.sps != nil {
         s.TimeSeries = r.sps.getTimeSeries()
      }
      // 执行完成的话,将结果返回
      donec <- s
   }()
   // 返回channel
   return donec
}

// Stats方法启动的协程中,实际运行的任务
func (r *report) processResults() {
   st := time.Now()
   // 遍历r.results方法中channel中的数据,然后执行处理流程
   for res := range r.results {
      r.processResult(&res)
   }
   // 后续执行一些具体的计算逻辑
}

上述代码是 report 结构体中的两个方法,其中 Stats() 方法返回一个只读的 Stats 通道。这个方法会在一个单独的协程中执行,并处理 results 通道中的测试结果。事实上就是汇总channel中的数据,然后进行一定的处理,然后返回。

5. 总结

本文通过介绍并发编程中的数据汇总问题,提出了使用互斥锁和通道来保证线程安全的方法。互斥锁适用于临界区保护和共享资源的互斥访问,但可能存在死锁和性能瓶颈的问题。相比之下,通道提供了更直观和安全的协程间通信方式,避免了锁的问题,并提供了更灵活的并发模式。

基于以上内容的介绍,大概能够明确下,在数据传递和汇总的场景下,使用channel来实现可能是更为合适的,能够提高代码的可读性和并发安全性。希望以上内容对你有所帮助。文章来源地址https://www.toymoban.com/news/detail-452803.html

到了这里,关于协程并发下数据汇总:除了互斥锁,还有其他方式吗?的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 除了百度还有什么搜索引擎比较好

    百度是国内最大的搜索引擎,也是国内最好的搜索引擎。但是由于其广告比较多,有些同学不太喜欢使用百度搜索。那么除了百度还有什么搜索引擎比较好?小编就来和大家分享几款国内可以使用的其他搜索引擎。 1.谷歌搜索,谷歌搜索是全球最受欢迎的搜索引擎,在国内无法

    2024年02月11日
    浏览(50)
  • 除了运行、休眠…进程居然还有僵尸、孤儿状态

    摘要: 本章我们将认识几种进程状态——运行状态、休眠状态、暂停状态、退出状态等。还要介绍两种具有惨烈身世的僵尸进程与孤儿进程~ 本文分享自华为云社区《僵尸进程?孤儿进程?为什么他有如此惨烈的身世...》,作者: 花想云 。 Linux中进程状态一般有: R(运行状

    2024年02月06日
    浏览(44)
  • 除了!important,还有什么方法修改默认CSS

    除了使用!important来修改默认CSS,还有以下几种方法: 使用特定性更高的选择器:通过使用更具体的选择器,可以覆盖默认CSS样式。例如,使用ID选择器或类选择器来针对特定的元素进行样式修改。 使用内联样式:可以直接在HTML元素的style属性中添加样式,这样可以覆盖默认

    2024年02月12日
    浏览(44)
  • 2023年除了百度还有哪些搜索引擎推荐?

    搜狗 https://sogou.com 搜索最早起源于搜狐,后来被腾讯收购。整体感觉,反应速度比较快,广告不算很多,大部分中文网站都有收录,有时候收录速度比较慢。 Bing.com https://cn.bing.com Bing是来自微软的搜索引擎,也是做的比较早,大概和搜狗同时起步的,搜索结果比较全面,性

    2024年02月04日
    浏览(100)
  • 除了 ChatGPT,还有哪些好用的AI工具?

    OnlyFans 订阅教程移步:【保姆级】2024年最新Onlyfans订阅教程 Midjourney 订阅教程移步: 【一看就会】五分钟完成MidJourney订阅 GPT-4.0 升级教程移步:五分钟开通GPT4.0 如果你需要使用Wildcard开通GPT4、Midjourney或是Onlyfans的话,请点击 :WildCard使用教程 GitHub Copilot,这款由GitHub与Op

    2024年04月13日
    浏览(62)
  • 除了Copilot还有这些AI代码辅助工具

    最近牛逼的GitHub Copilot试用到期了,离开它还有点不习惯,基础的代码它基本可以帮你搞定,开发效率直接翻倍。为啥这么好用,Copilot的背后是OpenAI和强大的GitHub代码库。那么有没有可以取代它而免费IDE的AI代码辅助工具呢?还真有,小编本文介绍几种免费的代码智能辅助工

    2024年02月14日
    浏览(51)
  • spingboot按照依赖包除了maven还有Gradle,两者的区别?

    Maven和Gradle是两种常用的构建工具,用于管理Java项目的依赖关系和构建过程。它们之间的区别如下: 语法:Maven使用XML作为构建文件的格式,而Gradle使用基于Groovy或Kotlin的领域特定语言(DSL)。 灵活性:相比Maven,Gradle提供更大的灵活性和自定义能力。Gradle的DSL允许你以声明

    2024年02月10日
    浏览(34)
  • 网络连接管理除了TCP三次握手,还有TCP四次挥手

    网络通信 建立连接 ,TCP会进行三次握手,三次握手主要是两个主机之间建立连接,和其他没有什么关系,那么两个主机之间是如何进行三次握手的呢?他们又会使用什么操作来建立连接呢? 这里我们先了解一下TCP的报文结构: 三次握手主要是理解成客户端与服务器经过三次

    2024年02月07日
    浏览(63)
  • 安卓手机除了UC,还有哪些良心好用的浏览器可选?

    浏览器作为手机端不可缺少的APP之一,几乎每个人的手机都安装有浏览器。UC浏览器作为国内最流行的手机浏览器之一,拥有庞大的用户群体。不少人醉心于它强大的功能,但是也有一些用户因为UC令人震惊的广告,以及越来越臃肿的体积,选择弃之而去。那么,安卓手机除了

    2024年02月10日
    浏览(47)
  • Vue3 除了 keep-alive,还有哪些实现页面缓存的方法

    有这么一个需求:列表页进入详情页后,切换回列表页,需要对列表页进行缓存,如果从首页进入列表页,就要重新加载列表页。 对于这个需求,我的第一个想法就是使用keep-alive来缓存列表页,列表和详情页切换时,列表页会被缓存;从首页进入列表页时,就重置列表页数

    2024年02月06日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包