Diagnose and fix Swift Concurrency issues: async/await, actor isolation, Sendable, and Swift 6 migration.
references/async-sequences.md
# Async Sequences and Streams

Use this when:

- You need to iterate over values that arrive over time.
- You are bridging callback-based or delegate-based APIs to async/await.
- You need to choose between `AsyncSequence`, `AsyncStream`, or a regular async method.

Skip this file if:

- You need time-based operators like debounce, throttle, or merge. Use `async-algorithms.md`.
- You are choosing between `Task`, `async let`, or task groups. Use `tasks.md`.

Jump to:

- AsyncSequence Protocol
- AsyncStream / AsyncThrowingStream
- Bridging Callbacks and Delegates
- Stream Lifecycle and Cleanup
- Buffer Policies
- Standard Library Integration
- Limitations
- When to Use AsyncAlgorithms

## AsyncSequence

Protocol for asynchronous iteration over values that become available over time.

### Basic usage

```swift
for await value in someAsyncSequence {
    print(value)
}
```

**Key difference from Sequence**: Values may not all be available immediately, so each iteration step can suspend.

### Custom implementation

```swift
struct Counter: AsyncSequence, AsyncIteratorProtocol {
    typealias Element = Int

    let limit: Int
    var current = 1

    mutating func next() async -> Int? {
        guard !Task.isCancelled else { return nil }
        guard current <= limit else { return nil }

        let result = current
        current += 1
        return result
    }

    func makeAsyncIterator() -> Counter {
        self
    }
}

// Usage
for await count in Counter(limit: 5) {
    print(count) // 1, 2, 3, 4, 5
}
```

### Standard operators

Same functional operators as regular sequences:

```swift
// Filter
for await even in Counter(limit: 5).filter({ $0 % 2 == 0 }) {
    print(even) // 2, 4
}

// Map
let mapped = Counter(limit: 5).map { $0 % 2 == 0 ? "Even" : "Odd" }
for await label in mapped {
    print(label)
}

// Contains (awaits until found or sequence ends)
let contains = await Counter(limit: 5).contains(3) // true
```

### Termination

Return `nil` from `next()` to end iteration:

```swift
mutating func next() async -> Int? {
    guard !Task.isCancelled else {
        return nil // Stop on cancellation
    }

    guard current <= limit else {
        return nil // Stop at limit
    }

    defer { current += 1 } // Advance, otherwise the sequence never reaches the limit
    return current
}
```

> **Course Deep Dive**: This topic is covered in detail in [Lesson 6.1: Working with asynchronous sequences](https://www.swiftconcurrencycourse.com?utm_source=github&utm_medium=agent-skill&utm_campaign=lesson-reference)

## AsyncStream

Convenient way to create async sequences without implementing protocols.

### Basic creation

```swift
let stream = AsyncStream<Int> { continuation in
    for i in 1...5 {
        continuation.yield(i)
    }
    continuation.finish()
}

for await value in stream {
    print(value)
}
```

### AsyncThrowingStream

For streams that can fail:

```swift
let throwingStream = AsyncThrowingStream<Int, Error> { continuation in
    continuation.yield(1)
    continuation.yield(2)
    continuation.finish(throwing: SomeError())
}

do {
    for try await value in throwingStream {
        print(value)
    }
} catch {
    print("Error: \(error)")
}
```

> **Course Deep Dive**: This topic is covered in detail in [Lesson 6.2: Using AsyncStream and AsyncThrowingStream in your code](https://www.swiftconcurrencycourse.com?utm_source=github&utm_medium=agent-skill&utm_campaign=lesson-reference)

## Bridging Closures to Streams

### Progress + completion handlers

```swift
import Foundation

// Old closure-based API
struct FileDownloader {
    enum Status {
        case downloading(Float)
        case finished(Data)
    }

    func download(
        _ url: URL,
        progressHandler: @escaping (Float) -> Void,
        completion: @escaping (Result<Data, Error>) -> Void
    ) throws {
        // Implementation
    }
}

// Modern stream-based API
extension FileDownloader {
    func download(_ url: URL) -> AsyncThrowingStream<Status, Error> {
        AsyncThrowingStream { continuation in
            do {
                try self.download(url, progressHandler: { progress in
                    continuation.yield(.downloading(progress))
                }, completion: { result in
                    switch result {
                    case .success(let data):
                        continuation.yield(.finished(data))
                        continuation.finish()
                    case .failure(let error):
                        continuation.finish(throwing: error)
                    }
                })
            } catch {
                continuation.finish(throwing: error)
            }
        }
    }
}

// Usage
for try await status in downloader.download(url) {
    switch status {
    case .downloading(let progress):
        print("Progress: \(progress)")
    case .finished(let data):
        print("Done: \(data.count) bytes")
    }
}
```

### Simplified with Result

`yield(with:)` yields the value on success and finishes the stream with the error on failure. The stream's build closure cannot throw, so the synchronous `try` still needs a `do`/`catch`:

```swift
AsyncThrowingStream { continuation in
    do {
        try self.download(url, progressHandler: { progress in
            continuation.yield(.downloading(progress))
        }, completion: { result in
            continuation.yield(with: result.map { .finished($0) })
            continuation.finish()
        })
    } catch {
        continuation.finish(throwing: error)
    }
}
```

## Bridging Delegates

### Location updates example

```swift
import CoreLocation

final class LocationMonitor: NSObject {
    private let locationManager = CLLocationManager()
    private var continuation: AsyncThrowingStream<CLLocation, Error>.Continuation?
    let stream: AsyncThrowingStream<CLLocation, Error>

    override init() {
        // The build closure runs synchronously, so the continuation is captured before init continues.
        var capturedContinuation: AsyncThrowingStream<CLLocation, Error>.Continuation?
        stream = AsyncThrowingStream { continuation in
            capturedContinuation = continuation
        }
        super.init()
        self.continuation = capturedContinuation

        locationManager.delegate = self
        locationManager.startUpdatingLocation()
    }
}

extension LocationMonitor: CLLocationManagerDelegate {
    func locationManager(_ manager: CLLocationManager, didUpdateLocations locations: [CLLocation]) {
        for location in locations {
            continuation?.yield(location)
        }
    }

    func locationManager(_ manager: CLLocationManager, didFailWithError error: Error) {
        continuation?.finish(throwing: error)
    }
}

// Usage
let monitor = LocationMonitor()
for try await location in monitor.stream {
    print("Location: \(location.coordinate)")
}
```

## Stream Lifecycle

### Termination callback

```swift
AsyncThrowingStream<Int, Error> { continuation in
    continuation.onTermination = { @Sendable reason in
        print("Terminated: \(reason)")
        // Cleanup: remove observers, cancel work, etc.
    }

    continuation.yield(1)
    continuation.finish()
}
```

**Termination reasons**:
- `.finished` - Normal completion (`AsyncStream`)
- `.finished(Error?)` - Completed, carrying the error if one was thrown (`AsyncThrowingStream`)
- `.cancelled` - The consuming task was cancelled

### Cancellation

Streams cancel when:
- Enclosing task cancels
- Stream goes out of scope

```swift
let task = Task {
    for try await status in download(url) {
        print(status)
    }
}

task.cancel() // Triggers onTermination with .cancelled
```

**No explicit cancel method** - rely on task cancellation.

## Buffer Policies

Control what happens to values when no one is awaiting:

### .unbounded (default)

Buffers all values until consumed:

```swift
let stream = AsyncStream<Int> { continuation in
    (0...5).forEach { continuation.yield($0) }
    continuation.finish()
}

try await Task.sleep(for: .seconds(1))

for await value in stream {
    print(value) // Prints all: 0, 1, 2, 3, 4, 5
}
```

### .bufferingNewest(n)

Keeps only the newest N values:

```swift
let stream = AsyncStream<Int>(bufferingPolicy: .bufferingNewest(1)) { continuation in
    (0...5).forEach { continuation.yield($0) }
    continuation.finish()
}

try await Task.sleep(for: .seconds(1))

for await value in stream {
    print(value) // Prints only: 5
}
```

### .bufferingOldest(n)

Keeps only the oldest N values:

```swift
let stream = AsyncStream<Int>(bufferingPolicy: .bufferingOldest(1)) { continuation in
    (0...5).forEach { continuation.yield($0) }
    continuation.finish()
}

try await Task.sleep(for: .seconds(1))

for await value in stream {
    print(value) // Prints only: 0
}
```

### .bufferingNewest(0)

Only receives values emitted after iteration starts:

```swift
let stream = AsyncStream<Int>(bufferingPolicy: .bufferingNewest(0)) { continuation in
    continuation.yield(1) // Discarded

    Task {
        try await Task.sleep(for: .seconds(2))
        continuation.yield(2) // Received
        continuation.finish()
    }
}

try await Task.sleep(for: .seconds(1))

for await value in stream {
    print(value) // Prints only: 2
}
```

**Use case**: Location updates, file system changes - only care about latest.

## Repeated Async Calls

Use `init(unfolding:onCancel:)` for polling:

```swift
struct PingService {
    func startPinging() -> AsyncStream<Bool> {
        AsyncStream {
            try? await Task.sleep(for: .seconds(5))
            return await ping()
        } onCancel: {
            print("Pinging cancelled")
        }
    }

    func ping() async -> Bool {
        // Network request
        return true
    }
}

// Usage
for await result in pingService.startPinging() {
    print("Ping: \(result)")
}
```

## Standard Library Integration

### NotificationCenter

```swift
let stream = NotificationCenter.default.notifications(
    named: .NSSystemTimeZoneDidChange
)

for await notification in stream {
    print("Time zone changed")
}
```

### Combine publishers

```swift
import Combine

let numbers = [1, 2, 3, 4, 5]
let filtered = numbers.publisher.filter { $0 % 2 == 0 }

for await number in filtered.values {
    print(number) // 2, 4
}
```

### Task groups

```swift
await withTaskGroup(of: Image.self) { group in
    for url in urls {
        group.addTask { await download(url) }
    }

    for await image in group {
        display(image)
    }
}
```

## Limitations

### Single consumer only

Unlike Combine, streams support one consumer at a time:

```swift
let stream = AsyncStream<Int> { continuation in
    (0...5).forEach { continuation.yield($0) }
    continuation.finish()
}

Task {
    for await value in stream {
        print("Consumer 1: \(value)")
    }
}

Task {
    for await value in stream {
        print("Consumer 2: \(value)")
    }
}

// Unpredictable output - values split between consumers
// Consumer 1: 0
// Consumer 2: 1
// Consumer 1: 2
// Consumer 2: 3
```

**Solution**: Create separate streams or use third-party libraries (AsyncExtensions).

### No values after termination

Once finished, stream won't emit new values:

```swift
let stream = AsyncStream<Int> { continuation in
    continuation.finish() // Terminate immediately
    continuation.yield(1) // Never received
}

for await value in stream {
    print(value) // Loop exits immediately
}
```

## Decision Guide

### Use AsyncSequence when:

- Implementing standard library-style protocols
- Need fine-grained control over iteration
- Building reusable sequence types
- Working with existing sequence protocols

**Reality**: Rarely needed in application code.

### Use AsyncStream when:

- Bridging delegates to async/await
- Converting closure-based APIs
- Emitting events manually
- Polling or repeated async operations

**Reality**: The most common choice in application code.

---

## When to Use AsyncAlgorithms vs Standard Library

### Use AsyncAlgorithms when:

- **Time-based operations** need debounce/throttle/timer
- **Combining multiple async sequences** (merge, combineLatest, zip)
- **Multi-consumer scenarios** require backpressure (AsyncChannel)
- **Complex operator chains** that Combine would handle naturally
- **Need specific operators** not in standard library

### Use Standard Library when:

- **Bridging callback APIs** → AsyncStream
- **Simple iteration** → for await in sequence
- **Single-value operations** → async/await
- **Basic transformations** → map/filter/contains

### Quick Decision Table

| Need | Solution |
|------|----------|
| Debounce search input | ✅ AsyncAlgorithms.debounce() |
| Throttle button clicks | ✅ AsyncAlgorithms.throttle() |
| Merge independent streams | ✅ AsyncAlgorithms.merge() |
| Combine dependent values | ✅ AsyncAlgorithms.combineLatest() or async let |
| Pair values from two sources | ✅ AsyncAlgorithms.zip() |
| Bridge callback API | AsyncStream |
| Multi-consumer with backpressure | ✅ AsyncChannel |
| Periodic timer | ✅ AsyncTimerSequence |
| Simple async iteration | for await in... |

> **See**: [async-algorithms.md](async-algorithms.md) for detailed usage examples with real-world patterns.

### Use regular async methods when:

- Single value returned
- No progress updates needed
- Simple request/response pattern

```swift
// Use this
func fetchData() async throws -> Data

// Not this
func fetchData() -> AsyncThrowingStream<Data, Error>
```

> **Course Deep Dive**: This topic is covered in detail in [Lesson 6.3: Deciding between AsyncSequence, AsyncStream, or regular asynchronous methods](https://www.swiftconcurrencycourse.com?utm_source=github&utm_medium=agent-skill&utm_campaign=lesson-reference)

## Common Patterns

### Progress reporting

```swift
// Assumes an event type such as:
// enum DownloadEvent { case progress(Double), completed(Data) }
func download(_ url: URL) -> AsyncThrowingStream<DownloadEvent, Error> {
    AsyncThrowingStream { continuation in
        let task = Task {
            do {
                // Simulated progress updates
                var progress: Double = 0
                while progress < 1.0 {
                    progress += 0.1
                    continuation.yield(.progress(progress))
                    try await Task.sleep(for: .milliseconds(100))
                }

                let data = try await URLSession.shared.data(from: url).0
                continuation.yield(.completed(data))
                continuation.finish()
            } catch {
                continuation.finish(throwing: error)
            }
        }

        // The inner task is unstructured, so cancel it explicitly when the consumer stops.
        continuation.onTermination = { _ in task.cancel() }
    }
}
```

### Monitoring file system

```swift
// Assumes an event type such as: enum FileEvent { case fileChanged(String) }
func watchDirectory(_ path: String) -> AsyncStream<FileEvent> {
    AsyncStream(bufferingPolicy: .bufferingNewest(1)) { continuation in
        // Open the path for event notifications only.
        let fd = open(path, O_EVTONLY)
        guard fd >= 0 else {
            continuation.finish()
            return
        }

        let source = DispatchSource.makeFileSystemObjectSource(
            fileDescriptor: fd,
            eventMask: .write,
            queue: .main
        )

        source.setEventHandler {
            continuation.yield(.fileChanged(path))
        }

        // Release the file descriptor once the source is cancelled.
        source.setCancelHandler {
            close(fd)
        }

        continuation.onTermination = { _ in
            source.cancel()
        }

        source.resume()
    }
}
```

### Timer/polling

```swift
func timer(interval: Duration) -> AsyncStream<Date> {
    AsyncStream { continuation in
        let task = Task {
            while !Task.isCancelled {
                continuation.yield(Date())
                try? await Task.sleep(for: interval)
            }
            continuation.finish()
        }

        // Without this, the unstructured task would keep ticking after the consumer stops.
        continuation.onTermination = { _ in task.cancel() }
    }
}

// Usage
for await date in timer(interval: .seconds(1)) {
    print("Tick: \(date)")
}
```

## Best Practices

1. **Always call finish()** - A stream that is never finished keeps its consumer suspended
2. **Use buffer policies wisely** - Match your use case (latest value vs all values)
3. **Handle cancellation** - Set `onTermination` for cleanup
4. **Single consumer** - Don't share streams across multiple consumers
5. **Prefer streams over closures** - More composable and cancellable
6. **Check Task.isCancelled** - Respect cancellation in custom sequences
7. **Use throwing variant** - When operations can fail
8. **Consider regular async** - If only returning single value

## Debugging

### Add termination logging

```swift
continuation.onTermination = { reason in
    print("Stream ended: \(reason)")
}
```

### Validate finish() calls

```swift
// ❌ Forgot to finish
AsyncStream<Int> { continuation in
    continuation.yield(1)
    // Stream never ends!
}

// ✅ Always finish
AsyncStream<Int> { continuation in
    continuation.yield(1)
    continuation.finish()
}
```

### Check for dropped values

```swift
let stream = AsyncStream<Int>(bufferingPolicy: .bufferingNewest(1)) { continuation in
    for i in 1...100 {
        continuation.yield(i)
        print("Yielded: \(i)")
    }
    continuation.finish()
}

// The producer above runs before iteration starts, so with a buffer of 1
// every value except the newest has already been dropped.
for await value in stream {
    print("Received: \(value)")
    try? await Task.sleep(for: .seconds(1))
}
```

## Common Mistakes Agents Make

```swift
// ❌ Values after finish() are silently dropped
continuation.finish()
continuation.yield(1) // Never received

// ❌ Stream never terminates (forgot finish)
AsyncStream<Int> { continuation in
    continuation.yield(1)
    // Missing: continuation.finish()
}

// ❌ Wrapping a single-value API in a stream; use a regular async function instead
func fetchUser() -> AsyncStream<User> { ... } // Overkill for one result
```

- **Sharing a single `AsyncStream` between multiple consumers**: Values split unpredictably. There is no built-in broadcast; use `AsyncChannel` for point-to-point multi-consumer patterns.
- **Forgetting `onTermination`** when bridging delegate or observer APIs, causing resources to leak.

## Further Learning

For real-world migration examples, performance patterns, and advanced stream techniques, see [Swift Concurrency Course](https://www.swiftconcurrencycourse.com).
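
## Sketch: Bridging with makeStream

The delegate-bridging pattern in this file captures the continuation through a local optional inside `init`. On Swift 5.9 or later, `AsyncStream.makeStream(of:bufferingPolicy:)` returns the stream and its continuation as a pair, which may be a simpler way to wire up the same idea. The sketch below is illustrative only: `SensorBridge`, `readings`, `didReceive(_:)`, and `didFinish()` are hypothetical names standing in for a real delegate, and the `.bufferingNewest(1)` policy is an assumption chosen to match the "only care about latest" use case.

```swift
// Sketch only: `SensorBridge` is a hypothetical type, not an API from any framework.
// Requires Swift 5.9+ for AsyncStream.makeStream(of:bufferingPolicy:).
final class SensorBridge {
    let readings: AsyncStream<Double>
    private let continuation: AsyncStream<Double>.Continuation

    init() {
        // makeStream returns the stream and its continuation together,
        // so no captured optional is needed.
        let (stream, continuation) = AsyncStream.makeStream(
            of: Double.self,
            bufferingPolicy: .bufferingNewest(1)
        )
        self.readings = stream
        self.continuation = continuation
    }

    // Stand-in for a delegate callback that delivers a new value.
    func didReceive(_ value: Double) {
        continuation.yield(value)
    }

    // Stand-in for a delegate callback that signals the end of updates.
    func didFinish() {
        continuation.finish()
    }
}
```

Because the continuation is a non-optional stored property here, the delegate callbacks do not need optional chaining. As with any bridged stream, a real implementation would likely also set `onTermination` to stop the underlying source.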