Combine 맛보기 - Team-HGD/SniffMEET GitHub Wiki

Publisher

시간이 지남에 따라 일련의 값을 전달할 수 있는 유형 (프로토콜)

protocol Publisher<Output, Failure> {
    associatedtype Output
    associatedtype Failure : Error
    func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input
}
  • AsyncSequece와의 차이점

    Publisher → Subscriber가 element를 Publisher에 요청

    AsyncSequence → 얘가 publish한 요소를 반복

    Combine만 debounce, throttle, merge, combineLastest 와 같은 결합 연산을 제공함.

퍼블리셔를 어케 만들까요?

  1. Publisher 프로토콜 채택해서 직접 만들기
  2. 이미 제공된 Publisher 사용하기
    • 값이 요청될 때 값을 publish 하는 PassthroughSubject
    • 기본값을 업데이트할 때마다 publish 하는 CurrentValueSubject
    • @Published annotation을 추가할 수도 있다. 다만, 이친구는 willSet 시점에 objectWillChange를 호출한다. (주로 SwiftUI에 적합한 방식)

Operator

upstream

데이터 생성하는 부분

downstream

생성된 데이터를 받거나 처리하는 부분

Combine을 사용하는 이유 중 하나.

역시나 Publisher 구체타입 인스턴스.

extension을 통해 인스턴스를 반환하고 있는 형태

  • Publishers.Map
struct Map<Upstream, Output> where Upstream : Publisher

init(
    upstream: Upstream,
    transform: @escaping (Upstream.Output) -> Output
)

func map<T>(
	_ transform: @escaping (Self.Output) -> T
) -> Publishers.Map<Self, T>

// 이런 방식으로 구현되어 있음. 
extension Publisher {
	func map<T>(
	_ transform: @escaping (Self.Output) -> T
) -> Publishers.Map<Self, T> {
		Publishers.Map(upstream: self, transform)
	}
}

// 실제 사용 시 
publisher.map { 
	// ,,, 중략 
}

subject

@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public protocol Subject<Output, Failure> : AnyObject, Publisher {

    /// Sends a value to the subscriber.
    ///
    /// - Parameter value: The value to send.
    func send(_ value: Self.Output)

    /// Sends a completion signal to the subscriber.
    ///
    /// - Parameter completion: A `Completion` instance which indicates whether publishing has finished normally or failed with an error.
    func send(completion: Subscribers.Completion<Self.Failure>)

    /// Sends a subscription to the subscriber.
    ///
    /// This call provides the ``Subject`` an opportunity to establish demand for any new upstream subscriptions.
    ///
    /// - Parameter subscription: The subscription instance through which the subscriber can request elements.
    func send(subscription: any Subscription)
}

Subscriber

Publisher로부터 input을 받을 수 있는 프로토콜

protocol Subscriber<Input, Failure> : CustomCombineIdentifierConvertible {
    associatedtype Input
    associatedtype Failure : Error
    func receive(subscription: any Subscription)
    func receive(_ input: Self.Input) -> Subscribers.Demand
    func receive(completion: Subscribers.Completion<Self.Failure>)
}

Back Pressure

Publisher가 Subscriber에게 요소를 보내는 속도를 제어하는 방법, 구독자가 element를 수신할 준비가 되었다는 신호를 보내서 흐름을 제어.

Publisher는 구독자가 명시적으로 요청(on-demand)하는 경우에만 값을 내보냄. 따라서 Subscriber 코드는 연결된 Publisher로 부터 이벤트를 수신하는 속도를 제어할 수 있음.

Demand를 미세 조정해서 Back Pressure 흐름을 만들어 냄.

관련 Operator

  • buffer
  • debounce
  • throttle
  • collect

Subscribers.Demand (수신 준비 신호를 보내는 방법)

구독을 통해 구독자로부터 퍼블리셔에게 전송된 요청된 element의 개수

  • .max(Int)
  • .unlimited
  • .none
  1. Demand는 추가만 가능. 음수 불가.

