首页 安卓& Kotlin Books 用kotlin的反应性编程

2
可观察到 由Alex Sullivan撰写& Scott Gardner

现在你们都与rxjava设置了,是时候跳入并开始构建一些可观察者了!

在本章中,您将继续使用创建和订阅观察的一些不同示例。现在的事情将是非常理论的,但休息放心,你在本章中拿出的技能将在你开始通过现实世界的项目开始工作时非常方便。

入门

您将使用正常的Intellij IDEA项目通过这些理论示例进行操作。您将继续转到Android Studio Projects,然后切换到现实世界的Android应用程序。

使用 文件▸打开 Intellij Idea中的命令打开Starter项目的根文件夹。接受发生的任何弹出窗口的默认值,然后将打开项目。你主要在工作 main.kt. 文件在 SRC / MAIN / KOTLIN 项目文件夹。 For now, there’s just an empty main() function. You’ll fill it out as you progress through the chapter.

在开始潜入一些rxjava代码之前,请看看 supportcode.kt. file. It contains the following helper function exampleOf(description: String, action: () -> Unit):

fun exampleOf(description: String, action: () -> Unit) {
  println("\n--- Example of: $description ---")
  action()
}

您将使用此功能封装不同的示例,因为您通过本章工作。您将看到如何不久使用此功能。

但是,在你过于深入的之前,现在可能是回答问题的好时机:什么 an observable?

观察到是Rx的核心。您将花一些时间讨论观察到的内容,如何创建它们以及如何使用它们。

什么是可观察的?

您将看到“可观察”,“可观察序列”和“流”在RX中可互换使用。而且,真的,他们都是一样的。在rxjava,一切都是序列......

......或者的东西 作品 with a sequence. And an Observable 是 just a sequence with special powers. One of them, in fact the most important one, is that it is 异步。观察到产生事件,图书馆是指的过程是指的 发出, 过了一段时间。事件可以包含值,例如自定义类型的数字或实例,或者可以识别用户手势(例如抽Taps)。

概念化这一致的最佳方法之一是使用大理石图,这些大理石图是在时间轴上绘制的值。

左右箭头表示时间,编号圆圈表示序列的元素。可观察到的将发出元素1,一段时间将通过,然后它会发出2和3.多少时间,你问?它可以在 任何 点在观察到的寿命中 - 这将使您带到可观察到的生命周期。

可观察的生命周期

在以前的大理石图中,可观察到的三个元素。当可观察到的散发元素时,它会在被称为a中 下一页 event.

这是另一个大理石图,这次包括垂直条,表示这种可观察到的道路末端。

这可观察到的是三个龙头事件,然后结束。这被称为a 完全的 事件,正如序列现在一样 终止。例如,也许水龙头在被驳回的视野中。重要的是可观察到的已终止,它不能再发动任何东西。这是正常的终端。

但是,有时事情会出错。

这个大理石图中发生了错误;它由红色x表示。可观察到的发射 错误 包含错误的事件。这与可观察到的通常与a终止时不满 完全的 事件。如果可观察到的散发 错误 事件,它也被终止,不再能发出其他任何东西。

这是一个快速回顾:

  • 可观察的散发 下一页 包含元素的事件。它可以继续这样做,直到它是:
  • ......发出A. 完全的 事件终止它。
  • ......发出An. 错误 事件终止它。
  • 一旦可观察到终止,它就无法再发出事件。

既然你了解可观察到的是什么以及它所做的,你会创建一些观察到,以便在行动中看到它们。

创建观察到

从当前文件切换回 main.kt. and add the code below to the main() function. You’ll also need to include the import io.reactivex.rxjava3.core.Observable:

exampleOf("只是") {  
  val observable: Observable<Int> = Observable.just(1)
}

In the code above, you used the 只是 static method to create an observable with 只是 一个项目: 整数 1.

在RX中,将可观察到的方法称为 运营商 — so you just utilized the 只是 operator.

只是 恰当命名,因为它所确实是创建一个包含的可观察序列 只是 the provided elements. 只是 can take more than one item as well — try updating the previous line to take in a few more items:

val observable = Observable.just(1,2,3)

这次,您没有明确指定类型。你 可能 think that because you gave it several integers, the type is Observable<List<Int>>. However, if you hover over the Observable.just(1,2,3) expression and click 查看▸表达式 you’ll see that the type is actually Observable<Int>.

只是 has ten overloaded methods that take a variable number of arguments, each of which are eventually emitted by the observable. If you want to create an observable of type Observable<List<Int>>, then you can pass a List<Int> into the 只是 operator. Replace the observable you previously defined with the following:

