首页 iOS.& Swift Books 结合:Swift的异步编程

18
自定义发布商&处理背压 由佛罗特州撰写

在你的学习之旅中的这一点上,你可能会觉得框架中错过了很多运营商。如果您有经验框架,这可能尤其如此,这通常提供内置和第三方的丰富的运营商生态系统。联合允许您创建自己的公开。这一过程一开始就可以令人难以置信,但放心,它完全在你的范围内!本章将向您展示如何。

第二个相关主题,您将在本章中了解 背压管理。这需要一些解释:这是什么背压的东西?那是一些背痛引起的太多靠在椅子上,仔细审查了组合代码吗?您将了解后压是什么,以及如何创建处理它的发布者。

创建自己的出版商

实施自己的出版商的复杂性可能因“轻松”到“漂亮涉及”而异。对于您实现的每个操作员,您将达到最简单的实现形式,以满足您的目标。在本章中,您将看到三种不同的方法来制作自己的出版商:

  • Using a simple extension method in the Publisher namespace.
  • Implementing a type in the Publishers namespace with a Subscription that produces values.
  • Samese如上所述,但具有从上游发布者转换值的订阅。

笔记:在没有自定义订阅的情况下,在技术上可以创建自定义公开。如果您这一点,您将失去应对订户需求的能力,这使您的出版商在结合生态系统中非法。早期取消也可以成为或问题。这不是推荐的方法,本章将教您如何正确编写发布。

出版商作为推广方法

第一个任务是通过重新定位现有运算符来实现一个简单的开放操作员。这就像你可以得到它一样简单。

