Kotlin Flow 冷流

这篇具有很好参考价值的文章主要介绍了Kotlin Flow 冷流。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

协程:Flow

1、Flow是什么?

  1. 处理异步事件流
  2. 可取消:通过取消协程取消Flow
  3. 组合操作符:复杂逻辑处理
  4. 缓冲和背压:发送和接收时用不同速度处理,实现流量控制、避免数据丢失

2、传统事件处理方案:同步、sequence、异步delay

    // 1、同步:
    fun getList() = listOf(100, 200, 300, 400, 500, 600)
    val job = GlobalScope.launch {
        getList().forEach{println(it)}
    }
    job.join()

    // 2、异步: 在SequenceScope中,禁止自己调用挂起,除了库内部的函数yield可以挂起
    fun getSequcence() = sequence{
        for(item in 0..1000){
            // 不允许使用 delay
            yield(item) // 可以做到协程间切换
        }
    }
    val job2 = GlobalScope.launch {
        getSequcence().forEach {
            println(it)
        }
    }
    job2.join()

    // 3、异步: 挂起,但没有协作
    suspend fun getSuspendSequcence(): List<Int> {
        delay(1000)
        return listOf(100, 200, 300, 400, 500, 600)
    }
    val job3 = GlobalScope.launch {
        getSuspendSequcence().forEach {
            println(it)
        }
    }
    job3.join()

flow

1、flow的作用

  1. RxJava和Flow完全一样
  2. 替代LiveData


    // 1、类似Observable
    suspend fun getFlow() = flow{
        for(item in 1..8){
            // 发射
            emit(item)
        }
    }
    val job = GlobalScope.launch {
        // 2、类似RxJava消费,subscribe === Observer消费
        getFlow().collect{
            println(it)
        }
    }
    job.join()

2、getFlow()可以不用suspend修饰,更自由

    fun getFlow() = flow{
        for(item in 1..8){
            // 发射
            emit(item)
        }
    }
替换LiveData

1、Flow可以完全替换LiveData ===> LiveData

// Step 1 : 网络请求
fun fetchData() = flow{
    for(item in 1..100){
        emit("get Json String = $item")
    }
}
// Step 2 : ViewModel抛弃LiveData使用flow
class MyViewModel: ViewModel(){
    val dataFlow: Flow<String> = fetchData()
}

// Step 3 : 订阅,并且collect返回数据
class MyFragment: Fragment(){

    val viewModel: MyViewModel by viewModels()

    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        super.onViewCreated(view, savedInstanceState)

        lifecycleScope.launch {
            viewModel.dataFlow.collect{
                // 在Lifecycle的CoroutineScope中,订阅冷流
                println(it)
            }
        }
    }
}

flowOn

1、Kotlin的flowOn替代了subscribeOn, 对上游进行了切换 ====> RxJava

  1. 不再需要observeOn,在需要的线程collect即可
fun main() = runBlocking<Unit> { // 顶级协程
    launch(Dispatchers.Default) {
        NetworkRequest.uploadRequestAction()
            .flowOn(Dispatchers.IO) // flow运行在IO线程池,默认是main。替换了subscribeOn
//            .observeOn(Dispatchers.Default) // 不再需要observeOn,在需要的线程collect即可
            .collect {
                println("$it%") // 显示下载进度
            }
    }
    // flowOn是给上游还是下游切换?
    // 都是给上游
}


object NetworkRequest {
    fun uploadRequestAction() = flow {
        println("uploadRequestAction thread:${Thread.currentThread().name}")
        for (item in 1..100) {
            delay(100)
            emit(item) // 反馈文件上传进度
        }
    }
}

冷流

1、flow和RxJava都是冷流

发射源简化

1、简化发射源 ===> 高阶函数

  1. 一切对象、函数都可以toFlow转换为flow
// 1、无参非挂起函数 toFlow
private fun<T> (()->T).toFlow() = flow{
    emit(invoke())
}
    // 使用:
    val r: () -> String = ::getFlowValue
    r.toFlow().collect { println(it) }

// 2、String toFlow
//private fun String.toFlow() = flow{
//    emit(this@toFlow) // this@toFlow有markdown错误
//}
    // 使用:
    "String".toFlow().collect { println(it) }

