一、Flow 流展平
Flow 流在 接收元素 时 , 可能需要 另一个 流的元素 , 两个流之间进行 交互的操作 就是 展平 , 常见的 展平模式有 :
- 连接模式 flatMapConcat : m 个元素的流 与 n 个元素的流 连接后 , 元素个数为 m x n 个 ;
- 合并模式 flatMapMerge : m 个元素的流 与 n 个元素的流 合并后 , 元素个数为 n x m 个 ;
- 最新展平模式 flatMapLatest : 前面的看时间间隔进行结合 , 中间的可能跳过某些元素 , 不要中间值 , 只重视最新的数据 ;
1、连接模式 flatMapConcat 代码示例
连接模式 flatMapConcat : m 个元素的流 与 n 个元素的流 连接后 , 元素个数为 m x n 个 ;
flatMapConcat 函数原型 :
/**
* 通过应用[transform]转换原始流发出的元素,它返回另一个流,
* 然后连接并压平这些流。
*
* 该方法是' map(transform).flattenConcat() '的快捷方式。看到[flattenConcat]。
*
* 请注意,尽管这个操作符看起来非常熟悉,但我们不鼓励在常规的特定于应用程序的流中使用它。
* 最有可能的是,暂停[map]操作符中的操作就足够了,线性转换更容易推理。
*/
@FlowPreview
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> =
map(transform).flattenConcat()
调用 FlowA.flatMapConcat(FlowB) 代码 , 先拿到 FlowA , 然后让 FlowA 每个元素 与 FlowB 进行连接 , 以 FlowA 的元素顺序为主导 ;
代码示例 : 注意 两个 流 连接后的间隔 , (0…2) 流之间的发射间隔 100ms , stringFlow 流元素发射间隔 200ms , 连接后的流要结合上述两个间隔 , 在 (0…2) 流 的元素之间间隔为 100ms , 在 (0…2) 流单个元素与所有的 stringFlow 流元素连接的间隔为 200ms ;
package kim.hsl.coroutine
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
runBlocking {
val startTime = System.currentTimeMillis()
(0..2).asFlow()
.onEach { delay(100) }
// 该 Flow 流与 stringFlow 进行连接
.flatMapConcat { stringFlow(it) }
.collect {
println("收集到元素 $it, 时间 ${System.currentTimeMillis() - startTime}")
}
}
}
suspend fun stringFlow(num: Int) = flow<String> {
emit("$num flatMapContact Hello First")
delay(200)
emit("$num flatMapContact Hello Second")
}
}
执行结果 :
I/System.out: 收集到元素 0 flatMapContact Hello First, 时间 201
I/System.out: 收集到元素 0 flatMapContact Hello Second, 时间 448
I/System.out: 收集到元素 1 flatMapContact Hello First, 时间 608
I/System.out: 收集到元素 1 flatMapContact Hello Second, 时间 837
I/System.out: 收集到元素 2 flatMapContact Hello First, 时间 954
I/System.out: 收集到元素 2 flatMapContact Hello Second, 时间 1196
2、合并模式 flatMapMerge 代码示例
合并模式 flatMapMerge : m 个元素的流 与 n 个元素的流 合并后 , 元素个数为 n x m 个 ;
flatMapMerge 函数原型 :
/**
* 通过应用[transform]转换原始流发出的元素,它返回另一个流,
* 然后合并并压平这些气流。
*
* 此操作符按顺序调用[transform],然后将结果流与[concurrency]合并
* 对并发收集流的数量的限制。
* 它是' map(transform).flattenMerge(concurrency)'的快捷方式。
* 详见[flattenMerge]。
*
* 请注意,尽管这个操作符看起来非常熟悉,但我们不鼓励在常规的特定于应用程序的流中使用它。
* 最有可能的是,暂停[map]操作符中的操作就足够了,线性转换更容易推理。
*
* ###算子融合
*
* [flowOn]、[buffer]和[produceIn] __after_此操作符的应用被融合
* 它是并发合并,因此只有一个正确配置的通道用于执行合并逻辑。
*
* @param并发控制运行中的流的数量,最多收集[concurrency]个流
* 同时。默认情况下,它等于[DEFAULT_CONCURRENCY]。
*/
@FlowPreview
public fun <T, R> Flow<T>.flatMapMerge(
concurrency: Int = DEFAULT_CONCURRENCY,
transform: suspend (value: T) -> Flow<R>
): Flow<R> =
map(transform).flattenMerge(concurrency)
调用 FlowA.flatMapMerge(FlowB) 代码 , 先拿到 FlowB , 然后让 FlowB 每个元素 与 FlowA 进行结合 , 以 FlowB 的元素顺序为主导 ;
代码示例 :
package kim.hsl.coroutine
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
runBlocking {
val startTime = System.currentTimeMillis()
(0..2).asFlow()
.onEach { delay(100) }
// 该 Flow 流与 stringFlow 进行合并
.flatMapMerge { stringFlow(it) }
.collect {
println("收集到元素 $it, 时间 ${System.currentTimeMillis() - startTime}")
}
}
}
suspend fun stringFlow(num: Int) = flow<String> {
emit("$num flatMapMerge Hello First")
delay(500)
emit("$num flatMapMerge Hello Second")
}
}
执行结果 :
I/System.out: 收集到元素 0 flatMapMerge Hello First, 时间 192
I/System.out: 收集到元素 1 flatMapMerge Hello First, 时间 328
I/System.out: 收集到元素 2 flatMapMerge Hello First, 时间 451
I/System.out: 收集到元素 0 flatMapMerge Hello Second, 时间 698
I/System.out: 收集到元素 1 flatMapMerge Hello Second, 时间 866
I/System.out: 收集到元素 2 flatMapMerge Hello Second, 时间 993
3、最新展平模式 flatMapLatest 代码示例
最新展平模式 flatMapLatest : 前面的看时间间隔进行结合 , 中间的可能跳过某些元素 , 不要中间值 , 只重视最新的数据 ;
flatMapLatest 函数原型 :
/**
* 返回一个流,每当原始流发出一个值时,该流切换到[transform]函数生成的新流。
* 当原始流产生一个新值时,由' transform '块产生的前一个流将被取消。
*
* 例如,以下流程:
* ```
* flow {
* emit("a")
* delay(100)
* emit("b")
* }.flatMapLatest { value ->
* flow {
* emit(value)
* delay(200)
* emit(value + "_last")
* }
* }
* ```
* produces `a b b_last`
*
* 该操作符默认为[buffered][buffer],其输出缓冲区的大小可以通过应用后续的[buffer]操作符来改变。
*/
@ExperimentalCoroutinesApi
public inline fun <T, R> Flow<T>.flatMapLatest(@BuilderInference crossinline transform: suspend (value: T) -> Flow<R>): Flow<R> =
transformLatest { emitAll(transform(it)) }
代码示例 :
package kim.hsl.coroutine
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
runBlocking {
val startTime = System.currentTimeMillis()
(0..2).asFlow()
.onEach { delay(100) }
// 该 Flow 流与 stringFlow 进行合并
.flatMapLatest { stringFlow(it) }
.collect {
println("收集到元素 $it, 时间 ${System.currentTimeMillis() - startTime}")
}
}
}
suspend fun stringFlow(num: Int) = flow<String> {
emit("$num flatMapLatest Hello First")
delay(500)
emit("$num flatMapLatest Hello Second")
}
}
执行结果 :文章来源:https://www.toymoban.com/news/detail-403630.html
I/System.out: 收集到元素 0 flatMapLatest Hello First, 时间 233
I/System.out: 收集到元素 1 flatMapLatest Hello First, 时间 381
I/System.out: 收集到元素 2 flatMapLatest Hello First, 时间 547
I/System.out: 收集到元素 2 flatMapLatest Hello Second, 时间 1079
文章来源地址https://www.toymoban.com/news/detail-403630.html
到了这里,关于【Kotlin 协程】Flow 流展平 ( 连接模式 flatMapConcat | 合并模式 flatMapMerge | 最新展平模式 flatMapLatest )的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!