协程:Flow
1、Flow是什么?
- 处理异步事件流
- 可取消:通过取消协程取消Flow
- 组合操作符:复杂逻辑处理
- 缓冲和背压:发送和接收时用不同速度处理,实现流量控制、避免数据丢失
2、传统事件处理方案:同步、sequence、异步delay
// 1、同步:
fun getList() = listOf(100, 200, 300, 400, 500, 600)
val job = GlobalScope.launch {
getList().forEach{println(it)}
}
job.join()
// 2、异步: 在SequenceScope中,禁止自己调用挂起,除了库内部的函数yield可以挂起
fun getSequcence() = sequence{
for(item in 0..1000){
// 不允许使用 delay
yield(item) // 可以做到协程间切换
}
}
val job2 = GlobalScope.launch {
getSequcence().forEach {
println(it)
}
}
job2.join()
// 3、异步: 挂起,但没有协作
suspend fun getSuspendSequcence(): List<Int> {
delay(1000)
return listOf(100, 200, 300, 400, 500, 600)
}
val job3 = GlobalScope.launch {
getSuspendSequcence().forEach {
println(it)
}
}
job3.join()
flow
1、flow的作用
- RxJava和Flow完全一样
- 替代LiveData
// 1、类似Observable
suspend fun getFlow() = flow{
for(item in 1..8){
// 发射
emit(item)
}
}
val job = GlobalScope.launch {
// 2、类似RxJava消费,subscribe === Observer消费
getFlow().collect{
println(it)
}
}
job.join()
2、getFlow()可以不用suspend修饰,更自由
fun getFlow() = flow{
for(item in 1..8){
// 发射
emit(item)
}
}
替换LiveData
1、Flow可以完全替换LiveData ===> LiveData
// Step 1 : 网络请求
fun fetchData() = flow{
for(item in 1..100){
emit("get Json String = $item")
}
}
// Step 2 : ViewModel抛弃LiveData使用flow
class MyViewModel: ViewModel(){
val dataFlow: Flow<String> = fetchData()
}
// Step 3 : 订阅,并且collect返回数据
class MyFragment: Fragment(){
val viewModel: MyViewModel by viewModels()
override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
super.onViewCreated(view, savedInstanceState)
lifecycleScope.launch {
viewModel.dataFlow.collect{
// 在Lifecycle的CoroutineScope中,订阅冷流
println(it)
}
}
}
}
flowOn
1、Kotlin的flowOn替代了subscribeOn, 对上游进行了切换 ====> RxJava
- 不再需要observeOn,在需要的线程collect即可
fun main() = runBlocking<Unit> { // 顶级协程
launch(Dispatchers.Default) {
NetworkRequest.uploadRequestAction()
.flowOn(Dispatchers.IO) // flow运行在IO线程池,默认是main。替换了subscribeOn
// .observeOn(Dispatchers.Default) // 不再需要observeOn,在需要的线程collect即可
.collect {
println("$it%") // 显示下载进度
}
}
// flowOn是给上游还是下游切换?
// 都是给上游
}
object NetworkRequest {
fun uploadRequestAction() = flow {
println("uploadRequestAction thread:${Thread.currentThread().name}")
for (item in 1..100) {
delay(100)
emit(item) // 反馈文件上传进度
}
}
}
冷流
1、flow和RxJava都是冷流
发射源简化
1、简化发射源 ===> 高阶函数
- 一切对象、函数都可以
toFlow
转换为flow
// 1、无参非挂起函数 toFlow
private fun<T> (()->T).toFlow() = flow{
emit(invoke())
}
// 使用:
val r: () -> String = ::getFlowValue
r.toFlow().collect { println(it) }
// 2、String toFlow
//private fun String.toFlow() = flow{
// emit(this@toFlow) // this@toFlow有markdown错误
//}
// 使用:
"String".toFlow().collect { println(it) }
// 3、无参挂起函数
private fun <OUTPUT> (suspend () -> OUTPUT).toFlow() = flow {
emit(invoke())
}
// 使用:
::getFlowValueSuspend.toFlow().collect { println(it) }
// 4、所有集合toFlow
private fun <E> Iterable<E>.toFlow() = flow {
this@toFlow.forEach { emit(it) }
}
// 使用:
listOf(1, 2, 3, 4, 5, 6).toFlow().collect { println(it) }
setOf(100, 200, 300, 400, 500, 600).toFlow().collect { println(it) }
// 5、sequence的toFlow
private fun <T> Sequence<T>.toFlow() = flow {
this@toFlow.forEach { emit(it) }
}
// 使用
sequence {
yield("Derry1")
yield("Derry2")
yield("Derry3")
}.toFlow().collect { println(it) }
// 6、Array系列处理
//private fun <T> Array<T>.toFlow() = flow {
// // this@toFlow.forEach { emit(it) }
// repeat(this@toFlow.size) {
// emit(this@toFlow[it])
// }
//}
//
//private fun IntArray.toFlow() = flow {
// for (i in this@toFlow) {
// emit(i)
// }
//}
//
//private fun LongArray.toFlow() = flow {
// for (i in this@toFlow) {
// emit(i)
// }
//}
// 7、Range
// 注意第4步,就已经覆盖Range的情况
private fun IntRange.toFlow() = flow {
this@toFlow.forEach { emit(it) }
}
private fun LongRange.toFlow() = flow {
this@toFlow.forEach { emit(it) }
}
vararg和flowOf
1、可变参数实现单个数据或者多个数据都可以转为flow
private fun <T> flows(vararg value: T) = flow{
value.forEach {
emit(it)
}
}
使用
flows("Hello").collect{ println(it) }
flows(1,2,3,4,5).collect{ println(it) }
2、使用官方的flowOf
flowOf("Hello").collect{ println(it) }
flowOf(1,2,3,4,5).collect{ println(it) }
withContext
1、协程中上游不可以使用withContext,只能使用flowOn
- 上下文保存机制
- 使用withContext会报错
launchIn
1、launchIn的作用
- 发射区域flowOn
- 收集区域launchIn:选择下游协程,需要用onEach打印数据
// 发射源区域
fun getFlowValue() =
listOf(100, 200, 300, 400, 500, 600)
.asFlow()
.onEach { delay(2000) }
.flowOn(Dispatchers.Default)
// 收集消费区域
val job = getFlowValue()
.onEach { println("thread:${Thread.currentThread().name} $it") }
.launchIn(CoroutineScope(Dispatchers.IO + CoroutineName("自定义协程"))) // 打开水龙头
job.join() // 需要等待执行完成,不然外面main执行结束了。
输出结果
thread:DefaultDispatcher-worker-3 @自定义协程#2 100
thread:DefaultDispatcher-worker-1 @自定义协程#2 200
thread:DefaultDispatcher-worker-1 @自定义协程#2 300
thread:DefaultDispatcher-worker-1 @自定义协程#2 400
thread:DefaultDispatcher-worker-1 @自定义协程#2 500
thread:DefaultDispatcher-worker-1 @自定义协程#2 600
cancellable
1、协程取消,会导致Flow管道流也会取消。每次都delay 1000,可以正确检测异常
fun getFlow() = flow {
(1..10).forEach { emit(it) }
}.onEach { delay(1000) }
getFlow().collect {
println(it)
if (it == 5) cancel()
}
输出结果
1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException
检测
2、cancellable:取消不及时,速度太快了,增加监测机制
(1..10).asFlow().collect {
println(it)
if (it == 5) cancel()
}// 会输出1~10,才抛出异常
(1..10).asFlow().cancellable().collect {
println(it)
if (it == 5) cancel()
}// 可以正确捕获到
背压
buffer
1、背压是什么?
- 数据产生速度 >>> 数据消费速度,消耗过多时间
- 可能OOM
// 数据过多,会导致消费不来
fun getFlow() = flow {
(1..10).forEach {
delay(500L)
emit(it) // 一秒钟发射一个 一秒钟发射一个 ....
println("生成了:$it thread:${Thread.currentThread().name}")
}
}
// 消费慢
val t = measureTimeMillis {
getFlow().collect {
delay(1000L)
println("消费了:$it thread:${Thread.currentThread().name}")
}
}
println("上游 下游 共 消耗:$t 时间")
// 共消耗15495ms
// 都在一个线程处理,按顺序,放一个取一个
2、buffer:设立缓冲区,减少背压的数量【解决办法一】
fun getFlow() = flow {
(1..10).forEach {
delay(500L)
emit(it) // 一秒钟发射一个 一秒钟发射一个 ....
println("生成了:$it thread:${Thread.currentThread().name}")
}
}
.buffer(100) // 设置缓冲区,减少 背压
// 共消耗11272ms
3、flowOn(Dispatchers.IO):另一个线程处理【解决办法二】
- 可以和buffer一起使用
fun getFlow() = flow {
(1..10).forEach {
delay(500L)
emit(it) // 一秒钟发射一个 一秒钟发射一个 ....
println("生成了:$it thread:${Thread.currentThread().name}")
}
}
.buffer(100) // 设置缓冲区,减少 背压
.flowOn(Dispatchers.IO)
// 共消耗11001ms
conflate
1、conflate作用:只消费当前认为最新的值,会丢失部分信息
val t = measureTimeMillis {
getFlow().conflate().collect{
delay(1000L)
println("消费了:$it thread:${Thread.currentThread().name}")
}
}
println("上游 下游 共 消耗:$t 时间")
// 共消耗7303ms
collectLatest
1、collectLatest:只收集最新值,速度大幅度提升
val t = measureTimeMillis {
getFlow().collectLatest {
delay(1000L)
println("消费了:$it thread:${Thread.currentThread().name}")
}
}
println("上游 下游 共 消耗:$t 时间")
// 共消耗6869ms
transform
1、transform将上游数据转换后交给下游 ====> LiveData
listOf(100, 200, 300, 400, 500, 600)
.asFlow()
.transform {
this.emit("你好啊数字$it")
}.collect { println(it) }
take
1、take限制发送的长度,只要前面几个
listOf(100, 200, 300, 400, 500, 600)
.asFlow()
.take(4)
.collect { println(it) }
2、自定义take
- 对Flow扩展,调用collect收集结果
- 用结果构造出flow
fun <INPUT> Flow<INPUT>.myTake(number:Int):Flow<INPUT>{
require(number > 0){"Request element count 0 show be positive"}
return flow {
var i = 0
collect{// collect收集的n个数据,构造了flow{}
if(i++ < number){
return@collect emit(it)
}
}
}
}
reduce
末端操作符:适合累加
- reduce参数p1 = 上一次运算返回的最后一行
- 下面代码实现:1+2+3+4+…+100 = 5050
val r = (1..100)
.asFlow()
.reduce { p1, p2 ->
val result = p1 + p2
result
}
println(r)
fliter:过滤
(100..200).toFlow().filter { it % 50 == 0 }.map { "map result:$it" }.collect{ println(it) }
zip
1、zip合并Flow
fun getNames() = listOf("杜子腾", "史珍香", "刘奋").asFlow().onEach { delay(1000) }
fun getAges() = arrayOf(30, 40, 50).asFlow().onEach { delay(2000) }
// 合并 组合 操作符 zip
getNames().zip(getAges()) { p1, p2 ->
"name:$p1, age:$p2"
}.collect {
println(it)
}
输出:
name:杜子腾, age:30
name:史珍香, age:40
name:刘奋, age:50
2、zip合并的两个Flow数据长度不一样会怎么办?
fun getNames() = listOf("杜子腾", "史珍香", "刘奋").asFlow().onEach { delay(1000) }
fun getAges() = arrayOf(30, 40, 50, 60, 70).asFlow().onEach { delay(2000) }
zip之后输出结果:会抛弃不匹配的信息60、70
name:杜子腾, age:30
name:史珍香, age:40
name:刘奋, age:50
map
转换
flatmap
1、flatMapxxx作用是展平
- 不展平相当于 Flow嵌套,如:
Flow<Flow<String>>
- 需要两次收集:
collect { it.collect { a -> println(a)} }
手动展平
// 不展平相当于 Flow嵌套,如:Flow<Flow<String>>
// 这里发送两次事件,属于Flow
fun runWork(inputValue:Int) = flow {
emit("$inputValue 号员工开始工作了")
delay(1000L)
emit("$inputValue 号员工结束工作了")
}
(1..6).asFlow()
.onEach { delay(1000L)}
.map { runWork(it) } // Flow<Flow<String>> // Flow嵌套
.collect { it.collect { a -> println(a)} }
// 展平 操作符 flatMap
getNumbers()
.onEach { delay(1000L)}
// .flatMap { } // 已经废弃
.flatMapConcat { runWork(it) }
// .flatMapMerge { runWork(it) }
// .flatMapLatest { runWork(it) }
.collect { println(it) }
2、flatMapConcat:拼接,常用
3、flatMapMerge
4、flatMapLatest
merge
1、flow合并,执行,并且获得结果
- 数据请求函数
data class Home(val info1: String, val info2: String)
data class HomeRequestResponseResultData(val code: Int, val msg: String, val home: Home)
// 请求本地加载首页数据
fun CoroutineScope.getHomeLocalData(userName: String) = async (Dispatchers.IO) {
delay(3000)
Home("数据1...", "数据1...")
}
// 请求网络服务器加载首页数据
fun CoroutineScope.getHomeRemoteData(userName: String) = async (Dispatchers.IO) {
delay(6000)
Home("数据3...", "数据4...")
}
- map + merge,合并Flow,collect触发冷流
// 流程
// 1.把多个函数 拿过来
// 2.组装成协程
// 3.包装成FLow
// 4.Flow合并 得到 结果
coroutineScope {
val r = listOf(::getHomeLocalData, ::getHomeRemoteData) // 1.把多个函数 拿过来
.map {
it("Derry用户") //it.call("Derry用户") 需要引入Kotlin反射 2.组装成协程,调用
}.map {
flow { emit(it.await()) }// 3.包装成FLow
}
val r2 = r.merge() // 4.Flow合并 得到 结果
r2.collect { println(it) }
}
异常
catch
捕获上游的异常
- 用声明式
flow {
listOf(100).forEach { value ->
emit(value)
throw KotlinNullPointerException("上游抛出了异常")
}
}
.catch {
println("e:$it")
emit(200)
}
.onEach { delay(1000L) }
.collect { println(it) }
- Flow是流式的,catch不能捕获下游的异常
onCompletion
1、Flow正常结束,声明式
getNumbers().onCompletion { println("协程Flow结束了") }.collect{println(it)}
2、onCompletion来捕获异常结束:上游和下游都可以文章来源:https://www.toymoban.com/news/detail-657265.html
// 上游
getNumbers2().onCompletion {
if (it != null) { // 非正常结束 是异常结束
println("上游 发生了异常 $it")
}
}
.catch { println("被catch到了 上游 发生了异常 $it") } // .catch是能 捕获到 上游 抛出的异常, 异常的传递过程
.collect { println(it) }
3、异常总结文章来源地址https://www.toymoban.com/news/detail-657265.html
- 上游的异常抛出,可以使用 声明式
- 下游的异常抛出,可以使用 命令式
- onCompletion(声明式) 上游 与 下游 的异常信息,都能够知道 能够得到
- onCompletion(声明式) 正常的结束 还是 异常的结束,都能知道
- finally 能够知道正常的结束(命令式)
到了这里,关于Kotlin Flow 冷流的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!