GO语言:Worker Pools线程池、Select语句、Metex互斥锁详细示例教程

这篇具有很好参考价值的文章主要介绍了GO语言:Worker Pools线程池、Select语句、Metex互斥锁详细示例教程。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、Buffered Channels and Worker Pools

1. Goroutine and Channel Example 线程和通道示例

          package main

          import (
              "fmt"
              "time"
          )

          func write(ch chan int) {
              for i := 0; i < 5; i++ {
                  ch <- i // 向通道写入 0-4		因为通道容量是2 需要读取数据才会进行下一步 否则一直在阻塞态
                  fmt.Println("Successfully wrote", i, "to ch")
              }
              close(ch) // 关闭通道
          }

          func main() {
              ch := make(chan int, 2) // 创建一个容量为2的缓冲通道  通道容量大小会导致阻塞
              go write(ch)
              time.Sleep(2 * time.Second) // 模拟时间间隔
              for v := range ch {
                  fmt.Println("Read value", v, "from ch") // 读取数据 goroutine继续运行
                  time.Sleep(2 * time.Second)
              }

              // 并发的 goroutine 和通道的阻塞机制,write() 函数和 range ch 循环可以交替执行,使得循环不会一次执行完毕,而是在读取完所有值之后等待新的值出现,再次进行循环迭代。
          }

          // Successfully wrote 0 to ch
          // Successfully wrote 1 to ch
          // Read value 0 from ch
          // Successfully wrote 2 to ch
          // Read value 1 from ch
          // Successfully wrote 3 to ch
          // Read value 2 from ch
          // Successfully wrote 4 to ch
          // Read value 3 from ch
          // Read value 4 from ch

2. Deadlock 死锁

         package main

         import (  
             "fmt"
         )

         func main() {  
             ch := make(chan string, 2)
             ch <- "naveen"
             ch <- "paul"
             ch <- "steve"			// 其容量是2 但是写入三个 导致死锁
             fmt.Println(<-ch)
             fmt.Println(<-ch)
         }

         // fatal error: all goroutines are asleep - deadlock!

         // goroutine 1 [chan send]:  
         // main.main()  
         //     /tmp/sandbox091448810/prog.go:11 +0x8d

3. Closing buffered channels 关闭通道

         ch := make(chan int, 5)
         ch <- 6
         ch <- 9
         close(ch)
         n, open := <-ch
         fmt.Printf("Received: %d, open: %t\n", n, open)
         n, open = <-ch
         fmt.Printf("Received: %d, open: %t\n", n, open)
         n, open = <-ch
         fmt.Printf("Received: %d, open: %t\n", n, open)

         // Received: 5, open: true  
         // Received: 6, open: true  
         // Received: 0, open: false  

4. Length vs Capacity 长度和容量

        ch := make(chan string, 3)
        ch <- "Like"
        ch <- "LiangXiaoQing"
        fmt.Println("capacity is", cap(ch))
        fmt.Println("length is", len(ch)) // 通道写入的个数
        fmt.Println("read value", <-ch)
        fmt.Println("new length is", len(ch))
        fmt.Println("read value", <-ch)
        fmt.Println("new length is", len(ch))

        // capacity is 3
        // length is 2
        // read value Like
        // new length is 1
        // read value LiangXiaoQing
        // new length is 0

5. WaitGroup

         // Add() 添加任务
         // Done() 通知wait完成任务
         // Wait() 阻塞等待所有任务完成

         package main

         import (  
             "fmt"
             "sync"
             "time"
         )

         func process(i int, wg *sync.WaitGroup) {
             fmt.Println("started Goroutine", i)   // 3.打印 Goroutine 开始执行的信息
             time.Sleep(2 * time.Second)           // 4.暂停 2 秒,模拟任务执行时间
             fmt.Printf("Goroutine %d ended\n", i) // 5.打印 Goroutine 执行结束的信息
             wg.Done()                             // 6.通知等待组任务已完成
         }

         func main() {
             no := 3
             var wg sync.WaitGroup
             for i := 0; i < no; i++ {
                 wg.Add(1)          // 1.循环三次添加三次任务
                 go process(i, &wg) // 2.每次传入当前i 0-2 及wg内存地址
             }
             wg.Wait() 	// 7.等待所有任务完成
             fmt.Println("All go routines finished executing")
         }

         // started Goroutine 1
         // started Goroutine 0
         // started Goroutine 2
         // Goroutine 2 ended
         // Goroutine 0 ended
         // Goroutine 1 ended
         // All go routines finished executing

