【Kotlin 协程】Flow 流展平 ( 连接模式 flatMapConcat | 合并模式 flatMapMerge | 最新展平模式 flatMapLatest )

这篇具有很好参考价值的文章主要介绍了【Kotlin 协程】Flow 流展平 ( 连接模式 flatMapConcat | 合并模式 flatMapMerge | 最新展平模式 flatMapLatest )。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。





一、Flow 流展平



Flow 流在 接收元素 时 , 可能需要 另一个 流的元素 , 两个流之间进行 交互的操作 就是 展平 , 常见的 展平模式有 :

  • 连接模式 flatMapConcat : m 个元素的流 与 n 个元素的流 连接后 , 元素个数为 m x n 个 ;
  • 合并模式 flatMapMerge : m 个元素的流 与 n 个元素的流 合并后 , 元素个数为 n x m 个 ;
  • 最新展平模式 flatMapLatest : 前面的看时间间隔进行结合 , 中间的可能跳过某些元素 , 不要中间值 , 只重视最新的数据 ;

1、连接模式 flatMapConcat 代码示例


连接模式 flatMapConcat : m 个元素的流 与 n 个元素的流 连接后 , 元素个数为 m x n 个 ;

flatMapConcat 函数原型 :

/**
 * 通过应用[transform]转换原始流发出的元素,它返回另一个流,
 * 然后连接并压平这些流。
 *
 * 该方法是' map(transform).flattenConcat() '的快捷方式。看到[flattenConcat]。
 *
 * 请注意,尽管这个操作符看起来非常熟悉,但我们不鼓励在常规的特定于应用程序的流中使用它。
 * 最有可能的是,暂停[map]操作符中的操作就足够了,线性转换更容易推理。
 */
@FlowPreview
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> =
    map(transform).flattenConcat()

调用 FlowA.flatMapConcat(FlowB) 代码 , 先拿到 FlowA , 然后让 FlowA 每个元素 与 FlowB 进行连接 , 以 FlowA 的元素顺序为主导 ;


代码示例 : 注意 两个 流 连接后的间隔 , (0…2) 流之间的发射间隔 100ms , stringFlow 流元素发射间隔 200ms , 连接后的流要结合上述两个间隔 , 在 (0…2) 流 的元素之间间隔为 100ms , 在 (0…2) 流单个元素与所有的 stringFlow 流元素连接的间隔为 200ms ;

package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        runBlocking {
            val startTime = System.currentTimeMillis()
            (0..2).asFlow()
                        .onEach { delay(100) }
                        // 该 Flow 流与  stringFlow 进行连接
                        .flatMapConcat { stringFlow(it) }
                        .collect {
                            println("收集到元素 $it, 时间 ${System.currentTimeMillis() - startTime}")
                        }
        }
    }

    suspend fun stringFlow(num: Int) = flow<String> {
        emit("$num flatMapContact Hello First")
        delay(200)
        emit("$num flatMapContact Hello Second")
    }
}

执行结果 :

I/System.out: 收集到元素 0 flatMapContact Hello First, 时间 201
I/System.out: 收集到元素 0 flatMapContact Hello Second, 时间 448
I/System.out: 收集到元素 1 flatMapContact Hello First, 时间 608
I/System.out: 收集到元素 1 flatMapContact Hello Second, 时间 837
I/System.out: 收集到元素 2 flatMapContact Hello First, 时间 954
I/System.out: 收集到元素 2 flatMapContact Hello Second, 时间 1196

【Kotlin 协程】Flow 流展平 ( 连接模式 flatMapConcat | 合并模式 flatMapMerge | 最新展平模式 flatMapLatest )


2、合并模式 flatMapMerge 代码示例


合并模式 flatMapMerge : m 个元素的流 与 n 个元素的流 合并后 , 元素个数为 n x m 个 ;

flatMapMerge 函数原型 :

/**
 * 通过应用[transform]转换原始流发出的元素,它返回另一个流,
 * 然后合并并压平这些气流。
 *
 * 此操作符按顺序调用[transform],然后将结果流与[concurrency]合并
 * 对并发收集流的数量的限制。
 * 它是' map(transform).flattenMerge(concurrency)'的快捷方式。
 * 详见[flattenMerge]。
 *
 * 请注意,尽管这个操作符看起来非常熟悉,但我们不鼓励在常规的特定于应用程序的流中使用它。
 * 最有可能的是,暂停[map]操作符中的操作就足够了,线性转换更容易推理。
 *
 * ###算子融合
 *
 * [flowOn]、[buffer]和[produceIn] __after_此操作符的应用被融合
 * 它是并发合并,因此只有一个正确配置的通道用于执行合并逻辑。
 *
 * @param并发控制运行中的流的数量,最多收集[concurrency]个流
 * 同时。默认情况下,它等于[DEFAULT_CONCURRENCY]。
 */
