Publisher mit Timer - wurzelsand/swift-memos GitHub Wiki

Publisher mit Timer

Aufgabe 1: delay

Gegeben ist ein Publisher, der alle 2 Sekunden die aktuelle Zeit publiziert:

Timer.publish(every: 2, on: .main, in: .common)

Ein PassthroughSubject soll sich bei diesem Publisher so einschreiben, dass alle zwei Sekunden von 1 weiter aufwärts gezählt wird. Ein zweiter Publisher soll das gleiche tun, aber um eine halbe Sekunde verzögert.

Ausführung

import Combine
import Foundation

var subscriptions = Set<AnyCancellable>()

let publisher = PassthroughSubject<Int, Never>()
let delayedPublisher = publisher.delay(for: .seconds(0.5), scheduler: DispatchQueue.main)

publisher
    .sink { print("value:", $0) }
    .store(in: &subscriptions)

delayedPublisher
    .sink { print("delayed:", $0) }
    .store(in: &subscriptions)

Timer.publish(every: 2, on: .main, in: .common)
    .autoconnect()
    .scan(0) { count, date in
        return count + 1
    }
    .subscribe(publisher)
    .store(in: &subscriptions)

print("Run")
RunLoop.main.run(until: Date(timeIntervalSinceNow: 6))

Ausgabe:

Run
value: 1
delayed: 1
value: 2
delayed: 2
value: 3
Program ended with exit code: 0

Beschreibung: value: 1 erscheint zwei Sekunden nach Run. Eine halbe Sekunde später erscheint delayed: 1 usw. Direkt nach 6 Sekunden wird RunLoop.main verlassen und noch gerade value: 3 ausgegeben.

Aufgabe 2: collect

  • Nimm die Ausführung aus Aufgabe 1.

Erzeuge aus publisher einen weiteren Publisher, der dessen Publizierungen 4 Sekunden lang sammelt und dann als Array ausgibt.

Ausführung

let collectedPublisher = publisher.collect(.byTime(DispatchQueue.main, .seconds(4)))

collectedPublisher
    .sink { print("collected:", $0) }
    .store(in: &subscriptions)

Ausgabe:

collected: [1, 2]
collected: [3, 4]
...

Anmerkungen

  • Ich kann das Sammeln der Werte zusätzlich zur Zeit auf eine maximale Anzahl der Werte beschränken:

    let collectedPublisher = publisher.collect(.byTimeOrCount(DispatchQueue.main, .seconds(4), 3))

Aufgabe 3: debounce

Ich möchte die Tastatureingabe eines Benutzers simulieren, bei der die eingegebenen Buchstaben immer so lange gesammelt werden, bis die Eingabe für eine Sekunde pausiert. Erst dann werden die gesammelten Buchstaben ausgegeben. Als Model verwende ich ein Array aus Tuples:

let letters: [(TimeInterval, String)] =
    [(0, "H"), (0.2, "e"), (0.2, "l"), (1.5, "l"), (0.2, "o")]

Simuliert werden soll hier: Der Benutzer gibt sofort den Buchstaben "H" ein (0 Sekunden), 0,2 Sekunden später folgt der Buchstabe "e" und wieder 0,2 Sekunden später der Buchstabe "l". Dann pausiert der Benutzer 1,5 Sekunden lang bevor er mit der Eingabe fortfährt.

Dazu erweitere ich die Klasse Subject:

extension Subject where Output == String {
    func feed(with data: [(TimeInterval, String)], overtime: TimeInterval = 2) {
        guard let entry = data.first else {
            DispatchQueue.global().asyncAfter(deadline: .now() + overtime) { // #1
                self.send(completion: .finished)
            }
            return
        }
        let remaining = Array(data.dropFirst())
        DispatchQueue.global().asyncAfter(deadline: .now() + entry.0) {
            self.send(entry.1)
            self.feed(with: remaining, overtime: overtime)
        }
    }
}

Erstelle nun zwei Subscriptions: Eine, die die gesammelten Buchstaben sofort nach der simulierten Eingabe ausgibt und eine andere, die immer erst nach einer einsekündigen Pause die Eingabe ausgibt.

Ausführung

import Foundation
import Combine

var subscriptions = Set<AnyCancellable>()

let letters: [(TimeInterval, String)] =
    [(0, "H"), (0.2, "e"), (0.2, "l"), (1.5, "l"), (0.2, "o")]

extension Subject where Output == String {
    func feed(with data: [(TimeInterval, String)], overtime: TimeInterval = 2) {
        guard let entry = data.first else {
            DispatchQueue.global().asyncAfter(deadline: .now() + overtime) { // #1
                self.send(completion: .finished)
            }
            return
        }
        let remaining = Array(data.dropFirst())
        DispatchQueue.global().asyncAfter(deadline: .now() + entry.0) {
            self.send(entry.1)
            self.feed(with: remaining, overtime: overtime)
        }
    }
}

