Kotlin 协程四 —— Flow 和 Channel 的应用

这篇具有很好参考价值的文章主要介绍了Kotlin 协程四 —— Flow 和 Channel 的应用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、 Flow 与 Channel 的相互转换

1.1 Flow 转换为 Channel

1.1.1 ChannelFlow

@InternalCoroutinesApi
public abstract class ChannelFlow<T>(
    // upstream context
    @JvmField public val context: CoroutineContext,
    // buffer capacity between upstream and downstream context
    @JvmField public val capacity: Int,
    // buffer overflow strategy
    @JvmField public val onBufferOverflow: BufferOverflow
) : FusibleFlow<T> {
    ...


    public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
        scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)

    ...

}

前面提到 ChannelFlow 是热流。只要上游产生数据,就会立即发射给下游收集者。
ChannelFlow 是一个抽象类,并且被标记为内部 Api,不应该在外部代码直接使用。
注意到它内部有一个方法 produceImpl 返回的是一个 ReceiveChannel,它的实现是收集上游发射的数据,然后发送到 Channel 中。
有此作为基础。我们可以 调用 asChannelFlow 将 Flow 转换 ChannelFlow, 进而转换成 Channel 。

1.1.2 produceIn —— 将 Flow 转换为单播式 Channel

produceIn()转换创建了一个produce 协程来 collect 原Flow,因此该produce协程应该在恰当时候被关闭或者取消。转换后的 Channel 拥有处理背压的能力。其基本使用方式如下:

fun main() = runBlocking {
    val flow = flow<Int> {
        repeat(5) {
            delay(500)
            emit(it)
        }
    }

    val produceIn = flow.produceIn(this)
    for (ele in produceIn) {
        println(ele)
    }
}

输出结果:

0
1
2
3
4

查看 produceIn 源码:

@FlowPreview
public fun <T> Flow<T>.produceIn(scope: CoroutineScope): ReceiveChannel<T> = asChannelFlow().produceImpl(scope)

1.1.3 broadcastIn —— 将 Flow 转换为广播式 BroadcastChannel。

broadcastIn 转换方式与 produceIn 转换方式实现原理一样,区别是创建出来的 BroadcastChannel。
源码如下:

public fun <T> Flow<T>.broadcastIn(
    scope: CoroutineScope,
    start: CoroutineStart = CoroutineStart.LAZY
): BroadcastChannel<T> {
    // Backwards compatibility with operator fusing
    val channelFlow = asChannelFlow()
    val capacity = when (channelFlow.onBufferOverflow) {
        BufferOverflow.SUSPEND -> channelFlow.produceCapacity
        BufferOverflow.DROP_OLDEST -> Channel.CONFLATED
        BufferOverflow.DROP_LATEST ->
            throw IllegalArgumentException("Broadcast channel does not support BufferOverflow.DROP_LATEST")
    }
    return scope.broadcast(channelFlow.context, capacity = capacity, start = start) {
        collect { value ->
            send(value)
        }
    }
}

使用方式见上文 BroadcastChannel。

和 BroadcastChannel 一样,broadcastIn 也标记为过时的 API, 不建议继续使用了。

1.2 Channel 转换为 Flow

1.2.1 consumeAsFlow/receiveAsFlow —— 将单播式 Channel 转换为 Flow

使用 consumeAsFlow()/receiveAsFlow() 将 Channel 转换为 Flow

fun main() = runBlocking<Unit> {
    val testChannel = Channel<String>()

    val testFlow = testChannel.receiveAsFlow()

    launch {
        testFlow.collect {
            println(it)
        }
    }

    delay(100)
    testChannel.send("hello")
    delay(100)
    testChannel.send("coroutine")
    delay(100)

    testChannel.close() // 注意只有 Channel 关闭了,协程才能结束
}

查看源码:

public fun <T> ReceiveChannel<T>.consumeAsFlow(): Flow<T> = ChannelAsFlow(this, consume = true)

public fun <T> ReceiveChannel<T>.receiveAsFlow(): Flow<T> = ChannelAsFlow(this, consume = false)