extension Publisher {
  // 1
  func unwrap<T>() -> Publishers.CompactMap<Self, T> where Output == Optional<T> {
    // 2
    compactMap { $0 }
  }
}
func unwrap<T>()
-> Publishers.CompactMap<Self, T>
where Output == Optional<T> {

测试您的自定义操作员

现在,你可以测试你的新运营商。添加扩展下面这段代码:

let values: [Int?] = [1, 2, nil, 3, nil, 4]

values.publisher
  .unwrap()
  .sink {
    print("Received value: \($0)")
  }
Received value: 1
Received value: 2
Received value: 3
Received value: 4

订阅机制

订阅是组合的无名英雄:虽然您在到处都是出版商,但它们主要是无生命的实体。订阅发布者时,它实例化了一个订阅,该订阅负责从订阅者接收并生成事件的需求(例如,值和完成)。

发布者发出价值

In Chapter 11, “Timers,” you learned about Timer.publish() but found that using Dispatch Queues for timers was somewhat uneasy. Why not develop your own timer based on Dispatch’s DispatchSourceTimer?

struct DispatchTimerConfiguration {
  // 1
  let queue: DispatchQueue?
  // 2
  let interval: DispatchTimeInterval
  // 3
  let leeway: DispatchTimeInterval
  // 4
  let times: Subscribers.Demand
}

添加DispatchTimer发布者

You can now start creating your DispatchTimer publisher. It’s going to be straightforward because all the work occurs inside the subscription!

extension Publishers {
  struct DispatchTimer: Publisher {
    // 5
    typealias Output = DispatchTime
    typealias Failure = Never
    
    // 6
    let configuration: DispatchTimerConfiguration
    
    init(configuration: DispatchTimerConfiguration) {
      self.configuration = configuration
    }
  }
}
// 7
func receive<S: Subscriber>(subscriber: S)
  where Failure == S.Failure,
        Output == S.Input {
  // 8
  let subscription = DispatchTimerSubscription(
    subscriber: subscriber,
    configuration: configuration
  )
  // 9
  subscriber.receive(subscription: subscription)
}

建立您的订阅

订阅的作用是:

private final class DispatchTimerSubscription
  <S: Subscriber>: Subscription where S.Input == DispatchTime {
}

向您的订阅添加所需的属性

现在,添加这些属性,订阅类定义:

// 10
let configuration: DispatchTimerConfiguration
// 11
var times: Subscribers.Demand
// 12
var requested: Subscribers.Demand = .none
// 13
var source: DispatchSourceTimer? = nil
// 14
var subscriber: S?

初始化和取消订阅

现在,将初始化程序添加到订阅定义:

init(subscriber: S,
     configuration: DispatchTimerConfiguration) {
  self.configuration = configuration
  self.subscriber = subscriber
  self.times = configuration.times
}
func cancel() {
  source = nil
  subscriber = nil
}

让您的订阅请求值

你还记得你在第2章中学到了什么,“出版商&订阅者?“一旦订户通过订阅发布者获得订阅,它必须 要求 values from the subscription. This is where all the magic happens. To implement it, add this method to the class, above the cancel method:

// 15
func request(_ demand: Subscribers.Demand) {
  // 16
  guard times > .none else {
    // 17
    subscriber?.receive(completion: .finished)
    return
  }
}
// 18
requested += demand

// 19
if source == nil, requested > .none {
  
}

配置您的计时器

Add this code to the body of the if conditional:

// 20
let source = DispatchSource.makeTimerSource(queue: configuration.queue)
// 21
source.schedule(deadline: .now() + configuration.interval,
                repeating: configuration.interval,
                leeway: configuration.leeway)
// 22
source.setEventHandler { [weak self] in
  // 23
  guard let self = self,
        self.requested > .none else { return }

  // 24
  self.requested -= .max(1)
  self.times -= .max(1)
  // 25
  _ = self.subscriber?.receive(.now())
  // 26
  if self.times == .none {
    self.subscriber?.receive(completion: .finished)
  }
}

激活您的计时器

现在您配置为配置南部计时器,存储对其的引用并通过Gettings激活它:此代码 setEventHandler:

self.source = source
source.activate()
extension Publishers {
  static func timer(queue: DispatchQueue? = nil,
                    interval: DispatchTimeInterval,
                    leeway: DispatchTimeInterval = .nanoseconds(0),
                    times: Subscribers.Demand = .unlimited)
                    -> Publishers.DispatchTimer {
    return Publishers.DispatchTimer(
      configuration: .init(queue: queue,
                           interval: interval,
                           leeway: leeway,
                           times: times)
                      )
  }
}

测试您的计时器

你现在准备测试你的新计时器!

// 27
var logger = TimeLogger(sinceOrigin: true)
// 28
let publisher = Publishers.timer(interval: .seconds(1),
                                 times: .max(6))
// 29
let subscription = publisher.sink { time in
  print("Timer emits: \(time)", to: &logger)
}
+1.02668s: Timer emits: DispatchTime(rawValue: 183177446790083)
+2.02508s: Timer emits: DispatchTime(rawValue: 183178445856469)
+3.02603s: Timer emits: DispatchTime(rawValue: 183179446800230)
+4.02509s: Timer emits: DispatchTime(rawValue: 183180445857620)
+5.02613s: Timer emits: DispatchTime(rawValue: 183181446885030)
+6.02617s: Timer emits: DispatchTime(rawValue: 183182446908654)
DispatchQueue.main.asyncAfter(deadline: .now() + 3.5) {
  subscription.cancel()
}

出版商转换价值

您在建立结合技能方面取得了认真的进展!您现在可以开发自己的运营商,甚至相当复杂。学习的下一件事是如何创建从上游发布者转换值的订阅。这是完全控制出版商订阅Duo的关键。

实施ShareReplyPlayer员

To implement shareReplay() you’ll need:

// 1
fileprivate final class ShareReplaySubscription<Output, Failure: Error>: Subscription {
  // 2
  let capacity: Int
  // 3
  var subscriber: AnySubscriber<Output,Failure>? = nil
  // 4
  var demand: Subscribers.Demand = .none
  // 5
  var buffer: [Output]
  // 6
  var completion: Subscribers.Completion<Failure>? = nil
}

初始化您的订阅

接下来,将初始化程序添加到订阅定义:

init<S>(subscriber: S,
        replay: [Output],
        capacity: Int,
        completion: Subscribers.Completion<Failure>?)
        where S: Subscriber,
              Failure == S.Failure,
              Output == S.Input {
  // 7
  self.subscriber = AnySubscriber(subscriber)
  // 8
  self.buffer = replay
  self.capacity = capacity
  self.completion = completion
}

发送完成事件和卓越的价值用户

您需要一种将完成事件中继到订户的方法。将以下内容添加到订阅类以满足该需求:

private func complete(with completion: Subscribers.Completion<Failure>) {
  // 9
  guard let subscriber = subscriber else { return }
  self.subscriber = nil
  // 10
  self.completion = nil
  self.buffer.removeAll()
  // 11
  subscriber.receive(completion: completion)
}
private func emitAsNeeded() {
  guard let subscriber = subscriber else { return }
  // 12
  while self.demand > .none && !buffer.isEmpty {
    // 13
    self.demand -= .max(1)
    // 14
    let nextDemand = subscriber.receive(buffer.removeFirst())
    // 15
    if nextDemand != .none {
      self.demand += nextDemand
    }
  }
  // 16
  if let completion = completion {
    complete(with: completion)
  }
}
func request(_ demand: Subscribers.Demand) {
  if demand != .none {
    self.demand += demand
  }
  emitAsNeeded()
}

取消订阅

取消订阅甚至更容易。添加此代码:

func cancel() {
  complete(with: .finished)
}
func receive(_ input: Output) {
  guard subscriber != nil else { return }
  // 17
  buffer.append(input)
  if buffer.count > capacity {
    // 18
    buffer.removeFirst()
  }
  // 19
  emitAsNeeded()
}

完成订阅

现在,将追随方法添加到接受收集并将是ceptlete:

func receive(completion: Subscribers.Completion<Failure>) {
  guard let subscriber = subscriber else { return }
  self.subscriber = nil
  self.buffer.removeAll()
  subscriber.receive(completion: completion)
}

编码您的出版商

出版商are usually value types implemented as struct in the Publishers namespace. Sometimes it makes sense to implement a publisher as a class like Publishers.Multicast, which multicast() returns. For this publisher, you need a class, though this is the exception to the rule – most often, you’ll use a struct.

extension Publishers {
  // 20
  final class ShareReplay<Upstream: Publisher>: Publisher {
    // 21
    typealias Output = Upstream.Output
    typealias Failure = Upstream.Failure
  }
}

添加发布者所需的属性

Now, add the properties that your publisher will need to operate to the definition of ShareReplay:

// 22
private let lock = NSRecursiveLock()
// 23
private let upstream: Upstream
// 24
private let capacity: Int
// 25
private var replay = [Output]()
// 26
private var subscriptions = [ShareReplaySubscription<Output, Failure>]()
// 27
private var completion: Subscribers.Completion<Failure>? = nil

将值初始化和中继到您的发布者

首先,添加必要的初始化程序:

init(upstream: Upstream, capacity: Int) {
  self.upstream = upstream
  self.capacity = capacity
}
private func relay(_ value: Output) {
  // 28
  lock.lock()
  defer { lock.unlock() }

  // 29
  guard completion == nil else { return }

  // 30
  replay.append(value)
  if replay.count > capacity {
    replay.removeFirst()
  }
  // 31
  subscriptions.forEach {
    _ = $0.receive(value)
  }
}

让你的出版商知道什么时候完成

其次,添加此方法来处理完成事件:

private func complete(_ completion: Subscribers.Completion<Failure>) {
  lock.lock()
  defer { lock.unlock() }
  // 32
  self.completion = completion
  // 33
  subscriptions.forEach {
    _ = $0.receive(completion: completion)
  }
}
func receive<S: Subscriber>(subscriber: S)
  where Failure == S.Failure,
        Output == S.Input {
  lock.lock()
  defer { lock.unlock() }
}

创建订阅

接下来,将此代码添加到方法中向订阅并将其交给订阅:

// 34
let subscription = ShareReplaySubscription(
  subscriber: subscriber,
  replay: replay,
  capacity: capacity,
  completion: completion)

// 35
subscriptions.append(subscription)
// 36
subscriber.receive(subscription: subscription)

订阅发布者并处理其输入

您是Arder NOV准备订阅发布发布发布发布。你只需要做一次:当你收到时。

// 37
guard subscriptions.count == 1 else { return }

let sink = AnySubscriber(
  // 38
  receiveSubscription: { subscription in
    subscription.request(.unlimited)
  },
  // 39
  receiveValue: { [weak self] (value: Output) -> Subscribers.Demand in
    self?.relay(value)
    return .none
  },
  // 40
  receiveCompletion: { [weak self] in
    self?.complete($0)
  }
)
upstream.subscribe(sink)

添加便利算子

您的出版商是彗星!当然,你会更加削减:便利运营商来帮助ChainNap New Publisher。

extension Publisher {
  func shareReplay(capacity: Int = .max)
    -> Publishers.ShareReplay<Self> {
    return Publishers.ShareReplay(upstream: self,
                                  capacity: capacity)
  }
}

测试您的订阅

此代码添加到您的操场上测试新运营商的结尾:

// 41
var logger = TimeLogger(sinceOrigin: true)
// 42
let subject = PassthroughSubject<Int,Never>()
// 43
let publisher = subject.shareReplay(capacity: 2)
// 44
subject.send(0)
let subscription1 = publisher.sink(
  receiveCompletion: {
    print("subscription2 completed: \($0)", to: &logger)
  },
  receiveValue: {
    print("subscription2 received \($0)", to: &logger)
  }
)

subject.send(1)
subject.send(2)
subject.send(3)
let subscription2 = publisher.sink(
  receiveCompletion: {
    print("subscription2 completed: \($0)", to: &logger)
  },
  receiveValue: {
    print("subscription2 received \($0)", to: &logger)
  }
)

subject.send(4)
subject.send(5)
subject.send(completion: .finished)
var subscription3: Cancellable? = nil

DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
  print("Subscribing to shareReplay after upstream completed")
  subscription3 = publisher.sink(
    receiveCompletion: {
      print("subscription3 completed: \($0)", to: &logger)
    },
    receiveValue: {
      print("subscription3 received \($0)", to: &logger)
    }
  )
}
+0.02967s: subscription1 received 1
+0.03092s: subscription1 received 2
+0.03189s: subscription1 received 3
+0.03309s: subscription2 received 2
+0.03317s: subscription2 received 3
+0.03371s: subscription1 received 4
+0.03401s: subscription2 received 4
+0.03515s: subscription1 received 5
+0.03548s: subscription2 received 5
+0.03716s: subscription1 completed: finished
+0.03746s: subscription2 completed: finished
Subscribing to shareReplay after upstream completed
+1.12007s: subscription3 received 4
+1.12015s: subscription3 received 5
+1.12057s: subscription3 completed: finished

