diff --git a/main.py b/main.py index c7899df..de46be0 100644 --- a/main.py +++ b/main.py @@ -16,23 +16,33 @@ from mcp.server.fastmcp import FastMCP from telethon import TelegramClient, functions, utils from telethon.sessions import StringSession from telethon.tl.types import ( - User, Chat, Channel, - ChatAdminRights, ChatBannedRights, - ChannelParticipantsKicked, ChannelParticipantsAdmins, - InputChatPhoto, InputChatUploadedPhoto, InputChatPhotoEmpty, - InputPeerUser, InputPeerChat, InputPeerChannel + User, + Chat, + Channel, + ChatAdminRights, + ChatBannedRights, + ChannelParticipantsKicked, + ChannelParticipantsAdmins, + InputChatPhoto, + InputChatUploadedPhoto, + InputChatPhotoEmpty, + InputPeerUser, + InputPeerChat, + InputPeerChannel, ) import telethon.errors.rpcerrorlist + def json_serializer(obj): """Helper function to convert non-serializable objects for JSON serialization.""" if isinstance(obj, datetime): return obj.isoformat() if isinstance(obj, bytes): - return obj.decode('utf-8', errors='replace') + return obj.decode("utf-8", errors="replace") # Add other non-serializable types as needed raise TypeError(f"Object of type {type(obj)} is not JSON serializable") + load_dotenv() TELEGRAM_API_ID = int(os.getenv("TELEGRAM_API_ID")) @@ -64,18 +74,20 @@ script_dir = os.path.dirname(os.path.abspath(__file__)) log_file_path = os.path.join(script_dir, "mcp_errors.log") try: - file_handler = logging.FileHandler(log_file_path, mode='a') # Append mode + file_handler = logging.FileHandler(log_file_path, mode="a") # Append mode file_handler.setLevel(logging.ERROR) - + # Create formatter and add to handlers - formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(name)s - %(message)s - %(filename)s:%(lineno)d') + formatter = logging.Formatter( + "%(asctime)s [%(levelname)s] %(name)s - %(message)s - %(filename)s:%(lineno)d" + ) console_handler.setFormatter(formatter) file_handler.setFormatter(formatter) - + # Add handlers to logger logger.addHandler(console_handler) logger.addHandler(file_handler) - + logger.info(f"Logging initialized to {log_file_path}") except Exception as log_error: print(f"WARNING: Error setting up log file: {log_error}") @@ -92,19 +104,22 @@ ERROR_PREFIXES = { "media": "MEDIA", "profile": "PROFILE", "auth": "AUTH", - "admin": "ADMIN" + "admin": "ADMIN", } -def log_and_format_error(function_name: str, error: Exception, prefix: str = None, **kwargs) -> str: + +def log_and_format_error( + function_name: str, error: Exception, prefix: str = None, **kwargs +) -> str: """ Centralized error handling function that logs the error and returns a formatted user-friendly message. - + Args: function_name: Name of the function where error occurred error: The exception that was raised prefix: Error code prefix (e.g., "CHAT", "MSG") - if None, will be derived from function_name **kwargs: Additional context parameters to include in log - + Returns: A user-friendly error message with error code """ @@ -117,22 +132,23 @@ def log_and_format_error(function_name: str, error: Exception, prefix: str = Non break if prefix is None: prefix = "GEN" # Generic prefix if none matches - + error_code = f"{prefix}-ERR-{abs(hash(function_name)) % 1000:03d}" - + # Format the additional context parameters context = ", ".join(f"{k}={v}" for k, v in kwargs.items()) - + # Log the full technical error logger.exception(f"{function_name} failed ({context}): {error}") - + # Return a user-friendly message return f"An error occurred (code: {error_code}). Check mcp_errors.log for details." + def format_entity(entity) -> Dict[str, Any]: """Helper function to format entity information consistently.""" result = {"id": entity.id} - + if hasattr(entity, "title"): result["name"] = entity.title result["type"] = "group" if isinstance(entity, Chat) else "channel" @@ -148,7 +164,7 @@ def format_entity(entity) -> Dict[str, Any]: result["username"] = entity.username if hasattr(entity, "phone") and entity.phone: result["phone"] = entity.phone - + return result @@ -159,14 +175,14 @@ def format_message(message) -> Dict[str, Any]: "date": message.date.isoformat(), "text": message.message or "", } - + if message.from_id: result["from_id"] = utils.get_peer_id(message.from_id) - + if message.media: result["has_media"] = True result["media_type"] = type(message.media).__name__ - + return result @@ -216,7 +232,9 @@ async def get_messages(chat_id: int, page: int = 1, page_size: int = 20) -> str: lines.append(f"ID: {msg.id} | Date: {msg.date} | Message: {msg.message}") return "\n".join(lines) except Exception as e: - return log_and_format_error("get_messages", e, chat_id=chat_id, page=page, page_size=page_size) + return log_and_format_error( + "get_messages", e, chat_id=chat_id, page=page, page_size=page_size + ) @mcp.tool() @@ -248,8 +266,8 @@ async def list_contacts() -> str: lines = [] for user in users: name = f"{getattr(user, 'first_name', '')} {getattr(user, 'last_name', '')}".strip() - username = getattr(user, 'username', '') - phone = getattr(user, 'phone', '') + username = getattr(user, "username", "") + phone = getattr(user, "phone", "") contact_info = f"ID: {user.id}, Name: {name}" if username: contact_info += f", Username: @{username}" @@ -276,8 +294,8 @@ async def search_contacts(query: str) -> str: lines = [] for user in users: name = f"{getattr(user, 'first_name', '')} {getattr(user, 'last_name', '')}".strip() - username = getattr(user, 'username', '') - phone = getattr(user, 'phone', '') + username = getattr(user, "username", "") + phone = getattr(user, "phone", "") contact_info = f"ID: {user.id}, Name: {name}" if username: contact_info += f", Username: @{username}" @@ -304,11 +322,16 @@ async def get_contact_ids() -> str: @mcp.tool() -async def list_messages(chat_id: int, limit: int = 20, search_query: str = None, - from_date: str = None, to_date: str = None) -> str: +async def list_messages( + chat_id: int, + limit: int = 20, + search_query: str = None, + from_date: str = None, + to_date: str = None, +) -> str: """ Retrieve messages with optional filters. - + Args: chat_id: The ID of the chat to get messages from. limit: Maximum number of messages to retrieve. @@ -318,11 +341,11 @@ async def list_messages(chat_id: int, limit: int = 20, search_query: str = None, """ try: entity = await client.get_entity(chat_id) - + # Parse date filters if provided from_date_obj = None to_date_obj = None - + if from_date: try: from_date_obj = datetime.strptime(from_date, "%Y-%m-%d") @@ -334,10 +357,11 @@ async def list_messages(chat_id: int, limit: int = 20, search_query: str = None, except AttributeError: # For Python 3.13+ from datetime import timezone + from_date_obj = from_date_obj.replace(tzinfo=timezone.utc) except ValueError: return f"Invalid from_date format. Use YYYY-MM-DD." - + if to_date: try: to_date_obj = datetime.strptime(to_date, "%Y-%m-%d") @@ -348,17 +372,18 @@ async def list_messages(chat_id: int, limit: int = 20, search_query: str = None, to_date_obj = to_date_obj.replace(tzinfo=datetime.timezone.utc) except AttributeError: from datetime import timezone + to_date_obj = to_date_obj.replace(tzinfo=timezone.utc) except ValueError: return f"Invalid to_date format. Use YYYY-MM-DD." - + # Prepare filter parameters params = {} if search_query: - params['search'] = search_query - + params["search"] = search_query + messages = await client.get_messages(entity, limit=limit, **params) - + # Apply date filters (Telethon doesn't support date filtering in get_messages directly) if from_date_obj or to_date_obj: filtered_messages = [] @@ -369,19 +394,23 @@ async def list_messages(chat_id: int, limit: int = 20, search_query: str = None, continue filtered_messages.append(msg) messages = filtered_messages - + if not messages: return "No messages found matching the criteria." - + lines = [] for msg in messages: sender = "" if msg.sender: - sender_name = getattr(msg.sender, 'first_name', '') or getattr(msg.sender, 'title', 'Unknown') + sender_name = getattr(msg.sender, "first_name", "") or getattr( + msg.sender, "title", "Unknown" + ) sender = f"{sender_name} | " - - lines.append(f"ID: {msg.id} | {sender}Date: {msg.date} | Message: {msg.message or '[Media/No text]'}") - + + lines.append( + f"ID: {msg.id} | {sender}Date: {msg.date} | Message: {msg.message or '[Media/No text]'}" + ) + return "\n".join(lines) except Exception as e: return log_and_format_error("list_messages", e, chat_id=chat_id) @@ -391,18 +420,18 @@ async def list_messages(chat_id: int, limit: int = 20, search_query: str = None, async def list_chats(chat_type: str = None, limit: int = 20) -> str: """ List available chats with metadata. - + Args: chat_type: Filter by chat type ('user', 'group', 'channel', or None for all) limit: Maximum number of chats to retrieve. """ try: dialogs = await client.get_dialogs(limit=limit) - + results = [] for dialog in dialogs: entity = dialog.entity - + # Filter by type if requested current_type = None if isinstance(entity, User): @@ -410,39 +439,39 @@ async def list_chats(chat_type: str = None, limit: int = 20) -> str: elif isinstance(entity, Chat): current_type = "group" elif isinstance(entity, Channel): - if getattr(entity, 'broadcast', False): + if getattr(entity, "broadcast", False): current_type = "channel" else: current_type = "group" # Supergroup - + if chat_type and current_type != chat_type.lower(): continue - + # Format chat info chat_info = f"Chat ID: {entity.id}" - - if hasattr(entity, 'title'): + + if hasattr(entity, "title"): chat_info += f", Title: {entity.title}" - elif hasattr(entity, 'first_name'): + elif hasattr(entity, "first_name"): name = f"{entity.first_name}" - if hasattr(entity, 'last_name') and entity.last_name: + if hasattr(entity, "last_name") and entity.last_name: name += f" {entity.last_name}" chat_info += f", Name: {name}" - + chat_info += f", Type: {current_type}" - - if hasattr(entity, 'username') and entity.username: + + if hasattr(entity, "username") and entity.username: chat_info += f", Username: @{entity.username}" - + # Add unread count if available - if hasattr(dialog, 'unread_count') and dialog.unread_count > 0: + if hasattr(dialog, "unread_count") and dialog.unread_count > 0: chat_info += f", Unread: {dialog.unread_count}" - + results.append(chat_info) - + if not results: return f"No chats found matching the criteria." - + return "\n".join(results) except Exception as e: return log_and_format_error("list_chats", e, chat_type=chat_type, limit=limit) @@ -452,37 +481,39 @@ async def list_chats(chat_type: str = None, limit: int = 20) -> str: async def get_chat(chat_id: int) -> str: """ Get detailed information about a specific chat. - + Args: chat_id: The ID of the chat. """ try: entity = await client.get_entity(chat_id) - + result = [] result.append(f"ID: {entity.id}") - + is_channel = isinstance(entity, Channel) is_chat = isinstance(entity, Chat) is_user = isinstance(entity, User) - if hasattr(entity, 'title'): + if hasattr(entity, "title"): result.append(f"Title: {entity.title}") - chat_type = "Channel" if is_channel and getattr(entity, 'broadcast', False) else "Group" - if is_channel and getattr(entity, 'megagroup', False): + chat_type = ( + "Channel" if is_channel and getattr(entity, "broadcast", False) else "Group" + ) + if is_channel and getattr(entity, "megagroup", False): chat_type = "Supergroup" elif is_chat: chat_type = "Group (Basic)" result.append(f"Type: {chat_type}") - if hasattr(entity, 'username') and entity.username: + if hasattr(entity, "username") and entity.username: result.append(f"Username: @{entity.username}") - + # Fetch participants count reliably try: - participants_count = (await client.get_participants(entity, limit=0)).total - result.append(f"Participants: {participants_count}") + participants_count = (await client.get_participants(entity, limit=0)).total + result.append(f"Participants: {participants_count}") except Exception as pe: - result.append(f"Participants: Error fetching ({pe})") + result.append(f"Participants: Error fetching ({pe})") elif is_user: name = f"{entity.first_name}" @@ -496,7 +527,7 @@ async def get_chat(chat_id: int) -> str: result.append(f"Phone: {entity.phone}") result.append(f"Bot: {'Yes' if entity.bot else 'No'}") result.append(f"Verified: {'Yes' if entity.verified else 'No'}") - + # Get last activity if it's a dialog try: # Using get_dialogs might be slow if there are many dialogs @@ -509,16 +540,18 @@ async def get_chat(chat_id: int) -> str: last_msg = dialog.message sender_name = "Unknown" if last_msg.sender: - sender_name = getattr(last_msg.sender, 'first_name', '') or getattr(last_msg.sender, 'title', 'Unknown') - if hasattr(last_msg.sender, 'last_name') and last_msg.sender.last_name: - sender_name += f" {last_msg.sender.last_name}" + sender_name = getattr(last_msg.sender, "first_name", "") or getattr( + last_msg.sender, "title", "Unknown" + ) + if hasattr(last_msg.sender, "last_name") and last_msg.sender.last_name: + sender_name += f" {last_msg.sender.last_name}" sender_name = sender_name.strip() or "Unknown" result.append(f"Last Message: From {sender_name} at {last_msg.date}") result.append(f"Message: {last_msg.message or '[Media/No text]'}") except Exception as diag_ex: logger.warning(f"Could not get dialog info for {chat_id}: {diag_ex}") pass - + return "\n".join(result) except Exception as e: return log_and_format_error("get_chat", e, chat_id=chat_id) @@ -528,7 +561,7 @@ async def get_chat(chat_id: int) -> str: async def get_direct_chat_by_contact(contact_query: str) -> str: """ Find a direct chat with a specific contact by name, username, or phone. - + Args: contact_query: Name, username, or phone number to search for. """ @@ -540,12 +573,16 @@ async def get_direct_chat_by_contact(contact_query: str) -> str: for contact in contacts: if not contact: continue - name = f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip() - username = getattr(contact, 'username', '') - phone = getattr(contact, 'phone', '') - if (contact_query.lower() in name.lower() or - (username and contact_query.lower() in username.lower()) or - (phone and contact_query in phone)): + name = ( + f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip() + ) + username = getattr(contact, "username", "") + phone = getattr(contact, "phone", "") + if ( + contact_query.lower() in name.lower() + or (username and contact_query.lower() in username.lower()) + or (phone and contact_query in phone) + ): found_contacts.append(contact) if not found_contacts: return f"No contacts found matching '{contact_query}'." @@ -553,18 +590,22 @@ async def get_direct_chat_by_contact(contact_query: str) -> str: results = [] dialogs = await client.get_dialogs() for contact in found_contacts: - contact_name = f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip() + contact_name = ( + f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip() + ) for dialog in dialogs: if isinstance(dialog.entity, User) and dialog.entity.id == contact.id: chat_info = f"Chat ID: {dialog.entity.id}, Contact: {contact_name}" - if getattr(contact, 'username', ''): + if getattr(contact, "username", ""): chat_info += f", Username: @{contact.username}" if dialog.unread_count: chat_info += f", Unread: {dialog.unread_count}" results.append(chat_info) break if not results: - found_names = ", ".join([f"{c.first_name} {c.last_name}".strip() for c in found_contacts]) + found_names = ", ".join( + [f"{c.first_name} {c.last_name}".strip() for c in found_contacts] + ) return f"Found contacts: {found_names}, but no direct chats were found with them." return "\n".join(results) except Exception as e: @@ -575,7 +616,7 @@ async def get_direct_chat_by_contact(contact_query: str) -> str: async def get_contact_chats(contact_id: int) -> str: """ List all chats involving a specific contact. - + Args: contact_id: The ID of the contact. """ @@ -584,15 +625,17 @@ async def get_contact_chats(contact_id: int) -> str: contact = await client.get_entity(contact_id) if not isinstance(contact, User): return f"ID {contact_id} is not a user/contact." - - contact_name = f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip() - + + contact_name = ( + f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip() + ) + # Find direct chat direct_chat = None dialogs = await client.get_dialogs() - + results = [] - + # Look for direct chat for dialog in dialogs: if isinstance(dialog.entity, User) and dialog.entity.id == contact_id: @@ -601,21 +644,21 @@ async def get_contact_chats(contact_id: int) -> str: chat_info += f", Unread: {dialog.unread_count}" results.append(chat_info) break - + # Look for common groups/channels common_chats = [] try: common = await client.get_common_chats(contact) for chat in common: - chat_type = "Channel" if getattr(chat, 'broadcast', False) else "Group" + chat_type = "Channel" if getattr(chat, "broadcast", False) else "Group" chat_info = f"Chat ID: {chat.id}, Title: {chat.title}, Type: {chat_type}" results.append(chat_info) except: results.append("Could not retrieve common groups.") - + if not results: return f"No chats found with {contact_name} (ID: {contact_id})." - + return f"Chats with {contact_name} (ID: {contact_id}):\n" + "\n".join(results) except Exception as e: return log_and_format_error("get_contact_chats", e, contact_id=contact_id) @@ -625,7 +668,7 @@ async def get_contact_chats(contact_id: int) -> str: async def get_last_interaction(contact_id: int) -> str: """ Get the most recent message with a contact. - + Args: contact_id: The ID of the contact. """ @@ -634,22 +677,24 @@ async def get_last_interaction(contact_id: int) -> str: contact = await client.get_entity(contact_id) if not isinstance(contact, User): return f"ID {contact_id} is not a user/contact." - - contact_name = f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip() - + + contact_name = ( + f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip() + ) + # Get the last few messages messages = await client.get_messages(contact, limit=5) - + if not messages: return f"No messages found with {contact_name} (ID: {contact_id})." - + results = [f"Last interactions with {contact_name} (ID: {contact_id}):"] - + for msg in messages: sender = "You" if msg.out else contact_name message_text = msg.message or "[Media/No text]" results.append(f"Date: {msg.date}, From: {sender}, Message: {message_text}") - + return "\n".join(results) except Exception as e: return log_and_format_error("get_last_interaction", e, contact_id=contact_id) @@ -659,7 +704,7 @@ async def get_last_interaction(contact_id: int) -> str: async def get_message_context(chat_id: int, message_id: int, context_size: int = 3) -> str: """ Retrieve context around a specific message. - + Args: chat_id: The ID of the chat. message_id: The ID of the central message. @@ -668,25 +713,15 @@ async def get_message_context(chat_id: int, message_id: int, context_size: int = try: chat = await client.get_entity(chat_id) # Get messages around the specified message - messages_before = await client.get_messages( - chat, - limit=context_size, - max_id=message_id - ) - central_message = await client.get_messages( - chat, - ids=message_id - ) + messages_before = await client.get_messages(chat, limit=context_size, max_id=message_id) + central_message = await client.get_messages(chat, ids=message_id) # Fix: get_messages(ids=...) returns a single Message, not a list if central_message is not None and not isinstance(central_message, list): central_message = [central_message] elif central_message is None: central_message = [] messages_after = await client.get_messages( - chat, - limit=context_size, - min_id=message_id, - reverse=True + chat, limit=context_size, min_id=message_id, reverse=True ) if not central_message: return f"Message with ID {message_id} not found in chat {chat_id}." @@ -697,12 +732,22 @@ async def get_message_context(chat_id: int, message_id: int, context_size: int = for msg in all_messages: sender_name = "Unknown" if msg.sender: - sender_name = getattr(msg.sender, 'first_name', '') or getattr(msg.sender, 'title', 'Unknown') + sender_name = getattr(msg.sender, "first_name", "") or getattr( + msg.sender, "title", "Unknown" + ) highlight = " [THIS MESSAGE]" if msg.id == message_id else "" - results.append(f"ID: {msg.id} | {sender_name} | {msg.date}{highlight}\n{msg.message or '[Media/No text]'}\n") + results.append( + f"ID: {msg.id} | {sender_name} | {msg.date}{highlight}\n{msg.message or '[Media/No text]'}\n" + ) return "\n".join(results) except Exception as e: - return log_and_format_error("get_message_context", e, chat_id=chat_id, message_id=message_id, context_size=context_size) + return log_and_format_error( + "get_message_context", + e, + chat_id=chat_id, + message_id=message_id, + context_size=context_size, + ) @mcp.tool() @@ -717,17 +762,16 @@ async def add_contact(phone: str, first_name: str, last_name: str = "") -> str: try: # Try to import the required types first from telethon.tl.types import InputPhoneContact - - result = await client(functions.contacts.ImportContactsRequest( - contacts=[ - InputPhoneContact( - client_id=0, - phone=phone, - first_name=first_name, - last_name=last_name - ) - ] - )) + + result = await client( + functions.contacts.ImportContactsRequest( + contacts=[ + InputPhoneContact( + client_id=0, phone=phone, first_name=first_name, last_name=last_name + ) + ] + ) + ) if result.imported: return f"Contact {first_name} {last_name} added successfully." else: @@ -735,15 +779,19 @@ async def add_contact(phone: str, first_name: str, last_name: str = "") -> str: except (ImportError, AttributeError) as type_err: # Try alternative approach using raw API try: - result = await client(functions.contacts.ImportContactsRequest( - contacts=[{ - 'client_id': 0, - 'phone': phone, - 'first_name': first_name, - 'last_name': last_name - }] - )) - if hasattr(result, 'imported') and result.imported: + result = await client( + functions.contacts.ImportContactsRequest( + contacts=[ + { + "client_id": 0, + "phone": phone, + "first_name": first_name, + "last_name": last_name, + } + ] + ) + ) + if hasattr(result, "imported") and result.imported: return f"Contact {first_name} {last_name} added successfully (alt method)." else: return f"Contact not added. Alternative method response: {str(result)}" @@ -816,7 +864,7 @@ async def get_me() -> str: async def create_group(title: str, user_ids: list) -> str: """ Create a new group or supergroup and add users. - + Args: title: Title for the new group user_ids: List of user IDs to add to the group @@ -831,25 +879,22 @@ async def create_group(title: str, user_ids: list) -> str: except Exception as e: logger.error(f"Failed to get entity for user ID {user_id}: {e}") return f"Error: Could not find user with ID {user_id}" - + if not users: return "Error: No valid users provided" - + # Create the group with the users try: # Create a new chat with selected users - result = await client(functions.messages.CreateChatRequest( - users=users, - title=title - )) - + result = await client(functions.messages.CreateChatRequest(users=users, title=title)) + # Check what type of response we got - if hasattr(result, 'chats') and result.chats: + if hasattr(result, "chats") and result.chats: created_chat = result.chats[0] return f"Group created with ID: {created_chat.id}" - elif hasattr(result, 'chat') and result.chat: + elif hasattr(result, "chat") and result.chat: return f"Group created with ID: {result.chat.id}" - elif hasattr(result, 'chat_id'): + elif hasattr(result, "chat_id"): return f"Group created with ID: {result.chat_id}" else: # If we can't determine the chat ID directly from the result @@ -859,10 +904,10 @@ async def create_group(title: str, user_ids: list) -> str: for dialog in dialogs: if dialog.title == title: return f"Group created with ID: {dialog.id}" - + # If we still can't find it, at least return success return f"Group created successfully. Please check your recent chats for '{title}'." - + except Exception as create_err: if "PEER_FLOOD" in str(create_err): return "Error: Cannot create group due to Telegram limits. Try again later." @@ -877,7 +922,7 @@ async def create_group(title: str, user_ids: list) -> str: async def invite_to_group(group_id: int, user_ids: list) -> str: """ Invite users to a group or channel. - + Args: group_id: The ID of the group/channel. user_ids: List of user IDs to invite. @@ -885,36 +930,40 @@ async def invite_to_group(group_id: int, user_ids: list) -> str: try: entity = await client.get_entity(group_id) users_to_add = [] - + for user_id in user_ids: try: user = await client.get_entity(user_id) users_to_add.append(user) except ValueError as e: return f"Error: User with ID {user_id} could not be found. {e}" - + try: - result = await client(functions.channels.InviteToChannelRequest( - channel=entity, - users=users_to_add - )) - + result = await client( + functions.channels.InviteToChannelRequest(channel=entity, users=users_to_add) + ) + invited_count = 0 - if hasattr(result, 'users') and result.users: + if hasattr(result, "users") and result.users: invited_count = len(result.users) - elif hasattr(result, 'count'): + elif hasattr(result, "count"): invited_count = result.count - + return f"Successfully invited {invited_count} users to {entity.title}" except telethon.errors.rpcerrorlist.UserNotMutualContactError: return "Error: Cannot invite users who are not mutual contacts. Please ensure the users are in your contacts and have added you back." except telethon.errors.rpcerrorlist.UserPrivacyRestrictedError: - return "Error: One or more users have privacy settings that prevent you from adding them." + return ( + "Error: One or more users have privacy settings that prevent you from adding them." + ) except Exception as e: return log_and_format_error("invite_to_group", e, group_id=group_id, user_ids=user_ids) - + except Exception as e: - logger.error(f"telegram_mcp invite_to_group failed (group_id={group_id}, user_ids={user_ids})", exc_info=True) + logger.error( + f"telegram_mcp invite_to_group failed (group_id={group_id}, user_ids={user_ids})", + exc_info=True, + ) return log_and_format_error("invite_to_group", e, group_id=group_id, user_ids=user_ids) @@ -922,62 +971,78 @@ async def invite_to_group(group_id: int, user_ids: list) -> str: async def leave_chat(chat_id: int) -> str: """ Leave a group or channel by chat ID. - + Args: chat_id: The chat ID to leave. """ try: entity = await client.get_entity(chat_id) - + # Check the entity type carefully if isinstance(entity, Channel): # Handle both channels and supergroups (which are also channels in Telegram) try: await client(functions.channels.LeaveChannelRequest(channel=entity)) - chat_name = getattr(entity, 'title', str(chat_id)) + chat_name = getattr(entity, "title", str(chat_id)) return f"Left channel/supergroup {chat_name} (ID: {chat_id})." except Exception as chan_err: return log_and_format_error("leave_chat", chan_err, chat_id=chat_id) - + elif isinstance(entity, Chat): # Traditional basic groups (not supergroups) try: # First try with InputPeerUser me = await client.get_me(input_peer=True) - await client(functions.messages.DeleteChatUserRequest( - chat_id=entity.id, # Use the entity ID directly - user_id=me - )) - chat_name = getattr(entity, 'title', str(chat_id)) + await client( + functions.messages.DeleteChatUserRequest( + chat_id=entity.id, user_id=me # Use the entity ID directly + ) + ) + chat_name = getattr(entity, "title", str(chat_id)) return f"Left basic group {chat_name} (ID: {chat_id})." except Exception as chat_err: # If the above fails, try the second approach - logger.warning(f"First leave attempt failed: {chat_err}, trying alternative method") - + logger.warning( + f"First leave attempt failed: {chat_err}, trying alternative method" + ) + try: # Alternative approach - sometimes this works better me_full = await client.get_me() - await client(functions.messages.DeleteChatUserRequest( - chat_id=entity.id, - user_id=me_full.id - )) - chat_name = getattr(entity, 'title', str(chat_id)) + await client( + functions.messages.DeleteChatUserRequest( + chat_id=entity.id, user_id=me_full.id + ) + ) + chat_name = getattr(entity, "title", str(chat_id)) return f"Left basic group {chat_name} (ID: {chat_id})." except Exception as alt_err: return log_and_format_error("leave_chat", alt_err, chat_id=chat_id) else: # Cannot leave a user chat this way entity_type = type(entity).__name__ - return log_and_format_error("leave_chat", Exception(f"Cannot leave chat ID {chat_id} of type {entity_type}. This function is for groups and channels only."), chat_id=chat_id) - + return log_and_format_error( + "leave_chat", + Exception( + f"Cannot leave chat ID {chat_id} of type {entity_type}. This function is for groups and channels only." + ), + chat_id=chat_id, + ) + except Exception as e: logger.exception(f"leave_chat failed (chat_id={chat_id})") - + # Provide helpful hint for common errors error_str = str(e).lower() if "invalid" in error_str and "chat" in error_str: - return log_and_format_error("leave_chat", Exception(f"Error leaving chat: This appears to be a channel/supergroup. Please check the chat ID and try again."), chat_id=chat_id) - + return log_and_format_error( + "leave_chat", + Exception( + f"Error leaving chat: This appears to be a channel/supergroup. Please check the chat ID and try again." + ), + chat_id=chat_id, + ) + return log_and_format_error("leave_chat", e, chat_id=chat_id) @@ -990,7 +1055,10 @@ async def get_participants(chat_id: int) -> str: """ try: participants = await client.get_participants(chat_id) - lines = [f"ID: {p.id}, Name: {getattr(p, 'first_name', '')} {getattr(p, 'last_name', '')}" for p in participants] + lines = [ + f"ID: {p.id}, Name: {getattr(p, 'first_name', '')} {getattr(p, 'last_name', '')}" + for p in participants + ] return "\n".join(lines) except Exception as e: return log_and_format_error("get_participants", e, chat_id=chat_id) @@ -1014,7 +1082,9 @@ async def send_file(chat_id: int, file_path: str, caption: str = None) -> str: await client.send_file(entity, file_path, caption=caption) return f"File sent to chat {chat_id}." except Exception as e: - return log_and_format_error("send_file", e, chat_id=chat_id, file_path=file_path, caption=caption) + return log_and_format_error( + "send_file", e, chat_id=chat_id, file_path=file_path, caption=caption + ) @mcp.tool() @@ -1032,7 +1102,7 @@ async def download_media(chat_id: int, message_id: int, file_path: str) -> str: if not msg or not msg.media: return "No media found in the specified message." # Check if directory is writable - dir_path = os.path.dirname(file_path) or '.' + dir_path = os.path.dirname(file_path) or "." if not os.access(dir_path, os.W_OK): return f"Directory not writable: {dir_path}" await client.download_media(msg, file=file_path) @@ -1040,7 +1110,9 @@ async def download_media(chat_id: int, message_id: int, file_path: str) -> str: return f"Download failed: file not created at {file_path}" return f"Media downloaded to {file_path}." except Exception as e: - return log_and_format_error("download_media", e, chat_id=chat_id, message_id=message_id, file_path=file_path) + return log_and_format_error( + "download_media", e, chat_id=chat_id, message_id=message_id, file_path=file_path + ) @mcp.tool() @@ -1049,14 +1121,16 @@ async def update_profile(first_name: str = None, last_name: str = None, about: s Update your profile information (name, bio). """ try: - await client(functions.account.UpdateProfileRequest( - first_name=first_name, - last_name=last_name, - about=about - )) + await client( + functions.account.UpdateProfileRequest( + first_name=first_name, last_name=last_name, about=about + ) + ) return "Profile updated." except Exception as e: - return log_and_format_error("update_profile", e, first_name=first_name, last_name=last_name, about=about) + return log_and_format_error( + "update_profile", e, first_name=first_name, last_name=last_name, about=about + ) @mcp.tool() @@ -1065,9 +1139,9 @@ async def set_profile_photo(file_path: str) -> str: Set a new profile photo. """ try: - await client(functions.photos.UploadProfilePhotoRequest( - file=await client.upload_file(file_path) - )) + await client( + functions.photos.UploadProfilePhotoRequest(file=await client.upload_file(file_path)) + ) return "Profile photo updated." except Exception as e: return log_and_format_error("set_profile_photo", e, file_path=file_path) @@ -1079,7 +1153,9 @@ async def delete_profile_photo() -> str: Delete your current profile photo. """ try: - photos = await client(functions.photos.GetUserPhotosRequest(user_id='me', offset=0, max_id=0, limit=1)) + photos = await client( + functions.photos.GetUserPhotosRequest(user_id="me", offset=0, max_id=0, limit=1) + ) if not photos.photos: return "No profile photo to delete." await client(functions.photos.DeletePhotosRequest(id=[photos.photos[0].id])) @@ -1096,11 +1172,11 @@ async def get_privacy_settings() -> str: try: # Import needed types directly from telethon.tl.types import InputPrivacyKeyStatusTimestamp - + try: - settings = await client(functions.account.GetPrivacyRequest( - key=InputPrivacyKeyStatusTimestamp() - )) + settings = await client( + functions.account.GetPrivacyRequest(key=InputPrivacyKeyStatusTimestamp()) + ) return str(settings) except TypeError as e: if "TLObject was expected" in str(e): @@ -1113,43 +1189,45 @@ async def get_privacy_settings() -> str: @mcp.tool() -async def set_privacy_settings(key: str, allow_users: list = None, disallow_users: list = None) -> str: +async def set_privacy_settings( + key: str, allow_users: list = None, disallow_users: list = None +) -> str: """ Set privacy settings (e.g., last seen, phone, etc.). - + Args: key: The privacy setting to modify ('status' for last seen, 'phone', 'profile_photo', etc.) - allow_users: List of user IDs to allow + allow_users: List of user IDs to allow disallow_users: List of user IDs to disallow """ try: # Import needed types from telethon.tl.types import ( - InputPrivacyKeyStatusTimestamp, + InputPrivacyKeyStatusTimestamp, InputPrivacyKeyPhoneNumber, InputPrivacyKeyProfilePhoto, - InputPrivacyValueAllowUsers, + InputPrivacyValueAllowUsers, InputPrivacyValueDisallowUsers, InputPrivacyValueAllowAll, - InputPrivacyValueDisallowAll + InputPrivacyValueDisallowAll, ) - + # Map the simplified keys to their corresponding input types key_mapping = { - 'status': InputPrivacyKeyStatusTimestamp, - 'phone': InputPrivacyKeyPhoneNumber, - 'profile_photo': InputPrivacyKeyProfilePhoto, + "status": InputPrivacyKeyStatusTimestamp, + "phone": InputPrivacyKeyPhoneNumber, + "profile_photo": InputPrivacyKeyProfilePhoto, } - + # Get the appropriate key class if key not in key_mapping: return f"Error: Unsupported privacy key '{key}'. Supported keys: {', '.join(key_mapping.keys())}" - + privacy_key = key_mapping[key]() - + # Prepare the rules rules = [] - + # Process allow rules if allow_users is None or len(allow_users) == 0: # If no specific users to allow, allow everyone by default @@ -1164,13 +1242,13 @@ async def set_privacy_settings(key: str, allow_users: list = None, disallow_user allow_entities.append(user) except Exception as user_err: logger.warning(f"Could not get entity for user ID {user_id}: {user_err}") - + if allow_entities: rules.append(InputPrivacyValueAllowUsers(users=allow_entities)) except Exception as allow_err: logger.error(f"Error processing allowed users: {allow_err}") return log_and_format_error("set_privacy_settings", allow_err, key=key) - + # Process disallow rules if disallow_users and len(disallow_users) > 0: try: @@ -1181,19 +1259,18 @@ async def set_privacy_settings(key: str, allow_users: list = None, disallow_user disallow_entities.append(user) except Exception as user_err: logger.warning(f"Could not get entity for user ID {user_id}: {user_err}") - + if disallow_entities: rules.append(InputPrivacyValueDisallowUsers(users=disallow_entities)) except Exception as disallow_err: logger.error(f"Error processing disallowed users: {disallow_err}") return log_and_format_error("set_privacy_settings", disallow_err, key=key) - + # Apply the privacy settings try: - result = await client(functions.account.SetPrivacyRequest( - key=privacy_key, - rules=rules - )) + result = await client( + functions.account.SetPrivacyRequest(key=privacy_key, rules=rules) + ) return f"Privacy settings for {key} updated successfully." except TypeError as type_err: if "TLObject was expected" in str(type_err): @@ -1211,7 +1288,15 @@ async def import_contacts(contacts: list) -> str: Import a list of contacts. Each contact should be a dict with phone, first_name, last_name. """ try: - input_contacts = [functions.contacts.InputPhoneContact(client_id=i, phone=c['phone'], first_name=c['first_name'], last_name=c.get('last_name', '')) for i, c in enumerate(contacts)] + input_contacts = [ + functions.contacts.InputPhoneContact( + client_id=i, + phone=c["phone"], + first_name=c["first_name"], + last_name=c.get("last_name", ""), + ) + for i, c in enumerate(contacts) + ] result = await client(functions.contacts.ImportContactsRequest(contacts=input_contacts)) return f"Imported {len(result.imported)} contacts." except Exception as e: @@ -1249,14 +1334,14 @@ async def create_channel(title: str, about: str = "", megagroup: bool = False) - Create a new channel or supergroup. """ try: - result = await client(functions.channels.CreateChannelRequest( - title=title, - about=about, - megagroup=megagroup - )) + result = await client( + functions.channels.CreateChannelRequest(title=title, about=about, megagroup=megagroup) + ) return f"Channel '{title}' created with ID: {result.chats[0].id}" except Exception as e: - return log_and_format_error("create_channel", e, title=title, about=about, megagroup=megagroup) + return log_and_format_error( + "create_channel", e, title=title, about=about, megagroup=megagroup + ) @mcp.tool() @@ -1267,11 +1352,11 @@ async def edit_chat_title(chat_id: int, title: str) -> str: try: entity = await client.get_entity(chat_id) if isinstance(entity, Channel): - await client(functions.channels.EditTitleRequest(channel=entity, title=title)) + await client(functions.channels.EditTitleRequest(channel=entity, title=title)) elif isinstance(entity, Chat): - await client(functions.messages.EditChatTitleRequest(chat_id=chat_id, title=title)) + await client(functions.messages.EditChatTitleRequest(chat_id=chat_id, title=title)) else: - return f"Cannot edit title for this entity type ({type(entity)})." + return f"Cannot edit title for this entity type ({type(entity)})." return f"Chat {chat_id} title updated to '{title}'." except Exception as e: logger.exception(f"edit_chat_title failed (chat_id={chat_id}, title='{title}')") @@ -1285,24 +1370,26 @@ async def edit_chat_photo(chat_id: int, file_path: str) -> str: """ try: if not os.path.isfile(file_path): - return f"Photo file not found: {file_path}" + return f"Photo file not found: {file_path}" if not os.access(file_path, os.R_OK): - return f"Photo file not readable: {file_path}" + return f"Photo file not readable: {file_path}" entity = await client.get_entity(chat_id) uploaded_file = await client.upload_file(file_path) if isinstance(entity, Channel): - # For channels/supergroups, use EditPhotoRequest with InputChatUploadedPhoto - input_photo = InputChatUploadedPhoto(file=uploaded_file) - await client(functions.channels.EditPhotoRequest(channel=entity, photo=input_photo)) + # For channels/supergroups, use EditPhotoRequest with InputChatUploadedPhoto + input_photo = InputChatUploadedPhoto(file=uploaded_file) + await client(functions.channels.EditPhotoRequest(channel=entity, photo=input_photo)) elif isinstance(entity, Chat): - # For basic groups, use EditChatPhotoRequest with InputChatUploadedPhoto - input_photo = InputChatUploadedPhoto(file=uploaded_file) - await client(functions.messages.EditChatPhotoRequest(chat_id=chat_id, photo=input_photo)) + # For basic groups, use EditChatPhotoRequest with InputChatUploadedPhoto + input_photo = InputChatUploadedPhoto(file=uploaded_file) + await client( + functions.messages.EditChatPhotoRequest(chat_id=chat_id, photo=input_photo) + ) else: - return f"Cannot edit photo for this entity type ({type(entity)})." - + return f"Cannot edit photo for this entity type ({type(entity)})." + return f"Chat {chat_id} photo updated." except Exception as e: logger.exception(f"edit_chat_photo failed (chat_id={chat_id}, file_path='{file_path}')") @@ -1318,12 +1405,18 @@ async def delete_chat_photo(chat_id: int) -> str: entity = await client.get_entity(chat_id) if isinstance(entity, Channel): # Use InputChatPhotoEmpty for channels/supergroups - await client(functions.channels.EditPhotoRequest(channel=entity, photo=InputChatPhotoEmpty())) + await client( + functions.channels.EditPhotoRequest(channel=entity, photo=InputChatPhotoEmpty()) + ) elif isinstance(entity, Chat): - # Use None (or InputChatPhotoEmpty) for basic groups - await client(functions.messages.EditChatPhotoRequest(chat_id=chat_id, photo=InputChatPhotoEmpty())) + # Use None (or InputChatPhotoEmpty) for basic groups + await client( + functions.messages.EditChatPhotoRequest( + chat_id=chat_id, photo=InputChatPhotoEmpty() + ) + ) else: - return f"Cannot delete photo for this entity type ({type(entity)})." + return f"Cannot delete photo for this entity type ({type(entity)})." return f"Chat {chat_id} photo deleted." except Exception as e: @@ -1335,7 +1428,7 @@ async def delete_chat_photo(chat_id: int) -> str: async def promote_admin(group_id: int, user_id: int, rights: dict = None) -> str: """ Promote a user to admin in a group/channel. - + Args: group_id: ID of the group/channel user_id: User ID to promote @@ -1344,52 +1437,54 @@ async def promote_admin(group_id: int, user_id: int, rights: dict = None) -> str try: chat = await client.get_entity(group_id) user = await client.get_entity(user_id) - + # Set default admin rights if not provided if not rights: rights = { - 'change_info': True, - 'post_messages': True, - 'edit_messages': True, - 'delete_messages': True, - 'ban_users': True, - 'invite_users': True, - 'pin_messages': True, - 'add_admins': False, - 'anonymous': False, - 'manage_call': True, - 'other': True + "change_info": True, + "post_messages": True, + "edit_messages": True, + "delete_messages": True, + "ban_users": True, + "invite_users": True, + "pin_messages": True, + "add_admins": False, + "anonymous": False, + "manage_call": True, + "other": True, } - + admin_rights = ChatAdminRights( - change_info=rights.get('change_info', True), - post_messages=rights.get('post_messages', True), - edit_messages=rights.get('edit_messages', True), - delete_messages=rights.get('delete_messages', True), - ban_users=rights.get('ban_users', True), - invite_users=rights.get('invite_users', True), - pin_messages=rights.get('pin_messages', True), - add_admins=rights.get('add_admins', False), - anonymous=rights.get('anonymous', False), - manage_call=rights.get('manage_call', True), - other=rights.get('other', True) + change_info=rights.get("change_info", True), + post_messages=rights.get("post_messages", True), + edit_messages=rights.get("edit_messages", True), + delete_messages=rights.get("delete_messages", True), + ban_users=rights.get("ban_users", True), + invite_users=rights.get("invite_users", True), + pin_messages=rights.get("pin_messages", True), + add_admins=rights.get("add_admins", False), + anonymous=rights.get("anonymous", False), + manage_call=rights.get("manage_call", True), + other=rights.get("other", True), ) - + try: - result = await client(functions.channels.EditAdminRequest( - channel=chat, - user_id=user, - admin_rights=admin_rights, - rank="Admin" - )) + result = await client( + functions.channels.EditAdminRequest( + channel=chat, user_id=user, admin_rights=admin_rights, rank="Admin" + ) + ) return f"Successfully promoted user {user_id} to admin in {chat.title}" except telethon.errors.rpcerrorlist.UserNotMutualContactError: return "Error: Cannot promote users who are not mutual contacts. Please ensure the user is in your contacts and has added you back." except Exception as e: return log_and_format_error("promote_admin", e, group_id=group_id, user_id=user_id) - + except Exception as e: - logger.error(f"telegram_mcp promote_admin failed (group_id={group_id}, user_id={user_id})", exc_info=True) + logger.error( + f"telegram_mcp promote_admin failed (group_id={group_id}, user_id={user_id})", + exc_info=True, + ) return log_and_format_error("promote_admin", e, group_id=group_id, user_id=user_id) @@ -1397,7 +1492,7 @@ async def promote_admin(group_id: int, user_id: int, rights: dict = None) -> str async def demote_admin(group_id: int, user_id: int) -> str: """ Demote a user from admin in a group/channel. - + Args: group_id: ID of the group/channel user_id: User ID to demote @@ -1405,7 +1500,7 @@ async def demote_admin(group_id: int, user_id: int) -> str: try: chat = await client.get_entity(group_id) user = await client.get_entity(user_id) - + # Create empty admin rights (regular user) admin_rights = ChatAdminRights( change_info=False, @@ -1418,24 +1513,26 @@ async def demote_admin(group_id: int, user_id: int) -> str: add_admins=False, anonymous=False, manage_call=False, - other=False + other=False, ) - + try: - result = await client(functions.channels.EditAdminRequest( - channel=chat, - user_id=user, - admin_rights=admin_rights, - rank="" - )) + result = await client( + functions.channels.EditAdminRequest( + channel=chat, user_id=user, admin_rights=admin_rights, rank="" + ) + ) return f"Successfully demoted user {user_id} from admin in {chat.title}" except telethon.errors.rpcerrorlist.UserNotMutualContactError: return "Error: Cannot modify admin status of users who are not mutual contacts. Please ensure the user is in your contacts and has added you back." except Exception as e: return log_and_format_error("demote_admin", e, group_id=group_id, user_id=user_id) - + except Exception as e: - logger.error(f"telegram_mcp demote_admin failed (group_id={group_id}, user_id={user_id})", exc_info=True) + logger.error( + f"telegram_mcp demote_admin failed (group_id={group_id}, user_id={user_id})", + exc_info=True, + ) return log_and_format_error("demote_admin", e, group_id=group_id, user_id=user_id) @@ -1443,7 +1540,7 @@ async def demote_admin(group_id: int, user_id: int) -> str: async def ban_user(chat_id: int, user_id: int) -> str: """ Ban a user from a group or channel. - + Args: chat_id: ID of the group/channel user_id: User ID to ban @@ -1451,7 +1548,7 @@ async def ban_user(chat_id: int, user_id: int) -> str: try: chat = await client.get_entity(chat_id) user = await client.get_entity(user_id) - + # Create banned rights (all restrictions enabled) banned_rights = ChatBannedRights( until_date=None, # Ban forever @@ -1466,15 +1563,15 @@ async def ban_user(chat_id: int, user_id: int) -> str: send_polls=True, change_info=True, invite_users=True, - pin_messages=True + pin_messages=True, ) - + try: - await client(functions.channels.EditBannedRequest( - channel=chat, - participant=user, - banned_rights=banned_rights - )) + await client( + functions.channels.EditBannedRequest( + channel=chat, participant=user, banned_rights=banned_rights + ) + ) return f"User {user_id} banned from chat {chat.title} (ID: {chat_id})." except telethon.errors.rpcerrorlist.UserNotMutualContactError: return "Error: Cannot ban users who are not mutual contacts. Please ensure the user is in your contacts and has added you back." @@ -1489,7 +1586,7 @@ async def ban_user(chat_id: int, user_id: int) -> str: async def unban_user(chat_id: int, user_id: int) -> str: """ Unban a user from a group or channel. - + Args: chat_id: ID of the group/channel user_id: User ID to unban @@ -1497,7 +1594,7 @@ async def unban_user(chat_id: int, user_id: int) -> str: try: chat = await client.get_entity(chat_id) user = await client.get_entity(user_id) - + # Create unbanned rights (no restrictions) unbanned_rights = ChatBannedRights( until_date=None, @@ -1512,15 +1609,15 @@ async def unban_user(chat_id: int, user_id: int) -> str: send_polls=False, change_info=False, invite_users=False, - pin_messages=False + pin_messages=False, ) - + try: - await client(functions.channels.EditBannedRequest( - channel=chat, - participant=user, - banned_rights=unbanned_rights - )) + await client( + functions.channels.EditBannedRequest( + channel=chat, participant=user, banned_rights=unbanned_rights + ) + ) return f"User {user_id} unbanned from chat {chat.title} (ID: {chat_id})." except telethon.errors.rpcerrorlist.UserNotMutualContactError: return "Error: Cannot modify status of users who are not mutual contacts. Please ensure the user is in your contacts and has added you back." @@ -1538,8 +1635,11 @@ async def get_admins(chat_id: int) -> str: """ try: # Fix: Use the correct filter type ChannelParticipantsAdmins - participants = await client.get_participants(chat_id, filter=ChannelParticipantsAdmins()) - lines = [f"ID: {p.id}, Name: {getattr(p, 'first_name', '')} {getattr(p, 'last_name', '')}".strip() for p in participants] + participants = await client.get_participants(chat_id, filter=ChannelParticipantsAdmins()) + lines = [ + f"ID: {p.id}, Name: {getattr(p, 'first_name', '')} {getattr(p, 'last_name', '')}".strip() + for p in participants + ] return "\n".join(lines) if lines else "No admins found." except Exception as e: logger.exception(f"get_admins failed (chat_id={chat_id})") @@ -1553,8 +1653,13 @@ async def get_banned_users(chat_id: int) -> str: """ try: # Fix: Use the correct filter type ChannelParticipantsKicked - participants = await client.get_participants(chat_id, filter=ChannelParticipantsKicked(q='')) - lines = [f"ID: {p.id}, Name: {getattr(p, 'first_name', '')} {getattr(p, 'last_name', '')}".strip() for p in participants] + participants = await client.get_participants( + chat_id, filter=ChannelParticipantsKicked(q="") + ) + lines = [ + f"ID: {p.id}, Name: {getattr(p, 'first_name', '')} {getattr(p, 'last_name', '')}".strip() + for p in participants + ] return "\n".join(lines) if lines else "No banned users found." except Exception as e: logger.exception(f"get_banned_users failed (chat_id={chat_id})") @@ -1568,13 +1673,12 @@ async def get_invite_link(chat_id: int) -> str: """ try: entity = await client.get_entity(chat_id) - + # Try using ExportChatInviteRequest first try: from telethon.tl import functions - result = await client(functions.messages.ExportChatInviteRequest( - peer=entity - )) + + result = await client(functions.messages.ExportChatInviteRequest(peer=entity)) return result.link except AttributeError: # If the function doesn't exist in the current Telethon version @@ -1582,25 +1686,23 @@ async def get_invite_link(chat_id: int) -> str: except Exception as e1: # If that fails, log and try alternative approach logger.warning(f"ExportChatInviteRequest failed: {e1}") - + # Alternative approach using client.export_chat_invite_link try: invite_link = await client.export_chat_invite_link(entity) return invite_link except Exception as e2: logger.warning(f"export_chat_invite_link failed: {e2}") - + # Last resort: Try directly fetching chat info try: if isinstance(entity, (Chat, Channel)): - full_chat = await client(functions.messages.GetFullChatRequest( - chat_id=entity.id - )) - if hasattr(full_chat, 'full_chat') and hasattr(full_chat.full_chat, 'invite_link'): + full_chat = await client(functions.messages.GetFullChatRequest(chat_id=entity.id)) + if hasattr(full_chat, "full_chat") and hasattr(full_chat.full_chat, "invite_link"): return full_chat.full_chat.invite_link or "No invite link available." except Exception as e3: logger.warning(f"GetFullChatRequest failed: {e3}") - + return "Could not retrieve invite link for this chat." except Exception as e: logger.exception(f"get_invite_link failed (chat_id={chat_id})") @@ -1614,34 +1716,38 @@ async def join_chat_by_link(link: str) -> str: """ try: # Extract the hash from the invite link - if '/' in link: - hash_part = link.split('/')[-1] - if hash_part.startswith('+'): + if "/" in link: + hash_part = link.split("/")[-1] + if hash_part.startswith("+"): hash_part = hash_part[1:] # Remove the '+' if present else: hash_part = link - + # Try checking the invite before joining try: - from telethon.errors import (InviteHashExpiredError, InviteHashInvalidError, - UserAlreadyParticipantError, ChatAdminRequiredError, - UsersTooMuchError) - + from telethon.errors import ( + InviteHashExpiredError, + InviteHashInvalidError, + UserAlreadyParticipantError, + ChatAdminRequiredError, + UsersTooMuchError, + ) + # Try to check invite info first (will often fail if not a member) invite_info = await client(functions.messages.CheckChatInviteRequest(hash=hash_part)) - if hasattr(invite_info, 'chat') and invite_info.chat: + if hasattr(invite_info, "chat") and invite_info.chat: # If we got chat info, we're already a member - chat_title = getattr(invite_info.chat, 'title', 'Unknown Chat') + chat_title = getattr(invite_info.chat, "title", "Unknown Chat") return f"You are already a member of this chat: {chat_title}" except Exception as check_err: # This often fails if not a member - just continue pass - + # Join the chat using the hash try: result = await client(functions.messages.ImportChatInviteRequest(hash=hash_part)) - if result and hasattr(result, 'chats') and result.chats: - chat_title = getattr(result.chats[0], 'title', 'Unknown Chat') + if result and hasattr(result, "chats") and result.chats: + chat_title = getattr(result.chats[0], "title", "Unknown Chat") return f"Successfully joined chat: {chat_title}" return f"Joined chat via invite hash." except Exception as join_err: @@ -1670,13 +1776,12 @@ async def export_chat_invite(chat_id: int) -> str: """ try: entity = await client.get_entity(chat_id) - + # Try using ExportChatInviteRequest first try: from telethon.tl import functions - result = await client(functions.messages.ExportChatInviteRequest( - peer=entity - )) + + result = await client(functions.messages.ExportChatInviteRequest(peer=entity)) return result.link except AttributeError: # If the function doesn't exist in the current Telethon version @@ -1684,7 +1789,7 @@ async def export_chat_invite(chat_id: int) -> str: except Exception as e1: # If that fails, log and try alternative approach logger.warning(f"ExportChatInviteRequest failed: {e1}") - + # Alternative approach using client.export_chat_invite_link try: invite_link = await client.export_chat_invite_link(entity) @@ -1704,30 +1809,34 @@ async def import_chat_invite(hash: str) -> str: """ try: # Remove any prefixes like '+' if present - if hash.startswith('+'): + if hash.startswith("+"): hash = hash[1:] - + # Try checking the invite before joining try: - from telethon.errors import (InviteHashExpiredError, InviteHashInvalidError, - UserAlreadyParticipantError, ChatAdminRequiredError, - UsersTooMuchError) - + from telethon.errors import ( + InviteHashExpiredError, + InviteHashInvalidError, + UserAlreadyParticipantError, + ChatAdminRequiredError, + UsersTooMuchError, + ) + # Try to check invite info first (will often fail if not a member) invite_info = await client(functions.messages.CheckChatInviteRequest(hash=hash)) - if hasattr(invite_info, 'chat') and invite_info.chat: + if hasattr(invite_info, "chat") and invite_info.chat: # If we got chat info, we're already a member - chat_title = getattr(invite_info.chat, 'title', 'Unknown Chat') + chat_title = getattr(invite_info.chat, "title", "Unknown Chat") return f"You are already a member of this chat: {chat_title}" except Exception as check_err: # This often fails if not a member - just continue pass - + # Join the chat using the hash try: result = await client(functions.messages.ImportChatInviteRequest(hash=hash)) - if result and hasattr(result, 'chats') and result.chats: - chat_title = getattr(result.chats[0], 'title', 'Unknown Chat') + if result and hasattr(result, "chats") and result.chats: + chat_title = getattr(result.chats[0], "title", "Unknown Chat") return f"Successfully joined chat: {chat_title}" return f"Joined chat via invite hash." except Exception as join_err: @@ -1763,7 +1872,14 @@ async def send_voice(chat_id: int, file_path: str) -> str: if not os.access(file_path, os.R_OK): return f"File is not readable: {file_path}" mime, _ = mimetypes.guess_type(file_path) - if not (mime and (mime == 'audio/ogg' or file_path.lower().endswith('.ogg') or file_path.lower().endswith('.opus'))): + if not ( + mime + and ( + mime == "audio/ogg" + or file_path.lower().endswith(".ogg") + or file_path.lower().endswith(".opus") + ) + ): return "Voice file must be .ogg or .opus format." entity = await client.get_entity(chat_id) await client.send_file(entity, file_path, voice_note=True) @@ -1783,7 +1899,13 @@ async def forward_message(from_chat_id: int, message_id: int, to_chat_id: int) - await client.forward_messages(to_entity, message_id, from_entity) return f"Message {message_id} forwarded from {from_chat_id} to {to_chat_id}." except Exception as e: - return log_and_format_error("forward_message", e, from_chat_id=from_chat_id, message_id=message_id, to_chat_id=to_chat_id) + return log_and_format_error( + "forward_message", + e, + from_chat_id=from_chat_id, + message_id=message_id, + to_chat_id=to_chat_id, + ) @mcp.tool() @@ -1796,7 +1918,9 @@ async def edit_message(chat_id: int, message_id: int, new_text: str) -> str: await client.edit_message(entity, message_id, new_text) return f"Message {message_id} edited." except Exception as e: - return log_and_format_error("edit_message", e, chat_id=chat_id, message_id=message_id, new_text=new_text) + return log_and_format_error( + "edit_message", e, chat_id=chat_id, message_id=message_id, new_text=new_text + ) @mcp.tool() @@ -1861,7 +1985,9 @@ async def reply_to_message(chat_id: int, message_id: int, text: str) -> str: await client.send_message(entity, text, reply_to=message_id) return f"Replied to message {message_id} in chat {chat_id}." except Exception as e: - return log_and_format_error("reply_to_message", e, chat_id=chat_id, message_id=message_id, text=text) + return log_and_format_error( + "reply_to_message", e, chat_id=chat_id, message_id=message_id, text=text + ) @mcp.tool() @@ -1922,7 +2048,9 @@ async def search_messages(chat_id: int, query: str, limit: int = 20) -> str: messages = await client.get_messages(entity, limit=limit, search=query) return "\n".join([f"ID: {m.id} | {m.date} | {m.message}" for m in messages]) except Exception as e: - return log_and_format_error("search_messages", e, chat_id=chat_id, query=query, limit=limit) + return log_and_format_error( + "search_messages", e, chat_id=chat_id, query=query, limit=limit + ) @mcp.tool() @@ -1944,25 +2072,28 @@ async def mute_chat(chat_id: int) -> str: """ try: from telethon.tl.types import InputPeerNotifySettings - + peer = await client.get_entity(chat_id) - await client(functions.account.UpdateNotifySettingsRequest( - peer=peer, - settings=InputPeerNotifySettings(mute_until=2**31-1) - )) + await client( + functions.account.UpdateNotifySettingsRequest( + peer=peer, settings=InputPeerNotifySettings(mute_until=2**31 - 1) + ) + ) return f"Chat {chat_id} muted." except (ImportError, AttributeError) as type_err: try: # Alternative approach directly using raw API peer = await client.get_input_entity(chat_id) - await client(functions.account.UpdateNotifySettingsRequest( - peer=peer, - settings={ - 'mute_until': 2**31-1, # Far future - 'show_previews': False, - 'silent': True - } - )) + await client( + functions.account.UpdateNotifySettingsRequest( + peer=peer, + settings={ + "mute_until": 2**31 - 1, # Far future + "show_previews": False, + "silent": True, + }, + ) + ) return f"Chat {chat_id} muted (using alternative method)." except Exception as alt_e: logger.exception(f"mute_chat (alt method) failed (chat_id={chat_id})") @@ -1979,25 +2110,28 @@ async def unmute_chat(chat_id: int) -> str: """ try: from telethon.tl.types import InputPeerNotifySettings - + peer = await client.get_entity(chat_id) - await client(functions.account.UpdateNotifySettingsRequest( - peer=peer, - settings=InputPeerNotifySettings(mute_until=0) - )) + await client( + functions.account.UpdateNotifySettingsRequest( + peer=peer, settings=InputPeerNotifySettings(mute_until=0) + ) + ) return f"Chat {chat_id} unmuted." except (ImportError, AttributeError) as type_err: try: # Alternative approach directly using raw API peer = await client.get_input_entity(chat_id) - await client(functions.account.UpdateNotifySettingsRequest( - peer=peer, - settings={ - 'mute_until': 0, # Unmute (current time) - 'show_previews': True, - 'silent': False - } - )) + await client( + functions.account.UpdateNotifySettingsRequest( + peer=peer, + settings={ + "mute_until": 0, # Unmute (current time) + "show_previews": True, + "silent": False, + }, + ) + ) return f"Chat {chat_id} unmuted (using alternative method)." except Exception as alt_e: logger.exception(f"unmute_chat (alt method) failed (chat_id={chat_id})") @@ -2013,10 +2147,11 @@ async def archive_chat(chat_id: int) -> str: Archive a chat. """ try: - await client(functions.messages.ToggleDialogPinRequest( - peer=await client.get_entity(chat_id), - pinned=True - )) + await client( + functions.messages.ToggleDialogPinRequest( + peer=await client.get_entity(chat_id), pinned=True + ) + ) return f"Chat {chat_id} archived." except Exception as e: return log_and_format_error("archive_chat", e, chat_id=chat_id) @@ -2028,10 +2163,11 @@ async def unarchive_chat(chat_id: int) -> str: Unarchive a chat. """ try: - await client(functions.messages.ToggleDialogPinRequest( - peer=await client.get_entity(chat_id), - pinned=False - )) + await client( + functions.messages.ToggleDialogPinRequest( + peer=await client.get_entity(chat_id), pinned=False + ) + ) return f"Chat {chat_id} unarchived." except Exception as e: return log_and_format_error("unarchive_chat", e, chat_id=chat_id) @@ -2062,7 +2198,7 @@ async def send_sticker(chat_id: int, file_path: str) -> str: return f"Sticker file not found: {file_path}" if not os.access(file_path, os.R_OK): return f"Sticker file is not readable: {file_path}" - if not file_path.lower().endswith('.webp'): + if not file_path.lower().endswith(".webp"): return "Sticker file must be a .webp file." entity = await client.get_entity(chat_id) await client.send_file(entity, file_path, force_document=False) @@ -2082,25 +2218,40 @@ async def get_gif_search(query: str, limit: int = 10) -> str: try: # Try approach 1: SearchGifsRequest try: - result = await client(functions.messages.SearchGifsRequest(q=query, offset_id=0, limit=limit)) + result = await client( + functions.messages.SearchGifsRequest(q=query, offset_id=0, limit=limit) + ) if not result.gifs: return "[]" - return json.dumps([g.document.id for g in result.gifs], indent=2, default=json_serializer) + return json.dumps( + [g.document.id for g in result.gifs], indent=2, default=json_serializer + ) except (AttributeError, ImportError): # Fallback approach: Use SearchRequest with GIF filter try: from telethon.tl.types import InputMessagesFilterGif - result = await client(functions.messages.SearchRequest( - peer="gif", q=query, filter=InputMessagesFilterGif(), - min_date=None, max_date=None, offset_id=0, add_offset=0, - limit=limit, max_id=0, min_id=0, hash=0 - )) - if not result or not hasattr(result, 'messages') or not result.messages: + + result = await client( + functions.messages.SearchRequest( + peer="gif", + q=query, + filter=InputMessagesFilterGif(), + min_date=None, + max_date=None, + offset_id=0, + add_offset=0, + limit=limit, + max_id=0, + min_id=0, + hash=0, + ) + ) + if not result or not hasattr(result, "messages") or not result.messages: return "[]" # Extract document IDs from any messages with media gif_ids = [] for msg in result.messages: - if hasattr(msg, 'media') and msg.media and hasattr(msg.media, 'document'): + if hasattr(msg, "media") and msg.media and hasattr(msg.media, "document"): gif_ids.append(msg.media.document.id) return json.dumps(gif_ids, default=json_serializer) except Exception as inner_e: @@ -2138,11 +2289,11 @@ async def get_bot_info(bot_username: str) -> str: entity = await client.get_entity(bot_username) if not entity: return f"Bot with username {bot_username} not found." - + result = await client(functions.users.GetFullUserRequest(id=entity)) - + # Create a more structured, serializable response - if hasattr(result, 'to_dict'): + if hasattr(result, "to_dict"): # Use custom serializer to handle non-serializable types return json.dumps(result.to_dict(), indent=2, default=json_serializer) else: @@ -2159,7 +2310,7 @@ async def get_bot_info(bot_username: str) -> str: } if hasattr(result, "full_user") and hasattr(result.full_user, "about"): info["bot_info"]["about"] = result.full_user.about - + return json.dumps(info, indent=2) except Exception as e: logger.exception(f"get_bot_info failed (bot_username={bot_username})") @@ -2172,7 +2323,7 @@ async def set_bot_commands(bot_username: str, commands: list) -> str: Set bot commands for a bot you own. Note: This function can only be used if the Telegram client is a bot account. Regular user accounts cannot set bot commands. - + Args: bot_username: The username of the bot to set commands for. commands: List of command dictionaries with 'command' and 'description' keys. @@ -2180,29 +2331,30 @@ async def set_bot_commands(bot_username: str, commands: list) -> str: try: # First check if the current client is a bot me = await client.get_me() - if not getattr(me, 'bot', False): + if not getattr(me, "bot", False): return "Error: This function can only be used by bot accounts. Your current Telegram account is a regular user account, not a bot." - + # Import required types from telethon.tl.types import BotCommand, BotCommandScopeDefault from telethon.tl.functions.bots import SetBotCommandsRequest - + # Create BotCommand objects from the command dictionaries bot_commands = [ - BotCommand(command=c['command'], description=c['description']) - for c in commands + BotCommand(command=c["command"], description=c["description"]) for c in commands ] - + # Get the bot entity bot = await client.get_entity(bot_username) - + # Set the commands with proper scope - await client(SetBotCommandsRequest( - scope=BotCommandScopeDefault(), - lang_code="en", # Default language code - commands=bot_commands - )) - + await client( + SetBotCommandsRequest( + scope=BotCommandScopeDefault(), + lang_code="en", # Default language code + commands=bot_commands, + ) + ) + return f"Bot commands set for {bot_username}." except ImportError as ie: logger.exception(f"set_bot_commands failed - ImportError: {ie}") @@ -2232,7 +2384,9 @@ async def get_user_photos(user_id: int, limit: int = 10) -> str: """ try: user = await client.get_entity(user_id) - photos = await client(functions.photos.GetUserPhotosRequest(user_id=user, offset=0, max_id=0, limit=limit)) + photos = await client( + functions.photos.GetUserPhotosRequest(user_id=user, offset=0, max_id=0, limit=limit) + ) return json.dumps([p.id for p in photos.photos], indent=2) except Exception as e: return log_and_format_error("get_user_photos", e, user_id=user_id, limit=limit) @@ -2256,19 +2410,15 @@ async def get_recent_actions(chat_id: int) -> str: Get recent admin actions (admin log) in a group or channel. """ try: - result = await client(functions.channels.GetAdminLogRequest( - channel=chat_id, - q="", - events_filter=None, - admins=[], - max_id=0, - min_id=0, - limit=20 - )) - + result = await client( + functions.channels.GetAdminLogRequest( + channel=chat_id, q="", events_filter=None, admins=[], max_id=0, min_id=0, limit=20 + ) + ) + if not result or not result.events: return "No recent admin actions found." - + # Use the custom serializer to handle datetime objects return json.dumps([e.to_dict() for e in result.events], indent=2, default=json_serializer) except Exception as e: @@ -2287,16 +2437,19 @@ async def get_pinned_messages(chat_id: int) -> str: try: # Try newer Telethon approach from telethon.tl.types import InputMessagesFilterPinned + messages = await client.get_messages(entity, filter=InputMessagesFilterPinned()) except (ImportError, AttributeError): # Fallback - try without filter and manually filter pinned all_messages = await client.get_messages(entity, limit=50) - messages = [m for m in all_messages if getattr(m, 'pinned', False)] - + messages = [m for m in all_messages if getattr(m, "pinned", False)] + if not messages: return "No pinned messages found in this chat." - - return "\n".join([f"ID: {m.id} | {m.date} | {m.message or '[Media/No text]'}" for m in messages]) + + return "\n".join( + [f"ID: {m.id} | {m.date} | {m.message or '[Media/No text]'}" for m in messages] + ) except Exception as e: logger.exception(f"get_pinned_messages failed (chat_id={chat_id})") return log_and_format_error("get_pinned_messages", e, chat_id=chat_id) @@ -2310,7 +2463,7 @@ if __name__ == "__main__": # Start the Telethon client non-interactively print("Starting Telegram client...") await client.start() - + print("Telegram client started. Running MCP server...") # Use the asynchronous entrypoint instead of mcp.run() await mcp.run_stdio_async() @@ -2319,7 +2472,7 @@ if __name__ == "__main__": if isinstance(e, sqlite3.OperationalError) and "database is locked" in str(e): print( "Database lock detected. Please ensure no other instances are running.", - file=sys.stderr + file=sys.stderr, ) sys.exit(1) diff --git a/session_string_generator.py b/session_string_generator.py index e63958c..5647ab5 100755 --- a/session_string_generator.py +++ b/session_string_generator.py @@ -40,46 +40,52 @@ except ValueError: print("\n----- Telegram Session String Generator -----\n") print("This script will generate a session string for your Telegram account.") -print("You will be asked to enter your phone number and the verification code sent to your Telegram app.") +print( + "You will be asked to enter your phone number and the verification code sent to your Telegram app." +) print("The generated session string can be added to your .env file.") -print("\nYour credentials will NOT be stored on any server and are only used for local authentication.\n") +print( + "\nYour credentials will NOT be stored on any server and are only used for local authentication.\n" +) try: # Connect to Telegram and generate the session string with TelegramClient(StringSession(), API_ID, API_HASH) as client: # The client.session.save() function from StringSession returns the session string session_string = StringSession.save(client.session) - + print("\nAuthentication successful!") print("\n----- Your Session String -----") print(f"\n{session_string}\n") print("Add this to your .env file as:") print(f"TELEGRAM_SESSION_STRING={session_string}") print("\nIMPORTANT: Keep this string private and never share it with anyone!") - + # Optional: auto-update the .env file - choice = input("\nWould you like to automatically update your .env file with this session string? (y/N): ") - if choice.lower() == 'y': + choice = input( + "\nWould you like to automatically update your .env file with this session string? (y/N): " + ) + if choice.lower() == "y": try: # Read the current .env file - with open('.env', 'r') as file: + with open(".env", "r") as file: env_contents = file.readlines() - + # Update or add the SESSION_STRING line session_string_line_found = False for i, line in enumerate(env_contents): - if line.startswith('TELEGRAM_SESSION_STRING='): + if line.startswith("TELEGRAM_SESSION_STRING="): env_contents[i] = f"TELEGRAM_SESSION_STRING={session_string}\n" session_string_line_found = True break - + if not session_string_line_found: env_contents.append(f"TELEGRAM_SESSION_STRING={session_string}\n") - + # Write back to the .env file - with open('.env', 'w') as file: + with open(".env", "w") as file: file.writelines(env_contents) - + print("\n.env file updated successfully!") except Exception as e: print(f"\nError updating .env file: {e}") @@ -88,4 +94,4 @@ try: except Exception as e: print(f"\nError: {e}") print("Failed to generate session string. Please try again.") - sys.exit(1) \ No newline at end of file + sys.exit(1)