let subject = PassthroughSubject<String, Never>()
let scan = subject
    .scan("", +)
    .share() // #2

scan
    .sink {
        print("immediately:", $0)
    }
    .store(in: &subscriptions)

scan
    .debounce(for: .seconds(1), scheduler: DispatchQueue.main)
    .sink(receiveCompletion: { print("completed:", $0) },
          receiveValue: { print("debounced:", $0) })
    .store(in: &subscriptions)

subject.feed(with: letters)

RunLoop.main.run(until: Date(timeIntervalSinceNow: 5))

Ausgabe:

immediately: H
immediately: He
immediately: Hel
debounced: Hel
immediately: Hell
immediately: Hello
debounced: Hello
completed: finished
Program ended with exit code: 0

Anmerkungen:

  1. overtime: .finished darf nicht sofort nach dem letzten Element gesendet werden, damit .debounce noch die Möglichkeit hat, die Pause abzuwarten.

  2. share ist hier wahrscheinlich nicht notwendig. Der share-Operator sorgt nur dafür, dass der Publisher eine einzelne Subscription mehreren Subscribern per Referenz vergeben kann. Ein Beispiel, das den Unterschied zeigt zwischen einem Publisher mit und einem ohne share-Operator:

    import Foundation
    import Combine
    
    var subscriptions = Set<AnyCancellable>()
    
    var count = 0
    
    let publisher = Timer.publish(every: 1, on: .main, in: .common)
        .autoconnect()
        .map { _ -> Int in
            count += 1
            return count
        }
        //.share()
    
    publisher
        .sink { print("#1:", $0) }
        .store(in: &subscriptions)
    
    publisher
        .sink { print("#2:", $0) }
        .store(in: &subscriptions)
    
    RunLoop.main.run(until: Date(timeIntervalSinceNow: 3))

    Ausgabe ohne share:

    #2: 1
    #1: 2
    #2: 3
    #1: 4
    #2: 5
    #1: 6
    

    Ausgabe mit share:

    #2: 1
    #1: 1
    #2: 2
    #1: 2
    #2: 3
    #1: 3
    
  • Der Begriff Entprellen (debounce) kommt aus dem Bereich der Elektrotechnik. Es steht für das Verfahren, den Effekt des Prellens elektromechanischer Schalter zu verhindern. Beim Prellen kann ein elastisches Zurückprallen der Federung einer Taste einen Störeffekt verursachen.

Aufgabe 4: throttle

Ich nehme wieder ein Subject als übergeordneten Publisher und einen zweiten Publisher, der diesmal die Sendungen des ersten Publishers drosseln soll. Dieser zweite Publisher soll zum einen nur dann publizieren, wenn sein übergeordneter Publisher einen neuen Wert heraus bringt. Zum anderen soll maximal einmal pro Sekunde ein neuer Wert publiziert werden. Zwischen den Publikationen soll also mindestens 1 Sekunde liegen. Ein Sonderfall ist der erste Wert: Dieser soll sofort publiziert werden, wenn der übergeordnete Publisher ihn sendet.

Ich nehme ein Model wie in Aufgabe 3:

let letters: [(TimeInterval, String)] =
[(0.5, "#1"), (2.0, "#2"), (0.1, "#3"), (0.1, "#4"), (0.5, "#5")]

0.5 Sekunden nach Programmstart soll der übergeordnete Publisher "#1" heraus geben. Mein Throttle-Publisher soll, da es sein erster Wert ist, diesen sobald wie möglich publizieren. Danach verstreicht eine Sekunde, ohne dass es einen neuen Wert gibt. Erst nach einer weiteren Sekunde wird "#2" heraus gebracht. Sofort soll dieser Wert vom Throttle-Publisher publiziert werden, da ja bereits die Minimaldifferenz von einer Sekunde überschritten ist. Nach jeweils einer zehntel Sekunde werden die Werte "#3" und "#4" heraus gebracht. Diese sollen aber nicht sofort publiziert werden, da noch keine ganze Sekunde vergangen ist, seitdem der letzte Wert publiziert wurde. Auch nach einer weiteren halben Sekunde wird "#5" nicht sofort publiziert. Erst nach 3 weiteren zehntel Sekunden ist die Minimaldifferenz von einer Sekunde ganz vergangen. Welcher Wert soll nun publiziert werden? Ich entscheide mich dafür, dass immer der zuletzt herausgebrachte Wert publiziert werden soll. Das wäre also die "#5"

Ausführung

import Foundation
import Combine

