Merge branch 'main' of https://github.com/chigwell/telegram-mcp into feature/id-validation-decorator

Resolved conflict in log_and_format_error function by:
- Combining ErrorCategory enum (from main) with user_message parameter (from feature branch)
- Supporting both ErrorCategory and string prefixes (for validation errors)
- Keeping validation decorator functionality intact

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Korzhavin Ivan 2025-10-19 15:23:15 +02:00
commit cbe429b474
2 changed files with 241 additions and 44 deletions

View file

@ -60,6 +60,7 @@ This MCP server exposes a huge suite of Telegram tools. **Every major Telegram/T
### Messaging
- **get_messages(chat_id, page, page_size)**: Paginated messages
- **list_messages(chat_id, limit, search_query, from_date, to_date)**: Filtered messages
- **list_topics(chat_id, limit, offset_topic, search_query)**: List forum topics in supergroups
- **send_message(chat_id, message)**: Send a message
- **reply_to_message(chat_id, message_id, text)**: Reply to a message
- **edit_message(chat_id, message_id, new_text)**: Edit your message
@ -72,6 +73,7 @@ This MCP server exposes a huge suite of Telegram tools. **Every major Telegram/T
- **get_history(chat_id, limit)**: Full chat history
- **get_pinned_messages(chat_id)**: List pinned messages
- **get_last_interaction(contact_id)**: Most recent message with a contact
- **create_poll(chat_id, question, options, multiple_choice, quiz_mode, public_votes, close_date)**: Create a poll
### Contact Management
- **list_contacts()**: List all contacts
@ -151,18 +153,16 @@ git clone https://github.com/chigwell/telegram-mcp.git
cd telegram-mcp
```
### 2. Create a Virtual Environment
### 2. Install Dependencies with uv
```bash
python3 -m venv .venv
source .venv/bin/activate # On Windows: .venv\Scripts\activate
pip install -r requirements.txt
uv sync
```
### 3. Generate a Session String
```bash
python3 session_string_generator.py
uv run session_string_generator.py
```
Follow the prompts to authenticate and update your `.env` file.

275
main.py
View file

@ -7,6 +7,7 @@ import sqlite3
import logging
import mimetypes
from datetime import datetime, timedelta
from enum import Enum
from typing import List, Dict, Optional, Union, Any
# Third-party libraries
@ -104,48 +105,56 @@ except Exception as log_error:
logger.error(f"Failed to set up log file handler: {log_error}")
# Error code prefix mapping for better error tracing
ERROR_PREFIXES = {
"chat": "CHAT",
"msg": "MSG",
"contact": "CONTACT",
"group": "GROUP",
"media": "MEDIA",
"profile": "PROFILE",
"auth": "AUTH",
"admin": "ADMIN",
}
class ErrorCategory(str, Enum):
CHAT = "CHAT"
MSG = "MSG"
CONTACT = "CONTACT"
GROUP = "GROUP"
MEDIA = "MEDIA"
PROFILE = "PROFILE"
AUTH = "AUTH"
ADMIN = "ADMIN"
def log_and_format_error(
function_name: str, error: Exception, prefix: str = None, user_message: str = None, **kwargs
function_name: str,
error: Exception,
prefix: Optional[Union[ErrorCategory, str]] = None,
user_message: str = None,
**kwargs,
) -> str:
"""
Centralized error handling function that logs the error and returns a formatted user-friendly message.
Centralized error handling function.
Logs an error and returns a formatted, user-friendly message.
Args:
function_name: Name of the function where error occurred
error: The exception that was raised
prefix: Error code prefix (e.g., "CHAT", "MSG") - if None, will be derived from function_name
function_name: Name of the function where the error occurred.
error: The exception that was raised.
prefix: Error code prefix (e.g., ErrorCategory.CHAT, "VALIDATION-001").
If None, it will be derived from the function_name.
user_message: A custom user-facing message to return. If None, a generic one is created.
**kwargs: Additional context parameters to include in log
**kwargs: Additional context parameters to include in the log.
Returns:
A user-friendly error message with error code
A user-friendly error message with an error code.
"""
# Generate a consistent error code
if prefix == "VALIDATION-001":
if isinstance(prefix, str) and prefix == "VALIDATION-001":
# Special case for validation errors
error_code = prefix
elif prefix is None:
# Try to derive prefix from function name
for key, value in ERROR_PREFIXES.items():
if key in function_name.lower():
prefix = value
break
if prefix is None:
prefix = "GEN" # Generic prefix if none matches
error_code = f"{prefix}-ERR-{abs(hash(function_name)) % 1000:03d}"
else:
error_code = f"{prefix}-ERR-{abs(hash(function_name)) % 1000:03d}"
if prefix is None:
# Try to derive prefix from function name
for category in ErrorCategory:
if category.name.lower() in function_name.lower():
prefix = category
break
prefix_str = prefix.value if isinstance(prefix, ErrorCategory) else (prefix or "GEN")
error_code = f"{prefix_str}-ERR-{abs(hash(function_name)) % 1000:03d}"
# Format the additional context parameters
context = ", ".join(f"{k}={v}" for k, v in kwargs.items())
@ -503,20 +512,48 @@ async def list_messages(
# Prepare filter parameters
params = {}
if search_query:
# IMPORTANT: Do not combine offset_date with search.
# Use server-side search alone, then enforce date bounds client-side.
params["search"] = search_query
messages = await client.get_messages(entity, limit=limit, **params)
# Apply date filters (Telethon doesn't support date filtering in get_messages directly)
if from_date_obj or to_date_obj:
filtered_messages = []
for msg in messages:
if from_date_obj and msg.date < from_date_obj:
continue
messages = []
async for msg in client.iter_messages(entity, **params): # newest -> oldest
if to_date_obj and msg.date > to_date_obj:
continue
filtered_messages.append(msg)
messages = filtered_messages
if from_date_obj and msg.date < from_date_obj:
break
messages.append(msg)
if len(messages) >= limit:
break
else:
# Use server-side iteration when only date bounds are present
# (no search) to avoid over-fetching.
if from_date_obj or to_date_obj:
messages = []
if from_date_obj:
# Walk forward from start date (oldest -> newest)
async for msg in client.iter_messages(
entity, offset_date=from_date_obj, reverse=True
):
if to_date_obj and msg.date > to_date_obj:
break
if msg.date < from_date_obj:
continue
messages.append(msg)
if len(messages) >= limit:
break
else:
# Only upper bound: walk backward from end bound
async for msg in client.iter_messages(
# offset_date is exclusive; +1µs makes to_date inclusive
entity,
offset_date=to_date_obj + timedelta(microseconds=1),
):
messages.append(msg)
if len(messages) >= limit:
break
else:
messages = await client.get_messages(entity, limit=limit, **params)
if not messages:
return "No messages found matching the criteria."
@ -538,6 +575,93 @@ async def list_messages(
return log_and_format_error("list_messages", e, chat_id=chat_id)
@mcp.tool()
async def list_topics(
chat_id: int,
limit: int = 200,
offset_topic: int = 0,
search_query: str = None,
) -> str:
"""
Retrieve forum topics from a supergroup with the forum feature enabled.
Note for LLM: You can send a message to a selected topic via reply_to_message tool
by using Topic ID as the message_id parameter.
Args:
chat_id: The ID of the forum-enabled chat (supergroup).
limit: Maximum number of topics to retrieve.
offset_topic: Topic ID offset for pagination.
search_query: Optional query to filter topics by title.
"""
try:
entity = await client.get_entity(chat_id)
if not isinstance(entity, Channel) or not getattr(entity, "megagroup", False):
return "The specified chat is not a supergroup."
if not getattr(entity, "forum", False):
return "The specified supergroup does not have forum topics enabled."
result = await client(
functions.channels.GetForumTopicsRequest(
channel=entity,
offset_date=0,
offset_id=0,
offset_topic=offset_topic,
limit=limit,
q=search_query or None,
)
)
topics = getattr(result, "topics", None) or []
if not topics:
return "No topics found for this chat."
messages_map = {}
if getattr(result, "messages", None):
messages_map = {message.id: message for message in result.messages}
lines = []
for topic in topics:
line_parts = [f"Topic ID: {topic.id}"]
title = getattr(topic, "title", None) or "(no title)"
line_parts.append(f"Title: {title}")
total_messages = getattr(topic, "total_messages", None)
if total_messages is not None:
line_parts.append(f"Messages: {total_messages}")
unread_count = getattr(topic, "unread_count", None)
if unread_count:
line_parts.append(f"Unread: {unread_count}")
if getattr(topic, "closed", False):
line_parts.append("Closed: Yes")
if getattr(topic, "hidden", False):
line_parts.append("Hidden: Yes")
top_message_id = getattr(topic, "top_message", None)
top_message = messages_map.get(top_message_id)
if top_message and getattr(top_message, "date", None):
line_parts.append(f"Last Activity: {top_message.date.isoformat()}")
lines.append(" | ".join(line_parts))
return "\n".join(lines)
except Exception as e:
return log_and_format_error(
"list_topics",
e,
chat_id=chat_id,
limit=limit,
offset_topic=offset_topic,
search_query=search_query,
)
@mcp.tool()
async def list_chats(chat_type: str = None, limit: int = 20) -> str:
"""
@ -2653,6 +2777,79 @@ async def get_pinned_messages(chat_id: Union[int, str]) -> str:
return log_and_format_error("get_pinned_messages", e, chat_id=chat_id)
@mcp.tool()
async def create_poll(
chat_id: int,
question: str,
options: list,
multiple_choice: bool = False,
quiz_mode: bool = False,
public_votes: bool = True,
close_date: str = None,
) -> str:
"""
Create a poll in a chat using Telegram's native poll feature.
Args:
chat_id: The ID of the chat to send the poll to
question: The poll question
options: List of answer options (2-10 options)
multiple_choice: Whether users can select multiple answers
quiz_mode: Whether this is a quiz (has correct answer)
public_votes: Whether votes are public
close_date: Optional close date in ISO format (YYYY-MM-DD HH:MM:SS)
"""
try:
entity = await client.get_entity(chat_id)
# Validate options
if len(options) < 2:
return "Error: Poll must have at least 2 options."
if len(options) > 10:
return "Error: Poll can have at most 10 options."
# Parse close date if provided
close_date_obj = None
if close_date:
try:
close_date_obj = datetime.fromisoformat(close_date.replace("Z", "+00:00"))
except ValueError:
return f"Invalid close_date format. Use YYYY-MM-DD HH:MM:SS format."
# Create the poll using InputMediaPoll with SendMediaRequest
from telethon.tl.types import InputMediaPoll, Poll, PollAnswer, TextWithEntities
import random
poll = Poll(
id=random.randint(0, 2**63 - 1),
question=TextWithEntities(text=question, entities=[]),
answers=[
PollAnswer(text=TextWithEntities(text=option, entities=[]), option=bytes([i]))
for i, option in enumerate(options)
],
multiple_choice=multiple_choice,
quiz=quiz_mode,
public_voters=public_votes,
close_date=close_date_obj,
)
result = await client(
functions.messages.SendMediaRequest(
peer=entity,
media=InputMediaPoll(poll=poll),
message="",
random_id=random.randint(0, 2**63 - 1),
)
)
return f"Poll created successfully in chat {chat_id}."
except Exception as e:
logger.exception(f"create_poll failed (chat_id={chat_id}, question='{question}')")
return log_and_format_error(
"create_poll", e, chat_id=chat_id, question=question, options=options
)
if __name__ == "__main__":
nest_asyncio.apply()