Loading source
Pulling the file list, source metadata, and syntax-aware rendering for this listing.
Source from repo
Implement Rust async code with Tokio, handle concurrency primitives, and avoid common async pitfalls.
Files
Skill
Size
Entrypoint
Format
Open file
Syntax-highlighted preview of this file as included in the skill package.
SKILL.md
---
name: rust-async-patterns
description: Master Rust async programming with Tokio, async traits, error handling, and concurrent patterns. Use when building async Rust applications, implementing concurrent systems, or debugging async code.
---

# Rust Async Patterns

Production patterns for async Rust programming with Tokio runtime, including tasks, channels, streams, and error handling.

## When to Use This Skill

- Building async Rust applications
- Implementing concurrent network services
- Using Tokio for async I/O
- Handling async errors properly
- Debugging async code issues
- Optimizing async performance

## Core Concepts

### 1. Async Execution Model

```
Future (lazy) → poll() → Ready(value) | Pending
                ↑           ↓
              Waker ← Runtime schedules
```

### 2. Key Abstractions

| Concept    | Purpose                                  |
| ---------- | ---------------------------------------- |
| `Future`   | Lazy computation that may complete later |
| `async fn` | Function returning impl Future           |
| `await`    | Suspend until future completes           |
| `Task`     | Spawned future running concurrently      |
| `Runtime`  | Executor that polls futures              |

## Quick Start

```toml
# Cargo.toml
[dependencies]
tokio = { version = "1", features = ["full"] }
futures = "0.3"
async-trait = "0.1"
anyhow = "1.0"
tracing = "0.1"
tracing-subscriber = "0.3"

# Needed only by specific patterns below
thiserror = "1"      # Pattern 3: custom error enums
tokio-util = "0.7"   # Pattern 4: CancellationToken
async-stream = "0.3" # Pattern 6: stream! macro
```

```rust
use tokio::time::{sleep, Duration};
use anyhow::Result;

#[tokio::main]
async fn main() -> Result<()> {
    // Initialize tracing
    tracing_subscriber::fmt::init();

    // Async operations
    let result = fetch_data("https://api.example.com").await?;
    println!("Got: {}", result);

    Ok(())
}

async fn fetch_data(url: &str) -> Result<String> {
    // Simulated async operation
    sleep(Duration::from_millis(100)).await;
    Ok(format!("Data from {}", url))
}
```

## Patterns

### Pattern 1: Concurrent Task Execution

```rust
use tokio::task::JoinSet;
use anyhow::Result;

// Spawn multiple concurrent tasks.
// Failed tasks are logged and skipped, so the returned Vec holds only the
// successful results, in completion order (not input order).
async fn fetch_all_concurrent(urls: Vec<String>) -> Result<Vec<String>> {
    let mut set = JoinSet::new();

    for url in urls {
        set.spawn(async move {
            fetch_data(&url).await
        });
    }

    let mut results = Vec::new();
    while let Some(res) = set.join_next().await {
        match res {
            Ok(Ok(data)) => results.push(data),
            Ok(Err(e)) => tracing::error!("Task failed: {}", e),
            Err(e) => tracing::error!("Join error: {}", e),
        }
    }

    Ok(results)
}

// With concurrency limit
use futures::stream::{self, StreamExt};

async fn fetch_with_limit(urls: Vec<String>, limit: usize) -> Vec<Result<String>> {
    stream::iter(urls)
        .map(|url| async move { fetch_data(&url).await })
        .buffer_unordered(limit) // Max futures in flight at once
        .collect()
        .await
}

// Select first to complete
use tokio::select;

async fn race_requests(url1: &str, url2: &str) -> Result<String> {
    // The branch that loses the race is dropped, which cancels that request
    select! {
        result = fetch_data(url1) => result,
        result = fetch_data(url2) => result,
    }
}
```

### Pattern 2: Channels for Communication