private class ChannelAsFlow<T>(
    private val channel: ReceiveChannel<T>,
    private val consume: Boolean,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = Channel.OPTIONAL_CHANNEL,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {}

consumeAsFlow 和 receiveAsFlow 都是调用 ChannelAsFlow 将 Channel 转换成了 ChannelFlow,所以转换结果是热流。但它们传递的第二个参数 consume 不一样。两者区别如下:

  • 使用 consumeAsFlow() 转换成的 Flow 只能有一个收集器收集,如果有多个收集器收集,将会抛出如下异常:
Exception in thread "main" java.lang.IllegalStateException: ReceiveChannel.consumeAsFlow can be collected just once
  • 使用 receiveAsFlow() 转换成的 Flow 可以有多个收集器收集,但是保证每个元素只能被一个收集器收集到,即单播式。

通俗点说,就是使用 consumeAsFlow() 只能有一个消费者。 使用 receiveAsFlow() 可以有多个消费者,但当向 Channel 中发射一个数据之后,收到该元素的消费者是不确定的。

1.2.2 asFlow —— 将广播式 BroadcastChannel 转换为 Flow

与单播式相对的就是广播式,让每个消费者都收到该元素,这就需要一个广播式的 Chanel:BroadcastChanel。
BroadcastChannel 调用 asFlow() 方法即可将其转换为 Flow。

由于该方法也被标记为过时了,替代方案有 SharedFlow 和 StateFlow。

二、SharedIn —— 将冷数据流转换为热数据流

将 flow 转换为 SharedFlow,可以使用 SharedIn 方法:

public fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T> {
    ...
}

参数解释:

  • CoroutineScope 用于共享数据流的 CoroutineScope。此作用域函数的生命周期应长于任何使用方,以使共享数据流在足够长的时间内保持活跃状态
  • replay 每个新收集器的数据项数量
  • started “启动” 方式

启动方式有:

public fun interface SharingStarted {
    public companion object {
        // 立即启动,并且永远不会自动停止
        - public val Eagerly: SharingStarted = StartedEagerly() 

        // 第一个订阅者注册后启动,并且永远不会自动停止
        - public val Lazily: SharingStarted = StartedLazily() 

        // 第一个订阅者注册后启动,最后一个订阅者取消注册后停止
        - public fun WhileSubscribed(
                    stopTimeoutMillis: Long = 0,
                    replayExpirationMillis: Long = Long.MAX_VALUE
                ): SharingStarted =
                    StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis)
    }
}

三、callbackFlow —— 将基于回调的 API 转换为数据流

Kotlin 协程和 Flow 可以完美解决异步调用、线程切换的问题。设计接口时,可以类似 Rxjava 那样,避免使用回调。比如 Room 在内的很多库已经支持将协程用于数据流操作。对于那些还不支持的库,也可以将任何基于回调的 API 转换为协程。

callbackFlow 是一个数据流构建器,可以将基于回调的 API 转换为数据流。

3.1 callbackFlow 的使用

举例:

interface Result<T>  {
    fun onSuccess(t: T)
    fun onFail(msg: String)
}

fun getApi(res: Result<String>) {
    thread{
        printWithThreadInfo("getApiSync")
        Thread.sleep(1000) // 模拟耗时任务
        res.onSuccess("hello")
    }.start()
}

getApi() 是一个基于回调设计的接口。如何使用 callbackFlow 转换为 Flow 呢?

fun getApi(): Flow<String> = callbackFlow {
    val res = object: Result<String> {
        override fun onSuccess(t: String) {
            trySend(t)
            close(Exception("completion"))
        }

        override fun onFail(msg: String) {
        }
    }
    getApi(res)

    // 一定要调用骨气函数 awaitClose, 保证流一直运行。在`awaitClose` 中移除 API 订阅,防止任务泄漏。
    awaitClose {
        println("close")
    }
}

// 新的 Api 使用方式
fun main() = runBlocking<Unit> {
    getApi().flowOn(Dispatchers.IO)
        .catch {
            println("getApi fail, cause: ${it.message}")
        }.onCompletion {
            println("onCompletion")
        }.collect {
            printWithThreadInfo("getApi success, result: $it")
        }
}

这时候你可能有疑问了,这在流的内部不还是使用了基于接口的调用吗,分明没有更方便。看下面的例子,就能体会到了。

3.2 callbackFlow 实战

Android 开发中有一个常见的场景:输入关键字进行查询。比如有个 EditText,输入文字后,基于输入的文字进行网络请求或者数据库查询。
假设查询数据的接口:

fun <T>query(keyWord: String): Flow<T> {
    return flow {
        //...
    }
}

首先定义一个方法将 EditText 内容变化的回调转换成 Flow

fun textChangeFlow(editText: EditText): Flow<String> = callbackFlow {
    val watcher = object : TextWatcher {
        override fun beforeTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) {

        }

        override fun onTextChanged(s: CharSequence?, start: Int, before: Int, count: Int) {
        }

        override fun afterTextChanged(s: Editable?) {
            s?.let {
                trySend(s.toString()) 
            }
            
        }
    }
    editText.addTextChangedListener(watcher)
    awaitClose {
        editText.removeTextChangedListener(watcher)
    }
}

使用:

scope.launch{
    textChangeFlow(editText)
            .debounce(300) // 防抖处理
            .flatMapLatest { keyWord ->  // 只对最新的值进行搜索
                flow {
                    <String>query(keyWord)
                }
            }.collect {
                // ... 处理最终结果
            }
}

