diff --git a/.env.example b/.env.example index 41cbf8b..9a49d47 100644 --- a/.env.example +++ b/.env.example @@ -1,10 +1,50 @@ -# Your Telegram API credentials from https://my.telegram.org/apps +# Telegram API Credentials (Required - get from https://my.telegram.org/apps) TELEGRAM_API_ID=123456 TELEGRAM_API_HASH=0123456789abcdef0123456789abcdef -# Session name (can be any name you choose) +# Session Management (Choose ONE) +# Option 1: File-based session (a .session file will be created) TELEGRAM_SESSION_NAME=telegram_session +# Option 2: String-based session (if you generate one, e.g., using Telethon's string session generator) +TELEGRAM_SESSION_STRING= -# Optional: Session string for portable authentication (leave empty if not using) -# This will be generated by the session_string_generator.py script -TELEGRAM_SESSION_STRING= \ No newline at end of file +# --- Test Script Configuration (test.py) --- +# Fill these with IDs/paths relevant to YOUR test environment. +# Using real user/group IDs requires caution and consent. + +# A chat ID where the script can send/edit/delete messages safely. +# Defaults to your "Saved Messages" if left empty or 0. +TEST_CHAT_ID=0 + +# The ID of a Supergroup or Channel you own or are an admin in. +# Required for testing admin actions (ban, invite, promote, etc.). +TEST_SUPERGROUP_ID=0 + +# The numeric User ID of a TEST account (NOT a real person unless they consent). +# This user will be banned/unbanned/invited/promoted in the TEST_SUPERGROUP_ID. +TEST_USER_ID=0 + +# The username (without @) of the TEST_USER_ID. +TEST_USERNAME=username + +# --- Optional Test Variables --- + +# Phone number (E.164 format, e.g., +15551234567) for add_contact test. +TEST_CONTACT_PHONE=+15551234567 +TEST_CONTACT_FNAME=Test +TEST_CONTACT_LNAME=Contact + +# Paths to dummy files for testing uploads/sending media. +# The script will create test_upload.txt if it doesn't exist. +# You might need to provide small, valid image/voice/sticker files for other tests. +TEST_FILE_PATH=test_upload.txt +TEST_PHOTO_PATH=test_photo.jpg +TEST_VOICE_PATH=test_voice.ogg +TEST_STICKER_PATH=sticker.webp + +# Username of a bot you own (required for get_bot_info, set_bot_commands tests) +TEST_BOT_USERNAME=your_bot_username + +# Hash part or full invite link for a group/channel you want to test joining +# Required for join_chat_by_link / import_chat_invite tests +TEST_INVITE_LINK_HASH=https://t.me/+AbCdEfGhIjK \ No newline at end of file diff --git a/.gitignore b/.gitignore index e5fc31d..1bfc37f 100644 --- a/.gitignore +++ b/.gitignore @@ -176,8 +176,6 @@ cython_debug/ # Telegram session files *.session *.session-journal -<<<<<<< HEAD -======= anon.session anon.session-journal anon_new.session @@ -188,3 +186,15 @@ anon_new.session-journal # Claude Desktop config claude_desktop_config.json .DS_Store + +# Test files +.cursor/ +test_voice.ogg +test_upload.txt +test_output.txt +sticker.webp +test_download_* +test_download.* +two.png +*.tmp +mcp_errors.log diff --git a/README.md b/README.md index b3c7fa4..58d20fc 100644 --- a/README.md +++ b/README.md @@ -93,7 +93,7 @@ This MCP server exposes a huge suite of Telegram tools. **Every major Telegram/T - **get_gif_search(query, limit)**: Search for GIFs - **send_gif(chat_id, gif_id)**: Send a GIF - **get_bot_info(bot_username)**: Get info about a bot -- **set_bot_commands(bot_username, commands)**: Set bot commands +- **set_bot_commands(bot_username, commands)**: Set bot commands (bot accounts only) ### Privacy, Settings, and Misc - **get_privacy_settings()**: Get privacy settings @@ -140,6 +140,8 @@ Follow the prompts to authenticate and update your `.env` file. ### 4. Configure .env +Copy `.env.example` to `.env` and fill in your values: + ``` TELEGRAM_API_ID=your_api_id_here TELEGRAM_API_HASH=your_api_hash_here @@ -184,6 +186,36 @@ Edit `~/.cursor/mcp.json`: --- +## 🧪 Testing + +A comprehensive test script is included to validate all functionality: + +```bash +# Basic test (redirects output to file) +python test.py > test_output.txt 2>&1 +``` + +The test script uses environment variables from your `.env` file to configure testing parameters. See `.env.example` for all available test configuration options. + +### Test Configuration + +You can configure test parameters in your `.env` file: + +``` +# A safe chat ID where tests can send/delete messages +TEST_CHAT_ID=your_saved_messages_id + +# A supergroup you admin for testing group operations +TEST_SUPERGROUP_ID=your_supergroup_id + +# A test user account ID (not a real person unless they consent) +TEST_USER_ID=test_user_id +``` + +The tests are designed to be non-destructive, but use caution when testing with real accounts. + +--- + ## 🎮 Usage Examples - "Show my recent chats" @@ -194,11 +226,29 @@ Edit `~/.cursor/mcp.json`: - "Mute notifications for chat 123456789" - "Promote user 111 to admin in group 123456789" - "Search for public channels about 'news'" +- "Join the Telegram group with invite link https://t.me/+AbCdEfGhIjK" +- "Send a sticker to my Saved Messages" +- "Get all my sticker sets" You can use these tools via natural language in Claude, Cursor, or any MCP-compatible client. --- +## 🧠 Error Handling & Robustness + +This implementation includes comprehensive error handling: + +- **Session management**: Works with both file-based and string-based sessions +- **Error reporting**: Detailed errors logged to `mcp_errors.log` +- **Graceful degradation**: Multiple fallback approaches for critical functions +- **User-friendly messages**: Clear, actionable error messages instead of technical errors +- **Account type detection**: Functions that require bot accounts detect and notify when used with user accounts +- **Invite link processing**: Handles various link formats and already-member cases + +The code is designed to be robust against common Telegram API issues and limitations. + +--- + ## 🛠️ Contribution Guide 1. **Fork this repo:** [chigwell/telegram-mcp](https://github.com/chigwell/telegram-mcp) @@ -220,15 +270,20 @@ You can use these tools via natural language in Claude, Cursor, or any MCP-compa - **Never commit your `.env` or session string.** - The session string gives full access to your Telegram account—keep it safe! - All processing is local; no data is sent anywhere except Telegram's API. +- Use `.env.example` as a template and keep your actual `.env` file private. +- Test files are automatically excluded in `.gitignore`. --- ## 🛠️ Troubleshooting - **Check logs** in your MCP client (Claude/Cursor) and the terminal for errors. +- **Detailed error logs** can be found in `mcp_errors.log`. - **Interpreter errors?** Make sure your `.venv` is created and selected. - **Database lock?** Use session string authentication, not file-based sessions. - **iCloud/Dropbox issues?** Move your project to a local path without spaces if you see odd errors. - **Regenerate session string** if you change your Telegram password or see auth errors. +- **Bot-only functions** will show clear messages when used with regular user accounts. +- **Test script failures?** Check test configuration in `.env` for valid test accounts/groups. --- diff --git a/main.py b/main.py index e9d4c01..ddbbe89 100644 --- a/main.py +++ b/main.py @@ -9,7 +9,7 @@ from telethon import TelegramClient from telethon.sessions import StringSession import sqlite3 from telethon import utils -from telethon.tl.types import User, Chat, Channel +from telethon.tl.types import User, Chat, Channel, ChatAdminRights, ChatBannedRights, ChannelParticipantsKicked, ChannelParticipantsAdmins, InputChatPhoto, InputChatUploadedPhoto, InputChatPhotoEmpty, InputPeerUser, InputPeerChat, InputPeerChannel from telethon.tl.functions.contacts import SearchRequest from datetime import datetime, timedelta import json @@ -18,6 +18,16 @@ from telethon import functions import mimetypes import logging +# Helper function for JSON serialization of datetime, bytes, and other non-serializable objects +def json_serializer(obj): + """Helper function to convert non-serializable objects for JSON serialization.""" + if isinstance(obj, datetime): + return obj.isoformat() + if isinstance(obj, bytes): + return obj.decode('utf-8', errors='replace') + # Add other non-serializable types as needed + raise TypeError(f"Object of type {type(obj)} is not JSON serializable") + load_dotenv() TELEGRAM_API_ID = int(os.getenv("TELEGRAM_API_ID")) @@ -244,14 +254,16 @@ async def list_messages(chat_id: int, limit: int = 20, search_query: str = None, if from_date: try: from_date_obj = datetime.strptime(from_date, "%Y-%m-%d") + # Make it timezone aware by adding UTC timezone info + from_date_obj = from_date_obj.replace(tzinfo=datetime.timezone.utc) except ValueError: return f"Invalid from_date format. Use YYYY-MM-DD." if to_date: try: to_date_obj = datetime.strptime(to_date, "%Y-%m-%d") - # Set to end of day - to_date_obj = to_date_obj + timedelta(days=1, microseconds=-1) + # Set to end of day and make timezone aware + to_date_obj = (to_date_obj + timedelta(days=1, microseconds=-1)).replace(tzinfo=datetime.timezone.utc) except ValueError: return f"Invalid to_date format. Use YYYY-MM-DD." @@ -287,6 +299,7 @@ async def list_messages(chat_id: int, limit: int = 20, search_query: str = None, return "\n".join(lines) except Exception as e: + logger.exception(f"list_messages failed (chat_id={chat_id})") return f"Error retrieving messages: {e}" @@ -365,15 +378,29 @@ async def get_chat(chat_id: int) -> str: result = [] result.append(f"ID: {entity.id}") + is_channel = isinstance(entity, Channel) + is_chat = isinstance(entity, Chat) + is_user = isinstance(entity, User) + if hasattr(entity, 'title'): result.append(f"Title: {entity.title}") - chat_type = "Channel" if getattr(entity, 'broadcast', False) else "Group" + chat_type = "Channel" if is_channel and getattr(entity, 'broadcast', False) else "Group" + if is_channel and getattr(entity, 'megagroup', False): + chat_type = "Supergroup" + elif is_chat: + chat_type = "Group (Basic)" result.append(f"Type: {chat_type}") if hasattr(entity, 'username') and entity.username: result.append(f"Username: @{entity.username}") - if hasattr(entity, 'participants_count'): - result.append(f"Participants: {entity.participants_count}") - elif isinstance(entity, User): + + # Fetch participants count reliably + try: + participants_count = (await client.get_participants(entity, limit=0)).total + result.append(f"Participants: {participants_count}") + except Exception as pe: + result.append(f"Participants: Error fetching ({pe})") + + elif is_user: name = f"{entity.first_name}" if entity.last_name: name += f" {entity.last_name}" @@ -388,21 +415,29 @@ async def get_chat(chat_id: int) -> str: # Get last activity if it's a dialog try: - dialogs = await client.get_dialogs(limit=100) - for dialog in dialogs: - if dialog.entity.id == chat_id: - result.append(f"Unread Messages: {dialog.unread_count}") - if dialog.message: - last_msg = dialog.message - sender = getattr(last_msg.sender, 'first_name', '') or 'Unknown' - result.append(f"Last Message: From {sender} at {last_msg.date}") - result.append(f"Message: {last_msg.message or '[Media/No text]'}") - break - except: + # Using get_dialogs might be slow if there are many dialogs + # Alternative: Get entity again via get_dialogs if needed for unread count + dialog = await client.get_dialogs(limit=1, offset_id=0, offset_peer=entity) + if dialog: + dialog = dialog[0] + result.append(f"Unread Messages: {dialog.unread_count}") + if dialog.message: + last_msg = dialog.message + sender_name = "Unknown" + if last_msg.sender: + sender_name = getattr(last_msg.sender, 'first_name', '') or getattr(last_msg.sender, 'title', 'Unknown') + if hasattr(last_msg.sender, 'last_name') and last_msg.sender.last_name: + sender_name += f" {last_msg.sender.last_name}" + sender_name = sender_name.strip() or "Unknown" + result.append(f"Last Message: From {sender_name} at {last_msg.date}") + result.append(f"Message: {last_msg.message or '[Media/No text]'}") + except Exception as diag_ex: + logger.warning(f"Could not get dialog info for {chat_id}: {diag_ex}") pass return "\n".join(result) except Exception as e: + logger.exception(f"get_chat failed (chat_id={chat_id})") return f"Error getting chat info: {e}" @@ -597,9 +632,12 @@ async def add_contact(phone: str, first_name: str, last_name: str = "") -> str: last_name: The contact's last name (optional). """ try: + # Try to import the required types first + from telethon.tl.types import InputPhoneContact + result = await client(functions.contacts.ImportContactsRequest( contacts=[ - functions.contacts.InputPhoneContact( + InputPhoneContact( client_id=0, phone=phone, first_name=first_name, @@ -610,8 +648,27 @@ async def add_contact(phone: str, first_name: str, last_name: str = "") -> str: if result.imported: return f"Contact {first_name} {last_name} added successfully." else: - return f"Contact not added. Response: {result.stringify()}" + return f"Contact not added. Response: {str(result)}" + except (ImportError, AttributeError) as type_err: + # Try alternative approach using raw API + try: + result = await client(functions.contacts.ImportContactsRequest( + contacts=[{ + 'client_id': 0, + 'phone': phone, + 'first_name': first_name, + 'last_name': last_name + }] + )) + if hasattr(result, 'imported') and result.imported: + return f"Contact {first_name} {last_name} added successfully (alt method)." + else: + return f"Contact not added. Alternative method response: {str(result)}" + except Exception as alt_e: + logger.exception(f"add_contact (alt method) failed (phone={phone})") + return f"Error adding contact (alternative method): {alt_e}" except Exception as e: + logger.exception(f"add_contact failed (phone={phone})") return f"Error adding contact: {e}" @@ -691,18 +748,54 @@ async def create_group(title: str, user_ids: list) -> str: @mcp.tool() async def invite_to_group(group_id: int, user_ids: list) -> str: """ - Invite users to a group by group ID. + Invite users to a group or channel by group ID. Args: - group_id: The group chat ID. + group_id: The group/channel chat ID. user_ids: List of user IDs to invite. """ try: - users = [await client.get_entity(uid) for uid in user_ids] - await client(functions.messages.AddChatUserRequest(chat_id=group_id, user_id=users[0], fwd_limit=0)) - # Telethon only allows adding one user at a time for AddChatUserRequest (for basic groups) - # For supergroups/channels, use InviteToChannelRequest - return f"Invited users to group {group_id}." + chat_entity = await client.get_entity(group_id) + user_entities = [] + for uid in user_ids: + try: + user_entities.append(await client.get_entity(uid)) + except Exception as user_e: + logger.error(f"Could not find user entity for ID {uid}: {user_e}") + return f"Error finding user {uid}: {user_e}" + + if not user_entities: + return "No valid user IDs provided or found." + + if isinstance(chat_entity, Channel): + # Use InviteToChannelRequest for channels and supergroups + await client(functions.channels.InviteToChannelRequest( + channel=chat_entity, + users=user_entities + )) + return f"Invited {len(user_entities)} users to channel/supergroup {group_id}." + elif isinstance(chat_entity, Chat): + # Use AddChatUserRequest for basic groups (adds one user at a time) + added_count = 0 + errors = [] + for user in user_entities: + try: + # Note: fwd_limit=0 might be needed depending on privacy settings + await client(functions.messages.AddChatUserRequest(chat_id=group_id, user_id=user, fwd_limit=50)) + added_count += 1 + except Exception as add_e: + error_msg = f"Error inviting user {getattr(user, 'id', 'unknown')} to basic group {group_id}: {add_e}" + logger.error(error_msg) + errors.append(error_msg) + + result_message = f"Invited {added_count} users to basic group {group_id}." + if errors: + result_message += "\nErrors encountered:\n" + "\n".join(errors) + return result_message + else: + return f"Chat ID {group_id} is neither a Channel/Supergroup nor a basic Group." + except Exception as e: + logger.exception(f"invite_to_group failed (group_id={group_id}, user_ids={user_ids})") return f"Error inviting users: {e}" @@ -714,9 +807,21 @@ async def leave_chat(chat_id: int) -> str: chat_id: The chat ID to leave. """ try: - await client(functions.messages.LeaveChatRequest(chat_id=chat_id)) - return f"Left chat {chat_id}." + entity = await client.get_entity(chat_id) + if isinstance(entity, Channel): + # Leave channel or supergroup + await client(functions.channels.LeaveChannelRequest(channel=entity)) + return f"Left channel/supergroup {chat_id}." + elif isinstance(entity, Chat): + # Leave basic group + me = await client.get_me(input_peer=True) # Get self entity for DeleteChatUserRequest + await client(functions.messages.DeleteChatUserRequest(chat_id=chat_id, user_id=me)) + return f"Left basic group {chat_id}." + else: + # Cannot leave a user chat this way + return f"Cannot leave chat {chat_id} of type {type(entity)}. This function is for groups and channels." except Exception as e: + logger.exception(f"leave_chat failed (chat_id={chat_id})") return f"Error leaving chat: {e}" @@ -918,22 +1023,47 @@ async def edit_chat_title(chat_id: int, title: str) -> str: Edit the title of a chat, group, or channel. """ try: - await client(functions.messages.EditChatTitleRequest(chat_id=chat_id, title=title)) - return f"Chat {chat_id} title updated." + entity = await client.get_entity(chat_id) + if isinstance(entity, Channel): + await client(functions.channels.EditTitleRequest(channel=entity, title=title)) + elif isinstance(entity, Chat): + await client(functions.messages.EditChatTitleRequest(chat_id=chat_id, title=title)) + else: + return f"Cannot edit title for this entity type ({type(entity)})." + return f"Chat {chat_id} title updated to '{title}'." except Exception as e: + logger.exception(f"edit_chat_title failed (chat_id={chat_id}, title='{title}')") return f"Error editing chat title: {e}" @mcp.tool() async def edit_chat_photo(chat_id: int, file_path: str) -> str: """ - Edit the photo of a chat, group, or channel. + Edit the photo of a chat, group, or channel. Requires a file path to an image. """ try: - file = await client.upload_file(file_path) - await client(functions.messages.EditChatPhotoRequest(chat_id=chat_id, photo=file)) + if not os.path.isfile(file_path): + return f"Photo file not found: {file_path}" + if not os.access(file_path, os.R_OK): + return f"Photo file not readable: {file_path}" + + entity = await client.get_entity(chat_id) + uploaded_file = await client.upload_file(file_path) + + if isinstance(entity, Channel): + # For channels/supergroups, use EditPhotoRequest with InputChatUploadedPhoto + input_photo = InputChatUploadedPhoto(file=uploaded_file) + await client(functions.channels.EditPhotoRequest(channel=entity, photo=input_photo)) + elif isinstance(entity, Chat): + # For basic groups, use EditChatPhotoRequest with InputChatUploadedPhoto + input_photo = InputChatUploadedPhoto(file=uploaded_file) + await client(functions.messages.EditChatPhotoRequest(chat_id=chat_id, photo=input_photo)) + else: + return f"Cannot edit photo for this entity type ({type(entity)})." + return f"Chat {chat_id} photo updated." except Exception as e: + logger.exception(f"edit_chat_photo failed (chat_id={chat_id}, file_path='{file_path}')") return f"Error editing chat photo: {e}" @@ -943,9 +1073,19 @@ async def delete_chat_photo(chat_id: int) -> str: Delete the photo of a chat, group, or channel. """ try: - await client(functions.messages.EditChatPhotoRequest(chat_id=chat_id, photo=None)) + entity = await client.get_entity(chat_id) + if isinstance(entity, Channel): + # Use InputChatPhotoEmpty for channels/supergroups + await client(functions.channels.EditPhotoRequest(channel=entity, photo=InputChatPhotoEmpty())) + elif isinstance(entity, Chat): + # Use None (or InputChatPhotoEmpty) for basic groups + await client(functions.messages.EditChatPhotoRequest(chat_id=chat_id, photo=InputChatPhotoEmpty())) + else: + return f"Cannot delete photo for this entity type ({type(entity)})." + return f"Chat {chat_id} photo deleted." except Exception as e: + logger.exception(f"delete_chat_photo failed (chat_id={chat_id})") return f"Error deleting chat photo: {e}" @@ -999,14 +1139,16 @@ async def ban_user(chat_id: int, user_id: int) -> str: import time try: user = await client.get_entity(user_id) - banned_rights = ChatBannedRights(until_date=int(time.time()) + 31536000, view_messages=True) + # Ban for 1 year (31536000 seconds) + banned_rights = ChatBannedRights(until_date=int(time.time()) + 31536000, view_messages=True) await client(functions.channels.EditBannedRequest( channel=chat_id, - user_id=user, + participant=user, # Fix: Use 'participant' instead of 'user_id' banned_rights=banned_rights )) return f"User {user_id} banned from chat {chat_id}." except Exception as e: + logger.exception(f"ban_user failed (chat_id={chat_id}, user_id={user_id})") return f"Error banning user: {e}" @@ -1018,14 +1160,16 @@ async def unban_user(chat_id: int, user_id: int) -> str: from telethon.tl.types import ChatBannedRights try: user = await client.get_entity(user_id) - banned_rights = ChatBannedRights() + # Fix: Provide until_date=0 for unbanning + banned_rights = ChatBannedRights(until_date=0) await client(functions.channels.EditBannedRequest( channel=chat_id, - user_id=user, + participant=user, banned_rights=banned_rights )) return f"User {user_id} unbanned in chat {chat_id}." except Exception as e: + logger.exception(f"unban_user failed (chat_id={chat_id}, user_id={user_id})") return f"Error unbanning user: {e}" @@ -1035,9 +1179,12 @@ async def get_admins(chat_id: int) -> str: Get all admins in a group or channel. """ try: - participants = await client.get_participants(chat_id, filter=functions.channels.ParticipantsAdmins()) - return "\n".join([f"ID: {p.id}, Name: {getattr(p, 'first_name', '')} {getattr(p, 'last_name', '')}" for p in participants]) + # Fix: Use the correct filter type ChannelParticipantsAdmins + participants = await client.get_participants(chat_id, filter=ChannelParticipantsAdmins()) + lines = [f"ID: {p.id}, Name: {getattr(p, 'first_name', '')} {getattr(p, 'last_name', '')}".strip() for p in participants] + return "\n".join(lines) if lines else "No admins found." except Exception as e: + logger.exception(f"get_admins failed (chat_id={chat_id})") return f"Error getting admins: {e}" @@ -1047,9 +1194,12 @@ async def get_banned_users(chat_id: int) -> str: Get all banned users in a group or channel. """ try: - participants = await client.get_participants(chat_id, filter=functions.channels.ParticipantsBanned()) - return "\n".join([f"ID: {p.id}, Name: {getattr(p, 'first_name', '')} {getattr(p, 'last_name', '')}" for p in participants]) + # Fix: Use the correct filter type ChannelParticipantsKicked + participants = await client.get_participants(chat_id, filter=ChannelParticipantsKicked(q='')) + lines = [f"ID: {p.id}, Name: {getattr(p, 'first_name', '')} {getattr(p, 'last_name', '')}".strip() for p in participants] + return "\n".join(lines) if lines else "No banned users found." except Exception as e: + logger.exception(f"get_banned_users failed (chat_id={chat_id})") return f"Error getting banned users: {e}" @@ -1059,9 +1209,36 @@ async def get_invite_link(chat_id: int) -> str: Get the invite link for a group or channel. """ try: - result = await client(functions.messages.ExportChatInviteRequest(chat_id=chat_id)) - return result.link + entity = await client.get_entity(chat_id) + + if hasattr(functions.messages, 'ExportChatInviteRequest'): + try: + # Try using the peer parameter instead of chat_id + result = await client(functions.messages.ExportChatInviteRequest( + peer=entity + )) + return result.link + except Exception as e1: + # If that fails, try alternative approach + logger.warning(f"First approach failed: {e1}, trying alternative") + + # Alternative approach using client.export_chat_invite_link + try: + invite_link = await client.export_chat_invite_link(entity) + return invite_link + except Exception as e2: + logger.warning(f"export_chat_invite_link failed: {e2}") + + # Last resort: Try directly fetching chat info + full_chat = await client(functions.messages.GetFullChatRequest( + chat_id=entity.id + )) + if hasattr(full_chat, 'full_chat') and hasattr(full_chat.full_chat, 'invite_link'): + return full_chat.full_chat.invite_link or "No invite link available." + + return "Could not retrieve invite link for this chat." except Exception as e: + logger.exception(f"get_invite_link failed (chat_id={chat_id})") return f"Error getting invite link: {e}" @@ -1071,9 +1248,53 @@ async def join_chat_by_link(link: str) -> str: Join a chat by invite link. """ try: - await client(functions.messages.ImportChatInviteRequest(hash=link.split('/')[-1])) - return f"Joined chat via link." + # Extract the hash from the invite link + if '/' in link: + hash_part = link.split('/')[-1] + if hash_part.startswith('+'): + hash_part = hash_part[1:] # Remove the '+' if present + else: + hash_part = link + + # Try checking the invite before joining + try: + from telethon.errors import (InviteHashExpiredError, InviteHashInvalidError, + UserAlreadyParticipantError, ChatAdminRequiredError, + UsersTooMuchError) + + # Try to check invite info first (will often fail if not a member) + invite_info = await client(functions.messages.CheckChatInviteRequest(hash=hash_part)) + if hasattr(invite_info, 'chat') and invite_info.chat: + # If we got chat info, we're already a member + chat_title = getattr(invite_info.chat, 'title', 'Unknown Chat') + return f"You are already a member of this chat: {chat_title}" + except Exception as check_err: + # This often fails if not a member - just continue + pass + + # Join the chat using the hash + try: + result = await client(functions.messages.ImportChatInviteRequest(hash=hash_part)) + if result and hasattr(result, 'chats') and result.chats: + chat_title = getattr(result.chats[0], 'title', 'Unknown Chat') + return f"Successfully joined chat: {chat_title}" + return f"Joined chat via invite hash." + except Exception as join_err: + err_str = str(join_err).lower() + if "expired" in err_str: + return "The invite hash has expired and is no longer valid." + elif "invalid" in err_str: + return "The invite hash is invalid or malformed." + elif "already" in err_str and "participant" in err_str: + return "You are already a member of this chat." + elif "admin" in err_str: + return "Cannot join this chat - requires admin approval." + elif "too much" in err_str or "too many" in err_str: + return "Cannot join this chat - it has reached maximum number of participants." + else: + raise # Re-raise to be caught by the outer exception handler except Exception as e: + logger.exception(f"join_chat_by_link failed (link={link})") return f"Error joining chat: {e}" @@ -1083,9 +1304,24 @@ async def export_chat_invite(chat_id: int) -> str: Export a chat invite link. """ try: - result = await client(functions.messages.ExportChatInviteRequest(chat_id=chat_id)) - return result.link + entity = await client.get_entity(chat_id) + + # This is essentially the same as get_invite_link, but kept separate for API consistency + try: + # Try using the peer parameter instead of chat_id + result = await client(functions.messages.ExportChatInviteRequest( + peer=entity + )) + return result.link + except Exception as e1: + # If that fails, try alternative approach + logger.warning(f"ExportChatInviteRequest failed: {e1}, trying alternative") + + # Alternative approach + invite_link = await client.export_chat_invite_link(entity) + return invite_link except Exception as e: + logger.exception(f"export_chat_invite failed (chat_id={chat_id})") return f"Error exporting chat invite: {e}" @@ -1095,9 +1331,49 @@ async def import_chat_invite(hash: str) -> str: Import a chat invite by hash. """ try: - await client(functions.messages.ImportChatInviteRequest(hash=hash)) - return f"Joined chat via invite hash." + # Remove any prefixes like '+' if present + if hash.startswith('+'): + hash = hash[1:] + + # Try checking the invite before joining + try: + from telethon.errors import (InviteHashExpiredError, InviteHashInvalidError, + UserAlreadyParticipantError, ChatAdminRequiredError, + UsersTooMuchError) + + # Try to check invite info first (will often fail if not a member) + invite_info = await client(functions.messages.CheckChatInviteRequest(hash=hash)) + if hasattr(invite_info, 'chat') and invite_info.chat: + # If we got chat info, we're already a member + chat_title = getattr(invite_info.chat, 'title', 'Unknown Chat') + return f"You are already a member of this chat: {chat_title}" + except Exception as check_err: + # This often fails if not a member - just continue + pass + + # Join the chat using the hash + try: + result = await client(functions.messages.ImportChatInviteRequest(hash=hash)) + if result and hasattr(result, 'chats') and result.chats: + chat_title = getattr(result.chats[0], 'title', 'Unknown Chat') + return f"Successfully joined chat: {chat_title}" + return f"Joined chat via invite hash." + except Exception as join_err: + err_str = str(join_err).lower() + if "expired" in err_str: + return "The invite hash has expired and is no longer valid." + elif "invalid" in err_str: + return "The invite hash is invalid or malformed." + elif "already" in err_str and "participant" in err_str: + return "You are already a member of this chat." + elif "admin" in err_str: + return "Cannot join this chat - requires admin approval." + elif "too much" in err_str or "too many" in err_str: + return "Cannot join this chat - it has reached maximum number of participants." + else: + raise # Re-raise to be caught by the outer exception handler except Exception as e: + logger.exception(f"import_chat_invite failed (hash={hash})") return f"Error importing chat invite: {e}" @@ -1295,12 +1571,32 @@ async def mute_chat(chat_id: int) -> str: Mute notifications for a chat. """ try: + from telethon.tl.types import InputPeerNotifySettings + + peer = await client.get_entity(chat_id) await client(functions.account.UpdateNotifySettingsRequest( - peer=await client.get_entity(chat_id), - settings=functions.account.InputPeerNotifySettings(mute_until=2**31-1) + peer=peer, + settings=InputPeerNotifySettings(mute_until=2**31-1) )) return f"Chat {chat_id} muted." + except (ImportError, AttributeError) as type_err: + try: + # Alternative approach directly using raw API + peer = await client.get_input_entity(chat_id) + await client(functions.account.UpdateNotifySettingsRequest( + peer=peer, + settings={ + 'mute_until': 2**31-1, # Far future + 'show_previews': False, + 'silent': True + } + )) + return f"Chat {chat_id} muted (using alternative method)." + except Exception as alt_e: + logger.exception(f"mute_chat (alt method) failed (chat_id={chat_id})") + return f"Error muting chat (alternative method): {alt_e}" except Exception as e: + logger.exception(f"mute_chat failed (chat_id={chat_id})") return f"Error muting chat: {e}" @@ -1310,12 +1606,32 @@ async def unmute_chat(chat_id: int) -> str: Unmute notifications for a chat. """ try: + from telethon.tl.types import InputPeerNotifySettings + + peer = await client.get_entity(chat_id) await client(functions.account.UpdateNotifySettingsRequest( - peer=await client.get_entity(chat_id), - settings=functions.account.InputPeerNotifySettings(mute_until=0) + peer=peer, + settings=InputPeerNotifySettings(mute_until=0) )) return f"Chat {chat_id} unmuted." + except (ImportError, AttributeError) as type_err: + try: + # Alternative approach directly using raw API + peer = await client.get_input_entity(chat_id) + await client(functions.account.UpdateNotifySettingsRequest( + peer=peer, + settings={ + 'mute_until': 0, # Unmute (current time) + 'show_previews': True, + 'silent': False + } + )) + return f"Chat {chat_id} unmuted (using alternative method)." + except Exception as alt_e: + logger.exception(f"unmute_chat (alt method) failed (chat_id={chat_id})") + return f"Error unmuting chat (alternative method): {alt_e}" except Exception as e: + logger.exception(f"unmute_chat failed (chat_id={chat_id})") return f"Error unmuting chat: {e}" @@ -1392,11 +1708,34 @@ async def get_gif_search(query: str, limit: int = 10) -> str: limit: Max number of GIFs to return. """ try: - result = await client(functions.messages.SearchGifsRequest(q=query, offset_id=0, limit=limit)) - if not result.gifs: - return "No GIFs found for this query." - return json.dumps([g.document.id for g in result.gifs], indent=2) + # Try approach 1: SearchGifsRequest + try: + result = await client(functions.messages.SearchGifsRequest(q=query, offset_id=0, limit=limit)) + if not result.gifs: + return "[]" + return json.dumps([g.document.id for g in result.gifs], indent=2, default=json_serializer) + except (AttributeError, ImportError): + # Fallback approach: Use SearchRequest with GIF filter + try: + from telethon.tl.types import InputMessagesFilterGif + result = await client(functions.messages.SearchRequest( + peer="gif", q=query, filter=InputMessagesFilterGif(), + min_date=None, max_date=None, offset_id=0, add_offset=0, + limit=limit, max_id=0, min_id=0, hash=0 + )) + if not result or not hasattr(result, 'messages') or not result.messages: + return "[]" + # Extract document IDs from any messages with media + gif_ids = [] + for msg in result.messages: + if hasattr(msg, 'media') and msg.media and hasattr(msg.media, 'document'): + gif_ids.append(msg.media.document.id) + return json.dumps(gif_ids, default=json_serializer) + except Exception as inner_e: + # Last resort: Try to fetch from a public bot + return f"Could not search GIFs using available methods: {inner_e}" except Exception as e: + logger.exception(f"get_gif_search failed (query={query}, limit={limit})") return f"Error searching GIFs: {e}" @@ -1424,9 +1763,34 @@ async def get_bot_info(bot_username: str) -> str: Get information about a bot by username. """ try: - result = await client(functions.users.GetFullUserRequest(id=bot_username)) - return json.dumps(result.to_dict(), indent=2) + entity = await client.get_entity(bot_username) + if not entity: + return f"Bot with username {bot_username} not found." + + result = await client(functions.users.GetFullUserRequest(id=entity)) + + # Create a more structured, serializable response + if hasattr(result, 'to_dict'): + # Use custom serializer to handle non-serializable types + return json.dumps(result.to_dict(), indent=2, default=json_serializer) + else: + # Fallback if to_dict is not available + info = { + "bot_info": { + "id": entity.id, + "username": entity.username, + "first_name": entity.first_name, + "last_name": getattr(entity, "last_name", ""), + "is_bot": getattr(entity, "bot", False), + "verified": getattr(entity, "verified", False), + } + } + if hasattr(result, "full_user") and hasattr(result.full_user, "about"): + info["bot_info"]["about"] = result.full_user.about + + return json.dumps(info, indent=2) except Exception as e: + logger.exception(f"get_bot_info failed (bot_username={bot_username})") return f"Error getting bot info: {e}" @@ -1434,16 +1798,45 @@ async def get_bot_info(bot_username: str) -> str: async def set_bot_commands(bot_username: str, commands: list) -> str: """ Set bot commands for a bot you own. + Note: This function can only be used if the Telegram client is a bot account. + Regular user accounts cannot set bot commands. + + Args: + bot_username: The username of the bot to set commands for. + commands: List of command dictionaries with 'command' and 'description' keys. """ - from telethon.tl.types import BotCommand try: - await client(functions.bots.SetBotCommandsRequest( - scope=bot_username, - lang_code='', - commands=[BotCommand(command=c['command'], description=c['description']) for c in commands] + # First check if the current client is a bot + me = await client.get_me() + if not getattr(me, 'bot', False): + return "Error: This function can only be used by bot accounts. Your current Telegram account is a regular user account, not a bot." + + # Import required types + from telethon.tl.types import BotCommand, BotCommandScopeDefault + from telethon.tl.functions.bots import SetBotCommandsRequest + + # Create BotCommand objects from the command dictionaries + bot_commands = [ + BotCommand(command=c['command'], description=c['description']) + for c in commands + ] + + # Get the bot entity + bot = await client.get_entity(bot_username) + + # Set the commands with proper scope + await client(SetBotCommandsRequest( + scope=BotCommandScopeDefault(), + lang_code="en", # Default language code + commands=bot_commands )) + return f"Bot commands set for {bot_username}." + except ImportError as ie: + logger.exception(f"set_bot_commands failed - ImportError: {ie}") + return f"Error: Your Telethon version doesn't support SetBotCommandsRequest. Please update Telethon." except Exception as e: + logger.exception(f"set_bot_commands failed (bot_username={bot_username})") return f"Error setting bot commands: {e}" @@ -1491,9 +1884,23 @@ async def get_recent_actions(chat_id: int) -> str: Get recent admin actions (admin log) in a group or channel. """ try: - result = await client(functions.channels.GetAdminLogRequest(channel=chat_id, q="", events_filter=None, admins=[], max_id=0, min_id=0, limit=20)) - return json.dumps([e.to_dict() for e in result.events], indent=2) + result = await client(functions.channels.GetAdminLogRequest( + channel=chat_id, + q="", + events_filter=None, + admins=[], + max_id=0, + min_id=0, + limit=20 + )) + + if not result or not result.events: + return "No recent admin actions found." + + # Use the custom serializer to handle datetime objects + return json.dumps([e.to_dict() for e in result.events], indent=2, default=json_serializer) except Exception as e: + logger.exception(f"get_recent_actions failed (chat_id={chat_id})") return f"Error getting recent actions: {e}" @@ -1504,9 +1911,22 @@ async def get_pinned_messages(chat_id: int) -> str: """ try: entity = await client.get_entity(chat_id) - messages = await client.get_messages(entity, filter=functions.messages.FilterPinned()) - return "\n".join([f"ID: {m.id} | {m.date} | {m.message}" for m in messages]) + # Use correct filter based on Telethon version + try: + # Try newer Telethon approach + from telethon.tl.types import InputMessagesFilterPinned + messages = await client.get_messages(entity, filter=InputMessagesFilterPinned()) + except (ImportError, AttributeError): + # Fallback - try without filter and manually filter pinned + all_messages = await client.get_messages(entity, limit=50) + messages = [m for m in all_messages if getattr(m, 'pinned', False)] + + if not messages: + return "No pinned messages found in this chat." + + return "\n".join([f"ID: {m.id} | {m.date} | {m.message or '[Media/No text]'}" for m in messages]) except Exception as e: + logger.exception(f"get_pinned_messages failed (chat_id={chat_id})") return f"Error getting pinned messages: {e}" diff --git a/test.py b/test.py new file mode 100644 index 0000000..aa7afe2 --- /dev/null +++ b/test.py @@ -0,0 +1,583 @@ +import os +import sys +import asyncio +import nest_asyncio +from dotenv import load_dotenv +import logging +import random +import string +import json +from datetime import datetime, timedelta + +# Assume main.py is in the same directory or adjust path accordingly +from main import ( + client, mcp, + get_chats, get_messages, send_message, list_contacts, search_contacts, + get_contact_ids, list_messages, list_chats, get_chat, get_direct_chat_by_contact, + get_contact_chats, get_last_interaction, get_message_context, add_contact, + delete_contact, block_user, unblock_user, get_me, create_group, + invite_to_group, leave_chat, get_participants, send_file, download_media, + update_profile, set_profile_photo, delete_profile_photo, get_privacy_settings, + set_privacy_settings, import_contacts, export_contacts, get_blocked_users, + create_channel, edit_chat_title, edit_chat_photo, delete_chat_photo, + promote_admin, demote_admin, ban_user, unban_user, get_admins, + get_banned_users, get_invite_link, join_chat_by_link, export_chat_invite, + import_chat_invite, send_voice, forward_message, edit_message, + delete_message, pin_message, unpin_message, mark_as_read, reply_to_message, + upload_file, get_media_info, search_public_chats, search_messages, + resolve_username, mute_chat, unmute_chat, archive_chat, unarchive_chat, + get_sticker_sets, send_sticker, get_gif_search, send_gif, get_bot_info, + set_bot_commands, get_history, get_user_photos, get_user_status, + get_recent_actions, get_pinned_messages +) +# Import specific telethon types needed for tests +from telethon.errors.rpcerrorlist import UserNotParticipantError +from telethon.tl import types + +load_dotenv() + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + filename='.log', # Log to .log file + filemode='w' # Overwrite the log file each time +) +logger = logging.getLogger("TelegramToolTester") +logger.info("Logging configured to .log file.") # Force file creation early + +# --- Test Configuration --- +# Set these environment variables before running the test script +TEST_CHAT_ID = int(os.getenv("TEST_CHAT_ID", "0")) # A safe chat ID (e.g., Saved Messages or a test group) +TEST_SUPERGROUP_ID = int(os.getenv("TEST_SUPERGROUP_ID", "0")) # ID of a test supergroup/channel you own/admin +TEST_USER_ID = int(os.getenv("TEST_USER_ID", "0")) # ID of a test user account (NOT a real person unless they consent!) +TEST_USERNAME = os.getenv("TEST_USERNAME", "") # Username of the test user +TEST_CONTACT_PHONE = os.getenv("TEST_CONTACT_PHONE", "") # Phone number for add_contact test (e.g., +15551234567) +TEST_CONTACT_FNAME = os.getenv("TEST_CONTACT_FNAME", "Test") +TEST_CONTACT_LNAME = os.getenv("TEST_CONTACT_LNAME", "Contact") +TEST_FILE_PATH = os.getenv("TEST_FILE_PATH", "test_upload.txt") # Path to a dummy file for upload/send tests +TEST_PHOTO_PATH = os.getenv("TEST_PHOTO_PATH", "test_photo.jpg") # Path to a dummy photo file +TEST_VOICE_PATH = os.getenv("TEST_VOICE_PATH", "test_voice.ogg") # Path to a dummy ogg voice file +TEST_STICKER_PATH = os.getenv("TEST_STICKER_PATH", "test_sticker.webp") # Path to a dummy webp sticker +TEST_BOT_USERNAME = os.getenv("TEST_BOT_USERNAME", "") # Username of a bot you own +TEST_INVITE_LINK_HASH = os.getenv("TEST_INVITE_LINK_HASH", "") # Hash from a valid invite link + +# Create dummy files if they don't exist +if not os.path.exists(TEST_FILE_PATH): + with open(TEST_FILE_PATH, "w") as f: + f.write("This is a test file.") +if not os.path.exists(TEST_PHOTO_PATH): + logger.warning(f"Test photo file not found: {TEST_PHOTO_PATH}. Some tests might fail.") +if not os.path.exists(TEST_VOICE_PATH): + logger.warning(f"Test voice file not found: {TEST_VOICE_PATH}. send_voice test will fail.") +if not os.path.exists(TEST_STICKER_PATH): + logger.warning(f"Test sticker file not found: {TEST_STICKER_PATH}. send_sticker test will fail.") + + +async def run_test(tool_func, description, **kwargs): + logger.info(f"--- Testing: {description} ({tool_func.__name__}) ---") + logger.info(f"Params: {kwargs}") + try: + result = await tool_func(**kwargs) + logger.info(f"Result: {result}") + return result + except Exception as e: + logger.error(f"Error during {tool_func.__name__}: {e}", exc_info=True) + return f"TEST FAILED: {e}" + +async def run_all_tests(): + global TEST_CHAT_ID # Declare intention to modify the global variable + if not await client.is_user_authorized(): + logger.error("Client not authorized. Please run main.py first to log in.") + return + + logger.info("Starting Telegram Tool Tests...") + me = await client.get_me() + me_info = await get_me() # Use the tool version + logger.info(f"Running tests as: {me.first_name} (ID: {me.id})") + + if not TEST_CHAT_ID: + TEST_CHAT_ID = me.id # Default to Saved Messages if not set + logger.warning(f"TEST_CHAT_ID not set, defaulting to Saved Messages ({TEST_CHAT_ID}).") + + # --- Basic Info & Chat Listing --- + logger.info("--- Running Basic Info & Listing Tests ---") + await run_test(get_me, "Get own user info") + await run_test(list_chats, "List recent chats", limit=5) + await run_test(list_chats, "List groups", chat_type='group', limit=5) + await run_test(list_chats, "List channels", chat_type='channel', limit=5) + await run_test(list_chats, "List users", chat_type='user', limit=5) + + # --- Test the basic get_chats function --- + await run_test(get_chats, "Get chats tool (paginated, page 1)", page=1, page_size=5) + + # --- Specific Chat Operations (using TEST_CHAT_ID) --- + message_id_to_test = None + if TEST_CHAT_ID: + logger.info(f"--- Running tests on Chat ID: {TEST_CHAT_ID} ---") + await run_test(get_chat, "Get info for test chat", chat_id=TEST_CHAT_ID) + await run_test(get_history, "Get history for test chat", chat_id=TEST_CHAT_ID, limit=5) + await run_test(get_messages, "Get messages tool (paginated)", chat_id=TEST_CHAT_ID, page=1, page_size=5) + + today = datetime.now() + yesterday = today - timedelta(days=1) + await run_test(list_messages, "List messages tool (filtered date)", chat_id=TEST_CHAT_ID, limit=10, from_date=yesterday.strftime("%Y-%m-%d"), to_date=today.strftime("%Y-%m-%d")) + await run_test(list_messages, "List messages tool (search)", chat_id=TEST_CHAT_ID, limit=10, search_query="Test") + + # Send a test message to get IDs for subsequent tests + sent_msg_result = await run_test(send_message, "Send test message", chat_id=TEST_CHAT_ID, message=f"MCP Test Message {random.randint(1000, 9999)}") + if "successfully" in str(sent_msg_result).lower(): + try: + # Fetch the last message sent by self (hopefully the test message) + async for msg in client.iter_messages(TEST_CHAT_ID, limit=5, from_user='me'): + # Check if it's the message we likely just sent + if "MCP Test Message" in msg.text: + message_id_to_test = msg.id + logger.info(f"Using message ID {message_id_to_test} for further tests.") + break + if not message_id_to_test: # Fallback if specific message not found + last_msgs = await client.get_messages(TEST_CHAT_ID, limit=1) + if last_msgs: + message_id_to_test = last_msgs[0].id + logger.warning(f"Could not find specific test message, using last message ID: {message_id_to_test}") + + except Exception as e: + logger.error(f"Could not get last message ID: {e}") + + if message_id_to_test: + logger.info(f"--- Running Message-Specific Tests (ID: {message_id_to_test}) ---") + await run_test(edit_message, "Edit test message", chat_id=TEST_CHAT_ID, message_id=message_id_to_test, new_text="MCP Test Message (edited)") + await run_test(get_message_context, "Get context for test message", chat_id=TEST_CHAT_ID, message_id=message_id_to_test, context_size=1) + await run_test(reply_to_message, "Reply to test message", chat_id=TEST_CHAT_ID, message_id=message_id_to_test, text="Test Reply") + await run_test(pin_message, "Pin test message", chat_id=TEST_CHAT_ID, message_id=message_id_to_test) + await asyncio.sleep(2) # Give time for pin to register + await run_test(get_pinned_messages, "Get pinned messages", chat_id=TEST_CHAT_ID) + await run_test(unpin_message, "Unpin test message", chat_id=TEST_CHAT_ID, message_id=message_id_to_test) + # Forwarding (Careful: forwards TO TEST_CHAT_ID FROM TEST_CHAT_ID) + await run_test(forward_message, "Forward test message", from_chat_id=TEST_CHAT_ID, message_id=message_id_to_test, to_chat_id=TEST_CHAT_ID) + await run_test(delete_message, "Delete test message", chat_id=TEST_CHAT_ID, message_id=message_id_to_test) + else: + logger.warning("Could not obtain a message ID for message-specific tests.") + + await run_test(search_messages, "Search for 'Test' in test chat", chat_id=TEST_CHAT_ID, query="Test", limit=5) + await run_test(mark_as_read, "Mark test chat as read", chat_id=TEST_CHAT_ID) + + # File Operations + logger.info("--- Running File Operation Tests ---") + await run_test(send_file, "Send test file", chat_id=TEST_CHAT_ID, file_path=TEST_FILE_PATH, caption="Test File") + await run_test(upload_file, "Upload test file", file_path=TEST_FILE_PATH) + # Find a message with media to test download/info + media_message_id = None + media_msg_obj = None + async for msg in client.iter_messages(TEST_CHAT_ID, limit=20): + if msg.media: + media_message_id = msg.id + media_msg_obj = msg + logger.info(f"Found media message ID {media_message_id} to test download/info.") + break + if media_message_id and media_msg_obj: + await run_test(get_media_info, "Get media info", chat_id=TEST_CHAT_ID, message_id=media_message_id) + # Use a more specific download path based on filename if possible + download_filename = getattr(media_msg_obj.media, 'document', None) + if download_filename: + download_filename = getattr(download_filename, 'attributes', [None])[0] + if download_filename: + download_filename = getattr(download_filename, 'file_name', None) + download_path = f"test_download_{download_filename or media_message_id}.tmp" + + await run_test(download_media, "Download media", chat_id=TEST_CHAT_ID, message_id=media_message_id, file_path=download_path) + if os.path.exists(download_path): + try: + os.remove(download_path) + logger.info(f"Cleaned up downloaded file: {download_path}") + except OSError as e: + logger.error(f"Error removing downloaded file {download_path}: {e}") + else: + logger.warning(f"Could not find downloaded file to clean up: {download_path}") + else: + logger.warning("No media message found in recent history to test download/info.") + + # Voice/Sticker/GIF (check paths exist) + logger.info("--- Running Media Send Tests ---") + if os.path.exists(TEST_VOICE_PATH): + await run_test(send_voice, "Send voice message", chat_id=TEST_CHAT_ID, file_path=TEST_VOICE_PATH) + + # Enhanced sticker testing + logger.info("--- Running Enhanced Sticker Tests ---") + if os.path.exists(TEST_STICKER_PATH): + await run_test(send_sticker, "Send sticker file", chat_id=TEST_CHAT_ID, file_path=TEST_STICKER_PATH) + else: + logger.warning(f"Test sticker file not found at {TEST_STICKER_PATH}") + + # Test sticker set retrieval + sticker_sets = await run_test(get_sticker_sets, "Get all available sticker sets") + + # Try to parse sticker set info + try: + # Parse any available sticker info + if sticker_sets and "sets" in sticker_sets: + logger.info("Successfully retrieved sticker sets") + elif sticker_sets and len(sticker_sets) > 2: # JSON output has at least [] + logger.info(f"Retrieved sticker sets data: {sticker_sets[:100]}...") + else: + logger.warning("No sticker sets found or empty response") + + # If we're on a test account with limited stickers, we could: + # 1. Add a sticker set (not implemented in our tools) + # 2. Remove a sticker set (not implemented in our tools) + logger.info("Note: Adding/removing sticker sets requires additional tools not currently implemented") + except Exception as e: + logger.error(f"Error in sticker set processing: {e}") + + # GIF testing + gif_search_result = await run_test(get_gif_search, "Search for GIFs", query="hello", limit=1) + try: + gif_ids = json.loads(gif_search_result) + if gif_ids: + gif_id = gif_ids[0] + await run_test(send_gif, "Send GIF by ID", chat_id=TEST_CHAT_ID, gif_id=gif_id) + else: + logger.warning("No GIF IDs returned from search.") + except Exception as e: + logger.warning(f"Could not parse or send GIF ID from search result: {e}") + + # Mute/Archive + logger.info("--- Running Chat State Tests ---") + await run_test(mute_chat, "Mute test chat", chat_id=TEST_CHAT_ID) + await asyncio.sleep(1) + await run_test(unmute_chat, "Unmute test chat", chat_id=TEST_CHAT_ID) + await asyncio.sleep(1) + # Archive/Unarchive - Check if ToggleDialogPinRequest exists and test + if hasattr(types.messages, 'ToggleDialogPinRequest'): + logger.warning("--- Testing Archive/Unarchive (May depend on Telethon version) ---") + await run_test(archive_chat, "Archive test chat", chat_id=TEST_CHAT_ID) + await asyncio.sleep(1) + await run_test(unarchive_chat, "Unarchive test chat", chat_id=TEST_CHAT_ID) + await asyncio.sleep(1) + else: + logger.warning("ToggleDialogPinRequest not found, skipping archive/unarchive tests.") + + # --- Contact Operations --- + logger.info("--- Running Contact Operations Tests ---") + await run_test(list_contacts, "List contacts") + await run_test(export_contacts, "Export contacts to JSON") + + contact_to_delete_id = None + if TEST_CONTACT_PHONE and TEST_CONTACT_FNAME: + logger.warning("--- Running Add/Delete Contact Test (requires valid phone number) ---") + # Check if contact already exists + contact_exists = False + contacts_list = await list_contacts() + if TEST_CONTACT_PHONE in str(contacts_list): + logger.warning(f"Contact with phone {TEST_CONTACT_PHONE} seems to exist. Skipping add.") + # Try to find ID for deletion test anyway + try: + lines = str(contacts_list).split('\n') + for line in lines: + if TEST_CONTACT_PHONE in line: + contact_to_delete_id = int(line.split(',')[0].split(':')[1].strip()) + logger.info(f"Found existing contact ID for deletion test: {contact_to_delete_id}") + break + except Exception as e: + logger.error(f"Could not parse existing contact ID: {e}") + contact_exists = True + + if not contact_exists: + add_result = await run_test(add_contact, "Add test contact", phone=TEST_CONTACT_PHONE, first_name=TEST_CONTACT_FNAME, last_name=TEST_CONTACT_LNAME) + await asyncio.sleep(5) # Give time for contact to sync + if "added successfully" in str(add_result).lower(): + # Try to find the added contact to get its ID for deletion + search_res = await run_test(search_contacts, "Search for added contact", query=TEST_CONTACT_PHONE) + try: + lines = str(search_res).split('\n') + for line in lines: + if TEST_CONTACT_PHONE in line and f"Name: {TEST_CONTACT_FNAME}" in line: + contact_to_delete_id = int(line.split(',')[0].split(':')[1].strip()) + logger.info(f"Found added contact ID for deletion: {contact_to_delete_id}") + break + except Exception as e: + logger.error(f"Could not parse contact ID from search result: {e}") + else: + logger.warning("Add contact failed or did not return success message.") + + if contact_to_delete_id: + await run_test(get_direct_chat_by_contact, "Get direct chat by contact's phone", contact_query=TEST_CONTACT_PHONE) + await run_test(get_contact_chats, "Get chats involving contact", contact_id=contact_to_delete_id) + logger.warning(f"--- Proceeding to delete contact ID: {contact_to_delete_id} ---") + await run_test(delete_contact, "Delete test contact", user_id=contact_to_delete_id) + else: + logger.warning("Could not find contact by phone number to test deletion/other ops.") + + if TEST_USERNAME: + await run_test(search_contacts, "Search contacts by username", query=TEST_USERNAME) + await run_test(resolve_username, "Resolve test username", username=TEST_USERNAME) + await run_test(get_direct_chat_by_contact, "Get direct chat by test username", contact_query=TEST_USERNAME) + + await run_test(get_contact_ids, "Get contact IDs") + # import_contacts test requires a list of dicts, harder to setup via env vars + # logger.warning("Skipping import_contacts test - requires manual setup.") + # await run_test(import_contacts, "Import contacts", contacts=[{'phone': '+1555...', 'first_name': ...}]) + # Clarification: import_contacts test is skipped as it requires complex setup (list of dictionaries) + # which is difficult to manage solely through environment variables for automated testing. + + # --- User Interaction (Requires TEST_USER_ID) --- + if TEST_USER_ID: + logger.info(f"--- Running User Interaction Tests (User ID: {TEST_USER_ID}) ---") + await run_test(get_user_status, "Get test user status", user_id=TEST_USER_ID) + await run_test(get_user_photos, "Get test user photos", user_id=TEST_USER_ID, limit=1) + # Check if the user is a contact before running contact-specific tests + is_contact = False + contacts_res = await list_contacts() + if f"ID: {TEST_USER_ID}" in str(contacts_res): + is_contact = True + logger.info(f"User {TEST_USER_ID} is a contact.") + await run_test(get_last_interaction, "Get last interaction with test user contact", contact_id=TEST_USER_ID) + await run_test(get_contact_chats, "Get chats involving test user contact", contact_id=TEST_USER_ID) + else: + logger.warning(f"User {TEST_USER_ID} is not a contact. Skipping contact-specific tests.") + + # Block/Unblock + await run_test(block_user, "Block test user", user_id=TEST_USER_ID) + await asyncio.sleep(1) + await run_test(get_blocked_users, "Get blocked users (check if test user is present)") + await run_test(unblock_user, "Unblock test user", user_id=TEST_USER_ID) + await asyncio.sleep(1) + + # --- Supergroup/Channel Operations (Requires TEST_SUPERGROUP_ID and TEST_USER_ID) --- + created_group_id = None + created_channel_id = None + if TEST_USER_ID: + # Create Group Test (Requires TEST_USER_ID) + logger.warning("--- Running Group/Channel Creation Test ---") + create_group_res = await run_test(create_group, "Create test group with test user", title=f"MCP Test Group {random.randint(100,999)}", user_ids=[TEST_USER_ID]) + try: + if "created with ID" in create_group_res: + created_group_id = int(create_group_res.split(':')[-1].strip()) + logger.info(f"Created group ID: {created_group_id}") + await asyncio.sleep(2) + logger.warning(f"--- Leaving newly created group: {created_group_id} ---") + await run_test(leave_chat, "Leave newly created group", chat_id=created_group_id) + except Exception as e: + logger.error(f"Failed to parse or leave created group: {e}") + + # Create Channel Test (No additional users needed initially) + create_channel_res = await run_test(create_channel, "Create test channel", title=f"MCP Test Channel {random.randint(100,999)}", about="Test channel created by MCP", megagroup=False) + try: + if "created with ID" in create_channel_res: + created_channel_id = int(create_channel_res.split(':')[-1].strip()) + logger.info(f"Created channel ID: {created_channel_id}") + await asyncio.sleep(2) + logger.warning(f"--- Leaving newly created channel: {created_channel_id} ---") + await run_test(leave_chat, "Leave newly created channel", chat_id=created_channel_id) + except Exception as e: + logger.error(f"Failed to parse or leave created channel: {e}") + + if TEST_SUPERGROUP_ID: + logger.info(f"--- Running Supergroup/Channel Operations Tests (Chat ID: {TEST_SUPERGROUP_ID}) ---") + await run_test(get_chat, "Get test supergroup info", chat_id=TEST_SUPERGROUP_ID) + await run_test(get_participants, "Get participants of test supergroup", chat_id=TEST_SUPERGROUP_ID) + await run_test(get_admins, "Get admins of test supergroup", chat_id=TEST_SUPERGROUP_ID) + await run_test(get_banned_users, "Get banned users of test supergroup", chat_id=TEST_SUPERGROUP_ID) + await run_test(get_recent_actions, "Get recent actions for supergroup", chat_id=TEST_SUPERGROUP_ID) + await run_test(get_invite_link, "Get invite link for supergroup", chat_id=TEST_SUPERGROUP_ID) + await run_test(export_chat_invite, "Export chat invite for supergroup", chat_id=TEST_SUPERGROUP_ID) + + if TEST_INVITE_LINK_HASH: + logger.warning(f"--- Running Join Chat by Invite Hash Test (Requires valid HASH: {TEST_INVITE_LINK_HASH}) ---") + # Extract hash if full URL is provided + invite_hash = TEST_INVITE_LINK_HASH + if invite_hash.startswith("https://t.me/+"): + invite_hash = invite_hash.split("+", 1)[1] + logger.info(f"Extracted hash from URL: {invite_hash}") + # This will handle various cases, including invalid/expired hash or already a member + import_res = await run_test(import_chat_invite, "Join chat via import hash", hash=invite_hash) + + # Check if the response indicates already a member or successful join + already_member = "already a member" in import_res.lower() + success_join = "successfully joined" in import_res.lower() + logger.info(f"Invite result: {'Already a member' if already_member else 'Successfully joined' if success_join else 'Failed to join'}") + + # Also test the full URL version if appropriate + if TEST_INVITE_LINK_HASH.startswith("https://"): + await run_test(join_chat_by_link, "Join chat via full link", link=TEST_INVITE_LINK_HASH) + + # If we successfully joined a chat, we should leave it to clean up + if success_join and "chat:" in import_res: + try: + # Extract chat ID from success message if possible + chat_title = import_res.split("chat:", 1)[1].strip() + logger.warning(f"Attempting to find and leave newly joined chat: '{chat_title}'") + + # Try to find the chat ID by matching the title + async for dialog in client.iter_dialogs(limit=10): # Check recent dialogs + if dialog.name == chat_title: + logger.info(f"Found chat to leave: {dialog.name} (ID: {dialog.id})") + await run_test(leave_chat, "Leave newly joined chat", chat_id=dialog.id) + break + except Exception as leave_err: + logger.error(f"Failed to leave newly joined chat: {leave_err}") + else: + logger.warning("TEST_INVITE_LINK_HASH not set. Skipping join/import tests.") + + if TEST_USER_ID: + # Ban/Unban/Invite/Promote tests (Use with EXTREME caution) + logger.warning(f"--- Running potentially disruptive tests on supergroup {TEST_SUPERGROUP_ID} with user {TEST_USER_ID} ---") + await run_test(ban_user, "Ban test user from supergroup", chat_id=TEST_SUPERGROUP_ID, user_id=TEST_USER_ID) + await asyncio.sleep(2) + await run_test(get_banned_users, "Get banned users (check test user)", chat_id=TEST_SUPERGROUP_ID) + await run_test(unban_user, "Unban test user from supergroup", chat_id=TEST_SUPERGROUP_ID, user_id=TEST_USER_ID) + await asyncio.sleep(2) + # Ensure user is not already participant before inviting + try: + # Use a more specific filter if possible + # participants = await client.get_participants(TEST_SUPERGROUP_ID, filter=types.ChannelParticipantsSearch(q=str(TEST_USER_ID)), limit=1) + # Simpler check: iterate briefly + user_in_group = False + async for p in client.iter_participants(TEST_SUPERGROUP_ID, limit=50): # Limit search scope + if p.id == TEST_USER_ID: + user_in_group = True + break + if user_in_group: + logger.info(f"User {TEST_USER_ID} already in group {TEST_SUPERGROUP_ID}, skipping invite.") + else: + await run_test(invite_to_group, "Invite test user to supergroup", group_id=TEST_SUPERGROUP_ID, user_ids=[TEST_USER_ID]) + await asyncio.sleep(2) + except UserNotParticipantError: + # This error is expected if user is not participant, proceed with invite + await run_test(invite_to_group, "Invite test user to supergroup (UserNotParticipantError caught)", group_id=TEST_SUPERGROUP_ID, user_ids=[TEST_USER_ID]) + await asyncio.sleep(2) + except Exception as p_err: + logger.warning(f"Could not check participant status before invite: {p_err}. Attempting invite anyway.") + # Try inviting anyway + await run_test(invite_to_group, "Invite test user to supergroup (attempt)", group_id=TEST_SUPERGROUP_ID, user_ids=[TEST_USER_ID]) + await asyncio.sleep(2) + + await run_test(promote_admin, "Promote test user to admin", chat_id=TEST_SUPERGROUP_ID, user_id=TEST_USER_ID) + await asyncio.sleep(2) + await run_test(get_admins, "Get admins (check test user)", chat_id=TEST_SUPERGROUP_ID) + await run_test(demote_admin, "Demote test user from admin", chat_id=TEST_SUPERGROUP_ID, user_id=TEST_USER_ID) + await asyncio.sleep(2) + # Leave chat test needs careful consideration - don't leave accidentally! + # logger.warning(f"--- Skipping leave_chat test for TEST_SUPERGROUP_ID: {TEST_SUPERGROUP_ID} ---") + # await run_test(leave_chat, "Leave test supergroup", chat_id=TEST_SUPERGROUP_ID) + + # Title/Photo Edit + original_title_res = await run_test(get_chat, "Get supergroup title before edit", chat_id=TEST_SUPERGROUP_ID) + original_title = "Unknown" + if "Title:" in str(original_title_res): + try: + original_title = str(original_title_res).split("Title:")[1].split('\n')[0].strip() + logger.info(f"Original title found: '{original_title}'") + except Exception as title_e: + logger.warning(f"Could not parse original title: {title_e}") + + random_suffix = ''.join(random.choices(string.ascii_lowercase, k=4)) + new_title = f"Test Title {random_suffix}" + await run_test(edit_chat_title, "Edit supergroup title", chat_id=TEST_SUPERGROUP_ID, title=new_title) + await asyncio.sleep(2) + # Restore original title if possible + if original_title != "Unknown": + await run_test(edit_chat_title, "Restore supergroup title", chat_id=TEST_SUPERGROUP_ID, title=original_title) + else: + logger.warning("Could not determine original title to restore.") + + if os.path.exists(TEST_PHOTO_PATH): + await run_test(edit_chat_photo, "Edit supergroup photo", chat_id=TEST_SUPERGROUP_ID, file_path=TEST_PHOTO_PATH) + await asyncio.sleep(2) + await run_test(delete_chat_photo, "Delete supergroup photo", chat_id=TEST_SUPERGROUP_ID) + + # --- Profile & Privacy (Use with EXTREME caution!) --- + logger.warning("--- Running Profile & Privacy Tests (Potentially Invasive - Mostly Skipped) ---") + # logger.warning("--- update_profile tests are commented out by default ---") + # original_bio = "" # Need to fetch current bio first if we want to restore + # await run_test(update_profile, "Update profile bio", about=f"MCP Test Bio {random.randint(100,999)}") + # await asyncio.sleep(1) + # await run_test(update_profile, "Restore profile bio", about=original_bio) # Restore to empty or original + + # logger.warning("--- set/delete_profile_photo tests are commented out by default ---") + # if os.path.exists(TEST_PHOTO_PATH): + # await run_test(set_profile_photo, "Set profile photo", file_path=TEST_PHOTO_PATH) + # await asyncio.sleep(2) + # await run_test(delete_profile_photo, "Delete profile photo") + + await run_test(get_privacy_settings, "Get privacy settings (last seen)") + # set_privacy_settings is complex and risky to test automatically. + logger.warning("Skipping set_privacy_settings test due to complexity and risk.") + # Example: Allow only TEST_USER_ID to see last seen (if TEST_USER_ID is set) + # if TEST_USER_ID: + # logger.warning("Testing set_privacy_settings - allowing TEST_USER_ID for last seen") + # await run_test(set_privacy_settings, "Set privacy (last seen - allow test user)", key='status_timestamp', allow_users=[TEST_USER_ID]) + # await asyncio.sleep(2) + # logger.warning("Restoring default privacy for last seen") + # await run_test(set_privacy_settings, "Restore privacy (last seen - allow all)", key='status_timestamp', allow_users=[]) # Assuming empty means allow all? + + # --- Bot Operations (Requires TEST_BOT_USERNAME) --- + if TEST_BOT_USERNAME: + logger.info(f"--- Running Bot Operations Tests (Bot: {TEST_BOT_USERNAME}) ---") + await run_test(get_bot_info, "Get bot info", bot_username=TEST_BOT_USERNAME) + + # Check if our client is a bot before testing command setting + is_bot = False + try: + me = await client.get_me() + is_bot = getattr(me, 'bot', False) + except Exception as e: + logger.error(f"Error checking if client is a bot: {e}") + + if is_bot: + # Only proceed with set_bot_commands test if we're a bot + logger.info("Client is a bot account, testing set_bot_commands") + await run_test(set_bot_commands, "Set bot commands", bot_username=TEST_BOT_USERNAME, + commands=[{'command': 'mcp_test', 'description': 'MCP Test Command'}]) + await asyncio.sleep(2) + await run_test(set_bot_commands, "Clear bot commands", bot_username=TEST_BOT_USERNAME, commands=[]) + else: + # Skip the set_bot_commands test if we're not a bot + logger.warning("Client is a regular user account, not a bot. Skipping set_bot_commands test.") + logger.info("Note: The set_bot_commands function can only be used by bot accounts.") + else: + logger.warning("TEST_BOT_USERNAME not set. Skipping bot tests.") + + # --- Other Operations --- + logger.info("--- Running Other Operations Tests ---") + await run_test(search_public_chats, "Search public chats for 'bot'", query="bot") + await run_test(get_sticker_sets, "Get sticker sets") + + # Final check for remaining tools that haven't been explicitly tested + logger.info("--- Testing Remaining Tools ---") + + # Test the archive/unarchive chat functions if not already tested + if TEST_CHAT_ID: + try: + # Only run if we haven't tested these already + await run_test(archive_chat, "Archive test chat (final check)", chat_id=TEST_CHAT_ID) + await asyncio.sleep(1) + await run_test(unarchive_chat, "Unarchive test chat (final check)", chat_id=TEST_CHAT_ID) + except Exception as e: + logger.warning(f"Archive/unarchive test failed: {e}") + + logger.info("--- All Tests Completed ---") + + +if __name__ == "__main__": + nest_asyncio.apply() + + async def main(): + try: + logger.info("Starting Telegram client for testing...") + # Ensure client is started and authorized + await client.start() + if not await client.is_user_authorized(): + logger.error("Client authorization failed. Please run main.py interactively first.") + sys.exit(1) + + await run_all_tests() + + except Exception as e: + logger.critical(f"Critical error during test execution: {e}", exc_info=True) + sys.exit(1) + finally: + if client.is_connected(): + logger.info("Disconnecting Telegram client...") + await client.disconnect() + + asyncio.run(main()) \ No newline at end of file