go并发 - channel

这篇具有很好参考价值的文章主要介绍了go并发 - channel。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

概述

并发编程是利用多核心能力,提升程序性能,而多线程之间需要相互协作、共享资源、线程安全等。任何并发模型都要解决线程间通讯问题,毫不夸张的说线程通讯是并发编程的主要问题。go使用著名的CSP(Communicating Sequential Process,通讯顺序进程)并发模型,从设计之初 Go 语言就注重如何在编程语言层级上设计一个简洁安全高效的抽象模型,让程序员专注于分解问题和组合方案,而且不用被线程管理和信号互斥这些繁琐的操作分散精力。channel是线程简通讯的具体实现之一,本质就是一个线程安全的 FIFO 阻塞队列(先进先出),向队列中写入数据,在另一个线程从队列读取数据。很多语言都有类似实现,比如 Java 的线程池任务队列。

基本使用

通道是引用类型,需要使用 make 创建,格式如下

通道实例 := make(chan 数据类型, 通道长度)
  • 数据类型:通道内传输的元素类型,可以基本数据类型,也可以使自定义数据类型。
  • 通道实例:通过make创建的通道句柄,与函数名称一样,指向通道的内存首地址。
  • 通道长度:通道本质是队列,创建时候可指定长度,默认为0

创建通道

ch1 := make(chan int)                 // 创建一个整型类型的通道
ch2 := make(chan interface{})         // 创建一个空接口类型的通道, 可以存放任意格式
ch3 := make(chan *Equip)             // 创建Equip指针类型的通道, 可以存放*Equip
ch4 := make(chan *Equip, 10)         // 创建Equip指针类型的通道, 并指定队列长度

通道本质就是线程安全的队列,创建时候可以指定队列长度,默认为0。

向通道写入数据,使用语法非常形象,写入channel <-,读取<-channel

ch2 := make(chan interface{}, 10)
ch2<- 10			// 向队列写入
n := <-ch2 			// 从队列读取
fmt.Println(n)		// 10

箭头语法虽然很形象,但是有些奇怪,也不利于扩展。使用函数方式感觉更好,也更主流,如func (p *chan) get() any func (p *chan) put(any) err,扩展性也更强,通过参数可增加超时、同步、异步等技能。

箭头符号并没有规定位置,与C指针一样,如下两个语句等效

ch1 := make(chan int)
i := <-ch1			
i := <- ch1

箭头语法的读写有相对性,可读性一般,有时候无法分辨是读或写,看起来很奇怪,如下伪代码

func main() {
	input := make(chan int, 2)
	output := make(chan int, 2)

	go func() {
		input <- 10
	}()
	output<- <-input
	fmt.Println(<-output)
}

管道是用于协程之间通讯,主流使用方式如下

ch2 := make(chan interface{}, 10)

go func() {
    data := <-ch2			// 用户协程读取
    fmt.Println(data)
}()
     
ch2 <- "hello"				// 主协程写入
time.Sleep(time.Second)

管道也支持遍历,与箭头符号一样,无数据时候循环将被阻塞,循环永远不会结束,除非关闭管道

chanInt := make(chan int, 10)

for chanInt, ok := range chanInts {
    fmt.Println(chanInt)
}

管道也支持关闭,关闭后的管道不允许写入,panic 异常

chanInts := make(chan int, 10)
close(chanInts)
chanInts <- 1		// panic: send on closed channel

读取则不同,已有数据可继续读取,无数据时返回false,不阻塞

if value, ok := <-chanInts; ok {			// 从管道读取数据不在阻塞
    fmt.Println("从管读取=", value)
} else {
    fmt.Println("从管道读取失败", ok)
    return
}

单向管道

管道也支持单向模式,仅允许读取、或者写入

var queue <-chan string = make(chan string)

函数形参也可以定义定向管道

func customer(channel <-chan string) {		// 形参为只读管道
    for {		
        message := <-channel				// 只允许读取数据
        fmt.Println(message)
    }
}
channel := make(chan string)
go customer(channel)

管道阻塞

Go管道的读写都是同步模式,当管道容量还有空间,则写入成功,否则将阻塞直到写入成功。从管道读取也一样,有数据直接读取,否则将阻塞直到读取成功。

var done = make(chan bool)