@FlowPreview
public fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (value: T) -> Flow<R>
): Flow<R> =
    map(transform).flattenMerge(concurrency)

调用 FlowA.flatMapMerge(FlowB) 代码 , 先拿到 FlowB , 然后让 FlowB 每个元素 与 FlowA 进行结合 , 以 FlowB 的元素顺序为主导 ;


代码示例 :

package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        runBlocking {
            val startTime = System.currentTimeMillis()
            (0..2).asFlow()
                        .onEach { delay(100) }
                        // 该 Flow 流与  stringFlow 进行合并
                        .flatMapMerge { stringFlow(it) }
                        .collect {
                            println("收集到元素 $it, 时间 ${System.currentTimeMillis() - startTime}")
                        }
        }
    }

    suspend fun stringFlow(num: Int) = flow<String> {
        emit("$num flatMapMerge Hello First")
        delay(500)
        emit("$num flatMapMerge Hello Second")
    }
}

执行结果 :

I/System.out: 收集到元素 0 flatMapMerge Hello First, 时间 192
I/System.out: 收集到元素 1 flatMapMerge Hello First, 时间 328
I/System.out: 收集到元素 2 flatMapMerge Hello First, 时间 451
I/System.out: 收集到元素 0 flatMapMerge Hello Second, 时间 698
I/System.out: 收集到元素 1 flatMapMerge Hello Second, 时间 866
I/System.out: 收集到元素 2 flatMapMerge Hello Second, 时间 993

【Kotlin 协程】Flow 流展平 ( 连接模式 flatMapConcat | 合并模式 flatMapMerge | 最新展平模式 flatMapLatest )


3、最新展平模式 flatMapLatest 代码示例


最新展平模式 flatMapLatest : 前面的看时间间隔进行结合 , 中间的可能跳过某些元素 , 不要中间值 , 只重视最新的数据 ;

flatMapLatest 函数原型 :

/**
 * 返回一个流,每当原始流发出一个值时,该流切换到[transform]函数生成的新流。
 * 当原始流产生一个新值时,由' transform '块产生的前一个流将被取消。
 *
 * 例如,以下流程:
 * ```
 * flow {
 *     emit("a")
 *     delay(100)
 *     emit("b")
 * }.flatMapLatest { value ->
 *     flow {
 *         emit(value)
 *         delay(200)
 *         emit(value + "_last")
 *     }
 * }
 * ```
 * produces `a b b_last`
 *
 * 该操作符默认为[buffered][buffer],其输出缓冲区的大小可以通过应用后续的[buffer]操作符来改变。
 */
@ExperimentalCoroutinesApi
public inline fun <T, R> Flow<T>.flatMapLatest(@BuilderInference crossinline transform: suspend (value: T) -> Flow<R>): Flow<R> =
    transformLatest { emitAll(transform(it)) }

代码示例 :

package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        runBlocking {
            val startTime = System.currentTimeMillis()
            (0..2).asFlow()
                        .onEach { delay(100) }
                        // 该 Flow 流与  stringFlow 进行合并
                        .flatMapLatest { stringFlow(it) }
                        .collect {
                            println("收集到元素 $it, 时间 ${System.currentTimeMillis() - startTime}")
                        }
        }
    }

    suspend fun stringFlow(num: Int) = flow<String> {
        emit("$num flatMapLatest Hello First")
        delay(500)
        emit("$num flatMapLatest Hello Second")
    }
}

执行结果 :

I/System.out: 收集到元素 0 flatMapLatest Hello First, 时间 233
I/System.out: 收集到元素 1 flatMapLatest Hello First, 时间 381
I/System.out: 收集到元素 2 flatMapLatest Hello First, 时间 547
I/System.out: 收集到元素 2 flatMapLatest Hello Second, 时间 1079

【Kotlin 协程】Flow 流展平 ( 连接模式 flatMapConcat | 合并模式 flatMapMerge | 最新展平模式 flatMapLatest )文章来源地址https://www.toymoban.com/news/detail-403630.html

