协程:Channel 热流
1、Channel是什么?
- 生产者:多个协程
- 消费者:多个协程
- 中间:Channel 管道 并发安全队列
- 发送send
- 接收recv
协程间通信
1、Channel可以用于协程间通信
// 通道Channel
val channel = Channel<Int>()
// 生产者
launch{
(1..6).forEach {
delay(1000L)
println("我生产了一个:$it")
channel.send(it)
}
}
// 消费者
launch{
(1..6).forEach {
val r= channel.receive()
println("消费了一个:$r")
}
}
capacity
1、生产速度>消费速度
- 如果缓冲区满了,send会挂起,消费完后再生产
- capacity,默认容量,0
UNLIMITED:send不再挂起
- 容量接近于无限
- 容量不满就不会挂起
// 通道Channel
val channel = Channel<Int>(Channel.UNLIMITED)
消费方式
// 第一种发方式 消费
(1..8).forEach {
delay(2000L)
val r= channel.receive()
println("消费了一个:$r")
}
iterator
// 第二种发方式 消费
val it = channel.iterator()
while (it.hasNext()) {
val item = it.next()
delay(2000L)
println("消费了一个:$item")
}
item in channel
// 第三种发方式 消费
for (item in channel) {
delay(2000L)
println("消费了一个:$item")
}
快捷方式
produce和ReceiveChannel
- produce快速构建消费者
// 生产者的快捷方式
val produce = produce {
(1..20).forEach { delay(2000L) ; send(it) }
}
// 普通的消费
launch {
for (item in produce) {
println("消费了一个:$item")
}
}
// receive()接收数据,有数据没有消费,send会一直阻塞
launch {
println("消费了一个:${produce.receive()}")
delay(2000)
println("消费了一个:${produce.receive()}")
println("消费了一个:${produce.receive()}")
println("消费了一个:${produce.receive()}")
println("消费了一个:${produce.receive()}")
println("消费了一个:${produce.receive()}")
}
produce(capacity = 100),会增加缓冲区,只要没有放满send不会再阻塞。
actor和SendChannel
- actor快速构建消费者
// 消费者的快捷方式
val consumer = actor<Int> {
(1..20).forEach {
println("消费了一个:${receive()}")
}
}
// 普通的生成
launch {
(1..20).forEach { delay(2000L) ; consumer.send(it) }
}
close
1、channel.close
- 关闭
- 一般是生产者去close
isClosedForSend
channel.close() 之前 isClosedForSend == false
channel.close() 之后 isClosedForSend == true
// 生产者
launch {
(1..6).forEach {
if (!channel.isClosedForSend) {
channel.send(it)
println("我生产了一个$it")
// if (it == 3) channel.close() // 大部分情况下,是生产者 去close
}
}
println("close前 isClosedForSend:${channel.isClosedForSend} " +
" isClosedForReceive:${channel.isClosedForReceive}")
channel.close()
println("close后 isClosedForSend:${channel.isClosedForSend} " +
" isClosedForReceive:${channel.isClosedForReceive}")
}
isClosedForReceive
如果消费完了 isClosedForReceive == true, 否则就是false
如果缓冲区里面还有内容,没有消费完 也是 false
// 消费者
launch {
try {
for (i in channel) {
delay(2000L)
println("我消费了一个:$i")
}
}finally {
println("finally isClosedForSend:${channel.isClosedForSend} " +
" isClosedForReceive:${channel.isClosedForReceive}")
}
}
BroadcastChannel
1、广播给所有消费者,多个地方可以接收到
- 创建
val channel = Channel<Int>()
val broadcastChannel = channel.broadcast(Channel.BUFFERED)
- 生产
// 生产者
launch {
repeat(8) {
delay(1000L)
broadcastChannel.send(it + 100001) // 发送
}
broadcastChannel.close() // 关闭
}
openSubscription
- 消费
repeat(8) {
// 消费者
launch {
val r = broadcastChannel.openSubscription()
for (i in r) {
println("协程$it ---- 消费者 ${i}")
}
}
}
select
1、select: 择优选择数据,谁先返回用谁的
- 加载首页数据,可以作缓存
- 缓存有用缓存,缓存不存在去请求
- "慢的不会再执行"会被cancel
2、select 是一个用于多路选择的结构,可以同时等待多个挂起函数或通道的操作完成。它类似于 switch 或 if-else 的多路分支语句,但是它是用于协程的异步操作。
suspend fun selectExample() {
select<Unit> {
someChannel.onReceive { value ->
// 处理从通道接收到的值
}
someDeferred.onAwait { result ->
// 处理异步操作完成后的返回值
}
onTimeout(1000) {
// 在指定时间内没有任何操作完成时执行
}
}
}
3、select可以用于上游,也可以用于下游
onAwait
- async有onAwait
data class Home(val info1: String, val info2: String)
data class HomeRequestResponseResultData(val code: Int, val msg: String, val home: Home)
// 请求本地加载首页数据
fun CoroutineScope.getHomeLocalData() = async (Dispatchers.IO) {
delay(3000)
Home("数据1...", "数据1...")
}
// 请求网络服务器加载首页数据
fun CoroutineScope.getHomeRemoteData() = async (Dispatchers.IO) {
delay(6000)
Home("数据3...", "数据4...")
}
launch {
val localRequestAction = getHomeLocalData()
val remoteRequestAction = getHomeRemoteData()
val resultResponse = select<HomeRequestResponseResultData> {
localRequestAction.onAwait {
// 做校验 工作
// ...
// 省略1000行代码
HomeRequestResponseResultData(200, "恭喜你,请求成功", it) // 最后一行作为返回值
}
remoteRequestAction.onAwait {
// 做校验 工作
// ...
// 省略1000行代码
HomeRequestResponseResultData(200, "恭喜你,请求成功", it) // 最后一行作为返回值
}
}
println("resultResponse:$resultResponse")
}
2、async需要在调用的CoroutineScope中执行
fun CoroutineScope.getHomeLocalData() = async (Dispatchers.IO) {
delay(3000)
Home("数据1...", "数据1...")
}
// 对CoroutineScope扩展
channel数组
- 哪个更快选择哪个Channel
onReceive
- onReceive: 接收数据后的回调
val channels = arrayOf(Channel<String?>(), Channel<String?>())
launch {
delay(6000)
channels[0].send("login successful")
}
launch {
delay(8000)
channels[1].send("register successful")
}
val receiveResult = select<String ?> {
for (channel in channels) {
channel.onReceive {
// 做校验 工作
// ...
// 省略1000行代码
"[$it]" // 最后一行作为返回值
}
}
}
println(receiveResult)
onJoin
launch无返回值,但想看谁执行的最快文章来源:https://www.toymoban.com/news/detail-655409.html
val job1 = launch {println("launch1 run")} // 无返回值
val job2 = launch {println("launch2 run")} // 无返回值
select<Unit> {
job1.onJoin { println("launch1 执行完成了 很快") }
job2.onJoin { println("launch2 执行完成了 很快") }
}
onSend
发送数据,并且显示回调的内容(上游)文章来源地址https://www.toymoban.com/news/detail-655409.html
// 准备Channel数组
val channels = arrayOf(Channel<Char>(), Channel<Char>())
// 协程一:Channel 的 发射源
launch(Dispatchers.Default) {
select<Unit> {
// 并行干活,send
launch {
channels[0].onSend('女') {
println("channels[0].onSend('女') { $it }")
}
}
// 并行干活,send
launch {
channels[1].onSend('男') {
println("channels[1].onSend('男') { $it }")
}
}
}
}
// 协程二:下游 接收阶段
launch { println("channel1 下游接收 ${channels[0].receive()}") }
launch { println("channel2 下游接收 ${channels[1].receive()}") }
输出:
channel1 下游接收 女
channels[0].onSend('女') { RendezvousChannel@34206005{EmptyQueue} }
// 1. onSend先发送消息
// 2. 下游接收到
// 3. onSend回调打印消息
await
复用Channel
到了这里,关于Kotlin Channel 热流的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!