```rust
use tokio::sync::{mpsc, broadcast, oneshot, watch};

// Multi-producer, single-consumer
async fn mpsc_example() {
    let (tx, mut rx) = mpsc::channel::<String>(100);

    // Spawn producer
    let tx2 = tx.clone();
    tokio::spawn(async move {
        tx2.send("Hello".to_string()).await.unwrap();
    });

    // Drop the original sender: `recv()` only returns `None` once every
    // sender is gone, so keeping `tx` alive would make the loop below hang
    drop(tx);

    // Consume
    while let Some(msg) = rx.recv().await {
        println!("Got: {}", msg);
    }
}

// Broadcast: multi-producer, multi-consumer
async fn broadcast_example() {
    let (tx, _) = broadcast::channel::<String>(100);

    let mut rx1 = tx.subscribe();
    let mut rx2 = tx.subscribe();

    tx.send("Event".to_string()).unwrap();

    // Both receivers get the message
    let _ = rx1.recv().await;
    let _ = rx2.recv().await;
}

// Oneshot: single value, single use
async fn oneshot_example() -> String {
    let (tx, rx) = oneshot::channel::<String>();

    tokio::spawn(async move {
        tx.send("Result".to_string()).unwrap();
    });

    rx.await.unwrap()
}

// Watch: single producer, multi-consumer, latest value
async fn watch_example() {
    let (tx, mut rx) = watch::channel("initial".to_string());

    tokio::spawn(async move {
        // Wait for changes. `changed()` returns Err once the sender is
        // dropped, which ends the task cleanly instead of panicking
        while rx.changed().await.is_ok() {
            println!("New value: {}", *rx.borrow());
        }
    });

    tx.send("updated".to_string()).unwrap();
}
```

### Pattern 3: Async Error Handling

```rust
use anyhow::{Context, Result, bail};
use thiserror::Error;

#[derive(Error, Debug)]
pub enum ServiceError {
    #[error("Network error: {0}")]
    Network(#[from] reqwest::Error),

    #[error("Database error: {0}")]
    Database(#[from] sqlx::Error),

    #[error("Not found: {0}")]
    NotFound(String),

    #[error("Timeout after {0:?}")]
    Timeout(std::time::Duration),
}

// Using anyhow for application errors
async fn process_request(id: &str) -> Result<Response> {
    let data = fetch_data(id)
        .await
        .context("Failed to fetch data")?;

    let parsed = parse_response(&data)
        .context("Failed to parse response")?;

    Ok(parsed)
}

// Using custom errors for library code
async fn get_user(id: &str) -> Result<User, ServiceError> {
    let result = db.query(id).await?;

    match result {
        Some(user) => Ok(user),
        None => Err(ServiceError::NotFound(id.to_string())),
    }
}

// Timeout wrapper
use tokio::time::{timeout, Duration};

async fn with_timeout<T, F>(duration: Duration, future: F) -> Result<T, ServiceError>
where
    F: std::future::Future<Output = Result<T, ServiceError>>,
{
    // `timeout` wraps the future's own Result in an outer Result. The `?`
    // strips the outer (elapsed) layer, and the inner Result is returned as-is
    timeout(duration, future)
        .await
        .map_err(|_| ServiceError::Timeout(duration))?
}
```

### Pattern 4: Graceful Shutdown

