Engineering

Running Observable.zip concurrently with RxSwift

Observable.zip()

RxSwift's Observable.zip helps group the results of several observables. In that case, if we want the zipped observables to run concurrently, which scheduler should we use?

I tried the following code.

Try Scheduler

Base Code

Define a function that creates an Observable that blocks for 1 second.

/// Creates an observable that simulates one second of blocking work.
///
/// On each subscription the closure prints `begin <s>`, blocks the calling
/// thread with `sleep(1)`, prints `end <s>`, then emits a single `Void`
/// element and completes.
///
/// - Parameter s: Identifier printed with the begin/end markers so that
///   interleaved output from several tasks can be told apart.
/// - Returns: An `Observable<Void>` that emits once and then completes.
func task(_ s: Int) -> Observable<Void> {
  return Observable<Void>.create { observer in
    print("begin", s)
    // Deliberately blocking: the experiment measures how schedulers overlap this wait.
    sleep(1)
    print("end", s)

    observer.on(.next(()))
    observer.on(.completed)

    // The work is already finished by this point, so an empty disposable is returned.
    return Disposables.create()
  }
}
// Scheduler under test; each section below substitutes a concrete scheduler for `s`.
let s: SchedulerType

// Build tasks 1 through 4, subscribe each one on `s`, and zip them together.
// The order of the "begin"/"end" output shows how `s` runs the subscriptions.
Observable
  .zip((1...4).map { task($0).subscribeOn(s) })
  .subscribe()

CurrentThreadScheduler

// Scheduler under test: the shared CurrentThreadScheduler instance.
let s = CurrentThreadScheduler.instance

Result

begin 1
end 1
begin 2
end 2
begin 3
end 3
begin 4
end 4

SerialDispatchQueueScheduler

// Scheduler under test: a serial dispatch-queue scheduler created with default QoS.
let s = SerialDispatchQueueScheduler(qos: .default)

Result

begin 1
end 1
begin 2
end 2
begin 3
end 3
begin 4
end 4

ConcurrentDispatchQueueScheduler

// Scheduler under test: a concurrent dispatch-queue scheduler created with default QoS.
let s = ConcurrentDispatchQueueScheduler(qos: .default)

Result

begin 3
begin 2
begin 4
begin 1
end 3
end 1
end 4
end 2

OperationQueueScheduler

// Scheduler under test: an OperationQueueScheduler backed by a queue that is
// capped at two concurrent operations.
let q = OperationQueue()
q.maxConcurrentOperationCount = 2
let s = OperationQueueScheduler(operationQueue: q)

Result

begin 1
begin 2
end 2
end 1
begin 3
begin 4
end 4
end 3