服务器端迅速与蒸气 - 更新为蒸气4!

建立Web应用程序和API的最佳书籍,由蒸气创造者写。
为蒸气4完全更新 - 包括新章节。
自由开始阅读 - 今天!

首页 安卓& Kotlin Tutorials

和roid的Kotlin Flow:入门

在本教程中,您将了解Kotlin Flow的基础知识,您将构建一个Android应用程序,该应用程序使用Flow获取天气预报数据。

4.9/5 23个评分

版本

  • Kotlin 1.3,Android 5.0,Android Studio 3.6

Kotlin Flow.是一种新的流处理API 码头,该公司在Kotlin语言背后。这是一个实施 反应流规格是一项倡议,其目标是为异步流处理提供标准。 Jetbrains在顶部构建了Kotlin流量 Kotlin Coroutines..

通过使用流量来处理值流,可以通过编写一小位代码来以复杂的多线程方式转换数据!

在本教程中,您将有机会玩几种不同的处理集合方法,并且您将通过构建一个简单的天气应用程序来探索Kotlin Flow的功能。在此过程中,您将了解:

  • 数据收集和流。
  • 同步和异步API调用。
  • 热和冷数据流。
  • 流量处理过程中的例外处理。
笔记:本教程假设您对Android开发具有牢固了解。如果你对这个话题完全新的话,请查看我们的 开始Android开发 首先使用kotlin系列。

您还必须有基本的理解 Kotlin Coroutines. 与本教程一起进行。您可以查看这些教程以熟悉Coroutines:

本教程中的示例应用程序使用MVVM架构模式。 MVVM代表模型视图 - ViewModel,表示通过无功数据更新用户界面的模式。如果您不熟悉此模式,请查看我们的 在Android上的MVVM 录像机熟悉它。

入门

在本教程的第一部分,您将了解Kotlin Flow概念,使用Kotlin Playground应用程序作为......嗯,游乐场!然后,您将使用您学到的内容开发更强大的Android应用程序,显示通过API调用检索的数据。

使用该教程下载这些教程的材料 下载材料 页面顶部或底部的按钮。现在,打开 游乐场 - 起动器 项目in. 安卓Studio。这只是一个空的项目,将作为操场。

返回多个值

你可能知道这一点 暂停功能 可以异步返回一个值。使用暂停函数时,您不必担心线程,Coroutines API对您有关!

流动但是,可以返回 异步值,随着时间的推移值。 异步操作 是您需要等待的操作,例如网络请求。你永远不知道这些操作可能需要多长时间!他们可以在几毫秒到几秒钟到几秒钟来获得回应。任何长时间运行的操作都应该是异步的,因为积极等待它们可以冻结您的程序。

您将看到使用暂停功能的返回值与流API非常不同,有一些例子!

列表

打开 main.kt. 文件,并添加以下功能:

suspend fun getValues(): List<Int> {
  delay(1000)
  return listOf(1, 2, 3)
}

This function computes values and adds those values into a 列表. delay() simulates a long-running operation, like you would have using a remote API.

现在添加函数来处理这些值:

fun processValues() {
  runBlocking {
    val values = getValues()
    for (value in values) {
      println(value)
    }
  }
}

Call processValues() from main():

fun main() {
  processValues()
}

构建并运行代码,使用主函数旁边的绿色“播放”图标。您将在延迟一秒后获得此输出:

1
2
3

When you call getValues(), it returns a 列表 with three values. You then use those values in processValues(). Within a for loop, you iterate over the 列表 和 print out the values.

函数的视觉表示如下:

列表图

这对于三个值很好。但如果它需要花费大量时间来计算这些值并不是。在这种情况下,你必须等待 全部 要计算的值。如果每个值需要一秒钟,那么您将等待数千个价值的小时!

它效率低下,因为它增加了数据处理的额外延迟。理想情况下,您希望尽快开始处理每个列表项。 序列 让你这样做。

顺序

序列与列表非常相似。但与列表不同,他们是 懒洋洋地评估. This means they produce values as you iterate over them, instead of producing them all at once. Refactor getValues() to return a 顺序:

suspend fun getValues(): Sequence<Int> = sequence {
  Thread.sleep(250)
  yield(1)
  Thread.sleep(250)
  yield(2)
  Thread.sleep(250)
  yield(3)
}

Thread.sleep(250) 计算每个值时模拟延迟。

构建并运行项目。您将获得与以前一样的输出相同,但这次您不必等待所有值。你一次生成并消费它们:

