Refactored several sections of main.py to enhance readability by adjusting line breaks and indentation. This includes formatting changes in error handling and function calls across various asynchronous functions.
2461 lines
89 KiB
Python
2461 lines
89 KiB
Python
import os
|
|
import sys
|
|
import json
|
|
import time
|
|
import asyncio
|
|
import sqlite3
|
|
import logging
|
|
import mimetypes
|
|
from datetime import datetime, timedelta
|
|
from typing import List, Dict, Optional, Union, Any
|
|
|
|
# Third-party libraries
|
|
import nest_asyncio
|
|
from dotenv import load_dotenv
|
|
from mcp.server.fastmcp import FastMCP
|
|
from telethon import TelegramClient, functions, utils
|
|
from telethon.sessions import StringSession
|
|
from telethon.tl.types import (
|
|
User,
|
|
Chat,
|
|
Channel,
|
|
ChatAdminRights,
|
|
ChatBannedRights,
|
|
ChannelParticipantsKicked,
|
|
ChannelParticipantsAdmins,
|
|
InputChatPhoto,
|
|
InputChatUploadedPhoto,
|
|
InputChatPhotoEmpty,
|
|
InputPeerUser,
|
|
InputPeerChat,
|
|
InputPeerChannel,
|
|
)
|
|
import telethon.errors.rpcerrorlist
|
|
|
|
|
|
def json_serializer(obj):
|
|
"""Helper function to convert non-serializable objects for JSON serialization."""
|
|
if isinstance(obj, datetime):
|
|
return obj.isoformat()
|
|
if isinstance(obj, bytes):
|
|
return obj.decode("utf-8", errors="replace")
|
|
# Add other non-serializable types as needed
|
|
raise TypeError(f"Object of type {type(obj)} is not JSON serializable")
|
|
|
|
|
|
load_dotenv()
|
|
|
|
TELEGRAM_API_ID = int(os.getenv("TELEGRAM_API_ID"))
|
|
TELEGRAM_API_HASH = os.getenv("TELEGRAM_API_HASH")
|
|
TELEGRAM_SESSION_NAME = os.getenv("TELEGRAM_SESSION_NAME")
|
|
|
|
# Check if a string session exists in environment, otherwise use file-based session
|
|
SESSION_STRING = os.getenv("TELEGRAM_SESSION_STRING")
|
|
|
|
mcp = FastMCP("telegram")
|
|
|
|
if SESSION_STRING:
|
|
# Use the string session if available
|
|
client = TelegramClient(StringSession(SESSION_STRING), TELEGRAM_API_ID, TELEGRAM_API_HASH)
|
|
else:
|
|
# Use file-based session
|
|
client = TelegramClient(TELEGRAM_SESSION_NAME, TELEGRAM_API_ID, TELEGRAM_API_HASH)
|
|
|
|
# Setup robust logging with both file and console output
|
|
logger = logging.getLogger("telegram_mcp")
|
|
logger.setLevel(logging.ERROR) # Set to ERROR for production, INFO for debugging
|
|
|
|
# Create console handler
|
|
console_handler = logging.StreamHandler()
|
|
console_handler.setLevel(logging.ERROR) # Set to ERROR for production, INFO for debugging
|
|
|
|
# Create file handler with absolute path
|
|
script_dir = os.path.dirname(os.path.abspath(__file__))
|
|
log_file_path = os.path.join(script_dir, "mcp_errors.log")
|
|
|
|
try:
|
|
file_handler = logging.FileHandler(log_file_path, mode="a") # Append mode
|
|
file_handler.setLevel(logging.ERROR)
|
|
|
|
# Create formatter and add to handlers
|
|
formatter = logging.Formatter(
|
|
"%(asctime)s [%(levelname)s] %(name)s - %(message)s - %(filename)s:%(lineno)d"
|
|
)
|
|
console_handler.setFormatter(formatter)
|
|
file_handler.setFormatter(formatter)
|
|
|
|
# Add handlers to logger
|
|
logger.addHandler(console_handler)
|
|
logger.addHandler(file_handler)
|
|
|
|
logger.info(f"Logging initialized to {log_file_path}")
|
|
except Exception as log_error:
|
|
print(f"WARNING: Error setting up log file: {log_error}")
|
|
# Fallback to console-only logging
|
|
logger.addHandler(console_handler)
|
|
logger.error(f"Failed to set up log file handler: {log_error}")
|
|
|
|
# Error code prefix mapping for better error tracing
|
|
ERROR_PREFIXES = {
|
|
"chat": "CHAT",
|
|
"msg": "MSG",
|
|
"contact": "CONTACT",
|
|
"group": "GROUP",
|
|
"media": "MEDIA",
|
|
"profile": "PROFILE",
|
|
"auth": "AUTH",
|
|
"admin": "ADMIN",
|
|
}
|
|
|
|
|
|
def log_and_format_error(
|
|
function_name: str, error: Exception, prefix: str = None, **kwargs
|
|
) -> str:
|
|
"""
|
|
Centralized error handling function that logs the error and returns a formatted user-friendly message.
|
|
|
|
Args:
|
|
function_name: Name of the function where error occurred
|
|
error: The exception that was raised
|
|
prefix: Error code prefix (e.g., "CHAT", "MSG") - if None, will be derived from function_name
|
|
**kwargs: Additional context parameters to include in log
|
|
|
|
Returns:
|
|
A user-friendly error message with error code
|
|
"""
|
|
# Generate a consistent error code
|
|
if prefix is None:
|
|
# Try to derive prefix from function name
|
|
for key, value in ERROR_PREFIXES.items():
|
|
if key in function_name.lower():
|
|
prefix = value
|
|
break
|
|
if prefix is None:
|
|
prefix = "GEN" # Generic prefix if none matches
|
|
|
|
error_code = f"{prefix}-ERR-{abs(hash(function_name)) % 1000:03d}"
|
|
|
|
# Format the additional context parameters
|
|
context = ", ".join(f"{k}={v}" for k, v in kwargs.items())
|
|
|
|
# Log the full technical error
|
|
logger.exception(f"{function_name} failed ({context}): {error}")
|
|
|
|
# Return a user-friendly message
|
|
return f"An error occurred (code: {error_code}). Check mcp_errors.log for details."
|
|
|
|
|
|
def format_entity(entity) -> Dict[str, Any]:
|
|
"""Helper function to format entity information consistently."""
|
|
result = {"id": entity.id}
|
|
|
|
if hasattr(entity, "title"):
|
|
result["name"] = entity.title
|
|
result["type"] = "group" if isinstance(entity, Chat) else "channel"
|
|
elif hasattr(entity, "first_name"):
|
|
name_parts = []
|
|
if entity.first_name:
|
|
name_parts.append(entity.first_name)
|
|
if hasattr(entity, "last_name") and entity.last_name:
|
|
name_parts.append(entity.last_name)
|
|
result["name"] = " ".join(name_parts)
|
|
result["type"] = "user"
|
|
if hasattr(entity, "username") and entity.username:
|
|
result["username"] = entity.username
|
|
if hasattr(entity, "phone") and entity.phone:
|
|
result["phone"] = entity.phone
|
|
|
|
return result
|
|
|
|
|
|
def format_message(message) -> Dict[str, Any]:
|
|
"""Helper function to format message information consistently."""
|
|
result = {
|
|
"id": message.id,
|
|
"date": message.date.isoformat(),
|
|
"text": message.message or "",
|
|
}
|
|
|
|
if message.from_id:
|
|
result["from_id"] = utils.get_peer_id(message.from_id)
|
|
|
|
if message.media:
|
|
result["has_media"] = True
|
|
result["media_type"] = type(message.media).__name__
|
|
|
|
return result
|
|
|
|
|
|
@mcp.tool()
|
|
async def get_chats(page: int = 1, page_size: int = 20) -> str:
|
|
"""
|
|
Get a paginated list of chats.
|
|
Args:
|
|
page: Page number (1-indexed).
|
|
page_size: Number of chats per page.
|
|
"""
|
|
try:
|
|
dialogs = await client.get_dialogs()
|
|
start = (page - 1) * page_size
|
|
end = start + page_size
|
|
if start >= len(dialogs):
|
|
return "Page out of range."
|
|
chats = dialogs[start:end]
|
|
lines = []
|
|
for dialog in chats:
|
|
entity = dialog.entity
|
|
chat_id = entity.id
|
|
title = getattr(entity, "title", None) or getattr(entity, "first_name", "Unknown")
|
|
lines.append(f"Chat ID: {chat_id}, Title: {title}")
|
|
return "\n".join(lines)
|
|
except Exception as e:
|
|
return log_and_format_error("get_chats", e)
|
|
|
|
|
|
@mcp.tool()
|
|
async def get_messages(chat_id: int, page: int = 1, page_size: int = 20) -> str:
|
|
"""
|
|
Get paginated messages from a specific chat.
|
|
Args:
|
|
chat_id: The ID of the chat.
|
|
page: Page number (1-indexed).
|
|
page_size: Number of messages per page.
|
|
"""
|
|
try:
|
|
entity = await client.get_entity(chat_id)
|
|
offset = (page - 1) * page_size
|
|
messages = await client.get_messages(entity, limit=page_size, add_offset=offset)
|
|
if not messages:
|
|
return "No messages found for this page."
|
|
lines = []
|
|
for msg in messages:
|
|
lines.append(f"ID: {msg.id} | Date: {msg.date} | Message: {msg.message}")
|
|
return "\n".join(lines)
|
|
except Exception as e:
|
|
return log_and_format_error(
|
|
"get_messages", e, chat_id=chat_id, page=page, page_size=page_size
|
|
)
|
|
|
|
|
|
@mcp.tool()
|
|
async def send_message(chat_id: int, message: str) -> str:
|
|
"""
|
|
Send a message to a specific chat.
|
|
Args:
|
|
chat_id: The ID of the chat.
|
|
message: The message content to send.
|
|
"""
|
|
try:
|
|
entity = await client.get_entity(chat_id)
|
|
await client.send_message(entity, message)
|
|
return "Message sent successfully."
|
|
except Exception as e:
|
|
return log_and_format_error("send_message", e, chat_id=chat_id)
|
|
|
|
|
|
@mcp.tool()
|
|
async def list_contacts() -> str:
|
|
"""
|
|
List all contacts in your Telegram account.
|
|
"""
|
|
try:
|
|
result = await client(functions.contacts.GetContactsRequest(hash=0))
|
|
users = result.users
|
|
if not users:
|
|
return "No contacts found."
|
|
lines = []
|
|
for user in users:
|
|
name = f"{getattr(user, 'first_name', '')} {getattr(user, 'last_name', '')}".strip()
|
|
username = getattr(user, "username", "")
|
|
phone = getattr(user, "phone", "")
|
|
contact_info = f"ID: {user.id}, Name: {name}"
|
|
if username:
|
|
contact_info += f", Username: @{username}"
|
|
if phone:
|
|
contact_info += f", Phone: {phone}"
|
|
lines.append(contact_info)
|
|
return "\n".join(lines)
|
|
except Exception as e:
|
|
return log_and_format_error("list_contacts", e)
|
|
|
|
|
|
@mcp.tool()
|
|
async def search_contacts(query: str) -> str:
|
|
"""
|
|
Search for contacts by name, username, or phone number using Telethon's SearchRequest.
|
|
Args:
|
|
query: The search term to look for in contact names, usernames, or phone numbers.
|
|
"""
|
|
try:
|
|
result = await client(functions.contacts.SearchRequest(q=query, limit=50))
|
|
users = result.users
|
|
if not users:
|
|
return f"No contacts found matching '{query}'."
|
|
lines = []
|
|
for user in users:
|
|
name = f"{getattr(user, 'first_name', '')} {getattr(user, 'last_name', '')}".strip()
|
|
username = getattr(user, "username", "")
|
|
phone = getattr(user, "phone", "")
|
|
contact_info = f"ID: {user.id}, Name: {name}"
|
|
if username:
|
|
contact_info += f", Username: @{username}"
|
|
if phone:
|
|
contact_info += f", Phone: {phone}"
|
|
lines.append(contact_info)
|
|
return "\n".join(lines)
|
|
except Exception as e:
|
|
return log_and_format_error("search_contacts", e, query=query)
|
|
|
|
|
|
@mcp.tool()
|
|
async def get_contact_ids() -> str:
|
|
"""
|
|
Get all contact IDs in your Telegram account.
|
|
"""
|
|
try:
|
|
result = await client(functions.contacts.GetContactIDsRequest(hash=0))
|
|
if not result:
|
|
return "No contact IDs found."
|
|
return "Contact IDs: " + ", ".join(str(cid) for cid in result)
|
|
except Exception as e:
|
|
return log_and_format_error("get_contact_ids", e)
|
|
|
|
|
|
@mcp.tool()
|
|
async def list_messages(
|
|
chat_id: int,
|
|
limit: int = 20,
|
|
search_query: str = None,
|
|
from_date: str = None,
|
|
to_date: str = None,
|
|
) -> str:
|
|
"""
|
|
Retrieve messages with optional filters.
|
|
|
|
Args:
|
|
chat_id: The ID of the chat to get messages from.
|
|
limit: Maximum number of messages to retrieve.
|
|
search_query: Filter messages containing this text.
|
|
from_date: Filter messages starting from this date (format: YYYY-MM-DD).
|
|
to_date: Filter messages until this date (format: YYYY-MM-DD).
|
|
"""
|
|
try:
|
|
entity = await client.get_entity(chat_id)
|
|
|
|
# Parse date filters if provided
|
|
from_date_obj = None
|
|
to_date_obj = None
|
|
|
|
if from_date:
|
|
try:
|
|
from_date_obj = datetime.strptime(from_date, "%Y-%m-%d")
|
|
# Make it timezone aware by adding UTC timezone info
|
|
# Use datetime.timezone.utc for Python 3.9+ or import timezone directly for 3.13+
|
|
try:
|
|
# For Python 3.9+
|
|
from_date_obj = from_date_obj.replace(tzinfo=datetime.timezone.utc)
|
|
except AttributeError:
|
|
# For Python 3.13+
|
|
from datetime import timezone
|
|
|
|
from_date_obj = from_date_obj.replace(tzinfo=timezone.utc)
|
|
except ValueError:
|
|
return f"Invalid from_date format. Use YYYY-MM-DD."
|
|
|
|
if to_date:
|
|
try:
|
|
to_date_obj = datetime.strptime(to_date, "%Y-%m-%d")
|
|
# Set to end of day and make timezone aware
|
|
to_date_obj = to_date_obj + timedelta(days=1, microseconds=-1)
|
|
# Add timezone info
|
|
try:
|
|
to_date_obj = to_date_obj.replace(tzinfo=datetime.timezone.utc)
|
|
except AttributeError:
|
|
from datetime import timezone
|
|
|
|
to_date_obj = to_date_obj.replace(tzinfo=timezone.utc)
|
|
except ValueError:
|
|
return f"Invalid to_date format. Use YYYY-MM-DD."
|
|
|
|
# Prepare filter parameters
|
|
params = {}
|
|
if search_query:
|
|
params["search"] = search_query
|
|
|
|
messages = await client.get_messages(entity, limit=limit, **params)
|
|
|
|
# Apply date filters (Telethon doesn't support date filtering in get_messages directly)
|
|
if from_date_obj or to_date_obj:
|
|
filtered_messages = []
|
|
for msg in messages:
|
|
if from_date_obj and msg.date < from_date_obj:
|
|
continue
|
|
if to_date_obj and msg.date > to_date_obj:
|
|
continue
|
|
filtered_messages.append(msg)
|
|
messages = filtered_messages
|
|
|
|
if not messages:
|
|
return "No messages found matching the criteria."
|
|
|
|
lines = []
|
|
for msg in messages:
|
|
sender = ""
|
|
if msg.sender:
|
|
sender_name = getattr(msg.sender, "first_name", "") or getattr(
|
|
msg.sender, "title", "Unknown"
|
|
)
|
|
sender = f"{sender_name} | "
|
|
|
|
lines.append(
|
|
f"ID: {msg.id} | {sender}Date: {msg.date} | Message: {msg.message or '[Media/No text]'}"
|
|
)
|
|
|
|
return "\n".join(lines)
|
|
except Exception as e:
|
|
return log_and_format_error("list_messages", e, chat_id=chat_id)
|
|
|
|
|
|
@mcp.tool()
|
|
async def list_chats(chat_type: str = None, limit: int = 20) -> str:
|
|
"""
|
|
List available chats with metadata.
|
|
|
|
Args:
|
|
chat_type: Filter by chat type ('user', 'group', 'channel', or None for all)
|
|
limit: Maximum number of chats to retrieve.
|
|
"""
|
|
try:
|
|
dialogs = await client.get_dialogs(limit=limit)
|
|
|
|
results = []
|
|
for dialog in dialogs:
|
|
entity = dialog.entity
|
|
|
|
# Filter by type if requested
|
|
current_type = None
|
|
if isinstance(entity, User):
|
|
current_type = "user"
|
|
elif isinstance(entity, Chat):
|
|
current_type = "group"
|
|
elif isinstance(entity, Channel):
|
|
if getattr(entity, "broadcast", False):
|
|
current_type = "channel"
|
|
else:
|
|
current_type = "group" # Supergroup
|
|
|
|
if chat_type and current_type != chat_type.lower():
|
|
continue
|
|
|
|
# Format chat info
|
|
chat_info = f"Chat ID: {entity.id}"
|
|
|
|
if hasattr(entity, "title"):
|
|
chat_info += f", Title: {entity.title}"
|
|
elif hasattr(entity, "first_name"):
|
|
name = f"{entity.first_name}"
|
|
if hasattr(entity, "last_name") and entity.last_name:
|
|
name += f" {entity.last_name}"
|
|
chat_info += f", Name: {name}"
|
|
|
|
chat_info += f", Type: {current_type}"
|
|
|
|
if hasattr(entity, "username") and entity.username:
|
|
chat_info += f", Username: @{entity.username}"
|
|
|
|
# Add unread count if available
|
|
if hasattr(dialog, "unread_count") and dialog.unread_count > 0:
|
|
chat_info += f", Unread: {dialog.unread_count}"
|
|
|
|
results.append(chat_info)
|
|
|
|
if not results:
|
|
return f"No chats found matching the criteria."
|
|
|
|
return "\n".join(results)
|
|
except Exception as e:
|
|
return log_and_format_error("list_chats", e, chat_type=chat_type, limit=limit)
|
|
|
|
|
|
@mcp.tool()
|
|
async def get_chat(chat_id: int) -> str:
|
|
"""
|
|
Get detailed information about a specific chat.
|
|
|
|
Args:
|
|
chat_id: The ID of the chat.
|
|
"""
|
|
try:
|
|
entity = await client.get_entity(chat_id)
|
|
|
|
result = []
|
|
result.append(f"ID: {entity.id}")
|
|
|
|
is_channel = isinstance(entity, Channel)
|
|
is_chat = isinstance(entity, Chat)
|
|
is_user = isinstance(entity, User)
|
|
|
|
if hasattr(entity, "title"):
|
|
result.append(f"Title: {entity.title}")
|
|
chat_type = (
|
|
"Channel" if is_channel and getattr(entity, "broadcast", False) else "Group"
|
|
)
|
|
if is_channel and getattr(entity, "megagroup", False):
|
|
chat_type = "Supergroup"
|
|
elif is_chat:
|
|
chat_type = "Group (Basic)"
|
|
result.append(f"Type: {chat_type}")
|
|
if hasattr(entity, "username") and entity.username:
|
|
result.append(f"Username: @{entity.username}")
|
|
|
|
# Fetch participants count reliably
|
|
try:
|
|
participants_count = (await client.get_participants(entity, limit=0)).total
|
|
result.append(f"Participants: {participants_count}")
|
|
except Exception as pe:
|
|
result.append(f"Participants: Error fetching ({pe})")
|
|
|
|
elif is_user:
|
|
name = f"{entity.first_name}"
|
|
if entity.last_name:
|
|
name += f" {entity.last_name}"
|
|
result.append(f"Name: {name}")
|
|
result.append(f"Type: User")
|
|
if entity.username:
|
|
result.append(f"Username: @{entity.username}")
|
|
if entity.phone:
|
|
result.append(f"Phone: {entity.phone}")
|
|
result.append(f"Bot: {'Yes' if entity.bot else 'No'}")
|
|
result.append(f"Verified: {'Yes' if entity.verified else 'No'}")
|
|
|
|
# Get last activity if it's a dialog
|
|
try:
|
|
# Using get_dialogs might be slow if there are many dialogs
|
|
# Alternative: Get entity again via get_dialogs if needed for unread count
|
|
dialog = await client.get_dialogs(limit=1, offset_id=0, offset_peer=entity)
|
|
if dialog:
|
|
dialog = dialog[0]
|
|
result.append(f"Unread Messages: {dialog.unread_count}")
|
|
if dialog.message:
|
|
last_msg = dialog.message
|
|
sender_name = "Unknown"
|
|
if last_msg.sender:
|
|
sender_name = getattr(last_msg.sender, "first_name", "") or getattr(
|
|
last_msg.sender, "title", "Unknown"
|
|
)
|
|
if hasattr(last_msg.sender, "last_name") and last_msg.sender.last_name:
|
|
sender_name += f" {last_msg.sender.last_name}"
|
|
sender_name = sender_name.strip() or "Unknown"
|
|
result.append(f"Last Message: From {sender_name} at {last_msg.date}")
|
|
result.append(f"Message: {last_msg.message or '[Media/No text]'}")
|
|
except Exception as diag_ex:
|
|
logger.warning(f"Could not get dialog info for {chat_id}: {diag_ex}")
|
|
pass
|
|
|
|
return "\n".join(result)
|
|
except Exception as e:
|
|
return log_and_format_error("get_chat", e, chat_id=chat_id)
|
|
|
|
|
|
@mcp.tool()
|
|
async def get_direct_chat_by_contact(contact_query: str) -> str:
|
|
"""
|
|
Find a direct chat with a specific contact by name, username, or phone.
|
|
|
|
Args:
|
|
contact_query: Name, username, or phone number to search for.
|
|
"""
|
|
try:
|
|
# Fetch all contacts using the correct Telethon method
|
|
result = await client(functions.contacts.GetContactsRequest(hash=0))
|
|
contacts = result.users
|
|
found_contacts = []
|
|
for contact in contacts:
|
|
if not contact:
|
|
continue
|
|
name = (
|
|
f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip()
|
|
)
|
|
username = getattr(contact, "username", "")
|
|
phone = getattr(contact, "phone", "")
|
|
if (
|
|
contact_query.lower() in name.lower()
|
|
or (username and contact_query.lower() in username.lower())
|
|
or (phone and contact_query in phone)
|
|
):
|
|
found_contacts.append(contact)
|
|
if not found_contacts:
|
|
return f"No contacts found matching '{contact_query}'."
|
|
# If we found contacts, look for direct chats with them
|
|
results = []
|
|
dialogs = await client.get_dialogs()
|
|
for contact in found_contacts:
|
|
contact_name = (
|
|
f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip()
|
|
)
|
|
for dialog in dialogs:
|
|
if isinstance(dialog.entity, User) and dialog.entity.id == contact.id:
|
|
chat_info = f"Chat ID: {dialog.entity.id}, Contact: {contact_name}"
|
|
if getattr(contact, "username", ""):
|
|
chat_info += f", Username: @{contact.username}"
|
|
if dialog.unread_count:
|
|
chat_info += f", Unread: {dialog.unread_count}"
|
|
results.append(chat_info)
|
|
break
|
|
if not results:
|
|
found_names = ", ".join(
|
|
[f"{c.first_name} {c.last_name}".strip() for c in found_contacts]
|
|
)
|
|
return f"Found contacts: {found_names}, but no direct chats were found with them."
|
|
return "\n".join(results)
|
|
except Exception as e:
|
|
return log_and_format_error("get_direct_chat_by_contact", e, contact_query=contact_query)
|
|
|
|
|
|
@mcp.tool()
|
|
async def get_contact_chats(contact_id: int) -> str:
|
|
"""
|
|
List all chats involving a specific contact.
|
|
|
|
Args:
|
|
contact_id: The ID of the contact.
|
|
"""
|
|
try:
|
|
# Get contact info
|
|
contact = await client.get_entity(contact_id)
|
|
if not isinstance(contact, User):
|
|
return f"ID {contact_id} is not a user/contact."
|
|
|
|
contact_name = (
|
|
f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip()
|
|
)
|
|
|
|
# Find direct chat
|
|
direct_chat = None
|
|
dialogs = await client.get_dialogs()
|
|
|
|
results = []
|
|
|
|
# Look for direct chat
|
|
for dialog in dialogs:
|
|
if isinstance(dialog.entity, User) and dialog.entity.id == contact_id:
|
|
chat_info = f"Direct Chat ID: {dialog.entity.id}, Type: Private"
|
|
if dialog.unread_count:
|
|
chat_info += f", Unread: {dialog.unread_count}"
|
|
results.append(chat_info)
|
|
break
|
|
|
|
# Look for common groups/channels
|
|
common_chats = []
|
|
try:
|
|
common = await client.get_common_chats(contact)
|
|
for chat in common:
|
|
chat_type = "Channel" if getattr(chat, "broadcast", False) else "Group"
|
|
chat_info = f"Chat ID: {chat.id}, Title: {chat.title}, Type: {chat_type}"
|
|
results.append(chat_info)
|
|
except:
|
|
results.append("Could not retrieve common groups.")
|
|
|
|
if not results:
|
|
return f"No chats found with {contact_name} (ID: {contact_id})."
|
|
|
|
return f"Chats with {contact_name} (ID: {contact_id}):\n" + "\n".join(results)
|
|
except Exception as e:
|
|
return log_and_format_error("get_contact_chats", e, contact_id=contact_id)
|
|
|
|
|
|
@mcp.tool()
|
|
async def get_last_interaction(contact_id: int) -> str:
|
|
"""
|
|
Get the most recent message with a contact.
|
|
|
|
Args:
|
|
contact_id: The ID of the contact.
|
|
"""
|
|
try:
|
|
# Get contact info
|
|
contact = await client.get_entity(contact_id)
|
|
if not isinstance(contact, User):
|
|
return f"ID {contact_id} is not a user/contact."
|
|
|
|
contact_name = (
|
|
f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip()
|
|
)
|
|
|
|
# Get the last few messages
|
|
messages = await client.get_messages(contact, limit=5)
|
|
|
|
if not messages:
|
|
return f"No messages found with {contact_name} (ID: {contact_id})."
|
|
|
|
results = [f"Last interactions with {contact_name} (ID: {contact_id}):"]
|
|
|
|
for msg in messages:
|
|
sender = "You" if msg.out else contact_name
|
|
message_text = msg.message or "[Media/No text]"
|
|
results.append(f"Date: {msg.date}, From: {sender}, Message: {message_text}")
|
|
|
|
return "\n".join(results)
|
|
except Exception as e:
|
|
return log_and_format_error("get_last_interaction", e, contact_id=contact_id)
|
|
|
|
|
|
@mcp.tool()
|
|
async def get_message_context(chat_id: int, message_id: int, context_size: int = 3) -> str:
|
|
"""
|
|
Retrieve context around a specific message.
|
|
|
|
Args:
|
|
chat_id: The ID of the chat.
|
|
message_id: The ID of the central message.
|
|
context_size: Number of messages before and after to include.
|
|
"""
|
|
try:
|
|
chat = await client.get_entity(chat_id)
|
|
# Get messages around the specified message
|
|
messages_before = await client.get_messages(chat, limit=context_size, max_id=message_id)
|
|
central_message = await client.get_messages(chat, ids=message_id)
|
|
# Fix: get_messages(ids=...) returns a single Message, not a list
|
|
if central_message is not None and not isinstance(central_message, list):
|
|
central_message = [central_message]
|
|
elif central_message is None:
|
|
central_message = []
|
|
messages_after = await client.get_messages(
|
|
chat, limit=context_size, min_id=message_id, reverse=True
|
|
)
|
|
if not central_message:
|
|
return f"Message with ID {message_id} not found in chat {chat_id}."
|
|
# Combine messages in chronological order
|
|
all_messages = list(messages_before) + list(central_message) + list(messages_after)
|
|
all_messages.sort(key=lambda m: m.id)
|
|
results = [f"Context for message {message_id} in chat {chat_id}:"]
|
|
for msg in all_messages:
|
|
sender_name = "Unknown"
|
|
if msg.sender:
|
|
sender_name = getattr(msg.sender, "first_name", "") or getattr(
|
|
msg.sender, "title", "Unknown"
|
|
)
|
|
highlight = " [THIS MESSAGE]" if msg.id == message_id else ""
|
|
results.append(
|
|
f"ID: {msg.id} | {sender_name} | {msg.date}{highlight}\n{msg.message or '[Media/No text]'}\n"
|
|
)
|
|
return "\n".join(results)
|
|
except Exception as e:
|
|
return log_and_format_error(
|
|
"get_message_context",
|
|
e,
|
|
chat_id=chat_id,
|
|
message_id=message_id,
|
|
context_size=context_size,
|
|
)
|
|
|
|
|
|
@mcp.tool()
|
|
async def add_contact(phone: str, first_name: str, last_name: str = "") -> str:
|
|
"""
|
|
Add a new contact to your Telegram account.
|
|
Args:
|
|
phone: The phone number of the contact (with country code).
|
|
first_name: The contact's first name.
|
|
last_name: The contact's last name (optional).
|
|
"""
|
|
try:
|
|
# Try to import the required types first
|
|
from telethon.tl.types import InputPhoneContact
|
|
|
|
result = await client(
|
|
functions.contacts.ImportContactsRequest(
|
|
contacts=[
|
|
InputPhoneContact(
|
|
client_id=0, phone=phone, first_name=first_name, last_name=last_name
|
|
)
|
|
]
|
|
)
|
|
)
|
|
if result.imported:
|
|
return f"Contact {first_name} {last_name} added successfully."
|
|
else:
|
|
return f"Contact not added. Response: {str(result)}"
|
|
except (ImportError, AttributeError) as type_err:
|
|
# Try alternative approach using raw API
|
|
try:
|
|
result = await client(
|
|
functions.contacts.ImportContactsRequest(
|
|
contacts=[
|
|
{
|
|
"client_id": 0,
|
|
"phone": phone,
|
|
"first_name": first_name,
|
|
"last_name": last_name,
|
|
}
|
|
]
|
|
)
|
|
)
|
|
if hasattr(result, "imported") and result.imported:
|
|
return f"Contact {first_name} {last_name} added successfully (alt method)."
|
|
else:
|
|
return f"Contact not added. Alternative method response: {str(result)}"
|
|
except Exception as alt_e:
|
|
logger.exception(f"add_contact (alt method) failed (phone={phone})")
|
|
return log_and_format_error("add_contact", alt_e, phone=phone)
|
|
except Exception as e:
|
|
logger.exception(f"add_contact failed (phone={phone})")
|
|
return log_and_format_error("add_contact", e, phone=phone)
|
|
|
|
|
|
@mcp.tool()
|
|
async def delete_contact(user_id: int) -> str:
|
|
"""
|
|
Delete a contact by user ID.
|
|
Args:
|
|
user_id: The Telegram user ID of the contact to delete.
|
|
"""
|
|
try:
|
|
user = await client.get_entity(user_id)
|
|
await client(functions.contacts.DeleteContactsRequest(id=[user]))
|
|
return f"Contact with user ID {user_id} deleted."
|
|
except Exception as e:
|
|
return log_and_format_error("delete_contact", e, user_id=user_id)
|
|
|
|
|
|
@mcp.tool()
|
|
async def block_user(user_id: int) -> str:
|
|
"""
|
|
Block a user by user ID.
|
|
Args:
|
|
user_id: The Telegram user ID to block.
|
|
"""
|
|
try:
|
|
user = await client.get_entity(user_id)
|
|
await client(functions.contacts.BlockRequest(id=user))
|
|
return f"User {user_id} blocked."
|
|
except Exception as e:
|
|
return log_and_format_error("block_user", e, user_id=user_id)
|
|
|
|
|
|
@mcp.tool()
|
|
async def unblock_user(user_id: int) -> str:
|
|
"""
|
|
Unblock a user by user ID.
|
|
Args:
|
|
user_id: The Telegram user ID to unblock.
|
|
"""
|
|
try:
|
|
user = await client.get_entity(user_id)
|
|
await client(functions.contacts.UnblockRequest(id=user))
|
|
return f"User {user_id} unblocked."
|
|
except Exception as e:
|
|
return log_and_format_error("unblock_user", e, user_id=user_id)
|
|
|
|
|
|
@mcp.tool()
|
|
async def get_me() -> str:
|
|
"""
|
|
Get your own user information.
|
|
"""
|
|
try:
|
|
me = await client.get_me()
|
|
return json.dumps(format_entity(me), indent=2)
|
|
except Exception as e:
|
|
return log_and_format_error("get_me", e)
|
|
|
|
|
|
@mcp.tool()
|
|
async def create_group(title: str, user_ids: list) -> str:
|
|
"""
|
|
Create a new group or supergroup and add users.
|
|
|
|
Args:
|
|
title: Title for the new group
|
|
user_ids: List of user IDs to add to the group
|
|
"""
|
|
try:
|
|
# Convert user IDs to entities
|
|
users = []
|
|
for user_id in user_ids:
|
|
try:
|
|
user = await client.get_entity(user_id)
|
|
users.append(user)
|
|
except Exception as e:
|
|
logger.error(f"Failed to get entity for user ID {user_id}: {e}")
|
|
return f"Error: Could not find user with ID {user_id}"
|
|
|
|
if not users:
|
|
return "Error: No valid users provided"
|
|
|
|
# Create the group with the users
|
|
try:
|
|
# Create a new chat with selected users
|
|
result = await client(functions.messages.CreateChatRequest(users=users, title=title))
|
|
|
|
# Check what type of response we got
|
|
if hasattr(result, "chats") and result.chats:
|
|
created_chat = result.chats[0]
|
|
return f"Group created with ID: {created_chat.id}"
|
|
elif hasattr(result, "chat") and result.chat:
|
|
return f"Group created with ID: {result.chat.id}"
|
|
elif hasattr(result, "chat_id"):
|
|
return f"Group created with ID: {result.chat_id}"
|
|
else:
|
|
# If we can't determine the chat ID directly from the result
|
|
# Try to find it in recent dialogs
|
|
await asyncio.sleep(1) # Give Telegram a moment to register the new group
|
|
dialogs = await client.get_dialogs(limit=5) # Get recent dialogs
|
|
for dialog in dialogs:
|
|
if dialog.title == title:
|
|
return f"Group created with ID: {dialog.id}"
|
|
|
|
# If we still can't find it, at least return success
|
|
return f"Group created successfully. Please check your recent chats for '{title}'."
|
|
|
|
except Exception as create_err:
|
|
if "PEER_FLOOD" in str(create_err):
|
|
return "Error: Cannot create group due to Telegram limits. Try again later."
|
|
else:
|
|
raise # Let the outer exception handler catch it
|
|
except Exception as e:
|
|
logger.exception(f"create_group failed (title={title}, user_ids={user_ids})")
|
|
return log_and_format_error("create_group", e, title=title, user_ids=user_ids)
|
|
|
|
|
|
@mcp.tool()
|
|
async def invite_to_group(group_id: int, user_ids: list) -> str:
|
|
"""
|
|
Invite users to a group or channel.
|
|
|
|
Args:
|
|
group_id: The ID of the group/channel.
|
|
user_ids: List of user IDs to invite.
|
|
"""
|
|
try:
|
|
entity = await client.get_entity(group_id)
|
|
users_to_add = []
|
|
|
|
for user_id in user_ids:
|
|
try:
|
|
user = await client.get_entity(user_id)
|
|
users_to_add.append(user)
|
|
except ValueError as e:
|
|
return f"Error: User with ID {user_id} could not be found. {e}"
|
|
|
|
try:
|
|
result = await client(
|
|
functions.channels.InviteToChannelRequest(channel=entity, users=users_to_add)
|
|
)
|
|
|
|
invited_count = 0
|
|
if hasattr(result, "users") and result.users:
|
|
invited_count = len(result.users)
|
|
elif hasattr(result, "count"):
|
|
invited_count = result.count
|
|
|
|
return f"Successfully invited {invited_count} users to {entity.title}"
|
|
except telethon.errors.rpcerrorlist.UserNotMutualContactError:
|
|
return "Error: Cannot invite users who are not mutual contacts. Please ensure the users are in your contacts and have added you back."
|
|
except telethon.errors.rpcerrorlist.UserPrivacyRestrictedError:
|
|
return (
|
|
"Error: One or more users have privacy settings that prevent you from adding them."
|
|
)
|
|
except Exception as e:
|
|
return log_and_format_error("invite_to_group", e, group_id=group_id, user_ids=user_ids)
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"telegram_mcp invite_to_group failed (group_id={group_id}, user_ids={user_ids})",
|
|
exc_info=True,
|
|
)
|
|
return log_and_format_error("invite_to_group", e, group_id=group_id, user_ids=user_ids)
|
|
|
|
|
|
@mcp.tool()
|
|
async def leave_chat(chat_id: int) -> str:
|
|
"""
|
|
Leave a group or channel by chat ID.
|
|
|
|
Args:
|
|
chat_id: The chat ID to leave.
|
|
"""
|
|
try:
|
|
entity = await client.get_entity(chat_id)
|
|
|
|
# Check the entity type carefully
|
|
if isinstance(entity, Channel):
|
|
# Handle both channels and supergroups (which are also channels in Telegram)
|
|
try:
|
|
await client(functions.channels.LeaveChannelRequest(channel=entity))
|
|
chat_name = getattr(entity, "title", str(chat_id))
|
|
return f"Left channel/supergroup {chat_name} (ID: {chat_id})."
|
|
except Exception as chan_err:
|
|
return log_and_format_error("leave_chat", chan_err, chat_id=chat_id)
|
|
|
|
elif isinstance(entity, Chat):
|
|
# Traditional basic groups (not supergroups)
|
|
try:
|
|
# First try with InputPeerUser
|
|
me = await client.get_me(input_peer=True)
|
|
await client(
|
|
functions.messages.DeleteChatUserRequest(
|
|
chat_id=entity.id, user_id=me # Use the entity ID directly
|
|
)
|
|
)
|
|
chat_name = getattr(entity, "title", str(chat_id))
|
|
return f"Left basic group {chat_name} (ID: {chat_id})."
|
|
except Exception as chat_err:
|
|
# If the above fails, try the second approach
|
|
logger.warning(
|
|
f"First leave attempt failed: {chat_err}, trying alternative method"
|
|
)
|
|
|
|
try:
|
|
# Alternative approach - sometimes this works better
|
|
me_full = await client.get_me()
|
|
await client(
|
|
functions.messages.DeleteChatUserRequest(
|
|
chat_id=entity.id, user_id=me_full.id
|
|
)
|
|
)
|
|
chat_name = getattr(entity, "title", str(chat_id))
|
|
return f"Left basic group {chat_name} (ID: {chat_id})."
|
|
except Exception as alt_err:
|
|
return log_and_format_error("leave_chat", alt_err, chat_id=chat_id)
|
|
else:
|
|
# Cannot leave a user chat this way
|
|
entity_type = type(entity).__name__
|
|
return log_and_format_error(
|
|
"leave_chat",
|
|
Exception(
|
|
f"Cannot leave chat ID {chat_id} of type {entity_type}. This function is for groups and channels only."
|
|
),
|
|
chat_id=chat_id,
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.exception(f"leave_chat failed (chat_id={chat_id})")
|
|
|
|
# Provide helpful hint for common errors
|
|
error_str = str(e).lower()
|
|
if "invalid" in error_str and "chat" in error_str:
|
|
return log_and_format_error(
|
|
"leave_chat",
|
|
Exception(
|
|
f"Error leaving chat: This appears to be a channel/supergroup. Please check the chat ID and try again."
|
|
),
|
|
chat_id=chat_id,
|
|
)
|
|
|
|
return log_and_format_error("leave_chat", e, chat_id=chat_id)
|
|
|
|
|
|
@mcp.tool()
|
|
async def get_participants(chat_id: int) -> str:
|
|
"""
|
|
List all participants in a group or channel.
|
|
Args:
|
|
chat_id: The group or channel ID.
|
|
"""
|
|
try:
|
|
participants = await client.get_participants(chat_id)
|
|
lines = [
|
|
f"ID: {p.id}, Name: {getattr(p, 'first_name', '')} {getattr(p, 'last_name', '')}"
|
|
for p in participants
|
|
]
|
|
return "\n".join(lines)
|
|
except Exception as e:
|
|
return log_and_format_error("get_participants", e, chat_id=chat_id)
|
|
|
|
|
|
@mcp.tool()
|
|
async def send_file(chat_id: int, file_path: str, caption: str = None) -> str:
|
|
"""
|
|
Send a file to a chat.
|
|
Args:
|
|
chat_id: The chat ID.
|
|
file_path: Absolute path to the file to send (must exist and be readable).
|
|
caption: Optional caption for the file.
|
|
"""
|
|
try:
|
|
if not os.path.isfile(file_path):
|
|
return f"File not found: {file_path}"
|
|
if not os.access(file_path, os.R_OK):
|
|
return f"File is not readable: {file_path}"
|
|
entity = await client.get_entity(chat_id)
|
|
await client.send_file(entity, file_path, caption=caption)
|
|
return f"File sent to chat {chat_id}."
|
|
except Exception as e:
|
|
return log_and_format_error(
|
|
"send_file", e, chat_id=chat_id, file_path=file_path, caption=caption
|
|
)
|
|
|
|
|
|
@mcp.tool()
|
|
async def download_media(chat_id: int, message_id: int, file_path: str) -> str:
|
|
"""
|
|
Download media from a message in a chat.
|
|
Args:
|
|
chat_id: The chat ID.
|
|
message_id: The message ID containing the media.
|
|
file_path: Absolute path to save the downloaded file (must be writable).
|
|
"""
|
|
try:
|
|
entity = await client.get_entity(chat_id)
|
|
msg = await client.get_messages(entity, ids=message_id)
|
|
if not msg or not msg.media:
|
|
return "No media found in the specified message."
|
|
# Check if directory is writable
|
|
dir_path = os.path.dirname(file_path) or "."
|
|
if not os.access(dir_path, os.W_OK):
|
|
return f"Directory not writable: {dir_path}"
|
|
await client.download_media(msg, file=file_path)
|
|
if not os.path.isfile(file_path):
|
|
return f"Download failed: file not created at {file_path}"
|
|
return f"Media downloaded to {file_path}."
|
|
except Exception as e:
|
|
return log_and_format_error(
|
|
"download_media", e, chat_id=chat_id, message_id=message_id, file_path=file_path
|
|
)
|
|
|
|
|
|
@mcp.tool()
|
|
async def update_profile(first_name: str = None, last_name: str = None, about: str = None) -> str:
|
|
"""
|
|
Update your profile information (name, bio).
|
|
"""
|
|
try:
|
|
await client(
|
|
functions.account.UpdateProfileRequest(
|
|
first_name=first_name, last_name=last_name, about=about
|
|
)
|
|
)
|
|
return "Profile updated."
|
|
except Exception as e:
|
|
return log_and_format_error(
|
|
"update_profile", e, first_name=first_name, last_name=last_name, about=about
|
|
)
|
|
|
|
|
|
@mcp.tool()
|
|
async def set_profile_photo(file_path: str) -> str:
|
|
"""
|
|
Set a new profile photo.
|
|
"""
|
|
try:
|
|
await client(
|
|
functions.photos.UploadProfilePhotoRequest(file=await client.upload_file(file_path))
|
|
)
|
|
return "Profile photo updated."
|
|
except Exception as e:
|
|
return log_and_format_error("set_profile_photo", e, file_path=file_path)
|
|
|
|
|
|
@mcp.tool()
|
|
async def delete_profile_photo() -> str:
|
|
"""
|
|
Delete your current profile photo.
|
|
"""
|
|
try:
|
|
photos = await client(
|
|
functions.photos.GetUserPhotosRequest(user_id="me", offset=0, max_id=0, limit=1)
|
|
)
|
|
if not photos.photos:
|
|
return "No profile photo to delete."
|
|
await client(functions.photos.DeletePhotosRequest(id=[photos.photos[0].id]))
|
|
return "Profile photo deleted."
|
|
except Exception as e:
|
|
return log_and_format_error("delete_profile_photo", e)
|
|
|
|
|
|
@mcp.tool()
|
|
async def get_privacy_settings() -> str:
|
|
"""
|
|
Get your privacy settings for last seen status.
|
|
"""
|
|
try:
|
|
# Import needed types directly
|
|
from telethon.tl.types import InputPrivacyKeyStatusTimestamp
|
|
|
|
try:
|
|
settings = await client(
|
|
functions.account.GetPrivacyRequest(key=InputPrivacyKeyStatusTimestamp())
|
|
)
|
|
return str(settings)
|
|
except TypeError as e:
|
|
if "TLObject was expected" in str(e):
|
|
return "Error: Privacy settings API call failed due to type mismatch. This is likely a version compatibility issue with Telethon."
|
|
else:
|
|
raise
|
|
except Exception as e:
|
|
logger.exception("get_privacy_settings failed")
|
|
return log_and_format_error("get_privacy_settings", e)
|
|
|
|
|
|
@mcp.tool()
|
|
async def set_privacy_settings(
|
|
key: str, allow_users: list = None, disallow_users: list = None
|
|
) -> str:
|
|
"""
|
|
Set privacy settings (e.g., last seen, phone, etc.).
|
|
|
|
Args:
|
|
key: The privacy setting to modify ('status' for last seen, 'phone', 'profile_photo', etc.)
|
|
allow_users: List of user IDs to allow
|
|
disallow_users: List of user IDs to disallow
|
|
"""
|
|
try:
|
|
# Import needed types
|
|
from telethon.tl.types import (
|
|
InputPrivacyKeyStatusTimestamp,
|
|
InputPrivacyKeyPhoneNumber,
|
|
InputPrivacyKeyProfilePhoto,
|
|
InputPrivacyValueAllowUsers,
|
|
InputPrivacyValueDisallowUsers,
|
|
InputPrivacyValueAllowAll,
|
|
InputPrivacyValueDisallowAll,
|
|
)
|
|
|
|
# Map the simplified keys to their corresponding input types
|
|
key_mapping = {
|
|
"status": InputPrivacyKeyStatusTimestamp,
|
|
"phone": InputPrivacyKeyPhoneNumber,
|
|
"profile_photo": InputPrivacyKeyProfilePhoto,
|
|
}
|
|
|
|
# Get the appropriate key class
|
|
if key not in key_mapping:
|
|
return f"Error: Unsupported privacy key '{key}'. Supported keys: {', '.join(key_mapping.keys())}"
|
|
|
|
privacy_key = key_mapping[key]()
|
|
|
|
# Prepare the rules
|
|
rules = []
|
|
|
|
# Process allow rules
|
|
if allow_users is None or len(allow_users) == 0:
|
|
# If no specific users to allow, allow everyone by default
|
|
rules.append(InputPrivacyValueAllowAll())
|
|
else:
|
|
# Convert user IDs to InputUser entities
|
|
try:
|
|
allow_entities = []
|
|
for user_id in allow_users:
|
|
try:
|
|
user = await client.get_entity(user_id)
|
|
allow_entities.append(user)
|
|
except Exception as user_err:
|
|
logger.warning(f"Could not get entity for user ID {user_id}: {user_err}")
|
|
|
|
if allow_entities:
|
|
rules.append(InputPrivacyValueAllowUsers(users=allow_entities))
|
|
except Exception as allow_err:
|
|
logger.error(f"Error processing allowed users: {allow_err}")
|
|
return log_and_format_error("set_privacy_settings", allow_err, key=key)
|
|
|
|
# Process disallow rules
|
|
if disallow_users and len(disallow_users) > 0:
|
|
try:
|
|
disallow_entities = []
|
|
for user_id in disallow_users:
|
|
try:
|
|
user = await client.get_entity(user_id)
|
|
disallow_entities.append(user)
|
|
except Exception as user_err:
|
|
logger.warning(f"Could not get entity for user ID {user_id}: {user_err}")
|
|
|
|
if disallow_entities:
|
|
rules.append(InputPrivacyValueDisallowUsers(users=disallow_entities))
|
|
except Exception as disallow_err:
|
|
logger.error(f"Error processing disallowed users: {disallow_err}")
|
|
return log_and_format_error("set_privacy_settings", disallow_err, key=key)
|
|
|
|
# Apply the privacy settings
|
|
try:
|
|
result = await client(
|
|
functions.account.SetPrivacyRequest(key=privacy_key, rules=rules)
|
|
)
|
|
return f"Privacy settings for {key} updated successfully."
|
|
except TypeError as type_err:
|
|
if "TLObject was expected" in str(type_err):
|
|
return "Error: Privacy settings API call failed due to type mismatch. This is likely a version compatibility issue with Telethon."
|
|
else:
|
|
raise
|
|
except Exception as e:
|
|
logger.exception(f"set_privacy_settings failed (key={key})")
|
|
return log_and_format_error("set_privacy_settings", e, key=key)
|
|
|
|
|
|
@mcp.tool()
|
|
async def import_contacts(contacts: list) -> str:
|
|
"""
|
|
Import a list of contacts. Each contact should be a dict with phone, first_name, last_name.
|
|
"""
|
|
try:
|
|
input_contacts = [
|
|
functions.contacts.InputPhoneContact(
|
|
client_id=i,
|
|
phone=c["phone"],
|
|
first_name=c["first_name"],
|
|
last_name=c.get("last_name", ""),
|
|
)
|
|
for i, c in enumerate(contacts)
|
|
]
|
|
result = await client(functions.contacts.ImportContactsRequest(contacts=input_contacts))
|
|
return f"Imported {len(result.imported)} contacts."
|
|
except Exception as e:
|
|
return log_and_format_error("import_contacts", e, contacts=contacts)
|
|
|
|
|
|
@mcp.tool()
|
|
async def export_contacts() -> str:
|
|
"""
|
|
Export all contacts as a JSON string.
|
|
"""
|
|
try:
|
|
result = await client(functions.contacts.GetContactsRequest(hash=0))
|
|
users = result.users
|
|
return json.dumps([format_entity(u) for u in users], indent=2)
|
|
except Exception as e:
|
|
return log_and_format_error("export_contacts", e)
|
|
|
|
|
|
@mcp.tool()
|
|
async def get_blocked_users() -> str:
|
|
"""
|
|
Get a list of blocked users.
|
|
"""
|
|
try:
|
|
result = await client(functions.contacts.GetBlockedRequest(offset=0, limit=100))
|
|
return json.dumps([format_entity(u) for u in result.users], indent=2)
|
|
except Exception as e:
|
|
return log_and_format_error("get_blocked_users", e)
|
|
|
|
|
|
@mcp.tool()
|
|
async def create_channel(title: str, about: str = "", megagroup: bool = False) -> str:
|
|
"""
|
|
Create a new channel or supergroup.
|
|
"""
|
|
try:
|
|
result = await client(
|
|
functions.channels.CreateChannelRequest(title=title, about=about, megagroup=megagroup)
|
|
)
|
|
return f"Channel '{title}' created with ID: {result.chats[0].id}"
|
|
except Exception as e:
|
|
return log_and_format_error(
|
|
"create_channel", e, title=title, about=about, megagroup=megagroup
|
|
)
|
|
|
|
|
|
@mcp.tool()
|
|
async def edit_chat_title(chat_id: int, title: str) -> str:
|
|
"""
|
|
Edit the title of a chat, group, or channel.
|
|
"""
|
|
try:
|
|
entity = await client.get_entity(chat_id)
|
|
if isinstance(entity, Channel):
|
|
await client(functions.channels.EditTitleRequest(channel=entity, title=title))
|
|
elif isinstance(entity, Chat):
|
|
await client(functions.messages.EditChatTitleRequest(chat_id=chat_id, title=title))
|
|
else:
|
|
return f"Cannot edit title for this entity type ({type(entity)})."
|
|
return f"Chat {chat_id} title updated to '{title}'."
|
|
except Exception as e:
|
|
logger.exception(f"edit_chat_title failed (chat_id={chat_id}, title='{title}')")
|
|
return log_and_format_error("edit_chat_title", e, chat_id=chat_id, title=title)
|
|
|
|
|
|
@mcp.tool()
|
|
async def edit_chat_photo(chat_id: int, file_path: str) -> str:
|
|
"""
|
|
Edit the photo of a chat, group, or channel. Requires a file path to an image.
|
|
"""
|
|
try:
|
|
if not os.path.isfile(file_path):
|
|
return f"Photo file not found: {file_path}"
|
|
if not os.access(file_path, os.R_OK):
|
|
return f"Photo file not readable: {file_path}"
|
|
|
|
entity = await client.get_entity(chat_id)
|
|
uploaded_file = await client.upload_file(file_path)
|
|
|
|
if isinstance(entity, Channel):
|
|
# For channels/supergroups, use EditPhotoRequest with InputChatUploadedPhoto
|
|
input_photo = InputChatUploadedPhoto(file=uploaded_file)
|
|
await client(functions.channels.EditPhotoRequest(channel=entity, photo=input_photo))
|
|
elif isinstance(entity, Chat):
|
|
# For basic groups, use EditChatPhotoRequest with InputChatUploadedPhoto
|
|
input_photo = InputChatUploadedPhoto(file=uploaded_file)
|
|
await client(
|
|
functions.messages.EditChatPhotoRequest(chat_id=chat_id, photo=input_photo)
|
|
)
|
|
else:
|
|
return f"Cannot edit photo for this entity type ({type(entity)})."
|
|
|
|
return f"Chat {chat_id} photo updated."
|
|
except Exception as e:
|
|
logger.exception(f"edit_chat_photo failed (chat_id={chat_id}, file_path='{file_path}')")
|
|
return log_and_format_error("edit_chat_photo", e, chat_id=chat_id, file_path=file_path)
|
|
|
|
|
|
@mcp.tool()
|
|
async def delete_chat_photo(chat_id: int) -> str:
|
|
"""
|
|
Delete the photo of a chat, group, or channel.
|
|
"""
|
|
try:
|
|
entity = await client.get_entity(chat_id)
|
|
if isinstance(entity, Channel):
|
|
# Use InputChatPhotoEmpty for channels/supergroups
|
|
await client(
|
|
functions.channels.EditPhotoRequest(channel=entity, photo=InputChatPhotoEmpty())
|
|
)
|
|
elif isinstance(entity, Chat):
|
|
# Use None (or InputChatPhotoEmpty) for basic groups
|
|
await client(
|
|
functions.messages.EditChatPhotoRequest(
|
|
chat_id=chat_id, photo=InputChatPhotoEmpty()
|
|
)
|
|
)
|
|
else:
|
|
return f"Cannot delete photo for this entity type ({type(entity)})."
|
|
|
|
return f"Chat {chat_id} photo deleted."
|
|
except Exception as e:
|
|
logger.exception(f"delete_chat_photo failed (chat_id={chat_id})")
|
|
return log_and_format_error("delete_chat_photo", e, chat_id=chat_id)
|
|
|
|
|
|
@mcp.tool()
|
|
async def promote_admin(group_id: int, user_id: int, rights: dict = None) -> str:
|
|
"""
|
|
Promote a user to admin in a group/channel.
|
|
|
|
Args:
|
|
group_id: ID of the group/channel
|
|
user_id: User ID to promote
|
|
rights: Admin rights to give (optional)
|
|
"""
|
|
try:
|
|
chat = await client.get_entity(group_id)
|
|
user = await client.get_entity(user_id)
|
|
|
|
# Set default admin rights if not provided
|
|
if not rights:
|
|
rights = {
|
|
"change_info": True,
|
|
"post_messages": True,
|
|
"edit_messages": True,
|
|
"delete_messages": True,
|
|
"ban_users": True,
|
|
"invite_users": True,
|
|
"pin_messages": True,
|
|
"add_admins": False,
|
|
"anonymous": False,
|
|
"manage_call": True,
|
|
"other": True,
|
|
}
|
|
|
|
admin_rights = ChatAdminRights(
|
|
change_info=rights.get("change_info", True),
|
|
post_messages=rights.get("post_messages", True),
|
|
edit_messages=rights.get("edit_messages", True),
|
|
delete_messages=rights.get("delete_messages", True),
|
|
ban_users=rights.get("ban_users", True),
|
|
invite_users=rights.get("invite_users", True),
|
|
pin_messages=rights.get("pin_messages", True),
|
|
add_admins=rights.get("add_admins", False),
|
|
anonymous=rights.get("anonymous", False),
|
|
manage_call=rights.get("manage_call", True),
|
|
other=rights.get("other", True),
|
|
)
|
|
|
|
try:
|
|
result = await client(
|
|
functions.channels.EditAdminRequest(
|
|
channel=chat, user_id=user, admin_rights=admin_rights, rank="Admin"
|
|
)
|
|
)
|
|
return f"Successfully promoted user {user_id} to admin in {chat.title}"
|
|
except telethon.errors.rpcerrorlist.UserNotMutualContactError:
|
|
return "Error: Cannot promote users who are not mutual contacts. Please ensure the user is in your contacts and has added you back."
|
|
except Exception as e:
|
|
return log_and_format_error("promote_admin", e, group_id=group_id, user_id=user_id)
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"telegram_mcp promote_admin failed (group_id={group_id}, user_id={user_id})",
|
|
exc_info=True,
|
|
)
|
|
return log_and_format_error("promote_admin", e, group_id=group_id, user_id=user_id)
|
|
|
|
|
|
@mcp.tool()
|
|
async def demote_admin(group_id: int, user_id: int) -> str:
|
|
"""
|
|
Demote a user from admin in a group/channel.
|
|
|
|
Args:
|
|
group_id: ID of the group/channel
|
|
user_id: User ID to demote
|
|
"""
|
|
try:
|
|
chat = await client.get_entity(group_id)
|
|
user = await client.get_entity(user_id)
|
|
|
|
# Create empty admin rights (regular user)
|
|
admin_rights = ChatAdminRights(
|
|
change_info=False,
|
|
post_messages=False,
|
|
edit_messages=False,
|
|
delete_messages=False,
|
|
ban_users=False,
|
|
invite_users=False,
|
|
pin_messages=False,
|
|
add_admins=False,
|
|
anonymous=False,
|
|
manage_call=False,
|
|
other=False,
|
|
)
|
|
|
|
try:
|
|
result = await client(
|
|
functions.channels.EditAdminRequest(
|
|
channel=chat, user_id=user, admin_rights=admin_rights, rank=""
|
|
)
|
|
)
|
|
return f"Successfully demoted user {user_id} from admin in {chat.title}"
|
|
except telethon.errors.rpcerrorlist.UserNotMutualContactError:
|
|
return "Error: Cannot modify admin status of users who are not mutual contacts. Please ensure the user is in your contacts and has added you back."
|
|
except Exception as e:
|
|
return log_and_format_error("demote_admin", e, group_id=group_id, user_id=user_id)
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"telegram_mcp demote_admin failed (group_id={group_id}, user_id={user_id})",
|
|
exc_info=True,
|
|
)
|
|
return log_and_format_error("demote_admin", e, group_id=group_id, user_id=user_id)
|
|
|
|
|
|
@mcp.tool()
|
|
async def ban_user(chat_id: int, user_id: int) -> str:
|
|
"""
|
|
Ban a user from a group or channel.
|
|
|
|
Args:
|
|
chat_id: ID of the group/channel
|
|
user_id: User ID to ban
|
|
"""
|
|
try:
|
|
chat = await client.get_entity(chat_id)
|
|
user = await client.get_entity(user_id)
|
|
|
|
# Create banned rights (all restrictions enabled)
|
|
banned_rights = ChatBannedRights(
|
|
until_date=None, # Ban forever
|
|
view_messages=True,
|
|
send_messages=True,
|
|
send_media=True,
|
|
send_stickers=True,
|
|
send_gifs=True,
|
|
send_games=True,
|
|
send_inline=True,
|
|
embed_links=True,
|
|
send_polls=True,
|
|
change_info=True,
|
|
invite_users=True,
|
|
pin_messages=True,
|
|
)
|
|
|
|
try:
|
|
await client(
|
|
functions.channels.EditBannedRequest(
|
|
channel=chat, participant=user, banned_rights=banned_rights
|
|
)
|
|
)
|
|
return f"User {user_id} banned from chat {chat.title} (ID: {chat_id})."
|
|
except telethon.errors.rpcerrorlist.UserNotMutualContactError:
|
|
return "Error: Cannot ban users who are not mutual contacts. Please ensure the user is in your contacts and has added you back."
|
|
except Exception as e:
|
|
return log_and_format_error("ban_user", e, chat_id=chat_id, user_id=user_id)
|
|
except Exception as e:
|
|
logger.exception(f"ban_user failed (chat_id={chat_id}, user_id={user_id})")
|
|
return log_and_format_error("ban_user", e, chat_id=chat_id, user_id=user_id)
|
|
|
|
|
|
@mcp.tool()
|
|
async def unban_user(chat_id: int, user_id: int) -> str:
|
|
"""
|
|
Unban a user from a group or channel.
|
|
|
|
Args:
|
|
chat_id: ID of the group/channel
|
|
user_id: User ID to unban
|
|
"""
|
|
try:
|
|
chat = await client.get_entity(chat_id)
|
|
user = await client.get_entity(user_id)
|
|
|
|
# Create unbanned rights (no restrictions)
|
|
unbanned_rights = ChatBannedRights(
|
|
until_date=None,
|
|
view_messages=False,
|
|
send_messages=False,
|
|
send_media=False,
|
|
send_stickers=False,
|
|
send_gifs=False,
|
|
send_games=False,
|
|
send_inline=False,
|
|
embed_links=False,
|
|
send_polls=False,
|
|
change_info=False,
|
|
invite_users=False,
|
|
pin_messages=False,
|
|
)
|
|
|
|
try:
|
|
await client(
|
|
functions.channels.EditBannedRequest(
|
|
channel=chat, participant=user, banned_rights=unbanned_rights
|
|
)
|
|
)
|
|
return f"User {user_id} unbanned from chat {chat.title} (ID: {chat_id})."
|
|
except telethon.errors.rpcerrorlist.UserNotMutualContactError:
|
|
return "Error: Cannot modify status of users who are not mutual contacts. Please ensure the user is in your contacts and has added you back."
|
|
except Exception as e:
|
|
return log_and_format_error("unban_user", e, chat_id=chat_id, user_id=user_id)
|
|
except Exception as e:
|
|
logger.exception(f"unban_user failed (chat_id={chat_id}, user_id={user_id})")
|
|
return log_and_format_error("unban_user", e, chat_id=chat_id, user_id=user_id)
|
|
|
|
|
|
@mcp.tool()
|
|
async def get_admins(chat_id: int) -> str:
|
|
"""
|
|
Get all admins in a group or channel.
|
|
"""
|
|
try:
|
|
# Fix: Use the correct filter type ChannelParticipantsAdmins
|
|
participants = await client.get_participants(chat_id, filter=ChannelParticipantsAdmins())
|
|
lines = [
|
|
f"ID: {p.id}, Name: {getattr(p, 'first_name', '')} {getattr(p, 'last_name', '')}".strip()
|
|
for p in participants
|
|
]
|
|
return "\n".join(lines) if lines else "No admins found."
|
|
except Exception as e:
|
|
logger.exception(f"get_admins failed (chat_id={chat_id})")
|
|
return log_and_format_error("get_admins", e, chat_id=chat_id)
|
|
|
|
|
|
@mcp.tool()
|
|
async def get_banned_users(chat_id: int) -> str:
|
|
"""
|
|
Get all banned users in a group or channel.
|
|
"""
|
|
try:
|
|
# Fix: Use the correct filter type ChannelParticipantsKicked
|
|
participants = await client.get_participants(
|
|
chat_id, filter=ChannelParticipantsKicked(q="")
|
|
)
|
|
lines = [
|
|
f"ID: {p.id}, Name: {getattr(p, 'first_name', '')} {getattr(p, 'last_name', '')}".strip()
|
|
for p in participants
|
|
]
|
|
return "\n".join(lines) if lines else "No banned users found."
|
|
except Exception as e:
|
|
logger.exception(f"get_banned_users failed (chat_id={chat_id})")
|
|
return log_and_format_error("get_banned_users", e, chat_id=chat_id)
|
|
|
|
|
|
@mcp.tool()
|
|
async def get_invite_link(chat_id: int) -> str:
|
|
"""
|
|
Get the invite link for a group or channel.
|
|
"""
|
|
try:
|
|
entity = await client.get_entity(chat_id)
|
|
|
|
# Try using ExportChatInviteRequest first
|
|
try:
|
|
from telethon.tl import functions
|
|
|
|
result = await client(functions.messages.ExportChatInviteRequest(peer=entity))
|
|
return result.link
|
|
except AttributeError:
|
|
# If the function doesn't exist in the current Telethon version
|
|
logger.warning("ExportChatInviteRequest not available, using alternative method")
|
|
except Exception as e1:
|
|
# If that fails, log and try alternative approach
|
|
logger.warning(f"ExportChatInviteRequest failed: {e1}")
|
|
|
|
# Alternative approach using client.export_chat_invite_link
|
|
try:
|
|
invite_link = await client.export_chat_invite_link(entity)
|
|
return invite_link
|
|
except Exception as e2:
|
|
logger.warning(f"export_chat_invite_link failed: {e2}")
|
|
|
|
# Last resort: Try directly fetching chat info
|
|
try:
|
|
if isinstance(entity, (Chat, Channel)):
|
|
full_chat = await client(functions.messages.GetFullChatRequest(chat_id=entity.id))
|
|
if hasattr(full_chat, "full_chat") and hasattr(full_chat.full_chat, "invite_link"):
|
|
return full_chat.full_chat.invite_link or "No invite link available."
|
|
except Exception as e3:
|
|
logger.warning(f"GetFullChatRequest failed: {e3}")
|
|
|
|
return "Could not retrieve invite link for this chat."
|
|
except Exception as e:
|
|
logger.exception(f"get_invite_link failed (chat_id={chat_id})")
|
|
return log_and_format_error("get_invite_link", e, chat_id=chat_id)
|
|
|
|
|
|
@mcp.tool()
|
|
async def join_chat_by_link(link: str) -> str:
|
|
"""
|
|
Join a chat by invite link.
|
|
"""
|
|
try:
|
|
# Extract the hash from the invite link
|
|
if "/" in link:
|
|
hash_part = link.split("/")[-1]
|
|
if hash_part.startswith("+"):
|
|
hash_part = hash_part[1:] # Remove the '+' if present
|
|
else:
|
|
hash_part = link
|
|
|
|
# Try checking the invite before joining
|
|
try:
|
|
from telethon.errors import (
|
|
InviteHashExpiredError,
|
|
InviteHashInvalidError,
|
|
UserAlreadyParticipantError,
|
|
ChatAdminRequiredError,
|
|
UsersTooMuchError,
|
|
)
|
|
|
|
# Try to check invite info first (will often fail if not a member)
|
|
invite_info = await client(functions.messages.CheckChatInviteRequest(hash=hash_part))
|
|
if hasattr(invite_info, "chat") and invite_info.chat:
|
|
# If we got chat info, we're already a member
|
|
chat_title = getattr(invite_info.chat, "title", "Unknown Chat")
|
|
return f"You are already a member of this chat: {chat_title}"
|
|
except Exception as check_err:
|
|
# This often fails if not a member - just continue
|
|
pass
|
|
|
|
# Join the chat using the hash
|
|
try:
|
|
result = await client(functions.messages.ImportChatInviteRequest(hash=hash_part))
|
|
if result and hasattr(result, "chats") and result.chats:
|
|
chat_title = getattr(result.chats[0], "title", "Unknown Chat")
|
|
return f"Successfully joined chat: {chat_title}"
|
|
return f"Joined chat via invite hash."
|
|
except Exception as join_err:
|
|
err_str = str(join_err).lower()
|
|
if "expired" in err_str:
|
|
return "The invite hash has expired and is no longer valid."
|
|
elif "invalid" in err_str:
|
|
return "The invite hash is invalid or malformed."
|
|
elif "already" in err_str and "participant" in err_str:
|
|
return "You are already a member of this chat."
|
|
elif "admin" in err_str:
|
|
return "Cannot join this chat - requires admin approval."
|
|
elif "too much" in err_str or "too many" in err_str:
|
|
return "Cannot join this chat - it has reached maximum number of participants."
|
|
else:
|
|
raise # Re-raise to be caught by the outer exception handler
|
|
except Exception as e:
|
|
logger.exception(f"join_chat_by_link failed (link={link})")
|
|
return log_and_format_error("join_chat_by_link", e, link=link)
|
|
|
|
|
|
@mcp.tool()
|
|
async def export_chat_invite(chat_id: int) -> str:
|
|
"""
|
|
Export a chat invite link.
|
|
"""
|
|
try:
|
|
entity = await client.get_entity(chat_id)
|
|
|
|
# Try using ExportChatInviteRequest first
|
|
try:
|
|
from telethon.tl import functions
|
|
|
|
result = await client(functions.messages.ExportChatInviteRequest(peer=entity))
|
|
return result.link
|
|
except AttributeError:
|
|
# If the function doesn't exist in the current Telethon version
|
|
logger.warning("ExportChatInviteRequest not available, using alternative method")
|
|
except Exception as e1:
|
|
# If that fails, log and try alternative approach
|
|
logger.warning(f"ExportChatInviteRequest failed: {e1}")
|
|
|
|
# Alternative approach using client.export_chat_invite_link
|
|
try:
|
|
invite_link = await client.export_chat_invite_link(entity)
|
|
return invite_link
|
|
except Exception as e2:
|
|
logger.warning(f"export_chat_invite_link failed: {e2}")
|
|
return log_and_format_error("export_chat_invite", e2, chat_id=chat_id)
|
|
except Exception as e:
|
|
logger.exception(f"export_chat_invite failed (chat_id={chat_id})")
|
|
return log_and_format_error("export_chat_invite", e, chat_id=chat_id)
|
|
|
|
|
|
@mcp.tool()
|
|
async def import_chat_invite(hash: str) -> str:
|
|
"""
|
|
Import a chat invite by hash.
|
|
"""
|
|
try:
|
|
# Remove any prefixes like '+' if present
|
|
if hash.startswith("+"):
|
|
hash = hash[1:]
|
|
|
|
# Try checking the invite before joining
|
|
try:
|
|
from telethon.errors import (
|
|
InviteHashExpiredError,
|
|
InviteHashInvalidError,
|
|
UserAlreadyParticipantError,
|
|
ChatAdminRequiredError,
|
|
UsersTooMuchError,
|
|
)
|
|
|
|
# Try to check invite info first (will often fail if not a member)
|
|
invite_info = await client(functions.messages.CheckChatInviteRequest(hash=hash))
|
|
if hasattr(invite_info, "chat") and invite_info.chat:
|
|
# If we got chat info, we're already a member
|
|
chat_title = getattr(invite_info.chat, "title", "Unknown Chat")
|
|
return f"You are already a member of this chat: {chat_title}"
|
|
except Exception as check_err:
|
|
# This often fails if not a member - just continue
|
|
pass
|
|
|
|
# Join the chat using the hash
|
|
try:
|
|
result = await client(functions.messages.ImportChatInviteRequest(hash=hash))
|
|
if result and hasattr(result, "chats") and result.chats:
|
|
chat_title = getattr(result.chats[0], "title", "Unknown Chat")
|
|
return f"Successfully joined chat: {chat_title}"
|
|
return f"Joined chat via invite hash."
|
|
except Exception as join_err:
|
|
err_str = str(join_err).lower()
|
|
if "expired" in err_str:
|
|
return "The invite hash has expired and is no longer valid."
|
|
elif "invalid" in err_str:
|
|
return "The invite hash is invalid or malformed."
|
|
elif "already" in err_str and "participant" in err_str:
|
|
return "You are already a member of this chat."
|
|
elif "admin" in err_str:
|
|
return "Cannot join this chat - requires admin approval."
|
|
elif "too much" in err_str or "too many" in err_str:
|
|
return "Cannot join this chat - it has reached maximum number of participants."
|
|
else:
|
|
raise # Re-raise to be caught by the outer exception handler
|
|
except Exception as e:
|
|
logger.exception(f"import_chat_invite failed (hash={hash})")
|
|
return log_and_format_error("import_chat_invite", e, hash=hash)
|
|
|
|
|
|
@mcp.tool()
|
|
async def send_voice(chat_id: int, file_path: str) -> str:
|
|
"""
|
|
Send a voice message to a chat. File must be an OGG/OPUS voice note.
|
|
Args:
|
|
chat_id: The chat ID.
|
|
file_path: Absolute path to the OGG/OPUS file.
|
|
"""
|
|
try:
|
|
if not os.path.isfile(file_path):
|
|
return f"File not found: {file_path}"
|
|
if not os.access(file_path, os.R_OK):
|
|
return f"File is not readable: {file_path}"
|
|
mime, _ = mimetypes.guess_type(file_path)
|
|
if not (
|
|
mime
|
|
and (
|
|
mime == "audio/ogg"
|
|
or file_path.lower().endswith(".ogg")
|
|
or file_path.lower().endswith(".opus")
|
|
)
|
|
):
|
|
return "Voice file must be .ogg or .opus format."
|
|
entity = await client.get_entity(chat_id)
|
|
await client.send_file(entity, file_path, voice_note=True)
|
|
return f"Voice message sent to chat {chat_id}."
|
|
except Exception as e:
|
|
return log_and_format_error("send_voice", e, chat_id=chat_id, file_path=file_path)
|
|
|
|
|
|
@mcp.tool()
|
|
async def forward_message(from_chat_id: int, message_id: int, to_chat_id: int) -> str:
|
|
"""
|
|
Forward a message from one chat to another.
|
|
"""
|
|
try:
|
|
from_entity = await client.get_entity(from_chat_id)
|
|
to_entity = await client.get_entity(to_chat_id)
|
|
await client.forward_messages(to_entity, message_id, from_entity)
|
|
return f"Message {message_id} forwarded from {from_chat_id} to {to_chat_id}."
|
|
except Exception as e:
|
|
return log_and_format_error(
|
|
"forward_message",
|
|
e,
|
|
from_chat_id=from_chat_id,
|
|
message_id=message_id,
|
|
to_chat_id=to_chat_id,
|
|
)
|
|
|
|
|
|
@mcp.tool()
|
|
async def edit_message(chat_id: int, message_id: int, new_text: str) -> str:
|
|
"""
|
|
Edit a message you sent.
|
|
"""
|
|
try:
|
|
entity = await client.get_entity(chat_id)
|
|
await client.edit_message(entity, message_id, new_text)
|
|
return f"Message {message_id} edited."
|
|
except Exception as e:
|
|
return log_and_format_error(
|
|
"edit_message", e, chat_id=chat_id, message_id=message_id, new_text=new_text
|
|
)
|
|
|
|
|
|
@mcp.tool()
|
|
async def delete_message(chat_id: int, message_id: int) -> str:
|
|
"""
|
|
Delete a message by ID.
|
|
"""
|
|
try:
|
|
entity = await client.get_entity(chat_id)
|
|
await client.delete_messages(entity, message_id)
|
|
return f"Message {message_id} deleted."
|
|
except Exception as e:
|
|
return log_and_format_error("delete_message", e, chat_id=chat_id, message_id=message_id)
|
|
|
|
|
|
@mcp.tool()
|
|
async def pin_message(chat_id: int, message_id: int) -> str:
|
|
"""
|
|
Pin a message in a chat.
|
|
"""
|
|
try:
|
|
entity = await client.get_entity(chat_id)
|
|
await client.pin_message(entity, message_id)
|
|
return f"Message {message_id} pinned in chat {chat_id}."
|
|
except Exception as e:
|
|
return log_and_format_error("pin_message", e, chat_id=chat_id, message_id=message_id)
|
|
|
|
|
|
@mcp.tool()
|
|
async def unpin_message(chat_id: int, message_id: int) -> str:
|
|
"""
|
|
Unpin a message in a chat.
|
|
"""
|
|
try:
|
|
entity = await client.get_entity(chat_id)
|
|
await client.unpin_message(entity, message_id)
|
|
return f"Message {message_id} unpinned in chat {chat_id}."
|
|
except Exception as e:
|
|
return log_and_format_error("unpin_message", e, chat_id=chat_id, message_id=message_id)
|
|
|
|
|
|
@mcp.tool()
|
|
async def mark_as_read(chat_id: int) -> str:
|
|
"""
|
|
Mark all messages as read in a chat.
|
|
"""
|
|
try:
|
|
entity = await client.get_entity(chat_id)
|
|
await client.send_read_acknowledge(entity)
|
|
return f"Marked all messages as read in chat {chat_id}."
|
|
except Exception as e:
|
|
return log_and_format_error("mark_as_read", e, chat_id=chat_id)
|
|
|
|
|
|
@mcp.tool()
|
|
async def reply_to_message(chat_id: int, message_id: int, text: str) -> str:
|
|
"""
|
|
Reply to a specific message in a chat.
|
|
"""
|
|
try:
|
|
entity = await client.get_entity(chat_id)
|
|
await client.send_message(entity, text, reply_to=message_id)
|
|
return f"Replied to message {message_id} in chat {chat_id}."
|
|
except Exception as e:
|
|
return log_and_format_error(
|
|
"reply_to_message", e, chat_id=chat_id, message_id=message_id, text=text
|
|
)
|
|
|
|
|
|
@mcp.tool()
|
|
async def get_media_info(chat_id: int, message_id: int) -> str:
|
|
"""
|
|
Get info about media in a message.
|
|
Args:
|
|
chat_id: The chat ID.
|
|
message_id: The message ID.
|
|
"""
|
|
try:
|
|
entity = await client.get_entity(chat_id)
|
|
msg = await client.get_messages(entity, ids=message_id)
|
|
if not msg or not msg.media:
|
|
return "No media found in the specified message."
|
|
return str(msg.media)
|
|
except Exception as e:
|
|
return log_and_format_error("get_media_info", e, chat_id=chat_id, message_id=message_id)
|
|
|
|
|
|
@mcp.tool()
|
|
async def search_public_chats(query: str) -> str:
|
|
"""
|
|
Search for public chats, channels, or bots by username or title.
|
|
"""
|
|
try:
|
|
result = await client(functions.contacts.SearchRequest(q=query, limit=20))
|
|
return json.dumps([format_entity(u) for u in result.users], indent=2)
|
|
except Exception as e:
|
|
return log_and_format_error("search_public_chats", e, query=query)
|
|
|
|
|
|
@mcp.tool()
|
|
async def search_messages(chat_id: int, query: str, limit: int = 20) -> str:
|
|
"""
|
|
Search for messages in a chat by text.
|
|
"""
|
|
try:
|
|
entity = await client.get_entity(chat_id)
|
|
messages = await client.get_messages(entity, limit=limit, search=query)
|
|
return "\n".join([f"ID: {m.id} | {m.date} | {m.message}" for m in messages])
|
|
except Exception as e:
|
|
return log_and_format_error(
|
|
"search_messages", e, chat_id=chat_id, query=query, limit=limit
|
|
)
|
|
|
|
|
|
@mcp.tool()
|
|
async def resolve_username(username: str) -> str:
|
|
"""
|
|
Resolve a username to a user or chat ID.
|
|
"""
|
|
try:
|
|
result = await client(functions.contacts.ResolveUsernameRequest(username=username))
|
|
return str(result)
|
|
except Exception as e:
|
|
return log_and_format_error("resolve_username", e, username=username)
|
|
|
|
|
|
@mcp.tool()
|
|
async def mute_chat(chat_id: int) -> str:
|
|
"""
|
|
Mute notifications for a chat.
|
|
"""
|
|
try:
|
|
from telethon.tl.types import InputPeerNotifySettings
|
|
|
|
peer = await client.get_entity(chat_id)
|
|
await client(
|
|
functions.account.UpdateNotifySettingsRequest(
|
|
peer=peer, settings=InputPeerNotifySettings(mute_until=2**31 - 1)
|
|
)
|
|
)
|
|
return f"Chat {chat_id} muted."
|
|
except (ImportError, AttributeError) as type_err:
|
|
try:
|
|
# Alternative approach directly using raw API
|
|
peer = await client.get_input_entity(chat_id)
|
|
await client(
|
|
functions.account.UpdateNotifySettingsRequest(
|
|
peer=peer,
|
|
settings={
|
|
"mute_until": 2**31 - 1, # Far future
|
|
"show_previews": False,
|
|
"silent": True,
|
|
},
|
|
)
|
|
)
|
|
return f"Chat {chat_id} muted (using alternative method)."
|
|
except Exception as alt_e:
|
|
logger.exception(f"mute_chat (alt method) failed (chat_id={chat_id})")
|
|
return log_and_format_error("mute_chat", alt_e, chat_id=chat_id)
|
|
except Exception as e:
|
|
logger.exception(f"mute_chat failed (chat_id={chat_id})")
|
|
return log_and_format_error("mute_chat", e, chat_id=chat_id)
|
|
|
|
|
|
@mcp.tool()
|
|
async def unmute_chat(chat_id: int) -> str:
|
|
"""
|
|
Unmute notifications for a chat.
|
|
"""
|
|
try:
|
|
from telethon.tl.types import InputPeerNotifySettings
|
|
|
|
peer = await client.get_entity(chat_id)
|
|
await client(
|
|
functions.account.UpdateNotifySettingsRequest(
|
|
peer=peer, settings=InputPeerNotifySettings(mute_until=0)
|
|
)
|
|
)
|
|
return f"Chat {chat_id} unmuted."
|
|
except (ImportError, AttributeError) as type_err:
|
|
try:
|
|
# Alternative approach directly using raw API
|
|
peer = await client.get_input_entity(chat_id)
|
|
await client(
|
|
functions.account.UpdateNotifySettingsRequest(
|
|
peer=peer,
|
|
settings={
|
|
"mute_until": 0, # Unmute (current time)
|
|
"show_previews": True,
|
|
"silent": False,
|
|
},
|
|
)
|
|
)
|
|
return f"Chat {chat_id} unmuted (using alternative method)."
|
|
except Exception as alt_e:
|
|
logger.exception(f"unmute_chat (alt method) failed (chat_id={chat_id})")
|
|
return log_and_format_error("unmute_chat", alt_e, chat_id=chat_id)
|
|
except Exception as e:
|
|
logger.exception(f"unmute_chat failed (chat_id={chat_id})")
|
|
return log_and_format_error("unmute_chat", e, chat_id=chat_id)
|
|
|
|
|
|
@mcp.tool()
|
|
async def archive_chat(chat_id: int) -> str:
|
|
"""
|
|
Archive a chat.
|
|
"""
|
|
try:
|
|
await client(
|
|
functions.messages.ToggleDialogPinRequest(
|
|
peer=await client.get_entity(chat_id), pinned=True
|
|
)
|
|
)
|
|
return f"Chat {chat_id} archived."
|
|
except Exception as e:
|
|
return log_and_format_error("archive_chat", e, chat_id=chat_id)
|
|
|
|
|
|
@mcp.tool()
|
|
async def unarchive_chat(chat_id: int) -> str:
|
|
"""
|
|
Unarchive a chat.
|
|
"""
|
|
try:
|
|
await client(
|
|
functions.messages.ToggleDialogPinRequest(
|
|
peer=await client.get_entity(chat_id), pinned=False
|
|
)
|
|
)
|
|
return f"Chat {chat_id} unarchived."
|
|
except Exception as e:
|
|
return log_and_format_error("unarchive_chat", e, chat_id=chat_id)
|
|
|
|
|
|
@mcp.tool()
|
|
async def get_sticker_sets() -> str:
|
|
"""
|
|
Get all sticker sets.
|
|
"""
|
|
try:
|
|
result = await client(functions.messages.GetAllStickersRequest(hash=0))
|
|
return json.dumps([s.title for s in result.sets], indent=2)
|
|
except Exception as e:
|
|
return log_and_format_error("get_sticker_sets", e)
|
|
|
|
|
|
@mcp.tool()
|
|
async def send_sticker(chat_id: int, file_path: str) -> str:
|
|
"""
|
|
Send a sticker to a chat. File must be a valid .webp sticker file.
|
|
Args:
|
|
chat_id: The chat ID.
|
|
file_path: Absolute path to the .webp sticker file.
|
|
"""
|
|
try:
|
|
if not os.path.isfile(file_path):
|
|
return f"Sticker file not found: {file_path}"
|
|
if not os.access(file_path, os.R_OK):
|
|
return f"Sticker file is not readable: {file_path}"
|
|
if not file_path.lower().endswith(".webp"):
|
|
return "Sticker file must be a .webp file."
|
|
entity = await client.get_entity(chat_id)
|
|
await client.send_file(entity, file_path, force_document=False)
|
|
return f"Sticker sent to chat {chat_id}."
|
|
except Exception as e:
|
|
return log_and_format_error("send_sticker", e, chat_id=chat_id, file_path=file_path)
|
|
|
|
|
|
@mcp.tool()
|
|
async def get_gif_search(query: str, limit: int = 10) -> str:
|
|
"""
|
|
Search for GIFs by query. Returns a list of Telegram document IDs (not file paths).
|
|
Args:
|
|
query: Search term for GIFs.
|
|
limit: Max number of GIFs to return.
|
|
"""
|
|
try:
|
|
# Try approach 1: SearchGifsRequest
|
|
try:
|
|
result = await client(
|
|
functions.messages.SearchGifsRequest(q=query, offset_id=0, limit=limit)
|
|
)
|
|
if not result.gifs:
|
|
return "[]"
|
|
return json.dumps(
|
|
[g.document.id for g in result.gifs], indent=2, default=json_serializer
|
|
)
|
|
except (AttributeError, ImportError):
|
|
# Fallback approach: Use SearchRequest with GIF filter
|
|
try:
|
|
from telethon.tl.types import InputMessagesFilterGif
|
|
|
|
result = await client(
|
|
functions.messages.SearchRequest(
|
|
peer="gif",
|
|
q=query,
|
|
filter=InputMessagesFilterGif(),
|
|
min_date=None,
|
|
max_date=None,
|
|
offset_id=0,
|
|
add_offset=0,
|
|
limit=limit,
|
|
max_id=0,
|
|
min_id=0,
|
|
hash=0,
|
|
)
|
|
)
|
|
if not result or not hasattr(result, "messages") or not result.messages:
|
|
return "[]"
|
|
# Extract document IDs from any messages with media
|
|
gif_ids = []
|
|
for msg in result.messages:
|
|
if hasattr(msg, "media") and msg.media and hasattr(msg.media, "document"):
|
|
gif_ids.append(msg.media.document.id)
|
|
return json.dumps(gif_ids, default=json_serializer)
|
|
except Exception as inner_e:
|
|
# Last resort: Try to fetch from a public bot
|
|
return f"Could not search GIFs using available methods: {inner_e}"
|
|
except Exception as e:
|
|
logger.exception(f"get_gif_search failed (query={query}, limit={limit})")
|
|
return log_and_format_error("get_gif_search", e, query=query, limit=limit)
|
|
|
|
|
|
@mcp.tool()
|
|
async def send_gif(chat_id: int, gif_id: int) -> str:
|
|
"""
|
|
Send a GIF to a chat by Telegram GIF document ID (not a file path).
|
|
Args:
|
|
chat_id: The chat ID.
|
|
gif_id: Telegram document ID for the GIF (from get_gif_search).
|
|
"""
|
|
try:
|
|
if not isinstance(gif_id, int):
|
|
return "gif_id must be a Telegram document ID (integer), not a file path. Use get_gif_search to find IDs."
|
|
entity = await client.get_entity(chat_id)
|
|
await client.send_file(entity, gif_id)
|
|
return f"GIF sent to chat {chat_id}."
|
|
except Exception as e:
|
|
return log_and_format_error("send_gif", e, chat_id=chat_id, gif_id=gif_id)
|
|
|
|
|
|
@mcp.tool()
|
|
async def get_bot_info(bot_username: str) -> str:
|
|
"""
|
|
Get information about a bot by username.
|
|
"""
|
|
try:
|
|
entity = await client.get_entity(bot_username)
|
|
if not entity:
|
|
return f"Bot with username {bot_username} not found."
|
|
|
|
result = await client(functions.users.GetFullUserRequest(id=entity))
|
|
|
|
# Create a more structured, serializable response
|
|
if hasattr(result, "to_dict"):
|
|
# Use custom serializer to handle non-serializable types
|
|
return json.dumps(result.to_dict(), indent=2, default=json_serializer)
|
|
else:
|
|
# Fallback if to_dict is not available
|
|
info = {
|
|
"bot_info": {
|
|
"id": entity.id,
|
|
"username": entity.username,
|
|
"first_name": entity.first_name,
|
|
"last_name": getattr(entity, "last_name", ""),
|
|
"is_bot": getattr(entity, "bot", False),
|
|
"verified": getattr(entity, "verified", False),
|
|
}
|
|
}
|
|
if hasattr(result, "full_user") and hasattr(result.full_user, "about"):
|
|
info["bot_info"]["about"] = result.full_user.about
|
|
|
|
return json.dumps(info, indent=2)
|
|
except Exception as e:
|
|
logger.exception(f"get_bot_info failed (bot_username={bot_username})")
|
|
return log_and_format_error("get_bot_info", e, bot_username=bot_username)
|
|
|
|
|
|
@mcp.tool()
|
|
async def set_bot_commands(bot_username: str, commands: list) -> str:
|
|
"""
|
|
Set bot commands for a bot you own.
|
|
Note: This function can only be used if the Telegram client is a bot account.
|
|
Regular user accounts cannot set bot commands.
|
|
|
|
Args:
|
|
bot_username: The username of the bot to set commands for.
|
|
commands: List of command dictionaries with 'command' and 'description' keys.
|
|
"""
|
|
try:
|
|
# First check if the current client is a bot
|
|
me = await client.get_me()
|
|
if not getattr(me, "bot", False):
|
|
return "Error: This function can only be used by bot accounts. Your current Telegram account is a regular user account, not a bot."
|
|
|
|
# Import required types
|
|
from telethon.tl.types import BotCommand, BotCommandScopeDefault
|
|
from telethon.tl.functions.bots import SetBotCommandsRequest
|
|
|
|
# Create BotCommand objects from the command dictionaries
|
|
bot_commands = [
|
|
BotCommand(command=c["command"], description=c["description"]) for c in commands
|
|
]
|
|
|
|
# Get the bot entity
|
|
bot = await client.get_entity(bot_username)
|
|
|
|
# Set the commands with proper scope
|
|
await client(
|
|
SetBotCommandsRequest(
|
|
scope=BotCommandScopeDefault(),
|
|
lang_code="en", # Default language code
|
|
commands=bot_commands,
|
|
)
|
|
)
|
|
|
|
return f"Bot commands set for {bot_username}."
|
|
except ImportError as ie:
|
|
logger.exception(f"set_bot_commands failed - ImportError: {ie}")
|
|
return log_and_format_error("set_bot_commands", ie)
|
|
except Exception as e:
|
|
logger.exception(f"set_bot_commands failed (bot_username={bot_username})")
|
|
return log_and_format_error("set_bot_commands", e, bot_username=bot_username)
|
|
|
|
|
|
@mcp.tool()
|
|
async def get_history(chat_id: int, limit: int = 100) -> str:
|
|
"""
|
|
Get full chat history (up to limit).
|
|
"""
|
|
try:
|
|
entity = await client.get_entity(chat_id)
|
|
messages = await client.get_messages(entity, limit=limit)
|
|
return "\n".join([f"ID: {m.id} | {m.date} | {m.message}" for m in messages])
|
|
except Exception as e:
|
|
return log_and_format_error("get_history", e, chat_id=chat_id, limit=limit)
|
|
|
|
|
|
@mcp.tool()
|
|
async def get_user_photos(user_id: int, limit: int = 10) -> str:
|
|
"""
|
|
Get profile photos of a user.
|
|
"""
|
|
try:
|
|
user = await client.get_entity(user_id)
|
|
photos = await client(
|
|
functions.photos.GetUserPhotosRequest(user_id=user, offset=0, max_id=0, limit=limit)
|
|
)
|
|
return json.dumps([p.id for p in photos.photos], indent=2)
|
|
except Exception as e:
|
|
return log_and_format_error("get_user_photos", e, user_id=user_id, limit=limit)
|
|
|
|
|
|
@mcp.tool()
|
|
async def get_user_status(user_id: int) -> str:
|
|
"""
|
|
Get the online status of a user.
|
|
"""
|
|
try:
|
|
user = await client.get_entity(user_id)
|
|
return str(user.status)
|
|
except Exception as e:
|
|
return log_and_format_error("get_user_status", e, user_id=user_id)
|
|
|
|
|
|
@mcp.tool()
|
|
async def get_recent_actions(chat_id: int) -> str:
|
|
"""
|
|
Get recent admin actions (admin log) in a group or channel.
|
|
"""
|
|
try:
|
|
result = await client(
|
|
functions.channels.GetAdminLogRequest(
|
|
channel=chat_id, q="", events_filter=None, admins=[], max_id=0, min_id=0, limit=20
|
|
)
|
|
)
|
|
|
|
if not result or not result.events:
|
|
return "No recent admin actions found."
|
|
|
|
# Use the custom serializer to handle datetime objects
|
|
return json.dumps([e.to_dict() for e in result.events], indent=2, default=json_serializer)
|
|
except Exception as e:
|
|
logger.exception(f"get_recent_actions failed (chat_id={chat_id})")
|
|
return log_and_format_error("get_recent_actions", e, chat_id=chat_id)
|
|
|
|
|
|
@mcp.tool()
|
|
async def get_pinned_messages(chat_id: int) -> str:
|
|
"""
|
|
Get all pinned messages in a chat.
|
|
"""
|
|
try:
|
|
entity = await client.get_entity(chat_id)
|
|
# Use correct filter based on Telethon version
|
|
try:
|
|
# Try newer Telethon approach
|
|
from telethon.tl.types import InputMessagesFilterPinned
|
|
|
|
messages = await client.get_messages(entity, filter=InputMessagesFilterPinned())
|
|
except (ImportError, AttributeError):
|
|
# Fallback - try without filter and manually filter pinned
|
|
all_messages = await client.get_messages(entity, limit=50)
|
|
messages = [m for m in all_messages if getattr(m, "pinned", False)]
|
|
|
|
if not messages:
|
|
return "No pinned messages found in this chat."
|
|
|
|
return "\n".join(
|
|
[f"ID: {m.id} | {m.date} | {m.message or '[Media/No text]'}" for m in messages]
|
|
)
|
|
except Exception as e:
|
|
logger.exception(f"get_pinned_messages failed (chat_id={chat_id})")
|
|
return log_and_format_error("get_pinned_messages", e, chat_id=chat_id)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
nest_asyncio.apply()
|
|
|
|
async def main() -> None:
|
|
try:
|
|
# Start the Telethon client non-interactively
|
|
print("Starting Telegram client...")
|
|
await client.start()
|
|
|
|
print("Telegram client started. Running MCP server...")
|
|
# Use the asynchronous entrypoint instead of mcp.run()
|
|
await mcp.run_stdio_async()
|
|
except Exception as e:
|
|
print(f"Error starting client: {e}", file=sys.stderr)
|
|
if isinstance(e, sqlite3.OperationalError) and "database is locked" in str(e):
|
|
print(
|
|
"Database lock detected. Please ensure no other instances are running.",
|
|
file=sys.stderr,
|
|
)
|
|
sys.exit(1)
|
|
|
|
asyncio.run(main())
|