From e297416e54a76bd4b4aa5ddbec5d0dc237f27ee1 Mon Sep 17 00:00:00 2001 From: anonim <70073044+l1v0n1@users.noreply.github.com> Date: Thu, 17 Apr 2025 17:56:42 +0300 Subject: [PATCH] feat: refine logging and error handling in main.py, update .env.example and README - Enhanced logging setup in main.py for better error tracking and console output. - Improved error handling in group and user management functions. - Updated .env.example to include a sample session string and removed outdated test configurations. - Expanded README.md with new tool examples and usage instructions for better clarity. - Removed the test.py file as it is no longer needed for testing. --- .env.example | 43 +--- .gitignore | 20 +- README.md | 276 +++++++++++++++++++++-- main.py | 620 +++++++++++++++++++++++++++++++++++++++------------ setup.py | 1 + test.py | 583 ------------------------------------------------ 6 files changed, 743 insertions(+), 800 deletions(-) create mode 100644 setup.py delete mode 100644 test.py diff --git a/.env.example b/.env.example index 9a49d47..8af8654 100644 --- a/.env.example +++ b/.env.example @@ -6,45 +6,4 @@ TELEGRAM_API_HASH=0123456789abcdef0123456789abcdef # Option 1: File-based session (a .session file will be created) TELEGRAM_SESSION_NAME=telegram_session # Option 2: String-based session (if you generate one, e.g., using Telethon's string session generator) -TELEGRAM_SESSION_STRING= - -# --- Test Script Configuration (test.py) --- -# Fill these with IDs/paths relevant to YOUR test environment. -# Using real user/group IDs requires caution and consent. - -# A chat ID where the script can send/edit/delete messages safely. -# Defaults to your "Saved Messages" if left empty or 0. -TEST_CHAT_ID=0 - -# The ID of a Supergroup or Channel you own or are an admin in. -# Required for testing admin actions (ban, invite, promote, etc.). -TEST_SUPERGROUP_ID=0 - -# The numeric User ID of a TEST account (NOT a real person unless they consent). -# This user will be banned/unbanned/invited/promoted in the TEST_SUPERGROUP_ID. -TEST_USER_ID=0 - -# The username (without @) of the TEST_USER_ID. -TEST_USERNAME=username - -# --- Optional Test Variables --- - -# Phone number (E.164 format, e.g., +15551234567) for add_contact test. -TEST_CONTACT_PHONE=+15551234567 -TEST_CONTACT_FNAME=Test -TEST_CONTACT_LNAME=Contact - -# Paths to dummy files for testing uploads/sending media. -# The script will create test_upload.txt if it doesn't exist. -# You might need to provide small, valid image/voice/sticker files for other tests. -TEST_FILE_PATH=test_upload.txt -TEST_PHOTO_PATH=test_photo.jpg -TEST_VOICE_PATH=test_voice.ogg -TEST_STICKER_PATH=sticker.webp - -# Username of a bot you own (required for get_bot_info, set_bot_commands tests) -TEST_BOT_USERNAME=your_bot_username - -# Hash part or full invite link for a group/channel you want to test joining -# Required for join_chat_by_link / import_chat_invite tests -TEST_INVITE_LINK_HASH=https://t.me/+AbCdEfGhIjK \ No newline at end of file +TELEGRAM_SESSION_STRING=1231231232erfdfdffd diff --git a/.gitignore b/.gitignore index 1bfc37f..733c724 100644 --- a/.gitignore +++ b/.gitignore @@ -85,7 +85,7 @@ ipython_config.py # pyenv # For a library or package, you might want to ignore these files since the code is # intended to run in multiple environments; otherwise, check them in: -# .python-version +.python-version # pipenv # According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. @@ -189,12 +189,16 @@ claude_desktop_config.json # Test files .cursor/ -test_voice.ogg -test_upload.txt -test_output.txt -sticker.webp -test_download_* -test_download.* -two.png +tests/ +test.py +extract_test_issues.py *.tmp mcp_errors.log +telegram_test.log +test_issues_report.md + +# Temporary data files +test_upload.txt +test_voice.ogg +sticker.webp +two.png \ No newline at end of file diff --git a/README.md b/README.md index 58d20fc..73fa181 100644 --- a/README.md +++ b/README.md @@ -186,33 +186,269 @@ Edit `~/.cursor/mcp.json`: --- -## ๐Ÿงช Testing +## ๐Ÿ“ Tool Examples with Code & Output -A comprehensive test script is included to validate all functionality: +Below are examples of the most commonly used tools with their implementation and sample output. -```bash -# Basic test (redirects output to file) -python test.py > test_output.txt 2>&1 +### Getting Your Chats + +```python +@mcp.tool() +async def get_chats(page: int = 1, page_size: int = 20) -> str: + """ + Get a paginated list of chats. + Args: + page: Page number (1-indexed). + page_size: Number of chats per page. + """ + try: + dialogs = await client.get_dialogs() + start = (page - 1) * page_size + end = start + page_size + if start >= len(dialogs): + return "Page out of range." + chats = dialogs[start:end] + lines = [] + for dialog in chats: + entity = dialog.entity + chat_id = entity.id + title = getattr(entity, "title", None) or getattr(entity, "first_name", "Unknown") + lines.append(f"Chat ID: {chat_id}, Title: {title}") + return "\n".join(lines) + except Exception as e: + logger.exception(f"get_chats failed (page={page}, page_size={page_size})") + return "An error occurred (code: GETCHATS-ERR-001). Check mcp_errors.log for details." ``` -The test script uses environment variables from your `.env` file to configure testing parameters. See `.env.example` for all available test configuration options. - -### Test Configuration - -You can configure test parameters in your `.env` file: - +Example output: ``` -# A safe chat ID where tests can send/delete messages -TEST_CHAT_ID=your_saved_messages_id - -# A supergroup you admin for testing group operations -TEST_SUPERGROUP_ID=your_supergroup_id - -# A test user account ID (not a real person unless they consent) -TEST_USER_ID=test_user_id +Chat ID: 123456789, Title: John Doe +Chat ID: -100987654321, Title: My Project Group +Chat ID: 111223344, Title: Jane Smith +Chat ID: -200123456789, Title: News Channel ``` -The tests are designed to be non-destructive, but use caution when testing with real accounts. +### Sending Messages + +```python +@mcp.tool() +async def send_message(chat_id: int, message: str) -> str: + """ + Send a message to a specific chat. + Args: + chat_id: The ID of the chat. + message: The message content to send. + """ + try: + entity = await client.get_entity(chat_id) + await client.send_message(entity, message) + return "Message sent successfully." + except Exception as e: + logger.exception(f"send_message failed (chat_id={chat_id})") + return "An error occurred (code: SENDMSG-ERR-001). Check mcp_errors.log for details." +``` + +Example output: +``` +Message sent successfully. +``` + +### Getting Chat Invite Links + +The `get_invite_link` function is particularly robust with multiple fallback methods: + +```python +@mcp.tool() +async def get_invite_link(chat_id: int) -> str: + """ + Get the invite link for a group or channel. + """ + try: + entity = await client.get_entity(chat_id) + + # Try using ExportChatInviteRequest first + try: + from telethon.tl import functions + result = await client(functions.messages.ExportChatInviteRequest( + peer=entity + )) + return result.link + except AttributeError: + # If the function doesn't exist in the current Telethon version + logger.warning("ExportChatInviteRequest not available, using alternative method") + except Exception as e1: + # If that fails, log and try alternative approach + logger.warning(f"ExportChatInviteRequest failed: {e1}") + + # Alternative approach using client.export_chat_invite_link + try: + invite_link = await client.export_chat_invite_link(entity) + return invite_link + except Exception as e2: + logger.warning(f"export_chat_invite_link failed: {e2}") + + # Last resort: Try directly fetching chat info + try: + if isinstance(entity, (Chat, Channel)): + full_chat = await client(functions.messages.GetFullChatRequest( + chat_id=entity.id + )) + if hasattr(full_chat, 'full_chat') and hasattr(full_chat.full_chat, 'invite_link'): + return full_chat.full_chat.invite_link or "No invite link available." + except Exception as e3: + logger.warning(f"GetFullChatRequest failed: {e3}") + + return "Could not retrieve invite link for this chat." + except Exception as e: + logger.exception(f"get_invite_link failed (chat_id={chat_id})") + return f"Error getting invite link: {e}" +``` + +Example output: +``` +https://t.me/+AbCdEfGhIjKlMnOp +``` + +### Joining Chats via Invite Links + +```python +@mcp.tool() +async def join_chat_by_link(link: str) -> str: + """ + Join a chat by invite link. + """ + try: + # Extract the hash from the invite link + if '/' in link: + hash_part = link.split('/')[-1] + if hash_part.startswith('+'): + hash_part = hash_part[1:] # Remove the '+' if present + else: + hash_part = link + + # Try checking the invite before joining + try: + # Try to check invite info first (will often fail if not a member) + invite_info = await client(functions.messages.CheckChatInviteRequest(hash=hash_part)) + if hasattr(invite_info, 'chat') and invite_info.chat: + # If we got chat info, we're already a member + chat_title = getattr(invite_info.chat, 'title', 'Unknown Chat') + return f"You are already a member of this chat: {chat_title}" + except Exception: + # This often fails if not a member - just continue + pass + + # Join the chat using the hash + result = await client(functions.messages.ImportChatInviteRequest(hash=hash_part)) + if result and hasattr(result, 'chats') and result.chats: + chat_title = getattr(result.chats[0], 'title', 'Unknown Chat') + return f"Successfully joined chat: {chat_title}" + return f"Joined chat via invite hash." + except Exception as e: + err_str = str(e).lower() + if "expired" in err_str: + return "The invite hash has expired and is no longer valid." + elif "invalid" in err_str: + return "The invite hash is invalid or malformed." + elif "already" in err_str and "participant" in err_str: + return "You are already a member of this chat." + logger.exception(f"join_chat_by_link failed (link={link})") + return f"Error joining chat: {e}" +``` + +Example output: +``` +Successfully joined chat: Developer Community +``` + +### Searching Public Chats + +```python +@mcp.tool() +async def search_public_chats(query: str) -> str: + """ + Search for public chats, channels, or bots by username or title. + """ + try: + result = await client(functions.contacts.SearchRequest(q=query, limit=20)) + return json.dumps([format_entity(u) for u in result.users], indent=2) + except Exception as e: + return f"Error searching public chats: {e}" +``` + +Example output: +```json +[ + { + "id": 123456789, + "name": "TelegramBot", + "type": "user", + "username": "telegram_bot" + }, + { + "id": 987654321, + "name": "Telegram News", + "type": "user", + "username": "telegram_news" + } +] +``` + +### Getting Direct Chats with Contacts + +```python +@mcp.tool() +async def get_direct_chat_by_contact(contact_query: str) -> str: + """ + Find a direct chat with a specific contact by name, username, or phone. + + Args: + contact_query: Name, username, or phone number to search for. + """ + try: + # Fetch all contacts using the correct Telethon method + result = await client(functions.contacts.GetContactsRequest(hash=0)) + contacts = result.users + found_contacts = [] + for contact in contacts: + if not contact: + continue + name = f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip() + username = getattr(contact, 'username', '') + phone = getattr(contact, 'phone', '') + if (contact_query.lower() in name.lower() or + (username and contact_query.lower() in username.lower()) or + (phone and contact_query in phone)): + found_contacts.append(contact) + if not found_contacts: + return f"No contacts found matching '{contact_query}'." + # If we found contacts, look for direct chats with them + results = [] + dialogs = await client.get_dialogs() + for contact in found_contacts: + contact_name = f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip() + for dialog in dialogs: + if isinstance(dialog.entity, User) and dialog.entity.id == contact.id: + chat_info = f"Chat ID: {dialog.entity.id}, Contact: {contact_name}" + if getattr(contact, 'username', ''): + chat_info += f", Username: @{contact.username}" + if dialog.unread_count: + chat_info += f", Unread: {dialog.unread_count}" + results.append(chat_info) + break + + if not results: + return f"Found contacts matching '{contact_query}', but no direct chats with them." + + return "\n".join(results) + except Exception as e: + return f"Error searching for direct chat: {e}" +``` + +Example output: +``` +Chat ID: 123456789, Contact: John Smith, Username: @johnsmith, Unread: 3 +``` --- diff --git a/main.py b/main.py index ddbbe89..2d4b0b4 100644 --- a/main.py +++ b/main.py @@ -17,6 +17,7 @@ from typing import List, Dict, Optional, Union, Any from telethon import functions import mimetypes import logging +import telethon.errors.rpcerrorlist # Helper function for JSON serialization of datetime, bytes, and other non-serializable objects def json_serializer(obj): @@ -46,13 +47,36 @@ else: # Use file-based session client = TelegramClient(TELEGRAM_SESSION_NAME, TELEGRAM_API_ID, TELEGRAM_API_HASH) -# Setup logger for error reporting -logging.basicConfig( - filename='mcp_errors.log', - level=logging.ERROR, - format='%(asctime)s %(levelname)s %(name)s %(message)s' -) -logger = logging.getLogger("mcp") +# Setup robust logging with both file and console output +logger = logging.getLogger("telegram_mcp") +logger.setLevel(logging.ERROR) # Set to ERROR for production, INFO for debugging + +# Create console handler +console_handler = logging.StreamHandler() +console_handler.setLevel(logging.ERROR) # Set to ERROR for production, INFO for debugging + +# Create file handler with absolute path +script_dir = os.path.dirname(os.path.abspath(__file__)) +log_file_path = os.path.join(script_dir, "mcp_errors.log") + +try: + file_handler = logging.FileHandler(log_file_path, mode='a') # Append mode + file_handler.setLevel(logging.ERROR) + + # Create formatter and add to handlers + formatter = logging.Formatter('%(asctime)s %(levelname)s %(name)s %(message)s') + console_handler.setFormatter(formatter) + file_handler.setFormatter(formatter) + + # Add handlers to logger + logger.addHandler(console_handler) + logger.addHandler(file_handler) + + logger.info(f"Logging initialized to {log_file_path}") +except Exception as log_error: + print(f"WARNING: Error setting up log file: {log_error}") + # Fallback to console-only logging + logger.addHandler(console_handler) def format_entity(entity) -> Dict[str, Any]: """Helper function to format entity information consistently.""" @@ -255,7 +279,14 @@ async def list_messages(chat_id: int, limit: int = 20, search_query: str = None, try: from_date_obj = datetime.strptime(from_date, "%Y-%m-%d") # Make it timezone aware by adding UTC timezone info - from_date_obj = from_date_obj.replace(tzinfo=datetime.timezone.utc) + # Use datetime.timezone.utc for Python 3.9+ or import timezone directly for 3.13+ + try: + # For Python 3.9+ + from_date_obj = from_date_obj.replace(tzinfo=datetime.timezone.utc) + except AttributeError: + # For Python 3.13+ + from datetime import timezone + from_date_obj = from_date_obj.replace(tzinfo=timezone.utc) except ValueError: return f"Invalid from_date format. Use YYYY-MM-DD." @@ -263,7 +294,13 @@ async def list_messages(chat_id: int, limit: int = 20, search_query: str = None, try: to_date_obj = datetime.strptime(to_date, "%Y-%m-%d") # Set to end of day and make timezone aware - to_date_obj = (to_date_obj + timedelta(days=1, microseconds=-1)).replace(tzinfo=datetime.timezone.utc) + to_date_obj = to_date_obj + timedelta(days=1, microseconds=-1) + # Add timezone info + try: + to_date_obj = to_date_obj.replace(tzinfo=datetime.timezone.utc) + except AttributeError: + from datetime import timezone + to_date_obj = to_date_obj.replace(tzinfo=timezone.utc) except ValueError: return f"Invalid to_date format. Use YYYY-MM-DD." @@ -732,96 +769,169 @@ async def get_me() -> str: @mcp.tool() async def create_group(title: str, user_ids: list) -> str: """ - Create a new group with the given title and user IDs. + Create a new group or supergroup and add users. + Args: - title: The group name. - user_ids: List of user IDs to add to the group. + title: Title for the new group + user_ids: List of user IDs to add to the group """ try: - users = [await client.get_entity(uid) for uid in user_ids] - result = await client(functions.messages.CreateChatRequest(users=users, title=title)) - return f"Group '{title}' created with ID: {result.chats[0].id}" + # Convert user IDs to entities + users = [] + for user_id in user_ids: + try: + user = await client.get_entity(user_id) + users.append(user) + except Exception as e: + logger.error(f"Failed to get entity for user ID {user_id}: {e}") + return f"Error: Could not find user with ID {user_id}" + + if not users: + return "Error: No valid users provided" + + # Create the group with the users + try: + # Create a new chat with selected users + result = await client(functions.messages.CreateChatRequest( + users=users, + title=title + )) + + # Check what type of response we got + if hasattr(result, 'chats') and result.chats: + created_chat = result.chats[0] + return f"Group created with ID: {created_chat.id}" + elif hasattr(result, 'chat') and result.chat: + return f"Group created with ID: {result.chat.id}" + elif hasattr(result, 'chat_id'): + return f"Group created with ID: {result.chat_id}" + else: + # If we can't determine the chat ID directly from the result + # Try to find it in recent dialogs + await asyncio.sleep(1) # Give Telegram a moment to register the new group + dialogs = await client.get_dialogs(limit=5) # Get recent dialogs + for dialog in dialogs: + if dialog.title == title: + return f"Group created with ID: {dialog.id}" + + # If we still can't find it, at least return success + return f"Group created successfully. Please check your recent chats for '{title}'." + + except Exception as create_err: + if "PEER_FLOOD" in str(create_err): + return "Error: Cannot create group due to Telegram limits. Try again later." + else: + raise # Let the outer exception handler catch it except Exception as e: + logger.exception(f"create_group failed (title={title}, user_ids={user_ids})") return f"Error creating group: {e}" @mcp.tool() async def invite_to_group(group_id: int, user_ids: list) -> str: """ - Invite users to a group or channel by group ID. + Invite users to a group or channel. + Args: - group_id: The group/channel chat ID. + group_id: The ID of the group/channel. user_ids: List of user IDs to invite. """ try: - chat_entity = await client.get_entity(group_id) - user_entities = [] - for uid in user_ids: + entity = await client.get_entity(group_id) + users_to_add = [] + + for user_id in user_ids: try: - user_entities.append(await client.get_entity(uid)) - except Exception as user_e: - logger.error(f"Could not find user entity for ID {uid}: {user_e}") - return f"Error finding user {uid}: {user_e}" - - if not user_entities: - return "No valid user IDs provided or found." - - if isinstance(chat_entity, Channel): - # Use InviteToChannelRequest for channels and supergroups - await client(functions.channels.InviteToChannelRequest( - channel=chat_entity, - users=user_entities + user = await client.get_entity(user_id) + users_to_add.append(user) + except ValueError as e: + return f"Error: User with ID {user_id} could not be found. {e}" + + try: + result = await client(functions.channels.InviteToChannelRequest( + channel=entity, + users=users_to_add )) - return f"Invited {len(user_entities)} users to channel/supergroup {group_id}." - elif isinstance(chat_entity, Chat): - # Use AddChatUserRequest for basic groups (adds one user at a time) - added_count = 0 - errors = [] - for user in user_entities: - try: - # Note: fwd_limit=0 might be needed depending on privacy settings - await client(functions.messages.AddChatUserRequest(chat_id=group_id, user_id=user, fwd_limit=50)) - added_count += 1 - except Exception as add_e: - error_msg = f"Error inviting user {getattr(user, 'id', 'unknown')} to basic group {group_id}: {add_e}" - logger.error(error_msg) - errors.append(error_msg) - result_message = f"Invited {added_count} users to basic group {group_id}." - if errors: - result_message += "\nErrors encountered:\n" + "\n".join(errors) - return result_message - else: - return f"Chat ID {group_id} is neither a Channel/Supergroup nor a basic Group." - + invited_count = 0 + if hasattr(result, 'users') and result.users: + invited_count = len(result.users) + elif hasattr(result, 'count'): + invited_count = result.count + + return f"Successfully invited {invited_count} users to {entity.title}" + except telethon.errors.rpcerrorlist.UserNotMutualContactError: + return "Error: Cannot invite users who are not mutual contacts. Please ensure the users are in your contacts and have added you back." + except telethon.errors.rpcerrorlist.UserPrivacyRestrictedError: + return "Error: One or more users have privacy settings that prevent you from adding them." + except Exception as e: + return f"Error inviting users: {e}" + except Exception as e: - logger.exception(f"invite_to_group failed (group_id={group_id}, user_ids={user_ids})") - return f"Error inviting users: {e}" + logger.error(f"telegram_mcp invite_to_group failed (group_id={group_id}, user_ids={user_ids})", exc_info=True) + return f"Error: {e}" @mcp.tool() async def leave_chat(chat_id: int) -> str: """ Leave a group or channel by chat ID. + Args: chat_id: The chat ID to leave. """ try: entity = await client.get_entity(chat_id) + + # Check the entity type carefully if isinstance(entity, Channel): - # Leave channel or supergroup - await client(functions.channels.LeaveChannelRequest(channel=entity)) - return f"Left channel/supergroup {chat_id}." + # Handle both channels and supergroups (which are also channels in Telegram) + try: + await client(functions.channels.LeaveChannelRequest(channel=entity)) + chat_name = getattr(entity, 'title', str(chat_id)) + return f"Left channel/supergroup {chat_name} (ID: {chat_id})." + except Exception as chan_err: + return f"Error leaving channel: {chan_err}" + elif isinstance(entity, Chat): - # Leave basic group - me = await client.get_me(input_peer=True) # Get self entity for DeleteChatUserRequest - await client(functions.messages.DeleteChatUserRequest(chat_id=chat_id, user_id=me)) - return f"Left basic group {chat_id}." + # Traditional basic groups (not supergroups) + try: + # First try with InputPeerUser + me = await client.get_me(input_peer=True) + await client(functions.messages.DeleteChatUserRequest( + chat_id=entity.id, # Use the entity ID directly + user_id=me + )) + chat_name = getattr(entity, 'title', str(chat_id)) + return f"Left basic group {chat_name} (ID: {chat_id})." + except Exception as chat_err: + # If the above fails, try the second approach + logger.warning(f"First leave attempt failed: {chat_err}, trying alternative method") + + try: + # Alternative approach - sometimes this works better + me_full = await client.get_me() + await client(functions.messages.DeleteChatUserRequest( + chat_id=entity.id, + user_id=me_full.id + )) + chat_name = getattr(entity, 'title', str(chat_id)) + return f"Left basic group {chat_name} (ID: {chat_id})." + except Exception as alt_err: + return f"Error leaving basic group: {alt_err}" else: - # Cannot leave a user chat this way - return f"Cannot leave chat {chat_id} of type {type(entity)}. This function is for groups and channels." + # Cannot leave a user chat this way + entity_type = type(entity).__name__ + return f"Cannot leave chat ID {chat_id} of type {entity_type}. This function is for groups and channels only." + except Exception as e: logger.exception(f"leave_chat failed (chat_id={chat_id})") + + # Provide helpful hint for common errors + error_str = str(e).lower() + if "invalid" in error_str and "chat" in error_str: + return "Error: This appears to be a channel/supergroup. Please check the chat ID and try again." + return f"Error leaving chat: {e}" @@ -935,12 +1045,24 @@ async def delete_profile_photo() -> str: @mcp.tool() async def get_privacy_settings() -> str: """ - Get your privacy settings. + Get your privacy settings for last seen status. """ try: - settings = await client(functions.account.GetPrivacyRequest(key='status_timestamp')) - return str(settings) + # Import needed types directly + from telethon.tl.types import InputPrivacyKeyStatusTimestamp + + try: + settings = await client(functions.account.GetPrivacyRequest( + key=InputPrivacyKeyStatusTimestamp() + )) + return str(settings) + except TypeError as e: + if "TLObject was expected" in str(e): + return "Error: Privacy settings API call failed due to type mismatch. This is likely a version compatibility issue with Telethon." + else: + raise except Exception as e: + logger.exception("get_privacy_settings failed") return f"Error getting privacy settings: {e}" @@ -948,19 +1070,93 @@ async def get_privacy_settings() -> str: async def set_privacy_settings(key: str, allow_users: list = None, disallow_users: list = None) -> str: """ Set privacy settings (e.g., last seen, phone, etc.). - key: e.g. 'status_timestamp', 'phone_number', 'profile_photo', 'forwards', 'voice_messages', etc. + + Args: + key: The privacy setting to modify ('status' for last seen, 'phone', 'profile_photo', etc.) + allow_users: List of user IDs to allow + disallow_users: List of user IDs to disallow """ - from telethon.tl.types import InputPrivacyKeyStatusTimestamp, InputPrivacyValueAllowUsers, InputPrivacyValueDisallowUsers try: - allow = InputPrivacyValueAllowUsers(users=[await client.get_entity(uid) for uid in (allow_users or [])]) - disallow = InputPrivacyValueDisallowUsers(users=[await client.get_entity(uid) for uid in (disallow_users or [])]) - await client(functions.account.SetPrivacyRequest( - key=getattr(functions.account, f'InputPrivacyKey{key.title().replace("_", "")}')(), - rules=[allow, disallow] - )) - return f"Privacy settings for {key} updated." + # Import needed types + from telethon.tl.types import ( + InputPrivacyKeyStatusTimestamp, + InputPrivacyKeyPhoneNumber, + InputPrivacyKeyProfilePhoto, + InputPrivacyValueAllowUsers, + InputPrivacyValueDisallowUsers, + InputPrivacyValueAllowAll, + InputPrivacyValueDisallowAll + ) + + # Map the simplified keys to their corresponding input types + key_mapping = { + 'status': InputPrivacyKeyStatusTimestamp, + 'phone': InputPrivacyKeyPhoneNumber, + 'profile_photo': InputPrivacyKeyProfilePhoto, + } + + # Get the appropriate key class + if key not in key_mapping: + return f"Error: Unsupported privacy key '{key}'. Supported keys: {', '.join(key_mapping.keys())}" + + privacy_key = key_mapping[key]() + + # Prepare the rules + rules = [] + + # Process allow rules + if allow_users is None or len(allow_users) == 0: + # If no specific users to allow, allow everyone by default + rules.append(InputPrivacyValueAllowAll()) + else: + # Convert user IDs to InputUser entities + try: + allow_entities = [] + for user_id in allow_users: + try: + user = await client.get_entity(user_id) + allow_entities.append(user) + except Exception as user_err: + logger.warning(f"Could not get entity for user ID {user_id}: {user_err}") + + if allow_entities: + rules.append(InputPrivacyValueAllowUsers(users=allow_entities)) + except Exception as allow_err: + logger.error(f"Error processing allowed users: {allow_err}") + return f"Error processing allowed users: {allow_err}" + + # Process disallow rules + if disallow_users and len(disallow_users) > 0: + try: + disallow_entities = [] + for user_id in disallow_users: + try: + user = await client.get_entity(user_id) + disallow_entities.append(user) + except Exception as user_err: + logger.warning(f"Could not get entity for user ID {user_id}: {user_err}") + + if disallow_entities: + rules.append(InputPrivacyValueDisallowUsers(users=disallow_entities)) + except Exception as disallow_err: + logger.error(f"Error processing disallowed users: {disallow_err}") + return f"Error processing disallowed users: {disallow_err}" + + # Apply the privacy settings + try: + result = await client(functions.account.SetPrivacyRequest( + key=privacy_key, + rules=rules + )) + return f"Privacy settings for {key} updated successfully." + except TypeError as type_err: + if "TLObject was expected" in str(type_err): + return "Error: Privacy settings API call failed due to type mismatch. This is likely a version compatibility issue with Telethon." + else: + raise except Exception as e: - return f"Error setting privacy: {e}" + logger.exception(f"set_privacy_settings failed (key={key})") + return f"Error setting privacy settings: {e}" @mcp.tool() @@ -1090,87 +1286,203 @@ async def delete_chat_photo(chat_id: int) -> str: @mcp.tool() -async def promote_admin(chat_id: int, user_id: int) -> str: +async def promote_admin(group_id: int, user_id: int, rights: dict = None) -> str: """ - Promote a user to admin in a group or channel. + Promote a user to admin in a group/channel. + + Args: + group_id: ID of the group/channel + user_id: User ID to promote + rights: Admin rights to give (optional) """ - from telethon.tl.types import ChatAdminRights try: + chat = await client.get_entity(group_id) user = await client.get_entity(user_id) - await client(functions.channels.EditAdminRequest( - channel=chat_id, - user_id=user, - admin_rights=ChatAdminRights( - change_info=True, post_messages=True, edit_messages=True, delete_messages=True, - ban_users=True, invite_users=True, pin_messages=True, add_admins=True, manage_call=True, other=True - ), - rank="admin" - )) - return f"User {user_id} promoted to admin in chat {chat_id}." + + # Set default admin rights if not provided + if not rights: + rights = { + 'change_info': True, + 'post_messages': True, + 'edit_messages': True, + 'delete_messages': True, + 'ban_users': True, + 'invite_users': True, + 'pin_messages': True, + 'add_admins': False, + 'anonymous': False, + 'manage_call': True, + 'other': True + } + + admin_rights = ChatAdminRights( + change_info=rights.get('change_info', True), + post_messages=rights.get('post_messages', True), + edit_messages=rights.get('edit_messages', True), + delete_messages=rights.get('delete_messages', True), + ban_users=rights.get('ban_users', True), + invite_users=rights.get('invite_users', True), + pin_messages=rights.get('pin_messages', True), + add_admins=rights.get('add_admins', False), + anonymous=rights.get('anonymous', False), + manage_call=rights.get('manage_call', True), + other=rights.get('other', True) + ) + + try: + result = await client(functions.channels.EditAdminRequest( + channel=chat, + user_id=user, + admin_rights=admin_rights, + rank="Admin" + )) + return f"Successfully promoted user {user_id} to admin in {chat.title}" + except telethon.errors.rpcerrorlist.UserNotMutualContactError: + return "Error: Cannot promote users who are not mutual contacts. Please ensure the user is in your contacts and has added you back." + except Exception as e: + return f"Error promoting user to admin: {e}" + except Exception as e: - return f"Error promoting admin: {e}" + logger.error(f"telegram_mcp promote_admin failed (group_id={group_id}, user_id={user_id})", exc_info=True) + return f"Error: {str(e)}" @mcp.tool() -async def demote_admin(chat_id: int, user_id: int) -> str: +async def demote_admin(group_id: int, user_id: int) -> str: """ - Demote an admin to regular user in a group or channel. + Demote a user from admin in a group/channel. + + Args: + group_id: ID of the group/channel + user_id: User ID to demote """ - from telethon.tl.types import ChatAdminRights try: + chat = await client.get_entity(group_id) user = await client.get_entity(user_id) - await client(functions.channels.EditAdminRequest( - channel=chat_id, - user_id=user, - admin_rights=ChatAdminRights(), - rank="" - )) - return f"User {user_id} demoted in chat {chat_id}." + + # Create empty admin rights (regular user) + admin_rights = ChatAdminRights( + change_info=False, + post_messages=False, + edit_messages=False, + delete_messages=False, + ban_users=False, + invite_users=False, + pin_messages=False, + add_admins=False, + anonymous=False, + manage_call=False, + other=False + ) + + try: + result = await client(functions.channels.EditAdminRequest( + channel=chat, + user_id=user, + admin_rights=admin_rights, + rank="" + )) + return f"Successfully demoted user {user_id} from admin in {chat.title}" + except telethon.errors.rpcerrorlist.UserNotMutualContactError: + return "Error: Cannot modify admin status of users who are not mutual contacts. Please ensure the user is in your contacts and has added you back." + except Exception as e: + return f"Error demoting admin: {e}" + except Exception as e: - return f"Error demoting admin: {e}" + logger.error(f"telegram_mcp demote_admin failed (group_id={group_id}, user_id={user_id})", exc_info=True) + return f"Error: {str(e)}" @mcp.tool() async def ban_user(chat_id: int, user_id: int) -> str: """ Ban a user from a group or channel. + + Args: + chat_id: ID of the group/channel + user_id: User ID to ban """ - from telethon.tl.types import ChatBannedRights - import time try: + chat = await client.get_entity(chat_id) user = await client.get_entity(user_id) - # Ban for 1 year (31536000 seconds) - banned_rights = ChatBannedRights(until_date=int(time.time()) + 31536000, view_messages=True) - await client(functions.channels.EditBannedRequest( - channel=chat_id, - participant=user, # Fix: Use 'participant' instead of 'user_id' - banned_rights=banned_rights - )) - return f"User {user_id} banned from chat {chat_id}." + + # Create banned rights (all restrictions enabled) + banned_rights = ChatBannedRights( + until_date=None, # Ban forever + view_messages=True, + send_messages=True, + send_media=True, + send_stickers=True, + send_gifs=True, + send_games=True, + send_inline=True, + embed_links=True, + send_polls=True, + change_info=True, + invite_users=True, + pin_messages=True + ) + + try: + await client(functions.channels.EditBannedRequest( + channel=chat, + participant=user, + banned_rights=banned_rights + )) + return f"User {user_id} banned from chat {chat.title} (ID: {chat_id})." + except telethon.errors.rpcerrorlist.UserNotMutualContactError: + return "Error: Cannot ban users who are not mutual contacts. Please ensure the user is in your contacts and has added you back." + except Exception as e: + return f"Error banning user: {e}" except Exception as e: logger.exception(f"ban_user failed (chat_id={chat_id}, user_id={user_id})") - return f"Error banning user: {e}" + return f"Error: {e}" @mcp.tool() async def unban_user(chat_id: int, user_id: int) -> str: """ Unban a user from a group or channel. + + Args: + chat_id: ID of the group/channel + user_id: User ID to unban """ - from telethon.tl.types import ChatBannedRights try: + chat = await client.get_entity(chat_id) user = await client.get_entity(user_id) - # Fix: Provide until_date=0 for unbanning - banned_rights = ChatBannedRights(until_date=0) - await client(functions.channels.EditBannedRequest( - channel=chat_id, - participant=user, - banned_rights=banned_rights - )) - return f"User {user_id} unbanned in chat {chat_id}." + + # Create unbanned rights (no restrictions) + unbanned_rights = ChatBannedRights( + until_date=None, + view_messages=False, + send_messages=False, + send_media=False, + send_stickers=False, + send_gifs=False, + send_games=False, + send_inline=False, + embed_links=False, + send_polls=False, + change_info=False, + invite_users=False, + pin_messages=False + ) + + try: + await client(functions.channels.EditBannedRequest( + channel=chat, + participant=user, + banned_rights=unbanned_rights + )) + return f"User {user_id} unbanned from chat {chat.title} (ID: {chat_id})." + except telethon.errors.rpcerrorlist.UserNotMutualContactError: + return "Error: Cannot modify status of users who are not mutual contacts. Please ensure the user is in your contacts and has added you back." + except Exception as e: + return f"Error unbanning user: {e}" except Exception as e: logger.exception(f"unban_user failed (chat_id={chat_id}, user_id={user_id})") - return f"Error unbanning user: {e}" + return f"Error: {e}" @mcp.tool() @@ -1211,17 +1523,20 @@ async def get_invite_link(chat_id: int) -> str: try: entity = await client.get_entity(chat_id) - if hasattr(functions.messages, 'ExportChatInviteRequest'): - try: - # Try using the peer parameter instead of chat_id - result = await client(functions.messages.ExportChatInviteRequest( - peer=entity - )) - return result.link - except Exception as e1: - # If that fails, try alternative approach - logger.warning(f"First approach failed: {e1}, trying alternative") - + # Try using ExportChatInviteRequest first + try: + from telethon.tl import functions + result = await client(functions.messages.ExportChatInviteRequest( + peer=entity + )) + return result.link + except AttributeError: + # If the function doesn't exist in the current Telethon version + logger.warning("ExportChatInviteRequest not available, using alternative method") + except Exception as e1: + # If that fails, log and try alternative approach + logger.warning(f"ExportChatInviteRequest failed: {e1}") + # Alternative approach using client.export_chat_invite_link try: invite_link = await client.export_chat_invite_link(entity) @@ -1230,11 +1545,15 @@ async def get_invite_link(chat_id: int) -> str: logger.warning(f"export_chat_invite_link failed: {e2}") # Last resort: Try directly fetching chat info - full_chat = await client(functions.messages.GetFullChatRequest( - chat_id=entity.id - )) - if hasattr(full_chat, 'full_chat') and hasattr(full_chat.full_chat, 'invite_link'): - return full_chat.full_chat.invite_link or "No invite link available." + try: + if isinstance(entity, (Chat, Channel)): + full_chat = await client(functions.messages.GetFullChatRequest( + chat_id=entity.id + )) + if hasattr(full_chat, 'full_chat') and hasattr(full_chat.full_chat, 'invite_link'): + return full_chat.full_chat.invite_link or "No invite link available." + except Exception as e3: + logger.warning(f"GetFullChatRequest failed: {e3}") return "Could not retrieve invite link for this chat." except Exception as e: @@ -1306,20 +1625,27 @@ async def export_chat_invite(chat_id: int) -> str: try: entity = await client.get_entity(chat_id) - # This is essentially the same as get_invite_link, but kept separate for API consistency + # Try using ExportChatInviteRequest first try: - # Try using the peer parameter instead of chat_id + from telethon.tl import functions result = await client(functions.messages.ExportChatInviteRequest( peer=entity )) return result.link + except AttributeError: + # If the function doesn't exist in the current Telethon version + logger.warning("ExportChatInviteRequest not available, using alternative method") except Exception as e1: - # If that fails, try alternative approach - logger.warning(f"ExportChatInviteRequest failed: {e1}, trying alternative") + # If that fails, log and try alternative approach + logger.warning(f"ExportChatInviteRequest failed: {e1}") - # Alternative approach - invite_link = await client.export_chat_invite_link(entity) - return invite_link + # Alternative approach using client.export_chat_invite_link + try: + invite_link = await client.export_chat_invite_link(entity) + return invite_link + except Exception as e2: + logger.warning(f"export_chat_invite_link failed: {e2}") + return f"Could not export chat invite: {e2}" except Exception as e: logger.exception(f"export_chat_invite failed (chat_id={chat_id})") return f"Error exporting chat invite: {e}" diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..0519ecb --- /dev/null +++ b/setup.py @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/test.py b/test.py deleted file mode 100644 index aa7afe2..0000000 --- a/test.py +++ /dev/null @@ -1,583 +0,0 @@ -import os -import sys -import asyncio -import nest_asyncio -from dotenv import load_dotenv -import logging -import random -import string -import json -from datetime import datetime, timedelta - -# Assume main.py is in the same directory or adjust path accordingly -from main import ( - client, mcp, - get_chats, get_messages, send_message, list_contacts, search_contacts, - get_contact_ids, list_messages, list_chats, get_chat, get_direct_chat_by_contact, - get_contact_chats, get_last_interaction, get_message_context, add_contact, - delete_contact, block_user, unblock_user, get_me, create_group, - invite_to_group, leave_chat, get_participants, send_file, download_media, - update_profile, set_profile_photo, delete_profile_photo, get_privacy_settings, - set_privacy_settings, import_contacts, export_contacts, get_blocked_users, - create_channel, edit_chat_title, edit_chat_photo, delete_chat_photo, - promote_admin, demote_admin, ban_user, unban_user, get_admins, - get_banned_users, get_invite_link, join_chat_by_link, export_chat_invite, - import_chat_invite, send_voice, forward_message, edit_message, - delete_message, pin_message, unpin_message, mark_as_read, reply_to_message, - upload_file, get_media_info, search_public_chats, search_messages, - resolve_username, mute_chat, unmute_chat, archive_chat, unarchive_chat, - get_sticker_sets, send_sticker, get_gif_search, send_gif, get_bot_info, - set_bot_commands, get_history, get_user_photos, get_user_status, - get_recent_actions, get_pinned_messages -) -# Import specific telethon types needed for tests -from telethon.errors.rpcerrorlist import UserNotParticipantError -from telethon.tl import types - -load_dotenv() - -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(levelname)s - %(message)s', - filename='.log', # Log to .log file - filemode='w' # Overwrite the log file each time -) -logger = logging.getLogger("TelegramToolTester") -logger.info("Logging configured to .log file.") # Force file creation early - -# --- Test Configuration --- -# Set these environment variables before running the test script -TEST_CHAT_ID = int(os.getenv("TEST_CHAT_ID", "0")) # A safe chat ID (e.g., Saved Messages or a test group) -TEST_SUPERGROUP_ID = int(os.getenv("TEST_SUPERGROUP_ID", "0")) # ID of a test supergroup/channel you own/admin -TEST_USER_ID = int(os.getenv("TEST_USER_ID", "0")) # ID of a test user account (NOT a real person unless they consent!) -TEST_USERNAME = os.getenv("TEST_USERNAME", "") # Username of the test user -TEST_CONTACT_PHONE = os.getenv("TEST_CONTACT_PHONE", "") # Phone number for add_contact test (e.g., +15551234567) -TEST_CONTACT_FNAME = os.getenv("TEST_CONTACT_FNAME", "Test") -TEST_CONTACT_LNAME = os.getenv("TEST_CONTACT_LNAME", "Contact") -TEST_FILE_PATH = os.getenv("TEST_FILE_PATH", "test_upload.txt") # Path to a dummy file for upload/send tests -TEST_PHOTO_PATH = os.getenv("TEST_PHOTO_PATH", "test_photo.jpg") # Path to a dummy photo file -TEST_VOICE_PATH = os.getenv("TEST_VOICE_PATH", "test_voice.ogg") # Path to a dummy ogg voice file -TEST_STICKER_PATH = os.getenv("TEST_STICKER_PATH", "test_sticker.webp") # Path to a dummy webp sticker -TEST_BOT_USERNAME = os.getenv("TEST_BOT_USERNAME", "") # Username of a bot you own -TEST_INVITE_LINK_HASH = os.getenv("TEST_INVITE_LINK_HASH", "") # Hash from a valid invite link - -# Create dummy files if they don't exist -if not os.path.exists(TEST_FILE_PATH): - with open(TEST_FILE_PATH, "w") as f: - f.write("This is a test file.") -if not os.path.exists(TEST_PHOTO_PATH): - logger.warning(f"Test photo file not found: {TEST_PHOTO_PATH}. Some tests might fail.") -if not os.path.exists(TEST_VOICE_PATH): - logger.warning(f"Test voice file not found: {TEST_VOICE_PATH}. send_voice test will fail.") -if not os.path.exists(TEST_STICKER_PATH): - logger.warning(f"Test sticker file not found: {TEST_STICKER_PATH}. send_sticker test will fail.") - - -async def run_test(tool_func, description, **kwargs): - logger.info(f"--- Testing: {description} ({tool_func.__name__}) ---") - logger.info(f"Params: {kwargs}") - try: - result = await tool_func(**kwargs) - logger.info(f"Result: {result}") - return result - except Exception as e: - logger.error(f"Error during {tool_func.__name__}: {e}", exc_info=True) - return f"TEST FAILED: {e}" - -async def run_all_tests(): - global TEST_CHAT_ID # Declare intention to modify the global variable - if not await client.is_user_authorized(): - logger.error("Client not authorized. Please run main.py first to log in.") - return - - logger.info("Starting Telegram Tool Tests...") - me = await client.get_me() - me_info = await get_me() # Use the tool version - logger.info(f"Running tests as: {me.first_name} (ID: {me.id})") - - if not TEST_CHAT_ID: - TEST_CHAT_ID = me.id # Default to Saved Messages if not set - logger.warning(f"TEST_CHAT_ID not set, defaulting to Saved Messages ({TEST_CHAT_ID}).") - - # --- Basic Info & Chat Listing --- - logger.info("--- Running Basic Info & Listing Tests ---") - await run_test(get_me, "Get own user info") - await run_test(list_chats, "List recent chats", limit=5) - await run_test(list_chats, "List groups", chat_type='group', limit=5) - await run_test(list_chats, "List channels", chat_type='channel', limit=5) - await run_test(list_chats, "List users", chat_type='user', limit=5) - - # --- Test the basic get_chats function --- - await run_test(get_chats, "Get chats tool (paginated, page 1)", page=1, page_size=5) - - # --- Specific Chat Operations (using TEST_CHAT_ID) --- - message_id_to_test = None - if TEST_CHAT_ID: - logger.info(f"--- Running tests on Chat ID: {TEST_CHAT_ID} ---") - await run_test(get_chat, "Get info for test chat", chat_id=TEST_CHAT_ID) - await run_test(get_history, "Get history for test chat", chat_id=TEST_CHAT_ID, limit=5) - await run_test(get_messages, "Get messages tool (paginated)", chat_id=TEST_CHAT_ID, page=1, page_size=5) - - today = datetime.now() - yesterday = today - timedelta(days=1) - await run_test(list_messages, "List messages tool (filtered date)", chat_id=TEST_CHAT_ID, limit=10, from_date=yesterday.strftime("%Y-%m-%d"), to_date=today.strftime("%Y-%m-%d")) - await run_test(list_messages, "List messages tool (search)", chat_id=TEST_CHAT_ID, limit=10, search_query="Test") - - # Send a test message to get IDs for subsequent tests - sent_msg_result = await run_test(send_message, "Send test message", chat_id=TEST_CHAT_ID, message=f"MCP Test Message {random.randint(1000, 9999)}") - if "successfully" in str(sent_msg_result).lower(): - try: - # Fetch the last message sent by self (hopefully the test message) - async for msg in client.iter_messages(TEST_CHAT_ID, limit=5, from_user='me'): - # Check if it's the message we likely just sent - if "MCP Test Message" in msg.text: - message_id_to_test = msg.id - logger.info(f"Using message ID {message_id_to_test} for further tests.") - break - if not message_id_to_test: # Fallback if specific message not found - last_msgs = await client.get_messages(TEST_CHAT_ID, limit=1) - if last_msgs: - message_id_to_test = last_msgs[0].id - logger.warning(f"Could not find specific test message, using last message ID: {message_id_to_test}") - - except Exception as e: - logger.error(f"Could not get last message ID: {e}") - - if message_id_to_test: - logger.info(f"--- Running Message-Specific Tests (ID: {message_id_to_test}) ---") - await run_test(edit_message, "Edit test message", chat_id=TEST_CHAT_ID, message_id=message_id_to_test, new_text="MCP Test Message (edited)") - await run_test(get_message_context, "Get context for test message", chat_id=TEST_CHAT_ID, message_id=message_id_to_test, context_size=1) - await run_test(reply_to_message, "Reply to test message", chat_id=TEST_CHAT_ID, message_id=message_id_to_test, text="Test Reply") - await run_test(pin_message, "Pin test message", chat_id=TEST_CHAT_ID, message_id=message_id_to_test) - await asyncio.sleep(2) # Give time for pin to register - await run_test(get_pinned_messages, "Get pinned messages", chat_id=TEST_CHAT_ID) - await run_test(unpin_message, "Unpin test message", chat_id=TEST_CHAT_ID, message_id=message_id_to_test) - # Forwarding (Careful: forwards TO TEST_CHAT_ID FROM TEST_CHAT_ID) - await run_test(forward_message, "Forward test message", from_chat_id=TEST_CHAT_ID, message_id=message_id_to_test, to_chat_id=TEST_CHAT_ID) - await run_test(delete_message, "Delete test message", chat_id=TEST_CHAT_ID, message_id=message_id_to_test) - else: - logger.warning("Could not obtain a message ID for message-specific tests.") - - await run_test(search_messages, "Search for 'Test' in test chat", chat_id=TEST_CHAT_ID, query="Test", limit=5) - await run_test(mark_as_read, "Mark test chat as read", chat_id=TEST_CHAT_ID) - - # File Operations - logger.info("--- Running File Operation Tests ---") - await run_test(send_file, "Send test file", chat_id=TEST_CHAT_ID, file_path=TEST_FILE_PATH, caption="Test File") - await run_test(upload_file, "Upload test file", file_path=TEST_FILE_PATH) - # Find a message with media to test download/info - media_message_id = None - media_msg_obj = None - async for msg in client.iter_messages(TEST_CHAT_ID, limit=20): - if msg.media: - media_message_id = msg.id - media_msg_obj = msg - logger.info(f"Found media message ID {media_message_id} to test download/info.") - break - if media_message_id and media_msg_obj: - await run_test(get_media_info, "Get media info", chat_id=TEST_CHAT_ID, message_id=media_message_id) - # Use a more specific download path based on filename if possible - download_filename = getattr(media_msg_obj.media, 'document', None) - if download_filename: - download_filename = getattr(download_filename, 'attributes', [None])[0] - if download_filename: - download_filename = getattr(download_filename, 'file_name', None) - download_path = f"test_download_{download_filename or media_message_id}.tmp" - - await run_test(download_media, "Download media", chat_id=TEST_CHAT_ID, message_id=media_message_id, file_path=download_path) - if os.path.exists(download_path): - try: - os.remove(download_path) - logger.info(f"Cleaned up downloaded file: {download_path}") - except OSError as e: - logger.error(f"Error removing downloaded file {download_path}: {e}") - else: - logger.warning(f"Could not find downloaded file to clean up: {download_path}") - else: - logger.warning("No media message found in recent history to test download/info.") - - # Voice/Sticker/GIF (check paths exist) - logger.info("--- Running Media Send Tests ---") - if os.path.exists(TEST_VOICE_PATH): - await run_test(send_voice, "Send voice message", chat_id=TEST_CHAT_ID, file_path=TEST_VOICE_PATH) - - # Enhanced sticker testing - logger.info("--- Running Enhanced Sticker Tests ---") - if os.path.exists(TEST_STICKER_PATH): - await run_test(send_sticker, "Send sticker file", chat_id=TEST_CHAT_ID, file_path=TEST_STICKER_PATH) - else: - logger.warning(f"Test sticker file not found at {TEST_STICKER_PATH}") - - # Test sticker set retrieval - sticker_sets = await run_test(get_sticker_sets, "Get all available sticker sets") - - # Try to parse sticker set info - try: - # Parse any available sticker info - if sticker_sets and "sets" in sticker_sets: - logger.info("Successfully retrieved sticker sets") - elif sticker_sets and len(sticker_sets) > 2: # JSON output has at least [] - logger.info(f"Retrieved sticker sets data: {sticker_sets[:100]}...") - else: - logger.warning("No sticker sets found or empty response") - - # If we're on a test account with limited stickers, we could: - # 1. Add a sticker set (not implemented in our tools) - # 2. Remove a sticker set (not implemented in our tools) - logger.info("Note: Adding/removing sticker sets requires additional tools not currently implemented") - except Exception as e: - logger.error(f"Error in sticker set processing: {e}") - - # GIF testing - gif_search_result = await run_test(get_gif_search, "Search for GIFs", query="hello", limit=1) - try: - gif_ids = json.loads(gif_search_result) - if gif_ids: - gif_id = gif_ids[0] - await run_test(send_gif, "Send GIF by ID", chat_id=TEST_CHAT_ID, gif_id=gif_id) - else: - logger.warning("No GIF IDs returned from search.") - except Exception as e: - logger.warning(f"Could not parse or send GIF ID from search result: {e}") - - # Mute/Archive - logger.info("--- Running Chat State Tests ---") - await run_test(mute_chat, "Mute test chat", chat_id=TEST_CHAT_ID) - await asyncio.sleep(1) - await run_test(unmute_chat, "Unmute test chat", chat_id=TEST_CHAT_ID) - await asyncio.sleep(1) - # Archive/Unarchive - Check if ToggleDialogPinRequest exists and test - if hasattr(types.messages, 'ToggleDialogPinRequest'): - logger.warning("--- Testing Archive/Unarchive (May depend on Telethon version) ---") - await run_test(archive_chat, "Archive test chat", chat_id=TEST_CHAT_ID) - await asyncio.sleep(1) - await run_test(unarchive_chat, "Unarchive test chat", chat_id=TEST_CHAT_ID) - await asyncio.sleep(1) - else: - logger.warning("ToggleDialogPinRequest not found, skipping archive/unarchive tests.") - - # --- Contact Operations --- - logger.info("--- Running Contact Operations Tests ---") - await run_test(list_contacts, "List contacts") - await run_test(export_contacts, "Export contacts to JSON") - - contact_to_delete_id = None - if TEST_CONTACT_PHONE and TEST_CONTACT_FNAME: - logger.warning("--- Running Add/Delete Contact Test (requires valid phone number) ---") - # Check if contact already exists - contact_exists = False - contacts_list = await list_contacts() - if TEST_CONTACT_PHONE in str(contacts_list): - logger.warning(f"Contact with phone {TEST_CONTACT_PHONE} seems to exist. Skipping add.") - # Try to find ID for deletion test anyway - try: - lines = str(contacts_list).split('\n') - for line in lines: - if TEST_CONTACT_PHONE in line: - contact_to_delete_id = int(line.split(',')[0].split(':')[1].strip()) - logger.info(f"Found existing contact ID for deletion test: {contact_to_delete_id}") - break - except Exception as e: - logger.error(f"Could not parse existing contact ID: {e}") - contact_exists = True - - if not contact_exists: - add_result = await run_test(add_contact, "Add test contact", phone=TEST_CONTACT_PHONE, first_name=TEST_CONTACT_FNAME, last_name=TEST_CONTACT_LNAME) - await asyncio.sleep(5) # Give time for contact to sync - if "added successfully" in str(add_result).lower(): - # Try to find the added contact to get its ID for deletion - search_res = await run_test(search_contacts, "Search for added contact", query=TEST_CONTACT_PHONE) - try: - lines = str(search_res).split('\n') - for line in lines: - if TEST_CONTACT_PHONE in line and f"Name: {TEST_CONTACT_FNAME}" in line: - contact_to_delete_id = int(line.split(',')[0].split(':')[1].strip()) - logger.info(f"Found added contact ID for deletion: {contact_to_delete_id}") - break - except Exception as e: - logger.error(f"Could not parse contact ID from search result: {e}") - else: - logger.warning("Add contact failed or did not return success message.") - - if contact_to_delete_id: - await run_test(get_direct_chat_by_contact, "Get direct chat by contact's phone", contact_query=TEST_CONTACT_PHONE) - await run_test(get_contact_chats, "Get chats involving contact", contact_id=contact_to_delete_id) - logger.warning(f"--- Proceeding to delete contact ID: {contact_to_delete_id} ---") - await run_test(delete_contact, "Delete test contact", user_id=contact_to_delete_id) - else: - logger.warning("Could not find contact by phone number to test deletion/other ops.") - - if TEST_USERNAME: - await run_test(search_contacts, "Search contacts by username", query=TEST_USERNAME) - await run_test(resolve_username, "Resolve test username", username=TEST_USERNAME) - await run_test(get_direct_chat_by_contact, "Get direct chat by test username", contact_query=TEST_USERNAME) - - await run_test(get_contact_ids, "Get contact IDs") - # import_contacts test requires a list of dicts, harder to setup via env vars - # logger.warning("Skipping import_contacts test - requires manual setup.") - # await run_test(import_contacts, "Import contacts", contacts=[{'phone': '+1555...', 'first_name': ...}]) - # Clarification: import_contacts test is skipped as it requires complex setup (list of dictionaries) - # which is difficult to manage solely through environment variables for automated testing. - - # --- User Interaction (Requires TEST_USER_ID) --- - if TEST_USER_ID: - logger.info(f"--- Running User Interaction Tests (User ID: {TEST_USER_ID}) ---") - await run_test(get_user_status, "Get test user status", user_id=TEST_USER_ID) - await run_test(get_user_photos, "Get test user photos", user_id=TEST_USER_ID, limit=1) - # Check if the user is a contact before running contact-specific tests - is_contact = False - contacts_res = await list_contacts() - if f"ID: {TEST_USER_ID}" in str(contacts_res): - is_contact = True - logger.info(f"User {TEST_USER_ID} is a contact.") - await run_test(get_last_interaction, "Get last interaction with test user contact", contact_id=TEST_USER_ID) - await run_test(get_contact_chats, "Get chats involving test user contact", contact_id=TEST_USER_ID) - else: - logger.warning(f"User {TEST_USER_ID} is not a contact. Skipping contact-specific tests.") - - # Block/Unblock - await run_test(block_user, "Block test user", user_id=TEST_USER_ID) - await asyncio.sleep(1) - await run_test(get_blocked_users, "Get blocked users (check if test user is present)") - await run_test(unblock_user, "Unblock test user", user_id=TEST_USER_ID) - await asyncio.sleep(1) - - # --- Supergroup/Channel Operations (Requires TEST_SUPERGROUP_ID and TEST_USER_ID) --- - created_group_id = None - created_channel_id = None - if TEST_USER_ID: - # Create Group Test (Requires TEST_USER_ID) - logger.warning("--- Running Group/Channel Creation Test ---") - create_group_res = await run_test(create_group, "Create test group with test user", title=f"MCP Test Group {random.randint(100,999)}", user_ids=[TEST_USER_ID]) - try: - if "created with ID" in create_group_res: - created_group_id = int(create_group_res.split(':')[-1].strip()) - logger.info(f"Created group ID: {created_group_id}") - await asyncio.sleep(2) - logger.warning(f"--- Leaving newly created group: {created_group_id} ---") - await run_test(leave_chat, "Leave newly created group", chat_id=created_group_id) - except Exception as e: - logger.error(f"Failed to parse or leave created group: {e}") - - # Create Channel Test (No additional users needed initially) - create_channel_res = await run_test(create_channel, "Create test channel", title=f"MCP Test Channel {random.randint(100,999)}", about="Test channel created by MCP", megagroup=False) - try: - if "created with ID" in create_channel_res: - created_channel_id = int(create_channel_res.split(':')[-1].strip()) - logger.info(f"Created channel ID: {created_channel_id}") - await asyncio.sleep(2) - logger.warning(f"--- Leaving newly created channel: {created_channel_id} ---") - await run_test(leave_chat, "Leave newly created channel", chat_id=created_channel_id) - except Exception as e: - logger.error(f"Failed to parse or leave created channel: {e}") - - if TEST_SUPERGROUP_ID: - logger.info(f"--- Running Supergroup/Channel Operations Tests (Chat ID: {TEST_SUPERGROUP_ID}) ---") - await run_test(get_chat, "Get test supergroup info", chat_id=TEST_SUPERGROUP_ID) - await run_test(get_participants, "Get participants of test supergroup", chat_id=TEST_SUPERGROUP_ID) - await run_test(get_admins, "Get admins of test supergroup", chat_id=TEST_SUPERGROUP_ID) - await run_test(get_banned_users, "Get banned users of test supergroup", chat_id=TEST_SUPERGROUP_ID) - await run_test(get_recent_actions, "Get recent actions for supergroup", chat_id=TEST_SUPERGROUP_ID) - await run_test(get_invite_link, "Get invite link for supergroup", chat_id=TEST_SUPERGROUP_ID) - await run_test(export_chat_invite, "Export chat invite for supergroup", chat_id=TEST_SUPERGROUP_ID) - - if TEST_INVITE_LINK_HASH: - logger.warning(f"--- Running Join Chat by Invite Hash Test (Requires valid HASH: {TEST_INVITE_LINK_HASH}) ---") - # Extract hash if full URL is provided - invite_hash = TEST_INVITE_LINK_HASH - if invite_hash.startswith("https://t.me/+"): - invite_hash = invite_hash.split("+", 1)[1] - logger.info(f"Extracted hash from URL: {invite_hash}") - # This will handle various cases, including invalid/expired hash or already a member - import_res = await run_test(import_chat_invite, "Join chat via import hash", hash=invite_hash) - - # Check if the response indicates already a member or successful join - already_member = "already a member" in import_res.lower() - success_join = "successfully joined" in import_res.lower() - logger.info(f"Invite result: {'Already a member' if already_member else 'Successfully joined' if success_join else 'Failed to join'}") - - # Also test the full URL version if appropriate - if TEST_INVITE_LINK_HASH.startswith("https://"): - await run_test(join_chat_by_link, "Join chat via full link", link=TEST_INVITE_LINK_HASH) - - # If we successfully joined a chat, we should leave it to clean up - if success_join and "chat:" in import_res: - try: - # Extract chat ID from success message if possible - chat_title = import_res.split("chat:", 1)[1].strip() - logger.warning(f"Attempting to find and leave newly joined chat: '{chat_title}'") - - # Try to find the chat ID by matching the title - async for dialog in client.iter_dialogs(limit=10): # Check recent dialogs - if dialog.name == chat_title: - logger.info(f"Found chat to leave: {dialog.name} (ID: {dialog.id})") - await run_test(leave_chat, "Leave newly joined chat", chat_id=dialog.id) - break - except Exception as leave_err: - logger.error(f"Failed to leave newly joined chat: {leave_err}") - else: - logger.warning("TEST_INVITE_LINK_HASH not set. Skipping join/import tests.") - - if TEST_USER_ID: - # Ban/Unban/Invite/Promote tests (Use with EXTREME caution) - logger.warning(f"--- Running potentially disruptive tests on supergroup {TEST_SUPERGROUP_ID} with user {TEST_USER_ID} ---") - await run_test(ban_user, "Ban test user from supergroup", chat_id=TEST_SUPERGROUP_ID, user_id=TEST_USER_ID) - await asyncio.sleep(2) - await run_test(get_banned_users, "Get banned users (check test user)", chat_id=TEST_SUPERGROUP_ID) - await run_test(unban_user, "Unban test user from supergroup", chat_id=TEST_SUPERGROUP_ID, user_id=TEST_USER_ID) - await asyncio.sleep(2) - # Ensure user is not already participant before inviting - try: - # Use a more specific filter if possible - # participants = await client.get_participants(TEST_SUPERGROUP_ID, filter=types.ChannelParticipantsSearch(q=str(TEST_USER_ID)), limit=1) - # Simpler check: iterate briefly - user_in_group = False - async for p in client.iter_participants(TEST_SUPERGROUP_ID, limit=50): # Limit search scope - if p.id == TEST_USER_ID: - user_in_group = True - break - if user_in_group: - logger.info(f"User {TEST_USER_ID} already in group {TEST_SUPERGROUP_ID}, skipping invite.") - else: - await run_test(invite_to_group, "Invite test user to supergroup", group_id=TEST_SUPERGROUP_ID, user_ids=[TEST_USER_ID]) - await asyncio.sleep(2) - except UserNotParticipantError: - # This error is expected if user is not participant, proceed with invite - await run_test(invite_to_group, "Invite test user to supergroup (UserNotParticipantError caught)", group_id=TEST_SUPERGROUP_ID, user_ids=[TEST_USER_ID]) - await asyncio.sleep(2) - except Exception as p_err: - logger.warning(f"Could not check participant status before invite: {p_err}. Attempting invite anyway.") - # Try inviting anyway - await run_test(invite_to_group, "Invite test user to supergroup (attempt)", group_id=TEST_SUPERGROUP_ID, user_ids=[TEST_USER_ID]) - await asyncio.sleep(2) - - await run_test(promote_admin, "Promote test user to admin", chat_id=TEST_SUPERGROUP_ID, user_id=TEST_USER_ID) - await asyncio.sleep(2) - await run_test(get_admins, "Get admins (check test user)", chat_id=TEST_SUPERGROUP_ID) - await run_test(demote_admin, "Demote test user from admin", chat_id=TEST_SUPERGROUP_ID, user_id=TEST_USER_ID) - await asyncio.sleep(2) - # Leave chat test needs careful consideration - don't leave accidentally! - # logger.warning(f"--- Skipping leave_chat test for TEST_SUPERGROUP_ID: {TEST_SUPERGROUP_ID} ---") - # await run_test(leave_chat, "Leave test supergroup", chat_id=TEST_SUPERGROUP_ID) - - # Title/Photo Edit - original_title_res = await run_test(get_chat, "Get supergroup title before edit", chat_id=TEST_SUPERGROUP_ID) - original_title = "Unknown" - if "Title:" in str(original_title_res): - try: - original_title = str(original_title_res).split("Title:")[1].split('\n')[0].strip() - logger.info(f"Original title found: '{original_title}'") - except Exception as title_e: - logger.warning(f"Could not parse original title: {title_e}") - - random_suffix = ''.join(random.choices(string.ascii_lowercase, k=4)) - new_title = f"Test Title {random_suffix}" - await run_test(edit_chat_title, "Edit supergroup title", chat_id=TEST_SUPERGROUP_ID, title=new_title) - await asyncio.sleep(2) - # Restore original title if possible - if original_title != "Unknown": - await run_test(edit_chat_title, "Restore supergroup title", chat_id=TEST_SUPERGROUP_ID, title=original_title) - else: - logger.warning("Could not determine original title to restore.") - - if os.path.exists(TEST_PHOTO_PATH): - await run_test(edit_chat_photo, "Edit supergroup photo", chat_id=TEST_SUPERGROUP_ID, file_path=TEST_PHOTO_PATH) - await asyncio.sleep(2) - await run_test(delete_chat_photo, "Delete supergroup photo", chat_id=TEST_SUPERGROUP_ID) - - # --- Profile & Privacy (Use with EXTREME caution!) --- - logger.warning("--- Running Profile & Privacy Tests (Potentially Invasive - Mostly Skipped) ---") - # logger.warning("--- update_profile tests are commented out by default ---") - # original_bio = "" # Need to fetch current bio first if we want to restore - # await run_test(update_profile, "Update profile bio", about=f"MCP Test Bio {random.randint(100,999)}") - # await asyncio.sleep(1) - # await run_test(update_profile, "Restore profile bio", about=original_bio) # Restore to empty or original - - # logger.warning("--- set/delete_profile_photo tests are commented out by default ---") - # if os.path.exists(TEST_PHOTO_PATH): - # await run_test(set_profile_photo, "Set profile photo", file_path=TEST_PHOTO_PATH) - # await asyncio.sleep(2) - # await run_test(delete_profile_photo, "Delete profile photo") - - await run_test(get_privacy_settings, "Get privacy settings (last seen)") - # set_privacy_settings is complex and risky to test automatically. - logger.warning("Skipping set_privacy_settings test due to complexity and risk.") - # Example: Allow only TEST_USER_ID to see last seen (if TEST_USER_ID is set) - # if TEST_USER_ID: - # logger.warning("Testing set_privacy_settings - allowing TEST_USER_ID for last seen") - # await run_test(set_privacy_settings, "Set privacy (last seen - allow test user)", key='status_timestamp', allow_users=[TEST_USER_ID]) - # await asyncio.sleep(2) - # logger.warning("Restoring default privacy for last seen") - # await run_test(set_privacy_settings, "Restore privacy (last seen - allow all)", key='status_timestamp', allow_users=[]) # Assuming empty means allow all? - - # --- Bot Operations (Requires TEST_BOT_USERNAME) --- - if TEST_BOT_USERNAME: - logger.info(f"--- Running Bot Operations Tests (Bot: {TEST_BOT_USERNAME}) ---") - await run_test(get_bot_info, "Get bot info", bot_username=TEST_BOT_USERNAME) - - # Check if our client is a bot before testing command setting - is_bot = False - try: - me = await client.get_me() - is_bot = getattr(me, 'bot', False) - except Exception as e: - logger.error(f"Error checking if client is a bot: {e}") - - if is_bot: - # Only proceed with set_bot_commands test if we're a bot - logger.info("Client is a bot account, testing set_bot_commands") - await run_test(set_bot_commands, "Set bot commands", bot_username=TEST_BOT_USERNAME, - commands=[{'command': 'mcp_test', 'description': 'MCP Test Command'}]) - await asyncio.sleep(2) - await run_test(set_bot_commands, "Clear bot commands", bot_username=TEST_BOT_USERNAME, commands=[]) - else: - # Skip the set_bot_commands test if we're not a bot - logger.warning("Client is a regular user account, not a bot. Skipping set_bot_commands test.") - logger.info("Note: The set_bot_commands function can only be used by bot accounts.") - else: - logger.warning("TEST_BOT_USERNAME not set. Skipping bot tests.") - - # --- Other Operations --- - logger.info("--- Running Other Operations Tests ---") - await run_test(search_public_chats, "Search public chats for 'bot'", query="bot") - await run_test(get_sticker_sets, "Get sticker sets") - - # Final check for remaining tools that haven't been explicitly tested - logger.info("--- Testing Remaining Tools ---") - - # Test the archive/unarchive chat functions if not already tested - if TEST_CHAT_ID: - try: - # Only run if we haven't tested these already - await run_test(archive_chat, "Archive test chat (final check)", chat_id=TEST_CHAT_ID) - await asyncio.sleep(1) - await run_test(unarchive_chat, "Unarchive test chat (final check)", chat_id=TEST_CHAT_ID) - except Exception as e: - logger.warning(f"Archive/unarchive test failed: {e}") - - logger.info("--- All Tests Completed ---") - - -if __name__ == "__main__": - nest_asyncio.apply() - - async def main(): - try: - logger.info("Starting Telegram client for testing...") - # Ensure client is started and authorized - await client.start() - if not await client.is_user_authorized(): - logger.error("Client authorization failed. Please run main.py interactively first.") - sys.exit(1) - - await run_all_tests() - - except Exception as e: - logger.critical(f"Critical error during test execution: {e}", exc_info=True) - sys.exit(1) - finally: - if client.is_connected(): - logger.info("Disconnecting Telegram client...") - await client.disconnect() - - asyncio.run(main()) \ No newline at end of file