Loading source
Pulling the file list, source metadata, and syntax-aware rendering for this listing.
Source from bundle
Telegram MTProto MCP server with userbot watcher, chat/DM parser and context builders
Files
Skill
Size
Entrypoint
Format
Open file
Syntax-highlighted preview of this file as included in the skill package.
main.py
1import argparse2import os3import sys4import json5import time6import asyncio7import sqlite38import logging9import mimetypes10from datetime import datetime, timedelta11from enum import Enum12from typing import List, Dict, Optional, Union, Any13from pathlib import Path14from urllib.parse import unquote, urlparse1516# Third-party libraries17import nest_asyncio18from dotenv import load_dotenv19from mcp.server.fastmcp import FastMCP, Context20from mcp.types import ToolAnnotations21from mcp.shared.exceptions import McpError22from pythonjsonlogger import jsonlogger23from telethon import TelegramClient, functions, types, utils24from telethon.sessions import StringSession25from telethon.tl.types import (26User,27Chat,28Channel,29ChatAdminRights,30ChatBannedRights,31ChannelParticipantsKicked,32ChannelParticipantsAdmins,33InputChatPhoto,34InputChatUploadedPhoto,35InputChatPhotoEmpty,36InputPeerUser,37InputPeerChat,38InputPeerChannel,39DialogFilter,40DialogFilterChatlist,41DialogFilterDefault,42TextWithEntities,43)44import re45from functools import wraps46import telethon.errors.rpcerrorlist474849class ValidationError(Exception):50"""Custom exception for validation errors."""5152pass535455def json_serializer(obj):56"""Helper function to convert non-serializable objects for JSON serialization."""57if isinstance(obj, datetime):58return obj.isoformat()59if isinstance(obj, bytes):60return obj.decode("utf-8", errors="replace")61# Add other non-serializable types as needed62raise TypeError(f"Object of type {type(obj)} is not JSON serializable")636465def get_entity_type(entity: Any) -> str:66"""Return a normalized, human-readable chat/entity type."""67if isinstance(entity, User):68return "User"69if isinstance(entity, Chat):70return "Group (Basic)"71if isinstance(entity, Channel):72if getattr(entity, "megagroup", False):73return "Supergroup"74return "Channel" if getattr(entity, "broadcast", False) else "Group"75return type(entity).__name__767778def get_entity_filter_type(entity: Any) -> Optional[str]:79"""Return list_chats-compatible filter type: user/group/channel."""80entity_type = get_entity_type(entity)81if entity_type == "User":82return "user"83if entity_type in ("Group (Basic)", "Group", "Supergroup"):84return "group"85if entity_type == "Channel":86return "channel"87return None888990load_dotenv()9192TELEGRAM_API_ID = int(os.getenv("TELEGRAM_API_ID"))93TELEGRAM_API_HASH = os.getenv("TELEGRAM_API_HASH")94TELEGRAM_SESSION_NAME = os.getenv("TELEGRAM_SESSION_NAME")9596# Check if a string session exists in environment, otherwise use file-based session97SESSION_STRING = os.getenv("TELEGRAM_SESSION_STRING")9899mcp = FastMCP("telegram")100101if SESSION_STRING:102# Use the string session if available103client = TelegramClient(StringSession(SESSION_STRING), TELEGRAM_API_ID, TELEGRAM_API_HASH)104else:105# Use file-based session106client = TelegramClient(TELEGRAM_SESSION_NAME, TELEGRAM_API_ID, TELEGRAM_API_HASH)107108# Setup robust logging with both file and console output109logger = logging.getLogger("telegram_mcp")110logger.setLevel(logging.ERROR) # Set to ERROR for production, INFO for debugging111112# Create console handler113console_handler = logging.StreamHandler()114console_handler.setLevel(logging.ERROR) # Set to ERROR for production, INFO for debugging115116# Create file handler with absolute path117script_dir = os.path.dirname(os.path.abspath(__file__))118log_file_path = os.path.join(script_dir, "mcp_errors.log")119120try:121file_handler = logging.FileHandler(log_file_path, mode="a") # Append mode122file_handler.setLevel(logging.ERROR)123124# Create formatters125# Console formatter remains in the old format126console_formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s - %(message)s")127console_handler.setFormatter(console_formatter)128129# File formatter is now JSON130json_formatter = jsonlogger.JsonFormatter(131"%(asctime)s %(name)s %(levelname)s %(message)s",132datefmt="%Y-%m-%dT%H:%M:%S%z",133)134file_handler.setFormatter(json_formatter)135136# Add handlers to logger137logger.addHandler(console_handler)138logger.addHandler(file_handler)139logger.info(f"Logging initialized to {log_file_path}")140except Exception as log_error:141print(f"WARNING: Error setting up log file: {log_error}", file=sys.stderr)142# Fallback to console-only logging143logger.addHandler(console_handler)144logger.error(f"Failed to set up log file handler: {log_error}")145146147# File-path tool security configuration148SERVER_ALLOWED_ROOTS: list[Path] = []149DEFAULT_DOWNLOAD_SUBDIR = "downloads"150DISALLOWED_PATH_PATTERNS = ("*", "?", "[", "]", "{", "}", "~", "\x00")151EXTENSION_ALLOWLISTS: dict[str, set[str]] = {152"send_voice": {".ogg", ".opus"},153"send_sticker": {".webp"},154"set_profile_photo": {".jpg", ".jpeg", ".png", ".webp"},155"edit_chat_photo": {".jpg", ".jpeg", ".png", ".webp"},156}157MAX_FILE_BYTES: dict[str, int] = {158"send_file": 200 * 1024 * 1024, # 200 MB159"upload_file": 200 * 1024 * 1024,160"send_voice": 100 * 1024 * 1024,161"send_sticker": 10 * 1024 * 1024,162"set_profile_photo": 50 * 1024 * 1024,163"edit_chat_photo": 50 * 1024 * 1024,164}165ROOTS_UNSUPPORTED_ERROR_CODES = {-32601}166ROOTS_STATUS_READY = "ready"167ROOTS_STATUS_NOT_CONFIGURED = "not_configured"168ROOTS_STATUS_UNSUPPORTED_FALLBACK = "unsupported_fallback"169ROOTS_STATUS_CLIENT_DENY_ALL = "client_deny_all"170ROOTS_STATUS_ERROR = "error"171172173# Error code prefix mapping for better error tracing174class ErrorCategory(str, Enum):175CHAT = "CHAT"176MSG = "MSG"177CONTACT = "CONTACT"178GROUP = "GROUP"179MEDIA = "MEDIA"180PROFILE = "PROFILE"181AUTH = "AUTH"182ADMIN = "ADMIN"183FOLDER = "FOLDER"184185186def log_and_format_error(187function_name: str,188error: Exception,189prefix: Optional[Union[ErrorCategory, str]] = None,190user_message: str = None,191**kwargs,192) -> str:193"""194Centralized error handling function.195196Logs an error and returns a formatted, user-friendly message.197198Args:199function_name: Name of the function where the error occurred.200error: The exception that was raised.201prefix: Error code prefix (e.g., ErrorCategory.CHAT, "VALIDATION-001").202If None, it will be derived from the function_name.203user_message: A custom user-facing message to return. If None, a generic one is created.204**kwargs: Additional context parameters to include in the log.205206Returns:207A user-friendly error message with an error code.208"""209# Generate a consistent error code210if isinstance(prefix, str) and prefix == "VALIDATION-001":211# Special case for validation errors212error_code = prefix213else:214if prefix is None:215# Try to derive prefix from function name216for category in ErrorCategory:217if category.name.lower() in function_name.lower():218prefix = category219break220221prefix_str = prefix.value if isinstance(prefix, ErrorCategory) else (prefix or "GEN")222error_code = f"{prefix_str}-ERR-{abs(hash(function_name)) % 1000:03d}"223224# Format the additional context parameters225context = ", ".join(f"{k}={v}" for k, v in kwargs.items())226227# Log the full technical error228logger.error(f"Error in {function_name} ({context}) - Code: {error_code}", exc_info=True)229230# Return a user-friendly message231if user_message:232return user_message233234return f"An error occurred (code: {error_code}). Check mcp_errors.log for details."235236237def validate_id(*param_names_to_validate):238"""239Decorator to validate chat_id and user_id parameters, including lists of IDs.240It checks for valid integer ranges, string representations of integers,241and username formats.242"""243244def decorator(func):245@wraps(func)246async def wrapper(*args, **kwargs):247for param_name in param_names_to_validate:248if param_name not in kwargs or kwargs[param_name] is None:249continue250251param_value = kwargs[param_name]252253def validate_single_id(value, p_name):254# Handle integer IDs255if isinstance(value, int):256if not (-(2**63) <= value <= 2**63 - 1):257return (258None,259f"Invalid {p_name}: {value}. ID is out of the valid integer range.",260)261return value, None262263# Handle string IDs264if isinstance(value, str):265try:266int_value = int(value)267if not (-(2**63) <= int_value <= 2**63 - 1):268return (269None,270f"Invalid {p_name}: {value}. ID is out of the valid integer range.",271)272return int_value, None273except ValueError:274if re.match(r"^@?[a-zA-Z0-9_]{5,}$", value):275return value, None276else:277return (278None,279f"Invalid {p_name}: '{value}'. Must be a valid integer ID, or a username string.",280)281282# Handle other invalid types283return (284None,285f"Invalid {p_name}: {value}. Type must be an integer or a string.",286)287288if isinstance(param_value, list):289validated_list = []290for item in param_value:291validated_item, error_msg = validate_single_id(item, param_name)292if error_msg:293return log_and_format_error(294func.__name__,295ValidationError(error_msg),296prefix="VALIDATION-001",297user_message=error_msg,298**{param_name: param_value},299)300validated_list.append(validated_item)301kwargs[param_name] = validated_list302else:303validated_value, error_msg = validate_single_id(param_value, param_name)304if error_msg:305return log_and_format_error(306func.__name__,307ValidationError(error_msg),308prefix="VALIDATION-001",309user_message=error_msg,310**{param_name: param_value},311)312kwargs[param_name] = validated_value313314return await func(*args, **kwargs)315316return wrapper317318return decorator319320321def format_entity(entity) -> Dict[str, Any]:322"""Helper function to format entity information consistently."""323result = {"id": entity.id}324325if hasattr(entity, "title"):326result["name"] = entity.title327result["type"] = "group" if isinstance(entity, Chat) else "channel"328elif hasattr(entity, "first_name"):329name_parts = []330if entity.first_name:331name_parts.append(entity.first_name)332if hasattr(entity, "last_name") and entity.last_name:333name_parts.append(entity.last_name)334result["name"] = " ".join(name_parts)335result["type"] = "user"336if hasattr(entity, "username") and entity.username:337result["username"] = entity.username338if hasattr(entity, "phone") and entity.phone:339result["phone"] = entity.phone340341return result342343344async def resolve_entity(identifier: Union[int, str]) -> Any:345"""Resolve entity with automatic cache warming on miss.346347StringSession has no persistent entity cache. If get_entity() fails348because the cache is cold (ValueError on PeerUser lookup for group IDs),349warm the cache via get_dialogs() and retry.350"""351try:352return await client.get_entity(identifier)353except ValueError:354await client.get_dialogs()355return await client.get_entity(identifier)356357358async def resolve_input_entity(identifier: Union[int, str]) -> Any:359"""Like resolve_entity() but returns an InputPeer."""360try:361return await client.get_input_entity(identifier)362except ValueError:363await client.get_dialogs()364return await client.get_input_entity(identifier)365366367def format_message(message) -> Dict[str, Any]:368"""Helper function to format message information consistently."""369result = {370"id": message.id,371"date": message.date.isoformat(),372"text": message.message or "",373}374375if message.from_id:376result["from_id"] = utils.get_peer_id(message.from_id)377378if message.media:379result["has_media"] = True380result["media_type"] = type(message.media).__name__381382return result383384385def get_sender_name(message) -> str:386"""Helper function to get sender name from a message."""387if not message.sender:388return "Unknown"389390# Check for group/channel title first391if hasattr(message.sender, "title") and message.sender.title:392return message.sender.title393elif hasattr(message.sender, "first_name"):394# User sender395first_name = getattr(message.sender, "first_name", "") or ""396last_name = getattr(message.sender, "last_name", "") or ""397full_name = f"{first_name} {last_name}".strip()398return full_name if full_name else "Unknown"399else:400return "Unknown"401402403def get_engagement_info(message) -> str:404"""Helper function to get engagement metrics (views, forwards, reactions) from a message."""405engagement_parts = []406views = getattr(message, "views", None)407if views is not None:408engagement_parts.append(f"views:{views}")409forwards = getattr(message, "forwards", None)410if forwards is not None:411engagement_parts.append(f"forwards:{forwards}")412reactions = getattr(message, "reactions", None)413if reactions is not None:414results = getattr(reactions, "results", None)415total_reactions = sum(getattr(r, "count", 0) or 0 for r in results) if results else 0416engagement_parts.append(f"reactions:{total_reactions}")417return f" | {', '.join(engagement_parts)}" if engagement_parts else ""418419420def _dedupe_paths(paths: List[Path]) -> List[Path]:421seen: set[str] = set()422result: List[Path] = []423for path in paths:424key = str(path)425if key in seen:426continue427seen.add(key)428result.append(path)429return result430431432def _contains_forbidden_path_patterns(raw_path: str) -> Optional[str]:433value = raw_path.strip()434if not value:435return "Path must not be empty."436if any(token in value for token in DISALLOWED_PATH_PATTERNS):437return "Path contains disallowed wildcard/shell patterns."438if ".." in Path(value).parts:439return "Path traversal is not allowed."440return None441442443def _coerce_root_uri_to_path(uri: str) -> Path:444parsed = urlparse(uri)445if parsed.scheme != "file":446raise ValueError(f"Unsupported root URI scheme: {parsed.scheme}")447448decoded_path = unquote(parsed.path or "")449if parsed.netloc and parsed.netloc not in ("", "localhost"):450decoded_path = f"//{parsed.netloc}{decoded_path}"451if os.name == "nt" and decoded_path.startswith("/") and len(decoded_path) > 2:452# file:///C:/tmp -> C:/tmp on Windows453if decoded_path[2] == ":":454decoded_path = decoded_path