Handle multiple AsyncThrowingStreams in structured concurrency?

Hello,

im currently rewriting my entire network stuff to swift concurrency. I have a Swift Package which contains the NWConnection with my custom framing protocol. So Network framework does not support itself concurrency so I build an api around that. To receive messages I used an AsyncThrowingStream and it works like that:

let connection = MyNetworkFramework(host: "example.org")
Task {
  await connection.start()
  for try await result in connection.receive() {
    // do something with result
  }
}

that's pretty neat and I like it a lot but now things got tricky. in my application I have up to 10 different tcp streams I open up to handle connection stuff. so with my api change every tcp connection runs in it's own task like above and I have no idea how to handle the possible errors from the .receive() func inside the tasks.

First my idea was to use a ThrowingTaskGroup for that and I think that will work but biggest problem is that I initially start with let's say 4 tcp connections and I need the ability to add additional ones later if I need them. so it seems not possible to add a Task afterwards to the ThrowingTaskGroup.

So what's a good way to handle a case like that?

i have an actor which handles everything in it's isolated context and basically I just need let the start func throw if any of the Tasks throw I open up. Here is a basic sample of how it's structured.

Thanks Vinz

internal actor MultiConnector {
    internal var count: Int { connections.count }
    private var connections: [ConnectionsModel] = []
    private let host: String
    private let port: UInt16
    private let parameters: NWParameters
    
    internal init(host: String, port: UInt16, parameters: NWParameters) {
        self.host = host
        self.port = port
        self.parameters = parameters
    }
    
    internal func start(count: Int) async throws -> Void {
        guard connections.isEmpty else { return }
        guard count > .zero else { return }
        try await sockets(from: count)
    }
    
    internal func cancel() -> Void {
        guard !connections.isEmpty else { return }
        for connection in connections { connection.connection.cancel() }
        connections.removeAll()
    }
    
    internal func sockets(from count: Int) async throws -> Void {
        while connections.count < count { try await connect() }
    }
}

// MARK: - Private API -

private extension MultiConnector {
    private func connect() async throws -> Void {
        let uuid = UUID(), connection = MyNetworkFramework(host: host, port: port, parameters: parameters)
        connections.append(.init(id: uuid, connection: connection))
        let task = Task { [weak self] in guard let self else { return }; try await stream(connection: connection, id: uuid) }
        try await connection.start(); await connection.send(message: "Sample Message")
        
        // try await task.value <-- this does not work because stream runs infinite until i cancel it (that's expected and intended but it need to handle if the stream throws an error)
    }
    
    private func stream(connection: MyNetworkFramework, id: UUID) async throws -> Void {
        for try await result in connection.receive() {
            if case .message(_) = result { await connection.send(message: "Sample Message") }
            // ... more to handle
        }
    }
}

Accepted Reply

The answer here is that… well… it depends.

I see two general strategies for this:

  • Merging the events from multiple sequences, using Merge.

  • Using a task group.

In my mind the merge approach makes sense when you have multiple sources of events all related to the core operation. For example, with NWConnection you might have received-message events, state update events, path update events, and so on.

OTOH, I think a task group makes sense when the event sources are largely independent. For example, SwiftNIO uses this as the core of its network server architecture, where the task group spawns a task for each incoming connection.

You wrote:

so it seems not possible to add a Task afterwards

I’m not sure what gives you that impression. This is a pretty standard use of a task group.

There is one word of warning here: Don’t use a standard task group for a long-running process. Rather, use a discarding task group. See SE-0381 DiscardingTaskGroups for the backstory there.

Share and Enjoy

Quinn “The Eskimo!” @ Developer Technical Support @ Apple
let myEmail = "eskimo" + "1" + "@" + "apple.com"

Replies

The answer here is that… well… it depends.

I see two general strategies for this:

  • Merging the events from multiple sequences, using Merge.

  • Using a task group.

In my mind the merge approach makes sense when you have multiple sources of events all related to the core operation. For example, with NWConnection you might have received-message events, state update events, path update events, and so on.

OTOH, I think a task group makes sense when the event sources are largely independent. For example, SwiftNIO uses this as the core of its network server architecture, where the task group spawns a task for each incoming connection.

You wrote:

so it seems not possible to add a Task afterwards

I’m not sure what gives you that impression. This is a pretty standard use of a task group.

There is one word of warning here: Don’t use a standard task group for a long-running process. Rather, use a discarding task group. See SE-0381 DiscardingTaskGroups for the backstory there.

Share and Enjoy

Quinn “The Eskimo!” @ Developer Technical Support @ Apple
let myEmail = "eskimo" + "1" + "@" + "apple.com"

Hi Quinn,

Thanks a lot for your response. I initially thought it wouldn't be possible to add a Task afterwards because the entire workflow differs a lot in concurrency in comparison to the "old" way I implemented things. But I decided to currently stay away from concurrency for my custom network stuff. I exposed the high level API of my framework to concurrency and that works pretty well in the app and makes the ViewModel much more readable. But internally I'm not really happy with the flow control and it's much harder to build a good solution when I have to deal with a lot of AsyncSequences. I'm sure the Network.framework Engineers will build something fantastic and most of time im very impatient but I think it's worth the wait until Network.framework supports concurrency out of the box. Then I think it's much easier for me to build an appropriate solution for my framework.

But all in all. Thank's for your work, you always helped me a lot in the last years 😊

Thank's for your work

You’re welcome!

I'm sure the Network.framework Engineers will build something fantastic

I’m also looking forward to it.

In the meantime, one alternative is to look at SwiftNIO. This is available today and you can follow its development in public. And SwiftNIO Transport Services lets you use Network framework under the covers.

Share and Enjoy

Quinn “The Eskimo!” @ Developer Technical Support @ Apple
let myEmail = "eskimo" + "1" + "@" + "apple.com"