feat: refine logging and error handling in main.py, update .env.example and README

- Enhanced logging setup in main.py for better error tracking and console output.
- Improved error handling in group and user management functions.
- Updated .env.example to include a sample session string and removed outdated test configurations.
- Expanded README.md with new tool examples and usage instructions for better clarity.
- Removed the test.py file as it is no longer needed for testing.
This commit is contained in:
anonim 2025-04-17 17:56:42 +03:00
parent fec185075b
commit e297416e54
6 changed files with 743 additions and 800 deletions

View file

@ -6,45 +6,4 @@ TELEGRAM_API_HASH=0123456789abcdef0123456789abcdef
# Option 1: File-based session (a .session file will be created)
TELEGRAM_SESSION_NAME=telegram_session
# Option 2: String-based session (if you generate one, e.g., using Telethon's string session generator)
TELEGRAM_SESSION_STRING=
# --- Test Script Configuration (test.py) ---
# Fill these with IDs/paths relevant to YOUR test environment.
# Using real user/group IDs requires caution and consent.
# A chat ID where the script can send/edit/delete messages safely.
# Defaults to your "Saved Messages" if left empty or 0.
TEST_CHAT_ID=0
# The ID of a Supergroup or Channel you own or are an admin in.
# Required for testing admin actions (ban, invite, promote, etc.).
TEST_SUPERGROUP_ID=0
# The numeric User ID of a TEST account (NOT a real person unless they consent).
# This user will be banned/unbanned/invited/promoted in the TEST_SUPERGROUP_ID.
TEST_USER_ID=0
# The username (without @) of the TEST_USER_ID.
TEST_USERNAME=username
# --- Optional Test Variables ---
# Phone number (E.164 format, e.g., +15551234567) for add_contact test.
TEST_CONTACT_PHONE=+15551234567
TEST_CONTACT_FNAME=Test
TEST_CONTACT_LNAME=Contact
# Paths to dummy files for testing uploads/sending media.
# The script will create test_upload.txt if it doesn't exist.
# You might need to provide small, valid image/voice/sticker files for other tests.
TEST_FILE_PATH=test_upload.txt
TEST_PHOTO_PATH=test_photo.jpg
TEST_VOICE_PATH=test_voice.ogg
TEST_STICKER_PATH=sticker.webp
# Username of a bot you own (required for get_bot_info, set_bot_commands tests)
TEST_BOT_USERNAME=your_bot_username
# Hash part or full invite link for a group/channel you want to test joining
# Required for join_chat_by_link / import_chat_invite tests
TEST_INVITE_LINK_HASH=https://t.me/+AbCdEfGhIjK
TELEGRAM_SESSION_STRING=1231231232erfdfdffd

20
.gitignore vendored
View file

@ -85,7 +85,7 @@ ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
@ -189,12 +189,16 @@ claude_desktop_config.json
# Test files
.cursor/
test_voice.ogg
test_upload.txt
test_output.txt
sticker.webp
test_download_*
test_download.*
two.png
tests/
test.py
extract_test_issues.py
*.tmp
mcp_errors.log
telegram_test.log
test_issues_report.md
# Temporary data files
test_upload.txt
test_voice.ogg
sticker.webp
two.png

276
README.md
View file