验证您的订阅

Fantastic! This works exactly as you wanted. Or does it? How can you verify that the publisher is being subscribed to only once? By using the print(_:) operator, of course! You can try it by inserting it before shareReplay.

let publisher = subject.shareReplay(capacity: 2)
let publisher = subject
  .print("shareReplay")
  .shareReplay(capacity: 2)
shareReplay: receive subscription: (PassthroughSubject)
shareReplay: request unlimited
shareReplay: receive value: (1)
+0.03004s: subscription1 received 1
shareReplay: receive value: (2)
+0.03146s: subscription1 received 2
shareReplay: receive value: (3)
+0.03239s: subscription1 received 3
+0.03364s: subscription2 received 2
+0.03374s: subscription2 received 3
shareReplay: receive value: (4)
+0.03439s: subscription1 received 4
+0.03471s: subscription2 received 4
shareReplay: receive value: (5)
+0.03577s: subscription1 received 5
+0.03609s: subscription2 received 5
shareReplay: receive finished
+0.03759s: subscription1 received completion: finished
+0.03788s: subscription2 received completion: finished
Subscribing to shareReplay after upstream completed
+1.11936s: subscription3 received 4
+1.11945s: subscription3 received 5
+1.11985s: subscription3 received completion: finished

处理背压

在流体动力学中,背压是 阻力或力相对的流体通过管道的期望流。在合并,这是抵抗反对从出版商未来价值的所需的流量。但是,这是什么阻力?通常情况下,它的时候用户需要处理出版商发出的价值。一些例子是:

使用暂停的水槽来处理背压

要开始,请切换到 暂停链接 操场的页面。

protocol Pausable {
  var paused: Bool { get }
  func resume()
}
// 1
final class PausableSubscriber<Input, Failure: Error>:
  Subscriber, Pausable, Cancellable {
  // 2
  let combineIdentifier = CombineIdentifier()
}
// 3
let receiveValue: (Input) -> Bool
// 4
let receiveCompletion: (Subscribers.Completion<Failure>) -> Void

// 5
private var subscription: Subscription? = nil
// 6
var paused = false
// 7
init(receiveValue: @escaping (Input) -> Bool,
     receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void) {
  self.receiveValue = receiveValue
  self.receiveCompletion = receiveCompletion
}

// 8
func cancel() {
  subscription?.cancel()
  subscription = nil
}
func receive(subscription: Subscription) {
  // 9
  self.subscription = subscription
  // 10
  subscription.request(.max(1))
}

func receive(_ input: Input) -> Subscribers.Demand {
  // 11
  paused = receiveValue(input) == false
  // 12
  return paused ? .none : .max(1)
}

func receive(completion: Subscribers.Completion<Failure>) {
  // 13
  receiveCompletion(completion)
  subscription = nil
}
func resume() {
  guard paused else { return }

  paused = false
  // 14
  subscription?.request(.max(1))
}
extension Publisher {
  // 15
  func pausableSink(
    receiveCompletion: @escaping ((Subscribers.Completion<Failure>) -> Void),
    receiveValue: @escaping ((Output) -> Bool))
    -> Pausable & Cancellable {
    // 16
    let pausable = PausableSubscriber(
      receiveValue: receiveValue,
      receiveCompletion: receiveCompletion)
    self.subscribe(pausable)
    // 17
    return pausable
  }
}

