一、Buffered Channels and Worker Pools
1. Goroutine and Channel Example 线程和通道示例
package main
import (
"fmt"
"time"
)
func write(ch chan int) {
for i := 0; i < 5; i++ {
ch <- i // 向通道写入 0-4 因为通道容量是2 需要读取数据才会进行下一步 否则一直在阻塞态
fmt.Println("Successfully wrote", i, "to ch")
}
close(ch) // 关闭通道
}
func main() {
ch := make(chan int, 2) // 创建一个容量为2的缓冲通道 通道容量大小会导致阻塞
go write(ch)
time.Sleep(2 * time.Second) // 模拟时间间隔
for v := range ch {
fmt.Println("Read value", v, "from ch") // 读取数据 goroutine继续运行
time.Sleep(2 * time.Second)
}
// 并发的 goroutine 和通道的阻塞机制,write() 函数和 range ch 循环可以交替执行,使得循环不会一次执行完毕,而是在读取完所有值之后等待新的值出现,再次进行循环迭代。
}
// Successfully wrote 0 to ch
// Successfully wrote 1 to ch
// Read value 0 from ch
// Successfully wrote 2 to ch
// Read value 1 from ch
// Successfully wrote 3 to ch
// Read value 2 from ch
// Successfully wrote 4 to ch
// Read value 3 from ch
// Read value 4 from ch
2. Deadlock 死锁
package main
import (
"fmt"
)
func main() {
ch := make(chan string, 2)
ch <- "naveen"
ch <- "paul"
ch <- "steve" // 其容量是2 但是写入三个 导致死锁
fmt.Println(<-ch)
fmt.Println(<-ch)
}
// fatal error: all goroutines are asleep - deadlock!
// goroutine 1 [chan send]:
// main.main()
// /tmp/sandbox091448810/prog.go:11 +0x8d
3. Closing buffered channels 关闭通道
ch := make(chan int, 5)
ch <- 6
ch <- 9
close(ch)
n, open := <-ch
fmt.Printf("Received: %d, open: %t\n", n, open)
n, open = <-ch
fmt.Printf("Received: %d, open: %t\n", n, open)
n, open = <-ch
fmt.Printf("Received: %d, open: %t\n", n, open)
// Received: 5, open: true
// Received: 6, open: true
// Received: 0, open: false
4. Length vs Capacity 长度和容量
ch := make(chan string, 3)
ch <- "Like"
ch <- "LiangXiaoQing"
fmt.Println("capacity is", cap(ch))
fmt.Println("length is", len(ch)) // 通道写入的个数
fmt.Println("read value", <-ch)
fmt.Println("new length is", len(ch))
fmt.Println("read value", <-ch)
fmt.Println("new length is", len(ch))
// capacity is 3
// length is 2
// read value Like
// new length is 1
// read value LiangXiaoQing
// new length is 0
5. WaitGroup
// Add() 添加任务
// Done() 通知wait完成任务
// Wait() 阻塞等待所有任务完成
package main
import (
"fmt"
"sync"
"time"
)
func process(i int, wg *sync.WaitGroup) {
fmt.Println("started Goroutine", i) // 3.打印 Goroutine 开始执行的信息
time.Sleep(2 * time.Second) // 4.暂停 2 秒,模拟任务执行时间
fmt.Printf("Goroutine %d ended\n", i) // 5.打印 Goroutine 执行结束的信息
wg.Done() // 6.通知等待组任务已完成
}
func main() {
no := 3
var wg sync.WaitGroup
for i := 0; i < no; i++ {
wg.Add(1) // 1.循环三次添加三次任务
go process(i, &wg) // 2.每次传入当前i 0-2 及wg内存地址
}
wg.Wait() // 7.等待所有任务完成
fmt.Println("All go routines finished executing")
}
// started Goroutine 1
// started Goroutine 0
// started Goroutine 2
// Goroutine 2 ended
// Goroutine 0 ended
// Goroutine 1 ended
// All go routines finished executing
6. Worker Pool Implementation 线程池
type Job struct { // Job 结构表示一个具有 ID 和随机数的作业。
id int
randomno int
}
type Result struct { // Result 结构表示作业的结果,包括作业本身和数字各位数之和。
job Job
sumofdigits int
}
var jobs = make(chan Job, 10) // jobs 是一个带有缓冲区大小为 10 的通道,用于传递作业。
var results = make(chan Result, 10) // results 是一个带有缓冲区大小为 10 的通道,用于传递结果。
func digits(number int) int { // digits 函数计算一个整数的各位数之和。
sum := 0
no := number
for no != 0 { // 循环中,通过取模和除法操作,将数字的各位数相加。
digit := no % 10
sum += digit
no /= 10
}
time.Sleep(2 * time.Second) // time.Sleep(2 * time.Second) 使函数暂停 2 秒钟,模拟一个耗时操作。
return sum // 返回各位数之和。
}
func worker(wg *sync.WaitGroup) { // worker 函数是一个工作协程,用于处理作业。
for job := range jobs { // 使用 range 循环从 jobs 通道接收作业。
output := Result{job, digits(job.randomno)} // 通过调用 digits 函数计算作业的各位数之和。 将作业和结果封装为 Result 结构
results <- output // 并发送到 results 通道。
}
wg.Done() // wg.Done() 声明一个任务已完成。
}
func createWorkerPool(noOfWorkers int) { // createWorkerPool 函数创建一个工作池,用于并发处理作业。
var wg sync.WaitGroup // 创建一个 sync.WaitGroup 对象 wg,用于等待所有工作协程完成。
for i := 0; i < noOfWorkers; i++ { // 使用 for 循环创建指定数量的工作协程。
wg.Add(1) // 添加任务
go worker(&wg) // 每个工作协程调用 worker 函数,并传递 &wg 作为参数。
}
wg.Wait() // wg.Wait() 等待所有工作协程完成。
close(results) // 关闭 results 通道,表示所有结果已经发送完毕。
}
func allocate(noOfJobs int) { // allocate 函数用于生成指定数量的作业并发送到 jobs 通道。
for i := 0; i < noOfJobs; i++ { // 使用 for 循环创建指定数量的作业。
randomno := rand.Intn(999) // 生成一个随机数 randomno,范围在 0 到 999 之间。
job := Job{i, randomno} // 创建一个 Job 结构体 job
jobs <- job // 并将其发送到 jobs 通道。
}
close(jobs) // 关闭 jobs 通道,表示所有作业已经发送完毕。
}
func result(done chan bool) { // result 函数用于接收并处理结果。
for result := range results { // 使用 range 循环从 results 通道接收结果。
fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
}
done <- true
}
func main() {
startTime := time.Now()
noOfJobs := 10
go allocate(noOfJobs) // 传入job id 0-99 random 0-999的随机数字 Job id 1, input random no 636
done := make(chan bool)
go result(done) // result线程一直等待results有值 如果有值则打印信息 传入true结束通道
noOfWorkers := 10 // 控制线程池数量
createWorkerPool(noOfWorkers) // 把&wg sync.WaitGroup 类型变量的指针 传给 results
<-done // 关闭通道
endTime := time.Now()
diff := endTime.Sub(startTime) // 计算时间差
fmt.Println("total time taken", diff.Seconds(), "seconds")
}
// Job id 1, input random no 636, sum of digits 15
// Job id 0, input random no 878, sum of digits 23
// Job id 9, input random no 150, sum of digits 6
// ...
// total time taken 20.01081009 seconds
二、Select
1. Example
package main
import (
"fmt"
"time"
)
func server1(ch chan string) {
time.Sleep(9 * time.Second)
ch <- "From Server 1"
}
func server2(ch chan string) {
time.Sleep(6 * time.Second)
ch <- "From server 2"
}
func main() {
output1 := make(chan string)
output2 := make(chan string)
go server1(output1)
go server2(output2)
select { // 使用select语句接收多个通道消息,select会接收最先准备好的通道接收操作
case s1 := <-output1:
fmt.Println(s1)
case s2 := <-output2:
fmt.Println(s2)
}
}
// From server 2
2. Default case 默认选择
func process(ch chan string) {
time.Sleep(1 * time.Second)
ch <- "Process Successful"
}
func main() {
ch := make(chan string)
go process(ch)
for { // for循环一直循环 每次循环休息1秒 直到v有值 主要看上面process函数睡眠睡觉 否则一直输出default的值
time.Sleep(1000 * time.Microsecond)
select {
case v := <-ch:
fmt.Println("Received value:", v)
return
default:
fmt.Println("No value Received")
}
}
// ....
// No value Received
//No value Received
//No value Received
//No value Received
//Received value: Process Successful
3. Deadlock and default case 死锁与默认选择
func main() {
ch := make(chan string)
select {
case v := <-ch:
fmt.Println("Received value", v)
default:
fmt.Println("Default case executed")
}
}
// Default case executed
4. Random selection 随机选择
package main
import (
"fmt"
"time"
)
func server1(ch chan string) {
ch <- "from server1"
}
func server2(ch chan string) {
ch <- "from server2"
}
func main() {
output1 := make(chan string)
output2 := make(chan string)
go server1(output1)
go server2(output2)
time.Sleep(1 * time.Second)
select { // 使用select语句接收多个通道消息,select会接收最先准备好的通道接收操作
case s1 := <-output1:
fmt.Println(s1)
case s2 := <-output2:
fmt.Println(s2)
}
}
// From Server 1
三、Mutex
1. Program with a race condition 无锁示例
package main
import (
"fmt"
"sync"
)
var x = 0
func increment(wg *sync.WaitGroup) {
x = x + 1
wg.Done()
}
func main() {
var w sync.WaitGroup
for i := 0; i < 1000; i++ {
w.Add(1)
go increment(&w)
}
w.Wait()
fmt.Println("Final value of X", x)
}
// Final value of X 987 最终答案应该是1000 因为多线程全部都在操作x 导致有些操作未成功
2. Solving the race condition using a mutex 互斥锁解决方案
package main
import (
"fmt"
"sync"
)
var x = 0
func increment(wg *sync.WaitGroup, m *sync.Mutex) {
m.Lock() // 上锁
x = x + 1
m.Unlock() // 释放锁 只有拿到锁才能操作x 否则一直等待
wg.Done()
}
func main() {
var w sync.WaitGroup
var m sync.Mutex
for i := 0; i < 1000; i++ {
w.Add(1)
go increment(&w, &m)
}
w.Wait()
fmt.Println("Final value of X", x)
}
// Final value of X 1000
3. Solving the race condition using channel 通道解决方案
package main
import (
"fmt"
"sync"
)
var x = 0
func increment(wg *sync.WaitGroup, ch chan bool) {
ch <- true
x = x + 1
<- ch
wg.Done()
}
func main() {
var w sync.WaitGroup
ch := make(chan bool, 1) // 通道容量 1 所以每次都需要上一个结束 下一个才能进行操作
for i := 0; i < 1000; i++ {
w.Add(1)
go increment(&w, ch)
}
w.Wait()
fmt.Println("final value of x", x)
}
// Final value of x 1000
技术小白记录学习过程,有错误或不解的地方请指出,如果这篇文章对你有所帮助请点点赞收藏+关注谢谢支持 !!!文章来源地址https://www.toymoban.com/news/detail-668768.html
文章来源:https://www.toymoban.com/news/detail-668768.html
到了这里,关于GO语言:Worker Pools线程池、Select语句、Metex互斥锁详细示例教程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!