feat(folders): add 7 tools for Telegram folder management

Add new MCP tools to manage Telegram dialog folders (filters):
- list_folders: get all folders with IDs, names, emoji
- get_folder: get folder details including all included chats
- create_folder: create new folder with filters and chat list
- add_chat_to_folder: add chat to existing folder (idempotent)
- remove_chat_from_folder: remove chat from folder (idempotent)
- delete_folder: delete folder (chats preserved)
- reorder_folders: change folder order

Handles Telegram API specifics:
- TextWithEntities for folder titles
- Max 10 folders limit
- System folder protection (ID < 2)
- title_noanimate and color attributes

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
smixs 2026-01-27 13:55:55 +05:00
parent d65fb8b74c
commit b733d32467

508
main.py
View file

@ -32,6 +32,9 @@ from telethon.tl.types import (
InputPeerUser,
InputPeerChat,
InputPeerChannel,
DialogFilter,
DialogFilterDefault,
TextWithEntities,
)
import re
from functools import wraps
@ -121,6 +124,7 @@ class ErrorCategory(str, Enum):
PROFILE = "PROFILE"
AUTH = "AUTH"
ADMIN = "ADMIN"
FOLDER = "FOLDER"
def log_and_format_error(
@ -3592,6 +3596,510 @@ async def clear_draft(chat_id: Union[int, str]) -> str:
return log_and_format_error("clear_draft", e, chat_id=chat_id)
# ============================================================================
# FOLDER MANAGEMENT TOOLS
# ============================================================================
@mcp.tool(annotations=ToolAnnotations(title="List Folders", openWorldHint=True, readOnlyHint=True))
async def list_folders() -> str:
"""
Get all dialog folders (filters) with their IDs, names, and emoji.
Returns a list of folders that can be used with other folder tools.
"""
try:
result = await client(functions.messages.GetDialogFiltersRequest())
folders = []
for f in result.filters:
# Skip system default folder
if isinstance(f, DialogFilterDefault):
continue
if isinstance(f, DialogFilter):
# Handle title which can be str or TextWithEntities
title = f.title
if isinstance(title, TextWithEntities):
title = title.text
folder_data = {
"id": f.id,
"title": title,
"emoticon": getattr(f, "emoticon", None),
"contacts": getattr(f, "contacts", False),
"non_contacts": getattr(f, "non_contacts", False),
"groups": getattr(f, "groups", False),
"broadcasts": getattr(f, "broadcasts", False),
"bots": getattr(f, "bots", False),
"exclude_muted": getattr(f, "exclude_muted", False),
"exclude_read": getattr(f, "exclude_read", False),
"exclude_archived": getattr(f, "exclude_archived", False),
"included_peers_count": len(getattr(f, "include_peers", [])),
"excluded_peers_count": len(getattr(f, "exclude_peers", [])),
"pinned_peers_count": len(getattr(f, "pinned_peers", [])),
}
folders.append(folder_data)
if not folders:
return "No folders found. Create one with create_folder tool."
return json.dumps(
{"folders": folders, "count": len(folders)}, indent=2, default=json_serializer
)
except Exception as e:
logger.exception("list_folders failed")
return log_and_format_error("list_folders", e, ErrorCategory.FOLDER)
@mcp.tool(annotations=ToolAnnotations(title="Get Folder", openWorldHint=True, readOnlyHint=True))
async def get_folder(folder_id: int) -> str:
"""
Get detailed information about a specific folder including all included chats.
Args:
folder_id: The folder ID (get from list_folders)
"""
try:
result = await client(functions.messages.GetDialogFiltersRequest())
target_folder = None
for f in result.filters:
if isinstance(f, DialogFilter) and f.id == folder_id:
target_folder = f
break
if not target_folder:
return f"Folder with ID {folder_id} not found. Use list_folders to see available folders."
# Resolve included peers to readable names
included_chats = []
for peer in getattr(target_folder, "include_peers", []):
try:
entity = await client.get_entity(peer)
chat_info = {
"id": entity.id,
"name": getattr(entity, "title", None) or getattr(entity, "first_name", "Unknown"),
"type": type(entity).__name__,
}
if hasattr(entity, "username") and entity.username:
chat_info["username"] = entity.username
included_chats.append(chat_info)
except Exception:
included_chats.append({"id": str(peer), "name": "Unknown", "type": "Unknown"})
# Resolve excluded peers
excluded_chats = []
for peer in getattr(target_folder, "exclude_peers", []):
try:
entity = await client.get_entity(peer)
chat_info = {
"id": entity.id,
"name": getattr(entity, "title", None) or getattr(entity, "first_name", "Unknown"),
"type": type(entity).__name__,
}
excluded_chats.append(chat_info)
except Exception:
excluded_chats.append({"id": str(peer), "name": "Unknown", "type": "Unknown"})
# Resolve pinned peers
pinned_chats = []
for peer in getattr(target_folder, "pinned_peers", []):
try:
entity = await client.get_entity(peer)
chat_info = {
"id": entity.id,
"name": getattr(entity, "title", None) or getattr(entity, "first_name", "Unknown"),
"type": type(entity).__name__,
}
pinned_chats.append(chat_info)
except Exception:
pinned_chats.append({"id": str(peer), "name": "Unknown", "type": "Unknown"})
# Handle title which can be str or TextWithEntities
title = target_folder.title
if isinstance(title, TextWithEntities):
title = title.text
folder_data = {
"id": target_folder.id,
"title": title,
"emoticon": getattr(target_folder, "emoticon", None),
"filters": {
"contacts": getattr(target_folder, "contacts", False),
"non_contacts": getattr(target_folder, "non_contacts", False),
"groups": getattr(target_folder, "groups", False),
"broadcasts": getattr(target_folder, "broadcasts", False),
"bots": getattr(target_folder, "bots", False),
"exclude_muted": getattr(target_folder, "exclude_muted", False),
"exclude_read": getattr(target_folder, "exclude_read", False),
"exclude_archived": getattr(target_folder, "exclude_archived", False),
},
"included_chats": included_chats,
"excluded_chats": excluded_chats,
"pinned_chats": pinned_chats,
}
return json.dumps(folder_data, indent=2, default=json_serializer)
except Exception as e:
logger.exception(f"get_folder failed (folder_id={folder_id})")
return log_and_format_error("get_folder", e, ErrorCategory.FOLDER, folder_id=folder_id)
@mcp.tool(
annotations=ToolAnnotations(
title="Create Folder", openWorldHint=True, destructiveHint=True, idempotentHint=False
)
)
async def create_folder(
title: str,
emoticon: Optional[str] = None,
chat_ids: Optional[List[Union[int, str]]] = None,
contacts: bool = False,
non_contacts: bool = False,
groups: bool = False,
broadcasts: bool = False,
bots: bool = False,
exclude_muted: bool = False,
exclude_read: bool = False,
exclude_archived: bool = True,
) -> str:
"""
Create a new dialog folder.
Args:
title: Folder name (required)
emoticon: Folder emoji (optional, e.g., "📁", "🏠", "💼")
chat_ids: List of chat IDs or usernames to include (optional)
contacts: Include all contacts
non_contacts: Include all non-contacts
groups: Include all groups
broadcasts: Include all channels
bots: Include all bots
exclude_muted: Exclude muted chats
exclude_read: Exclude read chats
exclude_archived: Exclude archived chats (default True)
"""
try:
# Get existing folders to check count and find next ID
result = await client(functions.messages.GetDialogFiltersRequest())
existing_ids = set()
folder_count = 0
for f in result.filters:
if isinstance(f, DialogFilter):
existing_ids.add(f.id)
folder_count += 1
# Telegram limit: max 10 custom folders
if folder_count >= 10:
return "Cannot create folder: Telegram limit is 10 folders. Delete one first."
# Find next available ID (IDs 0 and 1 are reserved for system)
new_id = 2
while new_id in existing_ids:
new_id += 1
# Resolve chat_ids to input peers
include_peers = []
if chat_ids:
for chat_id in chat_ids:
try:
peer = await client.get_input_entity(chat_id)
include_peers.append(peer)
except Exception as e:
return f"Failed to resolve chat '{chat_id}': {str(e)}"
# Create the folder (title must be TextWithEntities)
title_obj = TextWithEntities(text=title, entities=[])
new_filter = DialogFilter(
id=new_id,
title=title_obj,
emoticon=emoticon,
pinned_peers=[],
include_peers=include_peers,
exclude_peers=[],
contacts=contacts,
non_contacts=non_contacts,
groups=groups,
broadcasts=broadcasts,
bots=bots,
exclude_muted=exclude_muted,
exclude_read=exclude_read,
exclude_archived=exclude_archived,
)
await client(functions.messages.UpdateDialogFilterRequest(id=new_id, filter=new_filter))
return json.dumps(
{
"success": True,
"folder_id": new_id,
"title": title,
"emoticon": emoticon,
"included_chats_count": len(include_peers),
},
indent=2,
)
except Exception as e:
logger.exception(f"create_folder failed (title={title})")
return log_and_format_error("create_folder", e, ErrorCategory.FOLDER, title=title)
@mcp.tool(
annotations=ToolAnnotations(
title="Add Chat to Folder", openWorldHint=True, destructiveHint=True, idempotentHint=True
)
)
@validate_id("chat_id")
async def add_chat_to_folder(
folder_id: int, chat_id: Union[int, str], pinned: bool = False
) -> str:
"""
Add a chat to an existing folder.
Args:
folder_id: The folder ID (get from list_folders)
chat_id: Chat ID or username to add
pinned: Pin the chat in this folder (default False)
"""
try:
# Get the folder
result = await client(functions.messages.GetDialogFiltersRequest())
target_folder = None
for f in result.filters:
if isinstance(f, DialogFilter) and f.id == folder_id:
target_folder = f
break
if not target_folder:
return f"Folder with ID {folder_id} not found. Use list_folders to see available folders."
# Resolve chat to input peer
try:
peer = await client.get_input_entity(chat_id)
except Exception as e:
return f"Failed to resolve chat '{chat_id}': {str(e)}"
# Check if already included (idempotent)
include_peers = list(getattr(target_folder, "include_peers", []))
pinned_peers = list(getattr(target_folder, "pinned_peers", []))
# Get peer ID for comparison
peer_id = utils.get_peer_id(peer)
already_included = any(utils.get_peer_id(p) == peer_id for p in include_peers)
already_pinned = any(utils.get_peer_id(p) == peer_id for p in pinned_peers)
if already_included and (not pinned or already_pinned):
return f"Chat {chat_id} is already in folder {folder_id}."
# Add to appropriate list
if not already_included:
include_peers.append(peer)
if pinned and not already_pinned:
pinned_peers.append(peer)
# Update the folder (keep all original attributes)
updated_filter = DialogFilter(
id=target_folder.id,
title=target_folder.title,
emoticon=getattr(target_folder, "emoticon", None),
pinned_peers=pinned_peers,
include_peers=include_peers,
exclude_peers=list(getattr(target_folder, "exclude_peers", [])),
contacts=getattr(target_folder, "contacts", False),
non_contacts=getattr(target_folder, "non_contacts", False),
groups=getattr(target_folder, "groups", False),
broadcasts=getattr(target_folder, "broadcasts", False),
bots=getattr(target_folder, "bots", False),
exclude_muted=getattr(target_folder, "exclude_muted", False),
exclude_read=getattr(target_folder, "exclude_read", False),
exclude_archived=getattr(target_folder, "exclude_archived", False),
title_noanimate=getattr(target_folder, "title_noanimate", None),
color=getattr(target_folder, "color", None),
)
await client(
functions.messages.UpdateDialogFilterRequest(id=folder_id, filter=updated_filter)
)
return f"Chat {chat_id} added to folder {folder_id}" + (" (pinned)" if pinned else "") + "."
except Exception as e:
logger.exception(f"add_chat_to_folder failed (folder_id={folder_id}, chat_id={chat_id})")
return log_and_format_error(
"add_chat_to_folder", e, ErrorCategory.FOLDER, folder_id=folder_id, chat_id=chat_id
)
@mcp.tool(
annotations=ToolAnnotations(
title="Remove Chat from Folder",
openWorldHint=True,
destructiveHint=True,
idempotentHint=True,
)
)
@validate_id("chat_id")
async def remove_chat_from_folder(folder_id: int, chat_id: Union[int, str]) -> str:
"""
Remove a chat from a folder.
Args:
folder_id: The folder ID (get from list_folders)
chat_id: Chat ID or username to remove
"""
try:
# Get the folder
result = await client(functions.messages.GetDialogFiltersRequest())
target_folder = None
for f in result.filters:
if isinstance(f, DialogFilter) and f.id == folder_id:
target_folder = f
break
if not target_folder:
return f"Folder with ID {folder_id} not found. Use list_folders to see available folders."
# Resolve chat to get peer ID
try:
peer = await client.get_input_entity(chat_id)
peer_id = utils.get_peer_id(peer)
except Exception as e:
return f"Failed to resolve chat '{chat_id}': {str(e)}"
# Filter out the peer from both include and pinned lists
include_peers = [
p for p in getattr(target_folder, "include_peers", []) if utils.get_peer_id(p) != peer_id
]
pinned_peers = [
p for p in getattr(target_folder, "pinned_peers", []) if utils.get_peer_id(p) != peer_id
]
original_include_count = len(getattr(target_folder, "include_peers", []))
original_pinned_count = len(getattr(target_folder, "pinned_peers", []))
# Check if anything was removed (idempotent)
if len(include_peers) == original_include_count and len(pinned_peers) == original_pinned_count:
return f"Chat {chat_id} was not in folder {folder_id}."
# Update the folder (keep all original attributes)
updated_filter = DialogFilter(
id=target_folder.id,
title=target_folder.title,
emoticon=getattr(target_folder, "emoticon", None),
pinned_peers=pinned_peers,
include_peers=include_peers,
exclude_peers=list(getattr(target_folder, "exclude_peers", [])),
contacts=getattr(target_folder, "contacts", False),
non_contacts=getattr(target_folder, "non_contacts", False),
groups=getattr(target_folder, "groups", False),
broadcasts=getattr(target_folder, "broadcasts", False),
bots=getattr(target_folder, "bots", False),
exclude_muted=getattr(target_folder, "exclude_muted", False),
exclude_read=getattr(target_folder, "exclude_read", False),
exclude_archived=getattr(target_folder, "exclude_archived", False),
title_noanimate=getattr(target_folder, "title_noanimate", None),
color=getattr(target_folder, "color", None),
)
await client(
functions.messages.UpdateDialogFilterRequest(id=folder_id, filter=updated_filter)
)
return f"Chat {chat_id} removed from folder {folder_id}."
except Exception as e:
logger.exception(
f"remove_chat_from_folder failed (folder_id={folder_id}, chat_id={chat_id})"
)
return log_and_format_error(
"remove_chat_from_folder", e, ErrorCategory.FOLDER, folder_id=folder_id, chat_id=chat_id
)
@mcp.tool(
annotations=ToolAnnotations(
title="Delete Folder", openWorldHint=True, destructiveHint=True, idempotentHint=True
)
)
async def delete_folder(folder_id: int) -> str:
"""
Delete a folder. Chats in the folder are preserved, only the folder is removed.
Args:
folder_id: The folder ID to delete (get from list_folders)
"""
try:
# System folders (id < 2) cannot be deleted
if folder_id < 2:
return f"Cannot delete system folder (ID {folder_id}). Only custom folders can be deleted."
# Check if folder exists
result = await client(functions.messages.GetDialogFiltersRequest())
folder_exists = False
folder_title = None
for f in result.filters:
if isinstance(f, DialogFilter) and f.id == folder_id:
folder_exists = True
# Handle title which can be str or TextWithEntities
title = f.title
if isinstance(title, TextWithEntities):
title = title.text
folder_title = title
break
if not folder_exists:
return f"Folder with ID {folder_id} not found (may already be deleted)."
# Delete by passing None as filter
await client(functions.messages.UpdateDialogFilterRequest(id=folder_id, filter=None))
return f"Folder '{folder_title}' (ID {folder_id}) deleted. Chats are preserved."
except Exception as e:
logger.exception(f"delete_folder failed (folder_id={folder_id})")
return log_and_format_error("delete_folder", e, ErrorCategory.FOLDER, folder_id=folder_id)
@mcp.tool(
annotations=ToolAnnotations(
title="Reorder Folders", openWorldHint=True, destructiveHint=True, idempotentHint=True
)
)
async def reorder_folders(folder_ids: List[int]) -> str:
"""
Change the order of folders in the folder list.
Args:
folder_ids: List of folder IDs in the desired order
"""
try:
# Get existing folders to validate
result = await client(functions.messages.GetDialogFiltersRequest())
existing_ids = set()
for f in result.filters:
if isinstance(f, DialogFilter):
existing_ids.add(f.id)
# Validate all provided IDs exist
for fid in folder_ids:
if fid not in existing_ids:
return f"Folder ID {fid} not found. Use list_folders to see available folders."
# Validate all existing folders are included
if set(folder_ids) != existing_ids:
missing = existing_ids - set(folder_ids)
return f"All folder IDs must be included. Missing: {missing}"
# Reorder
await client(functions.messages.UpdateDialogFiltersOrderRequest(order=folder_ids))
return f"Folders reordered: {folder_ids}"
except Exception as e:
logger.exception(f"reorder_folders failed (folder_ids={folder_ids})")
return log_and_format_error("reorder_folders", e, ErrorCategory.FOLDER, folder_ids=folder_ids)
async def _main() -> None:
try:
# Start the Telethon client non-interactively