Loading source
Pulling the file list, source metadata, and syntax-aware rendering for this listing.
Source from bundle
Telegram MTProto MCP server with userbot watcher, chat/DM parser and context builders
Files
Skill
Size
Entrypoint
Format
Open file
Syntax-highlighted preview of this file as included in the skill package.
watcher.py
1"""2watcher.py — полный хендлер сообщений для Март 7.34- Хендлит ВСЕ входящие и исходящие сообщения (лички + группы до 1000 чел)5- Каналы и чаты >1000 участников игнорирует6- Записывает контекст в context/messages.jsonl7- Триггер "клав" работает ТОЛЬКО от создателя (OWNER_ID)8- OWNER_ID задаётся через переменную окружения TELEGRAM_OWNER_ID9"""1011import asyncio12import os13import subprocess14import re15import json16import logging17from datetime import datetime, timezone18from pathlib import Path19from dotenv import load_dotenv20from telethon import TelegramClient, events21from telethon.sessions import StringSession22from telethon.tl.types import (23User, Chat, Channel,24PeerUser, PeerChat, PeerChannel25)2627load_dotenv()2829API_ID = int(os.getenv("TELEGRAM_API_ID"))30API_HASH = os.getenv("TELEGRAM_API_HASH")31SESSION_NAME = os.getenv("TELEGRAM_SESSION_NAME", "my_session")32SESSION_STRING = os.getenv("TELEGRAM_SESSION_STRING")3334# ═══ СОЗДАТЕЛЬ — единственный кто может управлять ═══35OWNER_ID = int(os.getenv("TELEGRAM_OWNER_ID", "0")) # set in .env3637TRIGGER = "клав"38MAX_CHAT_MEMBERS = 1000 # не хендлим чаты больше этого числа3940# Семафор — только один запрос к openclaw одновременно41_openclaw_lock = asyncio.Semaphore(1)4243# Кеш размеров чатов {chat_id: count}44_chat_size_cache: dict = {}4546# Хранилище контекста47CONTEXT_DIR = Path(__file__).parent / "context"48CONTEXT_DIR.mkdir(exist_ok=True)49MESSAGES_FILE = CONTEXT_DIR / "messages.jsonl"5051logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")52log = logging.getLogger("watcher")5354if SESSION_STRING:55client = TelegramClient(StringSession(SESSION_STRING), API_ID, API_HASH)56else:57client = TelegramClient(SESSION_NAME, API_ID, API_HASH)585960# ═══ Утилиты ═══6162def get_chat_type(entity) -> str:63if isinstance(entity, User):64return "private"65if isinstance(entity, Chat):66return "group"67if isinstance(entity, Channel):68if getattr(entity, "megagroup", False):69return "group"70return "channel"71return "unknown"727374async def get_members_count(entity) -> int:75"""Возвращает количество участников чата или 0 для личек."""76try:77if isinstance(entity, User):78return 2 # личка — всегда пропускаем79full = await client.get_entity(entity)80return getattr(full, "participants_count", 0) or 081except Exception:82return 0838485def save_message(record: dict):86"""Сохраняет запись в JSONL файл."""87with open(MESSAGES_FILE, "a", encoding="utf-8") as f:88f.write(json.dumps(record, ensure_ascii=False) + "\n")899091async def build_record(event, direction: str) -> dict | None:92"""Строит запись для сохранения."""93try:94msg = event.message95chat = await event.get_chat()96chat_type = get_chat_type(chat)9798# Пропускаем каналы99if chat_type == "channel":100return None101102# Проверяем размер чата (с кешем)103if chat_type == "group":104cid = getattr(chat, "id", None)105if cid not in _chat_size_cache:106count = getattr(chat, "participants_count", None)107if not count:108try:109from telethon.tl.functions.channels import GetFullChannelRequest110from telethon.tl.functions.messages import GetFullChatRequest111if isinstance(chat, Channel):112full = await client(GetFullChannelRequest(chat))113count = full.full_chat.participants_count114elif isinstance(chat, Chat):115full = await client(GetFullChatRequest(chat.id))116count = getattr(full.full_chat, "participants_count", 0) or 0117except Exception as e:118log.warning(f"Не удалось получить размер чата {cid}: {e}")119count = 9999 # если ошибка — считаем большим, не пишем120_chat_size_cache[cid] = count or 0121if _chat_size_cache.get(cid, 0) > MAX_CHAT_MEMBERS:122return None123124# Данные отправителя125sender_id = None126sender_name = ""127sender_username = ""128try:129sender = await msg.get_sender()130# Для исходящих sender может быть None — берём кешированные данные131if sender is None and msg.out:132sender = _me133if sender:134sender_id = sender.id135sender_name = (getattr(sender, "first_name", "") or "") + \136(" " + getattr(sender, "last_name", "") if getattr(sender, "last_name", "") else "")137sender_name = sender_name.strip()138# username может быть строкой или None139uname = getattr(sender, "username", None)140usernames = getattr(sender, "usernames", None) # Telegram Premium multiple usernames141if uname:142sender_username = uname143elif usernames:144sender_username = ",".join(u.username for u in usernames if u.username)145else:146sender_username = ""147except Exception:148pass149150# Данные чата151chat_id = msg.chat_id or msg.peer_id152chat_title = getattr(chat, "title", None) or \153getattr(chat, "first_name", None) or str(chat_id)154155# Реплай156reply_to_msg_id = None157reply_to_text = ""158reply_to_user = ""159if msg.reply_to and getattr(msg.reply_to, "reply_to_msg_id", None):160reply_to_msg_id = msg.reply_to.reply_to_msg_id161try:162replied = await msg.get_reply_message()163if replied:164reply_to_text = (replied.message or "")[:300]165rs = await replied.get_sender()166if rs:167reply_to_user = (getattr(rs, "first_name", "") or "") + \168(" " + getattr(rs, "last_name", "") if getattr(rs, "last_name", "") else "")169reply_to_user = reply_to_user.strip()170except Exception:171pass172173record = {174"ts": datetime.now(timezone.utc).isoformat(),175"direction": direction, # "in" | "out"176"chat_type": chat_type, # "private" | "group"177"chat_id": chat_id,178"chat_title": chat_title,179"msg_id": msg.id,180"sender_id": sender_id,181"sender_name": sender_name,182"sender_username": sender_username,183"text": msg.message or "",184"has_media": msg.media is not None,185"reply_to_msg_id": reply_to_msg_id,186"reply_to_text": reply_to_text,187"reply_to_user": reply_to_user,188}189return record190except Exception as e:191log.error(f"build_record error: {e}")192return None193194195LIVE_MESSAGES_LIMIT = 30 # последних сообщений из messages.jsonl196197def load_live_messages(chat_id: int, limit: int = LIVE_MESSAGES_LIMIT) -> str:198"""Читает последние N сообщений этого чата прямо из messages.jsonl (live-данные)."""199if not MESSAGES_FILE.exists():200return ""201msgs = []202with open(MESSAGES_FILE, encoding="utf-8") as f:203for line in f:204try:205m = json.loads(line)206if m.get("chat_id") == chat_id and m.get("text","").strip():207msgs.append(m)208except Exception:209pass210# берём последние N211recent = msgs[-limit:]212if not recent:213return ""214lines = []215for m in recent:216direction = "→" if m.get("sender_id") == OWNER_ID else "←"217sender = m.get("sender", "?")218text = m.get("text", "").replace("\n", " ")[:200]219date = m.get("date", "")[:16]220lines.append(f"[{date}] {direction} {sender}: {text}")221return "## Последние сообщения (live)\n" + "\n".join(lines)222223224def load_chat_context(chat_id: int) -> str:225"""Загружает саммари чата, последние live-сообщения и профили участников."""226parts = []227chat_file = CONTEXT_DIR / "chats" / f"{chat_id}.md"228if chat_file.exists():229parts.append(f"[Контекст чата]\n{chat_file.read_text(encoding='utf-8')[:1500]}")230231# Live-сообщения из messages.jsonl (реальное время)232live = load_live_messages(chat_id)233if live:234parts.append(live)235236# Профили людей из этого чата237people_dir = CONTEXT_DIR / "people"238if people_dir.exists():239seen_uids = set()240if MESSAGES_FILE.exists():241with open(MESSAGES_FILE, encoding="utf-8") as f:242for line in f:243try:244m = json.loads(line)245if m.get("chat_id") == chat_id and m.get("sender_id") and m["sender_id"] != OWNER_ID:246seen_uids.add(m["sender_id"])247except Exception:248pass249for uid in seen_uids:250pf = people_dir / f"{uid}.md"251if pf.exists():252parts.append(f"[Участник]\n{pf.read_text(encoding='utf-8')[:500]}")253254return "\n\n".join(parts)255256257# Кеш session_id живой main-сессии258_main_session_id: str | None = None259260def _get_main_session_id() -> str | None:261"""Читает session_id живой main-сессии из sessions.json (без subprocess)."""262global _main_session_id263if _main_session_id:264return _main_session_id265try:266import json as _j267path = os.path.expanduser("~/.openclaw/agents/main/sessions/sessions.json")268if os.path.exists(path):269data = _j.loads(open(path).read())270# Структура: {"agent:main:main": {"sessionId": "...", ...}, ...}271for key, val in data.items():272if key == "agent:main:main" and isinstance(val, dict):273sid = val.get("sessionId", "")274if sid:275_main_session_id = sid276log.info(f"[watcher] Найден session-id: {sid[:8]}...")277return sid278except Exception as e:279log.debug(f"[watcher] _get_main_session_id error: {e}")280return None281282283def ask_openclaw(query: str, chat_id: int) -> str:284"""Отправляет запрос в OpenClaw — переиспользует живую main-сессию (быстро)."""285chat_context = load_chat_context(chat_id)286ctx_prefix = f"\n[Контекст из базы данных]\n{chat_context}\n\n" if chat_context else ""287context = f"{ctx_prefix}[Запрос из Telegram чата {chat_id}, отвечай кратко как в чате, без лишних слов]: {query}"288289# Пробуем переиспользовать живую сессию (без нового процесса)290session_id = _get_main_session_id()291if session_id:292cmd = ["openclaw", "agent", "--session-id", session_id, "--message", context, "--json"]293log.info(f"[watcher] Используем session-id={session_id}")294else:295# Фолбэк: создаём новый процесс (медленнее)296cmd = ["openclaw", "agent", "--agent", "main", "--message", context, "--json"]297log.info("[watcher] session-id не найден, фолбэк на --agent main")298299result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)300if result.returncode != 0:301# Если session-id устарел — сбросить кеш и повторить302global _main_session_id303_main_session_id = None304return f"Ошибка: {result.stderr.strip()}"305306import json as _json307try:308data = _json.loads(result.stdout)309if "result" in data and "payloads" in data["result"]:310return data["result"]["payloads"][0]["text"]311elif "payloads" in data:312return data["payloads"][0]["text"]313elif "text" in data:314return data["text"]315else:316def find_text(obj):317if isinstance(obj, dict):318if "text" in obj and isinstance(obj["text"], str) and len(obj["text"]) > 0:319return obj["text"]320for v in obj.values():321r = find_text(v)322if r:323return r324elif isinstance(obj, list):325for item in obj:326r = find_text(item)327if r:328return r329return None330return find_text(data) or result.stdout.strip()331except Exception:332return result.stdout.strip()333334335# ═══ Хендлеры ═══336337@client.on(events.NewMessage(outgoing=True))338async def handle_outgoing(event):339"""Хендлит исходящие сообщения."""340text = event.raw_text or ""341342# Сохраняем в контекст343record = await build_record(event, "out")344if record:345save_message(record)346347# Триггер "клав" — только от создателя348if not re.match(rf"^{TRIGGER}\b", text, re.IGNORECASE):349return350351# Получаем chat_id352chat_id = event.chat_id353354# Извлекаем запрос355query = re.sub(rf"^{TRIGGER}[,\s]*", "", text, flags=re.IGNORECASE).strip()356if not query:357return358359log.info(f"[trigger] чат {chat_id}: {query}")360361# Добавляем контекст реплая362reply_context = ""363if event.reply_to and getattr(event.reply_to, "reply_to_msg_id", None):364try:365replied_msg = await event.get_reply_message()366if replied_msg:367sender_id = replied_msg.sender_id368sender_name = ""369try:370sender = await replied_msg.get_sender()371if sender:372sender_name = (getattr(sender, "first_name", "") or "").strip()373except Exception:374sender_name = str(sender_id)375msg_text = replied_msg.message or "[медиа]"376reply_context = f"\n[Реплай на сообщение от {sender_name} (user_id: {sender_id}, msg_id: {replied_msg.id}): \"{msg_text[:300]}\"]"377except Exception:378pass379380full_query = query + reply_context381382# Отправляем ⏳ как реплай383thinking_msg = await event.reply("⏳")384385# Запрос к OpenClaw — сериализуем через семафор386async with _openclaw_lock:387loop = asyncio.get_event_loop()388response = await loop.run_in_executor(None, ask_openclaw, full_query, chat_id)389390# Отправляем ответ391if response:392MAX_LEN = 4096393if len(response) <= MAX_LEN:394await thinking_msg.edit(response)395else:396await thinking_msg.delete()397for i in range(0, len(response), MAX_LEN):398await client.send_message(chat_id, response[i:i + MAX_LEN])399else:400await thinking_msg.delete()401402403@client.on(events.NewMessage(incoming=True))404async def handle_incoming(event):405"""Хендлит входящие сообщения — только записывает контекст."""406record = await build_record(event, "in")407if record:408save_message(record)409410411# Кеш данных о себе — загружается один раз при старте412_me = None413414async def refresh_me():415"""Обновляет данные о себе через API. Вызывать по запросу."""416global _me417_me = await client.get_me()418uname = getattr(_me, "username", None)419usernames = getattr(_me, "usernames", None)420if usernames:421uname = ",".join(u.username for u in usernames if u.username)422log.info(f"[me] Обновлено: {_me.first_name} id={_me.id} username={uname}")423return _me424425async def main():426log.info(f"[watcher] Запуск... триггер: '{TRIGGER}', только для owner: {OWNER_ID}")427log.info(f"[watcher] Контекст сохраняется в: {MESSAGES_FILE}")428await client.start()429me = await client.get_me()430await refresh_me() # загружаем данные о себе один раз при старте431log.info(f"[watcher] Авторизован как {_me.first_name} (id: {_me.id}). Слушаю все сообщения...")432await client.run_until_disconnected()433434435if __name__ == "__main__":436asyncio.run(main())437