func aGoroutine() {
    fmt.Println("hello")
    done <- true			// 写管道
}

func main() {
    go aGoroutine()
    <-done					// 读阻塞
}

主协程从管道读取数据时将被阻塞,直到用户协程写入数据。管道非常适合用于生产者消费者模式,需要平滑两者的性能差异,可通过管道容量实现缓冲,所以除非特定场景,都建议管道容量大于零。

有些场景可以使用管道控制线程并发数

// 待补充

阻塞特性也带来了些问题,程序无法控制超时(箭头函数语法的后遗症),go 也提供了解决方案, 使用select关键,与网络编程的select函数类似,监测多个通道是否可读状态,都可读随机选择一个,都不可读进入Default分支,否则阻塞

select {
    case n := <-input:
        fmt.Println(n)
    case m := <-output:
        fmt.Println(m)
    default:
        fmt.Println("default")
}

当然也可以使用select向管道写入数据,只要不关闭管道总是可写入,此时加入default分支永远不会被执行到,如下随机石头剪刀布

ch := make(chan string)
go func() {
    for {
        select {
            case ch <- "石头":
            case ch <- "剪刀":
            case ch <- "布":
        }
    }
}()

for value := range ch {
    log.Println(value)
    time.Sleep(time.Second)
}

模拟线程池

由于go的管道非常轻量且简洁,大部分直接使用,封装线程池模式并不常见。案例仅作为功能演示,非常简单几十行代码即可实现线程池的基本功能,体现了go并发模型的简洁、高效。

type Runnable interface {
	Start()
}

// 线程池对象
type ThreadPool struct {
	queueSize int
	workSize  int
	channel   chan Runnable
	wait      sync.WaitGroup
}

// 工作线程, 执行异步任务
func (pool *ThreadPool) doWorker(name string) {
	log.Printf("%s 启动工作协程", name)
	for true {
		if runnable, ok := <-pool.channel; ok {
			log.Printf("%s 获取任务, %v\n", name, runnable)
			runnable.Start()
			log.Printf("%s 任务执行成功, %v\n", name, runnable)
		} else {
			log.Printf("%s 线程池关闭, 退出工作协程\n", name)
			pool.wait.Done()
			return
		}
	}
}

// 启动工作线程
func (pool *ThreadPool) worker() {
	pool.wait.Add(pool.workSize)
	for i := 0; i < pool.workSize; i++ {
		go pool.doWorker(fmt.Sprintf("work-%d", i))
	}
}

// Submit 提交任务
func (pool *ThreadPool) Submit(task Runnable) bool {
	defer func() { recover() }()
	pool.channel <- task
	return true
}

// Close 关闭线程池
func (pool *ThreadPool) Close() {
	defer func() { recover() }()
	close(pool.channel)
}

// Wait 等待线程池任务完成
func (pool *ThreadPool) Wait() {
	pool.Close()
	pool.wait.Wait()
}

// NewThreadPool 工厂函数,创建线程池
func NewThreadPool(queueSize int, workSize int) *ThreadPool {
	pool := &ThreadPool{queueSize: queueSize, workSize: workSize, channel: make(chan Runnable, queueSize)}
	pool.worker()
	return pool
}

使用线程池

type person struct {
	name string
}

func (p *person) Start() {
	fmt.Println(p.name)
}

func main() {
	threadPool := executor.NewThreadPool(10, 2)		// 创建线程池, 队列长度10, 工作线程2

	for i := 0; i < 5; i++ {
		threadPool.Submit(&person{name: "xx"})		// 提交十个任务
	}
        
	threadPool.Wait()								// 阻塞等待所有任务完成
}

模拟管道

任何线程之间的通讯都依赖底层锁机制,channel是对锁机制封装后的实现对象,与Java中线程池任务队列机制几乎一样,但要简洁很多。使用切片简单模拟
接口声明

type Queue interface {
	// Put 向队列添加任务, 添加成功返回true, 添加失败返回false, 队列满了则阻塞直到添加成功
	Put(task interface{}) bool

	// Get 从队列获取任务, 一直阻塞直到获取任务, 队列关闭且没有任务则返回false
	Get() (interface{}, bool)

	// Size 查看队列中的任务数
	Size() int

	// Close 关闭队列, 关闭后将无法添加任务, 已有的任务可以继续获取
	Close()
}

