TimeBased Operator 의 종류
replay
- 구독자가 과거의 요소들을 자신이 구독하기 전에 나왔던 이벤트들을 버퍼의 갯수만큼 최신 순서대로 받게 함.
- replay관련 연산자를 사용할 땐 꼭 connect()로 연결해주어야 함.
- replay(bufferSize)에 1을 작성했다면 구독 전 가장 최신으로 배출된 item 1개를 받음.
let hi = PublishSubject<String>()
let parrot = hi.replay(1)
parrot.connect()
hi.onNext("hi")
hi.onNext("hello")
parrot
.subscribe(onNext: {
print($0)
})
.disposed(by: disposeBag)
hi.onNext("good")
// hello
// good
buffer
- 이벤트를 번들로 한번에 묶어서 묶음(Array)으로 방출
- timeSpan은 항목을 수집하는 시간, count는 최대 몇개까지의 요소를 담을지, scheduler는 해당 연사자가 실행될 쓰레드를 결정
var disposeBag = DisposeBag()
let publishSubject = PublishSubject<String>()
var count = 0
let timer = DispatchSource.makeTimerSource()
timer.schedule(deadline: .now() + 2, repeating: .seconds(1))
timer.setEventHandler {
count += 1
publishSubject.onNext("\(count)")
}
timer.resume()
publishSubject
.buffer(
timeSpan: .seconds(2),
count: 2,
scheduler: MainScheduler.instance
)
.subscribe(onNext: {
print($0)
})
.disposed(by: disposeBag)
// 타이밍에 따라 바뀔 가능성 있음.
// ["1"] // 2초 내에 1밖에 받지 못했기 때문에 1만 출력
// ["2","3"]
// ["4","5"]
// ...
window
- Buffer와 달리 묶음(Array)이 아닌 Observable 하나씩 방출해줌
- buffer와 인자는 같지만 반환값이 다름.
- buffer는 Observable<[Element]> 를 방출하지만, window는 Observable<Observable<Element>> 를 방출함.
var disposeBag = DisposeBag()
let publishSubject = PublishSubject<String>()
let windowCount = 0
let windowTimerSource = DispatchSource.makeTimerSource()
windowTimerSource.schedule(deadline: now() + 2, repeating: .seconds(1))
windowTimerSource.setEventHandler {
windowCount += 1
publishSubject.onNext("\(windowCount)")
}
windowTimerSource.resume()
publishSubject
.window(
timeSpan: RxTimerInterval.seconds(3), // 1개의 window를 만들어낼 시간
count: 3, // 1개의 window에 들어갈 최대 Observable 개수
schedule: MainScheduler.instance
)
.flatMap { windowObservable -> Observable<(index: Int, element: String)> in
retrun windowObservable.enumerated()
}
.subscribe(onNext: {
print("\($0.index)번째 Observable의 요소 \($0.element)")
})
.disposed(by: disposeBag)
// 0 번째 observable의 요소 1
// 1 번째 observable의 요소 2
// 2 번째 는 시간초과로 없이 진행
// 0 번째 observable의 요소 3
// 1 번째 observable의 요소 4
// 2 번째 observable의 요소 5
// 0 번째 observable의 요소 6
// 1 번째 observable의 요소 7
// 2 번째 observable의 요소 8
// 0 번째 observable의 요소 9
// 1 번째 observable의 요소 10
// 2 번째 observable의 요소 11
.
delaySubscription
- 구독을 지연하는 연산자
var disposeBag = DisposeBag()
let delaySource = PublishSubject<String>()
var delayCount = 0
let delayTImeSource = DispatchSource.makeTimerSource()
delayTimeSource.schedule(deadline: .now() + 2, repeating: .seconds(1))
delayTimeSource.setEventHandler {
delayCount += 1
delayCount.onNext("\(delayCount)")
}
delayTimeSource.resume()
delaySource
.delaySubscription(.second(2), scheduler: MainScheduler.instance)
.subscribe(onNext: {
print($0)
})
.disposed(by: disposeBag)
// 2초뒤 구독 시작
delay
- 시퀀스를 지연하는 연산자
var disposeBag = DisposeBag()
let delaySubject = PublishSubject<Int>()
var delayCount = 0
let delayTimerSource = DispatchSourec.makeTimerSource()
delayTimerSource.schedule(deadline: .now(), repeating: .seconds(1))
delayTimerSource.setEventHandler {
delayCount += 1
delaysubject.onNext(delayCount)
}
delayTimerSource.resume()
delaySubject
.delay(.seconds(3), scheduler: MainScheduler.instance)
.subscribe(onNext: {
print($0)
})
.disposed(by: disposeBag)
// 3초 후 방출 시작
interval
- 지정한 시간에 따라 이벤트를 방출 시켜주는 연산자
Observable<Int>
.interval(.second(3), schedule: MainSchedule.instance)
.subscribe(onNext: {
print($0)
}
.dispose(by: disposeBag)
// 0
// 3초 후
// 1
// 3초 후
// 2
// 3초 후
// 3
// ...
timer
- dueTime을 통해 구독을 시작하기까지의 딜레이값, period는 이벤트가 방출되는 간격임.
- 주어진 기간만큼 지연한 이후에 item을 특정 기간별로 방출하는 observable을 생성함.
- dueTime : 제일 첫번째 값을 언제 생성할 것인지
- period : 그 다음 값을 생성하는 시간 간격
Observable<Int>
.timer(
.seconds(5),
period: .seconds(2),
scheduler: MainScheduler.instance
)
.subscribe(onNext: {
print($0)
})
.disposed(by: disposeBag)
// 0 // 구독 시작 후 5초 뒤에 출력
// 1 // 0이 출력되고 2초 뒤에 출력
// 2 // 1이 출력되고 2초 뒤에 출력
// ....
timeout
- dueTime 시간내에 어떠한 이벤트도 방출하지 않았을때, 에러를 방출함
let disposeBag = DisposeBag()
let subject = PublishSubject<Int>()
subject.timeout(.seconds(3), other: Observable.just(100), scheduler: MainScheduler.instance)
.subscribe{ print($0) }
.disposed(by: disposeBag)
Observable<Int>.timer(.seconds(2), period: .seconds(2), scheduler: MainScheduler.instance)
.take(5)
.subscribe { subject.onNext($0) }
.disposed(by: disposeBag)
// 0
// 1
// 2
// 3
// 4
// 100 // take 로 여기서 멈추다가 3초 후 반응이 없어 100 방출
// completed
'rxSwift' 카테고리의 다른 글
[rxSwift] Combining Operator 의 종류 (0) | 2022.10.27 |
---|---|
[rxSwift] Transforming Operator 의 종류 (0) | 2022.10.23 |
[rxSwift] Filtering Operator 의 종류 (0) | 2022.10.22 |
[rxSwift] Subject 를 wrapping한 Relay 사용법 (0) | 2022.10.21 |
[rxSwift] Subject 에 대한 이해와 종류 (0) | 2022.10.21 |