```rust
use anyhow::Result;
use tokio::signal;
use tokio::sync::broadcast;
use tokio::time::{timeout, Duration};
use tokio_util::sync::CancellationToken;

async fn run_server() -> Result<()> {
    // Method 1: CancellationToken
    let token = CancellationToken::new();
    let token_clone = token.clone();

    // Spawn task that respects cancellation; keep the handle so shutdown
    // can wait for the task instead of sleeping for a fixed time
    let worker = tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = token_clone.cancelled() => {
                    tracing::info!("Task shutting down");
                    break;
                }
                _ = do_work() => {}
            }
        }
    });

    // Wait for shutdown signal
    signal::ctrl_c().await?;
    tracing::info!("Shutdown signal received");

    // Cancel all tasks
    token.cancel();

    // Give tasks time to cleanup: return as soon as the worker finishes,
    // but wait no longer than the grace period
    match timeout(Duration::from_secs(5), worker).await {
        Ok(Ok(())) => {}
        Ok(Err(e)) => tracing::error!("Worker task failed: {}", e),
        Err(_) => tracing::warn!("Worker did not stop within the grace period"),
    }

    Ok(())
}

// Method 2: Broadcast channel for shutdown
async fn run_with_broadcast() -> Result<()> {
    let (shutdown_tx, _) = broadcast::channel::<()>(1);

    let mut rx = shutdown_tx.subscribe();
    let worker = tokio::spawn(async move {
        tokio::select! {
            _ = rx.recv() => {
                tracing::info!("Received shutdown");
            }
            _ = async { loop { do_work().await } } => {}
        }
    });

    signal::ctrl_c().await?;
    let _ = shutdown_tx.send(());

    // Wait for the worker to observe the signal before returning
    let _ = worker.await;

    Ok(())
}
```

### Pattern 5: Async Traits

```rust
use async_trait::async_trait;

// `async_trait` keeps the trait usable as `dyn Repository`; traits with
// native `async fn` (Rust 1.75+) cannot be used as trait objects
#[async_trait]
pub trait Repository {
    async fn get(&self, id: &str) -> Result<Entity>;
    async fn save(&self, entity: &Entity) -> Result<()>;
    async fn delete(&self, id: &str) -> Result<()>;
}

pub struct PostgresRepository {
    pool: sqlx::PgPool,
}

#[async_trait]
impl Repository for PostgresRepository {
    async fn get(&self, id: &str) -> Result<Entity> {
        sqlx::query_as!(Entity, "SELECT * FROM entities WHERE id = $1", id)
            .fetch_one(&self.pool)
            .await
            .map_err(Into::into)
    }

    async fn save(&self, entity: &Entity) -> Result<()> {
        sqlx::query!(
            "INSERT INTO entities (id, data) VALUES ($1, $2)
             ON CONFLICT (id) DO UPDATE SET data = $2",
            entity.id,
            entity.data
        )
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    async fn delete(&self, id: &str) -> Result<()> {
        sqlx::query!("DELETE FROM entities WHERE id = $1", id)
            .execute(&self.pool)
            .await?;
        Ok(())
    }
}

// Trait object usage
async fn process(repo: &dyn Repository, id: &str) -> Result<()> {
    let entity = repo.get(id).await?;
    // Process...
    repo.save(&entity).await
}
```

### Pattern 6: Streams and Async Iteration

```rust
use std::time::Duration;
use futures::pin_mut;
use futures::stream::{self, Stream, StreamExt};
use async_stream::stream;

// Create stream from async iterator
fn numbers_stream() -> impl Stream<Item = i32> {
    stream! {
        for i in 0..10 {
            tokio::time::sleep(Duration::from_millis(100)).await;
            yield i;
        }
    }
}

// Process stream
async fn process_stream() {
    let stream = numbers_stream();

    // Map and filter
    let processed: Vec<_> = stream
        .filter(|n| futures::future::ready(*n % 2 == 0))
        .map(|n| n * 2)
        .collect()
        .await;

    println!("{:?}", processed);
}

// Chunked processing
async fn process_in_chunks() {
    let stream = numbers_stream();

    // `stream!` produces a stream that is not `Unpin`, and `next()` requires
    // `Unpin`, so the stream must be pinned before it is polled in a loop
    let chunks = stream.chunks(3);
    pin_mut!(chunks);

    while let Some(chunk) = chunks.next().await {
        println!("Processing chunk: {:?}", chunk);
    }
}

// Merge multiple streams
async fn merge_streams() {
    let stream1 = numbers_stream();
    let stream2 = numbers_stream();

    let merged = stream::select(stream1, stream2);

    merged
        .for_each(|n| async move {
            println!("Got: {}", n);
        })
        .await;
}
```