1
2
3

Now, instead of waiting for all of the items, processValues() consumes each item as getValues() produces it.

序列 use Iterators under the hood and block while waiting for the next item. This works when returning a simple list, but what if your application needs to communicate with a 流媒体API.?

渠道

Streaming API几乎是与REST API的完全相反。与REST API通信时,您会提出请求,API发送响应。流媒体API有不同的工作方式。它连接到客户端并随着时间的推移,不断侦听新信息。例如,Twitter提供了一个流式API,您可以用来实时流传输推文。

You can use 顺序s for synchronous streams. But you need a different solution for asynchronous streams.

For asynchronous streams, you could use 渠道s from Kotlin Coroutines. Conceptually, you can think of channels as pipes. You send items through one pipe and receive a response through the other. However, a channel represents a 热的 价值流。再一次,热流立即开始产生值。

这引入了另一系列挑战。

热的与冷溪流

即使没有在另一边收听它们,也会产生值的频道。如果您没有收听流,则会失败。

在 the following diagram, getValues() emits the items via a channel. processValues() receives 1,2,3,然后它停止倾听物品。频道仍在产生这些项目,即使没有人在倾听:

序列图

在实践中,您可以使用频道具有打开的网络连接。但这可能导致内存泄漏。或者您可以忘记订阅频道,并“丢失”值。

即使没有人消耗它们,热门流量也会推动值。然而, 冷溪,只有在开始收集时才开始推动值!

Kotlin Flow. 是冷流的实施,由Kotlin Coroutines提供动力!

Kotlin Flow.基础知识

流量是异步产生值的流。此外,流量在内部使用科素。因此,它享有所有津贴 结构化并发.

With 结构化并发, coroutines live for a limited amount of time. This time is connected to the CoroutineScope you start your coroutines in.

当您取消范围时,您还会释放任何运行的Coroutines。同样的规则也适用于Kotlin流。当您取消范围时,还会处理流程。您不必手动释放内存! :]

Kotlin Flow.,Liveata和Rxjava之间存在一些相似之处。所有这些都提供了一种实现方法 观察者模式 在你的代码中。

  • LiveData. 是一个简单的可观察数据持有者。它最适合存储UI状态,例如项目列表。学习和合作很容易。但它不超过那样。
  • rxjava. 是一个非常强大的反应溪流工具。它具有许多特征和多种转型运营商。但它有一个陡峭的学习曲线!
  • 流动 落在Liveata和Rxjava之间的某个地方。这是非常强大的,但也很容易使用!流动API甚至看起来很像RXJava!

Kotlin Flow.和Rxjava都是实现的 反应流规格.

但是,Flow在内部使用Coroutines,并且没有RXJava的一些功能。部分是因为它不需要一些功能,部分是因为仍在开发了一些功能!

笔记: In Kotlin 1.3.0 release, core Flow APIs and basic operators are stable. APIs that are not stable have annotations @ExperimentalCoroutinesApi 或者 @FlowPreview.

现在你已经有足够的理论,是时候创造你的第一个流程了!

流动建设者

导航 main.kt. 在初学项目中。

您将首先创建一个简单的流程。要创建流程,您需要使用一个 流量建设者. You’ll start by using the most basic builder – flow { ... }. Add the following code above main():

val namesFlow = flow {
  val names = listOf("Jody", "Steve", "Lance", "Joe")
  for (name in names) {
    delay(100)
    emit(name)
  }
}

Make sure to add the imports from the kotlinx.coroutines package.

Here, you’re using flow() to create a Flow from a suspendable lambda block. Inside the block, you declare 姓名s 和 assigning it to a list of names.

Next, you used a for loop to go through the list of names and emit each name after a small delay. The Flow uses emit() send values to consumers.

There are other Flow builders that you can use for an easy Flow declaration. For example, you can use flowOf() to create a Flow from a fixed set of values:

val namesFlow = flowOf("Jody", "Steve", "Lance", "Joe")

或者您可以将各种集合和序列转换为流程:

val namesFlow = listOf("Jody", "Steve", "Lance", "Joe").asFlow()

流量运营商

此外,您可以使用运算符来转换流量,如您所能使用集合或序列。流程中有两种类型的操作员 - 中间的终端.

中间运营商

回到 main.kt. 和 add the following code to main():

fun main() = runBlocking {
  namesFlow
      .map { name -> name.length }
      .filter { length -> length < 5 }
    
  println()
}

