feat: enhance error handling and logging in main.py

- Added logging setup to capture errors in mcp_errors.log.
- Improved error handling in get_chats, get_messages, send_message, and other functions to provide clearer feedback.
- Added file existence and readability checks for file-related functions.
- Updated documentation for function arguments to clarify requirements.
This commit is contained in:
anonim 2025-04-15 16:47:06 +03:00
parent 6f25a900f5
commit 710b9fd05c

94
main.py
View file

@ -15,6 +15,8 @@ from datetime import datetime, timedelta
import json
from typing import List, Dict, Optional, Union, Any
from telethon import functions
import mimetypes
import logging
load_dotenv()
@ -34,6 +36,13 @@ else:
# Use file-based session
client = TelegramClient(TELEGRAM_SESSION_NAME, TELEGRAM_API_ID, TELEGRAM_API_HASH)
# Setup logger for error reporting
logging.basicConfig(
filename='mcp_errors.log',
level=logging.ERROR,
format='%(asctime)s %(levelname)s %(name)s %(message)s'
)
logger = logging.getLogger("mcp")
def format_entity(entity) -> Dict[str, Any]:
"""Helper function to format entity information consistently."""
@ -80,11 +89,11 @@ def format_message(message) -> Dict[str, Any]:
async def get_chats(page: int = 1, page_size: int = 20) -> str:
"""
Get a paginated list of chats.
Args:
page: Page number (1-indexed).
page_size: Number of chats per page.
"""
try:
dialogs = await client.get_dialogs()
start = (page - 1) * page_size
end = start + page_size
@ -93,19 +102,20 @@ async def get_chats(page: int = 1, page_size: int = 20) -> str:
chats = dialogs[start:end]
lines = []
for dialog in chats:
# For groups or channels, use the title; for users, show first name.
entity = dialog.entity
chat_id = entity.id
title = getattr(entity, "title", None) or getattr(entity, "first_name", "Unknown")
lines.append(f"Chat ID: {chat_id}, Title: {title}")
return "\n".join(lines)
except Exception as e:
logger.exception(f"get_chats failed (page={page}, page_size={page_size})")
return "An error occurred (code: GETCHATS-ERR-001). Check mcp_errors.log for details."
@mcp.tool()
async def get_messages(chat_id: int, page: int = 1, page_size: int = 20) -> str:
"""
Get paginated messages from a specific chat.
Args:
chat_id: The ID of the chat.
page: Page number (1-indexed).
@ -113,9 +123,6 @@ async def get_messages(chat_id: int, page: int = 1, page_size: int = 20) -> str:
"""
try:
entity = await client.get_entity(chat_id)
except Exception as e:
return f"Could not resolve chat with ID {chat_id}: {e}"
offset = (page - 1) * page_size
messages = await client.get_messages(entity, limit=page_size, add_offset=offset)
if not messages:
@ -124,27 +131,26 @@ async def get_messages(chat_id: int, page: int = 1, page_size: int = 20) -> str:
for msg in messages:
lines.append(f"ID: {msg.id} | Date: {msg.date} | Message: {msg.message}")
return "\n".join(lines)
except Exception as e:
logger.exception(f"get_messages failed (chat_id={chat_id}, page={page}, page_size={page_size})")
return "An error occurred (code: GETMSGS-ERR-001). Check mcp_errors.log for details."
@mcp.tool()
async def send_message(chat_id: int, message: str) -> str:
"""
Send a message to a specific chat.
Args:
chat_id: The ID of the chat.
message: The message content to send.
"""
try:
entity = await client.get_entity(chat_id)
except Exception as e:
return f"Could not resolve chat with ID {chat_id}: {e}"
try:
await client.send_message(entity, message)
return "Message sent successfully."
except Exception as e:
return f"Failed to send message: {e}"
logger.exception(f"send_message failed (chat_id={chat_id})")
return "An error occurred (code: SENDMSG-ERR-001). Check mcp_errors.log for details."
@mcp.tool()
@ -735,10 +741,14 @@ async def send_file(chat_id: int, file_path: str, caption: str = None) -> str:
Send a file to a chat.
Args:
chat_id: The chat ID.
file_path: Path to the file to send.
file_path: Absolute path to the file to send (must exist and be readable).
caption: Optional caption for the file.
"""
try:
if not os.path.isfile(file_path):
return f"File not found: {file_path}"
if not os.access(file_path, os.R_OK):
return f"File is not readable: {file_path}"
entity = await client.get_entity(chat_id)
await client.send_file(entity, file_path, caption=caption)
return f"File sent to chat {chat_id}."
@ -753,14 +763,20 @@ async def download_media(chat_id: int, message_id: int, file_path: str) -> str:
Args:
chat_id: The chat ID.
message_id: The message ID containing the media.
file_path: Path to save the downloaded file.
file_path: Absolute path to save the downloaded file (must be writable).
"""
try:
entity = await client.get_entity(chat_id)
msg = await client.get_messages(entity, ids=message_id)
if not msg or not msg.media:
return "No media found in the specified message."
# Check if directory is writable
dir_path = os.path.dirname(file_path) or '.'
if not os.access(dir_path, os.W_OK):
return f"Directory not writable: {dir_path}"
await client.download_media(msg, file=file_path)
if not os.path.isfile(file_path):
return f"Download failed: file not created at {file_path}"
return f"Media downloaded to {file_path}."
except Exception as e:
return f"Error downloading media: {e}"
@ -1088,9 +1104,19 @@ async def import_chat_invite(hash: str) -> str:
@mcp.tool()
async def send_voice(chat_id: int, file_path: str) -> str:
"""
Send a voice message to a chat.
Send a voice message to a chat. File must be an OGG/OPUS voice note.
Args:
chat_id: The chat ID.
file_path: Absolute path to the OGG/OPUS file.
"""
try:
if not os.path.isfile(file_path):
return f"File not found: {file_path}"
if not os.access(file_path, os.R_OK):
return f"File is not readable: {file_path}"
mime, _ = mimetypes.guess_type(file_path)
if not (mime and (mime == 'audio/ogg' or file_path.lower().endswith('.ogg') or file_path.lower().endswith('.opus'))):
return "Voice file must be .ogg or .opus format."
entity = await client.get_entity(chat_id)
await client.send_file(entity, file_path, voice_note=True)
return f"Voice message sent to chat {chat_id}."
@ -1193,9 +1219,15 @@ async def reply_to_message(chat_id: int, message_id: int, text: str) -> str:
@mcp.tool()
async def upload_file(file_path: str) -> str:
"""
Upload a file to Telegram servers (returns file handle).
Upload a file to Telegram servers (returns file handle as string, not a file path).
Args:
file_path: Absolute path to the file to upload (must exist and be readable).
"""
try:
if not os.path.isfile(file_path):
return f"File not found: {file_path}"
if not os.access(file_path, os.R_OK):
return f"File is not readable: {file_path}"
file = await client.upload_file(file_path)
return str(file)
except Exception as e:
@ -1206,12 +1238,15 @@ async def upload_file(file_path: str) -> str:
async def get_media_info(chat_id: int, message_id: int) -> str:
"""
Get info about media in a message.
Args:
chat_id: The chat ID.
message_id: The message ID.
"""
try:
entity = await client.get_entity(chat_id)
msg = await client.get_messages(entity, ids=message_id)
if not msg or not msg.media:
return "No media found."
return "No media found in the specified message."
return str(msg.media)
except Exception as e:
return f"Error getting media info: {e}"
@ -1329,9 +1364,18 @@ async def get_sticker_sets() -> str:
@mcp.tool()
async def send_sticker(chat_id: int, file_path: str) -> str:
"""
Send a sticker to a chat.
Send a sticker to a chat. File must be a valid .webp sticker file.
Args:
chat_id: The chat ID.
file_path: Absolute path to the .webp sticker file.
"""
try:
if not os.path.isfile(file_path):
return f"Sticker file not found: {file_path}"
if not os.access(file_path, os.R_OK):
return f"Sticker file is not readable: {file_path}"
if not file_path.lower().endswith('.webp'):
return "Sticker file must be a .webp file."
entity = await client.get_entity(chat_id)
await client.send_file(entity, file_path, force_document=False)
return f"Sticker sent to chat {chat_id}."
@ -1342,10 +1386,15 @@ async def send_sticker(chat_id: int, file_path: str) -> str:
@mcp.tool()
async def get_gif_search(query: str, limit: int = 10) -> str:
"""
Search for GIFs by query.
Search for GIFs by query. Returns a list of Telegram document IDs (not file paths).
Args:
query: Search term for GIFs.
limit: Max number of GIFs to return.
"""
try:
result = await client(functions.messages.SearchGifsRequest(q=query, offset_id=0, limit=limit))
if not result.gifs:
return "No GIFs found for this query."
return json.dumps([g.document.id for g in result.gifs], indent=2)
except Exception as e:
return f"Error searching GIFs: {e}"
@ -1354,9 +1403,14 @@ async def get_gif_search(query: str, limit: int = 10) -> str:
@mcp.tool()
async def send_gif(chat_id: int, gif_id: int) -> str:
"""
Send a GIF to a chat by GIF document ID.
Send a GIF to a chat by Telegram GIF document ID (not a file path).
Args:
chat_id: The chat ID.
gif_id: Telegram document ID for the GIF (from get_gif_search).
"""
try:
if not isinstance(gif_id, int):
return "gif_id must be a Telegram document ID (integer), not a file path. Use get_gif_search to find IDs."
entity = await client.get_entity(chat_id)
await client.send_file(entity, gif_id)
return f"GIF sent to chat {chat_id}."