StreamProtocol
public protocol StreamProtocol : StreamConvertible where Self == Self.StreamType
A protocol that defines an abstraction for a series of asynchronously produced discrete values over time, such as byte buffers read from a socket or a file, incoming requests to a server, or sampled mouse input events.
Streams yield elements. Similar to Swift’s IteratorProtocol
, streams
signify completion by yielding nil
. Streams can be combined into larger
operations with combinators, such as map()
, flatMap()
, etc.
The type of element yielded by a stream is specified by its Output
type.
Streams that must communicate success or failure, must do so by encoding
that information in the output type, typically using Swift.Result
.
Futures comes with a number of convenience types and combinators for
working with Swift.Result
(see FuturesResult
module).
You typically create streams using the convenience methods on Stream
.
You can also create custom streams by adopting this protocol in your
types. Creating streams is always an asynchronous operation. It is
guaranteed that the producer of elements the stream wraps will only be
asked for elements after the stream is submitted to an executor and it
will always be on that executor’s context (see ExecutorProtocol
). In
other words, streams do nothing unless submitted to an executor.
For a stream to be submitted to an executor and yield elements, it must
be converted to a future that represents the stream’s completion. As a
convenience, this is done automatically if the stream output is Void
.
The semantics for cancellation, and memory and concurrency management
are the same as for futures (see FutureProtocol
).
-
Declaration
Swift
associatedtype Output
-
-
makeFuture()
Extension methodReturns a future that will complete with a 2-tuple containing the next element from this stream and the stream itself.
var output: Int? var s = Stream.sequence(0..<3) (output, s) = s.makeFuture().wait() assert(output == 0) (output, s) = s.makeFuture().wait() assert(output == 1) (output, s) = s.makeFuture().wait() assert(output == 2) (output, s) = s.makeFuture().wait() assert(output == nil)
Return Value
some FutureProtocol<Output == (Self.Output?, Self)>
-
makeStream()
Extension method.
var s = Stream.sequence(0..<3).makeStream() assert(s.next() == 0) assert(s.next() == 1) assert(s.next() == 2) assert(s.next() == nil)
Declaration
Swift
public func makeStream() -> Self
Return Value
some StreamProtocol<Output == Self.Output>
-
makeReference()
Extension method.
var s1 = Stream.sequence(0..<3).makeReference() var s2 = Stream.sequence(3..<6).makeReference() var s = Stream.join(s1, s2) assert(s.next()! == (0, 3)) assert(s1.next() == 1) assert(s2.next() == 4) assert(s.next()! == (2, 5)) assert(s.next() == nil)
Declaration
Swift
@inlinable public func makeReference() -> Stream._Private.Reference<Self>
Return Value
some StreamProtocol<Output == Self.Output>
-
next()
Extension methodSynchronously polls this stream on the current thread’s executor until it yields the next element or completes.
var s = Stream.sequence(0..<3) assert(s.next() == 0) assert(s.next() == 1) assert(s.next() == 2) assert(s.next() == nil)
Declaration
Swift
@inlinable public mutating func next() -> Output?
Return Value
Self.Output?
-
next(on:)
Extension methodSynchronously polls this stream using the provided blocking executor until it yields the next element or completes.
let executor = ThreadExecutor.current var s = Stream.sequence(0..<3) assert(s.next(on: executor) == 0) assert(s.next(on: executor) == 1) assert(s.next(on: executor) == 2) assert(s.next(on: executor) == nil)
Declaration
Swift
@inlinable public mutating func next<E>(on executor: E) -> Output? where E : BlockingExecutor
Return Value
Self.Output?
-
forward(to:close:)
Extension method.
let sink = Sink.collect(itemType: Int.self) let f = Stream.sequence(0..<3).forward(to: sink) f.ignoreOutput().wait() assert(sink.elements == [0, 1, 2])
Declaration
Swift
@inlinable public func forward<S>(to sink: S, close: Bool = true) -> Stream._Private.Forward<S, Self> where S : SinkConvertible, Self.Output == S.SinkType.Input
Return Value
some FutureProtocol<Output == Result<Void, S.Failure>>
-
abort(when:)
Extension methodDeclaration
Swift
@inlinable public func abort<U>(when f: U) -> Stream._Private.Abort<U, Self> where U : FutureConvertible, U.FutureType.Output == Void
Return Value
some StreamProtocol<Output == Self.Output>
-
abort(when:)
Extension methodDeclaration
Swift
@inlinable public func abort<U>(when f: @escaping () -> U) -> Stream._Private.Abort<U, Self>
Return Value
some StreamProtocol<Output == Self.Output>
-
poll(on:)
Extension methodEnsures this stream is polled on the given executor.
The returned stream retains the executor for its whole lifetime.
var s = Stream.sequence(0..<3) .poll(on: QueueExecutor.global) .assertNoError() assert(s.next() == 0) assert(s.next() == 1) assert(s.next() == 2) assert(s.next() == nil)
Declaration
Swift
@inlinable public func poll<E>(on executor: E) -> Stream._Private.PollOn<E, Self> where E : ExecutorProtocol
Return Value
some StreamProtocol<Output == Result<Self.Output, E.Failure>>
-
yield(maxElements:)
Extension methodPrecondition
maxElements > 0
Declaration
Swift
@inlinable public func yield(maxElements: Int) -> Stream._Private.Yield<Self>
Return Value
some StreamProtocol<Output == Self.Output>
-
eraseToAnyStream()
Extension method.
var s = Stream.sequence(0..<3).eraseToAnyStream() assert(s.next() == 0) assert(s.next() == 1) assert(s.next() == 2) assert(s.next() == nil)
Return Value
AnyStream<Self.Output>
-
multicast(replay:)
Extension methodMulticasts elements from this stream to multiple tasks, where each task sees every element, that run on the same executor.
Use this combinator when you want to use reference semantics, such as storing a stream instance in a property.
let iterations = 1_000 var counter0 = 0 var counter1 = 0 var counter2 = 0 let stream = Stream.sequence(0..<iterations).forEach { counter0 += $0 } let multicast = stream.multicast() let stream1 = multicast.makeStream().map { counter1 += $0 } let stream2 = multicast.makeStream().map { counter2 += $0 } ThreadExecutor.current.submit(stream1) ThreadExecutor.current.submit(stream2) ThreadExecutor.current.wait() let expected = (0..<iterations).reduce(into: 0, +=) assert(counter0 == expected) // 499_500 assert(counter1 == expected) assert(counter2 == expected)
Declaration
Return Value
some StreamProtocol<Output == Self.Output>
-
eraseToAnyMulticastStream(replay:)
Extension methodDeclaration
Swift
@inlinable public func eraseToAnyMulticastStream(replay: Stream.ReplayStrategy = .none) -> AnyMulticastStream<Output>
Return Value
AnyMulticastStream<Self.Output>
-
share(replay:)
Extension methodMulticasts elements from this stream to multiple tasks, where each task sees every element, that may run on any executor.
Use this combinator when you want to use reference semantics, such as storing a stream instance in a property.
let iterations = 1_000 var counter0 = 0 var counter1 = 0 var counter2 = 0 let stream = Stream.sequence(0..<iterations).forEach { counter0 += $0 } let shared = stream.share() let stream1 = shared.makeStream().map { counter1 += $0 } let stream2 = shared.makeStream().map { counter2 += $0 } let task1 = QueueExecutor(label: "queue 1").spawn(stream1) let task2 = QueueExecutor(label: "queue 2").spawn(stream2) ThreadExecutor.current.submit(task1) ThreadExecutor.current.submit(task2) ThreadExecutor.current.wait() let expected = (0..<iterations).reduce(into: 0, +=) assert(counter0 == expected) // 499_500 assert(counter1 == expected) assert(counter2 == expected)
Declaration
Return Value
some StreamProtocol<Output == Self.Output>
-
eraseToAnySharedStream(replay:)
Extension methodDeclaration
Swift
@inlinable public func eraseToAnySharedStream(replay: Stream.ReplayStrategy = .none) -> AnySharedStream<Output>
Return Value
AnySharedStream<Self.Output>
-
map(_:)
Extension methodTransforms all elements from this stream with a provided closure.
var s = Stream.sequence(0..<3).map { $0 + 1 } assert(s.next() == 1) assert(s.next() == 2) assert(s.next() == 3) assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == T>
-
map(_:)
Extension method.
struct Data { let a: Int } let data: [Data] = [ .init(a: 0), .init(a: 3), .init(a: 6), ] var s = Stream.sequence(data).map(\.a) assert(s.next() == 0) assert(s.next() == 3) assert(s.next() == 6) assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == T>
-
map(_:_:)
Extension method.
struct Data { let a, b: Int } let data: [Data] = [ .init(a: 0, b: 1), .init(a: 3, b: 4), .init(a: 6, b: 7), ] var s = Stream.sequence(data).map(\.a, \.b) assert(s.next() == (0, 1)) assert(s.next() == (3, 4)) assert(s.next() == (6, 7)) assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == (T0, T1)>
-
map(_:_:_:)
Extension method.
struct Data { let a, b, c: Int } let data: [Data] = [ .init(a: 0, b: 1, c: 2), .init(a: 3, b: 4, c: 5), .init(a: 6, b: 7, c: 8), ] var s = Stream.sequence(data).map(\.a, \.b, \.c) assert(s.next() == (0, 1, 2)) assert(s.next() == (3, 4, 5)) assert(s.next() == (6, 7, 8)) assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == (T0, T1, T2)>
-
flatMap(_:)
Extension methodTransforms each element from this stream into a new stream using a provided closure and merges the output from all returned streams into a single stream of output.
var s = Stream.sequence(0..<3).flatMap { Stream.sequence(0...($0 + 1)) } assert(s.next() == 0) assert(s.next() == 1) assert(s.next() == 0) assert(s.next() == 1) assert(s.next() == 2) assert(s.next() == 0) assert(s.next() == 1) assert(s.next() == 2) assert(s.next() == 3) assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == U.StreamType.Output>
-
scan(_:_:)
Extension methodTransforms elements from this stream by providing the current element to a closure along with the last value returned by the closure.
var s = Stream.sequence(0..<3).scan(0) { $0 + $1 } assert(s.next() == 0) assert(s.next() == 1) assert(s.next() == 3) assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == T>
-
replaceNil(with:)
Extension methodReplaces nil elements in the stream with the provided element.
var s = Stream.sequence([0, nil, 2]).replaceNil(with: 1) assert(s.next() == 0) assert(s.next() == 1) assert(s.next() == 2) assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == T>
-
match(some:none:)
Extension method.
var s = Stream.sequence([0, nil, 2]).match( some: String.init, none: { "NaN" } ) assert(s.next() == "0") assert(s.next() == "NaN") assert(s.next() == "2") assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == T>
-
match(left:right:)
Extension method.
func transform(_ value: Int) -> Either<Int, Float> { if value == 0 { return .left(value) } else { return .right(.init(value)) } } var s = Stream.sequence(0..<3).map(transform).match( left: String.init, right: String.init(describing:) ) assert(s.next() == "0") assert(s.next() == "1.0") assert(s.next() == "2.0") assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == T>
-
filter(_:)
Extension methodYields all elements that match a provided closure.
var s = Stream.sequence(0..<3).filter { $0 > 0 } assert(s.next() == 1) assert(s.next() == 2) assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == Self.Output>
-
compactMap(_:)
Extension methodCalls a closure with each element from this stream and yields any returned optional that has a value.
var s = Stream.sequence(0..<3).compactMap { $0 > 0 ? $0 * 2 : nil } assert(s.next() == 2) assert(s.next() == 4) assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == T>
-
replaceEmpty(with:)
Extension methodReplaces an empty stream with the provided element.
If this stream completes without yielding any elements, the returned stream yields the provided element, then completes normally.
var s = Stream.empty().replaceEmpty(with: 42) assert(s.next() == 42) assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == T>
-
removeDuplicates(by:)
Extension methodCalls a predicate closure with two consecutive elements from this stream and yields the current element if it passes the predicate.
let data = [ (1, "A"), (2, "B"), (3, "B"), (4, "C"), ] var s = Stream.sequence(data).removeDuplicates { $0.1 == $1.1 } assert(s.next()! == (1, "A")) assert(s.next()! == (2, "B")) assert(s.next()! == (4, "C")) assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == Self.Output>
-
removeDuplicates()
Extension methodYields only elements that don’t match the previous element.
var s = Stream.sequence([1, 2, 2, 2, 3]).removeDuplicates() assert(s.next() == 1) assert(s.next() == 2) assert(s.next() == 3) assert(s.next() == nil)
Declaration
Swift
@inlinable public func removeDuplicates() -> Stream._Private.RemoveDuplicates<Self>
Return Value
some StreamProtocol<Output == Self.Output>
-
collect()
Extension methodReturns a future that collects all elements from this stream, and completes with a single array of the collection when it completes.
This combinator uses an unbounded amount of memory to store the yielded values.
let f = Stream.sequence(0..<3).collect() assert(f.wait() == [0, 1, 2])
Return Value
some FutureProtocol<Output == [Self.Output]>
-
replaceOutput(with:)
Extension methodReturns a future that ignores all elements from this stream, and completes with a given value when this stream completes.
let f = Stream.sequence(0..<3).replaceOutput(with: 42) assert(f.wait() == 42)
Declaration
Swift
@inlinable public func replaceOutput<T>(with output: T) -> Stream._Private.Reduce<T, Self>
Return Value
some FutureProtocol<Output == T>
-
ignoreOutput()
Extension methodReturns a future that ignores all elements from this stream, and completes when this stream completes.
let f = Stream.sequence(0..<3).ignoreOutput() assert(f.wait() == ())
Declaration
Swift
@inlinable public func ignoreOutput() -> Stream._Private.Reduce<Void, Self>
Return Value
some FutureProtocol<Output == Void>
-
count()
Extension methodReturns a future that completes with the number of elements from this stream.
let f = Stream.sequence(0..<3).count() assert(f.wait() == 3)
Declaration
Swift
@inlinable public func count() -> Stream._Private.Reduce<Int, Self>
Return Value
some FutureProtocol<Output == Int>
-
reduce(_:_:)
Extension methodReturns a future that applies a closure that accumulates each element of this stream and completes with the final result when this stream completes.
let f = Stream.sequence(0..<3).reduce(0) { $0 + $1 } assert(f.wait() == 3)
Declaration
Return Value
some FutureProtocol<Output == T>
-
reduce(into:_:)
Extension methodReturns a future that applies a closure that combines each element of this stream into a mutable state and completes with the final result when this stream completes.
let f = Stream.sequence(0..<3).reduce(into: []) { $0.append($1 + 1) } assert(f.wait() == [1, 2, 3])
Declaration
Return Value
some FutureProtocol<Output == T>
-
contains(_:)
Extension methodReturns a future that completes with a Boolean value upon receiving an element equal to the argument.
let f = Stream.sequence(0..<3).contains(2) assert(f.wait())
Declaration
Return Value
some FutureProtocol<Output == Bool>
-
contains(where:)
Extension methodReturns a future that completes with a Boolean value upon receiving an element that satisfies the predicate closure.
let f = Stream.sequence(0..<3).contains { $0 == 2 } assert(f.wait())
Declaration
Return Value
some FutureProtocol<Output == Bool>
-
allSatisfy(_:)
Extension methodReturns a future that completes with a Boolean value that indicates whether all received elements pass a given predicate.
let f = Stream.sequence(0..<3).allSatisfy { $0 < 3 } assert(f.wait())
Declaration
Return Value
some FutureProtocol<Output == Bool>
-
drop(untilOutputFrom:)
Extension methodIgnores elements from this stream until the given future completes.
var pollCount = 0 let f = AnyFuture<Void> { _ in if pollCount == 2 { return .ready(()) } pollCount += 1 return .pending } var s = Stream.sequence(0..<3).drop(untilOutputFrom: f) assert(s.next() == 2) assert(s.next() == nil)
Declaration
Swift
@inlinable public func drop<F>(untilOutputFrom future: F) -> Stream._Private.DropUntilOutput<F, Self> where F : FutureProtocol
Return Value
some StreamProtocol<Output == Self.Output>
-
drop(while:)
Extension methodOmits elements from this stream until a given closure returns
false
, before yielding all remaining elements.var s = Stream.sequence(0..<3).drop(while: { $0 < 2 }) assert(s.next() == 2) assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == Self.Output>
-
dropFirst(_:)
Extension methodOmits the specified number of elements before yielding subsequent elements.
var s = Stream.sequence(0..<3).dropFirst(2) assert(s.next() == 2) assert(s.next() == nil)
Declaration
Swift
@inlinable public func dropFirst(_ count: Int = 1) -> Stream._Private.Drop<Self>
Return Value
some StreamProtocol<Output == Self.Output>
-
append(_:)
Extension methodAppends the specified elements to this stream’s output.
var s = Stream.sequence(0..<3).append(3, 4, 5) assert(s.next() == 0) assert(s.next() == 1) assert(s.next() == 2) assert(s.next() == 3) assert(s.next() == 4) assert(s.next() == 5) assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == Self.Output>
-
append(_:)
Extension methodAppends a specified sequence to this stream’s output.
var s = Stream.sequence(0..<3).append(3..<6) assert(s.next() == 0) assert(s.next() == 1) assert(s.next() == 2) assert(s.next() == 3) assert(s.next() == 4) assert(s.next() == 5) assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == Self.Output>
-
append(_:)
Extension methodAppends the elements from the given stream to this stream’s output.
let a = Stream.sequence(3..<6) var s = Stream.sequence(0..<3).append(a) assert(s.next() == 0) assert(s.next() == 1) assert(s.next() == 2) assert(s.next() == 3) assert(s.next() == 4) assert(s.next() == 5) assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == Self.Output>
-
prepend(_:)
Extension methodPrepends the specified elements to this stream’s output.
var s = Stream.sequence(0..<3).prepend(3, 4, 5) assert(s.next() == 3) assert(s.next() == 4) assert(s.next() == 5) assert(s.next() == 0) assert(s.next() == 1) assert(s.next() == 2) assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == Self.Output>
-
prepend(_:)
Extension methodPrepends a specified sequence to this stream’s output.
var s = Stream.sequence(0..<3).prepend(3..<6) assert(s.next() == 3) assert(s.next() == 4) assert(s.next() == 5) assert(s.next() == 0) assert(s.next() == 1) assert(s.next() == 2) assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == Self.Output>
-
prepend(_:)
Extension methodPrepends the elements from the given stream to this stream’s output.
let a = Stream.sequence(3..<6) var s = Stream.sequence(0..<3).prepend(a) assert(s.next() == 3) assert(s.next() == 4) assert(s.next() == 5) assert(s.next() == 0) assert(s.next() == 1) assert(s.next() == 2) assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == Self.Output>
-
prefix(untilOutputFrom:)
Extension methodYields elements from this stream until the given future completes.
var pollCount = 0 let f = AnyFuture<Void> { _ in if pollCount == 2 { return .ready(()) } pollCount += 1 return .pending } var s = Stream.sequence(0..<3).prefix(untilOutputFrom: f) assert(s.next() == 0) assert(s.next() == 1) assert(s.next() == nil)
Declaration
Swift
@inlinable public func prefix<F>(untilOutputFrom future: F) -> Stream._Private.PrefixUntilOutput<F, Self> where F : FutureProtocol
Return Value
some StreamProtocol<Output == Self.Output>
-
prefix(_:)
Extension methodYields elements from this stream up to the specified maximum count.
var s = Stream.sequence(0..<3).prefix(2) assert(s.next() == 0) assert(s.next() == 1) assert(s.next() == nil)
Declaration
Swift
@inlinable public func prefix(_ maxLength: Int) -> Stream._Private.Prefix<Self>
Return Value
some StreamProtocol<Output == Self.Output>
-
prefix(while:)
Extension methodYields elements from this stream while elements satisfy a predicate closure.
var s = Stream.sequence(0..<3).prefix(while: { $0 < 2 }) assert(s.next() == 0) assert(s.next() == 1) assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == Self.Output>
-
buffer(_:)
Extension methodCollects up to the specified number of elements, and then yields a single array of the collection.
var s = Stream.sequence(0...4).buffer(2) assert(s.next() == [0, 1]) assert(s.next() == [2, 3]) assert(s.next() == [4]) assert(s.next() == nil)
Declaration
Swift
@inlinable public func buffer(_ count: Int) -> Stream._Private.Buffer<Self>
Return Value
some StreamProtocol<Output == [Self.Output]>
-
forEach(_:)
Extension methodApplies the given closure over each element from this stream.
var buffer = [Int]() var s = Stream.sequence(0..<3).forEach { buffer.append($0) } assert(s.next() == 0) assert(s.next() == 1) assert(s.next() == 2) assert(s.next() == nil) assert(buffer == [0, 1, 2])
Declaration
Return Value
some StreamProtocol<Output == Self.Output>
-
enumerate()
Extension methodYields pairs (n, x), where n represents a consecutive integer starting at zero and x represents an element from this stream.
var s = Stream.sequence(["A", "B", "C"]).enumerate() assert(s.next()! == (offset: 0, output: "A")) assert(s.next()! == (offset: 1, output: "B")) assert(s.next()! == (offset: 2, output: "C")) assert(s.next() == nil)
Declaration
Swift
@inlinable public func enumerate() -> Stream._Private.Enumerate<Self>
Return Value
some StreamProtocol<Output == (Int, Self.Output)>
-
first()
Extension methodReturns a future that completes with the first element from this stream.
let f = Stream.sequence(0..<3).first() assert(f.wait() == 0)
Declaration
Swift
@inlinable public func first() -> Stream._Private.First<Self>
Return Value
some FutureProtocol<Output == Self.Output?>
-
first(where:)
Extension methodReturns a future that completes with the first element from this stream to satisfy a predicate closure.
let f = Stream.sequence(0..<3).first(where: { $0 > 1 }) assert(f.wait() == 2)
Declaration
Return Value
some FutureProtocol<Output == Self.Output?>
-
last()
Extension methodReturns a future that completes with the last element from this stream.
let f = Stream.sequence(0..<3).last() assert(f.wait() == 2)
Declaration
Swift
@inlinable public func last() -> Stream._Private.Last<Self>
Return Value
some FutureProtocol<Output == Self.Output?>
-
last(where:)
Extension methodReturns a future that completes with the last element from this stream to satisfy a predicate closure.
let f = Stream.sequence(0..<3).last(where: { $0 < 2 }) assert(f.wait() == 1)
Declaration
Return Value
some FutureProtocol<Output == Self.Output?>
-
output(at:)
Extension methodYields a specific element from this stream, indicated by its index in the sequence of yielded elements.
var s = Stream.sequence(0..<3).output(at: 1) assert(s.next() == 1) assert(s.next() == nil)
Return Value
some StreamProtocol<Output == Self.Output>
-
output(in:)
Extension methodYields elements from this stream specified by their range in the sequence of yielded elements.
var s = Stream.sequence(0..<3).output(in: 1..<3) assert(s.next() == 1) assert(s.next() == 2) assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == Self.Output>
-
latest()
Extension methodYields the latest available element by eagerly pulling elements out of this stream until it can’t yield any more elements.
var s = Stream.sequence(0..<3).latest() assert(s.next() == 2) assert(s.next() == nil)
Declaration
Swift
@inlinable public func latest() -> Stream._Private.Latest<Self>
Return Value
some StreamProtocol<Output == Self.Output>
-
merge(_:)
Extension methodCombines elements from this stream with those from another stream that yields elements of the same type, delivering an interleaved sequence of elements and completing when both streams complete.
let a = Stream.sequence(0..<3) let b = Stream.sequence(3..<6) var s = a.merge(b) assert(s.next() == 0) assert(s.next() == 3) assert(s.next() == 1) assert(s.next() == 4) assert(s.next() == 2) assert(s.next() == 5) assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == Self.Output>
-
merge(_:_:)
Extension methodCombines elements from this stream with those from two other streams that yield elements of the same type, delivering an interleaved sequence of elements and completing when all streams complete.
let a = Stream.sequence(0..<3) let b = Stream.sequence(3..<6) let c = Stream.sequence(6..<9) var s = a.merge(b, c) assert(s.next() == 0) assert(s.next() == 3) assert(s.next() == 6) assert(s.next() == 1) assert(s.next() == 4) assert(s.next() == 7) assert(s.next() == 2) assert(s.next() == 5) assert(s.next() == 8) assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == Self.Output>
-
merge(_:_:_:)
Extension methodCombines elements from this stream with those from three other streams that yield elements of the same type, delivering an interleaved sequence of elements and completing when all streams complete.
let a = Stream.sequence(0..<3) let b = Stream.sequence(3..<6) let c = Stream.sequence(6..<9) let d = Stream.sequence(9..<12) var s = a.merge(b, c, d) assert(s.next() == 0) assert(s.next() == 3) assert(s.next() == 6) assert(s.next() == 9) assert(s.next() == 1) assert(s.next() == 4) assert(s.next() == 7) assert(s.next() == 10) assert(s.next() == 2) assert(s.next() == 5) assert(s.next() == 8) assert(s.next() == 11) assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == Self.Output>
-
zip(_:)
Extension methodCombines elements from this stream with those from another stream by waiting until both streams have yielded an element and then yielding the oldest unconsumed element from each stream together as a tuple, completing when either stream completes.
let a = Stream.sequence([1, 2]) let b = Stream.sequence(["A", "B", "C"]) var s = a.zip(b) assert(s.next() == (1, "A")) assert(s.next() == (2, "B")) assert(s.next() == nil)
Declaration
Swift
@inlinable public func zip<S>(_ other: S) -> Stream._Private.Zip<Self, S> where S : StreamProtocol
Return Value
some StreamProtocol<Output == (Self.Output, S.Output)>
-
zip(_:_:)
Extension methodCombines elements from this stream with those from two other streams by waiting until all three streams have yielded an element and then yielding the oldest unconsumed element from each stream together as a tuple, completing when any of the streams completes.
let a = Stream.sequence([1, 2]) let b = Stream.sequence(["A", "B", "C"]) let c = Stream.sequence(["X", "Y", "Z"]) var s = a.zip(b, c) assert(s.next() == (1, "A", "X")) assert(s.next() == (2, "B", "Y")) assert(s.next() == nil)
Declaration
Swift
@inlinable public func zip<A, B>(_ a: A, _ b: B) -> Stream._Private.Zip3<Self, A, B> where A : StreamProtocol, B : StreamProtocol
Return Value
some StreamProtocol<Output == (Self.Output, A.Output, B.Output)>
-
zip(_:_:_:)
Extension methodCombines elements from this stream with those from three other streams by waiting until all four streams have yielded an element and then yielding the oldest unconsumed element from each stream together as a tuple, completing when any of the streams completes.
let a = Stream.sequence([1, 2]) let b = Stream.sequence(["A", "B", "C"]) let c = Stream.sequence(["X", "Y", "Z"]) let d = Stream.sequence(3..<6) var s = a.zip(b, c, d) assert(s.next() == (1, "A", "X", 3)) assert(s.next() == (2, "B", "Y", 4)) assert(s.next() == nil)
Declaration
Swift
@inlinable public func zip<A, B, C>(_ a: A, _ b: B, _ c: C) -> Stream._Private.Zip4<Self, A, B, C> where A : StreamProtocol, B : StreamProtocol, C : StreamProtocol
Return Value
some StreamProtocol<Output == (Self.Output, A.Output, B.Output, C.Output)>
-
join(_:)
Extension methodCombines elements from this stream with those from another stream and delivers pairs of elements as tuples when either stream yields an element, completing when both of the streams complete.
This is the combinator typically called
combineLatest
in other frameworks.let a = Stream.sequence([1, 2]) let b = Stream.sequence(["A", "B", "C"]) var s = a.join(b) assert(s.next() == (1, "A")) assert(s.next() == (2, "B")) assert(s.next() == (2, "C")) assert(s.next() == nil)
Declaration
Swift
@inlinable public func join<S>(_ other: S) -> Stream._Private.Join<Self, S> where S : StreamProtocol
Return Value
some StreamProtocol<Output == (Self.Output, S.Output)>
-
join(_:_:)
Extension methodCombines elements from this stream with those from two other streams and delivers groups of elements as tuples when any stream yields an element, completing when all of the streams complete.
This is the combinator typically called
combineLatest
in other frameworks.let a = Stream.sequence([1, 2]) let b = Stream.sequence(["A", "B", "C"]) let c = Stream.sequence(["X", "Y", "Z"]) var s = a.join(b, c) assert(s.next() == (1, "A", "X")) assert(s.next() == (2, "B", "Y")) assert(s.next() == (2, "C", "Z")) assert(s.next() == nil)
Declaration
Swift
@inlinable public func join<A, B>(_ a: A, _ b: B) -> Stream._Private.Join3<Self, A, B> where A : StreamProtocol, B : StreamProtocol
Return Value
some StreamProtocol<Output == (Self.Output, A.Output, B.Output)>
-
join(_:_:_:)
Extension methodCombines elements from this stream with those from three other streams and delivers groups of elements as tuples when any stream yields an element, completing when all of the streams complete.
This is the combinator typically called
combineLatest
in other frameworks.let a = Stream.sequence([1, 2]) let b = Stream.sequence(["A", "B", "C"]) let c = Stream.sequence(["X", "Y", "Z"]) let d = Stream.sequence(3..<6) var s = a.join(b, c, d) assert(s.next() == (1, "A", "X", 3)) assert(s.next() == (2, "B", "Y", 4)) assert(s.next() == (2, "C", "Z", 5)) assert(s.next() == nil)
Declaration
Swift
@inlinable public func join<A, B, C>(_ a: A, _ b: B, _ c: C) -> Stream._Private.Join4<Self, A, B, C> where A : StreamProtocol, B : StreamProtocol, C : StreamProtocol
Return Value
some StreamProtocol<Output == (Self.Output, A.Output, B.Output, C.Output)>
-
switchToLatest()
Extension methodFlattens the stream of elements from multiple streams to appear as if they were coming from a single stream, by switching the inner stream as new ones are yielded by this stream and completing when this stream and the last inner one complete.
let a = Stream.sequence(0..<3).map { Stream.sequence($0..<($0 + 3)) } var s = a.switchToLatest() assert(s.next() == 2) assert(s.next() == 3) assert(s.next() == 4) assert(s.next() == nil)
Declaration
Swift
@inlinable public func switchToLatest() -> Stream._Private.SwitchToLatest<Self>
Return Value
some StreamProtocol<Output == Self.Output.StreamType.Output>
-
flatten()
Extension methodFlattens the stream of elements from multiple streams to appear as if they were coming from a single stream, by concatenating the inner streams as they are yielded by this stream and completes when this stream and the last inner one complete.
let a = Stream.sequence(0..<3).map { Stream.sequence($0..<($0 + 3)) } var s = a.flatten() assert(s.next() == 0) assert(s.next() == 1) assert(s.next() == 2) assert(s.next() == 1) assert(s.next() == 2) assert(s.next() == 3) assert(s.next() == 2) assert(s.next() == 3) assert(s.next() == 4) assert(s.next() == nil)
Declaration
Swift
@inlinable public func flatten() -> Stream._Private.Flatten<Self>
Return Value
some StreamProtocol<Output == Self.Output.StreamType.Output>
-
tryMap(_:)
Extension methodTransforms the output from this stream with a provided error-throwing closure.
enum UltimateQuestionError: Error { case wrongAnswer } var s = Stream.sequence(0..<3).tryMap { answer -> Int in if answer == 1 { throw UltimateQuestionError.wrongAnswer } return answer } assert(try! s.next()!.get() == 0) assert(try? s.next()!.get() == nil) assert(try! s.next()!.get() == 2) assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == Result<T, Error>>
-
setFailureType(to:)
Extension methodConverts this stream to a failable one with the specified failure type.
You typically use this combinator to match the error types of two mismatched failable streams.
enum UltimateQuestionError: Error { case wrongAnswer } var s = Stream.sequence(0..<3).setFailureType(to: UltimateQuestionError.self) assert(try! s.next()!.get() == 0) assert(try! s.next()!.get() == 1) assert(try! s.next()!.get() == 2) assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == Result<Self.Output, E>>
-
match(success:failure:)
Extension method.
enum UltimateQuestionError: Error { case wrongAnswer } func validateAnswer(_ answer: Int) throws -> Int { if answer == 1 { throw UltimateQuestionError.wrongAnswer } return answer } let a = Stream.sequence(0..<3).tryMap(validateAnswer) var s = a.match( success: String.init, failure: String.init(describing:) ) assert(s.next() == "0") assert(s.next() == "wrongAnswer") assert(s.next() == "2") assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == T>
-
mapValue(_:)
Extension method.
enum UltimateQuestionError: Error { case wrongAnswer } func validateAnswer(_ answer: Int) throws -> Int { if answer == 1 { throw UltimateQuestionError.wrongAnswer } return answer } var s = Stream.sequence(0..<3).tryMap(validateAnswer).mapValue { $0 + 1 } assert(try! s.next()!.get() == 1) assert(try? s.next()!.get() == nil) assert(try! s.next()!.get() == 3) assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == Result<T, Self.Output.Failure>>
-
mapError(_:)
Extension method.
enum UltimateQuestionError: Error { case wrongAnswer } func validateAnswer(_ answer: Int) throws -> Int { if answer == 1 { throw UltimateQuestionError.wrongAnswer } return answer } struct WrappedError: Error { let error: Error } var s = Stream.sequence(0..<3).tryMap(validateAnswer).mapError { WrappedError(error: $0) } assert(try! s.next()!.get() == 0) assert(try? s.next()!.get() == nil) assert(try! s.next()!.get() == 2) assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == Result<Self.Output.Success, E>>
-
flattenResult()
Extension method.
enum UltimateQuestionError: Error { case wrongAnswer } func validateAnswer(_ answer: Int) throws -> Int { if answer == 1 { throw UltimateQuestionError.wrongAnswer } return answer } let a = Stream.sequence(0..<3) .tryMap(validateAnswer) .mapValue(Result<Int, Error>.success) var s = a.flattenResult() assert(try! s.next()!.get() == 0) assert(try? s.next()!.get() == nil) assert(try! s.next()!.get() == 2) assert(s.next() == nil)
Declaration
Swift
@inlinable public func flattenResult() -> Stream._Private.FlattenResult<Self>
Return Value
some StreamProtocol<Output == Result<Self.Output.Success.Success, Self.Output.Failure>>
-
setFailureType(to:)
Extension methodChanges the failure type declared by this stream.
You typically use this combinator to match the error types of two mismatched result streams.
enum UltimateQuestionError: Error { case wrongAnswer } let a = Stream.sequence(0..<3).map(Result<Int, Never>.success) var s = a.setFailureType(to: UltimateQuestionError.self) assert(try! s.next()!.get() == 0) assert(try! s.next()!.get() == 1) assert(try! s.next()!.get() == 2) assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == Result<Self.Output.Success, E>>
-
assertNoError(_:file:line:)
Extension methodRaises a fatal error when this stream fails.
Declaration
Swift
@inlinable public func assertNoError(_ prefix: String = "", file: StaticString = #file, line: UInt = #line) -> Stream._Private.AssertNoError<Self>
Return Value
some StreamProtocol<Output == Self.Output.Success>
-
completeOnError()
Extension methodEnsures this stream completes on the first error that occurs.
Result streams in Futures do not terminate when an error occurs. This combinator allows you to selectively change that behavior on a per- stream basis so that it terminates when an error occurs.
enum UltimateQuestionError: Error { case wrongAnswer } func validateAnswer(_ answer: Int) throws -> Int { if answer == 1 { throw UltimateQuestionError.wrongAnswer } return answer } let a = Stream.sequence(0..<3).tryMap(validateAnswer) var s = a.completeOnError() assert(try! s.next()!.get() == 0) assert(try? s.next()!.get() == nil) assert(s.next() == nil)
Declaration
Swift
@inlinable public func completeOnError() -> Stream._Private.CompleteOnError<Self>
Return Value
some StreamProtocol<Output == Self.Output>
-
replaceError(with:)
Extension methodReplaces any errors in the stream with the provided element.
enum UltimateQuestionError: Error { case wrongAnswer } func validateAnswer(_ answer: Int) throws -> Int { if answer == 1 { throw UltimateQuestionError.wrongAnswer } return answer } let a = Stream.sequence(0..<3).tryMap(validateAnswer) var s = a.replaceError(with: 42) assert(s.next() == 0) assert(s.next() == 42) assert(s.next() == 2) assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == Self.Output.Success>
-
catchError(_:)
Extension methodHandles errors from this stream by replacing them with the output from another future.
enum UltimateQuestionError: Error { case wrongAnswer } func validateAnswer(_ answer: Int) throws -> Int { if answer == 1 { throw UltimateQuestionError.wrongAnswer } return answer } var s = Stream.sequence(0..<3).tryMap(validateAnswer).catchError { _ in Stream.just(42) } assert(s.next() == 0) assert(s.next() == 42) assert(s.next() == 2) assert(s.next() == nil)
Declaration
Return Value
some StreamProtocol<Output == Self.Output.Success>
-
breakpoint(ready:pending:complete:)
Extension methodRaises a debugger signal when a provided closure needs to stop the process in the debugger.
When any of the provided closures returns
true
, this stream raises theSIGTRAP
signal to stop the process in the debugger.Declaration
Return Value
some StreamProtocol<Output == Self.Output>
-
handleEvents(ready:pending:complete:)
Extension methodPerforms the specified closures when poll events occur.
Declaration
Return Value
some StreamProtocol<Output == Self.Output>
-
print(_:to:)
Extension methodPrints log messages for all poll events.
Declaration
Swift
@inlinable public func print(_ prefix: String = "", to stream: TextOutputStream? = nil) -> Stream._Private.Print<Self>
Return Value
some StreamProtocol<Output == Self.Output>
-
breakpointOnError()
Extension methodRaises a debugger signal upon receiving a failure.
Declaration
Swift
@inlinable public func breakpointOnError() -> Stream._Private.Breakpoint<Self>
Return Value
some StreamProtocol<Output == Self.Output>