在这里,您将使用早期的名称流,并将两个中间运算符应用于它:

  • map 将每个值转换为另一个值。在这里,您将名称值转换为它们的长度。
  • filter 选择符合条件的值。在这里,您选择了不到五个的值。

这里要注意的重要事项是每个操作员内部的代码块。这些代码块可以调用暂停功能!所以你也可以延迟这些块。或者您可以调用其他暂停功能!

在下面的图像上可以看到流程发生的情况:

流滤波图

流动将一次发出值。然后,您将每个运算符应用于每个值,一次一次,一次再次。最后,当您开始消耗值时,您将以相同的顺序收到它们。

通过单击主函数旁边的播放按钮构建和运行。

你会注意到没有发生任何事情!这是因为中间运营商很冷。当您对流的中间操作调用时,操作不会立即执行。相反,您返回转换的流量,仍然很冷。仅当您在最终流上调用终端运算符时才执行操作。

终端运营商

由于流量很冷,因此在调用终端运算符之前,它们不会产生值。终端运营商正在暂停启动的函数 收藏 流动。调用终端运算符时,您可以调用所有中间运算符及其:

如果您要收听流程,原始值会发生什么示例:

终端操作图

当您开始收集值时,您会一次获得一个,并且在等待新值时不会阻止!

现在回去了 main.kt. file and add the collect() 终端 operator:

fun main() = runBlocking {
  namesFlow
      .map { name -> name.length }
      .filter { length -> length < 5 }
      .collect { println(it) }

  println()
}

Since collect() is a suspending function, it can only be called from a coroutine or another suspending function. This is why you wrap the code with runBlocking().

通过单击播放按钮构建并运行代码。您将获得以下输出:

4
3

collect() 是最基本的终端运营商。它收集来自流的值并使用每个项目执行操作。在这种情况下,您正在将项目打印到控制台。还有其他终端运营商可用;在本教程后面,您将了解它们。

您可以查看最终代码 游乐场决赛 project.

既然你知道流程的基础,让我们搬到我们的天气应用程序,在那里你会看到Kotlin Flow做真正的工作! :]

在Android上流动

现在,您将在Android应用程序中应用您到目前为止学到的一切!这 Sunzoid. 应用程序是一个简单的天气应用程序,为特定城市显示预测。它从网络获取了天气数据,将其存储到数据库中以支持离线模式。

打开 Sunzoid.启动器 项目在Android Studio。构建并运行应用程序,您将看到一个空屏幕:

Sunzoid.初始屏幕

左上角有一个搜索图标。您可以点击它来输入特定位置。如果你现在这样做,什么都不会发生。但是挂断 - 你接下来要实现这个功能!

Starter项目中有一个公平的代码:

Sunzoid.项目结构

您将专注于应用程序中的Kotlin Flow。但如果你想要的话,你可以探索代码,并熟悉这个应用程序!

Starter项目遵循Google的应用程序架构指南。您可以在Android开发人员网站上找到指南 文件:

谷歌推荐的架构

版权所有2020 Google LLC

在该方案的顶部,有一个UI层与ViewModel体系结构组件通话。 ViewModel与数据存储库进行通信。存储库使用网络获取数据 改造。它将数据存储在本地 房间 数据库。最后,它将数据库数据暴露给ViewModel。

房间和改装,在最新版本中,支持Kotlin Coroutines。 STARTER项目设置为使用Coroutines使用它们。

您将使用Kotlin Flow将数据从数据库传递给ViewModel。然后,ViewModel将收集数据。您还将使用Coroutines和Flow来实现搜索功能。

获取数据

您将首先实现逻辑来获取预测数据。打开 hyeager.kt.. In onCreate(), add a call to fetchLocationDetails(), right below initUi():

homeViewModel.fetchLocationDetails(851128)

fetchLocationDetails() accepts a cityId as an argument. For now, you'll pass the hardcoded ID. You'll add a search feature later that will allow you to search for a specific location.

构建并运行项目。您仍然不会在屏幕上看到任何内容:

Sunzoid.空屏幕

但这时间该应用程序已经获取了预测数据并将其保存到房间数据库! :]

房间和流量

在2.1室,图书馆增加了Coroutine支持,以便进行一次性操作。房间2.2增加了可观察查询的流量支持。这使您可以随时添加或删除数据库中的条目的任何时间通知。

在当前实现中,只有用户可以触发数据获取。但是,您可以轻松实现调度和更新每三个小时的数据库的逻辑,例如。通过这样做,您确保您的UI及时了解最新数据。您将使用Kotlin Flow来收到表中的每一个更改。

