StreamProtocol

public protocol StreamProtocol : StreamConvertible where Self == Self.StreamType

A protocol that defines an abstraction for a series of asynchronously produced discrete values over time, such as byte buffers read from a socket or a file, incoming requests to a server, or sampled mouse input events.

Streams yield elements. Similar to Swift’s IteratorProtocol, streams signify completion by yielding nil. Streams can be combined into larger operations with combinators such as map() and flatMap().
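
The nil-terminated convention mentioned above can be seen with the standard library alone. The following is a minimal sketch that uses no Futures types; `Countdown` is a hypothetical iterator introduced only for illustration.

```swift
// A hypothetical iterator, used only to illustrate the nil-terminated contract.
struct Countdown: IteratorProtocol {
    var remaining: Int

    // Returns the next value, or nil once the countdown is exhausted.
    mutating func next() -> Int? {
        guard remaining > 0 else { return nil }
        defer { remaining -= 1 }
        return remaining
    }
}

var countdown = Countdown(remaining: 3)
var seen: [Int] = []
// The consumer keeps pulling until it receives nil.
while let value = countdown.next() {
    seen.append(value)
}
```

The consumer loop needs no separate completion signal: the first nil ends it.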

The type of element yielded by a stream is specified by its Output type. Streams that must communicate success or failure must do so by encoding that information in the output type, typically using Swift.Result. Futures comes with a number of convenience types and combinators for working with Swift.Result (see the FuturesResult module).
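
As a rough illustration of encoding success or failure in the element type, the sketch below uses only the standard library. A plain array stands in for the elements a stream would yield; `ParseError` and `parse(_:)` are hypothetical names, not part of Futures.

```swift
// A hypothetical error type for this illustration.
enum ParseError: Error, Equatable {
    case notANumber(String)
}

// Each element carries its own success or failure, so no separate
// error channel is needed.
func parse(_ text: String) -> Result<Int, ParseError> {
    guard let value = Int(text) else { return .failure(.notANumber(text)) }
    return .success(value)
}

// An array standing in for the elements a stream would yield.
let outputs = ["1", "two", "3"].map(parse)

var values: [Int] = []
var failures: [ParseError] = []
for output in outputs {
    switch output {
    case .success(let value): values.append(value)
    case .failure(let error): failures.append(error)
    }
}
```

Because the failure is an ordinary element, the consumer decides whether to stop at the first failure or keep going, as this loop does.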

You typically create streams using the convenience methods on Stream. You can also create custom streams by adopting this protocol in your types. Creating streams is always an asynchronous operation. The producer of elements that a stream wraps is guaranteed to be asked for elements only after the stream is submitted to an executor, and always on that executor’s context (see ExecutorProtocol). In other words, streams do nothing unless submitted to an executor.
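
A loose analogy for "does nothing until driven" is a lazy sequence in the standard library: building the pipeline performs no work, and work happens only when a consumer pulls. The sketch below is an analogy only; it does not model executors and uses no Futures types.

```swift
var transformCalls = 0

// Building the lazy pipeline runs no work yet.
let doubled = (0..<3).lazy.map { (value: Int) -> Int in
    transformCalls += 1
    return value * 2
}
let callsBeforeIteration = transformCalls

// Work happens only when a consumer pulls an element.
var iterator = doubled.makeIterator()
let first = iterator.next()
let callsAfterFirstPull = transformCalls
```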

For a stream to be submitted to an executor and yield elements, it must be converted to a future that represents the stream’s completion. As a convenience, this is done automatically if the stream output is Void.

The semantics for cancellation, memory management, and concurrency management are the same as for futures (see FutureProtocol).

  • makeFuture() Extension method

    Returns a future that will complete with a 2-tuple containing the next element from this stream and the stream itself.

    var output: Int?
    var s = Stream.sequence(0..<3)
    
    (output, s) = s.makeFuture().wait()
    assert(output == 0)
    
    (output, s) = s.makeFuture().wait()
    assert(output == 1)
    
    (output, s) = s.makeFuture().wait()
    assert(output == 2)
    
    (output, s) = s.makeFuture().wait()
    assert(output == nil)
    

    Declaration

    Swift

    @inlinable
    public func makeFuture() -> Stream._Private.Future<Self>

    Return Value

    some FutureProtocol<Output == (Self.Output?, Self)>

  • makeStream() Extension method

    Returns this stream itself, so that a stream can be used wherever a StreamConvertible is expected.

    var s = Stream.sequence(0..<3).makeStream()
    assert(s.next() == 0)
    assert(s.next() == 1)
    assert(s.next() == 2)
    assert(s.next() == nil)
    

    Declaration

    Swift

    public func makeStream() -> Self

    Return Value

    some StreamProtocol<Output == Self.Output>

  • makeReference() Extension method

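    Wraps this stream in a reference so that it can be shared. In the example below, s1 and s2 are polled both directly and through the joined stream s.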

    var s1 = Stream.sequence(0..<3).makeReference()
    var s2 = Stream.sequence(3..<6).makeReference()
    var s = Stream.join(s1, s2)
    assert(s.next()! == (0, 3))
    assert(s1.next() == 1)
    assert(s2.next() == 4)
    assert(s.next()! == (2, 5))
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func makeReference() -> Stream._Private.Reference<Self>

    Return Value

    some StreamProtocol<Output == Self.Output>

  • next() Extension method

    Synchronously polls this stream on the current thread’s executor until it yields the next element or completes.

    var s = Stream.sequence(0..<3)
    assert(s.next() == 0)
    assert(s.next() == 1)
    assert(s.next() == 2)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public mutating func next() -> Output?

    Return Value

    Self.Output?

  • next(on:) Extension method

    Synchronously polls this stream using the provided blocking executor until it yields the next element or completes.

    let executor = ThreadExecutor.current
    var s = Stream.sequence(0..<3)
    assert(s.next(on: executor) == 0)
    assert(s.next(on: executor) == 1)
    assert(s.next(on: executor) == 2)
    assert(s.next(on: executor) == nil)
    

    Declaration

    Swift

    @inlinable
    public mutating func next<E>(on executor: E) -> Output? where E : BlockingExecutor

    Return Value

    Self.Output?

  • forward(to:close:) Extension method

    Returns a future that forwards all elements from this stream to the given sink.

    let sink = Sink.collect(itemType: Int.self)
    let f = Stream.sequence(0..<3).forward(to: sink)
    f.ignoreOutput().wait()
    assert(sink.elements == [0, 1, 2])
    

    Declaration

    Swift

    @inlinable
    public func forward<S>(to sink: S, close: Bool = true) -> Stream._Private.Forward<S, Self> where S : SinkConvertible, Self.Output == S.SinkType.Input

    Return Value

    some FutureProtocol<Output == Result<Void, S.Failure>>

  • abort(when:) Extension method

    Declaration

    Swift

    @inlinable
    public func abort<U>(when f: U) -> Stream._Private.Abort<U, Self> where U : FutureConvertible, U.FutureType.Output == Void

    Return Value

    some StreamProtocol<Output == Self.Output>

  • abort(when:) Extension method

    Declaration

    Swift

    @inlinable
    public func abort<U>(when f: @escaping () -> U) -> Stream._Private.Abort<U, Self>

    Return Value

    some StreamProtocol<Output == Self.Output>

  • poll(on:) Extension method

    Ensures this stream is polled on the given executor.

    The returned stream retains the executor for its whole lifetime.

    var s = Stream.sequence(0..<3)
        .poll(on: QueueExecutor.global)
        .assertNoError()
    assert(s.next() == 0)
    assert(s.next() == 1)
    assert(s.next() == 2)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func poll<E>(on executor: E) -> Stream._Private.PollOn<E, Self> where E : ExecutorProtocol

    Return Value

    some StreamProtocol<Output == Result<Self.Output, E.Failure>>

  • yield(maxElements:) Extension method

    Precondition

    maxElements > 0

    Declaration

    Swift

    @inlinable
    public func yield(maxElements: Int) -> Stream._Private.Yield<Self>

    Return Value

    some StreamProtocol<Output == Self.Output>

  • eraseToAnyStream() Extension method

    Wraps this stream in a type-erased AnyStream.

    var s = Stream.sequence(0..<3).eraseToAnyStream()
    assert(s.next() == 0)
    assert(s.next() == 1)
    assert(s.next() == 2)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func eraseToAnyStream() -> AnyStream<Output>

    Return Value

    AnyStream<Self.Output>

  • multicast(replay:) Extension method

    Multicasts elements from this stream to multiple tasks that run on the same executor, where each task sees every element.

    Use this combinator when you want to use reference semantics, such as storing a stream instance in a property.

    let iterations = 1_000
    var counter0 = 0
    var counter1 = 0
    var counter2 = 0
    
    let stream = Stream.sequence(0..<iterations).forEach {
        counter0 += $0
    }
    
    let multicast = stream.multicast()
    let stream1 = multicast.makeStream().map { counter1 += $0 }
    let stream2 = multicast.makeStream().map { counter2 += $0 }
    
    ThreadExecutor.current.submit(stream1)
    ThreadExecutor.current.submit(stream2)
    ThreadExecutor.current.wait()
    
    let expected = (0..<iterations).reduce(into: 0, +=)
    assert(counter0 == expected) // 499_500
    assert(counter1 == expected)
    assert(counter2 == expected)
    

    Declaration

    Swift

    @inlinable
    public func multicast(replay: Stream.ReplayStrategy = .none) -> Stream._Private.Multicast<Self>

    Return Value

    some StreamProtocol<Output == Self.Output>

  • eraseToAnyMulticastStream(replay:) Extension method

    Declaration

    Swift

    @inlinable
    public func eraseToAnyMulticastStream(replay: Stream.ReplayStrategy = .none) -> AnyMulticastStream<Output>

    Return Value

    AnyMulticastStream<Self.Output>

  • share(replay:) Extension method

    Multicasts elements from this stream to multiple tasks that may run on any executor, where each task sees every element.

    Use this combinator when you want to use reference semantics, such as storing a stream instance in a property.

    let iterations = 1_000
    var counter0 = 0
    var counter1 = 0
    var counter2 = 0
    
    let stream = Stream.sequence(0..<iterations).forEach {
        counter0 += $0
    }
    
    let shared = stream.share()
    let stream1 = shared.makeStream().map { counter1 += $0 }
    let stream2 = shared.makeStream().map { counter2 += $0 }
    
    let task1 = QueueExecutor(label: "queue 1").spawn(stream1)
    let task2 = QueueExecutor(label: "queue 2").spawn(stream2)
    
    ThreadExecutor.current.submit(task1)
    ThreadExecutor.current.submit(task2)
    ThreadExecutor.current.wait()
    
    let expected = (0..<iterations).reduce(into: 0, +=)
    assert(counter0 == expected) // 499_500
    assert(counter1 == expected)
    assert(counter2 == expected)
    

    Declaration

    Swift

    @inlinable
    public func share(replay: Stream.ReplayStrategy = .none) -> Stream._Private.Share<Self>

    Return Value

    some StreamProtocol<Output == Self.Output>

  • eraseToAnySharedStream(replay:) Extension method

    Declaration

    Swift

    @inlinable
    public func eraseToAnySharedStream(replay: Stream.ReplayStrategy = .none) -> AnySharedStream<Output>

    Return Value

    AnySharedStream<Self.Output>

  • map(_:) Extension method

    Transforms all elements from this stream with a provided closure.

    var s = Stream.sequence(0..<3).map {
        $0 + 1
    }
    assert(s.next() == 1)
    assert(s.next() == 2)
    assert(s.next() == 3)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func map<T>(_ transform: @escaping (Output) -> T) -> Stream._Private.Map<T, Self>

    Return Value

    some StreamProtocol<Output == T>

  • map(_:) Extension method

    Transforms all elements from this stream to the value at the given key path.

    struct Data {
        let a: Int
    }
    let data: [Data] = [
        .init(a: 0),
        .init(a: 3),
        .init(a: 6),
    ]
    var s = Stream.sequence(data).map(\.a)
    assert(s.next() == 0)
    assert(s.next() == 3)
    assert(s.next() == 6)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func map<T>(_ keyPath: KeyPath<Output, T>) -> Stream._Private.MapKeyPath<T, Self>

    Return Value

    some StreamProtocol<Output == T>

  • map(_:_:) Extension method

    Transforms all elements from this stream to a tuple of the values at the two given key paths.

    struct Data {
        let a, b: Int
    }
    let data: [Data] = [
        .init(a: 0, b: 1),
        .init(a: 3, b: 4),
        .init(a: 6, b: 7),
    ]
    var s = Stream.sequence(data).map(\.a, \.b)
    assert(s.next() == (0, 1))
    assert(s.next() == (3, 4))
    assert(s.next() == (6, 7))
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func map<T0, T1>(_ keyPath0: KeyPath<Output, T0>, _ keyPath1: KeyPath<Output, T1>) -> Stream._Private.MapKeyPath2<T0, T1, Self>

    Return Value

    some StreamProtocol<Output == (T0, T1)>

  • map(_:_:_:) Extension method

    Transforms all elements from this stream to a tuple of the values at the three given key paths.

    struct Data {
        let a, b, c: Int
    }
    let data: [Data] = [
        .init(a: 0, b: 1, c: 2),
        .init(a: 3, b: 4, c: 5),
        .init(a: 6, b: 7, c: 8),
    ]
    var s = Stream.sequence(data).map(\.a, \.b, \.c)
    assert(s.next() == (0, 1, 2))
    assert(s.next() == (3, 4, 5))
    assert(s.next() == (6, 7, 8))
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func map<T0, T1, T2>(_ keyPath0: KeyPath<Output, T0>, _ keyPath1: KeyPath<Output, T1>, _ keyPath2: KeyPath<Output, T2>) -> Stream._Private.MapKeyPath3<T0, T1, T2, Self>

    Return Value

    some StreamProtocol<Output == (T0, T1, T2)>

  • flatMap(_:) Extension method

    Transforms each element from this stream into a new stream using a provided closure and merges the output from all returned streams into a single stream of output.

    var s = Stream.sequence(0..<3).flatMap {
        Stream.sequence(0...($0 + 1))
    }
    assert(s.next() == 0)
    assert(s.next() == 1)
    assert(s.next() == 0)
    assert(s.next() == 1)
    assert(s.next() == 2)
    assert(s.next() == 0)
    assert(s.next() == 1)
    assert(s.next() == 2)
    assert(s.next() == 3)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func flatMap<U>(_ transform: @escaping (Output) -> U) -> Stream._Private.FlatMap<U, Self>

    Return Value

    some StreamProtocol<Output == U.StreamType.Output>

  • scan(_:_:) Extension method

    Transforms elements from this stream by providing the current element to a closure along with the last value returned by the closure.

    var s = Stream.sequence(0..<3).scan(0) {
        $0 + $1
    }
    assert(s.next() == 0)
    assert(s.next() == 1)
    assert(s.next() == 3)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func scan<T>(_ initialResult: T, _ nextPartialResult: @escaping (T, Output) -> T) -> Stream._Private.Scan<T, Self>

    Return Value

    some StreamProtocol<Output == T>
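
    The feedback of "the last value returned by the closure" can be sketched on a plain array with the standard library. `runningResults` is a hypothetical helper written for this illustration, not part of Futures.

    ```swift
    // A hypothetical array-based helper mirroring the described behavior.
    func runningResults<T, Element>(
        _ elements: [Element],
        _ initialResult: T,
        _ nextPartialResult: (T, Element) -> T
    ) -> [T] {
        var last = initialResult
        var results: [T] = []
        for element in elements {
            // Feed the previous result back in together with the current element.
            last = nextPartialResult(last, element)
            results.append(last)
        }
        return results
    }

    let sums = runningResults([0, 1, 2], 0) { $0 + $1 }
    let products = runningResults([1, 2, 3, 4], 1) { $0 * $1 }
    ```

    Unlike reduce, every intermediate result is kept, one per input element.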

  • replaceNil(with:) Extension method

    Replaces nil elements in the stream with the provided element.

    var s = Stream.sequence([0, nil, 2]).replaceNil(with: 1)
    assert(s.next() == 0)
    assert(s.next() == 1)
    assert(s.next() == 2)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func replaceNil<T>(with output: T) -> Stream._Private.Map<T, Self> where Self.Output == T?

    Return Value

    some StreamProtocol<Output == T>

  • match(some:none:) Extension method

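    Transforms each optional element from this stream by calling one of the provided closures, depending on whether the element has a value or is nil.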

    var s = Stream.sequence([0, nil, 2]).match(
        some: String.init,
        none: { "NaN" }
    )
    assert(s.next() == "0")
    assert(s.next() == "NaN")
    assert(s.next() == "2")
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func match<T>(
        some: @escaping (Output.WrappedType) -> T,
        none: @escaping () -> T
    ) -> Stream._Private.MatchOptional<T, Self>

    Return Value

    some StreamProtocol<Output == T>

  • match(left:right:) Extension method

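    Transforms each Either element from this stream by calling the closure that corresponds to its case, left or right.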

    func transform(_ value: Int) -> Either<Int, Float> {
        if value == 0 {
            return .left(value)
        } else {
            return .right(.init(value))
        }
    }
    var s = Stream.sequence(0..<3).map(transform).match(
        left: String.init,
        right: String.init(describing:)
    )
    assert(s.next() == "0")
    assert(s.next() == "1.0")
    assert(s.next() == "2.0")
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func match<T>(
        left: @escaping (Output.Left) -> T,
        right: @escaping (Output.Right) -> T
    ) -> Stream._Private.MatchEither<T, Self>

    Return Value

    some StreamProtocol<Output == T>

  • filter(_:) Extension method

    Yields all elements that match a provided closure.

    var s = Stream.sequence(0..<3).filter {
        $0 > 0
    }
    assert(s.next() == 1)
    assert(s.next() == 2)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func filter(_ isIncluded: @escaping (Output) -> Bool) -> Stream._Private.Filter<Self>

    Return Value

    some StreamProtocol<Output == Self.Output>

  • compactMap(_:) Extension method

    Calls a closure with each element from this stream and yields any returned optional that has a value.

    var s = Stream.sequence(0..<3).compactMap {
        $0 > 0 ? $0 * 2 : nil
    }
    assert(s.next() == 2)
    assert(s.next() == 4)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func compactMap<T>(_ transform: @escaping (Output) -> T?) -> Stream._Private.CompactMap<T, Self>

    Return Value

    some StreamProtocol<Output == T>

  • replaceEmpty(with:) Extension method

    Replaces an empty stream with the provided element.

    If this stream completes without yielding any elements, the returned stream yields the provided element, then completes normally.

    var s = Stream.empty().replaceEmpty(with: 42)
    assert(s.next() == 42)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func replaceEmpty(with output: Output) -> Stream._Private.ReplaceEmpty<Self>

    Return Value

    some StreamProtocol<Output == Self.Output>

  • removeDuplicates(by:) Extension method

    Calls a predicate closure with two consecutive elements from this stream and yields the current element only if the closure returns false, that is, if the closure does not report it as a duplicate of the previous element.

    let data = [
        (1, "A"),
        (2, "B"),
        (3, "B"),
        (4, "C"),
    ]
    var s = Stream.sequence(data).removeDuplicates {
        $0.1 == $1.1
    }
    assert(s.next()! == (1, "A"))
    assert(s.next()! == (2, "B"))
    assert(s.next()! == (4, "C"))
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func removeDuplicates(by predicate: @escaping (Output, Output) -> Bool) -> Stream._Private.RemoveDuplicates<Self>

    Return Value

    some StreamProtocol<Output == Self.Output>
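
    In the example above, (3, "B") is dropped because the closure returns true for it. The sketch below reproduces that outcome on a plain array with the standard library. `removingDuplicates` is a hypothetical helper, and it assumes each element is compared against the most recently kept element.

    ```swift
    // A hypothetical array-based helper. It assumes each element is compared
    // against the most recently kept element.
    func removingDuplicates<Element>(
        _ elements: [Element],
        by areDuplicates: (Element, Element) -> Bool
    ) -> [Element] {
        var kept: [Element] = []
        for element in elements {
            // Drop the element when the predicate reports it as a duplicate.
            if let previous = kept.last, areDuplicates(previous, element) {
                continue
            }
            kept.append(element)
        }
        return kept
    }

    let data = [(1, "A"), (2, "B"), (3, "B"), (4, "C")]
    let unique = removingDuplicates(data) { $0.1 == $1.1 }
    let keptIDs = unique.map { $0.0 }
    ```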

  • removeDuplicates() Extension method

    Yields only elements that don’t match the previous element.

    var s = Stream.sequence([1, 2, 2, 2, 3]).removeDuplicates()
    assert(s.next() == 1)
    assert(s.next() == 2)
    assert(s.next() == 3)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func removeDuplicates() -> Stream._Private.RemoveDuplicates<Self>

    Return Value

    some StreamProtocol<Output == Self.Output>

  • collect() Extension method

    Returns a future that collects all elements from this stream and, when the stream completes, completes with a single array containing them.

    This combinator uses an unbounded amount of memory to store the yielded values.

    let f = Stream.sequence(0..<3).collect()
    assert(f.wait() == [0, 1, 2])
    

    Declaration

    Swift

    @inlinable
    public func collect() -> Stream._Private.ReduceInto<[Output], Self>

    Return Value

    some FutureProtocol<Output == [Self.Output]>

  • replaceOutput(with:) Extension method

    Returns a future that ignores all elements from this stream, and completes with a given value when this stream completes.

    let f = Stream.sequence(0..<3).replaceOutput(with: 42)
    assert(f.wait() == 42)
    

    Declaration

    Swift

    @inlinable
    public func replaceOutput<T>(with output: T) -> Stream._Private.Reduce<T, Self>

    Return Value

    some FutureProtocol<Output == T>

  • ignoreOutput() Extension method

    Returns a future that ignores all elements from this stream, and completes when this stream completes.

    let f = Stream.sequence(0..<3).ignoreOutput()
    assert(f.wait() == ())
    

    Declaration

    Swift

    @inlinable
    public func ignoreOutput() -> Stream._Private.Reduce<Void, Self>

    Return Value

    some FutureProtocol<Output == Void>

  • count() Extension method

    Returns a future that completes with the number of elements from this stream.

    let f = Stream.sequence(0..<3).count()
    assert(f.wait() == 3)
    

    Declaration

    Swift

    @inlinable
    public func count() -> Stream._Private.Reduce<Int, Self>

    Return Value

    some FutureProtocol<Output == Int>

  • reduce(_:_:) Extension method

    Returns a future that applies a closure that accumulates each element of this stream and completes with the final result when this stream completes.

    let f = Stream.sequence(0..<3).reduce(0) {
        $0 + $1
    }
    assert(f.wait() == 3)
    

    Declaration

    Swift

    @inlinable
    public func reduce<T>(_ initialResult: T, _ nextPartialResult: @escaping (T, Output) -> T) -> Stream._Private.Reduce<T, Self>

    Return Value

    some FutureProtocol<Output == T>

  • reduce(into:_:) Extension method

    Returns a future that applies a closure that combines each element of this stream into a mutable state and completes with the final result when this stream completes.

    let f = Stream.sequence(0..<3).reduce(into: []) {
        $0.append($1 + 1)
    }
    assert(f.wait() == [1, 2, 3])
    

    Declaration

    Swift

    @inlinable
    public func reduce<T>(into state: T, _ reducer: @escaping (inout T, Output) -> Void) -> Stream._Private.ReduceInto<T, Self>

    Return Value

    some FutureProtocol<Output == T>

  • contains(_:) Extension method

    Returns a future that completes with a Boolean value upon receiving an element equal to the argument.

    let f = Stream.sequence(0..<3).contains(2)
    assert(f.wait())
    

    Declaration

    Swift

    @inlinable
    public func contains(_ output: Output) -> Stream._Private.Contains<Self>

    Return Value

    some FutureProtocol<Output == Bool>

  • contains(where:) Extension method

    Returns a future that completes with a Boolean value upon receiving an element that satisfies the predicate closure.

    let f = Stream.sequence(0..<3).contains {
        $0 == 2
    }
    assert(f.wait())
    

    Declaration

    Swift

    @inlinable
    public func contains(where predicate: @escaping (Output) -> Bool) -> Stream._Private.ContainsWhere<Self>

    Return Value

    some FutureProtocol<Output == Bool>

  • allSatisfy(_:) Extension method

    Returns a future that completes with a Boolean value that indicates whether all received elements pass a given predicate.

    let f = Stream.sequence(0..<3).allSatisfy {
        $0 < 3
    }
    assert(f.wait())
    

    Declaration

    Swift

    @inlinable
    public func allSatisfy(_ predicate: @escaping (Output) -> Bool) -> Stream._Private.AllSatisfy<Self>

    Return Value

    some FutureProtocol<Output == Bool>

  • drop(untilOutputFrom:) Extension method

    Ignores elements from this stream until the given future completes.

    var pollCount = 0
    let f = AnyFuture<Void> { _ in
        if pollCount == 2 {
            return .ready(())
        }
        pollCount += 1
        return .pending
    }
    var s = Stream.sequence(0..<3).drop(untilOutputFrom: f)
    assert(s.next() == 2)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func drop<F>(untilOutputFrom future: F) -> Stream._Private.DropUntilOutput<F, Self> where F : FutureProtocol

    Return Value

    some StreamProtocol<Output == Self.Output>

  • drop(while:) Extension method

    Omits elements from this stream until a given closure returns false, before yielding all remaining elements.

    var s = Stream.sequence(0..<3).drop(while: { $0 < 2 })
    assert(s.next() == 2)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func drop(while predicate: @escaping (Output) -> Bool) -> Stream._Private.DropWhile<Self>

    Return Value

    some StreamProtocol<Output == Self.Output>

  • dropFirst(_:) Extension method

    Omits the specified number of elements before yielding subsequent elements.

    var s = Stream.sequence(0..<3).dropFirst(2)
    assert(s.next() == 2)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func dropFirst(_ count: Int = 1) -> Stream._Private.Drop<Self>

    Return Value

    some StreamProtocol<Output == Self.Output>

  • append(_:) Extension method

    Appends the specified elements to this stream’s output.

    var s = Stream.sequence(0..<3).append(3, 4, 5)
    assert(s.next() == 0)
    assert(s.next() == 1)
    assert(s.next() == 2)
    assert(s.next() == 3)
    assert(s.next() == 4)
    assert(s.next() == 5)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func append(_ elements: Output...) -> Stream._Private.Concatenate<Self, Stream._Private.Sequence<[Output]>>

    Return Value

    some StreamProtocol<Output == Self.Output>

  • append(_:) Extension method

    Appends a specified sequence to this stream’s output.

    var s = Stream.sequence(0..<3).append(3..<6)
    assert(s.next() == 0)
    assert(s.next() == 1)
    assert(s.next() == 2)
    assert(s.next() == 3)
    assert(s.next() == 4)
    assert(s.next() == 5)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func append<C>(_ elements: C) -> Stream._Private.Concatenate<Self, Stream._Private.Sequence<C>> where C : Sequence, Self.Output == C.Element

    Return Value

    some StreamProtocol<Output == Self.Output>

  • append(_:) Extension method

    Appends the elements from the given stream to this stream’s output.

    let a = Stream.sequence(3..<6)
    var s = Stream.sequence(0..<3).append(a)
    assert(s.next() == 0)
    assert(s.next() == 1)
    assert(s.next() == 2)
    assert(s.next() == 3)
    assert(s.next() == 4)
    assert(s.next() == 5)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func append<S>(_ stream: S) -> Stream._Private.Concatenate<Self, S> where S : StreamProtocol, Self.Output == S.Output

    Return Value

    some StreamProtocol<Output == Self.Output>

  • prepend(_:) Extension method

    Prepends the specified elements to this stream’s output.

    var s = Stream.sequence(0..<3).prepend(3, 4, 5)
    assert(s.next() == 3)
    assert(s.next() == 4)
    assert(s.next() == 5)
    assert(s.next() == 0)
    assert(s.next() == 1)
    assert(s.next() == 2)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func prepend(_ elements: Output...) -> Stream._Private.Concatenate<Stream._Private.Sequence<[Output]>, Self>

    Return Value

    some StreamProtocol<Output == Self.Output>

  • prepend(_:) Extension method

    Prepends a specified sequence to this stream’s output.

    var s = Stream.sequence(0..<3).prepend(3..<6)
    assert(s.next() == 3)
    assert(s.next() == 4)
    assert(s.next() == 5)
    assert(s.next() == 0)
    assert(s.next() == 1)
    assert(s.next() == 2)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func prepend<S>(_ elements: S) -> Stream._Private.Concatenate<Stream._Private.Sequence<S>, Self> where S : Sequence, Self.Output == S.Element

    Return Value

    some StreamProtocol<Output == Self.Output>

  • prepend(_:) Extension method

    Prepends the elements from the given stream to this stream’s output.

    let a = Stream.sequence(3..<6)
    var s = Stream.sequence(0..<3).prepend(a)
    assert(s.next() == 3)
    assert(s.next() == 4)
    assert(s.next() == 5)
    assert(s.next() == 0)
    assert(s.next() == 1)
    assert(s.next() == 2)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func prepend<S>(_ stream: S) -> Stream._Private.Concatenate<S, Self> where S : StreamProtocol, Self.Output == S.Output

    Return Value

    some StreamProtocol<Output == Self.Output>

  • prefix(untilOutputFrom:) Extension method

    Yields elements from this stream until the given future completes.

    var pollCount = 0
    let f = AnyFuture<Void> { _ in
        if pollCount == 2 {
            return .ready(())
        }
        pollCount += 1
        return .pending
    }
    var s = Stream.sequence(0..<3).prefix(untilOutputFrom: f)
    assert(s.next() == 0)
    assert(s.next() == 1)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func prefix<F>(untilOutputFrom future: F) -> Stream._Private.PrefixUntilOutput<F, Self> where F : FutureProtocol

    Return Value

    some StreamProtocol<Output == Self.Output>

  • prefix(_:) Extension method

    Yields elements from this stream up to the specified maximum count.

    var s = Stream.sequence(0..<3).prefix(2)
    assert(s.next() == 0)
    assert(s.next() == 1)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func prefix(_ maxLength: Int) -> Stream._Private.Prefix<Self>

    Return Value

    some StreamProtocol<Output == Self.Output>

  • prefix(while:) Extension method

    Yields elements from this stream while elements satisfy a predicate closure.

    var s = Stream.sequence(0..<3).prefix(while: { $0 < 2 })
    assert(s.next() == 0)
    assert(s.next() == 1)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func prefix(while predicate: @escaping (Output) -> Bool) -> Stream._Private.PrefixWhile<Self>

    Return Value

    some StreamProtocol<Output == Self.Output>

  • buffer(_:) Extension method

    Collects up to the specified number of elements, and then yields a single array of the collection.

    var s = Stream.sequence(0...4).buffer(2)
    assert(s.next() == [0, 1])
    assert(s.next() == [2, 3])
    assert(s.next() == [4])
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func buffer(_ count: Int) -> Stream._Private.Buffer<Self>

    Return Value

    some StreamProtocol<Output == [Self.Output]>
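
    The grouping shown in the example, including the shorter final array, can be sketched on a plain array with the standard library. `chunked` is a hypothetical helper, and the positive-count precondition is an assumption of this sketch rather than a documented requirement.

    ```swift
    // A hypothetical array-based helper that groups elements into arrays of at
    // most `count` elements; the final group may be shorter.
    func chunked<Element>(_ elements: [Element], by count: Int) -> [[Element]] {
        // Assumption of this sketch: the group size must be positive.
        precondition(count > 0, "count must be positive")
        return stride(from: 0, to: elements.count, by: count).map { start in
            Array(elements[start..<min(start + count, elements.count)])
        }
    }

    let chunks = chunked(Array(0...4), by: 2)
    let noChunks = chunked([Int](), by: 3)
    ```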

  • forEach(_:) Extension method

    Applies the given closure over each element from this stream.

    var buffer = [Int]()
    var s = Stream.sequence(0..<3).forEach {
        buffer.append($0)
    }
    assert(s.next() == 0)
    assert(s.next() == 1)
    assert(s.next() == 2)
    assert(s.next() == nil)
    assert(buffer == [0, 1, 2])
    

    Declaration

    Swift

    @inlinable
    public func forEach(_ body: @escaping (Output) -> Void) -> Stream._Private.ForEach<Self>

    Return Value

    some StreamProtocol<Output == Self.Output>

  • enumerate() Extension method

    Yields pairs (n, x), where n represents a consecutive integer starting at zero and x represents an element from this stream.

    var s = Stream.sequence(["A", "B", "C"]).enumerate()
    assert(s.next()! == (offset: 0, output: "A"))
    assert(s.next()! == (offset: 1, output: "B"))
    assert(s.next()! == (offset: 2, output: "C"))
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func enumerate() -> Stream._Private.Enumerate<Self>

    Return Value

    some StreamProtocol<Output == (Int, Self.Output)>

  • first() Extension method

    Returns a future that completes with the first element from this stream.

    let f = Stream.sequence(0..<3).first()
    assert(f.wait() == 0)
    

    Declaration

    Swift

    @inlinable
    public func first() -> Stream._Private.First<Self>

    Return Value

    some FutureProtocol<Output == Self.Output?>

  • first(where:) Extension method

    Returns a future that completes with the first element from this stream to satisfy a predicate closure.

    let f = Stream.sequence(0..<3).first(where: { $0 > 1 })
    assert(f.wait() == 2)
    

    Declaration

    Swift

    @inlinable
    public func first(where predicate: @escaping (Output) -> Bool) -> Stream._Private.FirstWhere<Self>

    Return Value

    some FutureProtocol<Output == Self.Output?>

  • last() Extension method

    Returns a future that completes with the last element from this stream.

    let f = Stream.sequence(0..<3).last()
    assert(f.wait() == 2)
    

    Declaration

    Swift

    @inlinable
    public func last() -> Stream._Private.Last<Self>

    Return Value

    some FutureProtocol<Output == Self.Output?>

  • last(where:) Extension method

    Returns a future that completes with the last element from this stream to satisfy a predicate closure.

    let f = Stream.sequence(0..<3).last(where: { $0 < 2 })
    assert(f.wait() == 1)
    

    Declaration

    Swift

    @inlinable
    public func last(where predicate: @escaping (Output) -> Bool) -> Stream._Private.LastWhere<Self>

    Return Value

    some FutureProtocol<Output == Self.Output?>

  • output(at:) Extension method

    Yields a specific element from this stream, indicated by its index in the sequence of yielded elements.

    var s = Stream.sequence(0..<3).output(at: 1)
    assert(s.next() == 1)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func output(at index: Int) -> Stream._Private.Output<Self>

    Return Value

    some StreamProtocol<Output == Self.Output>

  • output(in:) Extension method

    Yields elements from this stream specified by their range in the sequence of yielded elements.

    var s = Stream.sequence(0..<3).output(in: 1..<3)
    assert(s.next() == 1)
    assert(s.next() == 2)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func output<R>(in range: R) -> Stream._Private.Output<Self> where R : RangeExpression, R.Bound == Int

    Return Value

    some StreamProtocol<Output == Self.Output>

  • latest() Extension method

    Yields the latest available element by eagerly pulling elements out of this stream until it can’t yield any more elements.

    var s = Stream.sequence(0..<3).latest()
    assert(s.next() == 2)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func latest() -> Stream._Private.Latest<Self>

    Return Value

    some StreamProtocol<Output == Self.Output>

  • merge(_:) Extension method

    Combines elements from this stream with those from another stream that yields elements of the same type, delivering an interleaved sequence of elements and completing when both streams complete.

    let a = Stream.sequence(0..<3)
    let b = Stream.sequence(3..<6)
    var s = a.merge(b)
    assert(s.next() == 0)
    assert(s.next() == 3)
    assert(s.next() == 1)
    assert(s.next() == 4)
    assert(s.next() == 2)
    assert(s.next() == 5)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func merge<S>(_ other: S) -> Stream._Private.Merge<Self, S> where S : StreamProtocol, Self.Output == S.Output

    Return Value

    some StreamProtocol<Output == Self.Output>
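
    For synchronous sources like the ones in the example above, the interleaving order can be modeled with plain standard-library iterators. This is only a sketch of the ordering, not the library's implementation: `interleave` is a hypothetical helper, and real streams yield elements as they become available rather than in strict rounds.

```swift
// Hypothetical helper: round-robin interleaving of two sequences,
// modeling the element order shown in the merge(_:) example.
func interleave<A: Sequence, B: Sequence>(_ a: A, _ b: B) -> [A.Element]
    where A.Element == B.Element
{
    var ia = a.makeIterator()
    var ib = b.makeIterator()
    var result: [A.Element] = []
    var aDone = false
    var bDone = false
    // Keep polling until both sources are exhausted.
    while !(aDone && bDone) {
        if !aDone {
            if let x = ia.next() { result.append(x) } else { aDone = true }
        }
        if !bDone {
            if let y = ib.next() { result.append(y) } else { bDone = true }
        }
    }
    return result
}

let merged = interleave(0..<3, 3..<6)
```

    When one source finishes early, the model keeps draining the other, which matches the documented rule that the merged stream completes only when both streams complete.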

  • merge(_:_:) Extension method

    Combines elements from this stream with those from two other streams that yield elements of the same type, delivering an interleaved sequence of elements and completing when all streams complete.

    let a = Stream.sequence(0..<3)
    let b = Stream.sequence(3..<6)
    let c = Stream.sequence(6..<9)
    var s = a.merge(b, c)
    assert(s.next() == 0)
    assert(s.next() == 3)
    assert(s.next() == 6)
    assert(s.next() == 1)
    assert(s.next() == 4)
    assert(s.next() == 7)
    assert(s.next() == 2)
    assert(s.next() == 5)
    assert(s.next() == 8)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func merge<A, B>(_ a: A, _ b: B) -> Stream._Private.Merge3<Self, A, B> where A : StreamProtocol, B : StreamProtocol, Self.Output == A.Output, A.Output == B.Output

    Return Value

    some StreamProtocol<Output == Self.Output>

  • merge(_:_:_:) Extension method

    Combines elements from this stream with those from three other streams that yield elements of the same type, delivering an interleaved sequence of elements and completing when all streams complete.

    let a = Stream.sequence(0..<3)
    let b = Stream.sequence(3..<6)
    let c = Stream.sequence(6..<9)
    let d = Stream.sequence(9..<12)
    var s = a.merge(b, c, d)
    assert(s.next() == 0)
    assert(s.next() == 3)
    assert(s.next() == 6)
    assert(s.next() == 9)
    assert(s.next() == 1)
    assert(s.next() == 4)
    assert(s.next() == 7)
    assert(s.next() == 10)
    assert(s.next() == 2)
    assert(s.next() == 5)
    assert(s.next() == 8)
    assert(s.next() == 11)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func merge<A, B, C>(_ a: A, _ b: B, _ c: C) -> Stream._Private.Merge4<Self, A, B, C> where A : StreamProtocol, B : StreamProtocol, C : StreamProtocol, Self.Output == A.Output, A.Output == B.Output, B.Output == C.Output

    Return Value

    some StreamProtocol<Output == Self.Output>

  • zip(_:) Extension method

    Combines elements from this stream with those from another stream by waiting until both streams have yielded an element and then yielding the oldest unconsumed element from each stream together as a tuple, completing when either stream completes.

    let a = Stream.sequence([1, 2])
    let b = Stream.sequence(["A", "B", "C"])
    var s = a.zip(b)
    assert(s.next() == (1, "A"))
    assert(s.next() == (2, "B"))
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func zip<S>(_ other: S) -> Stream._Private.Zip<Self, S> where S : StreamProtocol

    Return Value

    some StreamProtocol<Output == (Self.Output, S.Output)>

  • zip(_:_:) Extension method

    Combines elements from this stream with those from two other streams by waiting until all three streams have yielded an element and then yielding the oldest unconsumed element from each stream together as a tuple, completing when any of the streams completes.

    let a = Stream.sequence([1, 2])
    let b = Stream.sequence(["A", "B", "C"])
    let c = Stream.sequence(["X", "Y", "Z"])
    var s = a.zip(b, c)
    assert(s.next() == (1, "A", "X"))
    assert(s.next() == (2, "B", "Y"))
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func zip<A, B>(_ a: A, _ b: B) -> Stream._Private.Zip3<Self, A, B> where A : StreamProtocol, B : StreamProtocol

    Return Value

    some StreamProtocol<Output == (Self.Output, A.Output, B.Output)>

  • zip(_:_:_:) Extension method

    Combines elements from this stream with those from three other streams by waiting until all four streams have yielded an element and then yielding the oldest unconsumed element from each stream together as a tuple, completing when any of the streams completes.

    let a = Stream.sequence([1, 2])
    let b = Stream.sequence(["A", "B", "C"])
    let c = Stream.sequence(["X", "Y", "Z"])
    let d = Stream.sequence(3..<6)
    var s = a.zip(b, c, d)
    assert(s.next() == (1, "A", "X", 3))
    assert(s.next() == (2, "B", "Y", 4))
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func zip<A, B, C>(_ a: A, _ b: B, _ c: C) -> Stream._Private.Zip4<Self, A, B, C> where A : StreamProtocol, B : StreamProtocol, C : StreamProtocol

    Return Value

    some StreamProtocol<Output == (Self.Output, A.Output, B.Output, C.Output)>

  • join(_:) Extension method

    Combines elements from this stream with those from another stream and delivers pairs of elements as tuples when either stream yields an element, completing when both of the streams complete.

    This is the combinator typically called combineLatest in other frameworks.

    let a = Stream.sequence([1, 2])
    let b = Stream.sequence(["A", "B", "C"])
    var s = a.join(b)
    assert(s.next() == (1, "A"))
    assert(s.next() == (2, "B"))
    assert(s.next() == (2, "C"))
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func join<S>(_ other: S) -> Stream._Private.Join<Self, S> where S : StreamProtocol

    Return Value

    some StreamProtocol<Output == (Self.Output, S.Output)>
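
    The pairing rule can be modeled with plain iterators to show why the last element of the shorter stream is repeated. This is a sketch under the assumption that both sources are polled once per round; `combineLatest` here is a hypothetical free function, not part of Futures, and the scheduling of real asynchronous streams may differ.

```swift
// Hypothetical helper modeling the pairing shown in the join(_:) example.
// Assumption: both sources are polled once per round.
func combineLatest<A: Sequence, B: Sequence>(_ a: A, _ b: B) -> [(A.Element, B.Element)] {
    var ia = a.makeIterator()
    var ib = b.makeIterator()
    var latestA: A.Element?
    var latestB: B.Element?
    var result: [(A.Element, B.Element)] = []
    while true {
        let nextA = ia.next()
        let nextB = ib.next()
        // Stop once neither source yields anything in this round.
        if nextA == nil && nextB == nil { break }
        // Remember the most recent element from each source.
        if let x = nextA { latestA = x }
        if let y = nextB { latestB = y }
        // Emit only when every source has produced at least one element.
        if let x = latestA, let y = latestB { result.append((x, y)) }
    }
    return result
}

let pairs = combineLatest([1, 2], ["A", "B", "C"])
```

    Unlike zip(_:), which stops when the shorter source ends, this model keeps emitting with the last known value of the finished source until the other one ends too.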

  • join(_:_:) Extension method

    Combines elements from this stream with those from two other streams and delivers groups of elements as tuples when any stream yields an element, completing when all of the streams complete.

    This is the combinator typically called combineLatest in other frameworks.

    let a = Stream.sequence([1, 2])
    let b = Stream.sequence(["A", "B", "C"])
    let c = Stream.sequence(["X", "Y", "Z"])
    var s = a.join(b, c)
    assert(s.next() == (1, "A", "X"))
    assert(s.next() == (2, "B", "Y"))
    assert(s.next() == (2, "C", "Z"))
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func join<A, B>(_ a: A, _ b: B) -> Stream._Private.Join3<Self, A, B> where A : StreamProtocol, B : StreamProtocol

    Return Value

    some StreamProtocol<Output == (Self.Output, A.Output, B.Output)>

  • join(_:_:_:) Extension method

    Combines elements from this stream with those from three other streams and delivers groups of elements as tuples when any stream yields an element, completing when all of the streams complete.

    This is the combinator typically called combineLatest in other frameworks.

    let a = Stream.sequence([1, 2])
    let b = Stream.sequence(["A", "B", "C"])
    let c = Stream.sequence(["X", "Y", "Z"])
    let d = Stream.sequence(3..<6)
    var s = a.join(b, c, d)
    assert(s.next() == (1, "A", "X", 3))
    assert(s.next() == (2, "B", "Y", 4))
    assert(s.next() == (2, "C", "Z", 5))
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func join<A, B, C>(_ a: A, _ b: B, _ c: C) -> Stream._Private.Join4<Self, A, B, C> where A : StreamProtocol, B : StreamProtocol, C : StreamProtocol

    Return Value

    some StreamProtocol<Output == (Self.Output, A.Output, B.Output, C.Output)>

  • switchToLatest() Extension method

    Flattens the stream of elements from multiple streams to appear as if they were coming from a single stream, by switching the inner stream as new ones are yielded by this stream and completing when this stream and the last inner one complete.

    let a = Stream.sequence(0..<3).map {
        Stream.sequence($0..<($0 + 3))
    }
    var s = a.switchToLatest()
    assert(s.next() == 2)
    assert(s.next() == 3)
    assert(s.next() == 4)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func switchToLatest() -> Stream._Private.SwitchToLatest<Self>

    Return Value

    some StreamProtocol<Output == Self.Output.StreamType.Output>

  • flatten() Extension method

    Flattens the stream of elements from multiple streams to appear as if they were coming from a single stream, by concatenating the inner streams as they are yielded by this stream and completing when this stream and the last inner one complete.

    let a = Stream.sequence(0..<3).map {
        Stream.sequence($0..<($0 + 3))
    }
    var s = a.flatten()
    assert(s.next() == 0)
    assert(s.next() == 1)
    assert(s.next() == 2)
    assert(s.next() == 1)
    assert(s.next() == 2)
    assert(s.next() == 3)
    assert(s.next() == 2)
    assert(s.next() == 3)
    assert(s.next() == 4)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func flatten() -> Stream._Private.Flatten<Self>

    Return Value

    some StreamProtocol<Output == Self.Output.StreamType.Output>
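
    For synchronous inputs, the difference between flatten() and switchToLatest() can be illustrated with standard-library sequences. This is an analogy rather than the library's implementation; the names `inner`, `flattened` and `latestOnly` are made up for the sketch.

```swift
// The same inner sequences as in the examples above: 0..<3, 1..<4, 2..<5.
let inner = (0..<3).map { $0..<($0 + 3) }

// Analogue of flatten(): concatenate every inner sequence in order.
let flattened = Array(inner.joined())

// Analogue of switchToLatest() when all inner sequences arrive at once:
// only the most recently yielded inner sequence is consumed.
let latestOnly = Array(inner.last ?? (0..<0))
```

    The two results correspond to the element orders asserted in the flatten() and switchToLatest() examples.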

  • tryMap(_:) Extension method

    Transforms the output from this stream with a provided error-throwing closure.

    enum UltimateQuestionError: Error {
        case wrongAnswer
    }
    
    var s = Stream.sequence(0..<3).tryMap { answer -> Int in
        if answer == 1 {
            throw UltimateQuestionError.wrongAnswer
        }
        return answer
    }
    
    assert(try! s.next()!.get() == 0)
    assert((try? s.next()!.get()) == nil)
    assert(try! s.next()!.get() == 2)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func tryMap<T>(_ catching: @escaping (Output) throws -> T) -> Stream._Private.TryMap<T, Self>

    Return Value

    some StreamProtocol<Output == Result<T, Error>>

  • setFailureType(to:) Extension method

    Converts this stream to a failable one with the specified failure type.

    You typically use this combinator to match the error types of two mismatched failable streams.

    enum UltimateQuestionError: Error {
        case wrongAnswer
    }
    
    var s = Stream.sequence(0..<3).setFailureType(to: UltimateQuestionError.self)
    
    assert(try! s.next()!.get() == 0)
    assert(try! s.next()!.get() == 1)
    assert(try! s.next()!.get() == 2)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func setFailureType<E>(to _: E.Type) -> Stream._Private.SetFailureType<Output, E, Self> where E : Error

    Return Value

    some StreamProtocol<Output == Result<Self.Output, E>>

  • match(success:failure:) Extension method

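    Transforms each result yielded by this stream into a plain value by applying the success closure to successes and the failure closure to failures.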

    enum UltimateQuestionError: Error {
        case wrongAnswer
    }
    
    func validateAnswer(_ answer: Int) throws -> Int {
        if answer == 1 {
            throw UltimateQuestionError.wrongAnswer
        }
        return answer
    }
    
    let a = Stream.sequence(0..<3).tryMap(validateAnswer)
    var s = a.match(
        success: String.init,
        failure: String.init(describing:)
    )
    
    assert(s.next() == "0")
    assert(s.next() == "wrongAnswer")
    assert(s.next() == "2")
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func match<T>(
        success: @escaping (Output.Success) -> T,
        failure: @escaping (Output.Failure) -> T
    ) -> Stream._Private.MatchResult<T, Self>

    Return Value

    some StreamProtocol<Output == T>
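
    Judging by the signature, the per-element operation is a fold over Swift.Result. The sketch below shows that operation with the standard library only; `fold` is a hypothetical free function and is not part of Futures.

```swift
enum UltimateQuestionError: Error {
    case wrongAnswer
}

// Hypothetical helper: collapse one Result into a plain value by
// choosing the closure that matches its case.
func fold<S, F: Error, T>(
    _ result: Result<S, F>,
    success: (S) -> T,
    failure: (F) -> T
) -> T {
    switch result {
    case .success(let value): return success(value)
    case .failure(let error): return failure(error)
    }
}

let results: [Result<Int, UltimateQuestionError>] = [
    .success(0), .failure(.wrongAnswer), .success(2),
]
let strings = results.map { result in
    fold(result, success: { String($0) }, failure: { String(describing: $0) })
}
```

    Both closures must return the same type, which is why the resulting stream no longer carries a Result.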

  • mapValue(_:) Extension method

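    Transforms the success value of each result yielded by this stream with a provided closure, passing failures through unchanged.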

    enum UltimateQuestionError: Error {
        case wrongAnswer
    }
    
    func validateAnswer(_ answer: Int) throws -> Int {
        if answer == 1 {
            throw UltimateQuestionError.wrongAnswer
        }
        return answer
    }
    
    var s = Stream.sequence(0..<3).tryMap(validateAnswer).mapValue {
        $0 + 1
    }
    
    assert(try! s.next()!.get() == 1)
    assert((try? s.next()!.get()) == nil)
    assert(try! s.next()!.get() == 3)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func mapValue<T>(_ transform: @escaping (Output.Success) -> T) -> Stream._Private.MapResult<T, Output.Failure, Self>

    Return Value

    some StreamProtocol<Output == Result<T, Self.Output.Failure>>

  • mapError(_:) Extension method

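    Transforms the failure value of each result yielded by this stream with a provided closure, passing successes through unchanged.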

    enum UltimateQuestionError: Error {
        case wrongAnswer
    }
    
    func validateAnswer(_ answer: Int) throws -> Int {
        if answer == 1 {
            throw UltimateQuestionError.wrongAnswer
        }
        return answer
    }
    
    struct WrappedError: Error {
        let error: Error
    }
    
    var s = Stream.sequence(0..<3).tryMap(validateAnswer).mapError {
        WrappedError(error: $0)
    }
    
    assert(try! s.next()!.get() == 0)
    assert((try? s.next()!.get()) == nil)
    assert(try! s.next()!.get() == 2)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func mapError<E>(_ transform: @escaping (Output.Failure) -> E) -> Stream._Private.MapResult<Output.Success, E, Self>

    Return Value

    some StreamProtocol<Output == Result<Self.Output.Success, E>>
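
    The return types of mapValue(_:) and mapError(_:) suggest they apply the equivalent of Result.map and Result.mapError to each element. The following standard-library sketch shows those two operations on an array of results standing in for a stream; it does not use Futures itself.

```swift
enum UltimateQuestionError: Error {
    case wrongAnswer
}

struct WrappedError: Error {
    let error: Error
}

let results: [Result<Int, UltimateQuestionError>] = [
    .success(0), .failure(.wrongAnswer), .success(2),
]

// Result.map transforms successes and passes failures through unchanged.
let incremented = results.map { result in result.map { $0 + 1 } }

// Result.mapError transforms failures and passes successes through unchanged.
let wrapped = results.map { result in result.mapError { WrappedError(error: $0) } }
```

    Note that mapError changes the failure type of the whole collection, here from UltimateQuestionError to WrappedError, while the success values stay the same.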

  • flattenResult() Extension method

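    Flattens the nested results yielded by this stream, converting each Result<Result<Success, Failure>, Failure> into a Result<Success, Failure>.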

    enum UltimateQuestionError: Error {
        case wrongAnswer
    }
    
    func validateAnswer(_ answer: Int) throws -> Int {
        if answer == 1 {
            throw UltimateQuestionError.wrongAnswer
        }
        return answer
    }
    
    let a = Stream.sequence(0..<3)
        .tryMap(validateAnswer)
        .mapValue(Result<Int, Error>.success)
    
    var s = a.flattenResult()
    
    assert(try! s.next()!.get() == 0)
    assert((try? s.next()!.get()) == nil)
    assert(try! s.next()!.get() == 2)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func flattenResult() -> Stream._Private.FlattenResult<Self>

    Return Value

    some StreamProtocol<Output == Result<Self.Output.Success.Success, Self.Output.Failure>>
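
    With the standard library alone, removing one level of Result nesting is a flatMap with the identity closure. The sketch below assumes that is the per-element behavior implied by the return type; `Nested`, `nested` and `flat` are names chosen for the illustration.

```swift
enum UltimateQuestionError: Error {
    case wrongAnswer
}

typealias Nested = Result<Result<Int, UltimateQuestionError>, UltimateQuestionError>

let nested: [Nested] = [
    .success(.success(0)),            // success at both levels
    .success(.failure(.wrongAnswer)), // inner failure
    .failure(.wrongAnswer),           // outer failure
]

// flatMap with the identity closure removes one level of nesting:
// an outer failure stays a failure, an outer success exposes the inner result.
let flat: [Result<Int, UltimateQuestionError>] = nested.map { result in
    result.flatMap { $0 }
}
```

    A failure at either level ends up as a failure of the flattened result.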

  • setFailureType(to:) Extension method

    Changes the failure type declared by this stream.

    You typically use this combinator to match the error types of two mismatched result streams.

    enum UltimateQuestionError: Error {
        case wrongAnswer
    }
    
    let a = Stream.sequence(0..<3).map(Result<Int, Never>.success)
    var s = a.setFailureType(to: UltimateQuestionError.self)
    
    assert(try! s.next()!.get() == 0)
    assert(try! s.next()!.get() == 1)
    assert(try! s.next()!.get() == 2)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func setFailureType<E>(to _: E.Type) -> Stream._Private.SetFailureType<Output.Success, E, Self> where E : Error

    Return Value

    some StreamProtocol<Output == Result<Self.Output.Success, E>>

  • assertNoError(_:file:line:) Extension method

    Raises a fatal error when this stream fails.

    Declaration

    Swift

    @inlinable
    public func assertNoError(_ prefix: String = "", file: StaticString = #file, line: UInt = #line) -> Stream._Private.AssertNoError<Self>

    Return Value

    some StreamProtocol<Output == Self.Output.Success>

  • completeOnError() Extension method

    Ensures this stream completes on the first error that occurs.

    Result streams in Futures do not terminate when an error occurs. This combinator allows you to selectively change that behavior on a per-stream basis so that it terminates when an error occurs.

    enum UltimateQuestionError: Error {
        case wrongAnswer
    }
    
    func validateAnswer(_ answer: Int) throws -> Int {
        if answer == 1 {
            throw UltimateQuestionError.wrongAnswer
        }
        return answer
    }
    
    let a = Stream.sequence(0..<3).tryMap(validateAnswer)
    var s = a.completeOnError()
    
    assert(try! s.next()!.get() == 0)
    assert((try? s.next()!.get()) == nil)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func completeOnError() -> Stream._Private.CompleteOnError<Self>

    Return Value

    some StreamProtocol<Output == Self.Output>
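
    The truncation rule in the example, where the failure itself is still yielded before the stream completes, can be modeled over an array of results. `completingOnError` is a hypothetical helper written for this sketch and is not part of Futures.

```swift
enum UltimateQuestionError: Error {
    case wrongAnswer
}

// Hypothetical helper: keep elements up to and including the first failure.
func completingOnError<S, F: Error>(_ results: [Result<S, F>]) -> [Result<S, F>] {
    var output: [Result<S, F>] = []
    for result in results {
        output.append(result)
        // The failure is delivered, then nothing else is.
        if case .failure = result { break }
    }
    return output
}

let results: [Result<Int, UltimateQuestionError>] = [
    .success(0), .failure(.wrongAnswer), .success(2),
]
let truncated = completingOnError(results)
```

    The trailing success is dropped because the model stops right after the first failure.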

  • replaceError(with:) Extension method

    Replaces any errors in the stream with the provided element.

    enum UltimateQuestionError: Error {
        case wrongAnswer
    }
    
    func validateAnswer(_ answer: Int) throws -> Int {
        if answer == 1 {
            throw UltimateQuestionError.wrongAnswer
        }
        return answer
    }
    
    let a = Stream.sequence(0..<3).tryMap(validateAnswer)
    var s = a.replaceError(with: 42)
    
    assert(s.next() == 0)
    assert(s.next() == 42)
    assert(s.next() == 2)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func replaceError(with output: Output.Success) -> Stream._Private.ReplaceError<Self>

    Return Value

    some StreamProtocol<Output == Self.Output.Success>
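
    On a single Swift.Result, the same substitution can be written with `try?` and the nil-coalescing operator. This standard-library sketch only illustrates the per-element effect seen in the example; it is not how the combinator is implemented.

```swift
enum UltimateQuestionError: Error {
    case wrongAnswer
}

let results: [Result<Int, UltimateQuestionError>] = [
    .success(0), .failure(.wrongAnswer), .success(2),
]

// try? turns a failure into nil, which ?? then replaces with the fallback.
let recovered = results.map { (try? $0.get()) ?? 42 }
```

    The output element type is the plain success type, matching the Return Value above.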

  • catchError(_:) Extension method

    Handles errors from this stream by replacing them with the output from another stream.

    enum UltimateQuestionError: Error {
        case wrongAnswer
    }
    
    func validateAnswer(_ answer: Int) throws -> Int {
        if answer == 1 {
            throw UltimateQuestionError.wrongAnswer
        }
        return answer
    }
    
    var s = Stream.sequence(0..<3).tryMap(validateAnswer).catchError {
        _ in Stream.just(42)
    }
    
    assert(s.next() == 0)
    assert(s.next() == 42)
    assert(s.next() == 2)
    assert(s.next() == nil)
    

    Declaration

    Swift

    @inlinable
    public func catchError<U>(_ errorHandler: @escaping (Output.Failure) -> U) -> Stream._Private.CatchError<U, Self>

    Return Value

    some StreamProtocol<Output == Self.Output.Success>

  • breakpoint(ready:pending:complete:) Extension method

    Raises a debugger signal when a provided closure needs to stop the process in the debugger.

    When any of the provided closures returns true, this stream raises the SIGTRAP signal to stop the process in the debugger.

    Declaration

    Swift

    @inlinable
    public func breakpoint(
        ready: @escaping (Output) -> Bool = { _ in false },
        pending: @escaping () -> Bool = { false },
        complete: @escaping () -> Bool = { false }
    ) -> Stream._Private.Breakpoint<Self>

    Return Value

    some StreamProtocol<Output == Self.Output>

  • handleEvents(ready:pending:complete:) Extension method

    Performs the specified closures when poll events occur.

    Declaration

    Swift

    @inlinable
    public func handleEvents(
        ready: @escaping (Output) -> Void = { _ in },
        pending: @escaping () -> Void = {},
        complete: @escaping () -> Void = {}
    ) -> Stream._Private.HandleEvents<Self>

    Return Value

    some StreamProtocol<Output == Self.Output>
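
    The idea of observing poll events without altering the elements can be sketched with a synchronous iterator wrapper. `EventReportingIterator` is a hypothetical type for illustration only; it has no counterpart to the `pending` closure because a plain iterator never suspends.

```swift
// Hypothetical wrapper modeling handleEvents for a synchronous iterator.
struct EventReportingIterator<Base: IteratorProtocol>: IteratorProtocol {
    var base: Base
    let ready: (Base.Element) -> Void
    let complete: () -> Void

    mutating func next() -> Base.Element? {
        if let element = base.next() {
            ready(element)   // an element was yielded
            return element   // the element itself is passed through unchanged
        }
        complete()           // the source signalled completion with nil
        return nil
    }
}

var log: [String] = []
var it = EventReportingIterator(
    base: (0..<2).makeIterator(),
    ready: { log.append("ready \($0)") },
    complete: { log.append("complete") }
)
// Drain the iterator so every event is recorded.
while it.next() != nil {}
```

    The closures are side effects only, which mirrors the Return Value above: the output type is the same as that of the wrapped stream.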

  • print(_:to:) Extension method

    Prints log messages for all poll events.

    Declaration

    Swift

    @inlinable
    public func print(_ prefix: String = "", to stream: TextOutputStream? = nil) -> Stream._Private.Print<Self>

    Return Value

    some StreamProtocol<Output == Self.Output>

  • breakpointOnError() Extension method

    Raises a debugger signal upon receiving a failure.

    Declaration

    Swift

    @inlinable
    public func breakpointOnError() -> Stream._Private.Breakpoint<Self>

    Return Value

    some StreamProtocol<Output == Self.Output>