测试你的新水槽

你现在可以尝试新的水槽!要使事物简单,模拟发布者应停止发送值的情况。添加此代码:

let subscription = [1, 2, 3, 4, 5, 6]
  .publisher
  .pausableSink(receiveCompletion: { completion in
    print("Pausable subscription completed: \(completion)")
  }) { value -> Bool in
    print("Receive value: \(value)")
    if value % 2 == 1 {
      print("Pausing")
      return false
    }
    return true
}
Receive value: 1
Pausing
let timer = Timer.publish(every: 1, on: .main, in: .common)
  .autoconnect()
  .sink { _ in
    guard subscription.paused else { return }
    print("Subscription is paused, resuming")
    subscription.resume()
  }
Receive value: 1
Pausing
Subscription is paused, resuming
Receive value: 2
Receive value: 3
Pausing
Subscription is paused, resuming
Receive value: 4
Receive value: 5
Pausing
Subscription is paused, resuming
Receive value: 6
Pausable subscription completed: finished

关键点

哇,这是一个漫长而复杂的篇章!你学到了很多出版商:

然后去哪儿?

您了解了出版商的内部工作,以及如何设置机器来写自己。当然,你写的任何代码 - 特别是出版商! - 应该彻底测试。继续前一章了解所有关于测试的组合代码!

有一个技术问题?想报告一个错误吗? 你可以问的问题和bug报告本书的作者在我们的官书论坛 这里.

有反馈分享在线阅读体验吗? 如果您有关于UI,UX,突出显示或我们在线阅读器的其他功能的反馈,您可以将其发送到设计团队,其中表格如下所示:

© 2021 Razeware LLC

您可以免费读取,本章的部分显示为 混淆了 文本。解锁这本书,以及我们整个书籍和视频目录,带有Raywenderlich.com的专业订阅。

现在解锁

要突出或记笔记,您需要在这里拥有订阅或本身订阅的书。