diff --git a/main.py b/main.py index a608c04..862180d 100644 --- a/main.py +++ b/main.py @@ -32,6 +32,9 @@ from telethon.tl.types import ( InputPeerUser, InputPeerChat, InputPeerChannel, + DialogFilter, + DialogFilterDefault, + TextWithEntities, ) import re from functools import wraps @@ -121,6 +124,7 @@ class ErrorCategory(str, Enum): PROFILE = "PROFILE" AUTH = "AUTH" ADMIN = "ADMIN" + FOLDER = "FOLDER" def log_and_format_error( @@ -3592,6 +3596,510 @@ async def clear_draft(chat_id: Union[int, str]) -> str: return log_and_format_error("clear_draft", e, chat_id=chat_id) +# ============================================================================ +# FOLDER MANAGEMENT TOOLS +# ============================================================================ + + +@mcp.tool(annotations=ToolAnnotations(title="List Folders", openWorldHint=True, readOnlyHint=True)) +async def list_folders() -> str: + """ + Get all dialog folders (filters) with their IDs, names, and emoji. + Returns a list of folders that can be used with other folder tools. + """ + try: + result = await client(functions.messages.GetDialogFiltersRequest()) + + folders = [] + for f in result.filters: + # Skip system default folder + if isinstance(f, DialogFilterDefault): + continue + + if isinstance(f, DialogFilter): + # Handle title which can be str or TextWithEntities + title = f.title + if isinstance(title, TextWithEntities): + title = title.text + folder_data = { + "id": f.id, + "title": title, + "emoticon": getattr(f, "emoticon", None), + "contacts": getattr(f, "contacts", False), + "non_contacts": getattr(f, "non_contacts", False), + "groups": getattr(f, "groups", False), + "broadcasts": getattr(f, "broadcasts", False), + "bots": getattr(f, "bots", False), + "exclude_muted": getattr(f, "exclude_muted", False), + "exclude_read": getattr(f, "exclude_read", False), + "exclude_archived": getattr(f, "exclude_archived", False), + "included_peers_count": len(getattr(f, "include_peers", [])), + "excluded_peers_count": len(getattr(f, "exclude_peers", [])), + "pinned_peers_count": len(getattr(f, "pinned_peers", [])), + } + folders.append(folder_data) + + if not folders: + return "No folders found. Create one with create_folder tool." + + return json.dumps( + {"folders": folders, "count": len(folders)}, indent=2, default=json_serializer + ) + except Exception as e: + logger.exception("list_folders failed") + return log_and_format_error("list_folders", e, ErrorCategory.FOLDER) + + +@mcp.tool(annotations=ToolAnnotations(title="Get Folder", openWorldHint=True, readOnlyHint=True)) +async def get_folder(folder_id: int) -> str: + """ + Get detailed information about a specific folder including all included chats. + + Args: + folder_id: The folder ID (get from list_folders) + """ + try: + result = await client(functions.messages.GetDialogFiltersRequest()) + + target_folder = None + for f in result.filters: + if isinstance(f, DialogFilter) and f.id == folder_id: + target_folder = f + break + + if not target_folder: + return f"Folder with ID {folder_id} not found. Use list_folders to see available folders." + + # Resolve included peers to readable names + included_chats = [] + for peer in getattr(target_folder, "include_peers", []): + try: + entity = await client.get_entity(peer) + chat_info = { + "id": entity.id, + "name": getattr(entity, "title", None) or getattr(entity, "first_name", "Unknown"), + "type": type(entity).__name__, + } + if hasattr(entity, "username") and entity.username: + chat_info["username"] = entity.username + included_chats.append(chat_info) + except Exception: + included_chats.append({"id": str(peer), "name": "Unknown", "type": "Unknown"}) + + # Resolve excluded peers + excluded_chats = [] + for peer in getattr(target_folder, "exclude_peers", []): + try: + entity = await client.get_entity(peer) + chat_info = { + "id": entity.id, + "name": getattr(entity, "title", None) or getattr(entity, "first_name", "Unknown"), + "type": type(entity).__name__, + } + excluded_chats.append(chat_info) + except Exception: + excluded_chats.append({"id": str(peer), "name": "Unknown", "type": "Unknown"}) + + # Resolve pinned peers + pinned_chats = [] + for peer in getattr(target_folder, "pinned_peers", []): + try: + entity = await client.get_entity(peer) + chat_info = { + "id": entity.id, + "name": getattr(entity, "title", None) or getattr(entity, "first_name", "Unknown"), + "type": type(entity).__name__, + } + pinned_chats.append(chat_info) + except Exception: + pinned_chats.append({"id": str(peer), "name": "Unknown", "type": "Unknown"}) + + # Handle title which can be str or TextWithEntities + title = target_folder.title + if isinstance(title, TextWithEntities): + title = title.text + + folder_data = { + "id": target_folder.id, + "title": title, + "emoticon": getattr(target_folder, "emoticon", None), + "filters": { + "contacts": getattr(target_folder, "contacts", False), + "non_contacts": getattr(target_folder, "non_contacts", False), + "groups": getattr(target_folder, "groups", False), + "broadcasts": getattr(target_folder, "broadcasts", False), + "bots": getattr(target_folder, "bots", False), + "exclude_muted": getattr(target_folder, "exclude_muted", False), + "exclude_read": getattr(target_folder, "exclude_read", False), + "exclude_archived": getattr(target_folder, "exclude_archived", False), + }, + "included_chats": included_chats, + "excluded_chats": excluded_chats, + "pinned_chats": pinned_chats, + } + + return json.dumps(folder_data, indent=2, default=json_serializer) + except Exception as e: + logger.exception(f"get_folder failed (folder_id={folder_id})") + return log_and_format_error("get_folder", e, ErrorCategory.FOLDER, folder_id=folder_id) + + +@mcp.tool( + annotations=ToolAnnotations( + title="Create Folder", openWorldHint=True, destructiveHint=True, idempotentHint=False + ) +) +async def create_folder( + title: str, + emoticon: Optional[str] = None, + chat_ids: Optional[List[Union[int, str]]] = None, + contacts: bool = False, + non_contacts: bool = False, + groups: bool = False, + broadcasts: bool = False, + bots: bool = False, + exclude_muted: bool = False, + exclude_read: bool = False, + exclude_archived: bool = True, +) -> str: + """ + Create a new dialog folder. + + Args: + title: Folder name (required) + emoticon: Folder emoji (optional, e.g., "📁", "🏠", "💼") + chat_ids: List of chat IDs or usernames to include (optional) + contacts: Include all contacts + non_contacts: Include all non-contacts + groups: Include all groups + broadcasts: Include all channels + bots: Include all bots + exclude_muted: Exclude muted chats + exclude_read: Exclude read chats + exclude_archived: Exclude archived chats (default True) + """ + try: + # Get existing folders to check count and find next ID + result = await client(functions.messages.GetDialogFiltersRequest()) + + existing_ids = set() + folder_count = 0 + for f in result.filters: + if isinstance(f, DialogFilter): + existing_ids.add(f.id) + folder_count += 1 + + # Telegram limit: max 10 custom folders + if folder_count >= 10: + return "Cannot create folder: Telegram limit is 10 folders. Delete one first." + + # Find next available ID (IDs 0 and 1 are reserved for system) + new_id = 2 + while new_id in existing_ids: + new_id += 1 + + # Resolve chat_ids to input peers + include_peers = [] + if chat_ids: + for chat_id in chat_ids: + try: + peer = await client.get_input_entity(chat_id) + include_peers.append(peer) + except Exception as e: + return f"Failed to resolve chat '{chat_id}': {str(e)}" + + # Create the folder (title must be TextWithEntities) + title_obj = TextWithEntities(text=title, entities=[]) + new_filter = DialogFilter( + id=new_id, + title=title_obj, + emoticon=emoticon, + pinned_peers=[], + include_peers=include_peers, + exclude_peers=[], + contacts=contacts, + non_contacts=non_contacts, + groups=groups, + broadcasts=broadcasts, + bots=bots, + exclude_muted=exclude_muted, + exclude_read=exclude_read, + exclude_archived=exclude_archived, + ) + + await client(functions.messages.UpdateDialogFilterRequest(id=new_id, filter=new_filter)) + + return json.dumps( + { + "success": True, + "folder_id": new_id, + "title": title, + "emoticon": emoticon, + "included_chats_count": len(include_peers), + }, + indent=2, + ) + except Exception as e: + logger.exception(f"create_folder failed (title={title})") + return log_and_format_error("create_folder", e, ErrorCategory.FOLDER, title=title) + + +@mcp.tool( + annotations=ToolAnnotations( + title="Add Chat to Folder", openWorldHint=True, destructiveHint=True, idempotentHint=True + ) +) +@validate_id("chat_id") +async def add_chat_to_folder( + folder_id: int, chat_id: Union[int, str], pinned: bool = False +) -> str: + """ + Add a chat to an existing folder. + + Args: + folder_id: The folder ID (get from list_folders) + chat_id: Chat ID or username to add + pinned: Pin the chat in this folder (default False) + """ + try: + # Get the folder + result = await client(functions.messages.GetDialogFiltersRequest()) + + target_folder = None + for f in result.filters: + if isinstance(f, DialogFilter) and f.id == folder_id: + target_folder = f + break + + if not target_folder: + return f"Folder with ID {folder_id} not found. Use list_folders to see available folders." + + # Resolve chat to input peer + try: + peer = await client.get_input_entity(chat_id) + except Exception as e: + return f"Failed to resolve chat '{chat_id}': {str(e)}" + + # Check if already included (idempotent) + include_peers = list(getattr(target_folder, "include_peers", [])) + pinned_peers = list(getattr(target_folder, "pinned_peers", [])) + + # Get peer ID for comparison + peer_id = utils.get_peer_id(peer) + already_included = any(utils.get_peer_id(p) == peer_id for p in include_peers) + already_pinned = any(utils.get_peer_id(p) == peer_id for p in pinned_peers) + + if already_included and (not pinned or already_pinned): + return f"Chat {chat_id} is already in folder {folder_id}." + + # Add to appropriate list + if not already_included: + include_peers.append(peer) + if pinned and not already_pinned: + pinned_peers.append(peer) + + # Update the folder (keep all original attributes) + updated_filter = DialogFilter( + id=target_folder.id, + title=target_folder.title, + emoticon=getattr(target_folder, "emoticon", None), + pinned_peers=pinned_peers, + include_peers=include_peers, + exclude_peers=list(getattr(target_folder, "exclude_peers", [])), + contacts=getattr(target_folder, "contacts", False), + non_contacts=getattr(target_folder, "non_contacts", False), + groups=getattr(target_folder, "groups", False), + broadcasts=getattr(target_folder, "broadcasts", False), + bots=getattr(target_folder, "bots", False), + exclude_muted=getattr(target_folder, "exclude_muted", False), + exclude_read=getattr(target_folder, "exclude_read", False), + exclude_archived=getattr(target_folder, "exclude_archived", False), + title_noanimate=getattr(target_folder, "title_noanimate", None), + color=getattr(target_folder, "color", None), + ) + + await client( + functions.messages.UpdateDialogFilterRequest(id=folder_id, filter=updated_filter) + ) + + return f"Chat {chat_id} added to folder {folder_id}" + (" (pinned)" if pinned else "") + "." + except Exception as e: + logger.exception(f"add_chat_to_folder failed (folder_id={folder_id}, chat_id={chat_id})") + return log_and_format_error( + "add_chat_to_folder", e, ErrorCategory.FOLDER, folder_id=folder_id, chat_id=chat_id + ) + + +@mcp.tool( + annotations=ToolAnnotations( + title="Remove Chat from Folder", + openWorldHint=True, + destructiveHint=True, + idempotentHint=True, + ) +) +@validate_id("chat_id") +async def remove_chat_from_folder(folder_id: int, chat_id: Union[int, str]) -> str: + """ + Remove a chat from a folder. + + Args: + folder_id: The folder ID (get from list_folders) + chat_id: Chat ID or username to remove + """ + try: + # Get the folder + result = await client(functions.messages.GetDialogFiltersRequest()) + + target_folder = None + for f in result.filters: + if isinstance(f, DialogFilter) and f.id == folder_id: + target_folder = f + break + + if not target_folder: + return f"Folder with ID {folder_id} not found. Use list_folders to see available folders." + + # Resolve chat to get peer ID + try: + peer = await client.get_input_entity(chat_id) + peer_id = utils.get_peer_id(peer) + except Exception as e: + return f"Failed to resolve chat '{chat_id}': {str(e)}" + + # Filter out the peer from both include and pinned lists + include_peers = [ + p for p in getattr(target_folder, "include_peers", []) if utils.get_peer_id(p) != peer_id + ] + pinned_peers = [ + p for p in getattr(target_folder, "pinned_peers", []) if utils.get_peer_id(p) != peer_id + ] + + original_include_count = len(getattr(target_folder, "include_peers", [])) + original_pinned_count = len(getattr(target_folder, "pinned_peers", [])) + + # Check if anything was removed (idempotent) + if len(include_peers) == original_include_count and len(pinned_peers) == original_pinned_count: + return f"Chat {chat_id} was not in folder {folder_id}." + + # Update the folder (keep all original attributes) + updated_filter = DialogFilter( + id=target_folder.id, + title=target_folder.title, + emoticon=getattr(target_folder, "emoticon", None), + pinned_peers=pinned_peers, + include_peers=include_peers, + exclude_peers=list(getattr(target_folder, "exclude_peers", [])), + contacts=getattr(target_folder, "contacts", False), + non_contacts=getattr(target_folder, "non_contacts", False), + groups=getattr(target_folder, "groups", False), + broadcasts=getattr(target_folder, "broadcasts", False), + bots=getattr(target_folder, "bots", False), + exclude_muted=getattr(target_folder, "exclude_muted", False), + exclude_read=getattr(target_folder, "exclude_read", False), + exclude_archived=getattr(target_folder, "exclude_archived", False), + title_noanimate=getattr(target_folder, "title_noanimate", None), + color=getattr(target_folder, "color", None), + ) + + await client( + functions.messages.UpdateDialogFilterRequest(id=folder_id, filter=updated_filter) + ) + + return f"Chat {chat_id} removed from folder {folder_id}." + except Exception as e: + logger.exception( + f"remove_chat_from_folder failed (folder_id={folder_id}, chat_id={chat_id})" + ) + return log_and_format_error( + "remove_chat_from_folder", e, ErrorCategory.FOLDER, folder_id=folder_id, chat_id=chat_id + ) + + +@mcp.tool( + annotations=ToolAnnotations( + title="Delete Folder", openWorldHint=True, destructiveHint=True, idempotentHint=True + ) +) +async def delete_folder(folder_id: int) -> str: + """ + Delete a folder. Chats in the folder are preserved, only the folder is removed. + + Args: + folder_id: The folder ID to delete (get from list_folders) + """ + try: + # System folders (id < 2) cannot be deleted + if folder_id < 2: + return f"Cannot delete system folder (ID {folder_id}). Only custom folders can be deleted." + + # Check if folder exists + result = await client(functions.messages.GetDialogFiltersRequest()) + + folder_exists = False + folder_title = None + for f in result.filters: + if isinstance(f, DialogFilter) and f.id == folder_id: + folder_exists = True + # Handle title which can be str or TextWithEntities + title = f.title + if isinstance(title, TextWithEntities): + title = title.text + folder_title = title + break + + if not folder_exists: + return f"Folder with ID {folder_id} not found (may already be deleted)." + + # Delete by passing None as filter + await client(functions.messages.UpdateDialogFilterRequest(id=folder_id, filter=None)) + + return f"Folder '{folder_title}' (ID {folder_id}) deleted. Chats are preserved." + except Exception as e: + logger.exception(f"delete_folder failed (folder_id={folder_id})") + return log_and_format_error("delete_folder", e, ErrorCategory.FOLDER, folder_id=folder_id) + + +@mcp.tool( + annotations=ToolAnnotations( + title="Reorder Folders", openWorldHint=True, destructiveHint=True, idempotentHint=True + ) +) +async def reorder_folders(folder_ids: List[int]) -> str: + """ + Change the order of folders in the folder list. + + Args: + folder_ids: List of folder IDs in the desired order + """ + try: + # Get existing folders to validate + result = await client(functions.messages.GetDialogFiltersRequest()) + + existing_ids = set() + for f in result.filters: + if isinstance(f, DialogFilter): + existing_ids.add(f.id) + + # Validate all provided IDs exist + for fid in folder_ids: + if fid not in existing_ids: + return f"Folder ID {fid} not found. Use list_folders to see available folders." + + # Validate all existing folders are included + if set(folder_ids) != existing_ids: + missing = existing_ids - set(folder_ids) + return f"All folder IDs must be included. Missing: {missing}" + + # Reorder + await client(functions.messages.UpdateDialogFiltersOrderRequest(order=folder_ids)) + + return f"Folders reordered: {folder_ids}" + except Exception as e: + logger.exception(f"reorder_folders failed (folder_ids={folder_ids})") + return log_and_format_error("reorder_folders", e, ErrorCategory.FOLDER, folder_ids=folder_ids) + + async def _main() -> None: try: # Start the Telethon client non-interactively