插入数据库
打开 Forecastdao.kt. 和 add a call to getForecasts(). This method returns 流动<List<DbForecast>>:

@Query("SELECT * FROM forecasts_table")
fun getForecasts(): Flow<List<DbForecast>>

getForecasts() returns forecast data for a specific city from forecasts_table. Whenever data in this table changes, the query executes again and 流动 emits fresh data.

接下来,开放 weatherrepository.kt. 和 add a function called getForecasts:

fun getForecasts(): Flow<List<Forecast>>

接下来,添加实现 weatherrepositoryimpl.kt.:

override fun getForecasts() =
    forecastDao
      .getForecasts()
      .map { dbMapper.mapDbForecastsToDomain(it) }

This method uses the forecastDao to get data from the database. The database returns the database model. It's a good practice for every layer in the app to work with its own model. Using map(), you convert the database model to the Forecast domain model.

打开 homeviewmodel.kt. 和 add forecasts, like so:

//1
val forecasts: LiveData<List<ForecastViewState>> = weatherRepository
    //2
    .getForecasts()
    //3
    .map {
      homeViewStateMapper.mapForecastsToViewState(it)
    }
    //4
    .asLiveData()

这里有几件事:

  1. First, you declare forecasts of the LiveData.<List<ForecastViewState>> type. The Activity will observe changes in forecasts. forecasts could have been of the 流动<List<ForecastViewState>> type, but LiveData. is preferred when implementing communication between View and ViewModel. This is because LiveData. has internal lifecycle handling!
  2. Next, reference weatherRepository to get the Flow of forecast data.
  3. Then call map(). map() converts the domain models to the ForecastViewState model, which is ready for rendering.
  4. Finally, convert a 流动 to LiveData., using asLiveData(). This function is from the AndroidX KTX library for LifecycleLiveData..

背景保存和背压

流程的集合始终发生在父科素的上下文中。这种流量的属性被称为 背景保存. But you can still change the context when emitting items. To change the context of emissions you can use flowOn().

您可以有一个场景,其中流量会比收集器产生更快的事件可以消耗它们。在反应溪流中,这被称为 背压。由于它是基于科素的基础,Kotlin Flow支持盒子中的背压。当消费者处于暂停状态或忙于做一些工作时,生产者将认识到这一点。在此期间不会产生任何物品。

观察值

最后,开放 hyeager.kt. 和 observe forecasts from initObservers():

homeViewModel.forecasts.observe(this, Observer {
  forecastAdapter.setData(it)
})

每当预测数据库中的更改时,您将在观察者中收到新数据,并在UI上显示它。

构建并运行应用程序。现在主屏幕显示预测数据! :]

阳光与预测数据

Congratulations! You've implemented communication between multiple layers of your app using 流动LiveData.!

消除

homeviewmodel.kt., you're observing the forecasts. You've noticed that you never stop observing. How long is this observed, then?

在 this case, the Flow collection starts when LiveData. becomes active. Then, if LiveData. becomes inactive before the Flow completes, the flow collection is canceled.

The cancellation occurs after a timed delay unless LiveData. becomes active again before that timeout. The default delay triggering cancellation is 5000 milliseconds. You can customize the timeout value if necessary. The timeout exists to handle cases like Android configuration changes.

If LiveData. becomes active again after cancellation, the Flow collection restarts.

例外

流动 streams can complete with an exception if an emitter or code inside the operators throws an exception. catch() blocks handle exceptions within Flows. You can do this 或者 说法. A try-catch block on the collector's side is an example of an 至关重要的 方法。

它是必要的,因为这些捕获了发射器或任何运营商中发生的任何异常。

You can use catch() to handle errors 说法 反而。宣言意味着你 宣布 the function to handle errors. And you declare it within the Flow itself, and not a try-catch block.

打开 homeviewmodel.kt. 和 navigate to forecasts. Add catch() right before map(). To simulate errors in the stream, throw an exception from map():

val forecasts: LiveData<List<ForecastViewState>> = weatherRepository
    .getForecasts()
    .catch {
      // Log Error
    }
    .map {
      homeViewStateMapper.mapForecastsToViewState(it)
      throw Exception()
    }
    .asLiveData()

Build and run the app. You'll notice that the app crashes! catch() catches only upstream exceptions. That is, it catches exceptions from all the operators above the catch. catch() doesn't catch any exception that occurs after the operator.

Now move catch() below map():