在这个过程中,我们可以充分使用 Flow 的各种变换,对我们的中间过程进行处理。实现一些很难实现的需求。文章来源地址https://www.toymoban.com/news/detail-831026.html

到了这里,关于Kotlin 协程四 —— Flow 和 Channel 的应用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Android使用kotlin+协程+room数据库的简单应用

    前言:一般主线程(UI线程)中是不能执行创建数据这些操作的,因为等待时间长。所以协程就是为了解决这个问题出现。 第一步:在模块级的build.gradle中引入   好了前期工作ok,正式编写room吧! 第二步:创建表实体  第三部:编写对应的Dao接口  第四步:创建数据库信息

    2024年02月13日
    浏览(51)
  • 【kotlin 协程】万字协程 一篇完成kotlin 协程进阶

    Kotlin 中的协程提供了一种全新处理并发的方式,可以在 Android 平台上使用它来简化异步执行的代码。协程是从 Kotlin 1.3 版本开始引入,但这一概念在编程世界诞生的黎明之际就有了,最早使用协程的编程语言可以追溯到 1967 年的 Simula 语言。 在过去几年间,协程这个概念发展

    2024年02月07日
    浏览(53)
  • Kotlin Flow 冷流

    1、Flow是什么? 处理异步事件流 可取消:通过取消协程取消Flow 组合操作符:复杂逻辑处理 缓冲和背压:发送和接收时用不同速度处理,实现流量控制、避免数据丢失 2、传统事件处理方案:同步、sequence、异步delay flow 1、flow的作用 RxJava和Flow完全一样 替代LiveData 2、getFlow(

    2024年02月12日
    浏览(41)
  • kotlin flow 定时任务

    要在 Kotlin 的 Flow 中定时广播数据,可以使用 Kotlin 的协程库中的 delay 函数和 while 循环结合使用。以下是一个简单的示例代码,每秒钟向 Flow 发送一个数字: kotlin Copy code import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow funtimerFlow(): FlowInt = flow

    2024年02月15日
    浏览(83)
  • Kotlin Flow 转换以及上下游处理

    本片文章主要介绍Flow上下游处理,上游一个Flow使用map,上游两个Flow使用zip,上游三个Flow及以上使用combine  1、下面代码展示了upStreamFlow作为上游,downStreamFlow作为下游,通过对upStreamFlow使用map操作符函数将upStreamFlow转换为新的Flow对象,每个元素都通过lambda表达式进行处理,

    2024年02月11日
    浏览(37)
  • Kotlin:使用flow实现倒计时功能

    一、效果图 二、ExtendContext.kt 文件代码 注意:创建ExtendContext.kt选择file 使用kotlin扩展方法的特性创建countDown扩展方法,避免多个地方使用倒计时重复创建countDown方法 三、MainActivity.kt代码 四、build.gradle.kts代码

    2024年02月19日
    浏览(40)
  • Kotlin Channel 热流

    1、Channel是什么? 生产者:多个协程 消费者:多个协程 中间:Channel 管道 并发安全队列 发送send 接收recv 协程间通信 1、Channel可以用于协程间通信 capacity 1、生产速度消费速度 如果缓冲区满了,send会挂起,消费完后再生产 capacity,默认容量,0 UNLIMITED:send不再挂起 容量接近

    2024年02月12日
    浏览(34)
  • Kotlin 协程一 —— 协程 Coroutine

    1.1.1基本定义 进程 进程是一个具有一定独立功能的程序在一个数据集上的一次动态执行的过程,是操作系统进行资源分配和调度的一个独立单位,是应用程序运行的载体。 进程是资源分配的最小单位,在单核CPU中,同一时刻只有一个程序在内存中被CPU调用运行。 线程 基本的

    2024年02月05日
    浏览(50)
  • 【Android】Kotlin 中的Flow是个什么东西

    前言 Kotlin Flow 是 Kotlin Coroutine 用于异步获取数据流的一个库。它允许我们以类似于集合的方式发射多个异步生成的值,并通过类似于 RxJava 的操作符链式处理这些值。 基本概念 Flow 的基本概念是,一个 Flow 代表了一个异步生成的值序列,这些值可能会在不同的时间点被发送出

    2024年02月08日
    浏览(48)
  • kotlin语法进阶 - 协程(一)协程基础

    协程并不是一个新的概念,而是一个非常老的概念,很多语言都支持协程,建议去浏览器去了解一下协程的历史和基本概念,这里我们只讲一下kotlin中的协程的作用。 从代码实现角度来看:kotlin协程底层是用线程实现的,是一个封装完善供开发者使用的线程框架。kotlin的一个

    2024年02月09日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包