Hello,
im currently rewriting my entire network stuff to swift concurrency. I have a Swift Package which contains the NWConnection with my custom framing protocol. So Network framework does not support itself concurrency so I build an api around that. To receive messages I used an AsyncThrowingStream and it works like that:
let connection = MyNetworkFramework(host: "example.org")
Task {
await connection.start()
for try await result in connection.receive() {
// do something with result
}
}
that's pretty neat and I like it a lot but now things got tricky. in my application I have up to 10 different tcp streams I open up to handle connection stuff. so with my api change every tcp connection runs in it's own task like above and I have no idea how to handle the possible errors from the .receive()
func inside the tasks.
First my idea was to use a ThrowingTaskGroup
for that and I think that will work but biggest problem is that I initially start with let's say 4 tcp connections and I need the ability to add additional ones later if I need them. so it seems not possible to add a Task
afterwards to the ThrowingTaskGroup
.
So what's a good way to handle a case like that?
i have an actor which handles everything in it's isolated context and basically I just need let the start func throw if any of the Tasks throw I open up. Here is a basic sample of how it's structured.
Thanks Vinz
internal actor MultiConnector {
internal var count: Int { connections.count }
private var connections: [ConnectionsModel] = []
private let host: String
private let port: UInt16
private let parameters: NWParameters
internal init(host: String, port: UInt16, parameters: NWParameters) {
self.host = host
self.port = port
self.parameters = parameters
}
internal func start(count: Int) async throws -> Void {
guard connections.isEmpty else { return }
guard count > .zero else { return }
try await sockets(from: count)
}
internal func cancel() -> Void {
guard !connections.isEmpty else { return }
for connection in connections { connection.connection.cancel() }
connections.removeAll()
}
internal func sockets(from count: Int) async throws -> Void {
while connections.count < count { try await connect() }
}
}
// MARK: - Private API -
private extension MultiConnector {
private func connect() async throws -> Void {
let uuid = UUID(), connection = MyNetworkFramework(host: host, port: port, parameters: parameters)
connections.append(.init(id: uuid, connection: connection))
let task = Task { [weak self] in guard let self else { return }; try await stream(connection: connection, id: uuid) }
try await connection.start(); await connection.send(message: "Sample Message")
// try await task.value <-- this does not work because stream runs infinite until i cancel it (that's expected and intended but it need to handle if the stream throws an error)
}
private func stream(connection: MyNetworkFramework, id: UUID) async throws -> Void {
for try await result in connection.receive() {
if case .message(_) = result { await connection.send(message: "Sample Message") }
// ... more to handle
}
}
}