var subscriptions = Set<AnyCancellable>()

let letters: [(TimeInterval, String)] =
[(0.5, "#1"), (2.0, "#2"), (0.1, "#3"), (0.1, "#4"), (0.5, "#5")]

extension Subject where Output == String {
    func feed(with data: [(TimeInterval, String)], overtime: TimeInterval = 2) {
        guard let entry = data.first else {
            DispatchQueue.global().asyncAfter(deadline: .now() + overtime) {
                self.send(completion: .finished)
            }
            return
        }
        let remaining = Array(data.dropFirst())
        DispatchQueue.global().asyncAfter(deadline: .now() + entry.0) {
            self.send(entry.1)
            self.feed(with: remaining, overtime: overtime)
        }
    }
}

let subject = PassthroughSubject<String, Never>()

let throttled = subject
    .throttle(for: .seconds(1), scheduler: DispatchQueue.main, latest: true) // #1

throttled
    .sink {
        print($0)
    }
    .store(in: &subscriptions)

subject.feed(with: letters)

RunLoop.main.run(until: Date(timeIntervalSinceNow: 4))

Ausgabe:

#1
#2
#5
Program ended with exit code: 0

Anmerkungen

  1. Wenn ich immer den ersten Wert nach der letzten Publikation senden möchte, wähle ich latest: false.

Aufgabe 5: timeout

Wieder habe ich einen übergeordneten Publisher und einen zweiten Publisher. Der zweite Publisher soll mit einem Fehler beendet werden, wenn er länger als eine Sekunde auf einen Wert des übergeordneten Publishers warten musste.

Ausführung

  • extension Subject where Output == String mit func feed(with data: [(TimeInterval, String)], overtime: TimeInterval = 2) wie oben.
import Foundation
import Combine

var subscriptions = Set<AnyCancellable>()

let letters: [(TimeInterval, String)] =
[(0.5, "#1"), (0.5, "#2"), (0.5, "#3"), (2.0, "#4"), (0.5, "#5")]

enum MyError: Error {
    case timeout
}

let subject = PassthroughSubject<String, MyError>()

let timedOut = subject.timeout(.seconds(1), scheduler: DispatchQueue.main, customError: { .timeout }) // #1

timedOut
    .sink(receiveCompletion: { print("completed:", $0) },
          receiveValue: { print($0) })
    .store(in: &subscriptions)

subject.feed(with: letters)

RunLoop.main.run(until: Date(timeIntervalSinceNow: 5))

Ausgabe:

#1
#2
#3
completed: failure(.MyError.timeout)
Program ended with exit code: 0

Anmerkungen:

  1. Auf customError kann ich auch verzichten, wenn der Publisher ohne einen Fehler beendet werden soll.

Aufgabe 6: measureInterval

Schreibe einen Timer der jede halbe Sekunde das aktuelle Datum publiziert. Verbinde 3 Subscriber:

  • Der erste soll die Daten des Timers zählen und jede halbe Sekunde den nächsten Zähler ausgeben (1, 2, 3, ...).

  • Der zweite misst das Zeitintervall zur vorherigen Publikation im DispatchQueue.

  • Der dritte misst das Zeitintervall zur vorherigen Publikation im RunLoop.

Nach 3 Sekunden wird das Programm abgebrochen.

Ausführung

import Foundation
import Combine

var subscriptions = Set<AnyCancellable>()

let publisher = Timer.publish(every: 0.5, on: .main, in: .common)
    .autoconnect()

publisher
    .scan(0) { count, date in count + 1 }
    .sink { print($0, "received") }
    .store(in: &subscriptions)
    
publisher
    .measureInterval(using: DispatchQueue.main)
    .sink { print("DispQue", Double($0.magnitude) * 1e-9) } // #1
    .store(in: &subscriptions)

publisher
    .measureInterval(using: RunLoop.main)
    .sink { print("RunLoop", $0.magnitude) }
    .store(in: &subscriptions)

RunLoop.main.run(until: Date(timeIntervalSinceNow: 3))

Ausgabe:

1 received
DispQue 0.5026756600000001
RunLoop 0.5026719570159912
2 received
DispQue 0.500365972
RunLoop 0.5003399848937988
3 received
DispQue 0.49698649800000005
RunLoop 0.49706006050109863
4 received
DispQue 0.500792282
RunLoop 0.5007069110870361
5 received
DispQue 0.501335313
RunLoop 0.501331090927124
6 received
DispQue 0.502850804
RunLoop 0.5028469562530518
Program ended with exit code: 0

Anmerkungen

  1. Im DispatchQueue wird das Zeitintervall nicht in Sekunden sondern in Nanosekunden gemessen.
⚠️ **GitHub.com Fallback** ⚠️