style: format code with black

This commit is contained in:
Korzhavin Ivan 2025-10-20 01:06:06 +02:00
parent 80245bcdc9
commit 994a49faba
2 changed files with 107 additions and 236 deletions

332
main.py
View file

@ -66,9 +66,7 @@ mcp = FastMCP("telegram")
if SESSION_STRING: if SESSION_STRING:
# Use the string session if available # Use the string session if available
client = TelegramClient( client = TelegramClient(StringSession(SESSION_STRING), TELEGRAM_API_ID, TELEGRAM_API_HASH)
StringSession(SESSION_STRING), TELEGRAM_API_ID, TELEGRAM_API_HASH
)
else: else:
# Use file-based session # Use file-based session
client = TelegramClient(TELEGRAM_SESSION_NAME, TELEGRAM_API_ID, TELEGRAM_API_HASH) client = TelegramClient(TELEGRAM_SESSION_NAME, TELEGRAM_API_ID, TELEGRAM_API_HASH)
@ -79,9 +77,7 @@ logger.setLevel(logging.ERROR) # Set to ERROR for production, INFO for debuggin
# Create console handler # Create console handler
console_handler = logging.StreamHandler() console_handler = logging.StreamHandler()
console_handler.setLevel( console_handler.setLevel(logging.ERROR) # Set to ERROR for production, INFO for debugging
logging.ERROR
) # Set to ERROR for production, INFO for debugging
# Create file handler with absolute path # Create file handler with absolute path
script_dir = os.path.dirname(os.path.abspath(__file__)) script_dir = os.path.dirname(os.path.abspath(__file__))
@ -93,9 +89,7 @@ try:
# Create formatters # Create formatters
# Console formatter remains in the old format # Console formatter remains in the old format
console_formatter = logging.Formatter( console_formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s - %(message)s")
"%(asctime)s [%(levelname)s] %(name)s - %(message)s"
)
console_handler.setFormatter(console_formatter) console_handler.setFormatter(console_formatter)
# File formatter is now JSON # File formatter is now JSON
@ -163,18 +157,14 @@ def log_and_format_error(
prefix = category prefix = category
break break
prefix_str = ( prefix_str = prefix.value if isinstance(prefix, ErrorCategory) else (prefix or "GEN")
prefix.value if isinstance(prefix, ErrorCategory) else (prefix or "GEN")
)
error_code = f"{prefix_str}-ERR-{abs(hash(function_name)) % 1000:03d}" error_code = f"{prefix_str}-ERR-{abs(hash(function_name)) % 1000:03d}"
# Format the additional context parameters # Format the additional context parameters
context = ", ".join(f"{k}={v}" for k, v in kwargs.items()) context = ", ".join(f"{k}={v}" for k, v in kwargs.items())
# Log the full technical error # Log the full technical error
logger.error( logger.error(f"Error in {function_name} ({context}) - Code: {error_code}", exc_info=True)
f"Error in {function_name} ({context}) - Code: {error_code}", exc_info=True
)
# Return a user-friendly message # Return a user-friendly message
if user_message: if user_message:
@ -249,9 +239,7 @@ def validate_id(*param_names_to_validate):
validated_list.append(validated_item) validated_list.append(validated_item)
kwargs[param_name] = validated_list kwargs[param_name] = validated_list
else: else:
validated_value, error_msg = validate_single_id( validated_value, error_msg = validate_single_id(param_value, param_name)
param_value, param_name
)
if error_msg: if error_msg:
return log_and_format_error( return log_and_format_error(
func.__name__, func.__name__,
@ -347,9 +335,7 @@ async def get_chats(page: int = 1, page_size: int = 20) -> str:
for dialog in chats: for dialog in chats:
entity = dialog.entity entity = dialog.entity
chat_id = entity.id chat_id = entity.id
title = getattr(entity, "title", None) or getattr( title = getattr(entity, "title", None) or getattr(entity, "first_name", "Unknown")
entity, "first_name", "Unknown"
)
lines.append(f"Chat ID: {chat_id}, Title: {title}") lines.append(f"Chat ID: {chat_id}, Title: {title}")
return "\n".join(lines) return "\n".join(lines)
except Exception as e: except Exception as e:
@ -358,9 +344,7 @@ async def get_chats(page: int = 1, page_size: int = 20) -> str:
@mcp.tool() @mcp.tool()
@validate_id("chat_id") @validate_id("chat_id")
async def get_messages( async def get_messages(chat_id: Union[int, str], page: int = 1, page_size: int = 20) -> str:
chat_id: Union[int, str], page: int = 1, page_size: int = 20
) -> str:
""" """
Get paginated messages from a specific chat. Get paginated messages from a specific chat.
Args: Args:
@ -768,9 +752,7 @@ async def get_chat(chat_id: Union[int, str]) -> str:
if hasattr(entity, "title"): if hasattr(entity, "title"):
result.append(f"Title: {entity.title}") result.append(f"Title: {entity.title}")
chat_type = ( chat_type = (
"Channel" "Channel" if is_channel and getattr(entity, "broadcast", False) else "Group"
if is_channel and getattr(entity, "broadcast", False)
else "Group"
) )
if is_channel and getattr(entity, "megagroup", False): if is_channel and getattr(entity, "megagroup", False):
chat_type = "Supergroup" chat_type = "Supergroup"
@ -782,9 +764,7 @@ async def get_chat(chat_id: Union[int, str]) -> str:
# Fetch participants count reliably # Fetch participants count reliably
try: try:
participants_count = ( participants_count = (await client.get_participants(entity, limit=0)).total
await client.get_participants(entity, limit=0)
).total
result.append(f"Participants: {participants_count}") result.append(f"Participants: {participants_count}")
except Exception as pe: except Exception as pe:
result.append(f"Participants: Error fetching ({pe})") result.append(f"Participants: Error fetching ({pe})")
@ -814,18 +794,13 @@ async def get_chat(chat_id: Union[int, str]) -> str:
last_msg = dialog.message last_msg = dialog.message
sender_name = "Unknown" sender_name = "Unknown"
if last_msg.sender: if last_msg.sender:
sender_name = getattr( sender_name = getattr(last_msg.sender, "first_name", "") or getattr(
last_msg.sender, "first_name", "" last_msg.sender, "title", "Unknown"
) or getattr(last_msg.sender, "title", "Unknown") )
if ( if hasattr(last_msg.sender, "last_name") and last_msg.sender.last_name:
hasattr(last_msg.sender, "last_name")
and last_msg.sender.last_name
):
sender_name += f" {last_msg.sender.last_name}" sender_name += f" {last_msg.sender.last_name}"
sender_name = sender_name.strip() or "Unknown" sender_name = sender_name.strip() or "Unknown"
result.append( result.append(f"Last Message: From {sender_name} at {last_msg.date}")
f"Last Message: From {sender_name} at {last_msg.date}"
)
result.append(f"Message: {last_msg.message or '[Media/No text]'}") result.append(f"Message: {last_msg.message or '[Media/No text]'}")
except Exception as diag_ex: except Exception as diag_ex:
logger.warning(f"Could not get dialog info for {chat_id}: {diag_ex}") logger.warning(f"Could not get dialog info for {chat_id}: {diag_ex}")
@ -852,7 +827,9 @@ async def get_direct_chat_by_contact(contact_query: str) -> str:
for contact in contacts: for contact in contacts:
if not contact: if not contact:
continue continue
name = f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip() name = (
f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip()
)
username = getattr(contact, "username", "") username = getattr(contact, "username", "")
phone = getattr(contact, "phone", "") phone = getattr(contact, "phone", "")
if ( if (
@ -867,7 +844,9 @@ async def get_direct_chat_by_contact(contact_query: str) -> str:
results = [] results = []
dialogs = await client.get_dialogs() dialogs = await client.get_dialogs()
for contact in found_contacts: for contact in found_contacts:
contact_name = f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip() contact_name = (
f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip()
)
for dialog in dialogs: for dialog in dialogs:
if isinstance(dialog.entity, User) and dialog.entity.id == contact.id: if isinstance(dialog.entity, User) and dialog.entity.id == contact.id:
chat_info = f"Chat ID: {dialog.entity.id}, Contact: {contact_name}" chat_info = f"Chat ID: {dialog.entity.id}, Contact: {contact_name}"
@ -884,9 +863,7 @@ async def get_direct_chat_by_contact(contact_query: str) -> str:
return f"Found contacts: {found_names}, but no direct chats were found with them." return f"Found contacts: {found_names}, but no direct chats were found with them."
return "\n".join(results) return "\n".join(results)
except Exception as e: except Exception as e:
return log_and_format_error( return log_and_format_error("get_direct_chat_by_contact", e, contact_query=contact_query)
"get_direct_chat_by_contact", e, contact_query=contact_query
)
@mcp.tool() @mcp.tool()
@ -904,7 +881,9 @@ async def get_contact_chats(contact_id: Union[int, str]) -> str:
if not isinstance(contact, User): if not isinstance(contact, User):
return f"ID {contact_id} is not a user/contact." return f"ID {contact_id} is not a user/contact."
contact_name = f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip() contact_name = (
f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip()
)
# Find direct chat # Find direct chat
direct_chat = None direct_chat = None
@ -927,9 +906,7 @@ async def get_contact_chats(contact_id: Union[int, str]) -> str:
common = await client.get_common_chats(contact) common = await client.get_common_chats(contact)
for chat in common: for chat in common:
chat_type = "Channel" if getattr(chat, "broadcast", False) else "Group" chat_type = "Channel" if getattr(chat, "broadcast", False) else "Group"
chat_info = ( chat_info = f"Chat ID: {chat.id}, Title: {chat.title}, Type: {chat_type}"
f"Chat ID: {chat.id}, Title: {chat.title}, Type: {chat_type}"
)
results.append(chat_info) results.append(chat_info)
except: except:
results.append("Could not retrieve common groups.") results.append("Could not retrieve common groups.")
@ -957,7 +934,9 @@ async def get_last_interaction(contact_id: Union[int, str]) -> str:
if not isinstance(contact, User): if not isinstance(contact, User):
return f"ID {contact_id} is not a user/contact." return f"ID {contact_id} is not a user/contact."
contact_name = f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip() contact_name = (
f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip()
)
# Get the last few messages # Get the last few messages
messages = await client.get_messages(contact, limit=5) messages = await client.get_messages(contact, limit=5)
@ -993,9 +972,7 @@ async def get_message_context(
try: try:
chat = await client.get_entity(chat_id) chat = await client.get_entity(chat_id)
# Get messages around the specified message # Get messages around the specified message
messages_before = await client.get_messages( messages_before = await client.get_messages(chat, limit=context_size, max_id=message_id)
chat, limit=context_size, max_id=message_id
)
central_message = await client.get_messages(chat, ids=message_id) central_message = await client.get_messages(chat, ids=message_id)
# Fix: get_messages(ids=...) returns a single Message, not a list # Fix: get_messages(ids=...) returns a single Message, not a list
if central_message is not None and not isinstance(central_message, list): if central_message is not None and not isinstance(central_message, list):
@ -1008,9 +985,7 @@ async def get_message_context(
if not central_message: if not central_message:
return f"Message with ID {message_id} not found in chat {chat_id}." return f"Message with ID {message_id} not found in chat {chat_id}."
# Combine messages in chronological order # Combine messages in chronological order
all_messages = ( all_messages = list(messages_before) + list(central_message) + list(messages_after)
list(messages_before) + list(central_message) + list(messages_after)
)
all_messages.sort(key=lambda m: m.id) all_messages.sort(key=lambda m: m.id)
results = [f"Context for message {message_id} in chat {chat_id}:"] results = [f"Context for message {message_id} in chat {chat_id}:"]
for msg in all_messages: for msg in all_messages:
@ -1021,9 +996,7 @@ async def get_message_context(
reply_content = "" reply_content = ""
if msg.reply_to and msg.reply_to.reply_to_msg_id: if msg.reply_to and msg.reply_to.reply_to_msg_id:
try: try:
replied_msg = await client.get_messages( replied_msg = await client.get_messages(chat, ids=msg.reply_to.reply_to_msg_id)
chat, ids=msg.reply_to.reply_to_msg_id
)
if replied_msg: if replied_msg:
replied_sender = "Unknown" replied_sender = "Unknown"
if replied_msg.sender: if replied_msg.sender:
@ -1032,7 +1005,9 @@ async def get_message_context(
) or getattr(replied_msg.sender, "title", "Unknown") ) or getattr(replied_msg.sender, "title", "Unknown")
reply_content = f" | reply to {msg.reply_to.reply_to_msg_id}\n → Replied message: [{replied_sender}] {replied_msg.message or '[Media/No text]'}" reply_content = f" | reply to {msg.reply_to.reply_to_msg_id}\n → Replied message: [{replied_sender}] {replied_msg.message or '[Media/No text]'}"
except Exception: except Exception:
reply_content = f" | reply to {msg.reply_to.reply_to_msg_id} (original message not found)" reply_content = (
f" | reply to {msg.reply_to.reply_to_msg_id} (original message not found)"
)
results.append( results.append(
f"ID: {msg.id} | {sender_name} | {msg.date}{highlight}{reply_content}\n{msg.message or '[Media/No text]'}\n" f"ID: {msg.id} | {sender_name} | {msg.date}{highlight}{reply_content}\n{msg.message or '[Media/No text]'}\n"
@ -1093,9 +1068,7 @@ async def add_contact(phone: str, first_name: str, last_name: str = "") -> str:
) )
) )
if hasattr(result, "imported") and result.imported: if hasattr(result, "imported") and result.imported:
return ( return f"Contact {first_name} {last_name} added successfully (alt method)."
f"Contact {first_name} {last_name} added successfully (alt method)."
)
else: else:
return f"Contact not added. Alternative method response: {str(result)}" return f"Contact not added. Alternative method response: {str(result)}"
except Exception as alt_e: except Exception as alt_e:
@ -1193,9 +1166,7 @@ async def create_group(title: str, user_ids: List[Union[int, str]]) -> str:
# Create the group with the users # Create the group with the users
try: try:
# Create a new chat with selected users # Create a new chat with selected users
result = await client( result = await client(functions.messages.CreateChatRequest(users=users, title=title))
functions.messages.CreateChatRequest(users=users, title=title)
)
# Check what type of response we got # Check what type of response we got
if hasattr(result, "chats") and result.chats: if hasattr(result, "chats") and result.chats:
@ -1208,9 +1179,7 @@ async def create_group(title: str, user_ids: List[Union[int, str]]) -> str:
else: else:
# If we can't determine the chat ID directly from the result # If we can't determine the chat ID directly from the result
# Try to find it in recent dialogs # Try to find it in recent dialogs
await asyncio.sleep( await asyncio.sleep(1) # Give Telegram a moment to register the new group
1
) # Give Telegram a moment to register the new group
dialogs = await client.get_dialogs(limit=5) # Get recent dialogs dialogs = await client.get_dialogs(limit=5) # Get recent dialogs
for dialog in dialogs: for dialog in dialogs:
if dialog.title == title: if dialog.title == title:
@ -1231,9 +1200,7 @@ async def create_group(title: str, user_ids: List[Union[int, str]]) -> str:
@mcp.tool() @mcp.tool()
@validate_id("group_id", "user_ids") @validate_id("group_id", "user_ids")
async def invite_to_group( async def invite_to_group(group_id: Union[int, str], user_ids: List[Union[int, str]]) -> str:
group_id: Union[int, str], user_ids: List[Union[int, str]]
) -> str:
""" """
Invite users to a group or channel. Invite users to a group or channel.
@ -1254,9 +1221,7 @@ async def invite_to_group(
try: try:
result = await client( result = await client(
functions.channels.InviteToChannelRequest( functions.channels.InviteToChannelRequest(channel=entity, users=users_to_add)
channel=entity, users=users_to_add
)
) )
invited_count = 0 invited_count = 0
@ -1269,20 +1234,18 @@ async def invite_to_group(
except telethon.errors.rpcerrorlist.UserNotMutualContactError: except telethon.errors.rpcerrorlist.UserNotMutualContactError:
return "Error: Cannot invite users who are not mutual contacts. Please ensure the users are in your contacts and have added you back." return "Error: Cannot invite users who are not mutual contacts. Please ensure the users are in your contacts and have added you back."
except telethon.errors.rpcerrorlist.UserPrivacyRestrictedError: except telethon.errors.rpcerrorlist.UserPrivacyRestrictedError:
return "Error: One or more users have privacy settings that prevent you from adding them." return (
except Exception as e: "Error: One or more users have privacy settings that prevent you from adding them."
return log_and_format_error(
"invite_to_group", e, group_id=group_id, user_ids=user_ids
) )
except Exception as e:
return log_and_format_error("invite_to_group", e, group_id=group_id, user_ids=user_ids)
except Exception as e: except Exception as e:
logger.error( logger.error(
f"telegram_mcp invite_to_group failed (group_id={group_id}, user_ids={user_ids})", f"telegram_mcp invite_to_group failed (group_id={group_id}, user_ids={user_ids})",
exc_info=True, exc_info=True,
) )
return log_and_format_error( return log_and_format_error("invite_to_group", e, group_id=group_id, user_ids=user_ids)
"invite_to_group", e, group_id=group_id, user_ids=user_ids
)
@mcp.tool() @mcp.tool()
@ -1387,9 +1350,7 @@ async def get_participants(chat_id: Union[int, str]) -> str:
@mcp.tool() @mcp.tool()
@validate_id("chat_id") @validate_id("chat_id")
async def send_file( async def send_file(chat_id: Union[int, str], file_path: str, caption: str = None) -> str:
chat_id: Union[int, str], file_path: str, caption: str = None
) -> str:
""" """
Send a file to a chat. Send a file to a chat.
Args: Args:
@ -1413,9 +1374,7 @@ async def send_file(
@mcp.tool() @mcp.tool()
@validate_id("chat_id") @validate_id("chat_id")
async def download_media( async def download_media(chat_id: Union[int, str], message_id: int, file_path: str) -> str:
chat_id: Union[int, str], message_id: int, file_path: str
) -> str:
""" """
Download media from a message in a chat. Download media from a message in a chat.
Args: Args:
@ -1447,9 +1406,7 @@ async def download_media(
@mcp.tool() @mcp.tool()
async def update_profile( async def update_profile(first_name: str = None, last_name: str = None, about: str = None) -> str:
first_name: str = None, last_name: str = None, about: str = None
) -> str:
""" """
Update your profile information (name, bio). Update your profile information (name, bio).
""" """
@ -1473,9 +1430,7 @@ async def set_profile_photo(file_path: str) -> str:
""" """
try: try:
await client( await client(
functions.photos.UploadProfilePhotoRequest( functions.photos.UploadProfilePhotoRequest(file=await client.upload_file(file_path))
file=await client.upload_file(file_path)
)
) )
return "Profile photo updated." return "Profile photo updated."
except Exception as e: except Exception as e:
@ -1489,9 +1444,7 @@ async def delete_profile_photo() -> str:
""" """
try: try:
photos = await client( photos = await client(
functions.photos.GetUserPhotosRequest( functions.photos.GetUserPhotosRequest(user_id="me", offset=0, max_id=0, limit=1)
user_id="me", offset=0, max_id=0, limit=1
)
) )
if not photos.photos: if not photos.photos:
return "No profile photo to delete." return "No profile photo to delete."
@ -1512,9 +1465,7 @@ async def get_privacy_settings() -> str:
try: try:
settings = await client( settings = await client(
functions.account.GetPrivacyRequest( functions.account.GetPrivacyRequest(key=InputPrivacyKeyStatusTimestamp())
key=InputPrivacyKeyStatusTimestamp()
)
) )
return str(settings) return str(settings)
except TypeError as e: except TypeError as e:
@ -1583,9 +1534,7 @@ async def set_privacy_settings(
user = await client.get_entity(user_id) user = await client.get_entity(user_id)
allow_entities.append(user) allow_entities.append(user)
except Exception as user_err: except Exception as user_err:
logger.warning( logger.warning(f"Could not get entity for user ID {user_id}: {user_err}")
f"Could not get entity for user ID {user_id}: {user_err}"
)
if allow_entities: if allow_entities:
rules.append(InputPrivacyValueAllowUsers(users=allow_entities)) rules.append(InputPrivacyValueAllowUsers(users=allow_entities))
@ -1602,19 +1551,13 @@ async def set_privacy_settings(
user = await client.get_entity(user_id) user = await client.get_entity(user_id)
disallow_entities.append(user) disallow_entities.append(user)
except Exception as user_err: except Exception as user_err:
logger.warning( logger.warning(f"Could not get entity for user ID {user_id}: {user_err}")
f"Could not get entity for user ID {user_id}: {user_err}"
)
if disallow_entities: if disallow_entities:
rules.append( rules.append(InputPrivacyValueDisallowUsers(users=disallow_entities))
InputPrivacyValueDisallowUsers(users=disallow_entities)
)
except Exception as disallow_err: except Exception as disallow_err:
logger.error(f"Error processing disallowed users: {disallow_err}") logger.error(f"Error processing disallowed users: {disallow_err}")
return log_and_format_error( return log_and_format_error("set_privacy_settings", disallow_err, key=key)
"set_privacy_settings", disallow_err, key=key
)
# Apply the privacy settings # Apply the privacy settings
try: try:
@ -1647,9 +1590,7 @@ async def import_contacts(contacts: list) -> str:
) )
for i, c in enumerate(contacts) for i, c in enumerate(contacts)
] ]
result = await client( result = await client(functions.contacts.ImportContactsRequest(contacts=input_contacts))
functions.contacts.ImportContactsRequest(contacts=input_contacts)
)
return f"Imported {len(result.imported)} contacts." return f"Imported {len(result.imported)} contacts."
except Exception as e: except Exception as e:
return log_and_format_error("import_contacts", e, contacts=contacts) return log_and_format_error("import_contacts", e, contacts=contacts)
@ -1687,9 +1628,7 @@ async def create_channel(title: str, about: str = "", megagroup: bool = False) -
""" """
try: try:
result = await client( result = await client(
functions.channels.CreateChannelRequest( functions.channels.CreateChannelRequest(title=title, about=about, megagroup=megagroup)
title=title, about=about, megagroup=megagroup
)
) )
return f"Channel '{title}' created with ID: {result.chats[0].id}" return f"Channel '{title}' created with ID: {result.chats[0].id}"
except Exception as e: except Exception as e:
@ -1707,13 +1646,9 @@ async def edit_chat_title(chat_id: Union[int, str], title: str) -> str:
try: try:
entity = await client.get_entity(chat_id) entity = await client.get_entity(chat_id)
if isinstance(entity, Channel): if isinstance(entity, Channel):
await client( await client(functions.channels.EditTitleRequest(channel=entity, title=title))
functions.channels.EditTitleRequest(channel=entity, title=title)
)
elif isinstance(entity, Chat): elif isinstance(entity, Chat):
await client( await client(functions.messages.EditChatTitleRequest(chat_id=chat_id, title=title))
functions.messages.EditChatTitleRequest(chat_id=chat_id, title=title)
)
else: else:
return f"Cannot edit title for this entity type ({type(entity)})." return f"Cannot edit title for this entity type ({type(entity)})."
return f"Chat {chat_id} title updated to '{title}'." return f"Chat {chat_id} title updated to '{title}'."
@ -1740,28 +1675,20 @@ async def edit_chat_photo(chat_id: Union[int, str], file_path: str) -> str:
if isinstance(entity, Channel): if isinstance(entity, Channel):
# For channels/supergroups, use EditPhotoRequest with InputChatUploadedPhoto # For channels/supergroups, use EditPhotoRequest with InputChatUploadedPhoto
input_photo = InputChatUploadedPhoto(file=uploaded_file) input_photo = InputChatUploadedPhoto(file=uploaded_file)
await client( await client(functions.channels.EditPhotoRequest(channel=entity, photo=input_photo))
functions.channels.EditPhotoRequest(channel=entity, photo=input_photo)
)
elif isinstance(entity, Chat): elif isinstance(entity, Chat):
# For basic groups, use EditChatPhotoRequest with InputChatUploadedPhoto # For basic groups, use EditChatPhotoRequest with InputChatUploadedPhoto
input_photo = InputChatUploadedPhoto(file=uploaded_file) input_photo = InputChatUploadedPhoto(file=uploaded_file)
await client( await client(
functions.messages.EditChatPhotoRequest( functions.messages.EditChatPhotoRequest(chat_id=chat_id, photo=input_photo)
chat_id=chat_id, photo=input_photo
)
) )
else: else:
return f"Cannot edit photo for this entity type ({type(entity)})." return f"Cannot edit photo for this entity type ({type(entity)})."
return f"Chat {chat_id} photo updated." return f"Chat {chat_id} photo updated."
except Exception as e: except Exception as e:
logger.exception( logger.exception(f"edit_chat_photo failed (chat_id={chat_id}, file_path='{file_path}')")
f"edit_chat_photo failed (chat_id={chat_id}, file_path='{file_path}')" return log_and_format_error("edit_chat_photo", e, chat_id=chat_id, file_path=file_path)
)
return log_and_format_error(
"edit_chat_photo", e, chat_id=chat_id, file_path=file_path
)
@mcp.tool() @mcp.tool()
@ -1775,9 +1702,7 @@ async def delete_chat_photo(chat_id: Union[int, str]) -> str:
if isinstance(entity, Channel): if isinstance(entity, Channel):
# Use InputChatPhotoEmpty for channels/supergroups # Use InputChatPhotoEmpty for channels/supergroups
await client( await client(
functions.channels.EditPhotoRequest( functions.channels.EditPhotoRequest(channel=entity, photo=InputChatPhotoEmpty())
channel=entity, photo=InputChatPhotoEmpty()
)
) )
elif isinstance(entity, Chat): elif isinstance(entity, Chat):
# Use None (or InputChatPhotoEmpty) for basic groups # Use None (or InputChatPhotoEmpty) for basic groups
@ -1852,18 +1777,14 @@ async def promote_admin(
except telethon.errors.rpcerrorlist.UserNotMutualContactError: except telethon.errors.rpcerrorlist.UserNotMutualContactError:
return "Error: Cannot promote users who are not mutual contacts. Please ensure the user is in your contacts and has added you back." return "Error: Cannot promote users who are not mutual contacts. Please ensure the user is in your contacts and has added you back."
except Exception as e: except Exception as e:
return log_and_format_error( return log_and_format_error("promote_admin", e, group_id=group_id, user_id=user_id)
"promote_admin", e, group_id=group_id, user_id=user_id
)
except Exception as e: except Exception as e:
logger.error( logger.error(
f"telegram_mcp promote_admin failed (group_id={group_id}, user_id={user_id})", f"telegram_mcp promote_admin failed (group_id={group_id}, user_id={user_id})",
exc_info=True, exc_info=True,
) )
return log_and_format_error( return log_and_format_error("promote_admin", e, group_id=group_id, user_id=user_id)
"promote_admin", e, group_id=group_id, user_id=user_id
)
@mcp.tool() @mcp.tool()
@ -1905,18 +1826,14 @@ async def demote_admin(group_id: Union[int, str], user_id: Union[int, str]) -> s
except telethon.errors.rpcerrorlist.UserNotMutualContactError: except telethon.errors.rpcerrorlist.UserNotMutualContactError:
return "Error: Cannot modify admin status of users who are not mutual contacts. Please ensure the user is in your contacts and has added you back." return "Error: Cannot modify admin status of users who are not mutual contacts. Please ensure the user is in your contacts and has added you back."
except Exception as e: except Exception as e:
return log_and_format_error( return log_and_format_error("demote_admin", e, group_id=group_id, user_id=user_id)
"demote_admin", e, group_id=group_id, user_id=user_id
)
except Exception as e: except Exception as e:
logger.error( logger.error(
f"telegram_mcp demote_admin failed (group_id={group_id}, user_id={user_id})", f"telegram_mcp demote_admin failed (group_id={group_id}, user_id={user_id})",
exc_info=True, exc_info=True,
) )
return log_and_format_error( return log_and_format_error("demote_admin", e, group_id=group_id, user_id=user_id)
"demote_admin", e, group_id=group_id, user_id=user_id
)
@mcp.tool() @mcp.tool()
@ -2007,9 +1924,7 @@ async def unban_user(chat_id: Union[int, str], user_id: Union[int, str]) -> str:
except telethon.errors.rpcerrorlist.UserNotMutualContactError: except telethon.errors.rpcerrorlist.UserNotMutualContactError:
return "Error: Cannot modify status of users who are not mutual contacts. Please ensure the user is in your contacts and has added you back." return "Error: Cannot modify status of users who are not mutual contacts. Please ensure the user is in your contacts and has added you back."
except Exception as e: except Exception as e:
return log_and_format_error( return log_and_format_error("unban_user", e, chat_id=chat_id, user_id=user_id)
"unban_user", e, chat_id=chat_id, user_id=user_id
)
except Exception as e: except Exception as e:
logger.exception(f"unban_user failed (chat_id={chat_id}, user_id={user_id})") logger.exception(f"unban_user failed (chat_id={chat_id}, user_id={user_id})")
return log_and_format_error("unban_user", e, chat_id=chat_id, user_id=user_id) return log_and_format_error("unban_user", e, chat_id=chat_id, user_id=user_id)
@ -2023,9 +1938,7 @@ async def get_admins(chat_id: Union[int, str]) -> str:
""" """
try: try:
# Fix: Use the correct filter type ChannelParticipantsAdmins # Fix: Use the correct filter type ChannelParticipantsAdmins
participants = await client.get_participants( participants = await client.get_participants(chat_id, filter=ChannelParticipantsAdmins())
chat_id, filter=ChannelParticipantsAdmins()
)
lines = [ lines = [
f"ID: {p.id}, Name: {getattr(p, 'first_name', '')} {getattr(p, 'last_name', '')}".strip() f"ID: {p.id}, Name: {getattr(p, 'first_name', '')} {getattr(p, 'last_name', '')}".strip()
for p in participants for p in participants
@ -2070,15 +1983,11 @@ async def get_invite_link(chat_id: Union[int, str]) -> str:
try: try:
from telethon.tl import functions from telethon.tl import functions
result = await client( result = await client(functions.messages.ExportChatInviteRequest(peer=entity))
functions.messages.ExportChatInviteRequest(peer=entity)
)
return result.link return result.link
except AttributeError: except AttributeError:
# If the function doesn't exist in the current Telethon version # If the function doesn't exist in the current Telethon version
logger.warning( logger.warning("ExportChatInviteRequest not available, using alternative method")
"ExportChatInviteRequest not available, using alternative method"
)
except Exception as e1: except Exception as e1:
# If that fails, log and try alternative approach # If that fails, log and try alternative approach
logger.warning(f"ExportChatInviteRequest failed: {e1}") logger.warning(f"ExportChatInviteRequest failed: {e1}")
@ -2093,15 +2002,9 @@ async def get_invite_link(chat_id: Union[int, str]) -> str:
# Last resort: Try directly fetching chat info # Last resort: Try directly fetching chat info
try: try:
if isinstance(entity, (Chat, Channel)): if isinstance(entity, (Chat, Channel)):
full_chat = await client( full_chat = await client(functions.messages.GetFullChatRequest(chat_id=entity.id))
functions.messages.GetFullChatRequest(chat_id=entity.id) if hasattr(full_chat, "full_chat") and hasattr(full_chat.full_chat, "invite_link"):
) return full_chat.full_chat.invite_link or "No invite link available."
if hasattr(full_chat, "full_chat") and hasattr(
full_chat.full_chat, "invite_link"
):
return (
full_chat.full_chat.invite_link or "No invite link available."
)
except Exception as e3: except Exception as e3:
logger.warning(f"GetFullChatRequest failed: {e3}") logger.warning(f"GetFullChatRequest failed: {e3}")
@ -2128,9 +2031,7 @@ async def join_chat_by_link(link: str) -> str:
# Try checking the invite before joining # Try checking the invite before joining
try: try:
# Try to check invite info first (will often fail if not a member) # Try to check invite info first (will often fail if not a member)
invite_info = await client( invite_info = await client(functions.messages.CheckChatInviteRequest(hash=hash_part))
functions.messages.CheckChatInviteRequest(hash=hash_part)
)
if hasattr(invite_info, "chat") and invite_info.chat: if hasattr(invite_info, "chat") and invite_info.chat:
# If we got chat info, we're already a member # If we got chat info, we're already a member
chat_title = getattr(invite_info.chat, "title", "Unknown Chat") chat_title = getattr(invite_info.chat, "title", "Unknown Chat")
@ -2140,9 +2041,7 @@ async def join_chat_by_link(link: str) -> str:
pass pass
# Join the chat using the hash # Join the chat using the hash
result = await client( result = await client(functions.messages.ImportChatInviteRequest(hash=hash_part))
functions.messages.ImportChatInviteRequest(hash=hash_part)
)
if result and hasattr(result, "chats") and result.chats: if result and hasattr(result, "chats") and result.chats:
chat_title = getattr(result.chats[0], "title", "Unknown Chat") chat_title = getattr(result.chats[0], "title", "Unknown Chat")
return f"Successfully joined chat: {chat_title}" return f"Successfully joined chat: {chat_title}"
@ -2172,15 +2071,11 @@ async def export_chat_invite(chat_id: Union[int, str]) -> str:
try: try:
from telethon.tl import functions from telethon.tl import functions
result = await client( result = await client(functions.messages.ExportChatInviteRequest(peer=entity))
functions.messages.ExportChatInviteRequest(peer=entity)
)
return result.link return result.link
except AttributeError: except AttributeError:
# If the function doesn't exist in the current Telethon version # If the function doesn't exist in the current Telethon version
logger.warning( logger.warning("ExportChatInviteRequest not available, using alternative method")
"ExportChatInviteRequest not available, using alternative method"
)
except Exception as e1: except Exception as e1:
# If that fails, log and try alternative approach # If that fails, log and try alternative approach
logger.warning(f"ExportChatInviteRequest failed: {e1}") logger.warning(f"ExportChatInviteRequest failed: {e1}")
@ -2219,9 +2114,7 @@ async def import_chat_invite(hash: str) -> str:
) )
# Try to check invite info first (will often fail if not a member) # Try to check invite info first (will often fail if not a member)
invite_info = await client( invite_info = await client(functions.messages.CheckChatInviteRequest(hash=hash))
functions.messages.CheckChatInviteRequest(hash=hash)
)
if hasattr(invite_info, "chat") and invite_info.chat: if hasattr(invite_info, "chat") and invite_info.chat:
# If we got chat info, we're already a member # If we got chat info, we're already a member
chat_title = getattr(invite_info.chat, "title", "Unknown Chat") chat_title = getattr(invite_info.chat, "title", "Unknown Chat")
@ -2288,9 +2181,7 @@ async def send_voice(chat_id: Union[int, str], file_path: str) -> str:
await client.send_file(entity, file_path, voice_note=True) await client.send_file(entity, file_path, voice_note=True)
return f"Voice message sent to chat {chat_id}." return f"Voice message sent to chat {chat_id}."
except Exception as e: except Exception as e:
return log_and_format_error( return log_and_format_error("send_voice", e, chat_id=chat_id, file_path=file_path)
"send_voice", e, chat_id=chat_id, file_path=file_path
)
@mcp.tool() @mcp.tool()
@ -2343,9 +2234,7 @@ async def delete_message(chat_id: Union[int, str], message_id: int) -> str:
await client.delete_messages(entity, message_id) await client.delete_messages(entity, message_id)
return f"Message {message_id} deleted." return f"Message {message_id} deleted."
except Exception as e: except Exception as e:
return log_and_format_error( return log_and_format_error("delete_message", e, chat_id=chat_id, message_id=message_id)
"delete_message", e, chat_id=chat_id, message_id=message_id
)
@mcp.tool() @mcp.tool()
@ -2359,9 +2248,7 @@ async def pin_message(chat_id: Union[int, str], message_id: int) -> str:
await client.pin_message(entity, message_id) await client.pin_message(entity, message_id)
return f"Message {message_id} pinned in chat {chat_id}." return f"Message {message_id} pinned in chat {chat_id}."
except Exception as e: except Exception as e:
return log_and_format_error( return log_and_format_error("pin_message", e, chat_id=chat_id, message_id=message_id)
"pin_message", e, chat_id=chat_id, message_id=message_id
)
@mcp.tool() @mcp.tool()
@ -2375,9 +2262,7 @@ async def unpin_message(chat_id: Union[int, str], message_id: int) -> str:
await client.unpin_message(entity, message_id) await client.unpin_message(entity, message_id)
return f"Message {message_id} unpinned in chat {chat_id}." return f"Message {message_id} unpinned in chat {chat_id}."
except Exception as e: except Exception as e:
return log_and_format_error( return log_and_format_error("unpin_message", e, chat_id=chat_id, message_id=message_id)
"unpin_message", e, chat_id=chat_id, message_id=message_id
)
@mcp.tool() @mcp.tool()
@ -2429,9 +2314,7 @@ async def get_media_info(chat_id: Union[int, str], message_id: int) -> str:
return str(msg.media) return str(msg.media)
except Exception as e: except Exception as e:
return log_and_format_error( return log_and_format_error("get_media_info", e, chat_id=chat_id, message_id=message_id)
"get_media_info", e, chat_id=chat_id, message_id=message_id
)
@mcp.tool() @mcp.tool()
@ -2478,9 +2361,7 @@ async def resolve_username(username: str) -> str:
Resolve a username to a user or chat ID. Resolve a username to a user or chat ID.
""" """
try: try:
result = await client( result = await client(functions.contacts.ResolveUsernameRequest(username=username))
functions.contacts.ResolveUsernameRequest(username=username)
)
return str(result) return str(result)
except Exception as e: except Exception as e:
return log_and_format_error("resolve_username", e, username=username) return log_and_format_error("resolve_username", e, username=username)
@ -2632,9 +2513,7 @@ async def send_sticker(chat_id: Union[int, str], file_path: str) -> str:
await client.send_file(entity, file_path, force_document=False) await client.send_file(entity, file_path, force_document=False)
return f"Sticker sent to chat {chat_id}." return f"Sticker sent to chat {chat_id}."
except Exception as e: except Exception as e:
return log_and_format_error( return log_and_format_error("send_sticker", e, chat_id=chat_id, file_path=file_path)
"send_sticker", e, chat_id=chat_id, file_path=file_path
)
@mcp.tool() @mcp.tool()
@ -2682,11 +2561,7 @@ async def get_gif_search(query: str, limit: int = 10) -> str:
# Extract document IDs from any messages with media # Extract document IDs from any messages with media
gif_ids = [] gif_ids = []
for msg in result.messages: for msg in result.messages:
if ( if hasattr(msg, "media") and msg.media and hasattr(msg.media, "document"):
hasattr(msg, "media")
and msg.media
and hasattr(msg.media, "document")
):
gif_ids.append(msg.media.document.id) gif_ids.append(msg.media.document.id)
return json.dumps(gif_ids, default=json_serializer) return json.dumps(gif_ids, default=json_serializer)
except Exception as inner_e: except Exception as inner_e:
@ -2776,8 +2651,7 @@ async def set_bot_commands(bot_username: str, commands: list) -> str:
# Create BotCommand objects from the command dictionaries # Create BotCommand objects from the command dictionaries
bot_commands = [ bot_commands = [
BotCommand(command=c["command"], description=c["description"]) BotCommand(command=c["command"], description=c["description"]) for c in commands
for c in commands
] ]
# Get the bot entity # Get the bot entity
@ -2834,9 +2708,7 @@ async def get_user_photos(user_id: Union[int, str], limit: int = 10) -> str:
try: try:
user = await client.get_entity(user_id) user = await client.get_entity(user_id)
photos = await client( photos = await client(
functions.photos.GetUserPhotosRequest( functions.photos.GetUserPhotosRequest(user_id=user, offset=0, max_id=0, limit=limit)
user_id=user, offset=0, max_id=0, limit=limit
)
) )
return json.dumps([p.id for p in photos.photos], indent=2) return json.dumps([p.id for p in photos.photos], indent=2)
except Exception as e: except Exception as e:
@ -2879,9 +2751,7 @@ async def get_recent_actions(chat_id: Union[int, str]) -> str:
return "No recent admin actions found." return "No recent admin actions found."
# Use the custom serializer to handle datetime objects # Use the custom serializer to handle datetime objects
return json.dumps( return json.dumps([e.to_dict() for e in result.events], indent=2, default=json_serializer)
[e.to_dict() for e in result.events], indent=2, default=json_serializer
)
except Exception as e: except Exception as e:
logger.exception(f"get_recent_actions failed (chat_id={chat_id})") logger.exception(f"get_recent_actions failed (chat_id={chat_id})")
return log_and_format_error("get_recent_actions", e, chat_id=chat_id) return log_and_format_error("get_recent_actions", e, chat_id=chat_id)
@ -2901,9 +2771,7 @@ async def get_pinned_messages(chat_id: Union[int, str]) -> str:
# Try newer Telethon approach # Try newer Telethon approach
from telethon.tl.types import InputMessagesFilterPinned from telethon.tl.types import InputMessagesFilterPinned
messages = await client.get_messages( messages = await client.get_messages(entity, filter=InputMessagesFilterPinned())
entity, filter=InputMessagesFilterPinned()
)
except (ImportError, AttributeError): except (ImportError, AttributeError):
# Fallback - try without filter and manually filter pinned # Fallback - try without filter and manually filter pinned
all_messages = await client.get_messages(entity, limit=50) all_messages = await client.get_messages(entity, limit=50)
@ -2963,9 +2831,7 @@ async def create_poll(
close_date_obj = None close_date_obj = None
if close_date: if close_date:
try: try:
close_date_obj = datetime.fromisoformat( close_date_obj = datetime.fromisoformat(close_date.replace("Z", "+00:00"))
close_date.replace("Z", "+00:00")
)
except ValueError: except ValueError:
return f"Invalid close_date format. Use YYYY-MM-DD HH:MM:SS format." return f"Invalid close_date format. Use YYYY-MM-DD HH:MM:SS format."
@ -2977,9 +2843,7 @@ async def create_poll(
id=random.randint(0, 2**63 - 1), id=random.randint(0, 2**63 - 1),
question=TextWithEntities(text=question, entities=[]), question=TextWithEntities(text=question, entities=[]),
answers=[ answers=[
PollAnswer( PollAnswer(text=TextWithEntities(text=option, entities=[]), option=bytes([i]))
text=TextWithEntities(text=option, entities=[]), option=bytes([i])
)
for i, option in enumerate(options) for i, option in enumerate(options)
], ],
multiple_choice=multiple_choice, multiple_choice=multiple_choice,
@ -2999,9 +2863,7 @@ async def create_poll(
return f"Poll created successfully in chat {chat_id}." return f"Poll created successfully in chat {chat_id}."
except Exception as e: except Exception as e:
logger.exception( logger.exception(f"create_poll failed (chat_id={chat_id}, question='{question}')")
f"create_poll failed (chat_id={chat_id}, question='{question}')"
)
return log_and_format_error( return log_and_format_error(
"create_poll", e, chat_id=chat_id, question=question, options=options "create_poll", e, chat_id=chat_id, question=question, options=options
) )
@ -3021,9 +2883,7 @@ if __name__ == "__main__":
await mcp.run_stdio_async() await mcp.run_stdio_async()
except Exception as e: except Exception as e:
print(f"Error starting client: {e}", file=sys.stderr) print(f"Error starting client: {e}", file=sys.stderr)
if isinstance(e, sqlite3.OperationalError) and "database is locked" in str( if isinstance(e, sqlite3.OperationalError) and "database is locked" in str(e):
e
):
print( print(
"Database lock detected. Please ensure no other instances are running.", "Database lock detected. Please ensure no other instances are running.",
file=sys.stderr, file=sys.stderr,

11
uv.lock
View file

@ -446,6 +446,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/1e/18/98a99ad95133c6a6e2005fe89faedf294a748bd5dc803008059409ac9b1e/python_dotenv-1.1.0-py3-none-any.whl", hash = "sha256:d7c01d9e2293916c18baf562d95698754b0dbbb5e74d457c45d4f6561fb9d55d", size = 20256, upload-time = "2025-03-25T10:14:55.034Z" }, { url = "https://files.pythonhosted.org/packages/1e/18/98a99ad95133c6a6e2005fe89faedf294a748bd5dc803008059409ac9b1e/python_dotenv-1.1.0-py3-none-any.whl", hash = "sha256:d7c01d9e2293916c18baf562d95698754b0dbbb5e74d457c45d4f6561fb9d55d", size = 20256, upload-time = "2025-03-25T10:14:55.034Z" },
] ]
[[package]]
name = "python-json-logger"
version = "4.0.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/29/bf/eca6a3d43db1dae7070f70e160ab20b807627ba953663ba07928cdd3dc58/python_json_logger-4.0.0.tar.gz", hash = "sha256:f58e68eb46e1faed27e0f574a55a0455eecd7b8a5b88b85a784519ba3cff047f", size = 17683, upload-time = "2025-10-06T04:15:18.984Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/51/e5/fecf13f06e5e5f67e8837d777d1bc43fac0ed2b77a676804df5c34744727/python_json_logger-4.0.0-py3-none-any.whl", hash = "sha256:af09c9daf6a813aa4cc7180395f50f2a9e5fa056034c9953aec92e381c5ba1e2", size = 15548, upload-time = "2025-10-06T04:15:17.553Z" },
]
[[package]] [[package]]
name = "pytokens" name = "pytokens"
version = "0.2.0" version = "0.2.0"
@ -534,6 +543,7 @@ dependencies = [
{ name = "mcp", extra = ["cli"] }, { name = "mcp", extra = ["cli"] },
{ name = "nest-asyncio" }, { name = "nest-asyncio" },
{ name = "python-dotenv" }, { name = "python-dotenv" },
{ name = "python-json-logger" },
{ name = "telethon" }, { name = "telethon" },
] ]
@ -550,6 +560,7 @@ requires-dist = [
{ name = "mcp", extras = ["cli"], specifier = ">=1.4.1" }, { name = "mcp", extras = ["cli"], specifier = ">=1.4.1" },
{ name = "nest-asyncio", specifier = ">=1.6.0" }, { name = "nest-asyncio", specifier = ">=1.6.0" },
{ name = "python-dotenv", specifier = ">=1.1.0" }, { name = "python-dotenv", specifier = ">=1.1.0" },
{ name = "python-json-logger", specifier = ">=3.3.0" },
{ name = "telethon", specifier = ">=1.39.0" }, { name = "telethon", specifier = ">=1.39.0" },
] ]