Loading source
Pulling the file list, source metadata, and syntax-aware rendering for this listing.
Source from repo
Diagnose and fix Swift Concurrency issues: async/await, actor isolation, Sendable, and Swift 6 migration.
Files
Skill
Size
Entrypoint
Format
Open file
Syntax-highlighted preview of this file as included in the skill package.
references/async-algorithms.md
1# AsyncAlgorithms Package23Use this when:45- You need time-based operators (debounce, throttle, timers).6- You need to combine multiple async sequences (merge, combineLatest, zip).7- You are migrating from Combine or RxSwift operators to Swift Concurrency equivalents.89Skip this file if:1011- You need basic `AsyncStream` bridging for callbacks or delegates. Use `async-sequences.md`.12- You are choosing between `Task`, `async let`, or task groups. Use `tasks.md`.1314Jump to:1516- Quick Start17- Time-Based Operators18- Combining Operators19- Multi-Consumer Scenarios20- Combine Migration Guide21- Best Practices2223---2425## Quick Start2627Top 5 most common operators:2829```swift30import AsyncAlgorithms3132// 1. Debounce rapid inputs33for await query in searchQueryStream.debounce(for: .milliseconds(500)) {34await performSearch(query)35}3637// 2. Throttle repeated actions38for await _ in buttonClicks.throttle(for: .seconds(1)) {39await performAction()40}4142// 3. Merge multiple independent streams43for await message in chat1Messages.merge(chat2Messages) {44display(message)45}4647// 4. Combine dependent values48for await (username, email) in usernameStream.combineLatest(emailStream) {49validateForm(username: username, email: email)50}5152// 5. Zip paired operations53for await (image, metadata) in imageStream.zip(metadataStream) {54await cache(image: image, metadata: metadata)55}56```5758> **See**: [AsyncAlgorithms on GitHub](https://github.com/apple/swift-async-algorithms)5960---6162## Overview & Installation6364### What is AsyncAlgorithms?6566Extends Swift's AsyncSequence with time-based operators, stream combination tools, and multi-consumer primitives.6768**Use for**:69- Time-based operations: debounce, throttle, timers70- Combining streams: merge, combineLatest, zip, chain71- Multi-consumer scenarios: AsyncChannel for backpressure72- Specific operators: removeDuplicates, chunks, adjacentPairs, compacted7374**Use standard library for**:75- Bridging callbacks: AsyncStream76- Simple iteration: for await in sequence77- Single-value operations: async/await7879### Installation8081```swift82dependencies: [83.package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0")84]8586targets: [87.target(88name: "MyTarget",89dependencies: [90.product(name: "AsyncAlgorithms", package: "swift-async-algorithms")91]92)93]94```9596Import:9798```swift99import AsyncAlgorithms100```101102---103104## Time-Based Operators105106### debounce(for:tolerance:clock:)107108Wait for inactivity before emitting. Use for rapid inputs like search fields.109110#### Example: ArticleSearcher111112```swift113import AsyncAlgorithms114115@Observable116final class ArticleSearcher {117@MainActor private(set) var results: [Article] = []118private var searchQueryContinuation: AsyncStream<String>.Continuation?119120private lazy var searchQueryStream: AsyncStream<String> = {121AsyncStream { continuation in122searchQueryContinuation = continuation123}124}()125126func search(_ query: String) {127searchQueryContinuation?.yield(query)128}129130func startDebouncedSearch() {131Task { @MainActor in132for await query in searchQueryStream.debounce(for: .milliseconds(500)) {133self.results = []134self.results = await APIClient.searchArticles(query)135}136}137}138}139```140141**Benefits**: Automatic cancellation, backpressure, cleaner than manual Task.sleep.142143#### ❌ Anti-Pattern144145```swift146// Bad: Every keystroke spawns new task147func search(_ query: String) {148Task {149try? await Task.sleep(for: .milliseconds(500))150await performSearch(query)151}152}153```154155**Problem**: Multiple tasks execute simultaneously, causing out-of-order results.156157**Solution**: Use `debounce()` for automatic backpressure.158159---160161### throttle(for:clock:reducing:)162163Emit at most one value per interval. Use for repeated actions like button taps.164165#### Example: Like Button166167```swift168import AsyncAlgorithms169170struct LikeButton: View {171@State private var tapStream = AsyncStream<Void> { continuation in172// Continuation stored externally173}174@State private var isLiked = false175176var body: some View {177Button(action: {178tapStream.continuation?.yield()179}) {180Image(systemName: isLiked ? "heart.fill" : "heart")181}182.task {183await handleThrottledTaps()184}185}186187private func handleThrottledTaps() async {188for await _ in tapStream.throttle(for: .seconds(1)) {189await toggleLike()190}191}192193private func toggleLike() async {194isLiked.toggle()195await APIClient.updateLikeStatus(isLiked: isLiked)196}197}198```199200#### Understanding reducing Parameter201202```swift203// .latest (default): Keep most recent value204for await value in events.throttle(for: .seconds(1)) {205process(value)206}207208// .oldest: Keep first value209for await value in events.throttle(for: .seconds(1), reducing: .oldest) {210process(value)211}212213// Custom: Sum all values214for await value in events.throttle(for: .seconds(1)) { $0 + $1 } {215process(value)216}217```218219---220221### AsyncTimerSequence222223Emit values at regular intervals. Use for periodic refresh or countdown timers.224225#### Example: Feed Refresh226227```swift228import AsyncAlgorithms229230@MainActor @Observable231final class FeedViewModel {232private(set) var articles: [Article] = []233private var refreshTask: Task<Void, Never>?234235func startAutoRefresh() {236refreshTask = Task {237for await _ in AsyncTimerSequence(interval: .seconds(30)) {238await refreshFeed()239}240}241}242243private func refreshFeed() async {244articles = await APIClient.fetchLatestArticles()245}246}247```248249#### ❌ Anti-Pattern250251```swift252// Bad: Manual timer implementation253func startTimer() {254Task {255while !Task.isCancelled {256performAction()257try? await Task.sleep(for: .seconds(1))258}259}260}261```262263**Solution**: Use `AsyncTimerSequence`.264265---266267## Combining Operators268269### merge(_:...)270271Combine sequences into one, emitting as they arrive. **Stable operator ✅**272273Use for independent data sources that don't depend on each other.274275#### Example: Multi-Room Chat276277```swift278import AsyncAlgorithms279280actor ChatManager {281private var messageContinuations: [String: AsyncStream<ChatMessage>.Continuation] = [:]282283func getMessagesStream(roomID: String) -> AsyncStream<ChatMessage> {284AsyncStream { continuation in285messageContinuations[roomID] = continuation286}287}288289func receiveMessage(_ message: ChatMessage) {290messageContinuations[message.roomID]?.yield(message)291}292293func startMonitoring(rooms: [String]) -> AsyncStream<ChatMessage> {294let streams = rooms.map { getMessagesStream(roomID: $0) }295return streams.merge()296}297}298299// Usage300let manager = ChatManager()301let mergedMessages = await manager.startMonitoring(rooms: ["general", "random"])302303for await message in mergedMessages {304print("[\(message.roomID)] \(message.text)")305}306```307308**Behavior**: Values emit as they arrive from any source. Order interleaved by timing. Cancellation propagates to all sources.309310---311312### combineLatest(_:...)313314Combine sequences, emitting tuple when any source emits. Always uses latest values. **Stable operator ✅**315316Use for dependent values that need synchronization.317318#### Example: Form Validation319320```swift321import AsyncAlgorithms322323struct SignupForm: View {324@State private var usernameStream = AsyncStream<String> { /* ... */ }325@State private var emailStream = AsyncStream<String> { /* ... */ }326@State private var passwordStream = AsyncStream<String> { /* ... */ }327@State private var formState = FormState.incomplete328329var body: some View {330Form {331TextField("Username", text: $username)332TextField("Email", text: $email)333SecureField("Password", text: $password)334}335.task {336await validateForm()337}338}339340private func validateForm() async {341for await (username, email, password) in342usernameStream.combineLatest(emailStream, passwordStream)343{344formState = await validate(345username: username,346email: email,347password: password348)349}350}351}352```353354#### ❌ Anti-Pattern355356```swift357// Bad: Manual value combining358actor FormValidator {359private var currentUsername: String = ""360private var currentEmail: String = ""361362func updateUsername(_ username: String) {363currentUsername = username364checkForm()365}366}367```368369**Solution**: Use `combineLatest()`.370371---372373### zip(_:...)374375Combine sequences by pairing elements in order. **Stable operator ✅**376377#### Example: Image + Metadata378379```swift380import AsyncAlgorithms381382struct ImageLoader {383func loadImagesWithMetadata(urls: [URL]) async throws -> [LoadedImage] {384let imageStream = AsyncThrowingStream<UIImage, Error> { continuation in385Task {386for url in urls {387let image = try await downloadImage(from: url)388continuation.yield(image)389}390continuation.finish()391}392}393394let metadataStream = AsyncThrowingStream<ImageMetadata, Error> { continuation in395Task {396for url in urls {397let metadata = try await fetchMetadata(for: url)398continuation.yield(metadata)399}400continuation.finish()401}402}403404var results: [LoadedImage] = []405for try await (image, metadata) in imageStream.zip(metadataStream) {406results.append(LoadedImage(image: image, metadata: metadata))407}408return results409}410}411```412413**Behavior**: Emits tuple when all sequences emit. Maintains order. Finishes when shortest sequence finishes.414415---416417### chain(_:...)418419Concatenate sequences sequentially. **Stable operator ✅**420421#### Example: Paginated Loading422423```swift424import AsyncAlgorithms425426struct ArticlePaginator {427func loadAllArticles() -> AsyncStream<[Article]> {428AsyncStream { continuation in429Task {430var page = 1431var hasMore = true432while hasMore {433let articles = try await fetchPage(page: page)434continuation.yield(articles)435hasMore = articles.count == 20436page += 1437}438continuation.finish()439}440}441}442}443444// Usage: Chain cache + network445for await articles in loadFromCacheStream().chain(loadFromNetworkStream()) {446display(articles)447}448```449450**Behavior**: Emits all values from first sequence before starting second.451452---453454## Utility Operators455456### removeDuplicates()457458Remove adjacent duplicates. **Stable operator ✅**459460```swift461import AsyncAlgorithms462463actor ChatHistory {464private var messageStream = AsyncStream<ChatMessage> { /* ... */ }465466func getUniqueMessages() -> AsyncStream<ChatMessage> {467messageStream.removeDuplicates()468}469}470```471472---473474### chunks() and chunked()475476Collect values into batches. **Stable operator ✅**477478```swift479import AsyncAlgorithms480481struct BatchProcessor {482func processLargeDataset(dataStream: AsyncStream<DataItem>) async {483for await batch in dataStream.chunks(count: 100) {484await processBatch(batch)485}486}487488func chunkedByTime(dataStream: AsyncStream<DataItem>) async {489for await batch in dataStream.chunked(by: .seconds(5)) {490await processBatch(batch)491}492}493}494```495496---497498### compacted() and adjacentPairs()499500```swift501import AsyncAlgorithms502503// Remove nil values504for await value in optionalValuesStream.compacted() {505process(value)506}507508// Pair adjacent elements509for await (previous, current) in valuesStream.adjacentPairs() {510let difference = current - previous511}512```513514---515516## Multi-Consumer Scenarios517518### AsyncChannel519520AsyncSequence with backpressure. **Stable operator ✅**521522Use for producer-consumer patterns with flow control.523524#### Example: Message Queue525526```swift527import AsyncAlgorithms528529actor MessageQueue {530private let channel = AsyncChannel<Message>()531532func getMessages() -> AsyncStream<Message> {533channel534}535536func enqueue(_ message: Message) async {537await channel.send(message)538}539540func startProcessing() {541Task {542for await message in channel {543await process(message)544}545}546}547}548549// Multiple producers550let queue = MessageQueue()551Task { await queue.enqueue(Message(type: .userAction, content: "tap")) }552Task { await queue.enqueue(Message(type: .network, content: "data")) }553queue.startProcessing()554```555556#### ❌ Anti-Pattern557558```swift559// Bad: Values split unpredictably560let stream = AsyncStream<Int> { continuation in561for i in 1...10 {562continuation.yield(i)563}564continuation.finish()565}566567Task { for await value in stream { print("Consumer 1: \(value)") } }568Task { for await value in stream { print("Consumer 2: \(value)") } }569```570571**Problem**: Each value goes to only one consumer.572573**Solution**: Use `AsyncChannel` for multi-consumer scenarios.574575---576577### AsyncThrowingChannel578579Like AsyncChannel but can emit errors. **Stable operator ✅**580581#### Example: WebSocket582583```swift584import AsyncAlgorithms585586actor WebSocketConnection {587private let channel = AsyncThrowingChannel<WebSocketMessage, Error>()588589func getMessages() -> AsyncThrowingStream<WebSocketMessage, Error> {590channel591}592593func receiveMessage(_ message: WebSocketMessage) async {594await channel.send(message)595}596597func reportError(_ error: Error) async {598await channel.finish(throwing: error)599}600}601602// Usage603do {604for await message in connection.getMessages() {605handle(message)606}607} catch {608print("WebSocket error: \(error)")609}610```611612---613614## Combine Migration Guide615616### Operator Mapping Table617618| Combine | AsyncAlgorithms | Status | Alternative |619|---------|-----------------|---------|-------------|620| `.debounce()` | `debounce()` | ✅ Stable | - |621| `.throttle()` | `throttle()` | ✅ Stable | - |622| `.merge()` | `merge()` | ✅ Stable | - |623| `.combineLatest()` | `combineLatest()` | ✅ Stable | - |624| `.zip()` | `zip()` | ✅ Stable | - |625| `.concat()` | `chain()` | ✅ Stable | - |626| `.removeDuplicates()` | `removeDuplicates()` | ✅ Stable | - |627| `.timer()` | `AsyncTimerSequence` | ✅ Stable | - |628| `.share()` | - | - | `AsyncChannel` |629| `.flatMap()` | - | - | `TaskGroup` |630| `.receive(on:)` | - | - | `Task` / `@MainActor` |631| `.eraseToAnyPublisher()` | - | - | `any AsyncSequence` |632633---634635### Migration Examples636637#### Example 1: ArticleSearcher638639**Before: Combine**640641```swift642import Combine643644final class ArticleSearcher: ObservableObject {645@Published private(set) var results: [Article] = []646@Published var searchQuery = ""647648init() {649$searchQuery650.debounce(for: .milliseconds(500), scheduler: DispatchQueue.main)651.removeDuplicates()652.flatMap { query in653APIClient.searchArticles(query)654.catch { _ in Just([]) }655}656.receive(on: DispatchQueue.main)657.assign(to: &$results)658}659}660```661662**After: AsyncAlgorithms**663664```swift665import AsyncAlgorithms666667@Observable668final class ArticleSearcher {669@MainActor private(set) var results: [Article] = []670private var searchQueryContinuation: AsyncStream<String>.Continuation?671672private lazy var searchQueryStream: AsyncStream<String> = {673AsyncStream { continuation in674searchQueryContinuation = continuation675}676}()677678func search(_ query: String) {679searchQueryContinuation?.yield(query)680}681682func startDebouncedSearch() {683Task { @MainActor in684for await query in searchQueryStream685.debounce(for: .milliseconds(500))686.removeDuplicates()687{688do {689self.results = try await APIClient.searchArticles(query)690} catch {691self.results = []692}693}694}695}696}697```698699**Benefits**: Simpler error handling, no cancellables, automatic cancellation.700701---702703#### Example 2: Multi-Source Loading704705**Before: Combine Merge**706707```swift708import Combine709710final class ArticleLoader: ObservableObject {711@Published private(set) var items: [Item] = []712713func loadAllSources() {714let source1 = APIClient.fetchItems(from: .source1)715let source2 = APIClient.fetchItems(from: .source2)716717Publishers.Merge(source1, source2)718.scan([]) { accumulated, new in719accumulated + new720}721.receive(on: DispatchQueue.main)722.assign(to: &$items)723}724}725```726727**After: TaskGroup**728729```swift730import AsyncAlgorithms731732@Observable733final class ArticleLoader {734@MainActor private(set) var items: [Item] = []735736func loadAllSourcesParallel() async {737await withTaskGroup(of: [Item].self) { group in738group.addTask {739await APIClient.fetchItems(from: .source1)740}741group.addTask {742await APIClient.fetchItems(from: .source2)743}744745for await newItems in group {746items.append(contentsOf: newItems)747}748}749}750}751```752753**Key difference**: For parallel execution, use `TaskGroup` instead of `flatMap`.754755---756757#### Example 3: Form Validation758759**Before: Combine**760761```swift762import Combine763764final class FormValidator: ObservableObject {765@Published var username = ""766@Published var email = ""767768@Published private(set) var formState: FormState = .incomplete769770init() {771Publishers.CombineLatest2($username, $email)772.map { username, email in773validate(username: username, email: email)774}775.assign(to: &$formState)776}777}778```779780**After: AsyncAlgorithms or async let**781782```swift783import AsyncAlgorithms784785@Observable786final class FormValidator {787var username = ""788var email = ""789790@MainActor private(set) var formState: FormState = .incomplete791792// Option 1: combineLatest for stream-based validation793func startStreamValidation() {794Task { @MainActor in795for await (username, email) in796usernameStream.combineLatest(emailStream)797{798self.formState = validate(799username: username,800email: email801)802}803}804}805806// Option 2: async let for simple validation807func validateForm() async {808let (username, email) = await (username, email)809formState = validate(810username: username,811email: email812)813}814}815```816817**Choose**:818- `combineLatest()`: Continuous validation as fields change819- `async let`: One-time validation when all values available820821---822823## Common Mistakes Agents Make824825- **Manual debounce with `Task.sleep`**: This creates multiple concurrent tasks and risks out-of-order results. Use the stream-based `debounce(for:)` operator from AsyncAlgorithms instead.826- **Sharing `AsyncStream` across multiple consumers**: Values split unpredictably between consumers. Use `AsyncChannel` for multi-consumer scenarios with backpressure. Note: `AsyncChannel` is point-to-point, not broadcast like Combine's `.share()`.827- **Looking for a `.flatMap` equivalent**: Use `TaskGroup` for fan-out; the semantics differ from Combine/Rx `flatMap`.828- **Looking for `.receive(on:)` equivalent**: Use `@MainActor` or `Task` context for isolation instead.829830## Best Practices8318321. **Use time-based operators** for rapid inputs: debounce() for search, throttle() for buttons8332. **Combine streams** with merge/combineLatest instead of manual state management8343. **Use AsyncChannel** for multi-consumer scenarios with backpressure8354. **Ensure Sendable conformance** when using operators across isolation boundaries8365. **Leverage cancellation** - Task cancellation propagates through all operators8376. **Choose right tool**: AsyncAlgorithms for complex streams, AsyncStream for bridging callbacks8387. **Avoid manual sleep loops** - use AsyncTimerSequence instead839840---841842## Further Learning843844- [AsyncAlgorithms Documentation](https://github.com/apple/swift-async-algorithms)845- [Combine Migration Guide](migration.md)846- [Async Sequences](async-sequences.md)847- [Tasks](tasks.md) - Task groups and structured concurrency848