val forecasts: LiveData<List<ForecastViewState>> = weatherRepository
    .getForecasts()
    .map {
      homeViewStateMapper.mapForecastsToViewState(it)
      throw Exception()
    }
    .catch {
      // Log Error
    }
    .asLiveData()

再次构建并运行应用程序。现在你会看到一个空屏幕:

Sunzoid.抛出异常

这是一个例子 例外透明度,您可以在其中分开从值集合中流动的例外处理。您也对异常透明,因为您没有隐藏任何错误,您将在操作员中显式处理它们!

Before proceeding, remove the line that throws an exception from map().

搜索位置

到目前为止,您的应用程序显示了硬编码位置的预测。现在,您将实现搜索功能!这将允许用户使用Coroutines和Flow搜索特定位置。作为搜索框中的用户类型,应用程序将对每个类型的字母进行搜索,并将更新搜索结果。

hyeager.kt., you already have a listener attached to the search view. When the user changes query text, the app sends the new value to queryChannel in homeviewmodel.kt.. homeviewmodel.kt. uses a BroadcastChannel as a bridge to pass the text from the view to the ViewModel. offer() passes the text and synchronously adds the specified element to the channel.

查询位置

Now add the logic for consuming the events from the channel as a 流动:

private val _locations = queryChannel
    //1
    .asFlow()
    //2
    .debounce(SEARCH_DELAY_MILLIS)
    //3
    .mapLatest {
      if (it.length >= MIN_QUERY_LENGTH) {
        getLocations(it)
      } else {
        emptyList()
      }
    }
    //4
    .catch {
      // Log Error
    }

以下是此代码块中发生的事情:

  1. First, the call to asFlow converts the 渠道 into a 流动.
  2. Next, debounce() waits for values to stop arriving for a given time period. This is used to avoid processing every single letter typed by users. Users usually type several letters in a row. You don't need to make a network request until the user stops typing. This ensures that you're performing the API call only after 500 milliseconds have passed with no typing!
  3. Then, mapLatest() performs the API call and returns location results. If the original flow emits a new value while the previous API call is still in progress, mapLatest() ensures that computation of the previous block is canceled. mapLatest() performs the API call only if the search query contains at least two characters.
  4. Finally, catch() handles errors.

Add locations to homeviewmodel.kt.。这允许您观察活动:

val locations = _locations.asLiveData()

Here you're using asLiveData() to collect values from the origin flow and add transform them to a LiveData. instance.

打开 hyeager.kt. 和 delete the call from onCreate() to homeViewModel.fetchLocationDetails(). Instead observe locations from initObservers():

private fun initObservers() {
  homeViewModel.locations.observe(this, Observer {
    locationAdapter.setData(it)
  })

  ...
}

再一次,构建和运行应用程序。现在输入搜索查询。您将看到查询生成的选项:

Sunzoid.与位置查询

继续使用任何选项。主屏幕将为新选定的位置显示预测数据。

为什么Kotlin Flow.

已经存在反应流规范的其他实施方式,例如Rxjava。那么为什么要使用Kotlin Flow?一个原因是您不能在Kotlin多平台项目中使用像rxjava这样的特定于jvm的库。但流程是Kotlin语言的一部分,因此在Kotlin多平台项目中使用它是理想的。

此外,Kotlin Flow的运算符较少,但它们更简单。单个操作员可以处理同步和异步逻辑,因为运营商接受可以暂停的代码块!

和Kotlin Flow与其他反应流和金属卷曲互操作。由于它构建在科图表之上,它提供了结构化并发性和取消的所有特权。结合暂停函数,它产生简单,可读,可理解的API。它还支持开箱即用的背压。

然后去哪儿?

您可以使用使用的最终项目 下载材料 本教程顶部或底部的按钮。

您已创建一个应用程序,该应用程序使用Kotlin Flow进行应用程序图层之间的通信。在此过程中,您了解到Kotlin Flow的内容以及与现有解决方案的不同之明。您还使用了流制造商来创建流程。您熟悉基本操作员来转换数据流。最后,您应用了终端运营商来消耗价值!

为了获得更多关于Kotlin Flow的深入知识,请查看我们的 Kotlin Flow.:入门 视频课程。您将了解有关流建设者,运营商和取消的更多信息。您还将了解如何使用流程,更改其上下文,添加缓冲区,组合多个流程以及处理异常!

我们希望您享受本教程!如果您有任何疑问或意见,请加入下面的论坛讨论。 :]

平均评级

4.9/5

为此内容添加评级

23 ratings

更像这样的

贡献者

注释