// 3、无参挂起函数
private fun <OUTPUT> (suspend () -> OUTPUT).toFlow() = flow {
    emit(invoke())
}
    // 使用:
    ::getFlowValueSuspend.toFlow().collect { println(it) }

// 4、所有集合toFlow
private fun <E> Iterable<E>.toFlow() = flow {
    this@toFlow.forEach { emit(it) }
}
    // 使用:
    listOf(1, 2, 3, 4, 5, 6).toFlow().collect { println(it) }
    setOf(100, 200, 300, 400, 500, 600).toFlow().collect { println(it) }

// 5、sequence的toFlow
private fun <T> Sequence<T>.toFlow() = flow {
    this@toFlow.forEach { emit(it) }
}
    // 使用
    sequence {
        yield("Derry1")
        yield("Derry2")
        yield("Derry3")
    }.toFlow().collect { println(it) }

// 6、Array系列处理
//private fun <T> Array<T>.toFlow() = flow {
//    // this@toFlow.forEach { emit(it) }
//    repeat(this@toFlow.size) {
//        emit(this@toFlow[it])
//    }
//}
//
//private fun IntArray.toFlow() = flow {
//    for (i in this@toFlow) {
//        emit(i)
//    }
//}
//
//private fun LongArray.toFlow() = flow {
//    for (i in this@toFlow) {
//        emit(i)
//    }
//}

// 7、Range
// 注意第4步,就已经覆盖Range的情况
private fun IntRange.toFlow() = flow {
    this@toFlow.forEach { emit(it) }
}

private fun LongRange.toFlow() = flow {
    this@toFlow.forEach { emit(it) }
}
vararg和flowOf

1、可变参数实现单个数据或者多个数据都可以转为flow

private fun <T> flows(vararg value: T) = flow{
    value.forEach {
        emit(it)
    }
}

使用

    flows("Hello").collect{ println(it) }
    flows(1,2,3,4,5).collect{ println(it) }

2、使用官方的flowOf

    flowOf("Hello").collect{ println(it) }
    flowOf(1,2,3,4,5).collect{ println(it) }

withContext

1、协程中上游不可以使用withContext,只能使用flowOn

  1. 上下文保存机制
  2. 使用withContext会报错

launchIn

1、launchIn的作用

  1. 发射区域flowOn
  2. 收集区域launchIn:选择下游协程,需要用onEach打印数据
// 发射源区域
fun getFlowValue() =
    listOf(100, 200, 300, 400, 500, 600)
        .asFlow()
        .onEach { delay(2000) }
        .flowOn(Dispatchers.Default)
// 收集消费区域
    val job = getFlowValue()
        .onEach { println("thread:${Thread.currentThread().name}   $it")  }
        .launchIn(CoroutineScope(Dispatchers.IO + CoroutineName("自定义协程"))) // 打开水龙头
    job.join() // 需要等待执行完成,不然外面main执行结束了。

输出结果

thread:DefaultDispatcher-worker-3 @自定义协程#2   100
thread:DefaultDispatcher-worker-1 @自定义协程#2   200
thread:DefaultDispatcher-worker-1 @自定义协程#2   300
thread:DefaultDispatcher-worker-1 @自定义协程#2   400
thread:DefaultDispatcher-worker-1 @自定义协程#2   500
thread:DefaultDispatcher-worker-1 @自定义协程#2   600

cancellable

1、协程取消,会导致Flow管道流也会取消。每次都delay 1000,可以正确检测异常

fun getFlow() = flow {
    (1..10).forEach { emit(it) }
}.onEach { delay(1000) }
    getFlow().collect {
        println(it)
        if (it == 5) cancel()
    }

输出结果

1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException
检测

2、cancellable:取消不及时,速度太快了,增加监测机制

    (1..10).asFlow().collect {
        println(it)
        if (it == 5) cancel()
    }// 会输出1~10,才抛出异常

    (1..10).asFlow().cancellable().collect {
        println(it)
        if (it == 5) cancel()
    }// 可以正确捕获到

背压

buffer

1、背压是什么?

  1. 数据产生速度 >>> 数据消费速度,消耗过多时间
  2. 可能OOM
