Loading source
Pulling the file list, source metadata, and syntax-aware rendering for this listing.
Source from repo
Implement Rust async code with Tokio, handle concurrency primitives, and avoid common async pitfalls.
Files
Skill
Size
Entrypoint
Format
Open file
Syntax-highlighted preview of this file as included in the skill package.
references/details.md
1# rust-async-patterns — detailed patterns and worked examples23## Patterns45### Pattern 1: Concurrent Task Execution67```rust8use tokio::task::JoinSet;9use anyhow::Result;1011// Spawn multiple concurrent tasks12async fn fetch_all_concurrent(urls: Vec<String>) -> Result<Vec<String>> {13let mut set = JoinSet::new();1415for url in urls {16set.spawn(async move {17fetch_data(&url).await18});19}2021let mut results = Vec::new();22while let Some(res) = set.join_next().await {23match res {24Ok(Ok(data)) => results.push(data),25Ok(Err(e)) => tracing::error!("Task failed: {}", e),26Err(e) => tracing::error!("Join error: {}", e),27}28}2930Ok(results)31}3233// With concurrency limit34use futures::stream::{self, StreamExt};3536async fn fetch_with_limit(urls: Vec<String>, limit: usize) -> Vec<Result<String>> {37stream::iter(urls)38.map(|url| async move { fetch_data(&url).await })39.buffer_unordered(limit) // Max concurrent tasks40.collect()41.await42}4344// Select first to complete45use tokio::select;4647async fn race_requests(url1: &str, url2: &str) -> Result<String> {48select! {49result = fetch_data(url1) => result,50result = fetch_data(url2) => result,51}52}53```5455### Pattern 2: Channels for Communication5657```rust58use tokio::sync::{mpsc, broadcast, oneshot, watch};5960// Multi-producer, single-consumer61async fn mpsc_example() {62let (tx, mut rx) = mpsc::channel::<String>(100);6364// Spawn producer65let tx2 = tx.clone();66tokio::spawn(async move {67tx2.send("Hello".to_string()).await.unwrap();68});6970// Consume71while let Some(msg) = rx.recv().await {72println!("Got: {}", msg);73}74}7576// Broadcast: multi-producer, multi-consumer77async fn broadcast_example() {78let (tx, _) = broadcast::channel::<String>(100);7980let mut rx1 = tx.subscribe();81let mut rx2 = tx.subscribe();8283tx.send("Event".to_string()).unwrap();8485// Both receivers get the message86let _ = rx1.recv().await;87let _ = rx2.recv().await;88}8990// Oneshot: single value, single use91async fn oneshot_example() -> String {92let (tx, rx) = oneshot::channel::<String>();9394tokio::spawn(async move {95tx.send("Result".to_string()).unwrap();96});9798rx.await.unwrap()99}100101// Watch: single producer, multi-consumer, latest value102async fn watch_example() {103let (tx, mut rx) = watch::channel("initial".to_string());104105tokio::spawn(async move {106loop {107// Wait for changes108rx.changed().await.unwrap();109println!("New value: {}", *rx.borrow());110}111});112113tx.send("updated".to_string()).unwrap();114}115```116117### Pattern 3: Async Error Handling118119```rust120use anyhow::{Context, Result, bail};121use thiserror::Error;122123#[derive(Error, Debug)]124pub enum ServiceError {125#[error("Network error: {0}")]126Network(#[from] reqwest::Error),127128#[error("Database error: {0}")]129Database(#[from] sqlx::Error),130131#[error("Not found: {0}")]132NotFound(String),133134#[error("Timeout after {0:?}")]135Timeout(std::time::Duration),136}137138// Using anyhow for application errors139async fn process_request(id: &str) -> Result<Response> {140let data = fetch_data(id)141.await142.context("Failed to fetch data")?;143144let parsed = parse_response(&data)145.context("Failed to parse response")?;146147Ok(parsed)148}149150// Using custom errors for library code151async fn get_user(id: &str) -> Result<User, ServiceError> {152let result = db.query(id).await?;153154match result {155Some(user) => Ok(user),156None => Err(ServiceError::NotFound(id.to_string())),157}158}159160// Timeout wrapper161use tokio::time::timeout;162163async fn with_timeout<T, F>(duration: Duration, future: F) -> Result<T, ServiceError>164where165F: std::future::Future<Output = Result<T, ServiceError>>,166{167timeout(duration, future)168.await169.map_err(|_| ServiceError::Timeout(duration))?170}171```172173### Pattern 4: Graceful Shutdown174175```rust176use tokio::signal;177use tokio::sync::broadcast;178use tokio_util::sync::CancellationToken;179180async fn run_server() -> Result<()> {181// Method 1: CancellationToken182let token = CancellationToken::new();183let token_clone = token.clone();184185// Spawn task that respects cancellation186tokio::spawn(async move {187loop {188tokio::select! {189_ = token_clone.cancelled() => {190tracing::info!("Task shutting down");191break;192}193_ = do_work() => {}194}195}196});197198// Wait for shutdown signal199signal::ctrl_c().await?;200tracing::info!("Shutdown signal received");201202// Cancel all tasks203token.cancel();204205// Give tasks time to cleanup206tokio::time::sleep(Duration::from_secs(5)).await;207208Ok(())209}210211// Method 2: Broadcast channel for shutdown212async fn run_with_broadcast() -> Result<()> {213let (shutdown_tx, _) = broadcast::channel::<()>(1);214215let mut rx = shutdown_tx.subscribe();216tokio::spawn(async move {217tokio::select! {218_ = rx.recv() => {219tracing::info!("Received shutdown");220}221_ = async { loop { do_work().await } } => {}222}223});224225signal::ctrl_c().await?;226let _ = shutdown_tx.send(());227228Ok(())229}230```231232### Pattern 5: Async Traits233234```rust235use async_trait::async_trait;236237#[async_trait]238pub trait Repository {239async fn get(&self, id: &str) -> Result<Entity>;240async fn save(&self, entity: &Entity) -> Result<()>;241async fn delete(&self, id: &str) -> Result<()>;242}243244pub struct PostgresRepository {245pool: sqlx::PgPool,246}247248#[async_trait]249impl Repository for PostgresRepository {250async fn get(&self, id: &str) -> Result<Entity> {251sqlx::query_as!(Entity, "SELECT * FROM entities WHERE id = $1", id)252.fetch_one(&self.pool)253.await254.map_err(Into::into)255}256257async fn save(&self, entity: &Entity) -> Result<()> {258sqlx::query!(259"INSERT INTO entities (id, data) VALUES ($1, $2)260ON CONFLICT (id) DO UPDATE SET data = $2",261entity.id,262entity.data263)264.execute(&self.pool)265.await?;266Ok(())267}268269async fn delete(&self, id: &str) -> Result<()> {270sqlx::query!("DELETE FROM entities WHERE id = $1", id)271.execute(&self.pool)272.await?;273Ok(())274}275}276277// Trait object usage278async fn process(repo: &dyn Repository, id: &str) -> Result<()> {279let entity = repo.get(id).await?;280// Process...281repo.save(&entity).await282}283```284285### Pattern 6: Streams and Async Iteration286287```rust288use futures::stream::{self, Stream, StreamExt};289use async_stream::stream;290291// Create stream from async iterator292fn numbers_stream() -> impl Stream<Item = i32> {293stream! {294for i in 0..10 {295tokio::time::sleep(Duration::from_millis(100)).await;296yield i;297}298}299}300301// Process stream302async fn process_stream() {303let stream = numbers_stream();304305// Map and filter306let processed: Vec<_> = stream307.filter(|n| futures::future::ready(*n % 2 == 0))308.map(|n| n * 2)309.collect()310.await;311312println!("{:?}", processed);313}314315// Chunked processing316async fn process_in_chunks() {317let stream = numbers_stream();318319let mut chunks = stream.chunks(3);320321while let Some(chunk) = chunks.next().await {322println!("Processing chunk: {:?}", chunk);323}324}325326// Merge multiple streams327async fn merge_streams() {328let stream1 = numbers_stream();329let stream2 = numbers_stream();330331let merged = stream::select(stream1, stream2);332333merged334.for_each(|n| async move {335println!("Got: {}", n);336})337.await;338}339```340341### Pattern 7: Resource Management342343```rust344use std::sync::Arc;345use tokio::sync::{Mutex, RwLock, Semaphore};346347// Shared state with RwLock (prefer for read-heavy)348struct Cache {349data: RwLock<HashMap<String, String>>,350}351352impl Cache {353async fn get(&self, key: &str) -> Option<String> {354self.data.read().await.get(key).cloned()355}356357async fn set(&self, key: String, value: String) {358self.data.write().await.insert(key, value);359}360}361362// Connection pool with semaphore363struct Pool {364semaphore: Semaphore,365connections: Mutex<Vec<Connection>>,366}367368impl Pool {369fn new(size: usize) -> Self {370Self {371semaphore: Semaphore::new(size),372connections: Mutex::new((0..size).map(|_| Connection::new()).collect()),373}374}375376async fn acquire(&self) -> PooledConnection<'_> {377let permit = self.semaphore.acquire().await.unwrap();378let conn = self.connections.lock().await.pop().unwrap();379PooledConnection { pool: self, conn: Some(conn), _permit: permit }380}381}382383struct PooledConnection<'a> {384pool: &'a Pool,385conn: Option<Connection>,386_permit: tokio::sync::SemaphorePermit<'a>,387}388389impl Drop for PooledConnection<'_> {390fn drop(&mut self) {391if let Some(conn) = self.conn.take() {392let pool = self.pool;393tokio::spawn(async move {394pool.connections.lock().await.push(conn);395});396}397}398}399```400401## Debugging Tips402403```rust404// Enable tokio-console for runtime debugging405// Cargo.toml: tokio = { features = ["tracing"] }406// Run: RUSTFLAGS="--cfg tokio_unstable" cargo run407// Then: tokio-console408409// Instrument async functions410use tracing::instrument;411412#[instrument(skip(pool))]413async fn fetch_user(pool: &PgPool, id: &str) -> Result<User> {414tracing::debug!("Fetching user");415// ...416}417418// Track task spawning419let span = tracing::info_span!("worker", id = %worker_id);420tokio::spawn(async move {421// Enters span when polled422}.instrument(span));423```424