val observable = Observable.just(listOf(1))

Now, hover over the Observable.just(listOf(1)) expression and click 查看▸表达式 again. You’ll see that the type is now Observable<List<Int>>. That means that this new observable will emit one item — and that single item will be a list of Int values. It can be a little tough to wrap your mind around an observable that emits lists, but with time it will become second nature.

Another operator you can use to create observables is fromIterable. Add this code to the bottom of the main() function:

exampleOf("fromIterable") {
  val observable: Observable<Int> =
    Observable.fromIterable(listOf(1, 2, 3))
}

The fromIterable operator creates an observable of individual objects from a regular list of elements. That is, it takes all of the items in the provided list and emits those elements as if you had instead written Observable.just(1, 2, 3).

Hover over the Observable.fromIterable(listOf(1, 2, 3)) expression and click 查看▸表达式 again. You’ll see that the type of this observable is Observable<Int> rather than Observable<List<Int>>.

fromIterable 如果您有想要转换为可观察序列的对象列表,可以方便。

Intellij Idea Console可能在您运行此代码时看起来很屏蔽。那是因为除了示例标题之外你还没有打印任何东西。是时候改变了 订阅 to observables.

订阅观察到

As an Android developer, you may be familiar with LocalBroadcastManager; it broadcasts notifications to observers, which are different than RxJava Observables. Here’s an example of of a broadcast receiver that listens for a custom-event Intent:

LocalBroadcastManager.getInstance(this)
    .registerReceiver(object : BroadcastReceiver() {
  override fun onReceive(context: Context?, intent: Intent?) {
    println("We got an intent!")
  }
}, IntentFilter("custom-event"))

订阅rxjava可观察到的是相似的;你叫观察可观察 订阅 to it. So instead of registerReceiver(), you use subscribe(). Unlike LocalBroadcastManager, where developers typically use only the getInstance() singleton instance, each observable in Rx is different.

更多的 importantly, an observable won’t send events until it has a subscriber. Remember that an observable is really a sequence definition; subscribing to an observable is more like calling 下一页() on an Iterator in the Kotlin Standard Library:

val sequence = 0 until 3
val iterator = sequence.iterator()
while (iterator.hasNext()) {
  println(iterator.next())
}

/* Prints:
0
1
2
*/

订阅观察到 is more streamlined than this, though. You can also add handlers for each event type an observable can emit. Recall that an observable emits 下一页, 错误, and 完全的 events. A 下一页 event passes the emitted element to the handler, and an 错误 event contains a throwable instance.

要在操作中查看此功能,请将此新示例添加到Intellij项目(将代码插入某处 最后一个例子的闭合卷曲括号):

exampleOf("subscribe") {
  val observable = Observable.just(1, 2, 3)
}

This is similar to the previous example, except, this time, you’re simply using the 只是 operator. Now add this code at the bottom of this example’s lambda, to subscribe to the observable:

observable.subscribe { println(it) }

cmd点击 on the subscribe operator, and you’ll see that it takes a Consumer of type Int as a parameter. Consumer 是 a simple interface that has one method, accept(), which takes a value and returns nothing. You’ll also see that subscribe returns a Disposable. You’ll cover disposables shortly.

跑 your main() function. The result of this subscription is that each event emitted by the observable prints out:

--- Example of: subscribe ---
1
2
3

笔记:当您运行项目时,控制台应自动显示,但您可以通过单击手动显示它 tab in the bottom left of the IntelliJ IDEA window after you run the main() function. You can also select 查看▸工具窗口▸运行. This is where the println statements display their output.

You’ve seen how to create observables of one element and of many elements. But what about an observable of zero elements? The 空的 operator creates an empty observable sequence with zero elements; it will only emit a 完全的 event.

将此新示例添加到项目中:

exampleOf("空的") {
  val observable = Observable.empty<Unit>()
}

An observable must be defined as a specific type if it can’t be inferred. So, since 空的 has nothing from which to infer the type, the type must be defined explicitly. In this case, Unit 是 as good as anything else. Add this code to the example to subscribe to it, importing io.reactivex.rxjava3.kotlin.subscribeBy to resolve the compile errors:

observable.subscribeBy(
  // 1
  onNext = { println(it) },
  // 2
  onComplete = { println("Completed") }
)

You’re using a new subscribeBy method here instead of the subscribe method you used previously. subscribeBy 是 a handy extension method defined in the RxKotlin library, which we’ll touch on later in the book. Unlike the subscribe method you used previously, subscribeBy lets you explicitly state what event you want to handle — onNext, onComplete, or onError. If you were to only supply the onNext field of subscribeBy, you’d be recreating the subscribe functionality you used above.

