楼主: 孙林珂
41 0

[程序分享] stateflow源代码详解 [推广有奖]

  • 0关注
  • 0粉丝

等待验证会员

学前班

80%

还不是VIP/贵宾

-

威望
0
论坛币
0 个
通用积分
0
学术水平
0 点
热心指数
0 点
信用等级
0 点
经验
30 点
帖子
2
精华
0
在线时间
0 小时
注册时间
2018-11-30
最后登录
2018-11-30

楼主
孙林珂 发表于 2025-11-21 18:31:18 |AI写论文

+2 论坛币
k人 参与回答

经管之家送您一份

应届毕业生专属福利!

求职就业群
赵安豆老师微信:zhaoandou666

经管之家联合CDA

送您一个全额奖学金名额~ !

感谢您参与论坛问题回答

经管之家送您两个论坛币!

+2 论坛币

我们通常会这样定义一个 StateFlow:

val weatherState: StateFlow<Result<WeatherData>> = _weatherState

接下来,我们可以追踪其源码实现:

public interface StateFlow<out T> : SharedFlow<T> {
    /**
     * 当前该状态流的值
     */
    public val value: T
}

可以看到,这只是一个接口定义。通过右键查看其实现类,可以找到具体实现:

private class StateFlowImpl<T>(
    initialState: Any // T | NULL
) : AbstractSharedFlow<StateFlowSlot>(), MutableStateFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
    private val _state = atomic(initialState)
kotlinx.atomicfu

这种写法在 Kotlin 中非常典型,它利用了 KotlinX Atomic(来自以下库)或类似的原子操作工具库来创建一个线程安全的可变状态变量。

kotlinx.coroutines

其中涉及的核心组件可能包括:

MutableStateFlow

结合原子性编程思想,这种方式能有效保障多线程环境下的数据一致性与安全性。虽然具体底层机制较为复杂,但我们可以理解为:它是为确保并发场景下状态更新的原子性和可见性而设计的。

atomicfu

在实际开发中,我们通常通过如下方式收集流的数据变化:

viewModel.weatherState.collect { result ->
    when (result) {
        is Result.Loading -> {
            // 显示加载动画
            // progressBar.visibility = View.VISIBLE
        }
        is Result.Success<WeatherData> -> {
            // 隐藏加载动画,展示获取到的数据
            // progressBar.visibility = View.GONE
            val weatherData = result.data
            // updateUi(weatherData)
        }
        is Result.Failure -> {
            // 统一处理错误情况
            // Toast.makeText(this@MainActivity, result.getErrorMsgOrNull(), Toast.LENGTH_SHORT).show()
            result.handleFailure(this@MainActivity)
        }
    }
}

那么,collect 方法内部究竟做了什么?我们来看其核心逻辑:

override suspend fun collect(collector: FlowCollector<T>): Nothing {
    val slot = allocateSlot()
    try {
        if (collector is SubscribedFlowCollector) collector.onSubscription()
        val collectorJob = currentCoroutineContext()[Job]
        var oldState: Any? = null // 上一次发射的值 T!! | NULL(null 表示尚未发射过)
        
        // 循环结构设计为无需等待即可立即发送当前值
        while (true) {
            // 协程在此处可能被延迟调度,
            // 因此使用最新状态以尽可能合并过时的值
            val newState = _state.value
            
            // 始终检查是否已被取消
            collectorJob?.ensureActive()
            
            // 使用相等性判断来进行值的合并(conflation)
            if (oldState == null || oldState != newState) {
                collector.emit(NULL.unbox(newState))
                oldState = newState
            }
        }

该循环持续监听状态的变化,并通过比较新旧值决定是否触发发射,避免重复通知,从而提升性能和响应效率。

该函数接收一个函数式接口作为参数,用于执行我们预先定义的表达式。由于带有 suspend 修饰符,说明此函数只能在协程环境中调用。

在 while 循环内部,会通过 emit 触发我们设定的表达式逻辑。emit 本身是一个挂起函数,同时循环体中也会检查数据是否真正发生了变更,以确保状态更新的有效性。

// 注意:如果 awaitPending 被取消,则会退出当前循环,并调用 freeSlot if (!slot.takePending()) { // 首先尝试快速路径,避免挂起     slot.awaitPending() // 只有在需要获取新值时才进入挂起状态 } } } finally {     freeSlot(slot) }

kotlinx.atomicfu

最后一部分的作用是启动一个挂起函数,使当前的 while 循环暂时挂起。一旦有新的数据到达,系统将通知该挂起函数完成任务,从而恢复循环并继续读取最新数据。

接下来看构造方法:

public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)

该构造函数要求传入的参数不可为空,因此必须提供一个初始值,以保证 StateFlow 的正确初始化。

二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

关键词:State flow ATE LOW TEF

您需要登录后才可以回帖 登录 | 我要注册

本版微信群
加好友,备注cda
拉您进交流群
GMT+8, 2025-12-30 10:14