// 数据过多,会导致消费不来
fun getFlow() = flow {
    (1..10).forEach {
        delay(500L)
        emit(it) // 一秒钟发射一个 一秒钟发射一个 ....
        println("生成了:$it thread:${Thread.currentThread().name}")
    }
}

// 消费慢
    val t = measureTimeMillis {
        getFlow().collect {
            delay(1000L)
            println("消费了:$it thread:${Thread.currentThread().name}")
        }
    }
    println("上游 下游 共 消耗:$t 时间")


// 共消耗15495ms
// 都在一个线程处理,按顺序,放一个取一个

2、buffer:设立缓冲区,减少背压的数量【解决办法一】

fun getFlow() = flow {
    (1..10).forEach {
        delay(500L)
        emit(it) // 一秒钟发射一个 一秒钟发射一个 ....
        println("生成了:$it thread:${Thread.currentThread().name}")
    }
}
    .buffer(100) // 设置缓冲区,减少 背压

// 共消耗11272ms

3、flowOn(Dispatchers.IO):另一个线程处理【解决办法二】

  1. 可以和buffer一起使用
fun getFlow() = flow {
    (1..10).forEach {
        delay(500L)
        emit(it) // 一秒钟发射一个 一秒钟发射一个 ....
        println("生成了:$it thread:${Thread.currentThread().name}")
    }
}
    .buffer(100) // 设置缓冲区,减少 背压
    .flowOn(Dispatchers.IO)
// 共消耗11001ms
conflate

1、conflate作用:只消费当前认为最新的值,会丢失部分信息

    val t = measureTimeMillis {
        getFlow().conflate().collect{
            delay(1000L)
            println("消费了:$it thread:${Thread.currentThread().name}")
        }
    }
    println("上游 下游 共 消耗:$t 时间")
// 共消耗7303ms
collectLatest

1、collectLatest:只收集最新值,速度大幅度提升

    val t = measureTimeMillis {
        getFlow().collectLatest {
            delay(1000L)
            println("消费了:$it thread:${Thread.currentThread().name}")
        }
    }
    println("上游 下游 共 消耗:$t 时间")
// 共消耗6869ms
transform

1、transform将上游数据转换后交给下游 ====> LiveData

    listOf(100, 200, 300, 400, 500, 600)
        .asFlow()
        .transform {
            this.emit("你好啊数字$it")
        }.collect { println(it) }
take

1、take限制发送的长度,只要前面几个

    listOf(100, 200, 300, 400, 500, 600)
        .asFlow()
        .take(4)
        .collect { println(it) }

2、自定义take

  1. 对Flow扩展,调用collect收集结果
  2. 用结果构造出flow
fun <INPUT> Flow<INPUT>.myTake(number:Int):Flow<INPUT>{
    require(number > 0){"Request element count 0 show be positive"}
    return flow {
        var i = 0
        collect{// collect收集的n个数据,构造了flow{}
            if(i++ < number){
                return@collect emit(it)
            }
        }
    }
}

reduce

末端操作符:适合累加

  1. reduce参数p1 = 上一次运算返回的最后一行
  2. 下面代码实现:1+2+3+4+…+100 = 5050
    val r = (1..100)
        .asFlow()
        .reduce { p1, p2 ->
            val result = p1 + p2
            result
        }
    println(r)

fliter:过滤

    (100..200).toFlow().filter { it % 50 == 0 }.map { "map result:$it" }.collect{ println(it) }

zip

1、zip合并Flow

fun getNames() = listOf("杜子腾", "史珍香", "刘奋").asFlow().onEach { delay(1000) }
fun getAges() = arrayOf(30, 40, 50).asFlow().onEach { delay(2000) }

    // 合并 组合 操作符 zip
    getNames().zip(getAges()) { p1, p2 ->
        "name:$p1, age:$p2"
    }.collect {
        println(it)
    }

输出:
name:杜子腾, age:30
name:史珍香, age:40
name:刘奋, age:50 

2、zip合并的两个Flow数据长度不一样会怎么办?

fun getNames() = listOf("杜子腾", "史珍香", "刘奋").asFlow().onEach { delay(1000) }
fun getAges() = arrayOf(30, 40, 50, 60, 70).asFlow().onEach { delay(2000) }

