我们通常会这样定义一个 StateFlow:
val weatherState: StateFlow<Result<WeatherData>> = _weatherState
接下来,我们可以追踪其源码实现:
public interface StateFlow<out T> : SharedFlow<T> {
/**
* 当前该状态流的值
*/
public val value: T
}
可以看到,这只是一个接口定义。通过右键查看其实现类,可以找到具体实现:
private class StateFlowImpl<T>(
initialState: Any // T | NULL
) : AbstractSharedFlow<StateFlowSlot>(), MutableStateFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
private val _state = atomic(initialState)
kotlinx.atomicfu
这种写法在 Kotlin 中非常典型,它利用了 KotlinX Atomic(来自以下库)或类似的原子操作工具库来创建一个线程安全的可变状态变量。
kotlinx.coroutines
其中涉及的核心组件可能包括:
MutableStateFlow
结合原子性编程思想,这种方式能有效保障多线程环境下的数据一致性与安全性。虽然具体底层机制较为复杂,但我们可以理解为:它是为确保并发场景下状态更新的原子性和可见性而设计的。
atomicfu
在实际开发中,我们通常通过如下方式收集流的数据变化:
viewModel.weatherState.collect { result ->
when (result) {
is Result.Loading -> {
// 显示加载动画
// progressBar.visibility = View.VISIBLE
}
is Result.Success<WeatherData> -> {
// 隐藏加载动画,展示获取到的数据
// progressBar.visibility = View.GONE
val weatherData = result.data
// updateUi(weatherData)
}
is Result.Failure -> {
// 统一处理错误情况
// Toast.makeText(this@MainActivity, result.getErrorMsgOrNull(), Toast.LENGTH_SHORT).show()
result.handleFailure(this@MainActivity)
}
}
}
那么,collect 方法内部究竟做了什么?我们来看其核心逻辑:
override suspend fun collect(collector: FlowCollector<T>): Nothing {
val slot = allocateSlot()
try {
if (collector is SubscribedFlowCollector) collector.onSubscription()
val collectorJob = currentCoroutineContext()[Job]
var oldState: Any? = null // 上一次发射的值 T!! | NULL(null 表示尚未发射过)
// 循环结构设计为无需等待即可立即发送当前值
while (true) {
// 协程在此处可能被延迟调度,
// 因此使用最新状态以尽可能合并过时的值
val newState = _state.value
// 始终检查是否已被取消
collectorJob?.ensureActive()
// 使用相等性判断来进行值的合并(conflation)
if (oldState == null || oldState != newState) {
collector.emit(NULL.unbox(newState))
oldState = newState
}
}
该循环持续监听状态的变化,并通过比较新旧值决定是否触发发射,避免重复通知,从而提升性能和响应效率。
该函数接收一个函数式接口作为参数,用于执行我们预先定义的表达式。由于带有 suspend 修饰符,说明此函数只能在协程环境中调用。
在 while 循环内部,会通过 emit 触发我们设定的表达式逻辑。emit 本身是一个挂起函数,同时循环体中也会检查数据是否真正发生了变更,以确保状态更新的有效性。
// 注意:如果 awaitPending 被取消,则会退出当前循环,并调用 freeSlot if (!slot.takePending()) { // 首先尝试快速路径,避免挂起 slot.awaitPending() // 只有在需要获取新值时才进入挂起状态 } } } finally { freeSlot(slot) }
kotlinx.atomicfu
最后一部分的作用是启动一个挂起函数,使当前的 while 循环暂时挂起。一旦有新的数据到达,系统将通知该挂起函数完成任务,从而恢复循环并继续读取最新数据。
接下来看构造方法:
public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)
该构造函数要求传入的参数不可为空,因此必须提供一个初始值,以保证 StateFlow 的正确初始化。


雷达卡


京公网安备 11010802022788号







