diff --git a/.env.example b/.env.example index 41cbf8b..8af8654 100644 --- a/.env.example +++ b/.env.example @@ -1,10 +1,9 @@ -# Your Telegram API credentials from https://my.telegram.org/apps +# Telegram API Credentials (Required - get from https://my.telegram.org/apps) TELEGRAM_API_ID=123456 TELEGRAM_API_HASH=0123456789abcdef0123456789abcdef -# Session name (can be any name you choose) +# Session Management (Choose ONE) +# Option 1: File-based session (a .session file will be created) TELEGRAM_SESSION_NAME=telegram_session - -# Optional: Session string for portable authentication (leave empty if not using) -# This will be generated by the session_string_generator.py script -TELEGRAM_SESSION_STRING= \ No newline at end of file +# Option 2: String-based session (if you generate one, e.g., using Telethon's string session generator) +TELEGRAM_SESSION_STRING=1231231232erfdfdffd diff --git a/.gitignore b/.gitignore index 1bcc19d..733c724 100644 --- a/.gitignore +++ b/.gitignore @@ -85,7 +85,7 @@ ipython_config.py # pyenv # For a library or package, you might want to ignore these files since the code is # intended to run in multiple environments; otherwise, check them in: -# .python-version +.python-version # pipenv # According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. @@ -185,3 +185,20 @@ anon_new.session-journal # Claude Desktop config claude_desktop_config.json +.DS_Store + +# Test files +.cursor/ +tests/ +test.py +extract_test_issues.py +*.tmp +mcp_errors.log +telegram_test.log +test_issues_report.md + +# Temporary data files +test_upload.txt +test_voice.ogg +sticker.webp +two.png \ No newline at end of file diff --git a/README.md b/README.md index c808310..f09885b 100644 --- a/README.md +++ b/README.md @@ -1,68 +1,165 @@ -# Telegram MCP for Claude +# Telegram MCP Server ![MCP Badge](https://badge.mcpx.dev) [![License: Apache 2.0](https://img.shields.io/badge/license-Apache%202.0-green?style=flat-square)](https://opensource.org/licenses/Apache-2.0) -A powerful Telegram integration for Claude via the Model Context Protocol (MCP), allowing you to interact with your Telegram account directly from Claude Desktop. +--- + +## ๐Ÿค– MCP in Action + +Here's a demonstration of the Telegram MCP capabilities in Claude: + + **Basic usage example:** ![Telegram MCP in action](screenshots/1.png) -## ๐Ÿš€ Features +1. **Example: Asking Claude to analyze chat history and send a response:** -This MCP server provides a comprehensive suite of tools for seamless Telegram interaction: +![Telegram MCP Request](screenshots/2.png) -### Chat Management -- **get_chats** - Get a paginated list of your chats -- **list_chats** - List all chats with detailed metadata and filtering options -- **get_chat** - Get detailed information about a specific chat +2. **Successfully sent message to the group:** + +![Telegram MCP Result](screenshots/3.png) + +As you can see, the AI can seamlessly interact with your Telegram account, retrieving and displaying your chats, messages, and other data in a natural way. + +--- + +A full-featured Telegram integration for Claude, Cursor, and any MCP-compatible client, powered by [Telethon](https://docs.telethon.dev/) and the [Model Context Protocol (MCP)](https://modelcontextprotocol.io/). This project lets you interact with your Telegram account programmatically, automating everything from messaging to group management. + + +--- + +## ๐Ÿš€ Features & Tools + +This MCP server exposes a huge suite of Telegram tools. **Every major Telegram/Telethon feature is available as a tool!** + +### Chat & Group Management +- **get_chats(page, page_size)**: Paginated list of chats +- **list_chats(chat_type, limit)**: List chats with metadata and filtering +- **get_chat(chat_id)**: Detailed info about a chat +- **create_group(title, user_ids)**: Create a new group +- **create_channel(title, about, megagroup)**: Create a channel or supergroup +- **edit_chat_title(chat_id, title)**: Change chat/group/channel title +- **edit_chat_photo(chat_id, file_path)**: Set chat/group/channel photo +- **delete_chat_photo(chat_id)**: Remove chat/group/channel photo +- **leave_chat(chat_id)**: Leave a group or channel +- **get_participants(chat_id)**: List all participants +- **get_admins(chat_id)**: List all admins +- **get_banned_users(chat_id)**: List all banned users +- **promote_admin(chat_id, user_id)**: Promote user to admin +- **demote_admin(chat_id, user_id)**: Demote admin to user +- **ban_user(chat_id, user_id)**: Ban user +- **unban_user(chat_id, user_id)**: Unban user +- **get_invite_link(chat_id)**: Get invite link +- **export_chat_invite(chat_id)**: Export invite link +- **import_chat_invite(hash)**: Join chat by invite hash +- **join_chat_by_link(link)**: Join chat by invite link ### Messaging -- **get_messages** - Get messages from a specific chat with pagination -- **list_messages** - Retrieve messages with powerful filtering (text search, date ranges) -- **send_message** - Send messages to any chat -- **get_message_context** - View the context around a specific message +- **get_messages(chat_id, page, page_size)**: Paginated messages +- **list_messages(chat_id, limit, search_query, from_date, to_date)**: Filtered messages +- **send_message(chat_id, message)**: Send a message +- **reply_to_message(chat_id, message_id, text)**: Reply to a message +- **edit_message(chat_id, message_id, new_text)**: Edit your message +- **delete_message(chat_id, message_id)**: Delete a message +- **forward_message(from_chat_id, message_id, to_chat_id)**: Forward a message +- **pin_message(chat_id, message_id)**: Pin a message +- **unpin_message(chat_id, message_id)**: Unpin a message +- **mark_as_read(chat_id)**: Mark all as read +- **get_message_context(chat_id, message_id, context_size)**: Context around a message +- **get_history(chat_id, limit)**: Full chat history +- **get_pinned_messages(chat_id)**: List pinned messages ### Contact Management -- **search_contacts** - Find contacts by name, username or phone number -- **get_direct_chat_by_contact** - Find personal chats with specific contacts -- **get_contact_chats** - List all chats (including groups) involving a contact -- **get_last_interaction** - View your most recent exchanges with a contact +- **list_contacts()**: List all contacts +- **search_contacts(query)**: Search contacts +- **add_contact(phone, first_name, last_name)**: Add a contact +- **delete_contact(user_id)**: Delete a contact +- **block_user(user_id)**: Block a user +- **unblock_user(user_id)**: Unblock a user +- **import_contacts(contacts)**: Bulk import contacts +- **export_contacts()**: Export all contacts as JSON +- **get_blocked_users()**: List blocked users +- **get_contact_ids()**: List all contact IDs +- **get_direct_chat_by_contact(contact_query)**: Find direct chat with a contact +- **get_contact_chats(contact_id)**: List all chats with a contact +- **get_last_interaction(contact_id)**: Most recent message with a contact + +### User & Profile +- **get_me()**: Get your user info +- **update_profile(first_name, last_name, about)**: Update your profile +- **set_profile_photo(file_path)**: Set your profile photo +- **delete_profile_photo()**: Remove your profile photo +- **get_user_photos(user_id, limit)**: Get a user's profile photos +- **get_user_status(user_id)**: Get a user's online status + +### Media +- **send_file(chat_id, file_path, caption)**: Send a file +- **send_voice(chat_id, file_path)**: Send a voice message +- **download_media(chat_id, message_id, file_path)**: Download media +- **upload_file(file_path)**: Upload a file to Telegram servers +- **get_media_info(chat_id, message_id)**: Get info about media in a message + +### Search & Discovery +- **search_public_chats(query)**: Search public chats/channels/bots +- **search_messages(chat_id, query, limit)**: Search messages in a chat +- **resolve_username(username)**: Resolve a username to ID + +### Stickers, GIFs, Bots +- **get_sticker_sets()**: List sticker sets +- **send_sticker(chat_id, file_path)**: Send a sticker +- **get_gif_search(query, limit)**: Search for GIFs +- **send_gif(chat_id, gif_id)**: Send a GIF +- **get_bot_info(bot_username)**: Get info about a bot +- **set_bot_commands(bot_username, commands)**: Set bot commands (bot accounts only) + +### Privacy, Settings, and Misc +- **get_privacy_settings()**: Get privacy settings +- **set_privacy_settings(key, allow_users, disallow_users)**: Set privacy settings +- **mute_chat(chat_id)**: Mute notifications +- **unmute_chat(chat_id)**: Unmute notifications +- **archive_chat(chat_id)**: Archive a chat +- **unarchive_chat(chat_id)**: Unarchive a chat +- **get_recent_actions(chat_id)**: Get recent admin actions + +--- ## ๐Ÿ“‹ Requirements - - Python 3.10+ -- [Telethon](https://docs.telethon.dev/) for Telegram API access +- [Telethon](https://docs.telethon.dev/) - [MCP Python SDK](https://modelcontextprotocol.io/docs/) -- [UV](https://astral.sh/uv/) package manager -- [Claude Desktop](https://claude.ai/desktop) app +- [Claude Desktop](https://claude.ai/desktop) or [Cursor](https://cursor.so/) (or any MCP client) -## ๐Ÿ”ง Installation +--- -### 1. Clone the Repository +## ๐Ÿ”ง Installation & Setup + +### 1. Fork & Clone ```bash -git clone https://github.com/chigwell/telegram-mcp-server -cd telegram-mcp-server +git clone https://github.com/chigwell/telegram-mcp.git +cd telegram-mcp ``` -### 2. Generate Session String - -For better security and portability, this project uses Telethon's StringSession. Generate your session string: +### 2. Create a Virtual Environment ```bash -python session_string_generator.py +python3 -m venv .venv +source .venv/bin/activate # On Windows: .venv\Scripts\activate +pip install -r requirements.txt ``` -This will: -1. Ask for your phone number -2. Send a verification code to your Telegram app -3. Generate a session string and add it to your `.env` file +### 3. Generate a Session String -The session string allows authentication without storing SQLite session files, which helps avoid database lock issues and improves portability. +```bash +python3 session_string_generator.py +``` +Follow the prompts to authenticate and update your `.env` file. -### 3. Set Up Your Environment +### 4. Configure .env -Create a `.env` file with your Telegram credentials: +Copy `.env.example` to `.env` and fill in your values: ``` TELEGRAM_API_ID=your_api_id_here @@ -70,89 +167,380 @@ TELEGRAM_API_HASH=your_api_hash_here TELEGRAM_SESSION_NAME=anon TELEGRAM_SESSION_STRING=your_session_string_here ``` +Get your API credentials at [my.telegram.org/apps](https://my.telegram.org/apps). -You can obtain API credentials at [my.telegram.org/apps](https://my.telegram.org/apps). +--- -### 4. Install Dependencies +## โš™๏ธ Configuration for Claude & Cursor -```bash -uv venv -source .venv/bin/activate # On Windows: .venv\Scripts\activate -uv add "mcp[cli]" telethon python-dotenv nest_asyncio -``` - -### 5. Configure Claude Desktop - -#### On macOS/Linux: -Edit `~/Library/Application Support/Claude/claude_desktop_config.json`: +### MCP Configuration +Edit your Claude desktop config (e.g. `~/Library/Application Support/Claude/claude_desktop_config.json`) or Cursor config (`~/.cursor/mcp.json`): ```json { - "mcpServers": { - "telegram-mcp": { - "command": "/full/path/to/uv", - "args": [ - "--directory", - "/full/path/to/telegram-mcp-server", - "run", - "main.py" - ] - } + "mcpServers": { + "telegram-mcp": { + "command": "uv", + "args": [ + "--directory", + "/full/path/to/telegram-mcp-server", + "run", + "main.py" + ] } + } } ``` -#### On Windows: -Edit `%APPDATA%\Claude\claude_desktop_config.json` with similar configuration. +## ๐Ÿ“ Tool Examples with Code & Output + +Below are examples of the most commonly used tools with their implementation and sample output. + +### Getting Your Chats + +```python +@mcp.tool() +async def get_chats(page: int = 1, page_size: int = 20) -> str: + """ + Get a paginated list of chats. + Args: + page: Page number (1-indexed). + page_size: Number of chats per page. + """ + try: + dialogs = await client.get_dialogs() + start = (page - 1) * page_size + end = start + page_size + if start >= len(dialogs): + return "Page out of range." + chats = dialogs[start:end] + lines = [] + for dialog in chats: + entity = dialog.entity + chat_id = entity.id + title = getattr(entity, "title", None) or getattr(entity, "first_name", "Unknown") + lines.append(f"Chat ID: {chat_id}, Title: {title}") + return "\n".join(lines) + except Exception as e: + logger.exception(f"get_chats failed (page={page}, page_size={page_size})") + return "An error occurred (code: GETCHATS-ERR-001). Check mcp_errors.log for details." +``` + +Example output: +``` +Chat ID: 123456789, Title: John Doe +Chat ID: -100987654321, Title: My Project Group +Chat ID: 111223344, Title: Jane Smith +Chat ID: -200123456789, Title: News Channel +``` + +### Sending Messages + +```python +@mcp.tool() +async def send_message(chat_id: int, message: str) -> str: + """ + Send a message to a specific chat. + Args: + chat_id: The ID of the chat. + message: The message content to send. + """ + try: + entity = await client.get_entity(chat_id) + await client.send_message(entity, message) + return "Message sent successfully." + except Exception as e: + logger.exception(f"send_message failed (chat_id={chat_id})") + return "An error occurred (code: SENDMSG-ERR-001). Check mcp_errors.log for details." +``` + +Example output: +``` +Message sent successfully. +``` + +### Getting Chat Invite Links + +The `get_invite_link` function is particularly robust with multiple fallback methods: + +```python +@mcp.tool() +async def get_invite_link(chat_id: int) -> str: + """ + Get the invite link for a group or channel. + """ + try: + entity = await client.get_entity(chat_id) + + # Try using ExportChatInviteRequest first + try: + from telethon.tl import functions + result = await client(functions.messages.ExportChatInviteRequest( + peer=entity + )) + return result.link + except AttributeError: + # If the function doesn't exist in the current Telethon version + logger.warning("ExportChatInviteRequest not available, using alternative method") + except Exception as e1: + # If that fails, log and try alternative approach + logger.warning(f"ExportChatInviteRequest failed: {e1}") + + # Alternative approach using client.export_chat_invite_link + try: + invite_link = await client.export_chat_invite_link(entity) + return invite_link + except Exception as e2: + logger.warning(f"export_chat_invite_link failed: {e2}") + + # Last resort: Try directly fetching chat info + try: + if isinstance(entity, (Chat, Channel)): + full_chat = await client(functions.messages.GetFullChatRequest( + chat_id=entity.id + )) + if hasattr(full_chat, 'full_chat') and hasattr(full_chat.full_chat, 'invite_link'): + return full_chat.full_chat.invite_link or "No invite link available." + except Exception as e3: + logger.warning(f"GetFullChatRequest failed: {e3}") + + return "Could not retrieve invite link for this chat." + except Exception as e: + logger.exception(f"get_invite_link failed (chat_id={chat_id})") + return f"Error getting invite link: {e}" +``` + +Example output: +``` +https://t.me/+AbCdEfGhIjKlMnOp +``` + +### Joining Chats via Invite Links + +```python +@mcp.tool() +async def join_chat_by_link(link: str) -> str: + """ + Join a chat by invite link. + """ + try: + # Extract the hash from the invite link + if '/' in link: + hash_part = link.split('/')[-1] + if hash_part.startswith('+'): + hash_part = hash_part[1:] # Remove the '+' if present + else: + hash_part = link + + # Try checking the invite before joining + try: + # Try to check invite info first (will often fail if not a member) + invite_info = await client(functions.messages.CheckChatInviteRequest(hash=hash_part)) + if hasattr(invite_info, 'chat') and invite_info.chat: + # If we got chat info, we're already a member + chat_title = getattr(invite_info.chat, 'title', 'Unknown Chat') + return f"You are already a member of this chat: {chat_title}" + except Exception: + # This often fails if not a member - just continue + pass + + # Join the chat using the hash + result = await client(functions.messages.ImportChatInviteRequest(hash=hash_part)) + if result and hasattr(result, 'chats') and result.chats: + chat_title = getattr(result.chats[0], 'title', 'Unknown Chat') + return f"Successfully joined chat: {chat_title}" + return f"Joined chat via invite hash." + except Exception as e: + err_str = str(e).lower() + if "expired" in err_str: + return "The invite hash has expired and is no longer valid." + elif "invalid" in err_str: + return "The invite hash is invalid or malformed." + elif "already" in err_str and "participant" in err_str: + return "You are already a member of this chat." + logger.exception(f"join_chat_by_link failed (link={link})") + return f"Error joining chat: {e}" +``` + +Example output: +``` +Successfully joined chat: Developer Community +``` + +### Searching Public Chats + +```python +@mcp.tool() +async def search_public_chats(query: str) -> str: + """ + Search for public chats, channels, or bots by username or title. + """ + try: + result = await client(functions.contacts.SearchRequest(q=query, limit=20)) + return json.dumps([format_entity(u) for u in result.users], indent=2) + except Exception as e: + return f"Error searching public chats: {e}" +``` + +Example output: +```json +[ + { + "id": 123456789, + "name": "TelegramBot", + "type": "user", + "username": "telegram_bot" + }, + { + "id": 987654321, + "name": "Telegram News", + "type": "user", + "username": "telegram_news" + } +] +``` + +### Getting Direct Chats with Contacts + +```python +@mcp.tool() +async def get_direct_chat_by_contact(contact_query: str) -> str: + """ + Find a direct chat with a specific contact by name, username, or phone. + + Args: + contact_query: Name, username, or phone number to search for. + """ + try: + # Fetch all contacts using the correct Telethon method + result = await client(functions.contacts.GetContactsRequest(hash=0)) + contacts = result.users + found_contacts = [] + for contact in contacts: + if not contact: + continue + name = f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip() + username = getattr(contact, 'username', '') + phone = getattr(contact, 'phone', '') + if (contact_query.lower() in name.lower() or + (username and contact_query.lower() in username.lower()) or + (phone and contact_query in phone)): + found_contacts.append(contact) + if not found_contacts: + return f"No contacts found matching '{contact_query}'." + # If we found contacts, look for direct chats with them + results = [] + dialogs = await client.get_dialogs() + for contact in found_contacts: + contact_name = f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip() + for dialog in dialogs: + if isinstance(dialog.entity, User) and dialog.entity.id == contact.id: + chat_info = f"Chat ID: {dialog.entity.id}, Contact: {contact_name}" + if getattr(contact, 'username', ''): + chat_info += f", Username: @{contact.username}" + if dialog.unread_count: + chat_info += f", Unread: {dialog.unread_count}" + results.append(chat_info) + break + + if not results: + return f"Found contacts matching '{contact_query}', but no direct chats with them." + + return "\n".join(results) + except Exception as e: + return f"Error searching for direct chat: {e}" +``` + +Example output: +``` +Chat ID: 123456789, Contact: John Smith, Username: @johnsmith, Unread: 3 +``` + +--- ## ๐ŸŽฎ Usage Examples -Here are some ways to interact with Telegram through Claude: +- "Show my recent chats" +- "Send 'Hello world' to chat 123456789" +- "Add contact with phone +1234567890, name John Doe" +- "Create a group 'Project Team' with users 111, 222, 333" +- "Download the media from message 42 in chat 123456789" +- "Mute notifications for chat 123456789" +- "Promote user 111 to admin in group 123456789" +- "Search for public channels about 'news'" +- "Join the Telegram group with invite link https://t.me/+AbCdEfGhIjK" +- "Send a sticker to my Saved Messages" +- "Get all my sticker sets" -### Basic Chat Navigation -- "Show me my most recent chats" -- "List my group chats with unread messages" -- "Show detailed information about chat 123456789" +You can use these tools via natural language in Claude, Cursor, or any MCP-compatible client. -### Messaging -- "Show me the last 10 messages from chat 123456789" -- "Send 'I'll be there in 10 minutes' to chat 123456789" -- "Find messages containing 'meeting' in chat 123456789" -- "Show messages from March 1-15, 2023 in chat 123456789" +--- -### Contact Interactions -- "Search for contacts named 'Alex'" -- "Find my direct chat with John" -- "Show all chats where I interact with contact 987654321" -- "Show my last conversation with Lisa" +## ๐Ÿง  Error Handling & Robustness -### Advanced Features -- "Show the context around message 42 in chat 123456789" -- "List all channels I'm subscribed to" +This implementation includes comprehensive error handling: + +- **Session management**: Works with both file-based and string-based sessions +- **Error reporting**: Detailed errors logged to `mcp_errors.log` +- **Graceful degradation**: Multiple fallback approaches for critical functions +- **User-friendly messages**: Clear, actionable error messages instead of technical errors +- **Account type detection**: Functions that require bot accounts detect and notify when used with user accounts +- **Invite link processing**: Handles various link formats and already-member cases + +The code is designed to be robust against common Telegram API issues and limitations. + +--- + +## ๐Ÿ› ๏ธ Contribution Guide + +1. **Fork this repo:** [chigwell/telegram-mcp](https://github.com/chigwell/telegram-mcp) +2. **Clone your fork:** + ```bash + git clone https://github.com//telegram-mcp.git + ``` +3. **Create a new branch:** + ```bash + git checkout -b my-feature + ``` +4. **Make your changes, add tests/docs if needed.** +5. **Push and open a Pull Request** to [chigwell/telegram-mcp](https://github.com/chigwell/telegram-mcp) with a clear description. +6. **Tag @chigwell or @l1v0n1** in your PR for review. + +--- ## ๐Ÿ”’ Security Considerations +- **Never commit your `.env` or session string.** +- The session string gives full access to your Telegram accountโ€”keep it safe! +- All processing is local; no data is sent anywhere except Telegram's API. +- Use `.env.example` as a template and keep your actual `.env` file private. +- Test files are automatically excluded in `.gitignore`. -- **Private API Keys**: Never commit your `.env` file or session files to Git repositories -- **Session String**: The session string in your `.env` file provides full access to your Telegram account. Keep it secure. -- **Local Processing**: All Telegram data is processed locally on your machine - no data is sent to external servers beyond Telegram's own API. -- **Permissions**: The MCP server has the same access to Telegram as you would have with the official app, including reading and sending messages. +--- ## ๐Ÿ› ๏ธ Troubleshooting +- **Check logs** in your MCP client (Claude/Cursor) and the terminal for errors. +- **Detailed error logs** can be found in `mcp_errors.log`. +- **Interpreter errors?** Make sure your `.venv` is created and selected. +- **Database lock?** Use session string authentication, not file-based sessions. +- **iCloud/Dropbox issues?** Move your project to a local path without spaces if you see odd errors. +- **Regenerate session string** if you change your Telegram password or see auth errors. +- **Bot-only functions** will show clear messages when used with regular user accounts. +- **Test script failures?** Check test configuration in `.env` for valid test accounts/groups. -If you encounter issues: - -1. Check Claude Desktop logs for error messages -2. Ensure your Telegram API credentials are correct -3. Verify that the paths in your Claude Desktop config are absolute and correct -4. If you see database lock errors, use the session string authentication method -5. If you need to regenerate your session string, run `python session_string_generator.py` again +--- ## ๐Ÿ“„ License This project is licensed under the [Apache 2.0 License](LICENSE). -## ๐Ÿ™ Acknowledgements +--- -- [Telethon](https://github.com/LonamiWebs/Telethon) for the Telegram client library -- [Model Context Protocol](https://modelcontextprotocol.io/) for the integration framework -- [Anthropic](https://www.anthropic.com/) for Claude and the Claude Desktop app +## ๐Ÿ™ Acknowledgements +- [Telethon](https://github.com/LonamiWebs/Telethon) +- [Model Context Protocol](https://modelcontextprotocol.io/) +- [Claude](https://www.anthropic.com/) and [Cursor](https://cursor.so/) +- [chigwell/telegram-mcp](https://github.com/chigwell/telegram-mcp) (upstream) + +--- + +**Maintained by @chigwell and @l1v0n1. PRs welcome!** diff --git a/main.py b/main.py index ca52b77..c7899df 100644 --- a/main.py +++ b/main.py @@ -1,20 +1,38 @@ import os import sys -import time -from dotenv import load_dotenv -import asyncio -import nest_asyncio -from mcp.server.fastmcp import FastMCP -from telethon import TelegramClient -from telethon.sessions import StringSession -import sqlite3 -from telethon import utils -from telethon.tl.types import User, Chat, Channel -from telethon.tl.functions.contacts import SearchRequest -from datetime import datetime, timedelta import json +import time +import asyncio +import sqlite3 +import logging +import mimetypes +from datetime import datetime, timedelta from typing import List, Dict, Optional, Union, Any +# Third-party libraries +import nest_asyncio +from dotenv import load_dotenv +from mcp.server.fastmcp import FastMCP +from telethon import TelegramClient, functions, utils +from telethon.sessions import StringSession +from telethon.tl.types import ( + User, Chat, Channel, + ChatAdminRights, ChatBannedRights, + ChannelParticipantsKicked, ChannelParticipantsAdmins, + InputChatPhoto, InputChatUploadedPhoto, InputChatPhotoEmpty, + InputPeerUser, InputPeerChat, InputPeerChannel +) +import telethon.errors.rpcerrorlist + +def json_serializer(obj): + """Helper function to convert non-serializable objects for JSON serialization.""" + if isinstance(obj, datetime): + return obj.isoformat() + if isinstance(obj, bytes): + return obj.decode('utf-8', errors='replace') + # Add other non-serializable types as needed + raise TypeError(f"Object of type {type(obj)} is not JSON serializable") + load_dotenv() TELEGRAM_API_ID = int(os.getenv("TELEGRAM_API_ID")) @@ -33,6 +51,83 @@ else: # Use file-based session client = TelegramClient(TELEGRAM_SESSION_NAME, TELEGRAM_API_ID, TELEGRAM_API_HASH) +# Setup robust logging with both file and console output +logger = logging.getLogger("telegram_mcp") +logger.setLevel(logging.ERROR) # Set to ERROR for production, INFO for debugging + +# Create console handler +console_handler = logging.StreamHandler() +console_handler.setLevel(logging.ERROR) # Set to ERROR for production, INFO for debugging + +# Create file handler with absolute path +script_dir = os.path.dirname(os.path.abspath(__file__)) +log_file_path = os.path.join(script_dir, "mcp_errors.log") + +try: + file_handler = logging.FileHandler(log_file_path, mode='a') # Append mode + file_handler.setLevel(logging.ERROR) + + # Create formatter and add to handlers + formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(name)s - %(message)s - %(filename)s:%(lineno)d') + console_handler.setFormatter(formatter) + file_handler.setFormatter(formatter) + + # Add handlers to logger + logger.addHandler(console_handler) + logger.addHandler(file_handler) + + logger.info(f"Logging initialized to {log_file_path}") +except Exception as log_error: + print(f"WARNING: Error setting up log file: {log_error}") + # Fallback to console-only logging + logger.addHandler(console_handler) + logger.error(f"Failed to set up log file handler: {log_error}") + +# Error code prefix mapping for better error tracing +ERROR_PREFIXES = { + "chat": "CHAT", + "msg": "MSG", + "contact": "CONTACT", + "group": "GROUP", + "media": "MEDIA", + "profile": "PROFILE", + "auth": "AUTH", + "admin": "ADMIN" +} + +def log_and_format_error(function_name: str, error: Exception, prefix: str = None, **kwargs) -> str: + """ + Centralized error handling function that logs the error and returns a formatted user-friendly message. + + Args: + function_name: Name of the function where error occurred + error: The exception that was raised + prefix: Error code prefix (e.g., "CHAT", "MSG") - if None, will be derived from function_name + **kwargs: Additional context parameters to include in log + + Returns: + A user-friendly error message with error code + """ + # Generate a consistent error code + if prefix is None: + # Try to derive prefix from function name + for key, value in ERROR_PREFIXES.items(): + if key in function_name.lower(): + prefix = value + break + if prefix is None: + prefix = "GEN" # Generic prefix if none matches + + error_code = f"{prefix}-ERR-{abs(hash(function_name)) % 1000:03d}" + + # Format the additional context parameters + context = ", ".join(f"{k}={v}" for k, v in kwargs.items()) + + # Log the full technical error + logger.exception(f"{function_name} failed ({context}): {error}") + + # Return a user-friendly message + return f"An error occurred (code: {error_code}). Check mcp_errors.log for details." def format_entity(entity) -> Dict[str, Any]: """Helper function to format entity information consistently.""" @@ -79,32 +174,32 @@ def format_message(message) -> Dict[str, Any]: async def get_chats(page: int = 1, page_size: int = 20) -> str: """ Get a paginated list of chats. - Args: page: Page number (1-indexed). page_size: Number of chats per page. """ - dialogs = await client.get_dialogs() - start = (page - 1) * page_size - end = start + page_size - if start >= len(dialogs): - return "Page out of range." - chats = dialogs[start:end] - lines = [] - for dialog in chats: - # For groups or channels, use the title; for users, show first name. - entity = dialog.entity - chat_id = entity.id - title = getattr(entity, "title", None) or getattr(entity, "first_name", "Unknown") - lines.append(f"Chat ID: {chat_id}, Title: {title}") - return "\n".join(lines) + try: + dialogs = await client.get_dialogs() + start = (page - 1) * page_size + end = start + page_size + if start >= len(dialogs): + return "Page out of range." + chats = dialogs[start:end] + lines = [] + for dialog in chats: + entity = dialog.entity + chat_id = entity.id + title = getattr(entity, "title", None) or getattr(entity, "first_name", "Unknown") + lines.append(f"Chat ID: {chat_id}, Title: {title}") + return "\n".join(lines) + except Exception as e: + return log_and_format_error("get_chats", e) @mcp.tool() async def get_messages(chat_id: int, page: int = 1, page_size: int = 20) -> str: """ Get paginated messages from a specific chat. - Args: chat_id: The ID of the chat. page: Page number (1-indexed). @@ -112,79 +207,100 @@ async def get_messages(chat_id: int, page: int = 1, page_size: int = 20) -> str: """ try: entity = await client.get_entity(chat_id) + offset = (page - 1) * page_size + messages = await client.get_messages(entity, limit=page_size, add_offset=offset) + if not messages: + return "No messages found for this page." + lines = [] + for msg in messages: + lines.append(f"ID: {msg.id} | Date: {msg.date} | Message: {msg.message}") + return "\n".join(lines) except Exception as e: - return f"Could not resolve chat with ID {chat_id}: {e}" - - offset = (page - 1) * page_size - messages = await client.get_messages(entity, limit=page_size, add_offset=offset) - if not messages: - return "No messages found for this page." - lines = [] - for msg in messages: - lines.append(f"ID: {msg.id} | Date: {msg.date} | Message: {msg.message}") - return "\n".join(lines) + return log_and_format_error("get_messages", e, chat_id=chat_id, page=page, page_size=page_size) @mcp.tool() async def send_message(chat_id: int, message: str) -> str: """ Send a message to a specific chat. - Args: chat_id: The ID of the chat. message: The message content to send. """ try: entity = await client.get_entity(chat_id) - except Exception as e: - return f"Could not resolve chat with ID {chat_id}: {e}" - - try: await client.send_message(entity, message) return "Message sent successfully." except Exception as e: - return f"Failed to send message: {e}" + return log_and_format_error("send_message", e, chat_id=chat_id) + + +@mcp.tool() +async def list_contacts() -> str: + """ + List all contacts in your Telegram account. + """ + try: + result = await client(functions.contacts.GetContactsRequest(hash=0)) + users = result.users + if not users: + return "No contacts found." + lines = [] + for user in users: + name = f"{getattr(user, 'first_name', '')} {getattr(user, 'last_name', '')}".strip() + username = getattr(user, 'username', '') + phone = getattr(user, 'phone', '') + contact_info = f"ID: {user.id}, Name: {name}" + if username: + contact_info += f", Username: @{username}" + if phone: + contact_info += f", Phone: {phone}" + lines.append(contact_info) + return "\n".join(lines) + except Exception as e: + return log_and_format_error("list_contacts", e) @mcp.tool() async def search_contacts(query: str) -> str: """ - Search for contacts by name or phone number. - + Search for contacts by name, username, or phone number using Telethon's SearchRequest. Args: - query: The search term to look for in contact names or phone numbers. + query: The search term to look for in contact names, usernames, or phone numbers. """ try: - # Search in your contacts - contacts = await client.get_contacts() - results = [] - - for contact in contacts: - if not contact: - continue - - name = f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip() - username = getattr(contact, 'username', '') - phone = getattr(contact, 'phone', '') - - if (query.lower() in name.lower() or - (username and query.lower() in username.lower()) or - (phone and query in phone)): - - contact_info = f"ID: {contact.id}, Name: {name}" - if username: - contact_info += f", Username: @{username}" - if phone: - contact_info += f", Phone: {phone}" - - results.append(contact_info) - - if not results: + result = await client(functions.contacts.SearchRequest(q=query, limit=50)) + users = result.users + if not users: return f"No contacts found matching '{query}'." - - return "\n".join(results) + lines = [] + for user in users: + name = f"{getattr(user, 'first_name', '')} {getattr(user, 'last_name', '')}".strip() + username = getattr(user, 'username', '') + phone = getattr(user, 'phone', '') + contact_info = f"ID: {user.id}, Name: {name}" + if username: + contact_info += f", Username: @{username}" + if phone: + contact_info += f", Phone: {phone}" + lines.append(contact_info) + return "\n".join(lines) except Exception as e: - return f"Error searching contacts: {e}" + return log_and_format_error("search_contacts", e, query=query) + + +@mcp.tool() +async def get_contact_ids() -> str: + """ + Get all contact IDs in your Telegram account. + """ + try: + result = await client(functions.contacts.GetContactIDsRequest(hash=0)) + if not result: + return "No contact IDs found." + return "Contact IDs: " + ", ".join(str(cid) for cid in result) + except Exception as e: + return log_and_format_error("get_contact_ids", e) @mcp.tool() @@ -210,14 +326,29 @@ async def list_messages(chat_id: int, limit: int = 20, search_query: str = None, if from_date: try: from_date_obj = datetime.strptime(from_date, "%Y-%m-%d") + # Make it timezone aware by adding UTC timezone info + # Use datetime.timezone.utc for Python 3.9+ or import timezone directly for 3.13+ + try: + # For Python 3.9+ + from_date_obj = from_date_obj.replace(tzinfo=datetime.timezone.utc) + except AttributeError: + # For Python 3.13+ + from datetime import timezone + from_date_obj = from_date_obj.replace(tzinfo=timezone.utc) except ValueError: return f"Invalid from_date format. Use YYYY-MM-DD." if to_date: try: to_date_obj = datetime.strptime(to_date, "%Y-%m-%d") - # Set to end of day + # Set to end of day and make timezone aware to_date_obj = to_date_obj + timedelta(days=1, microseconds=-1) + # Add timezone info + try: + to_date_obj = to_date_obj.replace(tzinfo=datetime.timezone.utc) + except AttributeError: + from datetime import timezone + to_date_obj = to_date_obj.replace(tzinfo=timezone.utc) except ValueError: return f"Invalid to_date format. Use YYYY-MM-DD." @@ -253,7 +384,7 @@ async def list_messages(chat_id: int, limit: int = 20, search_query: str = None, return "\n".join(lines) except Exception as e: - return f"Error retrieving messages: {e}" + return log_and_format_error("list_messages", e, chat_id=chat_id) @mcp.tool() @@ -314,7 +445,7 @@ async def list_chats(chat_type: str = None, limit: int = 20) -> str: return "\n".join(results) except Exception as e: - return f"Error listing chats: {e}" + return log_and_format_error("list_chats", e, chat_type=chat_type, limit=limit) @mcp.tool() @@ -331,15 +462,29 @@ async def get_chat(chat_id: int) -> str: result = [] result.append(f"ID: {entity.id}") + is_channel = isinstance(entity, Channel) + is_chat = isinstance(entity, Chat) + is_user = isinstance(entity, User) + if hasattr(entity, 'title'): result.append(f"Title: {entity.title}") - chat_type = "Channel" if getattr(entity, 'broadcast', False) else "Group" + chat_type = "Channel" if is_channel and getattr(entity, 'broadcast', False) else "Group" + if is_channel and getattr(entity, 'megagroup', False): + chat_type = "Supergroup" + elif is_chat: + chat_type = "Group (Basic)" result.append(f"Type: {chat_type}") if hasattr(entity, 'username') and entity.username: result.append(f"Username: @{entity.username}") - if hasattr(entity, 'participants_count'): - result.append(f"Participants: {entity.participants_count}") - elif isinstance(entity, User): + + # Fetch participants count reliably + try: + participants_count = (await client.get_participants(entity, limit=0)).total + result.append(f"Participants: {participants_count}") + except Exception as pe: + result.append(f"Participants: Error fetching ({pe})") + + elif is_user: name = f"{entity.first_name}" if entity.last_name: name += f" {entity.last_name}" @@ -354,22 +499,29 @@ async def get_chat(chat_id: int) -> str: # Get last activity if it's a dialog try: - dialogs = await client.get_dialogs(limit=100) - for dialog in dialogs: - if dialog.entity.id == chat_id: - result.append(f"Unread Messages: {dialog.unread_count}") - if dialog.message: - last_msg = dialog.message - sender = getattr(last_msg.sender, 'first_name', '') or 'Unknown' - result.append(f"Last Message: From {sender} at {last_msg.date}") - result.append(f"Message: {last_msg.message or '[Media/No text]'}") - break - except: + # Using get_dialogs might be slow if there are many dialogs + # Alternative: Get entity again via get_dialogs if needed for unread count + dialog = await client.get_dialogs(limit=1, offset_id=0, offset_peer=entity) + if dialog: + dialog = dialog[0] + result.append(f"Unread Messages: {dialog.unread_count}") + if dialog.message: + last_msg = dialog.message + sender_name = "Unknown" + if last_msg.sender: + sender_name = getattr(last_msg.sender, 'first_name', '') or getattr(last_msg.sender, 'title', 'Unknown') + if hasattr(last_msg.sender, 'last_name') and last_msg.sender.last_name: + sender_name += f" {last_msg.sender.last_name}" + sender_name = sender_name.strip() or "Unknown" + result.append(f"Last Message: From {sender_name} at {last_msg.date}") + result.append(f"Message: {last_msg.message or '[Media/No text]'}") + except Exception as diag_ex: + logger.warning(f"Could not get dialog info for {chat_id}: {diag_ex}") pass return "\n".join(result) except Exception as e: - return f"Error getting chat info: {e}" + return log_and_format_error("get_chat", e, chat_id=chat_id) @mcp.tool() @@ -381,30 +533,25 @@ async def get_direct_chat_by_contact(contact_query: str) -> str: contact_query: Name, username, or phone number to search for. """ try: - # First search for the contact - contacts = await client.get_contacts() + # Fetch all contacts using the correct Telethon method + result = await client(functions.contacts.GetContactsRequest(hash=0)) + contacts = result.users found_contacts = [] - for contact in contacts: if not contact: continue - name = f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip() username = getattr(contact, 'username', '') phone = getattr(contact, 'phone', '') - if (contact_query.lower() in name.lower() or (username and contact_query.lower() in username.lower()) or (phone and contact_query in phone)): found_contacts.append(contact) - if not found_contacts: return f"No contacts found matching '{contact_query}'." - # If we found contacts, look for direct chats with them results = [] dialogs = await client.get_dialogs() - for contact in found_contacts: contact_name = f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip() for dialog in dialogs: @@ -416,14 +563,12 @@ async def get_direct_chat_by_contact(contact_query: str) -> str: chat_info += f", Unread: {dialog.unread_count}" results.append(chat_info) break - if not results: found_names = ", ".join([f"{c.first_name} {c.last_name}".strip() for c in found_contacts]) return f"Found contacts: {found_names}, but no direct chats were found with them." - return "\n".join(results) except Exception as e: - return f"Error finding direct chat: {e}" + return log_and_format_error("get_direct_chat_by_contact", e, contact_query=contact_query) @mcp.tool() @@ -473,7 +618,7 @@ async def get_contact_chats(contact_id: int) -> str: return f"Chats with {contact_name} (ID: {contact_id}):\n" + "\n".join(results) except Exception as e: - return f"Error retrieving contact chats: {e}" + return log_and_format_error("get_contact_chats", e, contact_id=contact_id) @mcp.tool() @@ -507,7 +652,7 @@ async def get_last_interaction(contact_id: int) -> str: return "\n".join(results) except Exception as e: - return f"Error retrieving last interaction: {e}" + return log_and_format_error("get_last_interaction", e, contact_id=contact_id) @mcp.tool() @@ -522,46 +667,1639 @@ async def get_message_context(chat_id: int, message_id: int, context_size: int = """ try: chat = await client.get_entity(chat_id) - # Get messages around the specified message messages_before = await client.get_messages( chat, limit=context_size, max_id=message_id ) - central_message = await client.get_messages( chat, ids=message_id ) - + # Fix: get_messages(ids=...) returns a single Message, not a list + if central_message is not None and not isinstance(central_message, list): + central_message = [central_message] + elif central_message is None: + central_message = [] messages_after = await client.get_messages( chat, limit=context_size, min_id=message_id, reverse=True ) - if not central_message: return f"Message with ID {message_id} not found in chat {chat_id}." - # Combine messages in chronological order all_messages = list(messages_before) + list(central_message) + list(messages_after) all_messages.sort(key=lambda m: m.id) - results = [f"Context for message {message_id} in chat {chat_id}:"] - for msg in all_messages: sender_name = "Unknown" if msg.sender: sender_name = getattr(msg.sender, 'first_name', '') or getattr(msg.sender, 'title', 'Unknown') - highlight = " [THIS MESSAGE]" if msg.id == message_id else "" results.append(f"ID: {msg.id} | {sender_name} | {msg.date}{highlight}\n{msg.message or '[Media/No text]'}\n") - return "\n".join(results) except Exception as e: - return f"Error retrieving message context: {e}" + return log_and_format_error("get_message_context", e, chat_id=chat_id, message_id=message_id, context_size=context_size) + + +@mcp.tool() +async def add_contact(phone: str, first_name: str, last_name: str = "") -> str: + """ + Add a new contact to your Telegram account. + Args: + phone: The phone number of the contact (with country code). + first_name: The contact's first name. + last_name: The contact's last name (optional). + """ + try: + # Try to import the required types first + from telethon.tl.types import InputPhoneContact + + result = await client(functions.contacts.ImportContactsRequest( + contacts=[ + InputPhoneContact( + client_id=0, + phone=phone, + first_name=first_name, + last_name=last_name + ) + ] + )) + if result.imported: + return f"Contact {first_name} {last_name} added successfully." + else: + return f"Contact not added. Response: {str(result)}" + except (ImportError, AttributeError) as type_err: + # Try alternative approach using raw API + try: + result = await client(functions.contacts.ImportContactsRequest( + contacts=[{ + 'client_id': 0, + 'phone': phone, + 'first_name': first_name, + 'last_name': last_name + }] + )) + if hasattr(result, 'imported') and result.imported: + return f"Contact {first_name} {last_name} added successfully (alt method)." + else: + return f"Contact not added. Alternative method response: {str(result)}" + except Exception as alt_e: + logger.exception(f"add_contact (alt method) failed (phone={phone})") + return log_and_format_error("add_contact", alt_e, phone=phone) + except Exception as e: + logger.exception(f"add_contact failed (phone={phone})") + return log_and_format_error("add_contact", e, phone=phone) + + +@mcp.tool() +async def delete_contact(user_id: int) -> str: + """ + Delete a contact by user ID. + Args: + user_id: The Telegram user ID of the contact to delete. + """ + try: + user = await client.get_entity(user_id) + await client(functions.contacts.DeleteContactsRequest(id=[user])) + return f"Contact with user ID {user_id} deleted." + except Exception as e: + return log_and_format_error("delete_contact", e, user_id=user_id) + + +@mcp.tool() +async def block_user(user_id: int) -> str: + """ + Block a user by user ID. + Args: + user_id: The Telegram user ID to block. + """ + try: + user = await client.get_entity(user_id) + await client(functions.contacts.BlockRequest(id=user)) + return f"User {user_id} blocked." + except Exception as e: + return log_and_format_error("block_user", e, user_id=user_id) + + +@mcp.tool() +async def unblock_user(user_id: int) -> str: + """ + Unblock a user by user ID. + Args: + user_id: The Telegram user ID to unblock. + """ + try: + user = await client.get_entity(user_id) + await client(functions.contacts.UnblockRequest(id=user)) + return f"User {user_id} unblocked." + except Exception as e: + return log_and_format_error("unblock_user", e, user_id=user_id) + + +@mcp.tool() +async def get_me() -> str: + """ + Get your own user information. + """ + try: + me = await client.get_me() + return json.dumps(format_entity(me), indent=2) + except Exception as e: + return log_and_format_error("get_me", e) + + +@mcp.tool() +async def create_group(title: str, user_ids: list) -> str: + """ + Create a new group or supergroup and add users. + + Args: + title: Title for the new group + user_ids: List of user IDs to add to the group + """ + try: + # Convert user IDs to entities + users = [] + for user_id in user_ids: + try: + user = await client.get_entity(user_id) + users.append(user) + except Exception as e: + logger.error(f"Failed to get entity for user ID {user_id}: {e}") + return f"Error: Could not find user with ID {user_id}" + + if not users: + return "Error: No valid users provided" + + # Create the group with the users + try: + # Create a new chat with selected users + result = await client(functions.messages.CreateChatRequest( + users=users, + title=title + )) + + # Check what type of response we got + if hasattr(result, 'chats') and result.chats: + created_chat = result.chats[0] + return f"Group created with ID: {created_chat.id}" + elif hasattr(result, 'chat') and result.chat: + return f"Group created with ID: {result.chat.id}" + elif hasattr(result, 'chat_id'): + return f"Group created with ID: {result.chat_id}" + else: + # If we can't determine the chat ID directly from the result + # Try to find it in recent dialogs + await asyncio.sleep(1) # Give Telegram a moment to register the new group + dialogs = await client.get_dialogs(limit=5) # Get recent dialogs + for dialog in dialogs: + if dialog.title == title: + return f"Group created with ID: {dialog.id}" + + # If we still can't find it, at least return success + return f"Group created successfully. Please check your recent chats for '{title}'." + + except Exception as create_err: + if "PEER_FLOOD" in str(create_err): + return "Error: Cannot create group due to Telegram limits. Try again later." + else: + raise # Let the outer exception handler catch it + except Exception as e: + logger.exception(f"create_group failed (title={title}, user_ids={user_ids})") + return log_and_format_error("create_group", e, title=title, user_ids=user_ids) + + +@mcp.tool() +async def invite_to_group(group_id: int, user_ids: list) -> str: + """ + Invite users to a group or channel. + + Args: + group_id: The ID of the group/channel. + user_ids: List of user IDs to invite. + """ + try: + entity = await client.get_entity(group_id) + users_to_add = [] + + for user_id in user_ids: + try: + user = await client.get_entity(user_id) + users_to_add.append(user) + except ValueError as e: + return f"Error: User with ID {user_id} could not be found. {e}" + + try: + result = await client(functions.channels.InviteToChannelRequest( + channel=entity, + users=users_to_add + )) + + invited_count = 0 + if hasattr(result, 'users') and result.users: + invited_count = len(result.users) + elif hasattr(result, 'count'): + invited_count = result.count + + return f"Successfully invited {invited_count} users to {entity.title}" + except telethon.errors.rpcerrorlist.UserNotMutualContactError: + return "Error: Cannot invite users who are not mutual contacts. Please ensure the users are in your contacts and have added you back." + except telethon.errors.rpcerrorlist.UserPrivacyRestrictedError: + return "Error: One or more users have privacy settings that prevent you from adding them." + except Exception as e: + return log_and_format_error("invite_to_group", e, group_id=group_id, user_ids=user_ids) + + except Exception as e: + logger.error(f"telegram_mcp invite_to_group failed (group_id={group_id}, user_ids={user_ids})", exc_info=True) + return log_and_format_error("invite_to_group", e, group_id=group_id, user_ids=user_ids) + + +@mcp.tool() +async def leave_chat(chat_id: int) -> str: + """ + Leave a group or channel by chat ID. + + Args: + chat_id: The chat ID to leave. + """ + try: + entity = await client.get_entity(chat_id) + + # Check the entity type carefully + if isinstance(entity, Channel): + # Handle both channels and supergroups (which are also channels in Telegram) + try: + await client(functions.channels.LeaveChannelRequest(channel=entity)) + chat_name = getattr(entity, 'title', str(chat_id)) + return f"Left channel/supergroup {chat_name} (ID: {chat_id})." + except Exception as chan_err: + return log_and_format_error("leave_chat", chan_err, chat_id=chat_id) + + elif isinstance(entity, Chat): + # Traditional basic groups (not supergroups) + try: + # First try with InputPeerUser + me = await client.get_me(input_peer=True) + await client(functions.messages.DeleteChatUserRequest( + chat_id=entity.id, # Use the entity ID directly + user_id=me + )) + chat_name = getattr(entity, 'title', str(chat_id)) + return f"Left basic group {chat_name} (ID: {chat_id})." + except Exception as chat_err: + # If the above fails, try the second approach + logger.warning(f"First leave attempt failed: {chat_err}, trying alternative method") + + try: + # Alternative approach - sometimes this works better + me_full = await client.get_me() + await client(functions.messages.DeleteChatUserRequest( + chat_id=entity.id, + user_id=me_full.id + )) + chat_name = getattr(entity, 'title', str(chat_id)) + return f"Left basic group {chat_name} (ID: {chat_id})." + except Exception as alt_err: + return log_and_format_error("leave_chat", alt_err, chat_id=chat_id) + else: + # Cannot leave a user chat this way + entity_type = type(entity).__name__ + return log_and_format_error("leave_chat", Exception(f"Cannot leave chat ID {chat_id} of type {entity_type}. This function is for groups and channels only."), chat_id=chat_id) + + except Exception as e: + logger.exception(f"leave_chat failed (chat_id={chat_id})") + + # Provide helpful hint for common errors + error_str = str(e).lower() + if "invalid" in error_str and "chat" in error_str: + return log_and_format_error("leave_chat", Exception(f"Error leaving chat: This appears to be a channel/supergroup. Please check the chat ID and try again."), chat_id=chat_id) + + return log_and_format_error("leave_chat", e, chat_id=chat_id) + + +@mcp.tool() +async def get_participants(chat_id: int) -> str: + """ + List all participants in a group or channel. + Args: + chat_id: The group or channel ID. + """ + try: + participants = await client.get_participants(chat_id) + lines = [f"ID: {p.id}, Name: {getattr(p, 'first_name', '')} {getattr(p, 'last_name', '')}" for p in participants] + return "\n".join(lines) + except Exception as e: + return log_and_format_error("get_participants", e, chat_id=chat_id) + + +@mcp.tool() +async def send_file(chat_id: int, file_path: str, caption: str = None) -> str: + """ + Send a file to a chat. + Args: + chat_id: The chat ID. + file_path: Absolute path to the file to send (must exist and be readable). + caption: Optional caption for the file. + """ + try: + if not os.path.isfile(file_path): + return f"File not found: {file_path}" + if not os.access(file_path, os.R_OK): + return f"File is not readable: {file_path}" + entity = await client.get_entity(chat_id) + await client.send_file(entity, file_path, caption=caption) + return f"File sent to chat {chat_id}." + except Exception as e: + return log_and_format_error("send_file", e, chat_id=chat_id, file_path=file_path, caption=caption) + + +@mcp.tool() +async def download_media(chat_id: int, message_id: int, file_path: str) -> str: + """ + Download media from a message in a chat. + Args: + chat_id: The chat ID. + message_id: The message ID containing the media. + file_path: Absolute path to save the downloaded file (must be writable). + """ + try: + entity = await client.get_entity(chat_id) + msg = await client.get_messages(entity, ids=message_id) + if not msg or not msg.media: + return "No media found in the specified message." + # Check if directory is writable + dir_path = os.path.dirname(file_path) or '.' + if not os.access(dir_path, os.W_OK): + return f"Directory not writable: {dir_path}" + await client.download_media(msg, file=file_path) + if not os.path.isfile(file_path): + return f"Download failed: file not created at {file_path}" + return f"Media downloaded to {file_path}." + except Exception as e: + return log_and_format_error("download_media", e, chat_id=chat_id, message_id=message_id, file_path=file_path) + + +@mcp.tool() +async def update_profile(first_name: str = None, last_name: str = None, about: str = None) -> str: + """ + Update your profile information (name, bio). + """ + try: + await client(functions.account.UpdateProfileRequest( + first_name=first_name, + last_name=last_name, + about=about + )) + return "Profile updated." + except Exception as e: + return log_and_format_error("update_profile", e, first_name=first_name, last_name=last_name, about=about) + + +@mcp.tool() +async def set_profile_photo(file_path: str) -> str: + """ + Set a new profile photo. + """ + try: + await client(functions.photos.UploadProfilePhotoRequest( + file=await client.upload_file(file_path) + )) + return "Profile photo updated." + except Exception as e: + return log_and_format_error("set_profile_photo", e, file_path=file_path) + + +@mcp.tool() +async def delete_profile_photo() -> str: + """ + Delete your current profile photo. + """ + try: + photos = await client(functions.photos.GetUserPhotosRequest(user_id='me', offset=0, max_id=0, limit=1)) + if not photos.photos: + return "No profile photo to delete." + await client(functions.photos.DeletePhotosRequest(id=[photos.photos[0].id])) + return "Profile photo deleted." + except Exception as e: + return log_and_format_error("delete_profile_photo", e) + + +@mcp.tool() +async def get_privacy_settings() -> str: + """ + Get your privacy settings for last seen status. + """ + try: + # Import needed types directly + from telethon.tl.types import InputPrivacyKeyStatusTimestamp + + try: + settings = await client(functions.account.GetPrivacyRequest( + key=InputPrivacyKeyStatusTimestamp() + )) + return str(settings) + except TypeError as e: + if "TLObject was expected" in str(e): + return "Error: Privacy settings API call failed due to type mismatch. This is likely a version compatibility issue with Telethon." + else: + raise + except Exception as e: + logger.exception("get_privacy_settings failed") + return log_and_format_error("get_privacy_settings", e) + + +@mcp.tool() +async def set_privacy_settings(key: str, allow_users: list = None, disallow_users: list = None) -> str: + """ + Set privacy settings (e.g., last seen, phone, etc.). + + Args: + key: The privacy setting to modify ('status' for last seen, 'phone', 'profile_photo', etc.) + allow_users: List of user IDs to allow + disallow_users: List of user IDs to disallow + """ + try: + # Import needed types + from telethon.tl.types import ( + InputPrivacyKeyStatusTimestamp, + InputPrivacyKeyPhoneNumber, + InputPrivacyKeyProfilePhoto, + InputPrivacyValueAllowUsers, + InputPrivacyValueDisallowUsers, + InputPrivacyValueAllowAll, + InputPrivacyValueDisallowAll + ) + + # Map the simplified keys to their corresponding input types + key_mapping = { + 'status': InputPrivacyKeyStatusTimestamp, + 'phone': InputPrivacyKeyPhoneNumber, + 'profile_photo': InputPrivacyKeyProfilePhoto, + } + + # Get the appropriate key class + if key not in key_mapping: + return f"Error: Unsupported privacy key '{key}'. Supported keys: {', '.join(key_mapping.keys())}" + + privacy_key = key_mapping[key]() + + # Prepare the rules + rules = [] + + # Process allow rules + if allow_users is None or len(allow_users) == 0: + # If no specific users to allow, allow everyone by default + rules.append(InputPrivacyValueAllowAll()) + else: + # Convert user IDs to InputUser entities + try: + allow_entities = [] + for user_id in allow_users: + try: + user = await client.get_entity(user_id) + allow_entities.append(user) + except Exception as user_err: + logger.warning(f"Could not get entity for user ID {user_id}: {user_err}") + + if allow_entities: + rules.append(InputPrivacyValueAllowUsers(users=allow_entities)) + except Exception as allow_err: + logger.error(f"Error processing allowed users: {allow_err}") + return log_and_format_error("set_privacy_settings", allow_err, key=key) + + # Process disallow rules + if disallow_users and len(disallow_users) > 0: + try: + disallow_entities = [] + for user_id in disallow_users: + try: + user = await client.get_entity(user_id) + disallow_entities.append(user) + except Exception as user_err: + logger.warning(f"Could not get entity for user ID {user_id}: {user_err}") + + if disallow_entities: + rules.append(InputPrivacyValueDisallowUsers(users=disallow_entities)) + except Exception as disallow_err: + logger.error(f"Error processing disallowed users: {disallow_err}") + return log_and_format_error("set_privacy_settings", disallow_err, key=key) + + # Apply the privacy settings + try: + result = await client(functions.account.SetPrivacyRequest( + key=privacy_key, + rules=rules + )) + return f"Privacy settings for {key} updated successfully." + except TypeError as type_err: + if "TLObject was expected" in str(type_err): + return "Error: Privacy settings API call failed due to type mismatch. This is likely a version compatibility issue with Telethon." + else: + raise + except Exception as e: + logger.exception(f"set_privacy_settings failed (key={key})") + return log_and_format_error("set_privacy_settings", e, key=key) + + +@mcp.tool() +async def import_contacts(contacts: list) -> str: + """ + Import a list of contacts. Each contact should be a dict with phone, first_name, last_name. + """ + try: + input_contacts = [functions.contacts.InputPhoneContact(client_id=i, phone=c['phone'], first_name=c['first_name'], last_name=c.get('last_name', '')) for i, c in enumerate(contacts)] + result = await client(functions.contacts.ImportContactsRequest(contacts=input_contacts)) + return f"Imported {len(result.imported)} contacts." + except Exception as e: + return log_and_format_error("import_contacts", e, contacts=contacts) + + +@mcp.tool() +async def export_contacts() -> str: + """ + Export all contacts as a JSON string. + """ + try: + result = await client(functions.contacts.GetContactsRequest(hash=0)) + users = result.users + return json.dumps([format_entity(u) for u in users], indent=2) + except Exception as e: + return log_and_format_error("export_contacts", e) + + +@mcp.tool() +async def get_blocked_users() -> str: + """ + Get a list of blocked users. + """ + try: + result = await client(functions.contacts.GetBlockedRequest(offset=0, limit=100)) + return json.dumps([format_entity(u) for u in result.users], indent=2) + except Exception as e: + return log_and_format_error("get_blocked_users", e) + + +@mcp.tool() +async def create_channel(title: str, about: str = "", megagroup: bool = False) -> str: + """ + Create a new channel or supergroup. + """ + try: + result = await client(functions.channels.CreateChannelRequest( + title=title, + about=about, + megagroup=megagroup + )) + return f"Channel '{title}' created with ID: {result.chats[0].id}" + except Exception as e: + return log_and_format_error("create_channel", e, title=title, about=about, megagroup=megagroup) + + +@mcp.tool() +async def edit_chat_title(chat_id: int, title: str) -> str: + """ + Edit the title of a chat, group, or channel. + """ + try: + entity = await client.get_entity(chat_id) + if isinstance(entity, Channel): + await client(functions.channels.EditTitleRequest(channel=entity, title=title)) + elif isinstance(entity, Chat): + await client(functions.messages.EditChatTitleRequest(chat_id=chat_id, title=title)) + else: + return f"Cannot edit title for this entity type ({type(entity)})." + return f"Chat {chat_id} title updated to '{title}'." + except Exception as e: + logger.exception(f"edit_chat_title failed (chat_id={chat_id}, title='{title}')") + return log_and_format_error("edit_chat_title", e, chat_id=chat_id, title=title) + + +@mcp.tool() +async def edit_chat_photo(chat_id: int, file_path: str) -> str: + """ + Edit the photo of a chat, group, or channel. Requires a file path to an image. + """ + try: + if not os.path.isfile(file_path): + return f"Photo file not found: {file_path}" + if not os.access(file_path, os.R_OK): + return f"Photo file not readable: {file_path}" + + entity = await client.get_entity(chat_id) + uploaded_file = await client.upload_file(file_path) + + if isinstance(entity, Channel): + # For channels/supergroups, use EditPhotoRequest with InputChatUploadedPhoto + input_photo = InputChatUploadedPhoto(file=uploaded_file) + await client(functions.channels.EditPhotoRequest(channel=entity, photo=input_photo)) + elif isinstance(entity, Chat): + # For basic groups, use EditChatPhotoRequest with InputChatUploadedPhoto + input_photo = InputChatUploadedPhoto(file=uploaded_file) + await client(functions.messages.EditChatPhotoRequest(chat_id=chat_id, photo=input_photo)) + else: + return f"Cannot edit photo for this entity type ({type(entity)})." + + return f"Chat {chat_id} photo updated." + except Exception as e: + logger.exception(f"edit_chat_photo failed (chat_id={chat_id}, file_path='{file_path}')") + return log_and_format_error("edit_chat_photo", e, chat_id=chat_id, file_path=file_path) + + +@mcp.tool() +async def delete_chat_photo(chat_id: int) -> str: + """ + Delete the photo of a chat, group, or channel. + """ + try: + entity = await client.get_entity(chat_id) + if isinstance(entity, Channel): + # Use InputChatPhotoEmpty for channels/supergroups + await client(functions.channels.EditPhotoRequest(channel=entity, photo=InputChatPhotoEmpty())) + elif isinstance(entity, Chat): + # Use None (or InputChatPhotoEmpty) for basic groups + await client(functions.messages.EditChatPhotoRequest(chat_id=chat_id, photo=InputChatPhotoEmpty())) + else: + return f"Cannot delete photo for this entity type ({type(entity)})." + + return f"Chat {chat_id} photo deleted." + except Exception as e: + logger.exception(f"delete_chat_photo failed (chat_id={chat_id})") + return log_and_format_error("delete_chat_photo", e, chat_id=chat_id) + + +@mcp.tool() +async def promote_admin(group_id: int, user_id: int, rights: dict = None) -> str: + """ + Promote a user to admin in a group/channel. + + Args: + group_id: ID of the group/channel + user_id: User ID to promote + rights: Admin rights to give (optional) + """ + try: + chat = await client.get_entity(group_id) + user = await client.get_entity(user_id) + + # Set default admin rights if not provided + if not rights: + rights = { + 'change_info': True, + 'post_messages': True, + 'edit_messages': True, + 'delete_messages': True, + 'ban_users': True, + 'invite_users': True, + 'pin_messages': True, + 'add_admins': False, + 'anonymous': False, + 'manage_call': True, + 'other': True + } + + admin_rights = ChatAdminRights( + change_info=rights.get('change_info', True), + post_messages=rights.get('post_messages', True), + edit_messages=rights.get('edit_messages', True), + delete_messages=rights.get('delete_messages', True), + ban_users=rights.get('ban_users', True), + invite_users=rights.get('invite_users', True), + pin_messages=rights.get('pin_messages', True), + add_admins=rights.get('add_admins', False), + anonymous=rights.get('anonymous', False), + manage_call=rights.get('manage_call', True), + other=rights.get('other', True) + ) + + try: + result = await client(functions.channels.EditAdminRequest( + channel=chat, + user_id=user, + admin_rights=admin_rights, + rank="Admin" + )) + return f"Successfully promoted user {user_id} to admin in {chat.title}" + except telethon.errors.rpcerrorlist.UserNotMutualContactError: + return "Error: Cannot promote users who are not mutual contacts. Please ensure the user is in your contacts and has added you back." + except Exception as e: + return log_and_format_error("promote_admin", e, group_id=group_id, user_id=user_id) + + except Exception as e: + logger.error(f"telegram_mcp promote_admin failed (group_id={group_id}, user_id={user_id})", exc_info=True) + return log_and_format_error("promote_admin", e, group_id=group_id, user_id=user_id) + + +@mcp.tool() +async def demote_admin(group_id: int, user_id: int) -> str: + """ + Demote a user from admin in a group/channel. + + Args: + group_id: ID of the group/channel + user_id: User ID to demote + """ + try: + chat = await client.get_entity(group_id) + user = await client.get_entity(user_id) + + # Create empty admin rights (regular user) + admin_rights = ChatAdminRights( + change_info=False, + post_messages=False, + edit_messages=False, + delete_messages=False, + ban_users=False, + invite_users=False, + pin_messages=False, + add_admins=False, + anonymous=False, + manage_call=False, + other=False + ) + + try: + result = await client(functions.channels.EditAdminRequest( + channel=chat, + user_id=user, + admin_rights=admin_rights, + rank="" + )) + return f"Successfully demoted user {user_id} from admin in {chat.title}" + except telethon.errors.rpcerrorlist.UserNotMutualContactError: + return "Error: Cannot modify admin status of users who are not mutual contacts. Please ensure the user is in your contacts and has added you back." + except Exception as e: + return log_and_format_error("demote_admin", e, group_id=group_id, user_id=user_id) + + except Exception as e: + logger.error(f"telegram_mcp demote_admin failed (group_id={group_id}, user_id={user_id})", exc_info=True) + return log_and_format_error("demote_admin", e, group_id=group_id, user_id=user_id) + + +@mcp.tool() +async def ban_user(chat_id: int, user_id: int) -> str: + """ + Ban a user from a group or channel. + + Args: + chat_id: ID of the group/channel + user_id: User ID to ban + """ + try: + chat = await client.get_entity(chat_id) + user = await client.get_entity(user_id) + + # Create banned rights (all restrictions enabled) + banned_rights = ChatBannedRights( + until_date=None, # Ban forever + view_messages=True, + send_messages=True, + send_media=True, + send_stickers=True, + send_gifs=True, + send_games=True, + send_inline=True, + embed_links=True, + send_polls=True, + change_info=True, + invite_users=True, + pin_messages=True + ) + + try: + await client(functions.channels.EditBannedRequest( + channel=chat, + participant=user, + banned_rights=banned_rights + )) + return f"User {user_id} banned from chat {chat.title} (ID: {chat_id})." + except telethon.errors.rpcerrorlist.UserNotMutualContactError: + return "Error: Cannot ban users who are not mutual contacts. Please ensure the user is in your contacts and has added you back." + except Exception as e: + return log_and_format_error("ban_user", e, chat_id=chat_id, user_id=user_id) + except Exception as e: + logger.exception(f"ban_user failed (chat_id={chat_id}, user_id={user_id})") + return log_and_format_error("ban_user", e, chat_id=chat_id, user_id=user_id) + + +@mcp.tool() +async def unban_user(chat_id: int, user_id: int) -> str: + """ + Unban a user from a group or channel. + + Args: + chat_id: ID of the group/channel + user_id: User ID to unban + """ + try: + chat = await client.get_entity(chat_id) + user = await client.get_entity(user_id) + + # Create unbanned rights (no restrictions) + unbanned_rights = ChatBannedRights( + until_date=None, + view_messages=False, + send_messages=False, + send_media=False, + send_stickers=False, + send_gifs=False, + send_games=False, + send_inline=False, + embed_links=False, + send_polls=False, + change_info=False, + invite_users=False, + pin_messages=False + ) + + try: + await client(functions.channels.EditBannedRequest( + channel=chat, + participant=user, + banned_rights=unbanned_rights + )) + return f"User {user_id} unbanned from chat {chat.title} (ID: {chat_id})." + except telethon.errors.rpcerrorlist.UserNotMutualContactError: + return "Error: Cannot modify status of users who are not mutual contacts. Please ensure the user is in your contacts and has added you back." + except Exception as e: + return log_and_format_error("unban_user", e, chat_id=chat_id, user_id=user_id) + except Exception as e: + logger.exception(f"unban_user failed (chat_id={chat_id}, user_id={user_id})") + return log_and_format_error("unban_user", e, chat_id=chat_id, user_id=user_id) + + +@mcp.tool() +async def get_admins(chat_id: int) -> str: + """ + Get all admins in a group or channel. + """ + try: + # Fix: Use the correct filter type ChannelParticipantsAdmins + participants = await client.get_participants(chat_id, filter=ChannelParticipantsAdmins()) + lines = [f"ID: {p.id}, Name: {getattr(p, 'first_name', '')} {getattr(p, 'last_name', '')}".strip() for p in participants] + return "\n".join(lines) if lines else "No admins found." + except Exception as e: + logger.exception(f"get_admins failed (chat_id={chat_id})") + return log_and_format_error("get_admins", e, chat_id=chat_id) + + +@mcp.tool() +async def get_banned_users(chat_id: int) -> str: + """ + Get all banned users in a group or channel. + """ + try: + # Fix: Use the correct filter type ChannelParticipantsKicked + participants = await client.get_participants(chat_id, filter=ChannelParticipantsKicked(q='')) + lines = [f"ID: {p.id}, Name: {getattr(p, 'first_name', '')} {getattr(p, 'last_name', '')}".strip() for p in participants] + return "\n".join(lines) if lines else "No banned users found." + except Exception as e: + logger.exception(f"get_banned_users failed (chat_id={chat_id})") + return log_and_format_error("get_banned_users", e, chat_id=chat_id) + + +@mcp.tool() +async def get_invite_link(chat_id: int) -> str: + """ + Get the invite link for a group or channel. + """ + try: + entity = await client.get_entity(chat_id) + + # Try using ExportChatInviteRequest first + try: + from telethon.tl import functions + result = await client(functions.messages.ExportChatInviteRequest( + peer=entity + )) + return result.link + except AttributeError: + # If the function doesn't exist in the current Telethon version + logger.warning("ExportChatInviteRequest not available, using alternative method") + except Exception as e1: + # If that fails, log and try alternative approach + logger.warning(f"ExportChatInviteRequest failed: {e1}") + + # Alternative approach using client.export_chat_invite_link + try: + invite_link = await client.export_chat_invite_link(entity) + return invite_link + except Exception as e2: + logger.warning(f"export_chat_invite_link failed: {e2}") + + # Last resort: Try directly fetching chat info + try: + if isinstance(entity, (Chat, Channel)): + full_chat = await client(functions.messages.GetFullChatRequest( + chat_id=entity.id + )) + if hasattr(full_chat, 'full_chat') and hasattr(full_chat.full_chat, 'invite_link'): + return full_chat.full_chat.invite_link or "No invite link available." + except Exception as e3: + logger.warning(f"GetFullChatRequest failed: {e3}") + + return "Could not retrieve invite link for this chat." + except Exception as e: + logger.exception(f"get_invite_link failed (chat_id={chat_id})") + return log_and_format_error("get_invite_link", e, chat_id=chat_id) + + +@mcp.tool() +async def join_chat_by_link(link: str) -> str: + """ + Join a chat by invite link. + """ + try: + # Extract the hash from the invite link + if '/' in link: + hash_part = link.split('/')[-1] + if hash_part.startswith('+'): + hash_part = hash_part[1:] # Remove the '+' if present + else: + hash_part = link + + # Try checking the invite before joining + try: + from telethon.errors import (InviteHashExpiredError, InviteHashInvalidError, + UserAlreadyParticipantError, ChatAdminRequiredError, + UsersTooMuchError) + + # Try to check invite info first (will often fail if not a member) + invite_info = await client(functions.messages.CheckChatInviteRequest(hash=hash_part)) + if hasattr(invite_info, 'chat') and invite_info.chat: + # If we got chat info, we're already a member + chat_title = getattr(invite_info.chat, 'title', 'Unknown Chat') + return f"You are already a member of this chat: {chat_title}" + except Exception as check_err: + # This often fails if not a member - just continue + pass + + # Join the chat using the hash + try: + result = await client(functions.messages.ImportChatInviteRequest(hash=hash_part)) + if result and hasattr(result, 'chats') and result.chats: + chat_title = getattr(result.chats[0], 'title', 'Unknown Chat') + return f"Successfully joined chat: {chat_title}" + return f"Joined chat via invite hash." + except Exception as join_err: + err_str = str(join_err).lower() + if "expired" in err_str: + return "The invite hash has expired and is no longer valid." + elif "invalid" in err_str: + return "The invite hash is invalid or malformed." + elif "already" in err_str and "participant" in err_str: + return "You are already a member of this chat." + elif "admin" in err_str: + return "Cannot join this chat - requires admin approval." + elif "too much" in err_str or "too many" in err_str: + return "Cannot join this chat - it has reached maximum number of participants." + else: + raise # Re-raise to be caught by the outer exception handler + except Exception as e: + logger.exception(f"join_chat_by_link failed (link={link})") + return log_and_format_error("join_chat_by_link", e, link=link) + + +@mcp.tool() +async def export_chat_invite(chat_id: int) -> str: + """ + Export a chat invite link. + """ + try: + entity = await client.get_entity(chat_id) + + # Try using ExportChatInviteRequest first + try: + from telethon.tl import functions + result = await client(functions.messages.ExportChatInviteRequest( + peer=entity + )) + return result.link + except AttributeError: + # If the function doesn't exist in the current Telethon version + logger.warning("ExportChatInviteRequest not available, using alternative method") + except Exception as e1: + # If that fails, log and try alternative approach + logger.warning(f"ExportChatInviteRequest failed: {e1}") + + # Alternative approach using client.export_chat_invite_link + try: + invite_link = await client.export_chat_invite_link(entity) + return invite_link + except Exception as e2: + logger.warning(f"export_chat_invite_link failed: {e2}") + return log_and_format_error("export_chat_invite", e2, chat_id=chat_id) + except Exception as e: + logger.exception(f"export_chat_invite failed (chat_id={chat_id})") + return log_and_format_error("export_chat_invite", e, chat_id=chat_id) + + +@mcp.tool() +async def import_chat_invite(hash: str) -> str: + """ + Import a chat invite by hash. + """ + try: + # Remove any prefixes like '+' if present + if hash.startswith('+'): + hash = hash[1:] + + # Try checking the invite before joining + try: + from telethon.errors import (InviteHashExpiredError, InviteHashInvalidError, + UserAlreadyParticipantError, ChatAdminRequiredError, + UsersTooMuchError) + + # Try to check invite info first (will often fail if not a member) + invite_info = await client(functions.messages.CheckChatInviteRequest(hash=hash)) + if hasattr(invite_info, 'chat') and invite_info.chat: + # If we got chat info, we're already a member + chat_title = getattr(invite_info.chat, 'title', 'Unknown Chat') + return f"You are already a member of this chat: {chat_title}" + except Exception as check_err: + # This often fails if not a member - just continue + pass + + # Join the chat using the hash + try: + result = await client(functions.messages.ImportChatInviteRequest(hash=hash)) + if result and hasattr(result, 'chats') and result.chats: + chat_title = getattr(result.chats[0], 'title', 'Unknown Chat') + return f"Successfully joined chat: {chat_title}" + return f"Joined chat via invite hash." + except Exception as join_err: + err_str = str(join_err).lower() + if "expired" in err_str: + return "The invite hash has expired and is no longer valid." + elif "invalid" in err_str: + return "The invite hash is invalid or malformed." + elif "already" in err_str and "participant" in err_str: + return "You are already a member of this chat." + elif "admin" in err_str: + return "Cannot join this chat - requires admin approval." + elif "too much" in err_str or "too many" in err_str: + return "Cannot join this chat - it has reached maximum number of participants." + else: + raise # Re-raise to be caught by the outer exception handler + except Exception as e: + logger.exception(f"import_chat_invite failed (hash={hash})") + return log_and_format_error("import_chat_invite", e, hash=hash) + + +@mcp.tool() +async def send_voice(chat_id: int, file_path: str) -> str: + """ + Send a voice message to a chat. File must be an OGG/OPUS voice note. + Args: + chat_id: The chat ID. + file_path: Absolute path to the OGG/OPUS file. + """ + try: + if not os.path.isfile(file_path): + return f"File not found: {file_path}" + if not os.access(file_path, os.R_OK): + return f"File is not readable: {file_path}" + mime, _ = mimetypes.guess_type(file_path) + if not (mime and (mime == 'audio/ogg' or file_path.lower().endswith('.ogg') or file_path.lower().endswith('.opus'))): + return "Voice file must be .ogg or .opus format." + entity = await client.get_entity(chat_id) + await client.send_file(entity, file_path, voice_note=True) + return f"Voice message sent to chat {chat_id}." + except Exception as e: + return log_and_format_error("send_voice", e, chat_id=chat_id, file_path=file_path) + + +@mcp.tool() +async def forward_message(from_chat_id: int, message_id: int, to_chat_id: int) -> str: + """ + Forward a message from one chat to another. + """ + try: + from_entity = await client.get_entity(from_chat_id) + to_entity = await client.get_entity(to_chat_id) + await client.forward_messages(to_entity, message_id, from_entity) + return f"Message {message_id} forwarded from {from_chat_id} to {to_chat_id}." + except Exception as e: + return log_and_format_error("forward_message", e, from_chat_id=from_chat_id, message_id=message_id, to_chat_id=to_chat_id) + + +@mcp.tool() +async def edit_message(chat_id: int, message_id: int, new_text: str) -> str: + """ + Edit a message you sent. + """ + try: + entity = await client.get_entity(chat_id) + await client.edit_message(entity, message_id, new_text) + return f"Message {message_id} edited." + except Exception as e: + return log_and_format_error("edit_message", e, chat_id=chat_id, message_id=message_id, new_text=new_text) + + +@mcp.tool() +async def delete_message(chat_id: int, message_id: int) -> str: + """ + Delete a message by ID. + """ + try: + entity = await client.get_entity(chat_id) + await client.delete_messages(entity, message_id) + return f"Message {message_id} deleted." + except Exception as e: + return log_and_format_error("delete_message", e, chat_id=chat_id, message_id=message_id) + + +@mcp.tool() +async def pin_message(chat_id: int, message_id: int) -> str: + """ + Pin a message in a chat. + """ + try: + entity = await client.get_entity(chat_id) + await client.pin_message(entity, message_id) + return f"Message {message_id} pinned in chat {chat_id}." + except Exception as e: + return log_and_format_error("pin_message", e, chat_id=chat_id, message_id=message_id) + + +@mcp.tool() +async def unpin_message(chat_id: int, message_id: int) -> str: + """ + Unpin a message in a chat. + """ + try: + entity = await client.get_entity(chat_id) + await client.unpin_message(entity, message_id) + return f"Message {message_id} unpinned in chat {chat_id}." + except Exception as e: + return log_and_format_error("unpin_message", e, chat_id=chat_id, message_id=message_id) + + +@mcp.tool() +async def mark_as_read(chat_id: int) -> str: + """ + Mark all messages as read in a chat. + """ + try: + entity = await client.get_entity(chat_id) + await client.send_read_acknowledge(entity) + return f"Marked all messages as read in chat {chat_id}." + except Exception as e: + return log_and_format_error("mark_as_read", e, chat_id=chat_id) + + +@mcp.tool() +async def reply_to_message(chat_id: int, message_id: int, text: str) -> str: + """ + Reply to a specific message in a chat. + """ + try: + entity = await client.get_entity(chat_id) + await client.send_message(entity, text, reply_to=message_id) + return f"Replied to message {message_id} in chat {chat_id}." + except Exception as e: + return log_and_format_error("reply_to_message", e, chat_id=chat_id, message_id=message_id, text=text) + + +@mcp.tool() +async def upload_file(file_path: str) -> str: + """ + Upload a file to Telegram servers (returns file handle as string, not a file path). + Args: + file_path: Absolute path to the file to upload (must exist and be readable). + """ + try: + if not os.path.isfile(file_path): + return f"File not found: {file_path}" + if not os.access(file_path, os.R_OK): + return f"File is not readable: {file_path}" + file = await client.upload_file(file_path) + return str(file) + except Exception as e: + return log_and_format_error("upload_file", e, file_path=file_path) + + +@mcp.tool() +async def get_media_info(chat_id: int, message_id: int) -> str: + """ + Get info about media in a message. + Args: + chat_id: The chat ID. + message_id: The message ID. + """ + try: + entity = await client.get_entity(chat_id) + msg = await client.get_messages(entity, ids=message_id) + if not msg or not msg.media: + return "No media found in the specified message." + return str(msg.media) + except Exception as e: + return log_and_format_error("get_media_info", e, chat_id=chat_id, message_id=message_id) + + +@mcp.tool() +async def search_public_chats(query: str) -> str: + """ + Search for public chats, channels, or bots by username or title. + """ + try: + result = await client(functions.contacts.SearchRequest(q=query, limit=20)) + return json.dumps([format_entity(u) for u in result.users], indent=2) + except Exception as e: + return log_and_format_error("search_public_chats", e, query=query) + + +@mcp.tool() +async def search_messages(chat_id: int, query: str, limit: int = 20) -> str: + """ + Search for messages in a chat by text. + """ + try: + entity = await client.get_entity(chat_id) + messages = await client.get_messages(entity, limit=limit, search=query) + return "\n".join([f"ID: {m.id} | {m.date} | {m.message}" for m in messages]) + except Exception as e: + return log_and_format_error("search_messages", e, chat_id=chat_id, query=query, limit=limit) + + +@mcp.tool() +async def resolve_username(username: str) -> str: + """ + Resolve a username to a user or chat ID. + """ + try: + result = await client(functions.contacts.ResolveUsernameRequest(username=username)) + return str(result) + except Exception as e: + return log_and_format_error("resolve_username", e, username=username) + + +@mcp.tool() +async def mute_chat(chat_id: int) -> str: + """ + Mute notifications for a chat. + """ + try: + from telethon.tl.types import InputPeerNotifySettings + + peer = await client.get_entity(chat_id) + await client(functions.account.UpdateNotifySettingsRequest( + peer=peer, + settings=InputPeerNotifySettings(mute_until=2**31-1) + )) + return f"Chat {chat_id} muted." + except (ImportError, AttributeError) as type_err: + try: + # Alternative approach directly using raw API + peer = await client.get_input_entity(chat_id) + await client(functions.account.UpdateNotifySettingsRequest( + peer=peer, + settings={ + 'mute_until': 2**31-1, # Far future + 'show_previews': False, + 'silent': True + } + )) + return f"Chat {chat_id} muted (using alternative method)." + except Exception as alt_e: + logger.exception(f"mute_chat (alt method) failed (chat_id={chat_id})") + return log_and_format_error("mute_chat", alt_e, chat_id=chat_id) + except Exception as e: + logger.exception(f"mute_chat failed (chat_id={chat_id})") + return log_and_format_error("mute_chat", e, chat_id=chat_id) + + +@mcp.tool() +async def unmute_chat(chat_id: int) -> str: + """ + Unmute notifications for a chat. + """ + try: + from telethon.tl.types import InputPeerNotifySettings + + peer = await client.get_entity(chat_id) + await client(functions.account.UpdateNotifySettingsRequest( + peer=peer, + settings=InputPeerNotifySettings(mute_until=0) + )) + return f"Chat {chat_id} unmuted." + except (ImportError, AttributeError) as type_err: + try: + # Alternative approach directly using raw API + peer = await client.get_input_entity(chat_id) + await client(functions.account.UpdateNotifySettingsRequest( + peer=peer, + settings={ + 'mute_until': 0, # Unmute (current time) + 'show_previews': True, + 'silent': False + } + )) + return f"Chat {chat_id} unmuted (using alternative method)." + except Exception as alt_e: + logger.exception(f"unmute_chat (alt method) failed (chat_id={chat_id})") + return log_and_format_error("unmute_chat", alt_e, chat_id=chat_id) + except Exception as e: + logger.exception(f"unmute_chat failed (chat_id={chat_id})") + return log_and_format_error("unmute_chat", e, chat_id=chat_id) + + +@mcp.tool() +async def archive_chat(chat_id: int) -> str: + """ + Archive a chat. + """ + try: + await client(functions.messages.ToggleDialogPinRequest( + peer=await client.get_entity(chat_id), + pinned=True + )) + return f"Chat {chat_id} archived." + except Exception as e: + return log_and_format_error("archive_chat", e, chat_id=chat_id) + + +@mcp.tool() +async def unarchive_chat(chat_id: int) -> str: + """ + Unarchive a chat. + """ + try: + await client(functions.messages.ToggleDialogPinRequest( + peer=await client.get_entity(chat_id), + pinned=False + )) + return f"Chat {chat_id} unarchived." + except Exception as e: + return log_and_format_error("unarchive_chat", e, chat_id=chat_id) + + +@mcp.tool() +async def get_sticker_sets() -> str: + """ + Get all sticker sets. + """ + try: + result = await client(functions.messages.GetAllStickersRequest(hash=0)) + return json.dumps([s.title for s in result.sets], indent=2) + except Exception as e: + return log_and_format_error("get_sticker_sets", e) + + +@mcp.tool() +async def send_sticker(chat_id: int, file_path: str) -> str: + """ + Send a sticker to a chat. File must be a valid .webp sticker file. + Args: + chat_id: The chat ID. + file_path: Absolute path to the .webp sticker file. + """ + try: + if not os.path.isfile(file_path): + return f"Sticker file not found: {file_path}" + if not os.access(file_path, os.R_OK): + return f"Sticker file is not readable: {file_path}" + if not file_path.lower().endswith('.webp'): + return "Sticker file must be a .webp file." + entity = await client.get_entity(chat_id) + await client.send_file(entity, file_path, force_document=False) + return f"Sticker sent to chat {chat_id}." + except Exception as e: + return log_and_format_error("send_sticker", e, chat_id=chat_id, file_path=file_path) + + +@mcp.tool() +async def get_gif_search(query: str, limit: int = 10) -> str: + """ + Search for GIFs by query. Returns a list of Telegram document IDs (not file paths). + Args: + query: Search term for GIFs. + limit: Max number of GIFs to return. + """ + try: + # Try approach 1: SearchGifsRequest + try: + result = await client(functions.messages.SearchGifsRequest(q=query, offset_id=0, limit=limit)) + if not result.gifs: + return "[]" + return json.dumps([g.document.id for g in result.gifs], indent=2, default=json_serializer) + except (AttributeError, ImportError): + # Fallback approach: Use SearchRequest with GIF filter + try: + from telethon.tl.types import InputMessagesFilterGif + result = await client(functions.messages.SearchRequest( + peer="gif", q=query, filter=InputMessagesFilterGif(), + min_date=None, max_date=None, offset_id=0, add_offset=0, + limit=limit, max_id=0, min_id=0, hash=0 + )) + if not result or not hasattr(result, 'messages') or not result.messages: + return "[]" + # Extract document IDs from any messages with media + gif_ids = [] + for msg in result.messages: + if hasattr(msg, 'media') and msg.media and hasattr(msg.media, 'document'): + gif_ids.append(msg.media.document.id) + return json.dumps(gif_ids, default=json_serializer) + except Exception as inner_e: + # Last resort: Try to fetch from a public bot + return f"Could not search GIFs using available methods: {inner_e}" + except Exception as e: + logger.exception(f"get_gif_search failed (query={query}, limit={limit})") + return log_and_format_error("get_gif_search", e, query=query, limit=limit) + + +@mcp.tool() +async def send_gif(chat_id: int, gif_id: int) -> str: + """ + Send a GIF to a chat by Telegram GIF document ID (not a file path). + Args: + chat_id: The chat ID. + gif_id: Telegram document ID for the GIF (from get_gif_search). + """ + try: + if not isinstance(gif_id, int): + return "gif_id must be a Telegram document ID (integer), not a file path. Use get_gif_search to find IDs." + entity = await client.get_entity(chat_id) + await client.send_file(entity, gif_id) + return f"GIF sent to chat {chat_id}." + except Exception as e: + return log_and_format_error("send_gif", e, chat_id=chat_id, gif_id=gif_id) + + +@mcp.tool() +async def get_bot_info(bot_username: str) -> str: + """ + Get information about a bot by username. + """ + try: + entity = await client.get_entity(bot_username) + if not entity: + return f"Bot with username {bot_username} not found." + + result = await client(functions.users.GetFullUserRequest(id=entity)) + + # Create a more structured, serializable response + if hasattr(result, 'to_dict'): + # Use custom serializer to handle non-serializable types + return json.dumps(result.to_dict(), indent=2, default=json_serializer) + else: + # Fallback if to_dict is not available + info = { + "bot_info": { + "id": entity.id, + "username": entity.username, + "first_name": entity.first_name, + "last_name": getattr(entity, "last_name", ""), + "is_bot": getattr(entity, "bot", False), + "verified": getattr(entity, "verified", False), + } + } + if hasattr(result, "full_user") and hasattr(result.full_user, "about"): + info["bot_info"]["about"] = result.full_user.about + + return json.dumps(info, indent=2) + except Exception as e: + logger.exception(f"get_bot_info failed (bot_username={bot_username})") + return log_and_format_error("get_bot_info", e, bot_username=bot_username) + + +@mcp.tool() +async def set_bot_commands(bot_username: str, commands: list) -> str: + """ + Set bot commands for a bot you own. + Note: This function can only be used if the Telegram client is a bot account. + Regular user accounts cannot set bot commands. + + Args: + bot_username: The username of the bot to set commands for. + commands: List of command dictionaries with 'command' and 'description' keys. + """ + try: + # First check if the current client is a bot + me = await client.get_me() + if not getattr(me, 'bot', False): + return "Error: This function can only be used by bot accounts. Your current Telegram account is a regular user account, not a bot." + + # Import required types + from telethon.tl.types import BotCommand, BotCommandScopeDefault + from telethon.tl.functions.bots import SetBotCommandsRequest + + # Create BotCommand objects from the command dictionaries + bot_commands = [ + BotCommand(command=c['command'], description=c['description']) + for c in commands + ] + + # Get the bot entity + bot = await client.get_entity(bot_username) + + # Set the commands with proper scope + await client(SetBotCommandsRequest( + scope=BotCommandScopeDefault(), + lang_code="en", # Default language code + commands=bot_commands + )) + + return f"Bot commands set for {bot_username}." + except ImportError as ie: + logger.exception(f"set_bot_commands failed - ImportError: {ie}") + return log_and_format_error("set_bot_commands", ie) + except Exception as e: + logger.exception(f"set_bot_commands failed (bot_username={bot_username})") + return log_and_format_error("set_bot_commands", e, bot_username=bot_username) + + +@mcp.tool() +async def get_history(chat_id: int, limit: int = 100) -> str: + """ + Get full chat history (up to limit). + """ + try: + entity = await client.get_entity(chat_id) + messages = await client.get_messages(entity, limit=limit) + return "\n".join([f"ID: {m.id} | {m.date} | {m.message}" for m in messages]) + except Exception as e: + return log_and_format_error("get_history", e, chat_id=chat_id, limit=limit) + + +@mcp.tool() +async def get_user_photos(user_id: int, limit: int = 10) -> str: + """ + Get profile photos of a user. + """ + try: + user = await client.get_entity(user_id) + photos = await client(functions.photos.GetUserPhotosRequest(user_id=user, offset=0, max_id=0, limit=limit)) + return json.dumps([p.id for p in photos.photos], indent=2) + except Exception as e: + return log_and_format_error("get_user_photos", e, user_id=user_id, limit=limit) + + +@mcp.tool() +async def get_user_status(user_id: int) -> str: + """ + Get the online status of a user. + """ + try: + user = await client.get_entity(user_id) + return str(user.status) + except Exception as e: + return log_and_format_error("get_user_status", e, user_id=user_id) + + +@mcp.tool() +async def get_recent_actions(chat_id: int) -> str: + """ + Get recent admin actions (admin log) in a group or channel. + """ + try: + result = await client(functions.channels.GetAdminLogRequest( + channel=chat_id, + q="", + events_filter=None, + admins=[], + max_id=0, + min_id=0, + limit=20 + )) + + if not result or not result.events: + return "No recent admin actions found." + + # Use the custom serializer to handle datetime objects + return json.dumps([e.to_dict() for e in result.events], indent=2, default=json_serializer) + except Exception as e: + logger.exception(f"get_recent_actions failed (chat_id={chat_id})") + return log_and_format_error("get_recent_actions", e, chat_id=chat_id) + + +@mcp.tool() +async def get_pinned_messages(chat_id: int) -> str: + """ + Get all pinned messages in a chat. + """ + try: + entity = await client.get_entity(chat_id) + # Use correct filter based on Telethon version + try: + # Try newer Telethon approach + from telethon.tl.types import InputMessagesFilterPinned + messages = await client.get_messages(entity, filter=InputMessagesFilterPinned()) + except (ImportError, AttributeError): + # Fallback - try without filter and manually filter pinned + all_messages = await client.get_messages(entity, limit=50) + messages = [m for m in all_messages if getattr(m, 'pinned', False)] + + if not messages: + return "No pinned messages found in this chat." + + return "\n".join([f"ID: {m.id} | {m.date} | {m.message or '[Media/No text]'}" for m in messages]) + except Exception as e: + logger.exception(f"get_pinned_messages failed (chat_id={chat_id})") + return log_and_format_error("get_pinned_messages", e, chat_id=chat_id) if __name__ == "__main__": diff --git a/pyproject.toml b/pyproject.toml index 7701a20..ec26d7c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "telegram-mcp" -version = "1.5.0" +version = "2.0.0" description = "Telegram integration for Claude via the Model Context Protocol" readme = "README.md" authors = [ diff --git a/screenshots/1.png b/screenshots/1.png index 53c2bf2..9bc5232 100644 Binary files a/screenshots/1.png and b/screenshots/1.png differ diff --git a/screenshots/2.png b/screenshots/2.png new file mode 100644 index 0000000..ef81168 Binary files /dev/null and b/screenshots/2.png differ diff --git a/screenshots/3.png b/screenshots/3.png new file mode 100644 index 0000000..3dc3220 Binary files /dev/null and b/screenshots/3.png differ diff --git a/uv.lock b/uv.lock index 8301051..dcff6d5 100644 --- a/uv.lock +++ b/uv.lock @@ -1,10 +1,6 @@ version = 1 revision = 1 -<<<<<<< HEAD requires-python = ">=3.10" -======= -requires-python = ">=3.13" ->>>>>>> 34256575c5ccd1cfa8bf8cf577bfa4fd48429904 [[package]] name = "annotated-types" @@ -20,15 +16,10 @@ name = "anyio" version = "4.9.0" source = { registry = "https://pypi.org/simple" } dependencies = [ -<<<<<<< HEAD { name = "exceptiongroup", marker = "python_full_version < '3.11'" }, { name = "idna" }, { name = "sniffio" }, { name = "typing-extensions", marker = "python_full_version < '3.13'" }, -======= - { name = "idna" }, - { name = "sniffio" }, ->>>>>>> 34256575c5ccd1cfa8bf8cf577bfa4fd48429904 ] sdist = { url = "https://files.pythonhosted.org/packages/95/7d/4c1bd541d4dffa1b52bd83fb8527089e097a106fc90b467a7313b105f840/anyio-4.9.0.tar.gz", hash = "sha256:673c0c244e15788651a4ff38710fea9675823028a6f08a5eda409e0c9840a028", size = 190949 } wheels = [ @@ -77,7 +68,6 @@ wheels = [ ] [[package]] -<<<<<<< HEAD name = "exceptiongroup" version = "1.2.2" source = { registry = "https://pypi.org/simple" } @@ -87,8 +77,6 @@ wheels = [ ] [[package]] -======= ->>>>>>> 34256575c5ccd1cfa8bf8cf577bfa4fd48429904 name = "h11" version = "0.14.0" source = { registry = "https://pypi.org/simple" } @@ -237,7 +225,6 @@ dependencies = [ ] sdist = { url = "https://files.pythonhosted.org/packages/b9/05/91ce14dfd5a3a99555fce436318cc0fd1f08c4daa32b3248ad63669ea8b4/pydantic_core-2.33.0.tar.gz", hash = "sha256:40eb8af662ba409c3cbf4a8150ad32ae73514cd7cb1f1a2113af39763dd616b3", size = 434080 } wheels = [ -<<<<<<< HEAD { url = "https://files.pythonhosted.org/packages/29/43/0649ad07e66b36a3fb21442b425bd0348ac162c5e686b36471f363201535/pydantic_core-2.33.0-cp310-cp310-macosx_10_12_x86_64.whl", hash = "sha256:71dffba8fe9ddff628c68f3abd845e91b028361d43c5f8e7b3f8b91d7d85413e", size = 2042968 }, { url = "https://files.pythonhosted.org/packages/a0/a6/975fea4774a459e495cb4be288efd8b041ac756a0a763f0b976d0861334b/pydantic_core-2.33.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:abaeec1be6ed535a5d7ffc2e6c390083c425832b20efd621562fbb5bff6dc518", size = 1860347 }, { url = "https://files.pythonhosted.org/packages/aa/49/7858dadad305101a077ec4d0c606b6425a2b134ea8d858458a6d287fd871/pydantic_core-2.33.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:759871f00e26ad3709efc773ac37b4d571de065f9dfb1778012908bcc36b3a73", size = 1910060 }, @@ -279,8 +266,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/e8/27/be7571e215ac8d321712f2433c445b03dbcd645366a18f67b334df8912bc/pydantic_core-2.33.0-cp312-cp312-win32.whl", hash = "sha256:918f2013d7eadea1d88d1a35fd4a1e16aaf90343eb446f91cb091ce7f9b431a2", size = 1908353 }, { url = "https://files.pythonhosted.org/packages/be/3a/be78f28732f93128bd0e3944bdd4b3970b389a1fbd44907c97291c8dcdec/pydantic_core-2.33.0-cp312-cp312-win_amd64.whl", hash = "sha256:aec79acc183865bad120b0190afac467c20b15289050648b876b07777e67ea48", size = 1955956 }, { url = "https://files.pythonhosted.org/packages/21/26/b8911ac74faa994694b76ee6a22875cc7a4abea3c381fdba4edc6c6bef84/pydantic_core-2.33.0-cp312-cp312-win_arm64.whl", hash = "sha256:5461934e895968655225dfa8b3be79e7e927e95d4bd6c2d40edd2fa7052e71b6", size = 1903259 }, -======= ->>>>>>> 34256575c5ccd1cfa8bf8cf577bfa4fd48429904 { url = "https://files.pythonhosted.org/packages/79/20/de2ad03ce8f5b3accf2196ea9b44f31b0cd16ac6e8cfc6b21976ed45ec35/pydantic_core-2.33.0-cp313-cp313-macosx_10_12_x86_64.whl", hash = "sha256:f00e8b59e1fc8f09d05594aa7d2b726f1b277ca6155fc84c0396db1b373c4555", size = 2032214 }, { url = "https://files.pythonhosted.org/packages/f9/af/6817dfda9aac4958d8b516cbb94af507eb171c997ea66453d4d162ae8948/pydantic_core-2.33.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:1a73be93ecef45786d7d95b0c5e9b294faf35629d03d5b145b09b81258c7cd6d", size = 1852338 }, { url = "https://files.pythonhosted.org/packages/44/f3/49193a312d9c49314f2b953fb55740b7c530710977cabe7183b8ef111b7f/pydantic_core-2.33.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ff48a55be9da6930254565ff5238d71d5e9cd8c5487a191cb85df3bdb8c77365", size = 1896913 }, @@ -298,7 +283,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/52/54/295e38769133363d7ec4a5863a4d579f331728c71a6644ff1024ee529315/pydantic_core-2.33.0-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:7b79af799630af263eca9ec87db519426d8c9b3be35016eddad1832bac812d87", size = 1813331 }, { url = "https://files.pythonhosted.org/packages/4c/9c/0c8ea02db8d682aa1ef48938abae833c1d69bdfa6e5ec13b21734b01ae70/pydantic_core-2.33.0-cp313-cp313t-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:eabf946a4739b5237f4f56d77fa6668263bc466d06a8036c055587c130a46f7b", size = 1986653 }, { url = "https://files.pythonhosted.org/packages/8e/4f/3fb47d6cbc08c7e00f92300e64ba655428c05c56b8ab6723bd290bae6458/pydantic_core-2.33.0-cp313-cp313t-win_amd64.whl", hash = "sha256:8a1d581e8cdbb857b0e0e81df98603376c1a5c34dc5e54039dcc00f043df81e7", size = 1931234 }, -<<<<<<< HEAD { url = "https://files.pythonhosted.org/packages/44/77/85e173b715e1a277ce934f28d877d82492df13e564fa68a01c96f36a47ad/pydantic_core-2.33.0-pp310-pypy310_pp73-macosx_10_12_x86_64.whl", hash = "sha256:e2762c568596332fdab56b07060c8ab8362c56cf2a339ee54e491cd503612c50", size = 2040129 }, { url = "https://files.pythonhosted.org/packages/33/e7/33da5f8a94bbe2191cfcd15bd6d16ecd113e67da1b8c78d3cc3478112dab/pydantic_core-2.33.0-pp310-pypy310_pp73-macosx_11_0_arm64.whl", hash = "sha256:5bf637300ff35d4f59c006fff201c510b2b5e745b07125458a5389af3c0dff8c", size = 1872656 }, { url = "https://files.pythonhosted.org/packages/b4/7a/9600f222bea840e5b9ba1f17c0acc79b669b24542a78c42c6a10712c0aae/pydantic_core-2.33.0-pp310-pypy310_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:62c151ce3d59ed56ebd7ce9ce5986a409a85db697d25fc232f8e81f195aa39a1", size = 1903731 }, @@ -317,8 +301,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/23/8b/b6be91243da44a26558d9c3a9007043b3750334136c6550551e8092d6d96/pydantic_core-2.33.0-pp311-pypy311_pp73-musllinux_1_1_armv7l.whl", hash = "sha256:2c0afd34f928383e3fd25740f2050dbac9d077e7ba5adbaa2227f4d4f3c8da5c", size = 2251618 }, { url = "https://files.pythonhosted.org/packages/aa/c5/fbcf1977035b834f63eb542e74cd6c807177f383386175b468f0865bcac4/pydantic_core-2.33.0-pp311-pypy311_pp73-musllinux_1_1_x86_64.whl", hash = "sha256:7da333f21cd9df51d5731513a6d39319892947604924ddf2e24a4612975fb936", size = 2255374 }, { url = "https://files.pythonhosted.org/packages/2f/f8/66f328e411f1c9574b13c2c28ab01f308b53688bbbe6ca8fb981e6cabc42/pydantic_core-2.33.0-pp311-pypy311_pp73-win_amd64.whl", hash = "sha256:4b6d77c75a57f041c5ee915ff0b0bb58eabb78728b69ed967bc5b780e8f701b8", size = 2082099 }, -======= ->>>>>>> 34256575c5ccd1cfa8bf8cf577bfa4fd48429904 ] [[package]] @@ -359,10 +341,7 @@ source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "markdown-it-py" }, { name = "pygments" }, -<<<<<<< HEAD { name = "typing-extensions", marker = "python_full_version < '3.11'" }, -======= ->>>>>>> 34256575c5ccd1cfa8bf8cf577bfa4fd48429904 ] sdist = { url = "https://files.pythonhosted.org/packages/a1/53/830aa4c3066a8ab0ae9a9955976fb770fe9c6102117c8ec4ab3ea62d89e8/rich-14.0.0.tar.gz", hash = "sha256:82f1bc23a6a21ebca4ae0c45af9bdbc492ed20231dcb63f297d6d1021a9d5725", size = 224078 } wheels = [ @@ -426,13 +405,8 @@ wheels = [ [[package]] name = "telegram-mcp" -<<<<<<< HEAD -version = "1.5.0" +version = "2.0.0" source = { editable = "." } -======= -version = "2025.3.201549" -source = { virtual = "." } ->>>>>>> 34256575c5ccd1cfa8bf8cf577bfa4fd48429904 dependencies = [ { name = "dotenv" }, { name = "httpx" }, @@ -508,10 +482,7 @@ source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "click" }, { name = "h11" }, -<<<<<<< HEAD { name = "typing-extensions", marker = "python_full_version < '3.11'" }, -======= ->>>>>>> 34256575c5ccd1cfa8bf8cf577bfa4fd48429904 ] sdist = { url = "https://files.pythonhosted.org/packages/4b/4d/938bd85e5bf2edeec766267a5015ad969730bb91e31b44021dfe8b22df6c/uvicorn-0.34.0.tar.gz", hash = "sha256:404051050cd7e905de2c9a7e61790943440b3416f49cb409f965d9dcd0fa73e9", size = 76568 } wheels = [