依次以每个编号的评论为:

  1. 明确处理 下一页 事件通过打印携带的值,就像之前一样。
  2. A 完全的 event doesn’t carry any value, so just print “Completed” instead.

跑 this new example. In the console, you’ll see that 空的 only emits the 完全的d event which makes the code print “Completed”:

--- Example of: empty ---
Completed

但是使用了什么 空的 observable? Well, they’re handy when you want to return an observable that immediately terminates or intentionally has zero values. As opposed to the 空的 operator, the 绝不 operator creates an observable that doesn’t emit anything and 绝不 终止。它可用于表示无限持续时间。将此示例添加到项目:

exampleOf("绝不") {
  val observable = Observable.never<Any>()

  observable.subscribeBy(
      onNext = { println(it) },
      onComplete = { println("Completed") }
  )
}

除示例头外,没有打印任何内容。甚至没有“完成”。你怎么知道这甚至是工作吗?坚持到那种好奇的精神直到 挑战 本章的一部分。

到目前为止,您已经使用了显式变量的可观察结果,但也可以从一系列值生成可观察到的。

将此示例添加到项目:

exampleOf("range") {
  // 1
  val observable: Observable<Int> = Observable.range(1, 10)

  observable.subscribe {
    // 2
    val n = it.toDouble()
    val fibonacci = ((Math.pow(1.61803, n) -
            Math.pow(0.61803, n)) /2.23606).roundToInt()
    println(fibonacci)
  }
}

按部分取缔部分:

  1. Create an observable using the range operator, which takes a start integer value and a count of sequential integers to generate.
  2. 计算和打印 NTH. 每个发出元素的斐波纳契号。

笔记: 这 斐波纳契序列 通过在序列中添加前两个数字中的每个数字来生成,从0和1:0,1,1,2,3,5,8,...

There’s actually a better place than in the subscribe method, to put code that transforms the emitted element. You’ll learn about that in Chapter 7, “转变运营商.”

Except for the 绝不() example, up to this point, you’ve been working with observables that automatically emit a 完全的d event and naturally terminate. This permitted you to focus on the mechanics of creating and subscribing to observables, but that swept an important aspect of subscribing to observables under the rug.

在继续之前,是时候做一些家务并处理这个方面了。

处理和终止

Remember that an observable doesn’t do anything until it receives a subscription. It’s the subscription that triggers an observable to begin emitting events, up until it emits an 错误 or 完全的d event and is terminated. You can manually cause an observable to terminate by canceling a subscription to it.

将此新示例添加到项目中:

exampleOf("赔货") {
  // 1
  val mostPopular: Observable<String> =
          Observable.just("A", "B", "C")
  // 2
  val subscription = mostPopular.subscribe {
    // 3
    println(it)
  }
}

简直:

  1. 创建一个字符串的可观察。
  2. Subscribe to the observable, this time saving the returned Disposable as a local constant called subscription.
  3. Print each emitted event in the handler.

To explicitly cancel a subscription, call 赔货() on it. After you cancel the subscription, or 赔货 其中,目前示例中的可观察将停止发出事件。

将此代码添加到示例的底部:

subscription.dispose()

Managing each subscription individually would be tedious, so RxJava includes a CompositeDisposable type. A CompositeDisposable holds disposables — typically added using the add() method — and will call 赔货() on all of them when you call 赔货() on the CompositeDisposable itself. Add this new example to the project. You’ll need to import io.reactivex.rxjava3.disposables.CompositeDisposable:

exampleOf("CompositeDisposable") {
  // 1
  val subscriptions = CompositeDisposable()
  // 2
  val disposable = Observable.just("A", "B", "C")
      .subscribe {
        // 3
        println(it)
      }
  // 4
  subscriptions.add(disposable)
  // 5
  subscriptions.dispose()
}

以下是这种一次性代码如何工作:

  1. Create a CompositeDisposable.
  2. 创造一个可观察和一次性的。
  3. 订阅可观察并打印出发出的物品。
  4. Add the Disposable return value from subscribe to the subscriptions CompositeDisposable.
  5. 处理一次性物品。

This is the pattern you’ll use most frequently: creating and subscribing to an observable and immediately adding the subscription to a CompositeDisposable.

Why bother with disposables at all? If you forget to call 赔货() on a Disposable when you’re done with the subscription, or in some other way cause the observable to terminate at some point, you will 大概 leak memory.

If you forget to utilize the Disposable returned by calling subscribe on an Observable, 安卓Studio 将很清楚,在Android项目中不对劲!

