Loading source
Pulling the file list, source metadata, and syntax-aware rendering for this listing.
Source from repo
Fetch daily multi-ticker stock data with RSI, Bollinger Bands, Stochastic, and BUY/HOLD/SELL signals.
Files
Skill
Size
Entrypoint
Format
Open file
Syntax-highlighted preview of this file as included in the skill package.
scripts/portfolio.py
1#!/usr/bin/env python32# /// script3# requires-python = ">=3.10"4# dependencies = ["yfinance>=0.2.40"]5# ///6"""7Portfolio management for stock-analysis skill.89Usage:10uv run portfolio.py create "Portfolio Name"11uv run portfolio.py list12uv run portfolio.py show [--portfolio NAME]13uv run portfolio.py delete "Portfolio Name"14uv run portfolio.py rename "Old Name" "New Name"1516uv run portfolio.py add TICKER --quantity 100 --cost 150.00 [--portfolio NAME]17uv run portfolio.py update TICKER --quantity 150 [--portfolio NAME]18uv run portfolio.py remove TICKER [--portfolio NAME]19"""2021import argparse22import json23import os24import sys25from dataclasses import dataclass, asdict26from datetime import datetime27from pathlib import Path28from typing import Literal2930import yfinance as yf313233# Top 20 supported cryptocurrencies34SUPPORTED_CRYPTOS = {35"BTC-USD", "ETH-USD", "BNB-USD", "SOL-USD", "XRP-USD",36"ADA-USD", "DOGE-USD", "AVAX-USD", "DOT-USD", "MATIC-USD",37"LINK-USD", "ATOM-USD", "UNI-USD", "LTC-USD", "BCH-USD",38"XLM-USD", "ALGO-USD", "VET-USD", "FIL-USD", "NEAR-USD",39}404142def get_storage_path() -> Path:43"""Get the portfolio storage path."""44# Use ~/.clawdbot/skills/stock-analysis/portfolios.json45state_dir = os.environ.get("CLAWDBOT_STATE_DIR", os.path.expanduser("~/.clawdbot"))46portfolio_dir = Path(state_dir) / "skills" / "stock-analysis"47portfolio_dir.mkdir(parents=True, exist_ok=True)48return portfolio_dir / "portfolios.json"495051def detect_asset_type(ticker: str) -> Literal["stock", "crypto"]:52"""Detect asset type from ticker format."""53ticker_upper = ticker.upper()54if ticker_upper.endswith("-USD"):55base = ticker_upper[:-4]56if base.isalpha() and f"{base}-USD" in SUPPORTED_CRYPTOS:57return "crypto"58# Allow any *-USD ticker as crypto (flexible)59if base.isalpha():60return "crypto"61return "stock"626364@dataclass65class Asset:66ticker: str67type: Literal["stock", "crypto"]68quantity: float69cost_basis: float70added_at: str717273@dataclass74class Portfolio:75name: str76created_at: str77updated_at: str78assets: list[Asset]798081class PortfolioStore:82"""Manages portfolio storage with atomic writes."""8384def __init__(self, path: Path | None = None):85self.path = path or get_storage_path()86self._data: dict | None = None8788def _load(self) -> dict:89"""Load portfolios from disk."""90if self._data is not None:91return self._data9293if not self.path.exists():94self._data = {"version": 1, "portfolios": {}}95return self._data9697try:98with open(self.path, "r", encoding="utf-8") as f:99self._data = json.load(f)100return self._data101except (json.JSONDecodeError, IOError):102self._data = {"version": 1, "portfolios": {}}103return self._data104105def _save(self) -> None:106"""Save portfolios to disk with atomic write."""107if self._data is None:108return109110# Ensure directory exists111self.path.parent.mkdir(parents=True, exist_ok=True)112113# Atomic write: write to temp file, then rename114tmp_path = self.path.with_suffix(".tmp")115try:116with open(tmp_path, "w", encoding="utf-8") as f:117json.dump(self._data, f, indent=2)118tmp_path.replace(self.path)119except Exception:120if tmp_path.exists():121tmp_path.unlink()122raise123124def _get_portfolio_key(self, name: str) -> str:125"""Convert portfolio name to storage key."""126return name.lower().replace(" ", "-")127128def list_portfolios(self) -> list[str]:129"""List all portfolio names."""130data = self._load()131return [p["name"] for p in data["portfolios"].values()]132133def get_portfolio(self, name: str) -> Portfolio | None:134"""Get a portfolio by name."""135data = self._load()136key = self._get_portfolio_key(name)137138if key not in data["portfolios"]:139# Try case-insensitive match140for k, v in data["portfolios"].items():141if v["name"].lower() == name.lower():142key = k143break144else:145return None146147p = data["portfolios"][key]148assets = [149Asset(150ticker=a["ticker"],151type=a["type"],152quantity=a["quantity"],153cost_basis=a["cost_basis"],154added_at=a["added_at"],155)156for a in p.get("assets", [])157]158return Portfolio(159name=p["name"],160created_at=p["created_at"],161updated_at=p["updated_at"],162assets=assets,163)164165def create_portfolio(self, name: str) -> Portfolio:166"""Create a new portfolio."""167data = self._load()168key = self._get_portfolio_key(name)169170if key in data["portfolios"]:171raise ValueError(f"Portfolio '{name}' already exists")172173now = datetime.now().isoformat()174portfolio = {175"name": name,176"created_at": now,177"updated_at": now,178"assets": [],179}180data["portfolios"][key] = portfolio181self._save()182183return Portfolio(name=name, created_at=now, updated_at=now, assets=[])184185def delete_portfolio(self, name: str) -> bool:186"""Delete a portfolio."""187data = self._load()188key = self._get_portfolio_key(name)189190# Try case-insensitive match191if key not in data["portfolios"]:192for k, v in data["portfolios"].items():193if v["name"].lower() == name.lower():194key = k195break196else:197return False198199del data["portfolios"][key]200self._save()201return True202203def rename_portfolio(self, old_name: str, new_name: str) -> bool:204"""Rename a portfolio."""205data = self._load()206old_key = self._get_portfolio_key(old_name)207new_key = self._get_portfolio_key(new_name)208209# Find old portfolio210if old_key not in data["portfolios"]:211for k, v in data["portfolios"].items():212if v["name"].lower() == old_name.lower():213old_key = k214break215else:216return False217218if new_key in data["portfolios"] and new_key != old_key:219raise ValueError(f"Portfolio '{new_name}' already exists")220221portfolio = data["portfolios"].pop(old_key)222portfolio["name"] = new_name223portfolio["updated_at"] = datetime.now().isoformat()224data["portfolios"][new_key] = portfolio225self._save()226return True227228def add_asset(229self,230portfolio_name: str,231ticker: str,232quantity: float,233cost_basis: float,234) -> Asset:235"""Add an asset to a portfolio."""236data = self._load()237key = self._get_portfolio_key(portfolio_name)238239# Find portfolio240if key not in data["portfolios"]:241for k, v in data["portfolios"].items():242if v["name"].lower() == portfolio_name.lower():243key = k244break245else:246raise ValueError(f"Portfolio '{portfolio_name}' not found")247248portfolio = data["portfolios"][key]249ticker = ticker.upper()250251# Check if asset already exists252for asset in portfolio["assets"]:253if asset["ticker"] == ticker:254raise ValueError(f"Asset '{ticker}' already in portfolio. Use 'update' to modify.")255256# Validate ticker257asset_type = detect_asset_type(ticker)258try:259stock = yf.Ticker(ticker)260info = stock.info261if "regularMarketPrice" not in info:262raise ValueError(f"Invalid ticker: {ticker}")263except Exception as e:264raise ValueError(f"Could not validate ticker '{ticker}': {e}")265266now = datetime.now().isoformat()267asset = {268"ticker": ticker,269"type": asset_type,270"quantity": quantity,271"cost_basis": cost_basis,272"added_at": now,273}274portfolio["assets"].append(asset)275portfolio["updated_at"] = now276self._save()277278return Asset(**asset)279280def update_asset(281self,282portfolio_name: str,283ticker: str,284quantity: float | None = None,285cost_basis: float | None = None,286) -> Asset | None:287"""Update an asset in a portfolio."""288data = self._load()289key = self._get_portfolio_key(portfolio_name)290291# Find portfolio292if key not in data["portfolios"]:293for k, v in data["portfolios"].items():294if v["name"].lower() == portfolio_name.lower():295key = k296break297else:298return None299300portfolio = data["portfolios"][key]301ticker = ticker.upper()302303for asset in portfolio["assets"]:304if asset["ticker"] == ticker:305if quantity is not None:306asset["quantity"] = quantity307if cost_basis is not None:308asset["cost_basis"] = cost_basis309portfolio["updated_at"] = datetime.now().isoformat()310self._save()311return Asset(**asset)312313return None314315def remove_asset(self, portfolio_name: str, ticker: str) -> bool:316"""Remove an asset from a portfolio."""317data = self._load()318key = self._get_portfolio_key(portfolio_name)319320# Find portfolio321if key not in data["portfolios"]:322for k, v in data["portfolios"].items():323if v["name"].lower() == portfolio_name.lower():324key = k325break326else:327return False328329portfolio = data["portfolios"][key]330ticker = ticker.upper()331332original_len = len(portfolio["assets"])333portfolio["assets"] = [a for a in portfolio["assets"] if a["ticker"] != ticker]334335if len(portfolio["assets"]) < original_len:336portfolio["updated_at"] = datetime.now().isoformat()337self._save()338return True339340return False341342def get_default_portfolio_name(self) -> str | None:343"""Get the default (first) portfolio name, or None if empty."""344portfolios = self.list_portfolios()345return portfolios[0] if portfolios else None346347348def format_currency(value: float) -> str:349"""Format a value as currency."""350if abs(value) >= 1_000_000:351return f"${value/1_000_000:.2f}M"352elif abs(value) >= 1_000:353return f"${value/1_000:.2f}K"354else:355return f"${value:.2f}"356357358def show_portfolio(portfolio: Portfolio, verbose: bool = False) -> None:359"""Display portfolio details with current prices."""360print(f"\n{'='*60}")361print(f"PORTFOLIO: {portfolio.name}")362print(f"Created: {portfolio.created_at[:10]} | Updated: {portfolio.updated_at[:10]}")363print(f"{'='*60}\n")364365if not portfolio.assets:366print(" No assets in portfolio. Use 'add' to add assets.\n")367return368369total_cost = 0.0370total_value = 0.0371372print(f"{'Ticker':<12} {'Type':<8} {'Qty':>10} {'Cost':>12} {'Current':>12} {'Value':>14} {'P&L':>12}")373print("-" * 82)374375for asset in portfolio.assets:376try:377stock = yf.Ticker(asset.ticker)378current_price = stock.info.get("regularMarketPrice", 0) or 0379except Exception:380current_price = 0381382cost_total = asset.quantity * asset.cost_basis383current_value = asset.quantity * current_price384pnl = current_value - cost_total385pnl_pct = (pnl / cost_total * 100) if cost_total > 0 else 0386387total_cost += cost_total388total_value += current_value389390pnl_str = f"{'+' if pnl >= 0 else ''}{format_currency(pnl)} ({pnl_pct:+.1f}%)"391392print(f"{asset.ticker:<12} {asset.type:<8} {asset.quantity:>10.4f} "393f"{format_currency(asset.cost_basis):>12} {format_currency(current_price):>12} "394f"{format_currency(current_value):>14} {pnl_str:>12}")395396print("-" * 82)397total_pnl = total_value - total_cost398total_pnl_pct = (total_pnl / total_cost * 100) if total_cost > 0 else 0399print(f"{'TOTAL':<12} {'':<8} {'':<10} {format_currency(total_cost):>12} {'':<12} "400f"{format_currency(total_value):>14} {'+' if total_pnl >= 0 else ''}{format_currency(total_pnl)} ({total_pnl_pct:+.1f}%)")401print()402403404def main():405parser = argparse.ArgumentParser(description="Portfolio management for stock-analysis")406subparsers = parser.add_subparsers(dest="command", help="Commands")407408# create409create_parser = subparsers.add_parser("create", help="Create a new portfolio")410create_parser.add_argument("name", help="Portfolio name")411412# list413subparsers.add_parser("list", help="List all portfolios")414415# show416show_parser = subparsers.add_parser("show", help="Show portfolio details")417show_parser.add_argument("--portfolio", "-p", help="Portfolio name (default: first portfolio)")418419# delete420delete_parser = subparsers.add_parser("delete", help="Delete a portfolio")421delete_parser.add_argument("name", help="Portfolio name")422423# rename424rename_parser = subparsers.add_parser("rename", help="Rename a portfolio")425rename_parser.add_argument("old_name", help="Current portfolio name")426rename_parser.add_argument("new_name", help="New portfolio name")427428# add429add_parser = subparsers.add_parser("add", help="Add an asset to portfolio")430add_parser.add_argument("ticker", help="Stock/crypto ticker (e.g., AAPL, BTC-USD)")431add_parser.add_argument("--quantity", "-q", type=float, required=True, help="Quantity")432add_parser.add_argument("--cost", "-c", type=float, required=True, help="Cost basis per unit")433add_parser.add_argument("--portfolio", "-p", help="Portfolio name (default: first portfolio)")434435# update436update_parser = subparsers.add_parser("update", help="Update an asset in portfolio")437update_parser.add_argument("ticker", help="Stock/crypto ticker")438update_parser.add_argument("--quantity", "-q", type=float, help="New quantity")439update_parser.add_argument("--cost", "-c", type=float, help="New cost basis per unit")440update_parser.add_argument("--portfolio", "-p", help="Portfolio name (default: first portfolio)")441442# remove443remove_parser = subparsers.add_parser("remove", help="Remove an asset from portfolio")444remove_parser.add_argument("ticker", help="Stock/crypto ticker")445remove_parser.add_argument("--portfolio", "-p", help="Portfolio name (default: first portfolio)")446447args = parser.parse_args()448449if not args.command:450parser.print_help()451sys.exit(1)452453store = PortfolioStore()454455try:456if args.command == "create":457portfolio = store.create_portfolio(args.name)458print(f"Created portfolio: {portfolio.name}")459460elif args.command == "list":461portfolios = store.list_portfolios()462if not portfolios:463print("No portfolios found. Use 'create' to create one.")464else:465print("\nPortfolios:")466for name in portfolios:467p = store.get_portfolio(name)468asset_count = len(p.assets) if p else 0469print(f" - {name} ({asset_count} assets)")470print()471472elif args.command == "show":473portfolio_name = args.portfolio or store.get_default_portfolio_name()474if not portfolio_name:475print("No portfolios found. Use 'create' to create one.")476sys.exit(1)477478portfolio = store.get_portfolio(portfolio_name)479if not portfolio:480print(f"Portfolio '{portfolio_name}' not found.")481sys.exit(1)482483show_portfolio(portfolio)484485elif args.command == "delete":486if store.delete_portfolio(args.name):487print(f"Deleted portfolio: {args.name}")488else:489print(f"Portfolio '{args.name}' not found.")490sys.exit(1)491492elif args.command == "rename":493if store.rename_portfolio(args.old_name, args.new_name):494print(f"Renamed portfolio: {args.old_name} -> {args.new_name}")495else:496print(f"Portfolio '{args.old_name}' not found.")497sys.exit(1)498499elif args.command == "add":500portfolio_name = args.portfolio or store.get_default_portfolio_name()501if not portfolio_name:502print("No portfolios found. Use 'create' to create one first.")503sys.exit(1)504505asset = store.add_asset(portfolio_name, args.ticker, args.quantity, args.cost)506print(f"Added {asset.ticker} ({asset.type}) to {portfolio_name}: "507f"{asset.quantity} units @ {format_currency(asset.cost_basis)}")508509elif args.command == "update":510portfolio_name = args.portfolio or store.get_default_portfolio_name()511if not portfolio_name:512print("No portfolios found.")513sys.exit(1)514515if args.quantity is None and args.cost is None:516print("Must specify --quantity and/or --cost to update.")517sys.exit(1)518519asset = store.update_asset(portfolio_name, args.ticker, args.quantity, args.cost)520if asset:521print(f"Updated {asset.ticker} in {portfolio_name}: "522f"{asset.quantity} units @ {format_currency(asset.cost_basis)}")523else:524print(f"Asset '{args.ticker}' not found in portfolio '{portfolio_name}'.")525sys.exit(1)526527elif args.command == "remove":528portfolio_name = args.portfolio or store.get_default_portfolio_name()529if not portfolio_name:530print("No portfolios found.")531sys.exit(1)532533if store.remove_asset(portfolio_name, args.ticker):534print(f"Removed {args.ticker.upper()} from {portfolio_name}")535else:536print(f"Asset '{args.ticker}' not found in portfolio '{portfolio_name}'.")537sys.exit(1)538539except ValueError as e:540print(f"Error: {e}")541sys.exit(1)542except Exception as e:543print(f"Unexpected error: {e}")544sys.exit(1)545546547if __name__ == "__main__":548main()549