基于切片实现

// SliceQueue 使用切片实现, 自动扩容属性队列永远都不会满, 扩容时候会触发数据复制, 性能一般
type SliceQueue struct {
	sync.Mutex
	cond  *sync.Cond
	queue []interface{}
	close atomic.Bool
}

func (q *SliceQueue) Get() (data interface{}, ok bool) {
	q.Lock()
	defer q.Unlock()

	for true {
		if len(q.queue) == 0 {
			if q.close.Load() == true {
				return nil, false
			}
			q.cond.Wait()
		}
		if data := q.doGet(); data != nil {
			return data, true
		}
	}
	return
}

func (q *SliceQueue) doGet() interface{} {
	if len(q.queue) >= 1 {
		data := q.queue[0]
		q.queue = q.queue[1:]
		return data
	}
	return nil
}

func (q *SliceQueue) Put(task interface{}) bool {
	q.Lock()
	defer func() {
		q.cond.Signal()
		q.Unlock()
	}()

	if q.close.Load() == true {
		return false
	}
	q.queue = append(q.queue, task)
	return true
}

func (q *SliceQueue) Size() int {
	return len(q.queue)
}

func (q *SliceQueue) Close() {
	if q.close.Load() == true {
		return
	}

	q.Lock()
	defer q.Unlock()

	q.close.Store(true)
	q.cond.Broadcast()
}

func NewSliceQueue() Queue {
	sliceQueue := &SliceQueue{queue: make([]interface{}, 0, 2)}
	sliceQueue.cond = sync.NewCond(sliceQueue)
	return sliceQueue
}

基于环行数组实现

type ArrayQueue struct {
	sync.Mutex
	readCond     *sync.Cond
	writeCond    *sync.Cond
	readIndex    int
	writeIndex   int
	queueMaxSize int
	close        atomic.Bool
	queue        []interface{}
}

func (q *ArrayQueue) Put(task interface{}) bool {
	q.Lock()
	defer q.Unlock()

	for true {
		if q.close.Load() == true {
			return false
		}
		if q.IsFull() {
			q.writeCond.Wait()
			if q.IsFull() {
				continue
			}
		}
		q.queue[q.writeIndex] = task
		q.writeIndex = (q.writeIndex + 1) % q.queueMaxSize
		q.readCond.Signal()
		return true
	}
	return true
}

func (q *ArrayQueue) Get() (interface{}, bool) {
	q.Lock()
	defer q.Unlock()

	for true {
		if q.IsEmpty() {
			if q.close.Load() == true {
				return nil, false
			}
			q.readCond.Wait()
			if q.IsEmpty() {
				continue
			}
		}
		task := q.queue[q.readIndex]
		q.readIndex = (q.readIndex + 1) % q.queueMaxSize
		q.writeCond.Signal()
		return task, true
	}
	return nil, true
}

func (q *ArrayQueue) Size() int {
	return q.queueMaxSize
}

func (q *ArrayQueue) Close() {
	if q.close.Load() == true {
		return
	}
	q.Lock()
	q.Unlock()
	q.close.Store(true)
	q.readCond.Broadcast()
}

func (q *ArrayQueue) IsFull() bool {
	return (q.writeIndex+1)%q.queueMaxSize == q.readIndex
}

func (q *ArrayQueue) IsEmpty() bool {
	return q.readIndex == q.writeIndex
}

func NewArrayQueue(size int) Queue {
	queue := &ArrayQueue{queue: make([]interface{}, size), readIndex: 0, writeIndex: 0, queueMaxSize: size}
	queue.readCond = sync.NewCond(queue)
	queue.writeCond = sync.NewCond(queue)
	return queue
}

测试用例文章来源地址https://www.toymoban.com/news/detail-746898.html

func TestWith(t *testing.T) {
	q := NewSliceQueue()
	go func() {
		time.Sleep(time.Second * 2)
		q.Put(true)  // 向队列写入数据, 与 chan<- 功能相同
	}()

	q.Get()			// 阻塞直到读取数据, 与 <-chan 功能相同
}