想象一下,泄漏了一个巨大的观点层次结构,因为你忘了取消订阅很长的行驶,你甚至不需要了!

创建运营商

In the previous examples, you’ve created observables with specific 下一页 event elements. Another way to specify all events that an observable will emit to subscribers is by using the create operator.

将此新示例添加到项目中:

exampleOf("create") {

  val disposables = CompositeDisposable()

  Observable.create<String> { emitter ->

  }
}

The create operator takes a single parameter named source. Its job is to provide the implementation of calling subscribe on the observable. In other words, it defines all the events that will be emitted to subscribers. 命令单击 on create to see it’s definition:

The source parameter is an ObservableOnSubscribe<T>. ObservableOnSubscribe 是 a SAM (Single Abstract Method) interface that exposes one method — subscribe. That subscribe method takes in an Emitter<T>, which has a few methods that you’ll use to build up the actual Observable. Specifically, it has onNext, onComplete, and onError methods that you can invoke.

Change the implementation of create to the following:

Observable.create<String> { emitter ->
    // 1
    emitter.onNext("1")

    // 2
    emitter.onComplete()

    // 3
    emitter.onNext("?")
}

这是游戏的游戏:

  1. Emit the string 1 via the onNext method.
  2. Emit a 完全的d event.
  3. Emit another string ? via the onNext method again.

Do you think the second onNext element (?) could ever be emitted to subscribers? Why or why not?

To see if you guessed correctly, subscribe to the observable by adding the following code on the next line after the create implementation:

.subscribeBy(
    onNext = { println(it) },
    onComplete = { println("Completed") },
    onError = { println(it) }
)

You’ve subscribed to the observable, now run the code. The result is that the first 下一页 event element and “Completed” print out. The second 下一页 event doesn’t print because the observable emitted a 完全的d event and terminated before it.

 --- Example of: create ---
1
Completed

Add the following line of code between the emitter.onNext and emitter.onComplete calls:

 emitter.onError(RuntimeException("Error"))

在进行这些更改后运行代码。可观察者发出错误,然后终止。

--- Example of: create ---
1
Error

What would happen if you emitted neither a 完全的d nor an 错误 event? Comment out the onComplete and onError lines of code to find out. Here’s the complete implementation:

exampleOf("create") {
  Observable.create<String> { emitter ->
    // 1
    emitter.onNext("1")

//    emitter.onError(RuntimeException("Error"))
    // 2
//    emitter.onComplete()

    // 3
    emitter.onNext("?")
  }.subscribeBy(
      onNext = { println(it) },
      onComplete = { println("Completed") },
      onError = { println("Error") }
  )
}

跑 those changes. Congratulations, you’ve just leaked memory! :] The observable will never finish, and since you never disposed of the Disposable returned by Observable.create the sequence will never be canceled.

 --- Example of: create ---
1
?

Feel free to uncomment the line adding the 完全的 event or dispose of the returned Disposable if you can’t stand leaving the code in a leaky state.

创造可观察的工厂

不是创建等待用户等待的可观察者,可以创建可观察到的工厂,该工厂为每个订户提供了新的可观察到的。

将此新示例添加到项目中:

exampleOf("defer") {

  val disposables = CompositeDisposable()
  // 1
  var flip = false
  // 2
  val factory: Observable<Int> = Observable.defer {
    // 3
    flip = !flip
    // 4
    if (flip) {
      Observable.just(1, 2, 3)
    } else {
      Observable.just(4, 5, 6)
    }
  }
}

这是解释:

  1. Create a Boolean flag to flip which observable to return.
  2. Create an observable of Int factory using the defer operator.
  3. Invert flip, which will be used each time factory 是 subscribed to.
  4. Return different observables based on whether fliptrue or false.

Externally, an observable factory is indistinguishable from a regular observable. Add this code to the bottom of the example to subscribe to factory four times:

for (i in 0..3) {
  disposables.add(
      factory.subscribe {
        println(it)
      }
  )
}

disposables.dispose()

跑 this code. Each time you subscribe to factory, you get the opposite observable. You get 123, then 456, and the pattern repeats each time a new subscription is created:

 --- Example of: defer ---
1
2
3
4
5
6
1
2
3
4
5
6

使用其他可观察类型

In addition to the normal Observable type, there are a few other types of observables with a narrower set of behaviors than regular observables. Their use is optional; you can use a regular observable anywhere you might use one of these specialized observables. Their purpose is to provide a way to more clearly convey your intent to readers of your code or consumers of your API. The context implied by using them can help make your code more intuitive.