6. Worker Pool Implementation 线程池

        type Job struct { // Job 结构表示一个具有 ID 和随机数的作业。
            id       int
            randomno int
        }

        type Result struct { // Result 结构表示作业的结果,包括作业本身和数字各位数之和。
            job         Job
            sumofdigits int
        }

        var jobs = make(chan Job, 10)       // jobs 是一个带有缓冲区大小为 10 的通道,用于传递作业。
        var results = make(chan Result, 10) // results 是一个带有缓冲区大小为 10 的通道,用于传递结果。

        func digits(number int) int { // digits 函数计算一个整数的各位数之和。
            sum := 0
            no := number
            for no != 0 { // 循环中,通过取模和除法操作,将数字的各位数相加。
                digit := no % 10
                sum += digit
                no /= 10
            }
            time.Sleep(2 * time.Second) // time.Sleep(2 * time.Second) 使函数暂停 2 秒钟,模拟一个耗时操作。
            return sum                  // 返回各位数之和。
        }

        func worker(wg *sync.WaitGroup) { // worker 函数是一个工作协程,用于处理作业。
            for job := range jobs { // 使用 range 循环从 jobs 通道接收作业。
                output := Result{job, digits(job.randomno)} // 通过调用 digits 函数计算作业的各位数之和。 将作业和结果封装为 Result 结构
                results <- output                           // 并发送到 results 通道。
            }
            wg.Done() // wg.Done() 声明一个任务已完成。
        }

        func createWorkerPool(noOfWorkers int) { // createWorkerPool 函数创建一个工作池,用于并发处理作业。
            var wg sync.WaitGroup              // 创建一个 sync.WaitGroup 对象 wg,用于等待所有工作协程完成。
            for i := 0; i < noOfWorkers; i++ { // 使用 for 循环创建指定数量的工作协程。
                wg.Add(1)      // 添加任务
                go worker(&wg) // 每个工作协程调用 worker 函数,并传递 &wg 作为参数。
            }
            wg.Wait()      // wg.Wait() 等待所有工作协程完成。
            close(results) // 关闭 results 通道,表示所有结果已经发送完毕。
        }

        func allocate(noOfJobs int) { // allocate 函数用于生成指定数量的作业并发送到 jobs 通道。
            for i := 0; i < noOfJobs; i++ { //  使用 for 循环创建指定数量的作业。
                randomno := rand.Intn(999) // 生成一个随机数 randomno,范围在 0 到 999 之间。
                job := Job{i, randomno}    // 创建一个 Job 结构体 job
                jobs <- job                // 并将其发送到 jobs 通道。
            }
            close(jobs) // 关闭 jobs 通道,表示所有作业已经发送完毕。
        }

        func result(done chan bool) { // result 函数用于接收并处理结果。
            for result := range results { // 使用 range 循环从 results 通道接收结果。
                fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
            }
            done <- true
        }

        func main() {
            startTime := time.Now()
            noOfJobs := 10
            go allocate(noOfJobs) 	// 传入job id 0-99 random 0-999的随机数字	Job id 1, input random no 636
            done := make(chan bool)
            go result(done)		// result线程一直等待results有值 如果有值则打印信息 传入true结束通道
            noOfWorkers := 10	// 控制线程池数量 
            createWorkerPool(noOfWorkers)	// 把&wg sync.WaitGroup 类型变量的指针 传给 results
            <-done	// 关闭通道
            endTime := time.Now()
            diff := endTime.Sub(startTime)	// 计算时间差
            fmt.Println("total time taken", diff.Seconds(), "seconds")
        }


        // Job id 1, input random no 636, sum of digits 15  
        // Job id 0, input random no 878, sum of digits 23  
        // Job id 9, input random no 150, sum of digits 6  
        // ...
        // total time taken  20.01081009 seconds  

二、Select

