diff --git a/README.md b/README.md index efca7da..3eaf46a 100644 --- a/README.md +++ b/README.md @@ -60,6 +60,7 @@ This MCP server exposes a huge suite of Telegram tools. **Every major Telegram/T ### Messaging - **get_messages(chat_id, page, page_size)**: Paginated messages - **list_messages(chat_id, limit, search_query, from_date, to_date)**: Filtered messages +- **list_topics(chat_id, limit, offset_topic, search_query)**: List forum topics in supergroups - **send_message(chat_id, message)**: Send a message - **reply_to_message(chat_id, message_id, text)**: Reply to a message - **edit_message(chat_id, message_id, new_text)**: Edit your message @@ -72,6 +73,7 @@ This MCP server exposes a huge suite of Telegram tools. **Every major Telegram/T - **get_history(chat_id, limit)**: Full chat history - **get_pinned_messages(chat_id)**: List pinned messages - **get_last_interaction(contact_id)**: Most recent message with a contact +- **create_poll(chat_id, question, options, multiple_choice, quiz_mode, public_votes, close_date)**: Create a poll ### Contact Management - **list_contacts()**: List all contacts @@ -151,18 +153,16 @@ git clone https://github.com/chigwell/telegram-mcp.git cd telegram-mcp ``` -### 2. Create a Virtual Environment +### 2. Install Dependencies with uv ```bash -python3 -m venv .venv -source .venv/bin/activate # On Windows: .venv\Scripts\activate -pip install -r requirements.txt +uv sync ``` ### 3. Generate a Session String ```bash -python3 session_string_generator.py +uv run session_string_generator.py ``` Follow the prompts to authenticate and update your `.env` file. diff --git a/main.py b/main.py index cd7a094..f63dd68 100644 --- a/main.py +++ b/main.py @@ -7,6 +7,7 @@ import sqlite3 import logging import mimetypes from datetime import datetime, timedelta +from enum import Enum from typing import List, Dict, Optional, Union, Any # Third-party libraries @@ -104,48 +105,56 @@ except Exception as log_error: logger.error(f"Failed to set up log file handler: {log_error}") # Error code prefix mapping for better error tracing -ERROR_PREFIXES = { - "chat": "CHAT", - "msg": "MSG", - "contact": "CONTACT", - "group": "GROUP", - "media": "MEDIA", - "profile": "PROFILE", - "auth": "AUTH", - "admin": "ADMIN", -} + + +class ErrorCategory(str, Enum): + CHAT = "CHAT" + MSG = "MSG" + CONTACT = "CONTACT" + GROUP = "GROUP" + MEDIA = "MEDIA" + PROFILE = "PROFILE" + AUTH = "AUTH" + ADMIN = "ADMIN" def log_and_format_error( - function_name: str, error: Exception, prefix: str = None, user_message: str = None, **kwargs + function_name: str, + error: Exception, + prefix: Optional[Union[ErrorCategory, str]] = None, + user_message: str = None, + **kwargs, ) -> str: """ - Centralized error handling function that logs the error and returns a formatted user-friendly message. + Centralized error handling function. + + Logs an error and returns a formatted, user-friendly message. Args: - function_name: Name of the function where error occurred - error: The exception that was raised - prefix: Error code prefix (e.g., "CHAT", "MSG") - if None, will be derived from function_name + function_name: Name of the function where the error occurred. + error: The exception that was raised. + prefix: Error code prefix (e.g., ErrorCategory.CHAT, "VALIDATION-001"). + If None, it will be derived from the function_name. user_message: A custom user-facing message to return. If None, a generic one is created. - **kwargs: Additional context parameters to include in log + **kwargs: Additional context parameters to include in the log. Returns: - A user-friendly error message with error code + A user-friendly error message with an error code. """ # Generate a consistent error code - if prefix == "VALIDATION-001": + if isinstance(prefix, str) and prefix == "VALIDATION-001": + # Special case for validation errors error_code = prefix - elif prefix is None: - # Try to derive prefix from function name - for key, value in ERROR_PREFIXES.items(): - if key in function_name.lower(): - prefix = value - break - if prefix is None: - prefix = "GEN" # Generic prefix if none matches - error_code = f"{prefix}-ERR-{abs(hash(function_name)) % 1000:03d}" else: - error_code = f"{prefix}-ERR-{abs(hash(function_name)) % 1000:03d}" + if prefix is None: + # Try to derive prefix from function name + for category in ErrorCategory: + if category.name.lower() in function_name.lower(): + prefix = category + break + + prefix_str = prefix.value if isinstance(prefix, ErrorCategory) else (prefix or "GEN") + error_code = f"{prefix_str}-ERR-{abs(hash(function_name)) % 1000:03d}" # Format the additional context parameters context = ", ".join(f"{k}={v}" for k, v in kwargs.items()) @@ -503,20 +512,48 @@ async def list_messages( # Prepare filter parameters params = {} if search_query: + # IMPORTANT: Do not combine offset_date with search. + # Use server-side search alone, then enforce date bounds client-side. params["search"] = search_query - - messages = await client.get_messages(entity, limit=limit, **params) - - # Apply date filters (Telethon doesn't support date filtering in get_messages directly) - if from_date_obj or to_date_obj: - filtered_messages = [] - for msg in messages: - if from_date_obj and msg.date < from_date_obj: - continue + messages = [] + async for msg in client.iter_messages(entity, **params): # newest -> oldest if to_date_obj and msg.date > to_date_obj: continue - filtered_messages.append(msg) - messages = filtered_messages + if from_date_obj and msg.date < from_date_obj: + break + messages.append(msg) + if len(messages) >= limit: + break + + else: + # Use server-side iteration when only date bounds are present + # (no search) to avoid over-fetching. + if from_date_obj or to_date_obj: + messages = [] + if from_date_obj: + # Walk forward from start date (oldest -> newest) + async for msg in client.iter_messages( + entity, offset_date=from_date_obj, reverse=True + ): + if to_date_obj and msg.date > to_date_obj: + break + if msg.date < from_date_obj: + continue + messages.append(msg) + if len(messages) >= limit: + break + else: + # Only upper bound: walk backward from end bound + async for msg in client.iter_messages( + # offset_date is exclusive; +1µs makes to_date inclusive + entity, + offset_date=to_date_obj + timedelta(microseconds=1), + ): + messages.append(msg) + if len(messages) >= limit: + break + else: + messages = await client.get_messages(entity, limit=limit, **params) if not messages: return "No messages found matching the criteria." @@ -538,6 +575,93 @@ async def list_messages( return log_and_format_error("list_messages", e, chat_id=chat_id) +@mcp.tool() +async def list_topics( + chat_id: int, + limit: int = 200, + offset_topic: int = 0, + search_query: str = None, +) -> str: + """ + Retrieve forum topics from a supergroup with the forum feature enabled. + + Note for LLM: You can send a message to a selected topic via reply_to_message tool + by using Topic ID as the message_id parameter. + + Args: + chat_id: The ID of the forum-enabled chat (supergroup). + limit: Maximum number of topics to retrieve. + offset_topic: Topic ID offset for pagination. + search_query: Optional query to filter topics by title. + """ + try: + entity = await client.get_entity(chat_id) + + if not isinstance(entity, Channel) or not getattr(entity, "megagroup", False): + return "The specified chat is not a supergroup." + + if not getattr(entity, "forum", False): + return "The specified supergroup does not have forum topics enabled." + + result = await client( + functions.channels.GetForumTopicsRequest( + channel=entity, + offset_date=0, + offset_id=0, + offset_topic=offset_topic, + limit=limit, + q=search_query or None, + ) + ) + + topics = getattr(result, "topics", None) or [] + if not topics: + return "No topics found for this chat." + + messages_map = {} + if getattr(result, "messages", None): + messages_map = {message.id: message for message in result.messages} + + lines = [] + for topic in topics: + line_parts = [f"Topic ID: {topic.id}"] + + title = getattr(topic, "title", None) or "(no title)" + line_parts.append(f"Title: {title}") + + total_messages = getattr(topic, "total_messages", None) + if total_messages is not None: + line_parts.append(f"Messages: {total_messages}") + + unread_count = getattr(topic, "unread_count", None) + if unread_count: + line_parts.append(f"Unread: {unread_count}") + + if getattr(topic, "closed", False): + line_parts.append("Closed: Yes") + + if getattr(topic, "hidden", False): + line_parts.append("Hidden: Yes") + + top_message_id = getattr(topic, "top_message", None) + top_message = messages_map.get(top_message_id) + if top_message and getattr(top_message, "date", None): + line_parts.append(f"Last Activity: {top_message.date.isoformat()}") + + lines.append(" | ".join(line_parts)) + + return "\n".join(lines) + except Exception as e: + return log_and_format_error( + "list_topics", + e, + chat_id=chat_id, + limit=limit, + offset_topic=offset_topic, + search_query=search_query, + ) + + @mcp.tool() async def list_chats(chat_type: str = None, limit: int = 20) -> str: """ @@ -2653,6 +2777,79 @@ async def get_pinned_messages(chat_id: Union[int, str]) -> str: return log_and_format_error("get_pinned_messages", e, chat_id=chat_id) +@mcp.tool() +async def create_poll( + chat_id: int, + question: str, + options: list, + multiple_choice: bool = False, + quiz_mode: bool = False, + public_votes: bool = True, + close_date: str = None, +) -> str: + """ + Create a poll in a chat using Telegram's native poll feature. + + Args: + chat_id: The ID of the chat to send the poll to + question: The poll question + options: List of answer options (2-10 options) + multiple_choice: Whether users can select multiple answers + quiz_mode: Whether this is a quiz (has correct answer) + public_votes: Whether votes are public + close_date: Optional close date in ISO format (YYYY-MM-DD HH:MM:SS) + """ + try: + entity = await client.get_entity(chat_id) + + # Validate options + if len(options) < 2: + return "Error: Poll must have at least 2 options." + if len(options) > 10: + return "Error: Poll can have at most 10 options." + + # Parse close date if provided + close_date_obj = None + if close_date: + try: + close_date_obj = datetime.fromisoformat(close_date.replace("Z", "+00:00")) + except ValueError: + return f"Invalid close_date format. Use YYYY-MM-DD HH:MM:SS format." + + # Create the poll using InputMediaPoll with SendMediaRequest + from telethon.tl.types import InputMediaPoll, Poll, PollAnswer, TextWithEntities + import random + + poll = Poll( + id=random.randint(0, 2**63 - 1), + question=TextWithEntities(text=question, entities=[]), + answers=[ + PollAnswer(text=TextWithEntities(text=option, entities=[]), option=bytes([i])) + for i, option in enumerate(options) + ], + multiple_choice=multiple_choice, + quiz=quiz_mode, + public_voters=public_votes, + close_date=close_date_obj, + ) + + result = await client( + functions.messages.SendMediaRequest( + peer=entity, + media=InputMediaPoll(poll=poll), + message="", + random_id=random.randint(0, 2**63 - 1), + ) + ) + + return f"Poll created successfully in chat {chat_id}." + except Exception as e: + logger.exception(f"create_poll failed (chat_id={chat_id}, question='{question}')") + return log_and_format_error( + "create_poll", e, chat_id=chat_id, question=question, options=options + ) + + if __name__ == "__main__": nest_asyncio.apply()