There are three special types of observables in RxJava: Single, Maybe and Completable. Without knowing anything more about them yet, can you guess how each one is specialized?

  • Singles will emit either a success(value) or 错误 事件。 success(value) 是 actually a combination of the 下一页 and 完全的d events. This is useful for one-time processes that will either succeed and yield a value or fail, such as downloading data or loading it from disk.

  • A Completable will only emit a 完全的d or 错误 事件。 It doesn’t emit any value. You could use a Completable when you only care that an operation completed successfully or failed, such as a file write.

  • And Maybe 是 a mash-up of a Single and Completable. It can either emit a success(value), 完全的d, or 错误. If you need to implement an operation that could either succeed or fail, and optionally return a value on success, then Maybe 是 your ticket.

You’ll have an opportunity to work more with these special observable types in Chapter 4, “Observables and实践中的主题,” and beyond. For now, you’ll run through a basic example of using a Single to load some text from a text file named 版权所有.txt.,因为谁偶尔爱一些法律呢?此文件位于 SRC. 项目文件夹。

Add this example to main(), importing io.reactivex.rxjava3.core.Single when you do:

exampleOf("Single") {
  // 1
  val subscriptions = CompositeDisposable()
  // 2
  fun loadText(filename: String): Single<String> {
    // 3
    return Single.create [email protected]{ emitter ->

    }
  }
}

这是您在本准则中的所作所为:

  1. 创建稍后使用的复合一次性。
  2. Implement a function to load text from a file on disk that returns a Single.
  3. Create and return a Single.

Add this code inside the create lambda to complete the implementation:

// 1
val file = File(filename)
// 2
if (!file.exists()) {
  emitter.onError(FileNotFoundException("Can’t find $filename"))
  [email protected]
}
// 3
val contents = file.readText(Charsets.UTF_8)
// 4
emitter.onSuccess(contents)

从一开始:

  1. Create a new File from the filename.
  2. If the file doesn’t exist, emit a FileNotFoundException via the onError method and return from the create method.
  3. 从文件中获取数据。
  4. 发出文件的内容。

现在,您可以将此功能置于工作。将此代码添加到示例:

// 1
val observer = loadText("版权所有.txt.")
    // 2
    .subscribeBy(
        // 3
        onSuccess = { println(it) },
        onError = { println("Error, $it") }
    )

subscriptions.add(observer)

在这里,你:

  1. Call loadText(), passing the root name of the text file.
  2. Subscribe to the Single it returns.
  3. Pass onSuccess and onError lambdas to the subscribeBy method, either printing the contents of the file or printing the error.

运行该示例,您应该从打印到控制台的文件中查看文本,与项目顶部的版权评论相同:

 --- Example of: Single ---
Copyright (c) 2014-2020 Razeware LLC
...

尝试将文件名更改为其他内容,您应该获取未找到的文件,而不是打印的异常。

挑战

练习 永恒的。通过在本书中完成挑战,您将练习您在每章中学到的内容,并在与可观察品一起使用的内容更多的知识。为每项挑战提供了初学项目以及成品版本。享受!

挑战:执行副作用

In the 绝不 operator example earlier, nothing printed out. That was before you were adding your subscriptions to composite disposables, but if you 将其添加到一个中,您可以使用方便运算符在一次性处理时打印消息。

Operators that begin with doOn, such as the doOnDispose operator, allows you to insert 副作用; that is, you add handlers that take some action but that won’t affect the observable. For doOnDispose, that is whenever the disposable is disposed of.

There’s a few other handy doOn methods that you can use. There’s a doOnNext method, a doOnComplete method, a doOnError method and a doOnSubscribe method that you can also use to perform some side effect at the right moment.

To complete this challenge, insert the doOnSubscribe operator in the 绝不 example. Feel free to include any of the other handlers if you’d like; they work just like doOnSubscribe’s handler does.

虽然您在它,但创建一个复合一次性并将订阅添加到它。

别忘了你总是可以偷看完成的挑战项目“灵感”。

关键点

  • 一切都是A. 顺序 in RxJava, and the primary sequence type is Observable.
  • 观察到开始时开始发出 订阅 to.
  • 你必须 赔货 of subscriptions when done with them, and you’ll often use a CompositeDisposable to do so.
  • Single, Completable and Maybe are specialized observable types that are handy in certain situations.

有一个技术问题?想报告一个错误吗? 您可以向官方书籍论坛中的书籍作者提出问题和报告错误 这里.

有反馈分享在线阅读体验吗? 如果您有关于UI,UX,突出显示或我们在线阅读器的其他功能的反馈,您可以将其发送到设计团队,其中表格如下所示:

© 2021 Razeware LLC