1. Example

        package main

        import (
            "fmt"
            "time"
        )

        func server1(ch chan string) {
            time.Sleep(9 * time.Second)
            ch <- "From Server 1"
        }

        func server2(ch chan string) {
            time.Sleep(6 * time.Second)
            ch <- "From server 2"
        }

        func main() {
            output1 := make(chan string)
            output2 := make(chan string)
            go server1(output1)
            go server2(output2)
            select { 	// 使用select语句接收多个通道消息,select会接收最先准备好的通道接收操作
            case s1 := <-output1:
                fmt.Println(s1)
            case s2 := <-output2:
                fmt.Println(s2)
            }
        }

        // From server 2

2. Default case 默认选择

        func process(ch chan string) {
            time.Sleep(1 * time.Second)
            ch <- "Process Successful"
        }

        func main() {
            ch := make(chan string)
            go process(ch)
            for { 		// for循环一直循环 每次循环休息1秒 直到v有值 主要看上面process函数睡眠睡觉 否则一直输出default的值
                time.Sleep(1000 * time.Microsecond)
                select {
                case v := <-ch:
                    fmt.Println("Received value:", v)
                    return
                default:
                    fmt.Println("No value Received")
                }
            }
            
           // ....
           // No value Received
           //No value Received
           //No value Received
           //No value Received
           //Received value: Process Successful

3. Deadlock and default case 死锁与默认选择

        func main() {
            ch := make(chan string)
            select {
            case v := <-ch:
                fmt.Println("Received value", v)
            default:
                fmt.Println("Default case executed")
            }
        }

        // Default case executed

4. Random selection 随机选择

        package main

        import (  
            "fmt"
            "time"
        )

        func server1(ch chan string) {  
            ch <- "from server1"
        }
        func server2(ch chan string) {  
            ch <- "from server2"

        }
        func main() {  
            output1 := make(chan string)
            output2 := make(chan string)
            go server1(output1)
            go server2(output2)
            time.Sleep(1 * time.Second)
            select {		// 使用select语句接收多个通道消息,select会接收最先准备好的通道接收操作
            case s1 := <-output1:
                fmt.Println(s1)
            case s2 := <-output2:
                fmt.Println(s2)
            }
        }

        // From Server 1

三、Mutex

1. Program with a race condition 无锁示例

        package main

        import (
            "fmt"
            "sync"
        )

        var x = 0

        func increment(wg *sync.WaitGroup) {
            x = x + 1
            wg.Done()
        }

        func main() {
            var w sync.WaitGroup
            for i := 0; i < 1000; i++ {
                w.Add(1)
                go increment(&w)
            }
            w.Wait()
            fmt.Println("Final value of X", x)
        }


        // Final value of X 987		最终答案应该是1000 因为多线程全部都在操作x 导致有些操作未成功

2. Solving the race condition using a mutex 互斥锁解决方案

        package main

        import (
            "fmt"
            "sync"
        )

        var x = 0

        func increment(wg *sync.WaitGroup, m *sync.Mutex) {
            m.Lock() // 上锁
            x = x + 1
            m.Unlock() // 释放锁  只有拿到锁才能操作x 否则一直等待
            wg.Done()
        }

        func main() {
            var w sync.WaitGroup
            var m sync.Mutex
            for i := 0; i < 1000; i++ {
                w.Add(1)
                go increment(&w, &m)
            }
            w.Wait()
            fmt.Println("Final value of X", x)
        }


        // Final value of X 1000

3. Solving the race condition using channel 通道解决方案

        package main  
        import (  
            "fmt"
            "sync"
            )
        var x  = 0  
        func increment(wg *sync.WaitGroup, ch chan bool) {  
            ch <- true
            x = x + 1
            <- ch
            wg.Done()   
        }
        func main() {  
            var w sync.WaitGroup
            ch := make(chan bool, 1)	// 通道容量 1	所以每次都需要上一个结束 下一个才能进行操作
            for i := 0; i < 1000; i++ {
                w.Add(1)        
                go increment(&w, ch)
            }
            w.Wait()
            fmt.Println("final value of x", x)
        }

        // Final value of x 1000

技术小白记录学习过程,有错误或不解的地方请指出,如果这篇文章对你有所帮助请点点赞收藏+关注谢谢支持 !!!文章来源地址https://www.toymoban.com/news/detail-668768.html

到了这里,关于GO语言:Worker Pools线程池、Select语句、Metex互斥锁详细示例教程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Go语言中的Select:深度解析与实战案例

    select  是操作系统中的系统调用,我们以前在学校中学习操作系统课程或者在工作当中,肯定都使用过或者了解过  select 、 poll  和  epoll  等函数构建 I/O 多路复用模型提升程序的性能。Go 语言的  select  与操作系统中的  select  很相似,今天这篇文章会深度解析 Go 语言 

    2024年01月24日
    浏览(35)
  • 【Go】Go 语言教程--GO条件和循环语句(八)

    往期教程: Go 语言教程–介绍(一) Go 语言教程–语言结构(二) Go 语言教程–语言结构(三) Go 语言教程–数据类型(四) Go 语言教程–语言变量(五) Go 语言教程–GO语言常量(六) Go 语言教程–GO语言运算符(七) 条件语句需要开发者通过指定一个或多个条件,并

    2024年02月13日
    浏览(36)
  • Go语言基础-if语句

      原文链接:  https://www.fearlazy.com/index.php/post/288.html   在目前接触的几种语言中条件语句都是必不可少的。很难想象没有条件语句要怎么写程序。   1.if语句 Go语言的条件语句和C++的很像,使用if。 其格式如下: if 表达式为true {    执行语句 } 和C++的区别是条件表达式

    2023年04月08日
    浏览(56)
  • Go语言中的流程控制语句

    目录 流程控制语句 if语句 if···else语句 switch语句 for循环 break语句 continue语句 goto语句 在Go语言中,可以在if后面加上一段执行语句,执行语句也可以用作对if条件的判断。它的语法格式为: a赋值为condition()方法的结果,然后让a与nil进行判断,如果为true,那么会执行代码块中

    2024年02月03日
    浏览(31)
  • Go语言之函数补充defer语句,递归函数,章节练习

    defer语句是go语言提供的一种用于注册延迟调用的机制,是go语言中一种很有用的特性。 defer语句注册了一个函数调用,这个调用会延迟到defer语句所在的函数执行完毕后执行,所谓执行完毕是指该函数执行了return语句、函数体已执行完最后一条语句或函数所在协程发生了恐慌

    2024年02月17日
    浏览(37)
  • 【Linux】线程互斥 -- 互斥锁 | 死锁 | 线程安全

    我们写一个多线程同时访问一个全局变量的情况(抢票系统),看看会出什么bug: 假如创建4个线程同时抢票,总票数有10000张,每个线程抢到票以后减一,按照正常情况我们应该是抢票到0截至。 多个线程交叉执行本质:就是让调度器尽可能的频繁发生线程调度与切换 线程一般

    2024年02月14日
    浏览(29)
  • 3.你所不知道的go语言控制语句——Leetcode习题69

    目录 本篇前瞻 Leetcode习题9 题目描述 代码编写 控制结构 顺序结构(Sequence) 声明和赋值 多返回值赋值 运算符 算术运算符 位运算符 逻辑运算 分支结构 if 语句 switch 语句 逻辑表达式 fallthrough 类型推断 循环语句 continue break goto Leetcode习题69 题目描述 题目分析 代码编写 本篇

    2024年02月12日
    浏览(28)
  • Mysql 数据库DQL 数据查询语言 SELECT 基本查询、条件查询、聚合查询、分组查询、排序查询、分页查询——包含DQL所有查询语句。吐血分享。

    DQL:数据查询语言; 用来对表内的数据进行查找 。Database Query Language SQL语句分为:基本查询、条件查询、聚合查询、分组查询、排序查询、分页查询。  可以发现name字段就只剩下一个张三了;   条件: 条件查询—比较运算符 比较运算符 功能 大于 = 大于等于 小于 = 小于等

    2024年01月19日
    浏览(45)
  • 多线程(线程互斥)

    学习了前面有关线程库的操作后,我们就可以模拟抢票的过程 假设我们创建四个线程,分别代表我们的用户 然后设定总票数为1000张,四个线程分别将进行循环抢票操作,其实就是循环对票数进行打印,并进行对应的减减操作 一旦票数为0,也就是票没有了,我们就让线程从

    2024年02月07日
    浏览(23)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包