refactor: Consolidate and clean up imports in main.py

This commit is contained in:
anonim 2025-04-18 15:03:46 +03:00
parent 3faa4a006e
commit 0833e51f48

266
main.py
View file

@ -1,19 +1,27 @@
import os
import sys
import time
from dotenv import load_dotenv
import asyncio
import nest_asyncio
from mcp.server.fastmcp import FastMCP
from telethon import TelegramClient
from telethon.sessions import StringSession
import sqlite3
from telethon.tl.types import User, Chat, Channel, ChatAdminRights, ChatBannedRights, ChannelParticipantsKicked, ChannelParticipantsAdmins, InputChatPhoto, InputChatUploadedPhoto, InputChatPhotoEmpty, InputPeerUser, InputPeerChat, InputPeerChannel
from datetime import datetime, timedelta
import json
from typing import List, Dict, Optional, Union, Any
import mimetypes
import time
import asyncio
import sqlite3
import logging
import mimetypes
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Union, Any
# Third-party libraries
import nest_asyncio
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
from telethon import TelegramClient, functions, utils
from telethon.sessions import StringSession
from telethon.tl.types import (
User, Chat, Channel,
ChatAdminRights, ChatBannedRights,
ChannelParticipantsKicked, ChannelParticipantsAdmins,
InputChatPhoto, InputChatUploadedPhoto, InputChatPhotoEmpty,
InputPeerUser, InputPeerChat, InputPeerChannel
)
import telethon.errors.rpcerrorlist
def json_serializer(obj):
@ -60,7 +68,7 @@ try:
file_handler.setLevel(logging.ERROR)
# Create formatter and add to handlers
formatter = logging.Formatter('%(asctime)s %(levelname)s %(name)s %(message)s')
formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(name)s - %(message)s - %(filename)s:%(lineno)d')
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
@ -73,6 +81,53 @@ except Exception as log_error:
print(f"WARNING: Error setting up log file: {log_error}")
# Fallback to console-only logging
logger.addHandler(console_handler)
logger.error(f"Failed to set up log file handler: {log_error}")
# Error code prefix mapping for better error tracing
ERROR_PREFIXES = {
"chat": "CHAT",
"msg": "MSG",
"contact": "CONTACT",
"group": "GROUP",
"media": "MEDIA",
"profile": "PROFILE",
"auth": "AUTH",
"admin": "ADMIN"
}
def log_and_format_error(function_name: str, error: Exception, prefix: str = None, **kwargs) -> str:
"""
Centralized error handling function that logs the error and returns a formatted user-friendly message.
Args:
function_name: Name of the function where error occurred
error: The exception that was raised
prefix: Error code prefix (e.g., "CHAT", "MSG") - if None, will be derived from function_name
**kwargs: Additional context parameters to include in log
Returns:
A user-friendly error message with error code
"""
# Generate a consistent error code
if prefix is None:
# Try to derive prefix from function name
for key, value in ERROR_PREFIXES.items():
if key in function_name.lower():
prefix = value
break
if prefix is None:
prefix = "GEN" # Generic prefix if none matches
error_code = f"{prefix}-ERR-{abs(hash(function_name)) % 1000:03d}"
# Format the additional context parameters
context = ", ".join(f"{k}={v}" for k, v in kwargs.items())
# Log the full technical error
logger.exception(f"{function_name} failed ({context}): {error}")
# Return a user-friendly message
return f"An error occurred (code: {error_code}). Check mcp_errors.log for details."
def format_entity(entity) -> Dict[str, Any]:
"""Helper function to format entity information consistently."""
@ -138,8 +193,7 @@ async def get_chats(page: int = 1, page_size: int = 20) -> str:
lines.append(f"Chat ID: {chat_id}, Title: {title}")
return "\n".join(lines)
except Exception as e:
logger.exception(f"get_chats failed (page={page}, page_size={page_size})")
return "An error occurred (code: GETCHATS-ERR-001). Check mcp_errors.log for details."
return log_and_format_error("get_chats", e)
@mcp.tool()
@ -162,8 +216,7 @@ async def get_messages(chat_id: int, page: int = 1, page_size: int = 20) -> str:
lines.append(f"ID: {msg.id} | Date: {msg.date} | Message: {msg.message}")
return "\n".join(lines)
except Exception as e:
logger.exception(f"get_messages failed (chat_id={chat_id}, page={page}, page_size={page_size})")
return "An error occurred (code: GETMSGS-ERR-001). Check mcp_errors.log for details."
return log_and_format_error("get_messages", e, chat_id=chat_id, page=page, page_size=page_size)
@mcp.tool()
@ -179,8 +232,7 @@ async def send_message(chat_id: int, message: str) -> str:
await client.send_message(entity, message)
return "Message sent successfully."
except Exception as e:
logger.exception(f"send_message failed (chat_id={chat_id})")
return "An error occurred (code: SENDMSG-ERR-001). Check mcp_errors.log for details."
return log_and_format_error("send_message", e, chat_id=chat_id)
@mcp.tool()
@ -206,7 +258,7 @@ async def list_contacts() -> str:
lines.append(contact_info)
return "\n".join(lines)
except Exception as e:
return f"Error listing contacts: {e}"
return log_and_format_error("list_contacts", e)
@mcp.tool()
@ -234,7 +286,7 @@ async def search_contacts(query: str) -> str:
lines.append(contact_info)
return "\n".join(lines)
except Exception as e:
return f"Error searching contacts: {e}"
return log_and_format_error("search_contacts", e, query=query)
@mcp.tool()
@ -248,7 +300,7 @@ async def get_contact_ids() -> str:
return "No contact IDs found."
return "Contact IDs: " + ", ".join(str(cid) for cid in result)
except Exception as e:
return f"Error getting contact IDs: {e}"
return log_and_format_error("get_contact_ids", e)
@mcp.tool()
@ -332,8 +384,7 @@ async def list_messages(chat_id: int, limit: int = 20, search_query: str = None,
return "\n".join(lines)
except Exception as e:
logger.exception(f"list_messages failed (chat_id={chat_id})")
return f"Error retrieving messages: {e}"
return log_and_format_error("list_messages", e, chat_id=chat_id)
@mcp.tool()
@ -394,7 +445,7 @@ async def list_chats(chat_type: str = None, limit: int = 20) -> str:
return "\n".join(results)
except Exception as e:
return f"Error listing chats: {e}"
return log_and_format_error("list_chats", e, chat_type=chat_type, limit=limit)
@mcp.tool()
@ -470,8 +521,7 @@ async def get_chat(chat_id: int) -> str:
return "\n".join(result)
except Exception as e:
logger.exception(f"get_chat failed (chat_id={chat_id})")
return f"Error getting chat info: {e}"
return log_and_format_error("get_chat", e, chat_id=chat_id)
@mcp.tool()
@ -518,7 +568,7 @@ async def get_direct_chat_by_contact(contact_query: str) -> str:
return f"Found contacts: {found_names}, but no direct chats were found with them."
return "\n".join(results)
except Exception as e:
return f"Error finding direct chat: {e}"
return log_and_format_error("get_direct_chat_by_contact", e, contact_query=contact_query)
@mcp.tool()
@ -568,7 +618,7 @@ async def get_contact_chats(contact_id: int) -> str:
return f"Chats with {contact_name} (ID: {contact_id}):\n" + "\n".join(results)
except Exception as e:
return f"Error retrieving contact chats: {e}"
return log_and_format_error("get_contact_chats", e, contact_id=contact_id)
@mcp.tool()
@ -602,7 +652,7 @@ async def get_last_interaction(contact_id: int) -> str:
return "\n".join(results)
except Exception as e:
return f"Error retrieving last interaction: {e}"
return log_and_format_error("get_last_interaction", e, contact_id=contact_id)
@mcp.tool()
@ -652,7 +702,7 @@ async def get_message_context(chat_id: int, message_id: int, context_size: int =
results.append(f"ID: {msg.id} | {sender_name} | {msg.date}{highlight}\n{msg.message or '[Media/No text]'}\n")
return "\n".join(results)
except Exception as e:
return f"Error retrieving message context: {e}"
return log_and_format_error("get_message_context", e, chat_id=chat_id, message_id=message_id, context_size=context_size)
@mcp.tool()
@ -699,10 +749,10 @@ async def add_contact(phone: str, first_name: str, last_name: str = "") -> str:
return f"Contact not added. Alternative method response: {str(result)}"
except Exception as alt_e:
logger.exception(f"add_contact (alt method) failed (phone={phone})")
return f"Error adding contact (alternative method): {alt_e}"
return log_and_format_error("add_contact", alt_e, phone=phone)
except Exception as e:
logger.exception(f"add_contact failed (phone={phone})")
return f"Error adding contact: {e}"
return log_and_format_error("add_contact", e, phone=phone)
@mcp.tool()
@ -717,7 +767,7 @@ async def delete_contact(user_id: int) -> str:
await client(functions.contacts.DeleteContactsRequest(id=[user]))
return f"Contact with user ID {user_id} deleted."
except Exception as e:
return f"Error deleting contact: {e}"
return log_and_format_error("delete_contact", e, user_id=user_id)
@mcp.tool()
@ -732,7 +782,7 @@ async def block_user(user_id: int) -> str:
await client(functions.contacts.BlockRequest(id=user))
return f"User {user_id} blocked."
except Exception as e:
return f"Error blocking user: {e}"
return log_and_format_error("block_user", e, user_id=user_id)
@mcp.tool()
@ -747,7 +797,7 @@ async def unblock_user(user_id: int) -> str:
await client(functions.contacts.UnblockRequest(id=user))
return f"User {user_id} unblocked."
except Exception as e:
return f"Error unblocking user: {e}"
return log_and_format_error("unblock_user", e, user_id=user_id)
@mcp.tool()
@ -759,7 +809,7 @@ async def get_me() -> str:
me = await client.get_me()
return json.dumps(format_entity(me), indent=2)
except Exception as e:
return f"Error getting your info: {e}"
return log_and_format_error("get_me", e)
@mcp.tool()
@ -820,7 +870,7 @@ async def create_group(title: str, user_ids: list) -> str:
raise # Let the outer exception handler catch it
except Exception as e:
logger.exception(f"create_group failed (title={title}, user_ids={user_ids})")
return f"Error creating group: {e}"
return log_and_format_error("create_group", e, title=title, user_ids=user_ids)
@mcp.tool()
@ -861,11 +911,11 @@ async def invite_to_group(group_id: int, user_ids: list) -> str:
except telethon.errors.rpcerrorlist.UserPrivacyRestrictedError:
return "Error: One or more users have privacy settings that prevent you from adding them."
except Exception as e:
return f"Error inviting users: {e}"
return log_and_format_error("invite_to_group", e, group_id=group_id, user_ids=user_ids)
except Exception as e:
logger.error(f"telegram_mcp invite_to_group failed (group_id={group_id}, user_ids={user_ids})", exc_info=True)
return f"Error: {e}"
return log_and_format_error("invite_to_group", e, group_id=group_id, user_ids=user_ids)
@mcp.tool()
@ -887,7 +937,7 @@ async def leave_chat(chat_id: int) -> str:
chat_name = getattr(entity, 'title', str(chat_id))
return f"Left channel/supergroup {chat_name} (ID: {chat_id})."
except Exception as chan_err:
return f"Error leaving channel: {chan_err}"
return log_and_format_error("leave_chat", chan_err, chat_id=chat_id)
elif isinstance(entity, Chat):
# Traditional basic groups (not supergroups)
@ -914,11 +964,11 @@ async def leave_chat(chat_id: int) -> str:
chat_name = getattr(entity, 'title', str(chat_id))
return f"Left basic group {chat_name} (ID: {chat_id})."
except Exception as alt_err:
return f"Error leaving basic group: {alt_err}"
return log_and_format_error("leave_chat", alt_err, chat_id=chat_id)
else:
# Cannot leave a user chat this way
entity_type = type(entity).__name__
return f"Cannot leave chat ID {chat_id} of type {entity_type}. This function is for groups and channels only."
return log_and_format_error("leave_chat", Exception(f"Cannot leave chat ID {chat_id} of type {entity_type}. This function is for groups and channels only."), chat_id=chat_id)
except Exception as e:
logger.exception(f"leave_chat failed (chat_id={chat_id})")
@ -926,9 +976,9 @@ async def leave_chat(chat_id: int) -> str:
# Provide helpful hint for common errors
error_str = str(e).lower()
if "invalid" in error_str and "chat" in error_str:
return "Error: This appears to be a channel/supergroup. Please check the chat ID and try again."
return log_and_format_error("leave_chat", Exception(f"Error leaving chat: This appears to be a channel/supergroup. Please check the chat ID and try again."), chat_id=chat_id)
return f"Error leaving chat: {e}"
return log_and_format_error("leave_chat", e, chat_id=chat_id)
@mcp.tool()
@ -943,7 +993,7 @@ async def get_participants(chat_id: int) -> str:
lines = [f"ID: {p.id}, Name: {getattr(p, 'first_name', '')} {getattr(p, 'last_name', '')}" for p in participants]
return "\n".join(lines)
except Exception as e:
return f"Error getting participants: {e}"
return log_and_format_error("get_participants", e, chat_id=chat_id)
@mcp.tool()
@ -964,7 +1014,7 @@ async def send_file(chat_id: int, file_path: str, caption: str = None) -> str:
await client.send_file(entity, file_path, caption=caption)
return f"File sent to chat {chat_id}."
except Exception as e:
return f"Error sending file: {e}"
return log_and_format_error("send_file", e, chat_id=chat_id, file_path=file_path, caption=caption)
@mcp.tool()
@ -990,7 +1040,7 @@ async def download_media(chat_id: int, message_id: int, file_path: str) -> str:
return f"Download failed: file not created at {file_path}"
return f"Media downloaded to {file_path}."
except Exception as e:
return f"Error downloading media: {e}"
return log_and_format_error("download_media", e, chat_id=chat_id, message_id=message_id, file_path=file_path)
@mcp.tool()
@ -1006,7 +1056,7 @@ async def update_profile(first_name: str = None, last_name: str = None, about: s
))
return "Profile updated."
except Exception as e:
return f"Error updating profile: {e}"
return log_and_format_error("update_profile", e, first_name=first_name, last_name=last_name, about=about)
@mcp.tool()
@ -1020,7 +1070,7 @@ async def set_profile_photo(file_path: str) -> str:
))
return "Profile photo updated."
except Exception as e:
return f"Error setting profile photo: {e}"
return log_and_format_error("set_profile_photo", e, file_path=file_path)
@mcp.tool()
@ -1035,7 +1085,7 @@ async def delete_profile_photo() -> str:
await client(functions.photos.DeletePhotosRequest(id=[photos.photos[0].id]))
return "Profile photo deleted."
except Exception as e:
return f"Error deleting profile photo: {e}"
return log_and_format_error("delete_profile_photo", e)
@mcp.tool()
@ -1059,7 +1109,7 @@ async def get_privacy_settings() -> str:
raise
except Exception as e:
logger.exception("get_privacy_settings failed")
return f"Error getting privacy settings: {e}"
return log_and_format_error("get_privacy_settings", e)
@mcp.tool()
@ -1119,7 +1169,7 @@ async def set_privacy_settings(key: str, allow_users: list = None, disallow_user
rules.append(InputPrivacyValueAllowUsers(users=allow_entities))
except Exception as allow_err:
logger.error(f"Error processing allowed users: {allow_err}")
return f"Error processing allowed users: {allow_err}"
return log_and_format_error("set_privacy_settings", allow_err, key=key)
# Process disallow rules
if disallow_users and len(disallow_users) > 0:
@ -1136,7 +1186,7 @@ async def set_privacy_settings(key: str, allow_users: list = None, disallow_user
rules.append(InputPrivacyValueDisallowUsers(users=disallow_entities))
except Exception as disallow_err:
logger.error(f"Error processing disallowed users: {disallow_err}")
return f"Error processing disallowed users: {disallow_err}"
return log_and_format_error("set_privacy_settings", disallow_err, key=key)
# Apply the privacy settings
try:
@ -1152,7 +1202,7 @@ async def set_privacy_settings(key: str, allow_users: list = None, disallow_user
raise
except Exception as e:
logger.exception(f"set_privacy_settings failed (key={key})")
return f"Error setting privacy settings: {e}"
return log_and_format_error("set_privacy_settings", e, key=key)
@mcp.tool()
@ -1165,7 +1215,7 @@ async def import_contacts(contacts: list) -> str:
result = await client(functions.contacts.ImportContactsRequest(contacts=input_contacts))
return f"Imported {len(result.imported)} contacts."
except Exception as e:
return f"Error importing contacts: {e}"
return log_and_format_error("import_contacts", e, contacts=contacts)
@mcp.tool()
@ -1178,7 +1228,7 @@ async def export_contacts() -> str:
users = result.users
return json.dumps([format_entity(u) for u in users], indent=2)
except Exception as e:
return f"Error exporting contacts: {e}"
return log_and_format_error("export_contacts", e)
@mcp.tool()
@ -1190,7 +1240,7 @@ async def get_blocked_users() -> str:
result = await client(functions.contacts.GetBlockedRequest(offset=0, limit=100))
return json.dumps([format_entity(u) for u in result.users], indent=2)
except Exception as e:
return f"Error getting blocked users: {e}"
return log_and_format_error("get_blocked_users", e)
@mcp.tool()
@ -1206,7 +1256,7 @@ async def create_channel(title: str, about: str = "", megagroup: bool = False) -
))
return f"Channel '{title}' created with ID: {result.chats[0].id}"
except Exception as e:
return f"Error creating channel: {e}"
return log_and_format_error("create_channel", e, title=title, about=about, megagroup=megagroup)
@mcp.tool()
@ -1225,7 +1275,7 @@ async def edit_chat_title(chat_id: int, title: str) -> str:
return f"Chat {chat_id} title updated to '{title}'."
except Exception as e:
logger.exception(f"edit_chat_title failed (chat_id={chat_id}, title='{title}')")
return f"Error editing chat title: {e}"
return log_and_format_error("edit_chat_title", e, chat_id=chat_id, title=title)
@mcp.tool()
@ -1256,7 +1306,7 @@ async def edit_chat_photo(chat_id: int, file_path: str) -> str:
return f"Chat {chat_id} photo updated."
except Exception as e:
logger.exception(f"edit_chat_photo failed (chat_id={chat_id}, file_path='{file_path}')")
return f"Error editing chat photo: {e}"
return log_and_format_error("edit_chat_photo", e, chat_id=chat_id, file_path=file_path)
@mcp.tool()
@ -1278,7 +1328,7 @@ async def delete_chat_photo(chat_id: int) -> str:
return f"Chat {chat_id} photo deleted."
except Exception as e:
logger.exception(f"delete_chat_photo failed (chat_id={chat_id})")
return f"Error deleting chat photo: {e}"
return log_and_format_error("delete_chat_photo", e, chat_id=chat_id)
@mcp.tool()
@ -1336,11 +1386,11 @@ async def promote_admin(group_id: int, user_id: int, rights: dict = None) -> str
except telethon.errors.rpcerrorlist.UserNotMutualContactError:
return "Error: Cannot promote users who are not mutual contacts. Please ensure the user is in your contacts and has added you back."
except Exception as e:
return f"Error promoting user to admin: {e}"
return log_and_format_error("promote_admin", e, group_id=group_id, user_id=user_id)
except Exception as e:
logger.error(f"telegram_mcp promote_admin failed (group_id={group_id}, user_id={user_id})", exc_info=True)
return f"Error: {str(e)}"
return log_and_format_error("promote_admin", e, group_id=group_id, user_id=user_id)
@mcp.tool()
@ -1382,11 +1432,11 @@ async def demote_admin(group_id: int, user_id: int) -> str:
except telethon.errors.rpcerrorlist.UserNotMutualContactError:
return "Error: Cannot modify admin status of users who are not mutual contacts. Please ensure the user is in your contacts and has added you back."
except Exception as e:
return f"Error demoting admin: {e}"
return log_and_format_error("demote_admin", e, group_id=group_id, user_id=user_id)
except Exception as e:
logger.error(f"telegram_mcp demote_admin failed (group_id={group_id}, user_id={user_id})", exc_info=True)
return f"Error: {str(e)}"
return log_and_format_error("demote_admin", e, group_id=group_id, user_id=user_id)
@mcp.tool()
@ -1429,10 +1479,10 @@ async def ban_user(chat_id: int, user_id: int) -> str:
except telethon.errors.rpcerrorlist.UserNotMutualContactError:
return "Error: Cannot ban users who are not mutual contacts. Please ensure the user is in your contacts and has added you back."
except Exception as e:
return f"Error banning user: {e}"
return log_and_format_error("ban_user", e, chat_id=chat_id, user_id=user_id)
except Exception as e:
logger.exception(f"ban_user failed (chat_id={chat_id}, user_id={user_id})")
return f"Error: {e}"
return log_and_format_error("ban_user", e, chat_id=chat_id, user_id=user_id)
@mcp.tool()
@ -1475,10 +1525,10 @@ async def unban_user(chat_id: int, user_id: int) -> str:
except telethon.errors.rpcerrorlist.UserNotMutualContactError:
return "Error: Cannot modify status of users who are not mutual contacts. Please ensure the user is in your contacts and has added you back."
except Exception as e:
return f"Error unbanning user: {e}"
return log_and_format_error("unban_user", e, chat_id=chat_id, user_id=user_id)
except Exception as e:
logger.exception(f"unban_user failed (chat_id={chat_id}, user_id={user_id})")
return f"Error: {e}"
return log_and_format_error("unban_user", e, chat_id=chat_id, user_id=user_id)
@mcp.tool()
@ -1493,7 +1543,7 @@ async def get_admins(chat_id: int) -> str:
return "\n".join(lines) if lines else "No admins found."
except Exception as e:
logger.exception(f"get_admins failed (chat_id={chat_id})")
return f"Error getting admins: {e}"
return log_and_format_error("get_admins", e, chat_id=chat_id)
@mcp.tool()
@ -1508,7 +1558,7 @@ async def get_banned_users(chat_id: int) -> str:
return "\n".join(lines) if lines else "No banned users found."
except Exception as e:
logger.exception(f"get_banned_users failed (chat_id={chat_id})")
return f"Error getting banned users: {e}"
return log_and_format_error("get_banned_users", e, chat_id=chat_id)
@mcp.tool()
@ -1554,7 +1604,7 @@ async def get_invite_link(chat_id: int) -> str:
return "Could not retrieve invite link for this chat."
except Exception as e:
logger.exception(f"get_invite_link failed (chat_id={chat_id})")
return f"Error getting invite link: {e}"
return log_and_format_error("get_invite_link", e, chat_id=chat_id)
@mcp.tool()
@ -1610,7 +1660,7 @@ async def join_chat_by_link(link: str) -> str:
raise # Re-raise to be caught by the outer exception handler
except Exception as e:
logger.exception(f"join_chat_by_link failed (link={link})")
return f"Error joining chat: {e}"
return log_and_format_error("join_chat_by_link", e, link=link)
@mcp.tool()
@ -1641,10 +1691,10 @@ async def export_chat_invite(chat_id: int) -> str:
return invite_link
except Exception as e2:
logger.warning(f"export_chat_invite_link failed: {e2}")
return f"Could not export chat invite: {e2}"
return log_and_format_error("export_chat_invite", e2, chat_id=chat_id)
except Exception as e:
logger.exception(f"export_chat_invite failed (chat_id={chat_id})")
return f"Error exporting chat invite: {e}"
return log_and_format_error("export_chat_invite", e, chat_id=chat_id)
@mcp.tool()
@ -1696,7 +1746,7 @@ async def import_chat_invite(hash: str) -> str:
raise # Re-raise to be caught by the outer exception handler
except Exception as e:
logger.exception(f"import_chat_invite failed (hash={hash})")
return f"Error importing chat invite: {e}"
return log_and_format_error("import_chat_invite", e, hash=hash)
@mcp.tool()
@ -1719,7 +1769,7 @@ async def send_voice(chat_id: int, file_path: str) -> str:
await client.send_file(entity, file_path, voice_note=True)
return f"Voice message sent to chat {chat_id}."
except Exception as e:
return f"Error sending voice: {e}"
return log_and_format_error("send_voice", e, chat_id=chat_id, file_path=file_path)
@mcp.tool()
@ -1733,7 +1783,7 @@ async def forward_message(from_chat_id: int, message_id: int, to_chat_id: int) -
await client.forward_messages(to_entity, message_id, from_entity)
return f"Message {message_id} forwarded from {from_chat_id} to {to_chat_id}."
except Exception as e:
return f"Error forwarding message: {e}"
return log_and_format_error("forward_message", e, from_chat_id=from_chat_id, message_id=message_id, to_chat_id=to_chat_id)
@mcp.tool()
@ -1746,7 +1796,7 @@ async def edit_message(chat_id: int, message_id: int, new_text: str) -> str:
await client.edit_message(entity, message_id, new_text)
return f"Message {message_id} edited."
except Exception as e:
return f"Error editing message: {e}"
return log_and_format_error("edit_message", e, chat_id=chat_id, message_id=message_id, new_text=new_text)
@mcp.tool()
@ -1759,7 +1809,7 @@ async def delete_message(chat_id: int, message_id: int) -> str:
await client.delete_messages(entity, message_id)
return f"Message {message_id} deleted."
except Exception as e:
return f"Error deleting message: {e}"
return log_and_format_error("delete_message", e, chat_id=chat_id, message_id=message_id)
@mcp.tool()
@ -1772,7 +1822,7 @@ async def pin_message(chat_id: int, message_id: int) -> str:
await client.pin_message(entity, message_id)
return f"Message {message_id} pinned in chat {chat_id}."
except Exception as e:
return f"Error pinning message: {e}"
return log_and_format_error("pin_message", e, chat_id=chat_id, message_id=message_id)
@mcp.tool()
@ -1785,7 +1835,7 @@ async def unpin_message(chat_id: int, message_id: int) -> str:
await client.unpin_message(entity, message_id)
return f"Message {message_id} unpinned in chat {chat_id}."
except Exception as e:
return f"Error unpinning message: {e}"
return log_and_format_error("unpin_message", e, chat_id=chat_id, message_id=message_id)
@mcp.tool()
@ -1798,7 +1848,7 @@ async def mark_as_read(chat_id: int) -> str:
await client.send_read_acknowledge(entity)
return f"Marked all messages as read in chat {chat_id}."
except Exception as e:
return f"Error marking as read: {e}"
return log_and_format_error("mark_as_read", e, chat_id=chat_id)
@mcp.tool()
@ -1811,7 +1861,7 @@ async def reply_to_message(chat_id: int, message_id: int, text: str) -> str:
await client.send_message(entity, text, reply_to=message_id)
return f"Replied to message {message_id} in chat {chat_id}."
except Exception as e:
return f"Error replying to message: {e}"
return log_and_format_error("reply_to_message", e, chat_id=chat_id, message_id=message_id, text=text)
@mcp.tool()
@ -1829,7 +1879,7 @@ async def upload_file(file_path: str) -> str:
file = await client.upload_file(file_path)
return str(file)
except Exception as e:
return f"Error uploading file: {e}"
return log_and_format_error("upload_file", e, file_path=file_path)
@mcp.tool()
@ -1847,7 +1897,7 @@ async def get_media_info(chat_id: int, message_id: int) -> str:
return "No media found in the specified message."
return str(msg.media)
except Exception as e:
return f"Error getting media info: {e}"
return log_and_format_error("get_media_info", e, chat_id=chat_id, message_id=message_id)
@mcp.tool()
@ -1859,7 +1909,7 @@ async def search_public_chats(query: str) -> str:
result = await client(functions.contacts.SearchRequest(q=query, limit=20))
return json.dumps([format_entity(u) for u in result.users], indent=2)
except Exception as e:
return f"Error searching public chats: {e}"
return log_and_format_error("search_public_chats", e, query=query)
@mcp.tool()
@ -1872,7 +1922,7 @@ async def search_messages(chat_id: int, query: str, limit: int = 20) -> str:
messages = await client.get_messages(entity, limit=limit, search=query)
return "\n".join([f"ID: {m.id} | {m.date} | {m.message}" for m in messages])
except Exception as e:
return f"Error searching messages: {e}"
return log_and_format_error("search_messages", e, chat_id=chat_id, query=query, limit=limit)
@mcp.tool()
@ -1884,7 +1934,7 @@ async def resolve_username(username: str) -> str:
result = await client(functions.contacts.ResolveUsernameRequest(username=username))
return str(result)
except Exception as e:
return f"Error resolving username: {e}"
return log_and_format_error("resolve_username", e, username=username)
@mcp.tool()
@ -1916,10 +1966,10 @@ async def mute_chat(chat_id: int) -> str:
return f"Chat {chat_id} muted (using alternative method)."
except Exception as alt_e:
logger.exception(f"mute_chat (alt method) failed (chat_id={chat_id})")
return f"Error muting chat (alternative method): {alt_e}"
return log_and_format_error("mute_chat", alt_e, chat_id=chat_id)
except Exception as e:
logger.exception(f"mute_chat failed (chat_id={chat_id})")
return f"Error muting chat: {e}"
return log_and_format_error("mute_chat", e, chat_id=chat_id)
@mcp.tool()
@ -1951,10 +2001,10 @@ async def unmute_chat(chat_id: int) -> str:
return f"Chat {chat_id} unmuted (using alternative method)."
except Exception as alt_e:
logger.exception(f"unmute_chat (alt method) failed (chat_id={chat_id})")
return f"Error unmuting chat (alternative method): {alt_e}"
return log_and_format_error("unmute_chat", alt_e, chat_id=chat_id)
except Exception as e:
logger.exception(f"unmute_chat failed (chat_id={chat_id})")
return f"Error unmuting chat: {e}"
return log_and_format_error("unmute_chat", e, chat_id=chat_id)
@mcp.tool()
@ -1969,7 +2019,7 @@ async def archive_chat(chat_id: int) -> str:
))
return f"Chat {chat_id} archived."
except Exception as e:
return f"Error archiving chat: {e}"
return log_and_format_error("archive_chat", e, chat_id=chat_id)
@mcp.tool()
@ -1984,7 +2034,7 @@ async def unarchive_chat(chat_id: int) -> str:
))
return f"Chat {chat_id} unarchived."
except Exception as e:
return f"Error unarchiving chat: {e}"
return log_and_format_error("unarchive_chat", e, chat_id=chat_id)
@mcp.tool()
@ -1996,7 +2046,7 @@ async def get_sticker_sets() -> str:
result = await client(functions.messages.GetAllStickersRequest(hash=0))
return json.dumps([s.title for s in result.sets], indent=2)
except Exception as e:
return f"Error getting sticker sets: {e}"
return log_and_format_error("get_sticker_sets", e)
@mcp.tool()
@ -2018,7 +2068,7 @@ async def send_sticker(chat_id: int, file_path: str) -> str:
await client.send_file(entity, file_path, force_document=False)
return f"Sticker sent to chat {chat_id}."
except Exception as e:
return f"Error sending sticker: {e}"
return log_and_format_error("send_sticker", e, chat_id=chat_id, file_path=file_path)
@mcp.tool()
@ -2058,7 +2108,7 @@ async def get_gif_search(query: str, limit: int = 10) -> str:
return f"Could not search GIFs using available methods: {inner_e}"
except Exception as e:
logger.exception(f"get_gif_search failed (query={query}, limit={limit})")
return f"Error searching GIFs: {e}"
return log_and_format_error("get_gif_search", e, query=query, limit=limit)
@mcp.tool()
@ -2076,7 +2126,7 @@ async def send_gif(chat_id: int, gif_id: int) -> str:
await client.send_file(entity, gif_id)
return f"GIF sent to chat {chat_id}."
except Exception as e:
return f"Error sending GIF: {e}"
return log_and_format_error("send_gif", e, chat_id=chat_id, gif_id=gif_id)
@mcp.tool()
@ -2113,7 +2163,7 @@ async def get_bot_info(bot_username: str) -> str:
return json.dumps(info, indent=2)
except Exception as e:
logger.exception(f"get_bot_info failed (bot_username={bot_username})")
return f"Error getting bot info: {e}"
return log_and_format_error("get_bot_info", e, bot_username=bot_username)
@mcp.tool()
@ -2156,10 +2206,10 @@ async def set_bot_commands(bot_username: str, commands: list) -> str:
return f"Bot commands set for {bot_username}."
except ImportError as ie:
logger.exception(f"set_bot_commands failed - ImportError: {ie}")
return f"Error: Your Telethon version doesn't support SetBotCommandsRequest. Please update Telethon."
return log_and_format_error("set_bot_commands", ie)
except Exception as e:
logger.exception(f"set_bot_commands failed (bot_username={bot_username})")
return f"Error setting bot commands: {e}"
return log_and_format_error("set_bot_commands", e, bot_username=bot_username)
@mcp.tool()
@ -2172,7 +2222,7 @@ async def get_history(chat_id: int, limit: int = 100) -> str:
messages = await client.get_messages(entity, limit=limit)
return "\n".join([f"ID: {m.id} | {m.date} | {m.message}" for m in messages])
except Exception as e:
return f"Error getting history: {e}"
return log_and_format_error("get_history", e, chat_id=chat_id, limit=limit)
@mcp.tool()
@ -2185,7 +2235,7 @@ async def get_user_photos(user_id: int, limit: int = 10) -> str:
photos = await client(functions.photos.GetUserPhotosRequest(user_id=user, offset=0, max_id=0, limit=limit))
return json.dumps([p.id for p in photos.photos], indent=2)
except Exception as e:
return f"Error getting user photos: {e}"
return log_and_format_error("get_user_photos", e, user_id=user_id, limit=limit)
@mcp.tool()
@ -2197,7 +2247,7 @@ async def get_user_status(user_id: int) -> str:
user = await client.get_entity(user_id)
return str(user.status)
except Exception as e:
return f"Error getting user status: {e}"
return log_and_format_error("get_user_status", e, user_id=user_id)
@mcp.tool()
@ -2223,7 +2273,7 @@ async def get_recent_actions(chat_id: int) -> str:
return json.dumps([e.to_dict() for e in result.events], indent=2, default=json_serializer)
except Exception as e:
logger.exception(f"get_recent_actions failed (chat_id={chat_id})")
return f"Error getting recent actions: {e}"
return log_and_format_error("get_recent_actions", e, chat_id=chat_id)
@mcp.tool()
@ -2249,7 +2299,7 @@ async def get_pinned_messages(chat_id: int) -> str:
return "\n".join([f"ID: {m.id} | {m.date} | {m.message or '[Media/No text]'}" for m in messages])
except Exception as e:
logger.exception(f"get_pinned_messages failed (chat_id={chat_id})")
return f"Error getting pinned messages: {e}"
return log_and_format_error("get_pinned_messages", e, chat_id=chat_id)
if __name__ == "__main__":