### Pattern 7: Resource Management

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex}; // share `Cache` / `Pool` between tasks as `Arc<Cache>` / `Arc<Pool>`
use tokio::sync::{RwLock, Semaphore, SemaphorePermit};

// Shared state with RwLock (prefer for read-heavy)
struct Cache {
    data: RwLock<HashMap<String, String>>,
}

impl Cache {
    async fn get(&self, key: &str) -> Option<String> {
        self.data.read().await.get(key).cloned()
    }

    async fn set(&self, key: String, value: String) {
        self.data.write().await.insert(key, value);
    }
}

// Connection pool with semaphore
struct Pool {
    semaphore: Semaphore,
    // std Mutex on purpose: the lock is held only for a push/pop, never across
    // an `.await`, and it can be taken synchronously from `Drop`
    connections: Mutex<Vec<Connection>>,
}

impl Pool {
    fn new(size: usize) -> Self {
        Self {
            semaphore: Semaphore::new(size),
            connections: Mutex::new((0..size).map(|_| Connection::new()).collect()),
        }
    }

    async fn acquire(&self) -> PooledConnection<'_> {
        let permit = self
            .semaphore
            .acquire()
            .await
            .expect("pool semaphore is never closed");
        // A poisoned lock is still usable here: push/pop cannot leave the Vec inconsistent
        let conn = self
            .connections
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .pop()
            .expect("holding a permit guarantees an idle connection");
        PooledConnection { pool: self, conn: Some(conn), _permit: permit }
    }
}

struct PooledConnection<'a> {
    pool: &'a Pool,
    conn: Option<Connection>,
    _permit: SemaphorePermit<'a>,
}

impl Drop for PooledConnection<'_> {
    fn drop(&mut self) {
        if let Some(conn) = self.conn.take() {
            // Return the connection synchronously. `drop` runs before the
            // fields are dropped, so the connection is back in the pool
            // before `_permit` is released to the next waiter. (Spawning a
            // task here would need a `'static` pool reference and would race
            // with the permit release.)
            self.pool
                .connections
                .lock()
                .unwrap_or_else(|e| e.into_inner())
                .push(conn);
        }
    }
}
```

## Debugging Tips

```rust
// Enable tokio-console for runtime debugging
// Cargo.toml: tokio = { features = ["tracing"] }, plus the console-subscriber crate
// main(): console_subscriber::init();
// Run: RUSTFLAGS="--cfg tokio_unstable" cargo run
// Then: tokio-console

// Instrument async functions
// (`instrument` is the attribute macro; the `Instrument` trait provides `.instrument(span)`)
use tracing::{instrument, Instrument};

#[instrument(skip(pool))]
async fn fetch_user(pool: &PgPool, id: &str) -> Result<User> {
    tracing::debug!("Fetching user");
    // ...
}

// Track task spawning
let span = tracing::info_span!("worker", id = %worker_id);
tokio::spawn(async move {
    // Enters span when polled
}.instrument(span));
```

## Best Practices

### Do's

- **Use `tokio::select!`** - For racing futures
- **Prefer channels** - Over shared state when possible
- **Use `JoinSet`** - For managing multiple tasks
- **Instrument with tracing** - For debugging async code
- **Handle cancellation** - Check `CancellationToken`

### Don'ts

- **Don't block** - Never use `std::thread::sleep` in async
- **Don't hold locks across awaits** - A `std::sync` guard held over `.await` can deadlock and makes the future `!Send`; use `tokio::sync::Mutex` if a guard must span an await
- **Don't spawn unboundedly** - Use semaphores for limits
- **Don't ignore errors** - Propagate with `?` or log
- **Don't forget Send bounds** - For spawned futures