到了这里,关于go并发 - channel的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • AIGC之GPT-4:GPT-4的简介(核心原理/意义/亮点/技术点/缺点/使用建议)、使用方法、案例应用(计算能力/代码能力/看图能力等)之详细攻略

    AIGC之GPT-4:GPT-4的简介(核心原理/意义/亮点/技术点/缺点/使用建议)、使用方法、案例应用(计算能力/代码能力/看图能力等)之详细攻略 解读 :在2022年11月横空出世的ChatGPT,打遍天下无敌手的时候,就知道会有这么一天,知道它会来,也知道它一定来,但是,还是没想到,来的

    2024年02月03日
    浏览(30)
  • 零售行业供应链管理核心KPI指标(一) – 能力、速度、效率和成本

    有关零售行业供应链管理KPI指标的综合性分享,涉及到供应链能力、速度、效率和成本总共九大指标,是一个大框架,比较核心也比较综合。 衡量消费品零售企业供应链管理效率和水平的核心KPI通常有哪些? 图片来源-派可数据(虚拟数据) 可以分为供应链能力、响应速度、

    2024年02月12日
    浏览(26)
  • Go并发编程 Goroutine、Channel、Select、Mutex锁、sync、Atomic等

    本文所有实例代码运行go版本: go version go1.18.10 windows/amd64 串行:所有任务一件一件做,按照事先的顺序依次执行,没有被执行到的任务只能等待。最终执行完的时间等于各个子任务之和。 并发:是以交替的方式利用 等待 某个任务的时间来处理其他任务计算逻辑,在计算机

    2024年02月07日
    浏览(40)
  • 提示词在大模型Chatgpt、llama2、chatglm3、gemini、bert、bard、通义千问、文心一言、盘古大模型等的核心作用,谁掌握提示词工程能力,谁就拿到激发大模型强大生成能力的钥匙

    提示词在大模型Chatgpt、llama2、chatglm3、gemini、bert、bard、通义千问、文心一言、盘古大模型等的核心作用,谁掌握提示词工程能力,谁就拿到激发大模型强大生成能力的钥匙。 提示工程(Prompt Engineering):指导AI大模型完成任务的艺术 从本质上来看,提示工程也是一种人机交

    2024年03月09日
    浏览(42)
  • go并发 - channel

    并发编程是利用多核心能力,提升程序性能,而多线程之间需要相互协作、共享资源、线程安全等。任何并发模型都要解决线程间通讯问题,毫不夸张的说线程通讯是并发编程的主要问题。 go 使用著名的CSP(Communicating Sequential Process,通讯顺序进程)并发模型,从设计之初

    2024年02月05日
    浏览(46)
  • channel并发编程

    不要通过共享内存通信,要通过通信共享内存。 channel是golang并发编程中一种重要的数据结构,用于多个goroutine之间进行通信。 我们通常可以把channel想象成一个传送带,将goroutine想象成传送带周边的人,一个传送带的上游放上物品,通过传送带的传输,下游的人可以将物品取

    2024年02月11日
    浏览(25)
  • go 利用channel实现定时任务

    想5秒内结束就注释掉select{} 在linux上后台执行的话,可以这样

    2024年04月15日
    浏览(29)
  • 如何利用ChatGPT提升编程能力

    在当今这个快速发展的技术世界中,编程能力已经成为一项非常宝贵的技能。而作为一名编程爱好者或者职业程序员,如何提升自己的编程能力是一个非常重要的问题。本文将从一个有趣的角度出发,探讨如何利用OpenAI的ChatGPT提升编程能力。 在我们开始讨论如何利用ChatGPT提

    2024年02月08日
    浏览(33)
  • 掌握Go并发:Go语言并发编程深度解析

    🏷️ 个人主页 :鼠鼠我捏,要死了捏的主页  🏷️ 系列专栏 :Golang全栈-专栏 🏷️ 个人学习笔记,若有缺误,欢迎评论区指正   前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站AI学习网站。 当我们开发一个W

    2024年02月20日
    浏览(35)
  • GO语言网络编程(并发编程)并发介绍,Goroutine

    进程和线程 并发和并行 协程和线程 协程:独立的栈空间,共享堆空间,调度由用户自己控制,本质上有点类似于用户级线程,这些用户级线程的调度也是自己实现的。 线程:一个线程上可以跑多个协程,协程是轻量级的线程。 goroutine 只是由官方实现的超级\\\"线程池\\\"。 每个

    2024年02月09日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包