From 80245bcdc9aecd59c80741afbfd34cf030a5f3ec Mon Sep 17 00:00:00 2001 From: Korzhavin Ivan Date: Mon, 20 Oct 2025 01:01:13 +0200 Subject: [PATCH] fix: resolve stash conflict --- main.py | 423 ++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 288 insertions(+), 135 deletions(-) diff --git a/main.py b/main.py index c5de9b5..edd8fa4 100644 --- a/main.py +++ b/main.py @@ -66,7 +66,9 @@ mcp = FastMCP("telegram") if SESSION_STRING: # Use the string session if available - client = TelegramClient(StringSession(SESSION_STRING), TELEGRAM_API_ID, TELEGRAM_API_HASH) + client = TelegramClient( + StringSession(SESSION_STRING), TELEGRAM_API_ID, TELEGRAM_API_HASH + ) else: # Use file-based session client = TelegramClient(TELEGRAM_SESSION_NAME, TELEGRAM_API_ID, TELEGRAM_API_HASH) @@ -77,7 +79,9 @@ logger.setLevel(logging.ERROR) # Set to ERROR for production, INFO for debuggin # Create console handler console_handler = logging.StreamHandler() -console_handler.setLevel(logging.ERROR) # Set to ERROR for production, INFO for debugging +console_handler.setLevel( + logging.ERROR +) # Set to ERROR for production, INFO for debugging # Create file handler with absolute path script_dir = os.path.dirname(os.path.abspath(__file__)) @@ -89,7 +93,9 @@ try: # Create formatters # Console formatter remains in the old format - console_formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s - %(message)s") + console_formatter = logging.Formatter( + "%(asctime)s [%(levelname)s] %(name)s - %(message)s" + ) console_handler.setFormatter(console_formatter) # File formatter is now JSON @@ -102,7 +108,6 @@ try: # Add handlers to logger logger.addHandler(console_handler) logger.addHandler(file_handler) - logger.info(f"Logging initialized to {log_file_path}") except Exception as log_error: print(f"WARNING: Error setting up log file: {log_error}") @@ -110,9 +115,8 @@ except Exception as log_error: logger.addHandler(console_handler) logger.error(f"Failed to set up log file handler: {log_error}") + # Error code prefix mapping for better error tracing - - class ErrorCategory(str, Enum): CHAT = "CHAT" MSG = "MSG" @@ -159,14 +163,18 @@ def log_and_format_error( prefix = category break - prefix_str = prefix.value if isinstance(prefix, ErrorCategory) else (prefix or "GEN") + prefix_str = ( + prefix.value if isinstance(prefix, ErrorCategory) else (prefix or "GEN") + ) error_code = f"{prefix_str}-ERR-{abs(hash(function_name)) % 1000:03d}" # Format the additional context parameters context = ", ".join(f"{k}={v}" for k, v in kwargs.items()) # Log the full technical error - logger.error(f"Error in {function_name} ({context}) - Code: {error_code}", exc_info=True) + logger.error( + f"Error in {function_name} ({context}) - Code: {error_code}", exc_info=True + ) # Return a user-friendly message if user_message: @@ -221,7 +229,10 @@ def validate_id(*param_names_to_validate): ) # Handle other invalid types - return None, f"Invalid {p_name}: {value}. Type must be an integer or a string." + return ( + None, + f"Invalid {p_name}: {value}. Type must be an integer or a string.", + ) if isinstance(param_value, list): validated_list = [] @@ -238,7 +249,9 @@ def validate_id(*param_names_to_validate): validated_list.append(validated_item) kwargs[param_name] = validated_list else: - validated_value, error_msg = validate_single_id(param_value, param_name) + validated_value, error_msg = validate_single_id( + param_value, param_name + ) if error_msg: return log_and_format_error( func.__name__, @@ -334,7 +347,9 @@ async def get_chats(page: int = 1, page_size: int = 20) -> str: for dialog in chats: entity = dialog.entity chat_id = entity.id - title = getattr(entity, "title", None) or getattr(entity, "first_name", "Unknown") + title = getattr(entity, "title", None) or getattr( + entity, "first_name", "Unknown" + ) lines.append(f"Chat ID: {chat_id}, Title: {title}") return "\n".join(lines) except Exception as e: @@ -343,7 +358,9 @@ async def get_chats(page: int = 1, page_size: int = 20) -> str: @mcp.tool() @validate_id("chat_id") -async def get_messages(chat_id: Union[int, str], page: int = 1, page_size: int = 20) -> str: +async def get_messages( + chat_id: Union[int, str], page: int = 1, page_size: int = 20 +) -> str: """ Get paginated messages from a specific chat. Args: @@ -751,7 +768,9 @@ async def get_chat(chat_id: Union[int, str]) -> str: if hasattr(entity, "title"): result.append(f"Title: {entity.title}") chat_type = ( - "Channel" if is_channel and getattr(entity, "broadcast", False) else "Group" + "Channel" + if is_channel and getattr(entity, "broadcast", False) + else "Group" ) if is_channel and getattr(entity, "megagroup", False): chat_type = "Supergroup" @@ -763,7 +782,9 @@ async def get_chat(chat_id: Union[int, str]) -> str: # Fetch participants count reliably try: - participants_count = (await client.get_participants(entity, limit=0)).total + participants_count = ( + await client.get_participants(entity, limit=0) + ).total result.append(f"Participants: {participants_count}") except Exception as pe: result.append(f"Participants: Error fetching ({pe})") @@ -793,13 +814,18 @@ async def get_chat(chat_id: Union[int, str]) -> str: last_msg = dialog.message sender_name = "Unknown" if last_msg.sender: - sender_name = getattr(last_msg.sender, "first_name", "") or getattr( - last_msg.sender, "title", "Unknown" - ) - if hasattr(last_msg.sender, "last_name") and last_msg.sender.last_name: + sender_name = getattr( + last_msg.sender, "first_name", "" + ) or getattr(last_msg.sender, "title", "Unknown") + if ( + hasattr(last_msg.sender, "last_name") + and last_msg.sender.last_name + ): sender_name += f" {last_msg.sender.last_name}" sender_name = sender_name.strip() or "Unknown" - result.append(f"Last Message: From {sender_name} at {last_msg.date}") + result.append( + f"Last Message: From {sender_name} at {last_msg.date}" + ) result.append(f"Message: {last_msg.message or '[Media/No text]'}") except Exception as diag_ex: logger.warning(f"Could not get dialog info for {chat_id}: {diag_ex}") @@ -826,9 +852,7 @@ async def get_direct_chat_by_contact(contact_query: str) -> str: for contact in contacts: if not contact: continue - name = ( - f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip() - ) + name = f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip() username = getattr(contact, "username", "") phone = getattr(contact, "phone", "") if ( @@ -843,9 +867,7 @@ async def get_direct_chat_by_contact(contact_query: str) -> str: results = [] dialogs = await client.get_dialogs() for contact in found_contacts: - contact_name = ( - f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip() - ) + contact_name = f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip() for dialog in dialogs: if isinstance(dialog.entity, User) and dialog.entity.id == contact.id: chat_info = f"Chat ID: {dialog.entity.id}, Contact: {contact_name}" @@ -862,7 +884,9 @@ async def get_direct_chat_by_contact(contact_query: str) -> str: return f"Found contacts: {found_names}, but no direct chats were found with them." return "\n".join(results) except Exception as e: - return log_and_format_error("get_direct_chat_by_contact", e, contact_query=contact_query) + return log_and_format_error( + "get_direct_chat_by_contact", e, contact_query=contact_query + ) @mcp.tool() @@ -880,9 +904,7 @@ async def get_contact_chats(contact_id: Union[int, str]) -> str: if not isinstance(contact, User): return f"ID {contact_id} is not a user/contact." - contact_name = ( - f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip() - ) + contact_name = f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip() # Find direct chat direct_chat = None @@ -905,7 +927,9 @@ async def get_contact_chats(contact_id: Union[int, str]) -> str: common = await client.get_common_chats(contact) for chat in common: chat_type = "Channel" if getattr(chat, "broadcast", False) else "Group" - chat_info = f"Chat ID: {chat.id}, Title: {chat.title}, Type: {chat_type}" + chat_info = ( + f"Chat ID: {chat.id}, Title: {chat.title}, Type: {chat_type}" + ) results.append(chat_info) except: results.append("Could not retrieve common groups.") @@ -933,9 +957,7 @@ async def get_last_interaction(contact_id: Union[int, str]) -> str: if not isinstance(contact, User): return f"ID {contact_id} is not a user/contact." - contact_name = ( - f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip() - ) + contact_name = f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip() # Get the last few messages messages = await client.get_messages(contact, limit=5) @@ -971,7 +993,9 @@ async def get_message_context( try: chat = await client.get_entity(chat_id) # Get messages around the specified message - messages_before = await client.get_messages(chat, limit=context_size, max_id=message_id) + messages_before = await client.get_messages( + chat, limit=context_size, max_id=message_id + ) central_message = await client.get_messages(chat, ids=message_id) # Fix: get_messages(ids=...) returns a single Message, not a list if central_message is not None and not isinstance(central_message, list): @@ -984,7 +1008,9 @@ async def get_message_context( if not central_message: return f"Message with ID {message_id} not found in chat {chat_id}." # Combine messages in chronological order - all_messages = list(messages_before) + list(central_message) + list(messages_after) + all_messages = ( + list(messages_before) + list(central_message) + list(messages_after) + ) all_messages.sort(key=lambda m: m.id) results = [f"Context for message {message_id} in chat {chat_id}:"] for msg in all_messages: @@ -995,7 +1021,9 @@ async def get_message_context( reply_content = "" if msg.reply_to and msg.reply_to.reply_to_msg_id: try: - replied_msg = await client.get_messages(chat, ids=msg.reply_to.reply_to_msg_id) + replied_msg = await client.get_messages( + chat, ids=msg.reply_to.reply_to_msg_id + ) if replied_msg: replied_sender = "Unknown" if replied_msg.sender: @@ -1004,9 +1032,7 @@ async def get_message_context( ) or getattr(replied_msg.sender, "title", "Unknown") reply_content = f" | reply to {msg.reply_to.reply_to_msg_id}\n → Replied message: [{replied_sender}] {replied_msg.message or '[Media/No text]'}" except Exception: - reply_content = ( - f" | reply to {msg.reply_to.reply_to_msg_id} (original message not found)" - ) + reply_content = f" | reply to {msg.reply_to.reply_to_msg_id} (original message not found)" results.append( f"ID: {msg.id} | {sender_name} | {msg.date}{highlight}{reply_content}\n{msg.message or '[Media/No text]'}\n" @@ -1039,7 +1065,10 @@ async def add_contact(phone: str, first_name: str, last_name: str = "") -> str: functions.contacts.ImportContactsRequest( contacts=[ InputPhoneContact( - client_id=0, phone=phone, first_name=first_name, last_name=last_name + client_id=0, + phone=phone, + first_name=first_name, + last_name=last_name, ) ] ) @@ -1064,7 +1093,9 @@ async def add_contact(phone: str, first_name: str, last_name: str = "") -> str: ) ) if hasattr(result, "imported") and result.imported: - return f"Contact {first_name} {last_name} added successfully (alt method)." + return ( + f"Contact {first_name} {last_name} added successfully (alt method)." + ) else: return f"Contact not added. Alternative method response: {str(result)}" except Exception as alt_e: @@ -1162,7 +1193,9 @@ async def create_group(title: str, user_ids: List[Union[int, str]]) -> str: # Create the group with the users try: # Create a new chat with selected users - result = await client(functions.messages.CreateChatRequest(users=users, title=title)) + result = await client( + functions.messages.CreateChatRequest(users=users, title=title) + ) # Check what type of response we got if hasattr(result, "chats") and result.chats: @@ -1175,7 +1208,9 @@ async def create_group(title: str, user_ids: List[Union[int, str]]) -> str: else: # If we can't determine the chat ID directly from the result # Try to find it in recent dialogs - await asyncio.sleep(1) # Give Telegram a moment to register the new group + await asyncio.sleep( + 1 + ) # Give Telegram a moment to register the new group dialogs = await client.get_dialogs(limit=5) # Get recent dialogs for dialog in dialogs: if dialog.title == title: @@ -1196,7 +1231,9 @@ async def create_group(title: str, user_ids: List[Union[int, str]]) -> str: @mcp.tool() @validate_id("group_id", "user_ids") -async def invite_to_group(group_id: Union[int, str], user_ids: List[Union[int, str]]) -> str: +async def invite_to_group( + group_id: Union[int, str], user_ids: List[Union[int, str]] +) -> str: """ Invite users to a group or channel. @@ -1217,7 +1254,9 @@ async def invite_to_group(group_id: Union[int, str], user_ids: List[Union[int, s try: result = await client( - functions.channels.InviteToChannelRequest(channel=entity, users=users_to_add) + functions.channels.InviteToChannelRequest( + channel=entity, users=users_to_add + ) ) invited_count = 0 @@ -1230,18 +1269,20 @@ async def invite_to_group(group_id: Union[int, str], user_ids: List[Union[int, s except telethon.errors.rpcerrorlist.UserNotMutualContactError: return "Error: Cannot invite users who are not mutual contacts. Please ensure the users are in your contacts and have added you back." except telethon.errors.rpcerrorlist.UserPrivacyRestrictedError: - return ( - "Error: One or more users have privacy settings that prevent you from adding them." - ) + return "Error: One or more users have privacy settings that prevent you from adding them." except Exception as e: - return log_and_format_error("invite_to_group", e, group_id=group_id, user_ids=user_ids) + return log_and_format_error( + "invite_to_group", e, group_id=group_id, user_ids=user_ids + ) except Exception as e: logger.error( f"telegram_mcp invite_to_group failed (group_id={group_id}, user_ids={user_ids})", exc_info=True, ) - return log_and_format_error("invite_to_group", e, group_id=group_id, user_ids=user_ids) + return log_and_format_error( + "invite_to_group", e, group_id=group_id, user_ids=user_ids + ) @mcp.tool() @@ -1273,7 +1314,8 @@ async def leave_chat(chat_id: Union[int, str]) -> str: me = await client.get_me(input_peer=True) await client( functions.messages.DeleteChatUserRequest( - chat_id=entity.id, user_id=me # Use the entity ID directly + chat_id=entity.id, + user_id=me, # Use the entity ID directly ) ) chat_name = getattr(entity, "title", str(chat_id)) @@ -1345,7 +1387,9 @@ async def get_participants(chat_id: Union[int, str]) -> str: @mcp.tool() @validate_id("chat_id") -async def send_file(chat_id: Union[int, str], file_path: str, caption: str = None) -> str: +async def send_file( + chat_id: Union[int, str], file_path: str, caption: str = None +) -> str: """ Send a file to a chat. Args: @@ -1369,7 +1413,9 @@ async def send_file(chat_id: Union[int, str], file_path: str, caption: str = Non @mcp.tool() @validate_id("chat_id") -async def download_media(chat_id: Union[int, str], message_id: int, file_path: str) -> str: +async def download_media( + chat_id: Union[int, str], message_id: int, file_path: str +) -> str: """ Download media from a message in a chat. Args: @@ -1392,12 +1438,18 @@ async def download_media(chat_id: Union[int, str], message_id: int, file_path: s return f"Media downloaded to {file_path}." except Exception as e: return log_and_format_error( - "download_media", e, chat_id=chat_id, message_id=message_id, file_path=file_path + "download_media", + e, + chat_id=chat_id, + message_id=message_id, + file_path=file_path, ) @mcp.tool() -async def update_profile(first_name: str = None, last_name: str = None, about: str = None) -> str: +async def update_profile( + first_name: str = None, last_name: str = None, about: str = None +) -> str: """ Update your profile information (name, bio). """ @@ -1421,7 +1473,9 @@ async def set_profile_photo(file_path: str) -> str: """ try: await client( - functions.photos.UploadProfilePhotoRequest(file=await client.upload_file(file_path)) + functions.photos.UploadProfilePhotoRequest( + file=await client.upload_file(file_path) + ) ) return "Profile photo updated." except Exception as e: @@ -1435,7 +1489,9 @@ async def delete_profile_photo() -> str: """ try: photos = await client( - functions.photos.GetUserPhotosRequest(user_id="me", offset=0, max_id=0, limit=1) + functions.photos.GetUserPhotosRequest( + user_id="me", offset=0, max_id=0, limit=1 + ) ) if not photos.photos: return "No profile photo to delete." @@ -1456,7 +1512,9 @@ async def get_privacy_settings() -> str: try: settings = await client( - functions.account.GetPrivacyRequest(key=InputPrivacyKeyStatusTimestamp()) + functions.account.GetPrivacyRequest( + key=InputPrivacyKeyStatusTimestamp() + ) ) return str(settings) except TypeError as e: @@ -1525,7 +1583,9 @@ async def set_privacy_settings( user = await client.get_entity(user_id) allow_entities.append(user) except Exception as user_err: - logger.warning(f"Could not get entity for user ID {user_id}: {user_err}") + logger.warning( + f"Could not get entity for user ID {user_id}: {user_err}" + ) if allow_entities: rules.append(InputPrivacyValueAllowUsers(users=allow_entities)) @@ -1542,13 +1602,19 @@ async def set_privacy_settings( user = await client.get_entity(user_id) disallow_entities.append(user) except Exception as user_err: - logger.warning(f"Could not get entity for user ID {user_id}: {user_err}") + logger.warning( + f"Could not get entity for user ID {user_id}: {user_err}" + ) if disallow_entities: - rules.append(InputPrivacyValueDisallowUsers(users=disallow_entities)) + rules.append( + InputPrivacyValueDisallowUsers(users=disallow_entities) + ) except Exception as disallow_err: logger.error(f"Error processing disallowed users: {disallow_err}") - return log_and_format_error("set_privacy_settings", disallow_err, key=key) + return log_and_format_error( + "set_privacy_settings", disallow_err, key=key + ) # Apply the privacy settings try: @@ -1581,7 +1647,9 @@ async def import_contacts(contacts: list) -> str: ) for i, c in enumerate(contacts) ] - result = await client(functions.contacts.ImportContactsRequest(contacts=input_contacts)) + result = await client( + functions.contacts.ImportContactsRequest(contacts=input_contacts) + ) return f"Imported {len(result.imported)} contacts." except Exception as e: return log_and_format_error("import_contacts", e, contacts=contacts) @@ -1619,7 +1687,9 @@ async def create_channel(title: str, about: str = "", megagroup: bool = False) - """ try: result = await client( - functions.channels.CreateChannelRequest(title=title, about=about, megagroup=megagroup) + functions.channels.CreateChannelRequest( + title=title, about=about, megagroup=megagroup + ) ) return f"Channel '{title}' created with ID: {result.chats[0].id}" except Exception as e: @@ -1637,9 +1707,13 @@ async def edit_chat_title(chat_id: Union[int, str], title: str) -> str: try: entity = await client.get_entity(chat_id) if isinstance(entity, Channel): - await client(functions.channels.EditTitleRequest(channel=entity, title=title)) + await client( + functions.channels.EditTitleRequest(channel=entity, title=title) + ) elif isinstance(entity, Chat): - await client(functions.messages.EditChatTitleRequest(chat_id=chat_id, title=title)) + await client( + functions.messages.EditChatTitleRequest(chat_id=chat_id, title=title) + ) else: return f"Cannot edit title for this entity type ({type(entity)})." return f"Chat {chat_id} title updated to '{title}'." @@ -1666,20 +1740,28 @@ async def edit_chat_photo(chat_id: Union[int, str], file_path: str) -> str: if isinstance(entity, Channel): # For channels/supergroups, use EditPhotoRequest with InputChatUploadedPhoto input_photo = InputChatUploadedPhoto(file=uploaded_file) - await client(functions.channels.EditPhotoRequest(channel=entity, photo=input_photo)) + await client( + functions.channels.EditPhotoRequest(channel=entity, photo=input_photo) + ) elif isinstance(entity, Chat): # For basic groups, use EditChatPhotoRequest with InputChatUploadedPhoto input_photo = InputChatUploadedPhoto(file=uploaded_file) await client( - functions.messages.EditChatPhotoRequest(chat_id=chat_id, photo=input_photo) + functions.messages.EditChatPhotoRequest( + chat_id=chat_id, photo=input_photo + ) ) else: return f"Cannot edit photo for this entity type ({type(entity)})." return f"Chat {chat_id} photo updated." except Exception as e: - logger.exception(f"edit_chat_photo failed (chat_id={chat_id}, file_path='{file_path}')") - return log_and_format_error("edit_chat_photo", e, chat_id=chat_id, file_path=file_path) + logger.exception( + f"edit_chat_photo failed (chat_id={chat_id}, file_path='{file_path}')" + ) + return log_and_format_error( + "edit_chat_photo", e, chat_id=chat_id, file_path=file_path + ) @mcp.tool() @@ -1693,7 +1775,9 @@ async def delete_chat_photo(chat_id: Union[int, str]) -> str: if isinstance(entity, Channel): # Use InputChatPhotoEmpty for channels/supergroups await client( - functions.channels.EditPhotoRequest(channel=entity, photo=InputChatPhotoEmpty()) + functions.channels.EditPhotoRequest( + channel=entity, photo=InputChatPhotoEmpty() + ) ) elif isinstance(entity, Chat): # Use None (or InputChatPhotoEmpty) for basic groups @@ -1768,14 +1852,18 @@ async def promote_admin( except telethon.errors.rpcerrorlist.UserNotMutualContactError: return "Error: Cannot promote users who are not mutual contacts. Please ensure the user is in your contacts and has added you back." except Exception as e: - return log_and_format_error("promote_admin", e, group_id=group_id, user_id=user_id) + return log_and_format_error( + "promote_admin", e, group_id=group_id, user_id=user_id + ) except Exception as e: logger.error( f"telegram_mcp promote_admin failed (group_id={group_id}, user_id={user_id})", exc_info=True, ) - return log_and_format_error("promote_admin", e, group_id=group_id, user_id=user_id) + return log_and_format_error( + "promote_admin", e, group_id=group_id, user_id=user_id + ) @mcp.tool() @@ -1817,14 +1905,18 @@ async def demote_admin(group_id: Union[int, str], user_id: Union[int, str]) -> s except telethon.errors.rpcerrorlist.UserNotMutualContactError: return "Error: Cannot modify admin status of users who are not mutual contacts. Please ensure the user is in your contacts and has added you back." except Exception as e: - return log_and_format_error("demote_admin", e, group_id=group_id, user_id=user_id) + return log_and_format_error( + "demote_admin", e, group_id=group_id, user_id=user_id + ) except Exception as e: logger.error( f"telegram_mcp demote_admin failed (group_id={group_id}, user_id={user_id})", exc_info=True, ) - return log_and_format_error("demote_admin", e, group_id=group_id, user_id=user_id) + return log_and_format_error( + "demote_admin", e, group_id=group_id, user_id=user_id + ) @mcp.tool() @@ -1915,7 +2007,9 @@ async def unban_user(chat_id: Union[int, str], user_id: Union[int, str]) -> str: except telethon.errors.rpcerrorlist.UserNotMutualContactError: return "Error: Cannot modify status of users who are not mutual contacts. Please ensure the user is in your contacts and has added you back." except Exception as e: - return log_and_format_error("unban_user", e, chat_id=chat_id, user_id=user_id) + return log_and_format_error( + "unban_user", e, chat_id=chat_id, user_id=user_id + ) except Exception as e: logger.exception(f"unban_user failed (chat_id={chat_id}, user_id={user_id})") return log_and_format_error("unban_user", e, chat_id=chat_id, user_id=user_id) @@ -1929,7 +2023,9 @@ async def get_admins(chat_id: Union[int, str]) -> str: """ try: # Fix: Use the correct filter type ChannelParticipantsAdmins - participants = await client.get_participants(chat_id, filter=ChannelParticipantsAdmins()) + participants = await client.get_participants( + chat_id, filter=ChannelParticipantsAdmins() + ) lines = [ f"ID: {p.id}, Name: {getattr(p, 'first_name', '')} {getattr(p, 'last_name', '')}".strip() for p in participants @@ -1974,11 +2070,15 @@ async def get_invite_link(chat_id: Union[int, str]) -> str: try: from telethon.tl import functions - result = await client(functions.messages.ExportChatInviteRequest(peer=entity)) + result = await client( + functions.messages.ExportChatInviteRequest(peer=entity) + ) return result.link except AttributeError: # If the function doesn't exist in the current Telethon version - logger.warning("ExportChatInviteRequest not available, using alternative method") + logger.warning( + "ExportChatInviteRequest not available, using alternative method" + ) except Exception as e1: # If that fails, log and try alternative approach logger.warning(f"ExportChatInviteRequest failed: {e1}") @@ -1993,9 +2093,15 @@ async def get_invite_link(chat_id: Union[int, str]) -> str: # Last resort: Try directly fetching chat info try: if isinstance(entity, (Chat, Channel)): - full_chat = await client(functions.messages.GetFullChatRequest(chat_id=entity.id)) - if hasattr(full_chat, "full_chat") and hasattr(full_chat.full_chat, "invite_link"): - return full_chat.full_chat.invite_link or "No invite link available." + full_chat = await client( + functions.messages.GetFullChatRequest(chat_id=entity.id) + ) + if hasattr(full_chat, "full_chat") and hasattr( + full_chat.full_chat, "invite_link" + ): + return ( + full_chat.full_chat.invite_link or "No invite link available." + ) except Exception as e3: logger.warning(f"GetFullChatRequest failed: {e3}") @@ -2021,48 +2127,36 @@ async def join_chat_by_link(link: str) -> str: # Try checking the invite before joining try: - from telethon.errors import ( - InviteHashExpiredError, - InviteHashInvalidError, - UserAlreadyParticipantError, - ChatAdminRequiredError, - UsersTooMuchError, - ) - # Try to check invite info first (will often fail if not a member) - invite_info = await client(functions.messages.CheckChatInviteRequest(hash=hash_part)) + invite_info = await client( + functions.messages.CheckChatInviteRequest(hash=hash_part) + ) if hasattr(invite_info, "chat") and invite_info.chat: # If we got chat info, we're already a member chat_title = getattr(invite_info.chat, "title", "Unknown Chat") return f"You are already a member of this chat: {chat_title}" - except Exception as check_err: + except Exception: # This often fails if not a member - just continue pass # Join the chat using the hash - try: - result = await client(functions.messages.ImportChatInviteRequest(hash=hash_part)) - if result and hasattr(result, "chats") and result.chats: - chat_title = getattr(result.chats[0], "title", "Unknown Chat") - return f"Successfully joined chat: {chat_title}" - return f"Joined chat via invite hash." - except Exception as join_err: - err_str = str(join_err).lower() - if "expired" in err_str: - return "The invite hash has expired and is no longer valid." - elif "invalid" in err_str: - return "The invite hash is invalid or malformed." - elif "already" in err_str and "participant" in err_str: - return "You are already a member of this chat." - elif "admin" in err_str: - return "Cannot join this chat - requires admin approval." - elif "too much" in err_str or "too many" in err_str: - return "Cannot join this chat - it has reached maximum number of participants." - else: - raise # Re-raise to be caught by the outer exception handler + result = await client( + functions.messages.ImportChatInviteRequest(hash=hash_part) + ) + if result and hasattr(result, "chats") and result.chats: + chat_title = getattr(result.chats[0], "title", "Unknown Chat") + return f"Successfully joined chat: {chat_title}" + return f"Joined chat via invite hash." except Exception as e: + err_str = str(e).lower() + if "expired" in err_str: + return "The invite hash has expired and is no longer valid." + elif "invalid" in err_str: + return "The invite hash is invalid or malformed." + elif "already" in err_str and "participant" in err_str: + return "You are already a member of this chat." logger.exception(f"join_chat_by_link failed (link={link})") - return log_and_format_error("join_chat_by_link", e, link=link) + return f"Error joining chat: {e}" @mcp.tool() @@ -2078,11 +2172,15 @@ async def export_chat_invite(chat_id: Union[int, str]) -> str: try: from telethon.tl import functions - result = await client(functions.messages.ExportChatInviteRequest(peer=entity)) + result = await client( + functions.messages.ExportChatInviteRequest(peer=entity) + ) return result.link except AttributeError: # If the function doesn't exist in the current Telethon version - logger.warning("ExportChatInviteRequest not available, using alternative method") + logger.warning( + "ExportChatInviteRequest not available, using alternative method" + ) except Exception as e1: # If that fails, log and try alternative approach logger.warning(f"ExportChatInviteRequest failed: {e1}") @@ -2094,6 +2192,7 @@ async def export_chat_invite(chat_id: Union[int, str]) -> str: except Exception as e2: logger.warning(f"export_chat_invite_link failed: {e2}") return log_and_format_error("export_chat_invite", e2, chat_id=chat_id) + except Exception as e: logger.exception(f"export_chat_invite failed (chat_id={chat_id})") return log_and_format_error("export_chat_invite", e, chat_id=chat_id) @@ -2120,7 +2219,9 @@ async def import_chat_invite(hash: str) -> str: ) # Try to check invite info first (will often fail if not a member) - invite_info = await client(functions.messages.CheckChatInviteRequest(hash=hash)) + invite_info = await client( + functions.messages.CheckChatInviteRequest(hash=hash) + ) if hasattr(invite_info, "chat") and invite_info.chat: # If we got chat info, we're already a member chat_title = getattr(invite_info.chat, "title", "Unknown Chat") @@ -2150,6 +2251,7 @@ async def import_chat_invite(hash: str) -> str: return "Cannot join this chat - it has reached maximum number of participants." else: raise # Re-raise to be caught by the outer exception handler + except Exception as e: logger.exception(f"import_chat_invite failed (hash={hash})") return log_and_format_error("import_chat_invite", e, hash=hash) @@ -2160,6 +2262,7 @@ async def import_chat_invite(hash: str) -> str: async def send_voice(chat_id: Union[int, str], file_path: str) -> str: """ Send a voice message to a chat. File must be an OGG/OPUS voice note. + Args: chat_id: The chat ID or username. file_path: Absolute path to the OGG/OPUS file. @@ -2169,6 +2272,7 @@ async def send_voice(chat_id: Union[int, str], file_path: str) -> str: return f"File not found: {file_path}" if not os.access(file_path, os.R_OK): return f"File is not readable: {file_path}" + mime, _ = mimetypes.guess_type(file_path) if not ( mime @@ -2179,11 +2283,14 @@ async def send_voice(chat_id: Union[int, str], file_path: str) -> str: ) ): return "Voice file must be .ogg or .opus format." + entity = await client.get_entity(chat_id) await client.send_file(entity, file_path, voice_note=True) return f"Voice message sent to chat {chat_id}." except Exception as e: - return log_and_format_error("send_voice", e, chat_id=chat_id, file_path=file_path) + return log_and_format_error( + "send_voice", e, chat_id=chat_id, file_path=file_path + ) @mcp.tool() @@ -2236,7 +2343,9 @@ async def delete_message(chat_id: Union[int, str], message_id: int) -> str: await client.delete_messages(entity, message_id) return f"Message {message_id} deleted." except Exception as e: - return log_and_format_error("delete_message", e, chat_id=chat_id, message_id=message_id) + return log_and_format_error( + "delete_message", e, chat_id=chat_id, message_id=message_id + ) @mcp.tool() @@ -2250,7 +2359,9 @@ async def pin_message(chat_id: Union[int, str], message_id: int) -> str: await client.pin_message(entity, message_id) return f"Message {message_id} pinned in chat {chat_id}." except Exception as e: - return log_and_format_error("pin_message", e, chat_id=chat_id, message_id=message_id) + return log_and_format_error( + "pin_message", e, chat_id=chat_id, message_id=message_id + ) @mcp.tool() @@ -2264,7 +2375,9 @@ async def unpin_message(chat_id: Union[int, str], message_id: int) -> str: await client.unpin_message(entity, message_id) return f"Message {message_id} unpinned in chat {chat_id}." except Exception as e: - return log_and_format_error("unpin_message", e, chat_id=chat_id, message_id=message_id) + return log_and_format_error( + "unpin_message", e, chat_id=chat_id, message_id=message_id + ) @mcp.tool() @@ -2302,6 +2415,7 @@ async def reply_to_message(chat_id: Union[int, str], message_id: int, text: str) async def get_media_info(chat_id: Union[int, str], message_id: int) -> str: """ Get info about media in a message. + Args: chat_id: The chat ID or username. message_id: The message ID. @@ -2309,11 +2423,15 @@ async def get_media_info(chat_id: Union[int, str], message_id: int) -> str: try: entity = await client.get_entity(chat_id) msg = await client.get_messages(entity, ids=message_id) + if not msg or not msg.media: return "No media found in the specified message." + return str(msg.media) except Exception as e: - return log_and_format_error("get_media_info", e, chat_id=chat_id, message_id=message_id) + return log_and_format_error( + "get_media_info", e, chat_id=chat_id, message_id=message_id + ) @mcp.tool() @@ -2337,6 +2455,7 @@ async def search_messages(chat_id: Union[int, str], query: str, limit: int = 20) try: entity = await client.get_entity(chat_id) messages = await client.get_messages(entity, limit=limit, search=query) + lines = [] for msg in messages: sender_name = get_sender_name(msg) @@ -2359,7 +2478,9 @@ async def resolve_username(username: str) -> str: Resolve a username to a user or chat ID. """ try: - result = await client(functions.contacts.ResolveUsernameRequest(username=username)) + result = await client( + functions.contacts.ResolveUsernameRequest(username=username) + ) return str(result) except Exception as e: return log_and_format_error("resolve_username", e, username=username) @@ -2469,7 +2590,7 @@ async def unarchive_chat(chat_id: Union[int, str]) -> str: try: await client( functions.messages.ToggleDialogPinRequest( - peer=await client.get_entity(chat_id), pinned=False + peer=await client.get_entity(_id), pinned=False ) ) return f"Chat {chat_id} unarchived." @@ -2494,6 +2615,7 @@ async def get_sticker_sets() -> str: async def send_sticker(chat_id: Union[int, str], file_path: str) -> str: """ Send a sticker to a chat. File must be a valid .webp sticker file. + Args: chat_id: The chat ID or username. file_path: Absolute path to the .webp sticker file. @@ -2505,17 +2627,21 @@ async def send_sticker(chat_id: Union[int, str], file_path: str) -> str: return f"Sticker file is not readable: {file_path}" if not file_path.lower().endswith(".webp"): return "Sticker file must be a .webp file." + entity = await client.get_entity(chat_id) await client.send_file(entity, file_path, force_document=False) return f"Sticker sent to chat {chat_id}." except Exception as e: - return log_and_format_error("send_sticker", e, chat_id=chat_id, file_path=file_path) + return log_and_format_error( + "send_sticker", e, chat_id=chat_id, file_path=file_path + ) @mcp.tool() async def get_gif_search(query: str, limit: int = 10) -> str: """ Search for GIFs by query. Returns a list of Telegram document IDs (not file paths). + Args: query: Search term for GIFs. limit: Max number of GIFs to return. @@ -2556,7 +2682,11 @@ async def get_gif_search(query: str, limit: int = 10) -> str: # Extract document IDs from any messages with media gif_ids = [] for msg in result.messages: - if hasattr(msg, "media") and msg.media and hasattr(msg.media, "document"): + if ( + hasattr(msg, "media") + and msg.media + and hasattr(msg.media, "document") + ): gif_ids.append(msg.media.document.id) return json.dumps(gif_ids, default=json_serializer) except Exception as inner_e: @@ -2572,6 +2702,7 @@ async def get_gif_search(query: str, limit: int = 10) -> str: async def send_gif(chat_id: Union[int, str], gif_id: int) -> str: """ Send a GIF to a chat by Telegram GIF document ID (not a file path). + Args: chat_id: The chat ID or username. gif_id: Telegram document ID for the GIF (from get_gif_search). @@ -2616,7 +2747,6 @@ async def get_bot_info(bot_username: str) -> str: } if hasattr(result, "full_user") and hasattr(result.full_user, "about"): info["bot_info"]["about"] = result.full_user.about - return json.dumps(info, indent=2) except Exception as e: logger.exception(f"get_bot_info failed (bot_username={bot_username})") @@ -2646,7 +2776,8 @@ async def set_bot_commands(bot_username: str, commands: list) -> str: # Create BotCommand objects from the command dictionaries bot_commands = [ - BotCommand(command=c["command"], description=c["description"]) for c in commands + BotCommand(command=c["command"], description=c["description"]) + for c in commands ] # Get the bot entity @@ -2679,6 +2810,7 @@ async def get_history(chat_id: Union[int, str], limit: int = 100) -> str: try: entity = await client.get_entity(chat_id) messages = await client.get_messages(entity, limit=limit) + lines = [] for msg in messages: sender_name = get_sender_name(msg) @@ -2702,7 +2834,9 @@ async def get_user_photos(user_id: Union[int, str], limit: int = 10) -> str: try: user = await client.get_entity(user_id) photos = await client( - functions.photos.GetUserPhotosRequest(user_id=user, offset=0, max_id=0, limit=limit) + functions.photos.GetUserPhotosRequest( + user_id=user, offset=0, max_id=0, limit=limit + ) ) return json.dumps([p.id for p in photos.photos], indent=2) except Exception as e: @@ -2731,7 +2865,13 @@ async def get_recent_actions(chat_id: Union[int, str]) -> str: try: result = await client( functions.channels.GetAdminLogRequest( - channel=chat_id, q="", events_filter=None, admins=[], max_id=0, min_id=0, limit=20 + channel=chat_id, + q="", + events_filter=None, + admins=[], + max_id=0, + min_id=0, + limit=20, ) ) @@ -2739,7 +2879,9 @@ async def get_recent_actions(chat_id: Union[int, str]) -> str: return "No recent admin actions found." # Use the custom serializer to handle datetime objects - return json.dumps([e.to_dict() for e in result.events], indent=2, default=json_serializer) + return json.dumps( + [e.to_dict() for e in result.events], indent=2, default=json_serializer + ) except Exception as e: logger.exception(f"get_recent_actions failed (chat_id={chat_id})") return log_and_format_error("get_recent_actions", e, chat_id=chat_id) @@ -2753,12 +2895,15 @@ async def get_pinned_messages(chat_id: Union[int, str]) -> str: """ try: entity = await client.get_entity(chat_id) + # Use correct filter based on Telethon version try: # Try newer Telethon approach from telethon.tl.types import InputMessagesFilterPinned - messages = await client.get_messages(entity, filter=InputMessagesFilterPinned()) + messages = await client.get_messages( + entity, filter=InputMessagesFilterPinned() + ) except (ImportError, AttributeError): # Fallback - try without filter and manually filter pinned all_messages = await client.get_messages(entity, limit=50) @@ -2818,7 +2963,9 @@ async def create_poll( close_date_obj = None if close_date: try: - close_date_obj = datetime.fromisoformat(close_date.replace("Z", "+00:00")) + close_date_obj = datetime.fromisoformat( + close_date.replace("Z", "+00:00") + ) except ValueError: return f"Invalid close_date format. Use YYYY-MM-DD HH:MM:SS format." @@ -2830,7 +2977,9 @@ async def create_poll( id=random.randint(0, 2**63 - 1), question=TextWithEntities(text=question, entities=[]), answers=[ - PollAnswer(text=TextWithEntities(text=option, entities=[]), option=bytes([i])) + PollAnswer( + text=TextWithEntities(text=option, entities=[]), option=bytes([i]) + ) for i, option in enumerate(options) ], multiple_choice=multiple_choice, @@ -2850,7 +2999,9 @@ async def create_poll( return f"Poll created successfully in chat {chat_id}." except Exception as e: - logger.exception(f"create_poll failed (chat_id={chat_id}, question='{question}')") + logger.exception( + f"create_poll failed (chat_id={chat_id}, question='{question}')" + ) return log_and_format_error( "create_poll", e, chat_id=chat_id, question=question, options=options ) @@ -2870,7 +3021,9 @@ if __name__ == "__main__": await mcp.run_stdio_async() except Exception as e: print(f"Error starting client: {e}", file=sys.stderr) - if isinstance(e, sqlite3.OperationalError) and "database is locked" in str(e): + if isinstance(e, sqlite3.OperationalError) and "database is locked" in str( + e + ): print( "Database lock detected. Please ensure no other instances are running.", file=sys.stderr,