zip之后输出结果:会抛弃不匹配的信息6070
name:杜子腾, age:30
name:史珍香, age:40
name:刘奋, age:50 

map

转换

flatmap

1、flatMapxxx作用是展平

  1. 不展平相当于 Flow嵌套,如:Flow<Flow<String>>
  2. 需要两次收集:collect { it.collect { a -> println(a)} }手动展平
// 不展平相当于 Flow嵌套,如:Flow<Flow<String>> 
// 这里发送两次事件,属于Flow
fun runWork(inputValue:Int) = flow {
    emit("$inputValue 号员工开始工作了")
    delay(1000L)
    emit("$inputValue 号员工结束工作了")
}
    (1..6).asFlow()
        .onEach { delay(1000L)}
        .map { runWork(it) } // Flow<Flow<String>> // Flow嵌套
        .collect { it.collect { a -> println(a)} }
    // 展平 操作符 flatMap
    getNumbers()
        .onEach { delay(1000L)}
//        .flatMap {  } // 已经废弃
        .flatMapConcat { runWork(it) }
        // .flatMapMerge { runWork(it) }
        // .flatMapLatest { runWork(it) }
        .collect { println(it) }

2、flatMapConcat:拼接,常用
3、flatMapMerge
4、flatMapLatest

merge

1、flow合并,执行,并且获得结果

  1. 数据请求函数
data class Home(val info1: String, val info2: String)

data class HomeRequestResponseResultData(val code: Int, val msg: String, val home: Home)

// 请求本地加载首页数据
fun CoroutineScope.getHomeLocalData(userName: String) = async (Dispatchers.IO) {
    delay(3000)
    Home("数据1...", "数据1...")
}

// 请求网络服务器加载首页数据
fun CoroutineScope.getHomeRemoteData(userName: String) = async (Dispatchers.IO) {
    delay(6000)
    Home("数据3...", "数据4...")
}
  1. map + merge,合并Flow,collect触发冷流
// 流程
    // 1.把多个函数 拿过来
    // 2.组装成协程
    // 3.包装成FLow
    // 4.Flow合并 得到 结果
    coroutineScope {
        val r = listOf(::getHomeLocalData, ::getHomeRemoteData) // 1.把多个函数 拿过来
            .map {
                it("Derry用户") //it.call("Derry用户") 需要引入Kotlin反射 2.组装成协程,调用
            }.map {
                flow { emit(it.await()) }// 3.包装成FLow
            }
        val r2 = r.merge() // 4.Flow合并 得到 结果
        r2.collect { println(it) }
    }

异常

catch

捕获上游的异常

  1. 用声明式
    flow {
        listOf(100).forEach { value ->
            emit(value)
            throw KotlinNullPointerException("上游抛出了异常")
        }
    }
        .catch {
             println("e:$it")
             emit(200)
        }
        .onEach { delay(1000L) }
        .collect { println(it) }
  1. Flow是流式的,catch不能捕获下游的异常
onCompletion

1、Flow正常结束,声明式

getNumbers().onCompletion { println("协程Flow结束了") }.collect{println(it)}

2、onCompletion来捕获异常结束:上游和下游都可以

// 上游
    getNumbers2().onCompletion {
            if (it != null) { // 非正常结束  是异常结束
                println("上游 发生了异常 $it")
            }
        }
        .catch { println("被catch到了 上游 发生了异常 $it") }  // .catch是能 捕获到 上游 抛出的异常, 异常的传递过程
        .collect { println(it) }

3、异常总结文章来源地址https://www.toymoban.com/news/detail-657265.html

  1. 上游的异常抛出,可以使用 声明式
  2. 下游的异常抛出,可以使用 命令式
  3. onCompletion(声明式) 上游 与 下游 的异常信息,都能够知道 能够得到
  4. onCompletion(声明式) 正常的结束 还是 异常的结束,都能知道
  5. finally 能够知道正常的结束(命令式)