처음 구독자가 2개의 요소를 요청하고 다음에 3개를 요청하면 충족되지 않은 demand는 총 5개가 됨. Publisher가 1개를 보내면 충족되지 않은 demand는 4개로 줄어듦.

  1. Subscriber가 Subscription에게 demand를 request(_:) 할 수 있음.
  • 예시
// Publisher: Uses a timer to emit the date once per second.
let timerPub = Timer.publish(every: 1, on: .main, in: .default)
    .autoconnect()

// Subscriber: Waits 5 seconds after subscription, then requests a
// maximum of 3 values.
class MySubscriber: Subscriber {
    typealias Input = Date
    typealias Failure = Never
    var subscription: Subscription?
    
    func receive(subscription: Subscription) {
        print("published                             received")
        self.subscription = subscription
        DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
            subscription.request(.max(3))
        }
    }
    
    func receive(_ input: Date) -> Subscribers.Demand {
        print("\(input)             \(Date())")
        return Subscribers.Demand.none
    }
    
    func receive(completion: Subscribers.Completion<Never>) {
        print ("--done--")
    }
}

// Subscribe to timerPub.
let mySub = MySubscriber()
print ("Subscribing at \(Date())")
timerPub.subscribe(mySub)

Subscribing at 2019-12-09 18:57:06 +0000
published                             received
2019-12-09 18:57:11 +0000             2019-12-09 18:57:11 +0000
2019-12-09 18:57:12 +0000             2019-12-09 18:57:12 +0000
2019-12-09 18:57:13 +0000             2019-12-09 18:57:13 +0000

Operator

역시나 Subscriber 구체 타입 인스턴스

  • Subscribers.Sink
final public class Sink<Input, Failure> : Subscriber,
 Cancellable,
 CustomStringConvertible,
 CustomReflectable,
 CustomPlaygroundDisplayConvertible where Failure : Error
  • 사용하기 편하게 아래 같은 방식으로 Sugar API를 제공해준다.
extension Publisher {
	func sink() -> AnyCancellable {
		let subscriber = Subscribers.Sink()
		subscirbe(subscriber)
		return AnyCancellable(subscriber)
		}
}

Subscription

Publisher와 Subscriber의 연결을 나타내는 프로토콜

Publisher에 특정 Subscriber가 연결된 시점에 따라 정의되는 ID를 가지기 때문에, 클래스 제약이 있음.

public protocol Subscription : Cancellable, CustomCombineIdentifierConvertible {
  func request(_ demand: Subscribers.Demand)
}
  • 구독 취소가 가능한 객체
    • 구독은 한번만 취소할 수 있음.
    • 구독 취소 → thread safe해야 함. (class라서)
    • 구독 취소시 구독자를 연결해 할당된 리소스가 모두 해제

개념적으로는 Publisher가 completion을 던진다고 히는데,

구현 동작에서는 Subscription이 실제로 subscrirber에게 completion을 전달하게 됨. receive(completion:)

subscription - subscriber는 강한 참조 관계로 만들어야 함. (중간에 subscriber가 사라질 수 있음)

처리 로직은 subscription에서 구현하게 됨.

Flow

image.png

정리

Publisher가 하는 일

  1. subscription 생성
  2. subscriber에게 subscription 전달
  3. input / completion 방출

Subscriber가 하는 일

  1. subscription에 demand를 요청

Subscription이 하는 일

  1. 남은 demand에 따라 subscriber에게 element 전달
  2. subscriber에게 completion 전달

Ref.

[Processing Published Elements with Subscribers | Apple Developer Documentation](https://developer.apple.com/documentation/combine/processing-published-elements-with-subscribers)

https://excalidraw.com/#room=0421cc1ac005c1805439,78-hdeL0Xa2DpNGugnWV_g

⚠️ **GitHub.com Fallback** ⚠️