@ -186,33 +186,269 @@ Edit `~/.cursor/mcp.json`:
---
## 🧪 Testing
## 📝 Tool Examples with Code & Output
A comprehensive test script is included to validate all functionality:
Below are examples of the most commonly used tools with their implementation and sample output.
```bash
# Basic test (redirects output to file)
python test.py > test_output.txt 2>&1
### Getting Your Chats
```python
@mcp.tool()
async def get_chats(page: int = 1, page_size: int = 20) -> str:
"""
Get a paginated list of chats.
Args:
page: Page number (1-indexed).
page_size: Number of chats per page.
"""
try:
dialogs = await client.get_dialogs()
start = (page - 1) * page_size
end = start + page_size
if start >= len(dialogs):
return "Page out of range."
chats = dialogs[start:end]
lines = []
for dialog in chats:
entity = dialog.entity
chat_id = entity.id
title = getattr(entity, "title", None) or getattr(entity, "first_name", "Unknown")
lines.append(f"Chat ID: {chat_id}, Title: {title}")
return "\n".join(lines)
except Exception as e:
logger.exception(f"get_chats failed (page={page}, page_size={page_size})")
return "An error occurred (code: GETCHATS-ERR-001). Check mcp_errors.log for details."
```
The test script uses environment variables from your `.env` file to configure testing parameters. See `.env.example` for all available test configuration options.
### Test Configuration
You can configure test parameters in your `.env` file:
Example output:
```
# A safe chat ID where tests can send/delete messages
TEST_CHAT_ID=your_saved_messages_id
# A supergroup you admin for testing group operations
TEST_SUPERGROUP_ID=your_supergroup_id
# A test user account ID (not a real person unless they consent)
TEST_USER_ID=test_user_id
Chat ID: 123456789, Title: John Doe
Chat ID: -100987654321, Title: My Project Group
Chat ID: 111223344, Title: Jane Smith
Chat ID: -200123456789, Title: News Channel
```
The tests are designed to be non-destructive, but use caution when testing with real accounts.
### Sending Messages
```python
@mcp.tool()
async def send_message(chat_id: int, message: str) -> str:
"""
Send a message to a specific chat.
Args:
chat_id: The ID of the chat.
message: The message content to send.
"""
try:
entity = await client.get_entity(chat_id)
await client.send_message(entity, message)
return "Message sent successfully."
except Exception as e:
logger.exception(f"send_message failed (chat_id={chat_id})")
return "An error occurred (code: SENDMSG-ERR-001). Check mcp_errors.log for details."
```
Example output:
```
Message sent successfully.
```
### Getting Chat Invite Links
The `get_invite_link` function is particularly robust with multiple fallback methods:
```python
@mcp.tool()
async def get_invite_link(chat_id: int) -> str:
"""
Get the invite link for a group or channel.
"""
try:
entity = await client.get_entity(chat_id)
# Try using ExportChatInviteRequest first
try:
from telethon.tl import functions
result = await client(functions.messages.ExportChatInviteRequest(
peer=entity
))
return result.link
except AttributeError:
# If the function doesn't exist in the current Telethon version
logger.warning("ExportChatInviteRequest not available, using alternative method")
except Exception as e1:
# If that fails, log and try alternative approach
logger.warning(f"ExportChatInviteRequest failed: {e1}")
# Alternative approach using client.export_chat_invite_link
try:
invite_link = await client.export_chat_invite_link(entity)
return invite_link
except Exception as e2:
logger.warning(f"export_chat_invite_link failed: {e2}")
# Last resort: Try directly fetching chat info
try:
if isinstance(entity, (Chat, Channel)):
full_chat = await client(functions.messages.GetFullChatRequest(
chat_id=entity.id
))
if hasattr(full_chat, 'full_chat') and hasattr(full_chat.full_chat, 'invite_link'):
return full_chat.full_chat.invite_link or "No invite link available."
except Exception as e3:
logger.warning(f"GetFullChatRequest failed: {e3}")
return "Could not retrieve invite link for this chat."
except Exception as e:
logger.exception(f"get_invite_link failed (chat_id={chat_id})")
return f"Error getting invite link: {e}"
```
Example output:
```
https://t.me/+AbCdEfGhIjKlMnOp
```
### Joining Chats via Invite Links
```python
@mcp.tool()
async def join_chat_by_link(link: str) -> str:
"""
Join a chat by invite link.
"""
try:
# Extract the hash from the invite link
if '/' in link:
hash_part = link.split('/')[-1]
if hash_part.startswith('+'):
hash_part = hash_part[1:] # Remove the '+' if present
else:
hash_part = link
# Try checking the invite before joining
try:
# Try to check invite info first (will often fail if not a member)
invite_info = await client(functions.messages.CheckChatInviteRequest(hash=hash_part))
if hasattr(invite_info, 'chat') and invite_info.chat:
# If we got chat info, we're already a member
chat_title = getattr(invite_info.chat, 'title', 'Unknown Chat')
return f"You are already a member of this chat: {chat_title}"
except Exception:
# This often fails if not a member - just continue
pass
# Join the chat using the hash
result = await client(functions.messages.ImportChatInviteRequest(hash=hash_part))
if result and hasattr(result, 'chats') and result.chats:
chat_title = getattr(result.chats[0], 'title', 'Unknown Chat')
return f"Successfully joined chat: {chat_title}"
return f"Joined chat via invite hash."
except Exception as e:
err_str = str(e).lower()
if "expired" in err_str:
return "The invite hash has expired and is no longer valid."
elif "invalid" in err_str:
return "The invite hash is invalid or malformed."
elif "already" in err_str and "participant" in err_str:
return "You are already a member of this chat."
logger.exception(f"join_chat_by_link failed (link={link})")
return f"Error joining chat: {e}"
```
Example output:
```
Successfully joined chat: Developer Community
```
### Searching Public Chats
```python
@mcp.tool()
async def search_public_chats(query: str) -> str:
"""
Search for public chats, channels, or bots by username or title.
"""
try:
result = await client(functions.contacts.SearchRequest(q=query, limit=20))
return json.dumps([format_entity(u) for u in result.users], indent=2)
except Exception as e:
return f"Error searching public chats: {e}"
```
Example output:
```json
[
{
"id": 123456789,
"name": "TelegramBot",
"type": "user",
"username": "telegram_bot"
},
{
"id": 987654321,
"name": "Telegram News",
"type": "user",
"username": "telegram_news"
}
]
```
### Getting Direct Chats with Contacts
```python
@mcp.tool()
async def get_direct_chat_by_contact(contact_query: str) -> str:
"""
Find a direct chat with a specific contact by name, username, or phone.
Args:
contact_query: Name, username, or phone number to search for.
"""
try:
# Fetch all contacts using the correct Telethon method
result = await client(functions.contacts.GetContactsRequest(hash=0))
contacts = result.users
found_contacts = []
for contact in contacts:
if not contact:
continue
name = f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip()
username = getattr(contact, 'username', '')
phone = getattr(contact, 'phone', '')
if (contact_query.lower() in name.lower() or
(username and contact_query.lower() in username.lower()) or
(phone and contact_query in phone)):
found_contacts.append(contact)
if not found_contacts:
return f"No contacts found matching '{contact_query}'."
# If we found contacts, look for direct chats with them
results = []
dialogs = await client.get_dialogs()
for contact in found_contacts:
contact_name = f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip()
for dialog in dialogs:
if isinstance(dialog.entity, User) and dialog.entity.id == contact.id:
chat_info = f"Chat ID: {dialog.entity.id}, Contact: {contact_name}"
if getattr(contact, 'username', ''):
chat_info += f", Username: @{contact.username}"
if dialog.unread_count:
chat_info += f", Unread: {dialog.unread_count}"
results.append(chat_info)
break
if not results:
return f"Found contacts matching '{contact_query}', but no direct chats with them."
return "\n".join(results)
except Exception as e:
return f"Error searching for direct chat: {e}"
```
Example output:
```
Chat ID: 123456789, Contact: John Smith, Username: @johnsmith, Unread: 3
```
---

620
main.py
View file

@ -17,6 +17,7 @@ from typing import List, Dict, Optional, Union, Any
from telethon import functions
import mimetypes
import logging
import telethon.errors.rpcerrorlist
# Helper function for JSON serialization of datetime, bytes, and other non-serializable objects
def json_serializer(obj):
@ -46,13 +47,36 @@ else:
# Use file-based session
client = TelegramClient(TELEGRAM_SESSION_NAME, TELEGRAM_API_ID, TELEGRAM_API_HASH)
# Setup logger for error reporting
logging.basicConfig(
filename='mcp_errors.log',
level=logging.ERROR,
format='%(asctime)s %(levelname)s %(name)s %(message)s'
)
logger = logging.getLogger("mcp")
# Setup robust logging with both file and console output
logger = logging.getLogger("telegram_mcp")
logger.setLevel(logging.ERROR) # Set to ERROR for production, INFO for debugging
# Create console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.ERROR) # Set to ERROR for production, INFO for debugging
# Create file handler with absolute path
script_dir = os.path.dirname(os.path.abspath(__file__))
log_file_path = os.path.join(script_dir, "mcp_errors.log")
try:
file_handler = logging.FileHandler(log_file_path, mode='a') # Append mode
file_handler.setLevel(logging.ERROR)
# Create formatter and add to handlers
formatter = logging.Formatter('%(asctime)s %(levelname)s %(name)s %(message)s')
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
# Add handlers to logger
logger.addHandler(console_handler)
logger.addHandler(file_handler)
logger.info(f"Logging initialized to {log_file_path}")
except Exception as log_error:
print(f"WARNING: Error setting up log file: {log_error}")
# Fallback to console-only logging
logger.addHandler(console_handler)
def format_entity(entity) -> Dict[str, Any]:
"""Helper function to format entity information consistently."""
@ -255,7 +279,14 @@ async def list_messages(chat_id: int, limit: int = 20, search_query: str = None,
try:
from_date_obj = datetime.strptime(from_date, "%Y-%m-%d")
# Make it timezone aware by adding UTC timezone info
from_date_obj = from_date_obj.replace(tzinfo=datetime.timezone.utc)
# Use datetime.timezone.utc for Python 3.9+ or import timezone directly for 3.13+
try:
# For Python 3.9+
from_date_obj = from_date_obj.replace(tzinfo=datetime.timezone.utc)
except AttributeError:
# For Python 3.13+
from datetime import timezone
from_date_obj = from_date_obj.replace(tzinfo=timezone.utc)
except ValueError:
return f"Invalid from_date format. Use YYYY-MM-DD."
@ -263,7 +294,13 @@ async def list_messages(chat_id: int, limit: int = 20, search_query: str = None,
try:
to_date_obj = datetime.strptime(to_date, "%Y-%m-%d")
# Set to end of day and make timezone aware
to_date_obj = (to_date_obj + timedelta(days=1, microseconds=-1)).replace(tzinfo=datetime.timezone.utc)
to_date_obj = to_date_obj + timedelta(days=1, microseconds=-1)
# Add timezone info
try:
to_date_obj = to_date_obj.replace(tzinfo=datetime.timezone.utc)
except AttributeError:
from datetime import timezone
to_date_obj = to_date_obj.replace(tzinfo=timezone.utc)
except ValueError:
return f"Invalid to_date format. Use YYYY-MM-DD."
@ -732,96 +769,169 @@ async def get_me() -> str:
@mcp.tool()
async def create_group(title: str, user_ids: list) -> str:
"""
Create a new group with the given title and user IDs.
Create a new group or supergroup and add users.
Args:
title: The group name.
user_ids: List of user IDs to add to the group.
title: Title for the new group
user_ids: List of user IDs to add to the group
"""
try:
users = [await client.get_entity(uid) for uid in user_ids]
result = await client(functions.messages.CreateChatRequest(users=users, title=title))
return f"Group '{title}' created with ID: {result.chats[0].id}"
# Convert user IDs to entities
users = []
for user_id in user_ids:
try:
user = await client.get_entity(user_id)
users.append(user)
except Exception as e:
logger.error(f"Failed to get entity for user ID {user_id}: {e}")
return f"Error: Could not find user with ID {user_id}"
if not users:
return "Error: No valid users provided"
# Create the group with the users
try:
# Create a new chat with selected users
result = await client(functions.messages.CreateChatRequest(
users=users,
title=title
))
# Check what type of response we got
if hasattr(result, 'chats') and result.chats:
created_chat = result.chats[0]
return f"Group created with ID: {created_chat.id}"
elif hasattr(result, 'chat') and result.chat:
return f"Group created with ID: {result.chat.id}"
elif hasattr(result, 'chat_id'):
return f"Group created with ID: {result.chat_id}"
else:
# If we can't determine the chat ID directly from the result
# Try to find it in recent dialogs
await asyncio.sleep(1) # Give Telegram a moment to register the new group
dialogs = await client.get_dialogs(limit=5) # Get recent dialogs
for dialog in dialogs:
if dialog.title == title:
return f"Group created with ID: {dialog.id}"
# If we still can't find it, at least return success
return f"Group created successfully. Please check your recent chats for '{title}'."
except Exception as create_err:
if "PEER_FLOOD" in str(create_err):
return "Error: Cannot create group due to Telegram limits. Try again later."
else:
raise # Let the outer exception handler catch it
except Exception as e:
logger.exception(f"create_group failed (title={title}, user_ids={user_ids})")
return f"Error creating group: {e}"
@mcp.tool()
async def invite_to_group(group_id: int, user_ids: list) -> str:
"""
Invite users to a group or channel by group ID.
Invite users to a group or channel.
Args:
group_id: The group/channel chat ID.
group_id: The ID of the group/channel.
user_ids: List of user IDs to invite.
"""
try:
chat_entity = await client.get_entity(group_id)
user_entities = []
for uid in user_ids:
entity = await client.get_entity(group_id)
users_to_add = []
for user_id in user_ids:
try:
user_entities.append(await client.get_entity(uid))
except Exception as user_e:
logger.error(f"Could not find user entity for ID {uid}: {user_e}")
return f"Error finding user {uid}: {user_e}"
if not user_entities:
return "No valid user IDs provided or found."
if isinstance(chat_entity, Channel):
# Use InviteToChannelRequest for channels and supergroups
await client(functions.channels.InviteToChannelRequest(
channel=chat_entity,
users=user_entities
user = await client.get_entity(user_id)
users_to_add.append(user)
except ValueError as e:
return f"Error: User with ID {user_id} could not be found. {e}"
try:
result = await client(functions.channels.InviteToChannelRequest(
channel=entity,
users=users_to_add
))
return f"Invited {len(user_entities)} users to channel/supergroup {group_id}."
elif isinstance(chat_entity, Chat):
# Use AddChatUserRequest for basic groups (adds one user at a time)
added_count = 0
errors = []
for user in user_entities:
try:
# Note: fwd_limit=0 might be needed depending on privacy settings
await client(functions.messages.AddChatUserRequest(chat_id=group_id, user_id=user, fwd_limit=50))
added_count += 1
except Exception as add_e:
error_msg = f"Error inviting user {getattr(user, 'id', 'unknown')} to basic group {group_id}: {add_e}"
logger.error(error_msg)
errors.append(error_msg)
result_message = f"Invited {added_count} users to basic group {group_id}."
if errors:
result_message += "\nErrors encountered:\n" + "\n".join(errors)
return result_message
else:
return f"Chat ID {group_id} is neither a Channel/Supergroup nor a basic Group."
invited_count = 0
if hasattr(result, 'users') and result.users:
invited_count = len(result.users)
elif hasattr(result, 'count'):
invited_count = result.count
return f"Successfully invited {invited_count} users to {entity.title}"
except telethon.errors.rpcerrorlist.UserNotMutualContactError:
return "Error: Cannot invite users who are not mutual contacts. Please ensure the users are in your contacts and have added you back."
except telethon.errors.rpcerrorlist.UserPrivacyRestrictedError:
return "Error: One or more users have privacy settings that prevent you from adding them."
except Exception as e:
return f"Error inviting users: {e}"
except Exception as e:
logger.exception(f"invite_to_group failed (group_id={group_id}, user_ids={user_ids})")
return f"Error inviting users: {e}"
logger.error(f"telegram_mcp invite_to_group failed (group_id={group_id}, user_ids={user_ids})", exc_info=True)
return f"Error: {e}"
@mcp.tool()
async def leave_chat(chat_id: int) -> str:
"""
Leave a group or channel by chat ID.
Args:
chat_id: The chat ID to leave.
"""
try:
entity = await client.get_entity(chat_id)
# Check the entity type carefully
if isinstance(entity, Channel):
# Leave channel or supergroup
await client(functions.channels.LeaveChannelRequest(channel=entity))
return f"Left channel/supergroup {chat_id}."
# Handle both channels and supergroups (which are also channels in Telegram)
try:
await client(functions.channels.LeaveChannelRequest(channel=entity))
chat_name = getattr(entity, 'title', str(chat_id))
return f"Left channel/supergroup {chat_name} (ID: {chat_id})."
except Exception as chan_err:
return f"Error leaving channel: {chan_err}"
elif isinstance(entity, Chat):
# Leave basic group
me = await client.get_me(input_peer=True) # Get self entity for DeleteChatUserRequest
await client(functions.messages.DeleteChatUserRequest(chat_id=chat_id, user_id=me))
return f"Left basic group {chat_id}."
# Traditional basic groups (not supergroups)
try:
# First try with InputPeerUser
me = await client.get_me(input_peer=True)
await client(functions.messages.DeleteChatUserRequest(
chat_id=entity.id, # Use the entity ID directly
user_id=me
))
chat_name = getattr(entity, 'title', str(chat_id))
return f"Left basic group {chat_name} (ID: {chat_id})."
except Exception as chat_err:
# If the above fails, try the second approach
logger.warning(f"First leave attempt failed: {chat_err}, trying alternative method")
try:
# Alternative approach - sometimes this works better
me_full = await client.get_me()
await client(functions.messages.DeleteChatUserRequest(
chat_id=entity.id,
user_id=me_full.id
))
chat_name = getattr(entity, 'title', str(chat_id))
return f"Left basic group {chat_name} (ID: {chat_id})."
except Exception as alt_err:
return f"Error leaving basic group: {alt_err}"
else:
# Cannot leave a user chat this way
return f"Cannot leave chat {chat_id} of type {type(entity)}. This function is for groups and channels."
# Cannot leave a user chat this way
entity_type = type(entity).__name__
return f"Cannot leave chat ID {chat_id} of type {entity_type}. This function is for groups and channels only."
except Exception as e:
logger.exception(f"leave_chat failed (chat_id={chat_id})")
# Provide helpful hint for common errors
error_str = str(e).lower()
if "invalid" in error_str and "chat" in error_str:
return "Error: This appears to be a channel/supergroup. Please check the chat ID and try again."
return f"Error leaving chat: {e}"
@ -935,12 +1045,24 @@ async def delete_profile_photo() -> str:
@mcp.tool()
async def get_privacy_settings() -> str:
"""
Get your privacy settings.
Get your privacy settings for last seen status.
"""
try:
settings = await client(functions.account.GetPrivacyRequest(key='status_timestamp'))
return str(settings)
# Import needed types directly
from telethon.tl.types import InputPrivacyKeyStatusTimestamp
try:
settings = await client(functions.account.GetPrivacyRequest(
key=InputPrivacyKeyStatusTimestamp()
))
return str(settings)
except TypeError as e:
if "TLObject was expected" in str(e):
return "Error: Privacy settings API call failed due to type mismatch. This is likely a version compatibility issue with Telethon."
else:
raise
except Exception as e:
logger.exception("get_privacy_settings failed")
return f"Error getting privacy settings: {e}"
@ -948,19 +1070,93 @@ async def get_privacy_settings() -> str:
async def set_privacy_settings(key: str, allow_users: list = None, disallow_users: list = None) -> str:
"""
Set privacy settings (e.g., last seen, phone, etc.).
key: e.g. 'status_timestamp', 'phone_number', 'profile_photo', 'forwards', 'voice_messages', etc.
Args:
key: The privacy setting to modify ('status' for last seen, 'phone', 'profile_photo', etc.)
allow_users: List of user IDs to allow
disallow_users: List of user IDs to disallow
"""
from telethon.tl.types import InputPrivacyKeyStatusTimestamp, InputPrivacyValueAllowUsers, InputPrivacyValueDisallowUsers
try:
allow = InputPrivacyValueAllowUsers(users=[await client.get_entity(uid) for uid in (allow_users or [])])
disallow = InputPrivacyValueDisallowUsers(users=[await client.get_entity(uid) for uid in (disallow_users or [])])
await client(functions.account.SetPrivacyRequest(
key=getattr(functions.account, f'InputPrivacyKey{key.title().replace("_", "")}')(),
rules=[allow, disallow]
))
return f"Privacy settings for {key} updated."
# Import needed types
from telethon.tl.types import (
InputPrivacyKeyStatusTimestamp,
InputPrivacyKeyPhoneNumber,
InputPrivacyKeyProfilePhoto,
InputPrivacyValueAllowUsers,
InputPrivacyValueDisallowUsers,
InputPrivacyValueAllowAll,
InputPrivacyValueDisallowAll
)
# Map the simplified keys to their corresponding input types
key_mapping = {
'status': InputPrivacyKeyStatusTimestamp,
'phone': InputPrivacyKeyPhoneNumber,
'profile_photo': InputPrivacyKeyProfilePhoto,
}
# Get the appropriate key class
if key not in key_mapping:
return f"Error: Unsupported privacy key '{key}'. Supported keys: {', '.join(key_mapping.keys())}"
privacy_key = key_mapping[key]()
# Prepare the rules
rules = []
# Process allow rules
if allow_users is None or len(allow_users) == 0:
# If no specific users to allow, allow everyone by default
rules.append(InputPrivacyValueAllowAll())
else:
# Convert user IDs to InputUser entities
try:
allow_entities = []
for user_id in allow_users:
try:
user = await client.get_entity(user_id)
allow_entities.append(user)
except Exception as user_err:
logger.warning(f"Could not get entity for user ID {user_id}: {user_err}")
if allow_entities:
rules.append(InputPrivacyValueAllowUsers(users=allow_entities))
except Exception as allow_err:
logger.error(f"Error processing allowed users: {allow_err}")
return f"Error processing allowed users: {allow_err}"
# Process disallow rules
if disallow_users and len(disallow_users) > 0:
try:
disallow_entities = []
for user_id in disallow_users:
try:
user = await client.get_entity(user_id)
disallow_entities.append(user)
except Exception as user_err:
logger.warning(f"Could not get entity for user ID {user_id}: {user_err}")
if disallow_entities:
rules.append(InputPrivacyValueDisallowUsers(users=disallow_entities))
except Exception as disallow_err:
logger.error(f"Error processing disallowed users: {disallow_err}")
return f"Error processing disallowed users: {disallow_err}"
# Apply the privacy settings
try:
result = await client(functions.account.SetPrivacyRequest(
key=privacy_key,
rules=rules
))
return f"Privacy settings for {key} updated successfully."
except TypeError as type_err:
if "TLObject was expected" in str(type_err):
return "Error: Privacy settings API call failed due to type mismatch. This is likely a version compatibility issue with Telethon."
else:
raise
except Exception as e:
return f"Error setting privacy: {e}"
logger.exception(f"set_privacy_settings failed (key={key})")
return f"Error setting privacy settings: {e}"
@mcp.tool()
@ -1090,87 +1286,203 @@ async def delete_chat_photo(chat_id: int) -> str:
@mcp.tool()
async def promote_admin(chat_id: int, user_id: int) -> str:
async def promote_admin(group_id: int, user_id: int, rights: dict = None) -> str:
"""
Promote a user to admin in a group or channel.
Promote a user to admin in a group/channel.
Args:
group_id: ID of the group/channel
user_id: User ID to promote
rights: Admin rights to give (optional)
"""
from telethon.tl.types import ChatAdminRights
try:
chat = await client.get_entity(group_id)
user = await client.get_entity(user_id)
await client(functions.channels.EditAdminRequest(
channel=chat_id,
user_id=user,
admin_rights=ChatAdminRights(
change_info=True, post_messages=True, edit_messages=True, delete_messages=True,
ban_users=True, invite_users=True, pin_messages=True, add_admins=True, manage_call=True, other=True
),
rank="admin"
))
return f"User {user_id} promoted to admin in chat {chat_id}."
# Set default admin rights if not provided
if not rights:
rights = {
'change_info': True,
'post_messages': True,
'edit_messages': True,
'delete_messages': True,
'ban_users': True,
'invite_users': True,
'pin_messages': True,
'add_admins': False,
'anonymous': False,
'manage_call': True,
'other': True
}
admin_rights = ChatAdminRights(
change_info=rights.get('change_info', True),
post_messages=rights.get('post_messages', True),
edit_messages=rights.get('edit_messages', True),
delete_messages=rights.get('delete_messages', True),
ban_users=rights.get('ban_users', True),
invite_users=rights.get('invite_users', True),
pin_messages=rights.get('pin_messages', True),
add_admins=rights.get('add_admins', False),
anonymous=rights.get('anonymous', False),
manage_call=rights.get('manage_call', True),
other=rights.get('other', True)
)
try:
result = await client(functions.channels.EditAdminRequest(
channel=chat,
user_id=user,
admin_rights=admin_rights,
rank="Admin"
))
return f"Successfully promoted user {user_id} to admin in {chat.title}"
except telethon.errors.rpcerrorlist.UserNotMutualContactError:
return "Error: Cannot promote users who are not mutual contacts. Please ensure the user is in your contacts and has added you back."
except Exception as e:
return f"Error promoting user to admin: {e}"
except Exception as e:
return f"Error promoting admin: {e}"
logger.error(f"telegram_mcp promote_admin failed (group_id={group_id}, user_id={user_id})", exc_info=True)
return f"Error: {str(e)}"
@mcp.tool()
async def demote_admin(chat_id: int, user_id: int) -> str:
async def demote_admin(group_id: int, user_id: int) -> str:
"""
Demote an admin to regular user in a group or channel.
Demote a user from admin in a group/channel.
Args:
group_id: ID of the group/channel
user_id: User ID to demote
"""
from telethon.tl.types import ChatAdminRights
try:
chat = await client.get_entity(group_id)
user = await client.get_entity(user_id)
await client(functions.channels.EditAdminRequest(
channel=chat_id,
user_id=user,
admin_rights=ChatAdminRights(),
rank=""
))
return f"User {user_id} demoted in chat {chat_id}."
# Create empty admin rights (regular user)
admin_rights = ChatAdminRights(
change_info=False,
post_messages=False,
edit_messages=False,
delete_messages=False,
ban_users=False,
invite_users=False,
pin_messages=False,
add_admins=False,
anonymous=False,
manage_call=False,
other=False
)
try:
result = await client(functions.channels.EditAdminRequest(
channel=chat,
user_id=user,
admin_rights=admin_rights,
rank=""
))
return f"Successfully demoted user {user_id} from admin in {chat.title}"
except telethon.errors.rpcerrorlist.UserNotMutualContactError:
return "Error: Cannot modify admin status of users who are not mutual contacts. Please ensure the user is in your contacts and has added you back."
except Exception as e:
return f"Error demoting admin: {e}"
except Exception as e:
return f"Error demoting admin: {e}"
logger.error(f"telegram_mcp demote_admin failed (group_id={group_id}, user_id={user_id})", exc_info=True)
return f"Error: {str(e)}"
@mcp.tool()
async def ban_user(chat_id: int, user_id: int) -> str:
"""
Ban a user from a group or channel.
Args:
chat_id: ID of the group/channel
user_id: User ID to ban
"""
from telethon.tl.types import ChatBannedRights
import time
try:
chat = await client.get_entity(chat_id)
user = await client.get_entity(user_id)
# Ban for 1 year (31536000 seconds)
banned_rights = ChatBannedRights(until_date=int(time.time()) + 31536000, view_messages=True)
await client(functions.channels.EditBannedRequest(
channel=chat_id,
participant=user, # Fix: Use 'participant' instead of 'user_id'
banned_rights=banned_rights
))
return f"User {user_id} banned from chat {chat_id}."
# Create banned rights (all restrictions enabled)
banned_rights = ChatBannedRights(
until_date=None, # Ban forever
view_messages=True,
send_messages=True,
send_media=True,
send_stickers=True,
send_gifs=True,
send_games=True,
send_inline=True,
embed_links=True,
send_polls=True,
change_info=True,
invite_users=True,
pin_messages=True
)
try:
await client(functions.channels.EditBannedRequest(
channel=chat,
participant=user,
banned_rights=banned_rights
))
return f"User {user_id} banned from chat {chat.title} (ID: {chat_id})."
except telethon.errors.rpcerrorlist.UserNotMutualContactError:
return "Error: Cannot ban users who are not mutual contacts. Please ensure the user is in your contacts and has added you back."
except Exception as e:
return f"Error banning user: {e}"
except Exception as e:
logger.exception(f"ban_user failed (chat_id={chat_id}, user_id={user_id})")
return f"Error banning user: {e}"
return f"Error: {e}"
@mcp.tool()
async def unban_user(chat_id: int, user_id: int) -> str:
"""
Unban a user from a group or channel.
Args:
chat_id: ID of the group/channel
user_id: User ID to unban
"""
from telethon.tl.types import ChatBannedRights
try:
chat = await client.get_entity(chat_id)
user = await client.get_entity(user_id)
# Fix: Provide until_date=0 for unbanning
banned_rights = ChatBannedRights(until_date=0)
await client(functions.channels.EditBannedRequest(
channel=chat_id,
participant=user,
banned_rights=banned_rights
))
return f"User {user_id} unbanned in chat {chat_id}."
# Create unbanned rights (no restrictions)
unbanned_rights = ChatBannedRights(
until_date=None,
view_messages=False,
send_messages=False,
send_media=False,
send_stickers=False,
send_gifs=False,
send_games=False,
send_inline=False,
embed_links=False,
send_polls=False,
change_info=False,
invite_users=False,
pin_messages=False
)
try:
await client(functions.channels.EditBannedRequest(
channel=chat,
participant=user,
banned_rights=unbanned_rights
))
return f"User {user_id} unbanned from chat {chat.title} (ID: {chat_id})."
except telethon.errors.rpcerrorlist.UserNotMutualContactError:
return "Error: Cannot modify status of users who are not mutual contacts. Please ensure the user is in your contacts and has added you back."
except Exception as e:
return f"Error unbanning user: {e}"
except Exception as e:
logger.exception(f"unban_user failed (chat_id={chat_id}, user_id={user_id})")
return f"Error unbanning user: {e}"
return f"Error: {e}"
@mcp.tool()
@ -1211,17 +1523,20 @@ async def get_invite_link(chat_id: int) -> str:
try:
entity = await client.get_entity(chat_id)
if hasattr(functions.messages, 'ExportChatInviteRequest'):
try:
# Try using the peer parameter instead of chat_id
result = await client(functions.messages.ExportChatInviteRequest(
peer=entity
))
return result.link
except Exception as e1:
# If that fails, try alternative approach
logger.warning(f"First approach failed: {e1}, trying alternative")
# Try using ExportChatInviteRequest first
try:
from telethon.tl import functions
result = await client(functions.messages.ExportChatInviteRequest(
peer=entity
))
return result.link
except AttributeError:
# If the function doesn't exist in the current Telethon version
logger.warning("ExportChatInviteRequest not available, using alternative method")
except Exception as e1:
# If that fails, log and try alternative approach
logger.warning(f"ExportChatInviteRequest failed: {e1}")
# Alternative approach using client.export_chat_invite_link
try:
invite_link = await client.export_chat_invite_link(entity)
@ -1230,11 +1545,15 @@ async def get_invite_link(chat_id: int) -> str:
logger.warning(f"export_chat_invite_link failed: {e2}")
# Last resort: Try directly fetching chat info
full_chat = await client(functions.messages.GetFullChatRequest(
chat_id=entity.id
))
if hasattr(full_chat, 'full_chat') and hasattr(full_chat.full_chat, 'invite_link'):
return full_chat.full_chat.invite_link or "No invite link available."
try:
if isinstance(entity, (Chat, Channel)):
full_chat = await client(functions.messages.GetFullChatRequest(
chat_id=entity.id
))
if hasattr(full_chat, 'full_chat') and hasattr(full_chat.full_chat, 'invite_link'):
return full_chat.full_chat.invite_link or "No invite link available."
except Exception as e3:
logger.warning(f"GetFullChatRequest failed: {e3}")
return "Could not retrieve invite link for this chat."
except Exception as e:
@ -1306,20 +1625,27 @@ async def export_chat_invite(chat_id: int) -> str:
try:
entity = await client.get_entity(chat_id)
# This is essentially the same as get_invite_link, but kept separate for API consistency
# Try using ExportChatInviteRequest first
try:
# Try using the peer parameter instead of chat_id
from telethon.tl import functions
result = await client(functions.messages.ExportChatInviteRequest(
peer=entity
))
return result.link
except AttributeError:
# If the function doesn't exist in the current Telethon version
logger.warning("ExportChatInviteRequest not available, using alternative method")
except Exception as e1:
# If that fails, try alternative approach
logger.warning(f"ExportChatInviteRequest failed: {e1}, trying alternative")
# If that fails, log and try alternative approach
logger.warning(f"ExportChatInviteRequest failed: {e1}")
# Alternative approach
invite_link = await client.export_chat_invite_link(entity)
return invite_link
# Alternative approach using client.export_chat_invite_link
try:
invite_link = await client.export_chat_invite_link(entity)
return invite_link
except Exception as e2:
logger.warning(f"export_chat_invite_link failed: {e2}")
return f"Could not export chat invite: {e2}"
except Exception as e:
logger.exception(f"export_chat_invite failed (chat_id={chat_id})")
return f"Error exporting chat invite: {e}"

1
setup.py Normal file
View file

@ -0,0 +1 @@

583
test.py
View file

@ -1,583 +0,0 @@
import os
import sys
import asyncio
import nest_asyncio
from dotenv import load_dotenv
import logging
import random
import string
import json
from datetime import datetime, timedelta
# Assume main.py is in the same directory or adjust path accordingly
from main import (
client, mcp,
get_chats, get_messages, send_message, list_contacts, search_contacts,
get_contact_ids, list_messages, list_chats, get_chat, get_direct_chat_by_contact,
get_contact_chats, get_last_interaction, get_message_context, add_contact,
delete_contact, block_user, unblock_user, get_me, create_group,
invite_to_group, leave_chat, get_participants, send_file, download_media,
update_profile, set_profile_photo, delete_profile_photo, get_privacy_settings,
set_privacy_settings, import_contacts, export_contacts, get_blocked_users,
create_channel, edit_chat_title, edit_chat_photo, delete_chat_photo,
promote_admin, demote_admin, ban_user, unban_user, get_admins,
get_banned_users, get_invite_link, join_chat_by_link, export_chat_invite,
import_chat_invite, send_voice, forward_message, edit_message,
delete_message, pin_message, unpin_message, mark_as_read, reply_to_message,
upload_file, get_media_info, search_public_chats, search_messages,
resolve_username, mute_chat, unmute_chat, archive_chat, unarchive_chat,
get_sticker_sets, send_sticker, get_gif_search, send_gif, get_bot_info,
set_bot_commands, get_history, get_user_photos, get_user_status,
get_recent_actions, get_pinned_messages
)
# Import specific telethon types needed for tests
from telethon.errors.rpcerrorlist import UserNotParticipantError
from telethon.tl import types
load_dotenv()
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
filename='.log', # Log to .log file
filemode='w' # Overwrite the log file each time
)
logger = logging.getLogger("TelegramToolTester")
logger.info("Logging configured to .log file.") # Force file creation early
# --- Test Configuration ---
# Set these environment variables before running the test script
TEST_CHAT_ID = int(os.getenv("TEST_CHAT_ID", "0")) # A safe chat ID (e.g., Saved Messages or a test group)
TEST_SUPERGROUP_ID = int(os.getenv("TEST_SUPERGROUP_ID", "0")) # ID of a test supergroup/channel you own/admin
TEST_USER_ID = int(os.getenv("TEST_USER_ID", "0")) # ID of a test user account (NOT a real person unless they consent!)
TEST_USERNAME = os.getenv("TEST_USERNAME", "") # Username of the test user
TEST_CONTACT_PHONE = os.getenv("TEST_CONTACT_PHONE", "") # Phone number for add_contact test (e.g., +15551234567)
TEST_CONTACT_FNAME = os.getenv("TEST_CONTACT_FNAME", "Test")
TEST_CONTACT_LNAME = os.getenv("TEST_CONTACT_LNAME", "Contact")
TEST_FILE_PATH = os.getenv("TEST_FILE_PATH", "test_upload.txt") # Path to a dummy file for upload/send tests
TEST_PHOTO_PATH = os.getenv("TEST_PHOTO_PATH", "test_photo.jpg") # Path to a dummy photo file
TEST_VOICE_PATH = os.getenv("TEST_VOICE_PATH", "test_voice.ogg") # Path to a dummy ogg voice file
TEST_STICKER_PATH = os.getenv("TEST_STICKER_PATH", "test_sticker.webp") # Path to a dummy webp sticker
TEST_BOT_USERNAME = os.getenv("TEST_BOT_USERNAME", "") # Username of a bot you own
TEST_INVITE_LINK_HASH = os.getenv("TEST_INVITE_LINK_HASH", "") # Hash from a valid invite link
# Create dummy files if they don't exist
if not os.path.exists(TEST_FILE_PATH):
with open(TEST_FILE_PATH, "w") as f:
f.write("This is a test file.")
if not os.path.exists(TEST_PHOTO_PATH):
logger.warning(f"Test photo file not found: {TEST_PHOTO_PATH}. Some tests might fail.")
if not os.path.exists(TEST_VOICE_PATH):
logger.warning(f"Test voice file not found: {TEST_VOICE_PATH}. send_voice test will fail.")
if not os.path.exists(TEST_STICKER_PATH):
logger.warning(f"Test sticker file not found: {TEST_STICKER_PATH}. send_sticker test will fail.")
async def run_test(tool_func, description, **kwargs):
logger.info(f"--- Testing: {description} ({tool_func.__name__}) ---")
logger.info(f"Params: {kwargs}")
try:
result = await tool_func(**kwargs)
logger.info(f"Result: {result}")
return result
except Exception as e:
logger.error(f"Error during {tool_func.__name__}: {e}", exc_info=True)
return f"TEST FAILED: {e}"
async def run_all_tests():
global TEST_CHAT_ID # Declare intention to modify the global variable
if not await client.is_user_authorized():
logger.error("Client not authorized. Please run main.py first to log in.")
return
logger.info("Starting Telegram Tool Tests...")
me = await client.get_me()
me_info = await get_me() # Use the tool version
logger.info(f"Running tests as: {me.first_name} (ID: {me.id})")
if not TEST_CHAT_ID:
TEST_CHAT_ID = me.id # Default to Saved Messages if not set
logger.warning(f"TEST_CHAT_ID not set, defaulting to Saved Messages ({TEST_CHAT_ID}).")
# --- Basic Info & Chat Listing ---
logger.info("--- Running Basic Info & Listing Tests ---")
await run_test(get_me, "Get own user info")
await run_test(list_chats, "List recent chats", limit=5)
await run_test(list_chats, "List groups", chat_type='group', limit=5)
await run_test(list_chats, "List channels", chat_type='channel', limit=5)
await run_test(list_chats, "List users", chat_type='user', limit=5)
# --- Test the basic get_chats function ---
await run_test(get_chats, "Get chats tool (paginated, page 1)", page=1, page_size=5)
# --- Specific Chat Operations (using TEST_CHAT_ID) ---
message_id_to_test = None
if TEST_CHAT_ID:
logger.info(f"--- Running tests on Chat ID: {TEST_CHAT_ID} ---")
await run_test(get_chat, "Get info for test chat", chat_id=TEST_CHAT_ID)
await run_test(get_history, "Get history for test chat", chat_id=TEST_CHAT_ID, limit=5)
await run_test(get_messages, "Get messages tool (paginated)", chat_id=TEST_CHAT_ID, page=1, page_size=5)
today = datetime.now()
yesterday = today - timedelta(days=1)
await run_test(list_messages, "List messages tool (filtered date)", chat_id=TEST_CHAT_ID, limit=10, from_date=yesterday.strftime("%Y-%m-%d"), to_date=today.strftime("%Y-%m-%d"))
await run_test(list_messages, "List messages tool (search)", chat_id=TEST_CHAT_ID, limit=10, search_query="Test")
# Send a test message to get IDs for subsequent tests
sent_msg_result = await run_test(send_message, "Send test message", chat_id=TEST_CHAT_ID, message=f"MCP Test Message {random.randint(1000, 9999)}")
if "successfully" in str(sent_msg_result).lower():
try:
# Fetch the last message sent by self (hopefully the test message)
async for msg in client.iter_messages(TEST_CHAT_ID, limit=5, from_user='me'):
# Check if it's the message we likely just sent
if "MCP Test Message" in msg.text:
message_id_to_test = msg.id
logger.info(f"Using message ID {message_id_to_test} for further tests.")
break
if not message_id_to_test: # Fallback if specific message not found
last_msgs = await client.get_messages(TEST_CHAT_ID, limit=1)
if last_msgs:
message_id_to_test = last_msgs[0].id
logger.warning(f"Could not find specific test message, using last message ID: {message_id_to_test}")
except Exception as e:
logger.error(f"Could not get last message ID: {e}")
if message_id_to_test:
logger.info(f"--- Running Message-Specific Tests (ID: {message_id_to_test}) ---")
await run_test(edit_message, "Edit test message", chat_id=TEST_CHAT_ID, message_id=message_id_to_test, new_text="MCP Test Message (edited)")
await run_test(get_message_context, "Get context for test message", chat_id=TEST_CHAT_ID, message_id=message_id_to_test, context_size=1)
await run_test(reply_to_message, "Reply to test message", chat_id=TEST_CHAT_ID, message_id=message_id_to_test, text="Test Reply")
await run_test(pin_message, "Pin test message", chat_id=TEST_CHAT_ID, message_id=message_id_to_test)
await asyncio.sleep(2) # Give time for pin to register
await run_test(get_pinned_messages, "Get pinned messages", chat_id=TEST_CHAT_ID)
await run_test(unpin_message, "Unpin test message", chat_id=TEST_CHAT_ID, message_id=message_id_to_test)
# Forwarding (Careful: forwards TO TEST_CHAT_ID FROM TEST_CHAT_ID)
await run_test(forward_message, "Forward test message", from_chat_id=TEST_CHAT_ID, message_id=message_id_to_test, to_chat_id=TEST_CHAT_ID)
await run_test(delete_message, "Delete test message", chat_id=TEST_CHAT_ID, message_id=message_id_to_test)
else:
logger.warning("Could not obtain a message ID for message-specific tests.")
await run_test(search_messages, "Search for 'Test' in test chat", chat_id=TEST_CHAT_ID, query="Test", limit=5)
await run_test(mark_as_read, "Mark test chat as read", chat_id=TEST_CHAT_ID)
# File Operations
logger.info("--- Running File Operation Tests ---")
await run_test(send_file, "Send test file", chat_id=TEST_CHAT_ID, file_path=TEST_FILE_PATH, caption="Test File")
await run_test(upload_file, "Upload test file", file_path=TEST_FILE_PATH)
# Find a message with media to test download/info
media_message_id = None
media_msg_obj = None
async for msg in client.iter_messages(TEST_CHAT_ID, limit=20):
if msg.media:
media_message_id = msg.id
media_msg_obj = msg
logger.info(f"Found media message ID {media_message_id} to test download/info.")
break
if media_message_id and media_msg_obj:
await run_test(get_media_info, "Get media info", chat_id=TEST_CHAT_ID, message_id=media_message_id)
# Use a more specific download path based on filename if possible
download_filename = getattr(media_msg_obj.media, 'document', None)
if download_filename:
download_filename = getattr(download_filename, 'attributes', [None])[0]
if download_filename:
download_filename = getattr(download_filename, 'file_name', None)
download_path = f"test_download_{download_filename or media_message_id}.tmp"
await run_test(download_media, "Download media", chat_id=TEST_CHAT_ID, message_id=media_message_id, file_path=download_path)
if os.path.exists(download_path):
try:
os.remove(download_path)
logger.info(f"Cleaned up downloaded file: {download_path}")
except OSError as e:
logger.error(f"Error removing downloaded file {download_path}: {e}")
else:
logger.warning(f"Could not find downloaded file to clean up: {download_path}")
else:
logger.warning("No media message found in recent history to test download/info.")
# Voice/Sticker/GIF (check paths exist)
logger.info("--- Running Media Send Tests ---")
if os.path.exists(TEST_VOICE_PATH):
await run_test(send_voice, "Send voice message", chat_id=TEST_CHAT_ID, file_path=TEST_VOICE_PATH)
# Enhanced sticker testing
logger.info("--- Running Enhanced Sticker Tests ---")
if os.path.exists(TEST_STICKER_PATH):
await run_test(send_sticker, "Send sticker file", chat_id=TEST_CHAT_ID, file_path=TEST_STICKER_PATH)
else:
logger.warning(f"Test sticker file not found at {TEST_STICKER_PATH}")
# Test sticker set retrieval
sticker_sets = await run_test(get_sticker_sets, "Get all available sticker sets")
# Try to parse sticker set info
try:
# Parse any available sticker info
if sticker_sets and "sets" in sticker_sets:
logger.info("Successfully retrieved sticker sets")
elif sticker_sets and len(sticker_sets) > 2: # JSON output has at least []
logger.info(f"Retrieved sticker sets data: {sticker_sets[:100]}...")
else:
logger.warning("No sticker sets found or empty response")
# If we're on a test account with limited stickers, we could:
# 1. Add a sticker set (not implemented in our tools)
# 2. Remove a sticker set (not implemented in our tools)
logger.info("Note: Adding/removing sticker sets requires additional tools not currently implemented")
except Exception as e:
logger.error(f"Error in sticker set processing: {e}")
# GIF testing
gif_search_result = await run_test(get_gif_search, "Search for GIFs", query="hello", limit=1)
try:
gif_ids = json.loads(gif_search_result)
if gif_ids:
gif_id = gif_ids[0]
await run_test(send_gif, "Send GIF by ID", chat_id=TEST_CHAT_ID, gif_id=gif_id)
else:
logger.warning("No GIF IDs returned from search.")
except Exception as e:
logger.warning(f"Could not parse or send GIF ID from search result: {e}")
# Mute/Archive
logger.info("--- Running Chat State Tests ---")
await run_test(mute_chat, "Mute test chat", chat_id=TEST_CHAT_ID)
await asyncio.sleep(1)
await run_test(unmute_chat, "Unmute test chat", chat_id=TEST_CHAT_ID)
await asyncio.sleep(1)
# Archive/Unarchive - Check if ToggleDialogPinRequest exists and test
if hasattr(types.messages, 'ToggleDialogPinRequest'):
logger.warning("--- Testing Archive/Unarchive (May depend on Telethon version) ---")
await run_test(archive_chat, "Archive test chat", chat_id=TEST_CHAT_ID)
await asyncio.sleep(1)
await run_test(unarchive_chat, "Unarchive test chat", chat_id=TEST_CHAT_ID)
await asyncio.sleep(1)
else:
logger.warning("ToggleDialogPinRequest not found, skipping archive/unarchive tests.")
# --- Contact Operations ---
logger.info("--- Running Contact Operations Tests ---")
await run_test(list_contacts, "List contacts")
await run_test(export_contacts, "Export contacts to JSON")
contact_to_delete_id = None
if TEST_CONTACT_PHONE and TEST_CONTACT_FNAME:
logger.warning("--- Running Add/Delete Contact Test (requires valid phone number) ---")
# Check if contact already exists
contact_exists = False
contacts_list = await list_contacts()
if TEST_CONTACT_PHONE in str(contacts_list):
logger.warning(f"Contact with phone {TEST_CONTACT_PHONE} seems to exist. Skipping add.")
# Try to find ID for deletion test anyway
try:
lines = str(contacts_list).split('\n')
for line in lines:
if TEST_CONTACT_PHONE in line:
contact_to_delete_id = int(line.split(',')[0].split(':')[1].strip())
logger.info(f"Found existing contact ID for deletion test: {contact_to_delete_id}")
break
except Exception as e:
logger.error(f"Could not parse existing contact ID: {e}")
contact_exists = True
if not contact_exists:
add_result = await run_test(add_contact, "Add test contact", phone=TEST_CONTACT_PHONE, first_name=TEST_CONTACT_FNAME, last_name=TEST_CONTACT_LNAME)
await asyncio.sleep(5) # Give time for contact to sync
if "added successfully" in str(add_result).lower():
# Try to find the added contact to get its ID for deletion
search_res = await run_test(search_contacts, "Search for added contact", query=TEST_CONTACT_PHONE)
try:
lines = str(search_res).split('\n')
for line in lines:
if TEST_CONTACT_PHONE in line and f"Name: {TEST_CONTACT_FNAME}" in line:
contact_to_delete_id = int(line.split(',')[0].split(':')[1].strip())
logger.info(f"Found added contact ID for deletion: {contact_to_delete_id}")
break
except Exception as e:
logger.error(f"Could not parse contact ID from search result: {e}")
else:
logger.warning("Add contact failed or did not return success message.")
if contact_to_delete_id:
await run_test(get_direct_chat_by_contact, "Get direct chat by contact's phone", contact_query=TEST_CONTACT_PHONE)
await run_test(get_contact_chats, "Get chats involving contact", contact_id=contact_to_delete_id)
logger.warning(f"--- Proceeding to delete contact ID: {contact_to_delete_id} ---")
await run_test(delete_contact, "Delete test contact", user_id=contact_to_delete_id)
else:
logger.warning("Could not find contact by phone number to test deletion/other ops.")
if TEST_USERNAME:
await run_test(search_contacts, "Search contacts by username", query=TEST_USERNAME)
await run_test(resolve_username, "Resolve test username", username=TEST_USERNAME)
await run_test(get_direct_chat_by_contact, "Get direct chat by test username", contact_query=TEST_USERNAME)
await run_test(get_contact_ids, "Get contact IDs")
# import_contacts test requires a list of dicts, harder to setup via env vars
# logger.warning("Skipping import_contacts test - requires manual setup.")
# await run_test(import_contacts, "Import contacts", contacts=[{'phone': '+1555...', 'first_name': ...}])
# Clarification: import_contacts test is skipped as it requires complex setup (list of dictionaries)
# which is difficult to manage solely through environment variables for automated testing.
# --- User Interaction (Requires TEST_USER_ID) ---
if TEST_USER_ID:
logger.info(f"--- Running User Interaction Tests (User ID: {TEST_USER_ID}) ---")
await run_test(get_user_status, "Get test user status", user_id=TEST_USER_ID)
await run_test(get_user_photos, "Get test user photos", user_id=TEST_USER_ID, limit=1)
# Check if the user is a contact before running contact-specific tests
is_contact = False
contacts_res = await list_contacts()
if f"ID: {TEST_USER_ID}" in str(contacts_res):
is_contact = True
logger.info(f"User {TEST_USER_ID} is a contact.")
await run_test(get_last_interaction, "Get last interaction with test user contact", contact_id=TEST_USER_ID)
await run_test(get_contact_chats, "Get chats involving test user contact", contact_id=TEST_USER_ID)
else:
logger.warning(f"User {TEST_USER_ID} is not a contact. Skipping contact-specific tests.")
# Block/Unblock
await run_test(block_user, "Block test user", user_id=TEST_USER_ID)
await asyncio.sleep(1)
await run_test(get_blocked_users, "Get blocked users (check if test user is present)")
await run_test(unblock_user, "Unblock test user", user_id=TEST_USER_ID)
await asyncio.sleep(1)
# --- Supergroup/Channel Operations (Requires TEST_SUPERGROUP_ID and TEST_USER_ID) ---
created_group_id = None
created_channel_id = None
if TEST_USER_ID:
# Create Group Test (Requires TEST_USER_ID)
logger.warning("--- Running Group/Channel Creation Test ---")
create_group_res = await run_test(create_group, "Create test group with test user", title=f"MCP Test Group {random.randint(100,999)}", user_ids=[TEST_USER_ID])
try:
if "created with ID" in create_group_res:
created_group_id = int(create_group_res.split(':')[-1].strip())
logger.info(f"Created group ID: {created_group_id}")
await asyncio.sleep(2)
logger.warning(f"--- Leaving newly created group: {created_group_id} ---")
await run_test(leave_chat, "Leave newly created group", chat_id=created_group_id)
except Exception as e:
logger.error(f"Failed to parse or leave created group: {e}")
# Create Channel Test (No additional users needed initially)
create_channel_res = await run_test(create_channel, "Create test channel", title=f"MCP Test Channel {random.randint(100,999)}", about="Test channel created by MCP", megagroup=False)
try:
if "created with ID" in create_channel_res:
created_channel_id = int(create_channel_res.split(':')[-1].strip())
logger.info(f"Created channel ID: {created_channel_id}")
await asyncio.sleep(2)
logger.warning(f"--- Leaving newly created channel: {created_channel_id} ---")
await run_test(leave_chat, "Leave newly created channel", chat_id=created_channel_id)
except Exception as e:
logger.error(f"Failed to parse or leave created channel: {e}")
if TEST_SUPERGROUP_ID:
logger.info(f"--- Running Supergroup/Channel Operations Tests (Chat ID: {TEST_SUPERGROUP_ID}) ---")
await run_test(get_chat, "Get test supergroup info", chat_id=TEST_SUPERGROUP_ID)
await run_test(get_participants, "Get participants of test supergroup", chat_id=TEST_SUPERGROUP_ID)
await run_test(get_admins, "Get admins of test supergroup", chat_id=TEST_SUPERGROUP_ID)
await run_test(get_banned_users, "Get banned users of test supergroup", chat_id=TEST_SUPERGROUP_ID)
await run_test(get_recent_actions, "Get recent actions for supergroup", chat_id=TEST_SUPERGROUP_ID)
await run_test(get_invite_link, "Get invite link for supergroup", chat_id=TEST_SUPERGROUP_ID)
await run_test(export_chat_invite, "Export chat invite for supergroup", chat_id=TEST_SUPERGROUP_ID)
if TEST_INVITE_LINK_HASH:
logger.warning(f"--- Running Join Chat by Invite Hash Test (Requires valid HASH: {TEST_INVITE_LINK_HASH}) ---")
# Extract hash if full URL is provided
invite_hash = TEST_INVITE_LINK_HASH
if invite_hash.startswith("https://t.me/+"):
invite_hash = invite_hash.split("+", 1)[1]
logger.info(f"Extracted hash from URL: {invite_hash}")
# This will handle various cases, including invalid/expired hash or already a member
import_res = await run_test(import_chat_invite, "Join chat via import hash", hash=invite_hash)
# Check if the response indicates already a member or successful join
already_member = "already a member" in import_res.lower()
success_join = "successfully joined" in import_res.lower()
logger.info(f"Invite result: {'Already a member' if already_member else 'Successfully joined' if success_join else 'Failed to join'}")
# Also test the full URL version if appropriate
if TEST_INVITE_LINK_HASH.startswith("https://"):
await run_test(join_chat_by_link, "Join chat via full link", link=TEST_INVITE_LINK_HASH)
# If we successfully joined a chat, we should leave it to clean up
if success_join and "chat:" in import_res:
try:
# Extract chat ID from success message if possible
chat_title = import_res.split("chat:", 1)[1].strip()
logger.warning(f"Attempting to find and leave newly joined chat: '{chat_title}'")
# Try to find the chat ID by matching the title
async for dialog in client.iter_dialogs(limit=10): # Check recent dialogs
if dialog.name == chat_title:
logger.info(f"Found chat to leave: {dialog.name} (ID: {dialog.id})")
await run_test(leave_chat, "Leave newly joined chat", chat_id=dialog.id)
break
except Exception as leave_err:
logger.error(f"Failed to leave newly joined chat: {leave_err}")
else:
logger.warning("TEST_INVITE_LINK_HASH not set. Skipping join/import tests.")
if TEST_USER_ID:
# Ban/Unban/Invite/Promote tests (Use with EXTREME caution)
logger.warning(f"--- Running potentially disruptive tests on supergroup {TEST_SUPERGROUP_ID} with user {TEST_USER_ID} ---")
await run_test(ban_user, "Ban test user from supergroup", chat_id=TEST_SUPERGROUP_ID, user_id=TEST_USER_ID)
await asyncio.sleep(2)
await run_test(get_banned_users, "Get banned users (check test user)", chat_id=TEST_SUPERGROUP_ID)
await run_test(unban_user, "Unban test user from supergroup", chat_id=TEST_SUPERGROUP_ID, user_id=TEST_USER_ID)
await asyncio.sleep(2)
# Ensure user is not already participant before inviting
try:
# Use a more specific filter if possible
# participants = await client.get_participants(TEST_SUPERGROUP_ID, filter=types.ChannelParticipantsSearch(q=str(TEST_USER_ID)), limit=1)
# Simpler check: iterate briefly
user_in_group = False
async for p in client.iter_participants(TEST_SUPERGROUP_ID, limit=50): # Limit search scope
if p.id == TEST_USER_ID:
user_in_group = True
break
if user_in_group:
logger.info(f"User {TEST_USER_ID} already in group {TEST_SUPERGROUP_ID}, skipping invite.")
else:
await run_test(invite_to_group, "Invite test user to supergroup", group_id=TEST_SUPERGROUP_ID, user_ids=[TEST_USER_ID])
await asyncio.sleep(2)
except UserNotParticipantError:
# This error is expected if user is not participant, proceed with invite
await run_test(invite_to_group, "Invite test user to supergroup (UserNotParticipantError caught)", group_id=TEST_SUPERGROUP_ID, user_ids=[TEST_USER_ID])
await asyncio.sleep(2)
except Exception as p_err:
logger.warning(f"Could not check participant status before invite: {p_err}. Attempting invite anyway.")
# Try inviting anyway
await run_test(invite_to_group, "Invite test user to supergroup (attempt)", group_id=TEST_SUPERGROUP_ID, user_ids=[TEST_USER_ID])
await asyncio.sleep(2)
await run_test(promote_admin, "Promote test user to admin", chat_id=TEST_SUPERGROUP_ID, user_id=TEST_USER_ID)
await asyncio.sleep(2)
await run_test(get_admins, "Get admins (check test user)", chat_id=TEST_SUPERGROUP_ID)
await run_test(demote_admin, "Demote test user from admin", chat_id=TEST_SUPERGROUP_ID, user_id=TEST_USER_ID)
await asyncio.sleep(2)
# Leave chat test needs careful consideration - don't leave accidentally!
# logger.warning(f"--- Skipping leave_chat test for TEST_SUPERGROUP_ID: {TEST_SUPERGROUP_ID} ---")
# await run_test(leave_chat, "Leave test supergroup", chat_id=TEST_SUPERGROUP_ID)
# Title/Photo Edit
original_title_res = await run_test(get_chat, "Get supergroup title before edit", chat_id=TEST_SUPERGROUP_ID)
original_title = "Unknown"
if "Title:" in str(original_title_res):
try:
original_title = str(original_title_res).split("Title:")[1].split('\n')[0].strip()
logger.info(f"Original title found: '{original_title}'")
except Exception as title_e:
logger.warning(f"Could not parse original title: {title_e}")
random_suffix = ''.join(random.choices(string.ascii_lowercase, k=4))
new_title = f"Test Title {random_suffix}"
await run_test(edit_chat_title, "Edit supergroup title", chat_id=TEST_SUPERGROUP_ID, title=new_title)
await asyncio.sleep(2)
# Restore original title if possible
if original_title != "Unknown":
await run_test(edit_chat_title, "Restore supergroup title", chat_id=TEST_SUPERGROUP_ID, title=original_title)
else:
logger.warning("Could not determine original title to restore.")
if os.path.exists(TEST_PHOTO_PATH):
await run_test(edit_chat_photo, "Edit supergroup photo", chat_id=TEST_SUPERGROUP_ID, file_path=TEST_PHOTO_PATH)
await asyncio.sleep(2)
await run_test(delete_chat_photo, "Delete supergroup photo", chat_id=TEST_SUPERGROUP_ID)
# --- Profile & Privacy (Use with EXTREME caution!) ---
logger.warning("--- Running Profile & Privacy Tests (Potentially Invasive - Mostly Skipped) ---")
# logger.warning("--- update_profile tests are commented out by default ---")
# original_bio = "" # Need to fetch current bio first if we want to restore
# await run_test(update_profile, "Update profile bio", about=f"MCP Test Bio {random.randint(100,999)}")
# await asyncio.sleep(1)
# await run_test(update_profile, "Restore profile bio", about=original_bio) # Restore to empty or original
# logger.warning("--- set/delete_profile_photo tests are commented out by default ---")
# if os.path.exists(TEST_PHOTO_PATH):
# await run_test(set_profile_photo, "Set profile photo", file_path=TEST_PHOTO_PATH)
# await asyncio.sleep(2)
# await run_test(delete_profile_photo, "Delete profile photo")
await run_test(get_privacy_settings, "Get privacy settings (last seen)")
# set_privacy_settings is complex and risky to test automatically.
logger.warning("Skipping set_privacy_settings test due to complexity and risk.")
# Example: Allow only TEST_USER_ID to see last seen (if TEST_USER_ID is set)
# if TEST_USER_ID:
# logger.warning("Testing set_privacy_settings - allowing TEST_USER_ID for last seen")
# await run_test(set_privacy_settings, "Set privacy (last seen - allow test user)", key='status_timestamp', allow_users=[TEST_USER_ID])
# await asyncio.sleep(2)
# logger.warning("Restoring default privacy for last seen")
# await run_test(set_privacy_settings, "Restore privacy (last seen - allow all)", key='status_timestamp', allow_users=[]) # Assuming empty means allow all?
# --- Bot Operations (Requires TEST_BOT_USERNAME) ---
if TEST_BOT_USERNAME:
logger.info(f"--- Running Bot Operations Tests (Bot: {TEST_BOT_USERNAME}) ---")
await run_test(get_bot_info, "Get bot info", bot_username=TEST_BOT_USERNAME)
# Check if our client is a bot before testing command setting
is_bot = False
try:
me = await client.get_me()
is_bot = getattr(me, 'bot', False)
except Exception as e:
logger.error(f"Error checking if client is a bot: {e}")
if is_bot:
# Only proceed with set_bot_commands test if we're a bot
logger.info("Client is a bot account, testing set_bot_commands")
await run_test(set_bot_commands, "Set bot commands", bot_username=TEST_BOT_USERNAME,
commands=[{'command': 'mcp_test', 'description': 'MCP Test Command'}])
await asyncio.sleep(2)
await run_test(set_bot_commands, "Clear bot commands", bot_username=TEST_BOT_USERNAME, commands=[])
else:
# Skip the set_bot_commands test if we're not a bot
logger.warning("Client is a regular user account, not a bot. Skipping set_bot_commands test.")
logger.info("Note: The set_bot_commands function can only be used by bot accounts.")
else:
logger.warning("TEST_BOT_USERNAME not set. Skipping bot tests.")
# --- Other Operations ---
logger.info("--- Running Other Operations Tests ---")
await run_test(search_public_chats, "Search public chats for 'bot'", query="bot")
await run_test(get_sticker_sets, "Get sticker sets")
# Final check for remaining tools that haven't been explicitly tested
logger.info("--- Testing Remaining Tools ---")
# Test the archive/unarchive chat functions if not already tested
if TEST_CHAT_ID:
try:
# Only run if we haven't tested these already
await run_test(archive_chat, "Archive test chat (final check)", chat_id=TEST_CHAT_ID)
await asyncio.sleep(1)
await run_test(unarchive_chat, "Unarchive test chat (final check)", chat_id=TEST_CHAT_ID)
except Exception as e:
logger.warning(f"Archive/unarchive test failed: {e}")
logger.info("--- All Tests Completed ---")
if __name__ == "__main__":
nest_asyncio.apply()
async def main():
try:
logger.info("Starting Telegram client for testing...")
# Ensure client is started and authorized
await client.start()
if not await client.is_user_authorized():
logger.error("Client authorization failed. Please run main.py interactively first.")
sys.exit(1)
await run_all_tests()
except Exception as e:
logger.critical(f"Critical error during test execution: {e}", exc_info=True)
sys.exit(1)
finally:
if client.is_connected():
logger.info("Disconnecting Telegram client...")
await client.disconnect()
asyncio.run(main())