到了这里,关于【Kotlin 协程】Flow 流展平 ( 连接模式 flatMapConcat | 合并模式 flatMapMerge | 最新展平模式 flatMapLatest )的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 协程Flow原理

    什么是Flow Flow直译过来就是“流”的意思,也就是将我们我们任务如同水流一样一步一步分割做处理。想象一下,现在有一个任务需要从山里取水来用你需要怎么做? 扛上扁担走几十里山路把水挑回来。简单粗暴,但是有可能当你走了几十里路发现水干涸了,你就白跑一趟。

    2024年02月04日
    浏览(33)
  • kotlin flow 定时任务

    要在 Kotlin 的 Flow 中定时广播数据,可以使用 Kotlin 的协程库中的 delay 函数和 while 循环结合使用。以下是一个简单的示例代码,每秒钟向 Flow 发送一个数字: kotlin Copy code import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow funtimerFlow(): FlowInt = flow

    2024年02月15日
    浏览(82)
  • Kotlin Flow 冷流

    1、Flow是什么? 处理异步事件流 可取消:通过取消协程取消Flow 组合操作符:复杂逻辑处理 缓冲和背压:发送和接收时用不同速度处理,实现流量控制、避免数据丢失 2、传统事件处理方案:同步、sequence、异步delay flow 1、flow的作用 RxJava和Flow完全一样 替代LiveData 2、getFlow(

    2024年02月12日
    浏览(41)
  • 【kotlin 协程】万字协程 一篇完成kotlin 协程进阶

    Kotlin 中的协程提供了一种全新处理并发的方式,可以在 Android 平台上使用它来简化异步执行的代码。协程是从 Kotlin 1.3 版本开始引入,但这一概念在编程世界诞生的黎明之际就有了,最早使用协程的编程语言可以追溯到 1967 年的 Simula 语言。 在过去几年间,协程这个概念发展

    2024年02月07日
    浏览(53)
  • Kotlin Flow 转换以及上下游处理

    本片文章主要介绍Flow上下游处理,上游一个Flow使用map,上游两个Flow使用zip,上游三个Flow及以上使用combine  1、下面代码展示了upStreamFlow作为上游,downStreamFlow作为下游,通过对upStreamFlow使用map操作符函数将upStreamFlow转换为新的Flow对象,每个元素都通过lambda表达式进行处理,

    2024年02月11日
    浏览(37)
  • Kotlin:使用flow实现倒计时功能

    一、效果图 二、ExtendContext.kt 文件代码 注意:创建ExtendContext.kt选择file 使用kotlin扩展方法的特性创建countDown扩展方法,避免多个地方使用倒计时重复创建countDown方法 三、MainActivity.kt代码 四、build.gradle.kts代码

    2024年02月19日
    浏览(40)
  • 【Android】Kotlin 中的Flow是个什么东西

    前言 Kotlin Flow 是 Kotlin Coroutine 用于异步获取数据流的一个库。它允许我们以类似于集合的方式发射多个异步生成的值,并通过类似于 RxJava 的操作符链式处理这些值。 基本概念 Flow 的基本概念是,一个 Flow 代表了一个异步生成的值序列,这些值可能会在不同的时间点被发送出

    2024年02月08日
    浏览(48)
  • Kotlin 协程一 —— 协程 Coroutine

    1.1.1基本定义 进程 进程是一个具有一定独立功能的程序在一个数据集上的一次动态执行的过程,是操作系统进行资源分配和调度的一个独立单位,是应用程序运行的载体。 进程是资源分配的最小单位,在单核CPU中,同一时刻只有一个程序在内存中被CPU调用运行。 线程 基本的

    2024年02月05日
    浏览(50)
  • kotlin语法进阶 - 协程(一)协程基础

    协程并不是一个新的概念,而是一个非常老的概念,很多语言都支持协程,建议去浏览器去了解一下协程的历史和基本概念,这里我们只讲一下kotlin中的协程的作用。 从代码实现角度来看:kotlin协程底层是用线程实现的,是一个封装完善供开发者使用的线程框架。kotlin的一个

    2024年02月09日
    浏览(46)
  • Kotlin管道Channel在receiveAsFlow时debounce与flow差异

      - -- 0 channel 休眠 10 0 flow 休眠 10 -- - 1 flow 休眠 10 1 channel 休眠 10 - -- 2 channel 休眠 10 2 flow 休眠 10 - -- 3 channel 休眠 10 3 flow 休眠 10 - -- 4 channel 休眠 10 4 flow 休眠 10 flow 4-1693561918986   程序运行后,flow很快就收到了最后一条数据4-xxx...,而Channel在receiveAsFlow接收数据debounce时候,将

    2024年02月09日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包