到了这里,关于Kotlin Flow 冷流的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kotlin flow 定时任务

    要在 Kotlin 的 Flow 中定时广播数据,可以使用 Kotlin 的协程库中的 delay 函数和 while 循环结合使用。以下是一个简单的示例代码,每秒钟向 Flow 发送一个数字: kotlin Copy code import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow funtimerFlow(): FlowInt = flow

    2024年02月15日
    浏览(83)
  • Kotlin Flow 转换以及上下游处理

    本片文章主要介绍Flow上下游处理,上游一个Flow使用map,上游两个Flow使用zip,上游三个Flow及以上使用combine  1、下面代码展示了upStreamFlow作为上游,downStreamFlow作为下游,通过对upStreamFlow使用map操作符函数将upStreamFlow转换为新的Flow对象,每个元素都通过lambda表达式进行处理,

    2024年02月11日
    浏览(37)
  • Kotlin协程flow发送时间间隔debounce

    debounce的作用是让连续发射的数据之间间隔起来。典型的应用场景是搜索引擎里面的输入,当用户输入字符时候,有时候,并不希望用户每输入任何一个单字就触发一次后台真正的查询,而是希望用户在输入一定时间间隔后(猜测此时用户已完成完整的输入),

    2024年02月11日
    浏览(32)
  • Kotlin:使用flow实现倒计时功能

    一、效果图 二、ExtendContext.kt 文件代码 注意:创建ExtendContext.kt选择file 使用kotlin扩展方法的特性创建countDown扩展方法,避免多个地方使用倒计时重复创建countDown方法 三、MainActivity.kt代码 四、build.gradle.kts代码

    2024年02月19日
    浏览(40)
  • Kotlin 协程四 —— Flow 和 Channel 的应用

    1.1.1 ChannelFlow 前面提到 ChannelFlow 是热流。只要上游产生数据,就会立即发射给下游收集者。 ChannelFlow 是一个抽象类,并且被标记为内部 Api,不应该在外部代码直接使用。 注意到它内部有一个方法  produceImpl  返回的是一个 ReceiveChannel,它的实现是收集上游发射的数据,然后

    2024年02月20日
    浏览(34)
  • kotlin基础--快速上手kotlin语言开发

    1.1 变量 var表示可变变量,val表示不可变变量,注意并不是常量。变量名写在前面,类型写在后面,编译器如果能推断出你的类型,那么类型是不用声明的 。 编译器自动推断类型。 空安全类型编译器报错 如果还是想给赋初始化值的话 注意:String和String?是两个完全不同的类

    2024年02月15日
    浏览(54)
  • Kotlin管道Channel在receiveAsFlow时debounce与flow差异

      - -- 0 channel 休眠 10 0 flow 休眠 10 -- - 1 flow 休眠 10 1 channel 休眠 10 - -- 2 channel 休眠 10 2 flow 休眠 10 - -- 3 channel 休眠 10 3 flow 休眠 10 - -- 4 channel 休眠 10 4 flow 休眠 10 flow 4-1693561918986   程序运行后,flow很快就收到了最后一条数据4-xxx...,而Channel在receiveAsFlow接收数据debounce时候,将

    2024年02月09日
    浏览(42)
  • Kotlin 轻量级Android开发

    Kotlin 是一门运行在 JVM 之上的语言。 它由 Jetbrains 创建,而 Jetbrains 则是诸多强大的工具(如知名的 Java IDE IntelliJ IDEA )背后的公司。 Kotlin 是一门非常简单的语言,其主要目标之一就是提供强大语言的同时又保持简单且精简的语法。 其主要特性如下所示: 轻量级:这一点对

    2024年02月07日
    浏览(161)
  • Kotlin开发Android之基础问题记录

    1、Kotlin中如何直接通过组件id来操作组件? 解决方案:在build.gradle中添加对相应插件的使用即可。 2、Kotlin中Button设置背景颜色没有效果。 解决方案:在res-values-themes.xml文件中修改如下代码: 3、Kotlin中如何使用静态类或者静态方法? 解决方案: 4、Kotlin中EditText的赋值问题

    2024年02月09日
    浏览(47)
  • Android开发知识学习——Kotlin进阶

    申明前缀有construct修饰 如果有一个主构造函数,每个次构造函数需要委托给主构造函数,可以直接委托或者通过别的构造函数 主构造函数:是类头的一部分,跟在类名后面(可带参数),没有任何注解和可见性修饰符。如: 主构造函数中没有任何代码,初始化代码放在关键

    2024年02月06日
    浏览(64)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包