Merge branch 'main' into fix/entity-resolution-stringsession

This commit is contained in:
Eugene Evstafev 2026-03-17 09:33:58 +00:00 committed by GitHub
commit dad686a2d5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 1055 additions and 154 deletions

View file

@ -101,16 +101,24 @@ This MCP server exposes a huge suite of Telegram tools. **Every major Telegram/T
### User & Profile
- **get_me()**: Get your user info
- **update_profile(first_name, last_name, about)**: Update your profile
- **set_profile_photo(file_path)**: Set a profile photo from an allowed root path
- **delete_profile_photo()**: Remove your profile photo
- **get_user_photos(user_id, limit)**: Get a user's profile photos
- **get_user_status(user_id)**: Get a user's online status
### Media
- **get_media_info(chat_id, message_id)**: Get info about media in a message
- **send_file(chat_id, file_path, caption)**: Send a local file from allowed roots
- **download_media(chat_id, message_id, file_path)**: Save message media under allowed roots
- **upload_file(file_path)**: Upload a local file and return upload metadata
- **send_voice(chat_id, file_path)**: Send `.ogg/.opus` voice note from allowed roots
- **send_sticker(chat_id, file_path)**: Send `.webp` sticker from allowed roots
- **edit_chat_photo(chat_id, file_path)**: Update chat photo from allowed roots
### Search & Discovery
- **search_public_chats(query)**: Search public chats/channels/bots
- **search_public_chats(query, limit)**: Search public chats/channels/bots with a configurable result limit
- **search_messages(chat_id, query, limit)**: Search messages in a chat
- **search_global(query, page, page_size)**: Search messages globally with pagination
- **resolve_username(username)**: Resolve a username to ID
### Stickers, GIFs, Bots
@ -142,11 +150,28 @@ To improve robustness, all functions accepting `chat_id` or `user_id` parameters
The server will automatically validate the input and convert it to the correct format before making a request to Telegram. If the input is invalid, a clear error message will be returned.
## Removed Functionality
## File-path Tools Security Model
Please note that tools requiring direct file path access on the server (`send_file`, `download_media`, `set_profile_photo`, `edit_chat_photo`, `send_voice`, `send_sticker`, `upload_file`) have been removed from `main.py`. This is due to limitations in the current MCP environment regarding handling file attachments and local file system paths.
File-path tools are available, but **disabled by default** until allowed roots are configured.
Additionally, GIF-related tools (`get_gif_search`, `get_saved_gifs`, `send_gif`) have been removed due to ongoing issues with reliability in the Telethon library or Telegram API interactions.
Supported file-path tools:
- `send_file`, `download_media`, `set_profile_photo`, `edit_chat_photo`, `send_voice`, `send_sticker`, `upload_file`
Security semantics (aligned with MCP filesystem server):
- Server-side allowlist via CLI positional arguments (fallback when Roots API is unsupported).
- Client-provided MCP Roots replace the server allowlist when available.
- If the client returns an empty Roots list, file-path tools are disabled (deny-all).
- All paths are resolved via realpath and must stay inside an allowed root.
- Traversal/glob-like patterns are rejected (`..`, `*`, `?`, `~`, etc.).
- Relative paths resolve against the first allowed root.
- Write tools default to `<first_root>/downloads/` when `file_path` is omitted.
Example server launch with allowlisted roots:
```bash
uv --directory /full/path/to/telegram-mcp run main.py /data/telegram /tmp/telegram-mcp
```
GIF tools are currently limited: `get_gif_search` and `send_gif` are available, while `get_saved_gifs` is not implemented due to reliability limits in Telethon/Telegram API interactions.
---
@ -509,13 +534,14 @@ Successfully joined chat: Developer Community
```python
@mcp.tool()
async def search_public_chats(query: str) -> str:
async def search_public_chats(query: str, limit: int = 20) -> str:
"""
Search for public chats, channels, or bots by username or title.
"""
try:
result = await client(functions.contacts.SearchRequest(q=query, limit=20))
return json.dumps([format_entity(u) for u in result.users], indent=2)
result = await client(functions.contacts.SearchRequest(q=query, limit=limit))
entities = [format_entity(e) for e in result.chats + result.users]
return json.dumps(entities, indent=2)
except Exception as e:
return f"Error searching public chats: {e}"
```

676
main.py
View file

@ -1,3 +1,4 @@
import argparse
import os
import sys
import json
@ -9,12 +10,15 @@ import mimetypes
from datetime import datetime, timedelta
from enum import Enum
from typing import List, Dict, Optional, Union, Any
from pathlib import Path
from urllib.parse import unquote, urlparse
# Third-party libraries
import nest_asyncio
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
from mcp.server.fastmcp import FastMCP, Context
from mcp.types import ToolAnnotations
from mcp.shared.exceptions import McpError
from pythonjsonlogger import jsonlogger
from telethon import TelegramClient, functions, types, utils
from telethon.sessions import StringSession
@ -33,6 +37,7 @@ from telethon.tl.types import (
InputPeerChat,
InputPeerChannel,
DialogFilter,
DialogFilterChatlist,
DialogFilterDefault,
TextWithEntities,
)
@ -133,12 +138,38 @@ try:
logger.addHandler(file_handler)
logger.info(f"Logging initialized to {log_file_path}")
except Exception as log_error:
print(f"WARNING: Error setting up log file: {log_error}")
print(f"WARNING: Error setting up log file: {log_error}", file=sys.stderr)
# Fallback to console-only logging
logger.addHandler(console_handler)
logger.error(f"Failed to set up log file handler: {log_error}")
# File-path tool security configuration
SERVER_ALLOWED_ROOTS: list[Path] = []
DEFAULT_DOWNLOAD_SUBDIR = "downloads"
DISALLOWED_PATH_PATTERNS = ("*", "?", "[", "]", "{", "}", "~", "\x00")
EXTENSION_ALLOWLISTS: dict[str, set[str]] = {
"send_voice": {".ogg", ".opus"},
"send_sticker": {".webp"},
"set_profile_photo": {".jpg", ".jpeg", ".png", ".webp"},
"edit_chat_photo": {".jpg", ".jpeg", ".png", ".webp"},
}
MAX_FILE_BYTES: dict[str, int] = {
"send_file": 200 * 1024 * 1024, # 200 MB
"upload_file": 200 * 1024 * 1024,
"send_voice": 100 * 1024 * 1024,
"send_sticker": 10 * 1024 * 1024,
"set_profile_photo": 50 * 1024 * 1024,
"edit_chat_photo": 50 * 1024 * 1024,
}
ROOTS_UNSUPPORTED_ERROR_CODES = {-32601}
ROOTS_STATUS_READY = "ready"
ROOTS_STATUS_NOT_CONFIGURED = "not_configured"
ROOTS_STATUS_UNSUPPORTED_FALLBACK = "unsupported_fallback"
ROOTS_STATUS_CLIENT_DENY_ALL = "client_deny_all"
ROOTS_STATUS_ERROR = "error"
# Error code prefix mapping for better error tracing
class ErrorCategory(str, Enum):
CHAT = "CHAT"
@ -386,6 +417,274 @@ def get_engagement_info(message) -> str:
return f" | {', '.join(engagement_parts)}" if engagement_parts else ""
def _dedupe_paths(paths: List[Path]) -> List[Path]:
seen: set[str] = set()
result: List[Path] = []
for path in paths:
key = str(path)
if key in seen:
continue
seen.add(key)
result.append(path)
return result
def _contains_forbidden_path_patterns(raw_path: str) -> Optional[str]:
value = raw_path.strip()
if not value:
return "Path must not be empty."
if any(token in value for token in DISALLOWED_PATH_PATTERNS):
return "Path contains disallowed wildcard/shell patterns."
if ".." in Path(value).parts:
return "Path traversal is not allowed."
return None
def _coerce_root_uri_to_path(uri: str) -> Path:
parsed = urlparse(uri)
if parsed.scheme != "file":
raise ValueError(f"Unsupported root URI scheme: {parsed.scheme}")
decoded_path = unquote(parsed.path or "")
if parsed.netloc and parsed.netloc not in ("", "localhost"):
decoded_path = f"//{parsed.netloc}{decoded_path}"
if os.name == "nt" and decoded_path.startswith("/") and len(decoded_path) > 2:
# file:///C:/tmp -> C:/tmp on Windows
if decoded_path[2] == ":":
decoded_path = decoded_path[1:]
return Path(decoded_path).resolve(strict=True)
def _path_is_within_root(candidate: Path, root: Path) -> bool:
root = root.resolve()
if root.is_file():
return candidate == root
return candidate == root or root in candidate.parents
def _path_is_within_any_root(candidate: Path, roots: List[Path]) -> bool:
return any(_path_is_within_root(candidate, root) for root in roots)
def _first_resolution_root(roots: List[Path]) -> Path:
first = roots[0]
return first if first.is_dir() else first.parent
def _ensure_extension_allowed(tool_name: str, candidate: Path) -> Optional[str]:
allowlist = EXTENSION_ALLOWLISTS.get(tool_name)
if not allowlist:
return None
if candidate.suffix.lower() not in allowlist:
allowed = ", ".join(sorted(allowlist))
return f"File extension is not allowed for {tool_name}. Allowed: {allowed}."
return None
def _ensure_size_within_limit(tool_name: str, candidate: Path) -> Optional[str]:
max_bytes = MAX_FILE_BYTES.get(tool_name)
if not max_bytes:
return None
size = candidate.stat().st_size
if size > max_bytes:
return f"File is too large for {tool_name}: {size} bytes " f"(limit: {max_bytes} bytes)."
return None
async def _get_effective_allowed_roots(ctx: Optional[Context]) -> List[Path]:
roots, _status = await _get_effective_allowed_roots_with_status(ctx)
return roots
def _is_roots_unsupported_error(error: Exception) -> bool:
if isinstance(error, McpError):
error_code = getattr(getattr(error, "error", None), "code", None)
error_message = (
getattr(getattr(error, "error", None), "message", None) or str(error)
).lower()
if error_code in ROOTS_UNSUPPORTED_ERROR_CODES:
return True
return "method not found" in error_message or "not implemented" in error_message
if isinstance(error, NotImplementedError):
return True
if isinstance(error, AttributeError):
return "list_roots" in str(error)
return False
async def _get_effective_allowed_roots_with_status(
ctx: Optional[Context],
) -> tuple[List[Path], str]:
fallback_roots = list(SERVER_ALLOWED_ROOTS)
if ctx is None:
if fallback_roots:
return fallback_roots, ROOTS_STATUS_READY
return [], ROOTS_STATUS_NOT_CONFIGURED
try:
list_roots_result = await ctx.session.list_roots()
except Exception as error:
if _is_roots_unsupported_error(error):
if fallback_roots:
return fallback_roots, ROOTS_STATUS_UNSUPPORTED_FALLBACK
return [], ROOTS_STATUS_NOT_CONFIGURED
logger.error(
"MCP roots request failed; disabling file-path tools for safety.", exc_info=True
)
return [], ROOTS_STATUS_ERROR
client_roots: List[Path] = []
for root in list_roots_result.roots:
try:
client_roots.append(_coerce_root_uri_to_path(str(root.uri)))
except Exception:
# Ignore invalid root entries supplied by a client.
continue
if client_roots:
return _dedupe_paths(client_roots), ROOTS_STATUS_READY
# Roots API succeeded; an empty roots list is treated as explicit deny-all.
return [], ROOTS_STATUS_CLIENT_DENY_ALL
async def _ensure_allowed_roots(
ctx: Optional[Context], tool_name: str
) -> tuple[List[Path], Optional[str]]:
roots, status = await _get_effective_allowed_roots_with_status(ctx)
if not roots:
if status == ROOTS_STATUS_CLIENT_DENY_ALL:
return (
[],
(
f"{tool_name} is disabled because the client provided an empty "
"MCP Roots list (deny-all)."
),
)
if status == ROOTS_STATUS_ERROR:
return (
[],
(
f"{tool_name} is disabled because MCP Roots could not be verified safely. "
"Check MCP client/server logs."
),
)
return (
[],
(
f"{tool_name} is disabled until allowed roots are configured. "
"Provide server CLI roots and/or client MCP Roots."
),
)
return roots, None
async def _resolve_readable_file_path(
*,
raw_path: str,
ctx: Optional[Context],
tool_name: str,
) -> tuple[Optional[Path], Optional[str]]:
roots, error = await _ensure_allowed_roots(ctx, tool_name)
if error:
return None, error
pattern_error = _contains_forbidden_path_patterns(raw_path)
if pattern_error:
return None, pattern_error
candidate = Path(raw_path.strip())
if not candidate.is_absolute():
candidate = _first_resolution_root(roots) / candidate
try:
candidate = candidate.resolve(strict=True)
except FileNotFoundError:
return None, f"File not found: {raw_path}"
if not _path_is_within_any_root(candidate, roots):
return None, "Path is outside allowed roots."
if not candidate.is_file():
return None, f"Path is not a file: {candidate}"
if not os.access(candidate, os.R_OK):
return None, f"File is not readable: {candidate}"
extension_error = _ensure_extension_allowed(tool_name, candidate)
if extension_error:
return None, extension_error
size_error = _ensure_size_within_limit(tool_name, candidate)
if size_error:
return None, size_error
return candidate, None
async def _resolve_writable_file_path(
*,
raw_path: Optional[str],
default_filename: str,
ctx: Optional[Context],
tool_name: str,
) -> tuple[Optional[Path], Optional[str]]:
roots, error = await _ensure_allowed_roots(ctx, tool_name)
if error:
return None, error
if raw_path and raw_path.strip():
pattern_error = _contains_forbidden_path_patterns(raw_path)
if pattern_error:
return None, pattern_error
candidate = Path(raw_path.strip())
if not candidate.is_absolute():
candidate = _first_resolution_root(roots) / candidate
else:
safe_name = Path(default_filename).name
candidate = _first_resolution_root(roots) / DEFAULT_DOWNLOAD_SUBDIR / safe_name
candidate = candidate.resolve(strict=False)
parent = candidate.parent.resolve(strict=False)
if not _path_is_within_any_root(candidate, roots) or not _path_is_within_any_root(
parent, roots
):
return None, "Path is outside allowed roots."
extension_error = _ensure_extension_allowed(tool_name, candidate)
if extension_error:
return None, extension_error
parent.mkdir(parents=True, exist_ok=True)
if not os.access(parent, os.W_OK):
return None, f"Directory not writable: {parent}"
return candidate, None
def _configure_allowed_roots_from_cli(argv: Optional[List[str]] = None) -> None:
parser = argparse.ArgumentParser(
prog="telegram-mcp",
add_help=False,
description=(
"Optional positional arguments define server-side allowed roots "
"for file-path tools."
),
)
parser.add_argument("allowed_roots", nargs="*")
parsed, _unknown = parser.parse_known_args(argv or [])
resolved_roots: List[Path] = []
for raw_root in parsed.allowed_roots:
root = Path(raw_root).expanduser()
if not root.exists():
raise SystemExit(f"Allowed root does not exist: {root}")
resolved = root.resolve(strict=True)
resolved_roots.append(resolved)
global SERVER_ALLOWED_ROOTS
SERVER_ALLOWED_ROOTS = _dedupe_paths(resolved_roots)
@mcp.tool(annotations=ToolAnnotations(title="Get Chats", openWorldHint=True, readOnlyHint=True))
async def get_chats(page: int = 1, page_size: int = 20) -> str:
"""
@ -451,12 +750,17 @@ async def get_messages(chat_id: Union[int, str], page: int = 1, page_size: int =
annotations=ToolAnnotations(title="Send Message", openWorldHint=True, destructiveHint=True)
)
@validate_id("chat_id")
async def send_message(chat_id: Union[int, str], message: str) -> str:
async def send_message(
chat_id: Union[int, str], message: str, parse_mode: Optional[str] = None
) -> str:
"""
Send a message to a specific chat.
Args:
chat_id: The ID or username of the chat.
message: The message content to send.
parse_mode: Optional formatting mode. Use 'html' for HTML tags (<b>, <i>, <code>, <pre>,
<a href="...">), 'md' or 'markdown' for Markdown (**bold**, __italic__, `code`,
```pre```), or omit for plain text (no formatting).
"""
try:
entity = await resolve_entity(chat_id)
@ -1753,12 +2057,17 @@ async def get_participants(chat_id: Union[int, str]) -> str:
@mcp.tool(annotations=ToolAnnotations(title="Send File", openWorldHint=True, destructiveHint=True))
@validate_id("chat_id")
async def send_file(chat_id: Union[int, str], file_path: str, caption: str = None) -> str:
async def send_file(
chat_id: Union[int, str],
file_path: str,
caption: str = None,
ctx: Optional[Context] = None,
) -> str:
"""
Send a file to a chat.
Args:
chat_id: The chat ID or username.
file_path: Absolute path to the file to send (must exist and be readable).
file_path: Absolute or relative path to the file under allowed roots.
caption: Optional caption for the file.
"""
try:
@ -1776,30 +2085,51 @@ async def send_file(chat_id: Union[int, str], file_path: str, caption: str = Non
@mcp.tool(
annotations=ToolAnnotations(title="Download Media", openWorldHint=True, readOnlyHint=True)
annotations=ToolAnnotations(title="Download Media", openWorldHint=True, destructiveHint=True)
)
@validate_id("chat_id")
async def download_media(chat_id: Union[int, str], message_id: int, file_path: str) -> str:
async def download_media(
chat_id: Union[int, str],
message_id: int,
file_path: Optional[str] = None,
ctx: Optional[Context] = None,
) -> str:
"""
Download media from a message in a chat.
Args:
chat_id: The chat ID or username.
message_id: The message ID containing the media.
file_path: Absolute path to save the downloaded file (must be writable).
file_path: Optional absolute or relative path under allowed roots.
If omitted, saves into `<first_root>/downloads/`.
"""
try:
entity = await resolve_entity(chat_id)
msg = await client.get_messages(entity, ids=message_id)
if not msg or not msg.media:
return "No media found in the specified message."
# Check if directory is writable
dir_path = os.path.dirname(file_path) or "."
if not os.access(dir_path, os.W_OK):
return f"Directory not writable: {dir_path}"
await client.download_media(msg, file=file_path)
if not os.path.isfile(file_path):
return f"Download failed: file not created at {file_path}"
return f"Media downloaded to {file_path}."
default_name = f"telegram_{chat_id}_{message_id}_{int(time.time())}"
out_path, path_error = await _resolve_writable_file_path(
raw_path=file_path,
default_filename=default_name,
ctx=ctx,
tool_name="download_media",
)
if path_error:
return path_error
downloaded = await client.download_media(msg, file=str(out_path))
if not downloaded:
return f"Download failed for message {message_id}."
final_path = Path(downloaded).resolve(strict=True)
roots, roots_error = await _ensure_allowed_roots(ctx, "download_media")
if roots_error:
return roots_error
if not _path_is_within_any_root(final_path, roots):
return "Download failed: resulting path is outside allowed roots."
return f"Media downloaded to {final_path}."
except Exception as e:
return log_and_format_error(
"download_media",
@ -1837,15 +2167,24 @@ async def update_profile(first_name: str = None, last_name: str = None, about: s
title="Set Profile Photo", openWorldHint=True, destructiveHint=True, idempotentHint=True
)
)
async def set_profile_photo(file_path: str) -> str:
async def set_profile_photo(file_path: str, ctx: Optional[Context] = None) -> str:
"""
Set a new profile photo.
"""
try:
await client(
functions.photos.UploadProfilePhotoRequest(file=await client.upload_file(file_path))
safe_path, path_error = await _resolve_readable_file_path(
raw_path=file_path,
ctx=ctx,
tool_name="set_profile_photo",
)
return "Profile photo updated."
if path_error:
return path_error
await client(
functions.photos.UploadProfilePhotoRequest(
file=await client.upload_file(str(safe_path))
)
)
return f"Profile photo updated from {safe_path}."
except Exception as e:
return log_and_format_error("set_profile_photo", e, file_path=file_path)
@ -1865,7 +2204,7 @@ async def delete_profile_photo() -> str:
)
if not photos.photos:
return "No profile photo to delete."
await client(functions.photos.DeletePhotosRequest(id=[photos.photos[0].id]))
await client(functions.photos.DeletePhotosRequest(id=[photos.photos[0]]))
return "Profile photo deleted."
except Exception as e:
return log_and_format_error("delete_profile_photo", e)
@ -2100,15 +2439,22 @@ async def edit_chat_title(chat_id: Union[int, str], title: str) -> str:
)
)
@validate_id("chat_id")
async def edit_chat_photo(chat_id: Union[int, str], file_path: str) -> str:
async def edit_chat_photo(
chat_id: Union[int, str],
file_path: str,
ctx: Optional[Context] = None,
) -> str:
"""
Edit the photo of a chat, group, or channel. Requires a file path to an image.
"""
try:
if not os.path.isfile(file_path):
return f"Photo file not found: {file_path}"
if not os.access(file_path, os.R_OK):
return f"Photo file not readable: {file_path}"
safe_path, path_error = await _resolve_readable_file_path(
raw_path=file_path,
ctx=ctx,
tool_name="edit_chat_photo",
)
if path_error:
return path_error
entity = await resolve_entity(chat_id)
uploaded_file = await client.upload_file(file_path)
@ -2126,7 +2472,7 @@ async def edit_chat_photo(chat_id: Union[int, str], file_path: str) -> str:
else:
return f"Cannot edit photo for this entity type ({type(entity)})."
return f"Chat {chat_id} photo updated."
return f"Chat {chat_id} photo updated from {safe_path}."
except Exception as e:
logger.exception(f"edit_chat_photo failed (chat_id={chat_id}, file_path='{file_path}')")
return log_and_format_error("edit_chat_photo", e, chat_id=chat_id, file_path=file_path)
@ -2629,27 +2975,34 @@ async def import_chat_invite(hash: str) -> str:
annotations=ToolAnnotations(title="Send Voice", openWorldHint=True, destructiveHint=True)
)
@validate_id("chat_id")
async def send_voice(chat_id: Union[int, str], file_path: str) -> str:
async def send_voice(
chat_id: Union[int, str],
file_path: str,
ctx: Optional[Context] = None,
) -> str:
"""
Send a voice message to a chat. File must be an OGG/OPUS voice note.
Args:
chat_id: The chat ID or username.
file_path: Absolute path to the OGG/OPUS file.
file_path: Absolute or relative path under allowed roots to the OGG/OPUS file.
"""
try:
if not os.path.isfile(file_path):
return f"File not found: {file_path}"
if not os.access(file_path, os.R_OK):
return f"File is not readable: {file_path}"
safe_path, path_error = await _resolve_readable_file_path(
raw_path=file_path,
ctx=ctx,
tool_name="send_voice",
)
if path_error:
return path_error
mime, _ = mimetypes.guess_type(file_path)
mime, _ = mimetypes.guess_type(str(safe_path))
if not (
mime
and (
mime == "audio/ogg"
or file_path.lower().endswith(".ogg")
or file_path.lower().endswith(".opus")
or str(safe_path).lower().endswith(".ogg")
or str(safe_path).lower().endswith(".opus")
)
):
return "Voice file must be .ogg or .opus format."
@ -2661,6 +3014,37 @@ async def send_voice(chat_id: Union[int, str], file_path: str) -> str:
return log_and_format_error("send_voice", e, chat_id=chat_id, file_path=file_path)
@mcp.tool(
annotations=ToolAnnotations(title="Upload File", openWorldHint=True, destructiveHint=True)
)
async def upload_file(file_path: str, ctx: Optional[Context] = None) -> str:
"""
Upload a local file to Telegram and return upload metadata.
Args:
file_path: Absolute or relative path under allowed roots.
"""
try:
safe_path, path_error = await _resolve_readable_file_path(
raw_path=file_path,
ctx=ctx,
tool_name="upload_file",
)
if path_error:
return path_error
uploaded = await client.upload_file(str(safe_path))
payload = {
"path": str(safe_path),
"name": getattr(uploaded, "name", safe_path.name),
"size": getattr(uploaded, "size", safe_path.stat().st_size),
"md5_checksum": getattr(uploaded, "md5_checksum", None),
}
return json.dumps(payload, indent=2, default=json_serializer)
except Exception as e:
return log_and_format_error("upload_file", e, file_path=file_path)
@mcp.tool(
annotations=ToolAnnotations(title="Forward Message", openWorldHint=True, destructiveHint=True)
)
@ -2782,9 +3166,18 @@ async def mark_as_read(chat_id: Union[int, str]) -> str:
annotations=ToolAnnotations(title="Reply To Message", openWorldHint=True, destructiveHint=True)
)
@validate_id("chat_id")
async def reply_to_message(chat_id: Union[int, str], message_id: int, text: str) -> str:
async def reply_to_message(
chat_id: Union[int, str], message_id: int, text: str, parse_mode: Optional[str] = None
) -> str:
"""
Reply to a specific message in a chat.
Args:
chat_id: The chat ID or username.
message_id: The message ID to reply to.
text: The reply text.
parse_mode: Optional formatting mode. Use 'html' for HTML tags (<b>, <i>, <code>, <pre>,
<a href="...">), 'md' or 'markdown' for Markdown (**bold**, __italic__, `code`,
```pre```), or omit for plain text (no formatting).
"""
try:
entity = await resolve_entity(chat_id)
@ -2823,15 +3216,16 @@ async def get_media_info(chat_id: Union[int, str], message_id: int) -> str:
@mcp.tool(
annotations=ToolAnnotations(title="Search Public Chats", openWorldHint=True, readOnlyHint=True)
)
async def search_public_chats(query: str) -> str:
async def search_public_chats(query: str, limit: int = 20) -> str:
"""
Search for public chats, channels, or bots by username or title.
"""
try:
result = await client(functions.contacts.SearchRequest(q=query, limit=20))
return json.dumps([format_entity(u) for u in result.users], indent=2)
result = await client(functions.contacts.SearchRequest(q=query, limit=limit))
entities = [format_entity(e) for e in result.chats + result.users]
return json.dumps(entities, indent=2)
except Exception as e:
return log_and_format_error("search_public_chats", e, query=query)
return log_and_format_error("search_public_chats", e, query=query, limit=limit)
@mcp.tool(
@ -2862,6 +3256,45 @@ async def search_messages(chat_id: Union[int, str], query: str, limit: int = 20)
)
@mcp.tool(
annotations=ToolAnnotations(
title="Search Global Messages",
openWorldHint=True,
readOnlyHint=True,
)
)
async def search_global(query: str, page: int = 1, page_size: int = 20) -> str:
"""
Search for messages across all public chats and channels by text content.
"""
try:
offset = (page - 1) * page_size
messages = await client.get_messages(
None, limit=page_size, search=query, add_offset=offset
)
if not messages:
return "No messages found for this page."
lines = []
for msg in messages:
chat = msg.chat
chat_name = (
getattr(chat, "title", None) or getattr(chat, "first_name", "") or str(msg.chat_id)
)
sender_name = get_sender_name(msg)
lines.append(
f"Chat: {chat_name} | ID: {msg.id} | {sender_name} | "
f"Date: {msg.date} | Message: {msg.message}"
)
return "\n".join(lines)
except Exception as e:
return log_and_format_error(
"search_global", e, query=query, page=page, page_size=page_size
)
@mcp.tool(
annotations=ToolAnnotations(title="Resolve Username", openWorldHint=True, readOnlyHint=True)
)
@ -3026,21 +3459,26 @@ async def get_sticker_sets() -> str:
annotations=ToolAnnotations(title="Send Sticker", openWorldHint=True, destructiveHint=True)
)
@validate_id("chat_id")
async def send_sticker(chat_id: Union[int, str], file_path: str) -> str:
async def send_sticker(
chat_id: Union[int, str],
file_path: str,
ctx: Optional[Context] = None,
) -> str:
"""
Send a sticker to a chat. File must be a valid .webp sticker file.
Args:
chat_id: The chat ID or username.
file_path: Absolute path to the .webp sticker file.
file_path: Absolute or relative path under allowed roots to the .webp sticker file.
"""
try:
if not os.path.isfile(file_path):
return f"Sticker file not found: {file_path}"
if not os.access(file_path, os.R_OK):
return f"Sticker file is not readable: {file_path}"
if not file_path.lower().endswith(".webp"):
return "Sticker file must be a .webp file."
safe_path, path_error = await _resolve_readable_file_path(
raw_path=file_path,
ctx=ctx,
tool_name="send_sticker",
)
if path_error:
return path_error
entity = await resolve_entity(chat_id)
await client.send_file(entity, file_path, force_document=False)
@ -3746,6 +4184,21 @@ async def list_folders() -> str:
}
folders.append(folder_data)
elif isinstance(f, DialogFilterChatlist):
# Shared folders use DialogFilterChatlist type
title = f.title
if isinstance(title, TextWithEntities):
title = title.text
folder_data = {
"id": f.id,
"title": title,
"emoticon": getattr(f, "emoticon", None),
"type": "shared",
"included_peers_count": len(getattr(f, "include_peers", [])),
"pinned_peers_count": len(getattr(f, "pinned_peers", [])),
}
folders.append(folder_data)
if not folders:
return "No folders found. Create one with create_folder tool."
@ -3770,7 +4223,7 @@ async def get_folder(folder_id: int) -> str:
target_folder = None
for f in result.filters:
if isinstance(f, DialogFilter) and f.id == folder_id:
if isinstance(f, (DialogFilter, DialogFilterChatlist)) and f.id == folder_id:
target_folder = f
break
@ -3835,7 +4288,15 @@ async def get_folder(folder_id: int) -> str:
"id": target_folder.id,
"title": title,
"emoticon": getattr(target_folder, "emoticon", None),
"filters": {
"included_chats": included_chats,
"excluded_chats": excluded_chats,
"pinned_chats": pinned_chats,
}
if isinstance(target_folder, DialogFilterChatlist):
folder_data["type"] = "shared"
else:
folder_data["filters"] = {
"contacts": getattr(target_folder, "contacts", False),
"non_contacts": getattr(target_folder, "non_contacts", False),
"groups": getattr(target_folder, "groups", False),
@ -3844,11 +4305,7 @@ async def get_folder(folder_id: int) -> str:
"exclude_muted": getattr(target_folder, "exclude_muted", False),
"exclude_read": getattr(target_folder, "exclude_read", False),
"exclude_archived": getattr(target_folder, "exclude_archived", False),
},
"included_chats": included_chats,
"excluded_chats": excluded_chats,
"pinned_chats": pinned_chats,
}
}
return json.dumps(folder_data, indent=2, default=json_serializer)
except Exception as e:
@ -3897,7 +4354,7 @@ async def create_folder(
existing_ids = set()
folder_count = 0
for f in result.filters:
if isinstance(f, DialogFilter):
if isinstance(f, (DialogFilter, DialogFilterChatlist)):
existing_ids.add(f.id)
folder_count += 1
@ -3979,7 +4436,7 @@ async def add_chat_to_folder(
target_folder = None
for f in result.filters:
if isinstance(f, DialogFilter) and f.id == folder_id:
if isinstance(f, (DialogFilter, DialogFilterChatlist)) and f.id == folder_id:
target_folder = f
break
@ -4013,24 +4470,35 @@ async def add_chat_to_folder(
pinned_peers.append(peer)
# Update the folder (keep all original attributes)
updated_filter = DialogFilter(
id=target_folder.id,
title=target_folder.title,
emoticon=getattr(target_folder, "emoticon", None),
pinned_peers=pinned_peers,
include_peers=include_peers,
exclude_peers=list(getattr(target_folder, "exclude_peers", [])),
contacts=getattr(target_folder, "contacts", False),
non_contacts=getattr(target_folder, "non_contacts", False),
groups=getattr(target_folder, "groups", False),
broadcasts=getattr(target_folder, "broadcasts", False),
bots=getattr(target_folder, "bots", False),
exclude_muted=getattr(target_folder, "exclude_muted", False),
exclude_read=getattr(target_folder, "exclude_read", False),
exclude_archived=getattr(target_folder, "exclude_archived", False),
title_noanimate=getattr(target_folder, "title_noanimate", None),
color=getattr(target_folder, "color", None),
)
if isinstance(target_folder, DialogFilterChatlist):
updated_filter = DialogFilterChatlist(
id=target_folder.id,
title=target_folder.title,
emoticon=getattr(target_folder, "emoticon", None),
pinned_peers=pinned_peers,
include_peers=include_peers,
title_noanimate=getattr(target_folder, "title_noanimate", None),
color=getattr(target_folder, "color", None),
)
else:
updated_filter = DialogFilter(
id=target_folder.id,
title=target_folder.title,
emoticon=getattr(target_folder, "emoticon", None),
pinned_peers=pinned_peers,
include_peers=include_peers,
exclude_peers=list(getattr(target_folder, "exclude_peers", [])),
contacts=getattr(target_folder, "contacts", False),
non_contacts=getattr(target_folder, "non_contacts", False),
groups=getattr(target_folder, "groups", False),
broadcasts=getattr(target_folder, "broadcasts", False),
bots=getattr(target_folder, "bots", False),
exclude_muted=getattr(target_folder, "exclude_muted", False),
exclude_read=getattr(target_folder, "exclude_read", False),
exclude_archived=getattr(target_folder, "exclude_archived", False),
title_noanimate=getattr(target_folder, "title_noanimate", None),
color=getattr(target_folder, "color", None),
)
await client(
functions.messages.UpdateDialogFilterRequest(id=folder_id, filter=updated_filter)
@ -4069,7 +4537,7 @@ async def remove_chat_from_folder(folder_id: int, chat_id: Union[int, str]) -> s
target_folder = None
for f in result.filters:
if isinstance(f, DialogFilter) and f.id == folder_id:
if isinstance(f, (DialogFilter, DialogFilterChatlist)) and f.id == folder_id:
target_folder = f
break
@ -4108,24 +4576,35 @@ async def remove_chat_from_folder(folder_id: int, chat_id: Union[int, str]) -> s
return f"Chat {chat_id} was not in folder {folder_id}."
# Update the folder (keep all original attributes)
updated_filter = DialogFilter(
id=target_folder.id,
title=target_folder.title,
emoticon=getattr(target_folder, "emoticon", None),
pinned_peers=pinned_peers,
include_peers=include_peers,
exclude_peers=list(getattr(target_folder, "exclude_peers", [])),
contacts=getattr(target_folder, "contacts", False),
non_contacts=getattr(target_folder, "non_contacts", False),
groups=getattr(target_folder, "groups", False),
broadcasts=getattr(target_folder, "broadcasts", False),
bots=getattr(target_folder, "bots", False),
exclude_muted=getattr(target_folder, "exclude_muted", False),
exclude_read=getattr(target_folder, "exclude_read", False),
exclude_archived=getattr(target_folder, "exclude_archived", False),
title_noanimate=getattr(target_folder, "title_noanimate", None),
color=getattr(target_folder, "color", None),
)
if isinstance(target_folder, DialogFilterChatlist):
updated_filter = DialogFilterChatlist(
id=target_folder.id,
title=target_folder.title,
emoticon=getattr(target_folder, "emoticon", None),
pinned_peers=pinned_peers,
include_peers=include_peers,
title_noanimate=getattr(target_folder, "title_noanimate", None),
color=getattr(target_folder, "color", None),
)
else:
updated_filter = DialogFilter(
id=target_folder.id,
title=target_folder.title,
emoticon=getattr(target_folder, "emoticon", None),
pinned_peers=pinned_peers,
include_peers=include_peers,
exclude_peers=list(getattr(target_folder, "exclude_peers", [])),
contacts=getattr(target_folder, "contacts", False),
non_contacts=getattr(target_folder, "non_contacts", False),
groups=getattr(target_folder, "groups", False),
broadcasts=getattr(target_folder, "broadcasts", False),
bots=getattr(target_folder, "bots", False),
exclude_muted=getattr(target_folder, "exclude_muted", False),
exclude_read=getattr(target_folder, "exclude_read", False),
exclude_archived=getattr(target_folder, "exclude_archived", False),
title_noanimate=getattr(target_folder, "title_noanimate", None),
color=getattr(target_folder, "color", None),
)
await client(
functions.messages.UpdateDialogFilterRequest(id=folder_id, filter=updated_filter)
@ -4168,7 +4647,7 @@ async def delete_folder(folder_id: int) -> str:
folder_exists = False
folder_title = None
for f in result.filters:
if isinstance(f, DialogFilter) and f.id == folder_id:
if isinstance(f, (DialogFilter, DialogFilterChatlist)) and f.id == folder_id:
folder_exists = True
# Handle title which can be str or TextWithEntities
title = f.title
@ -4207,7 +4686,7 @@ async def reorder_folders(folder_ids: List[int]) -> str:
existing_ids = set()
for f in result.filters:
if isinstance(f, DialogFilter):
if isinstance(f, (DialogFilter, DialogFilterChatlist)):
existing_ids.add(f.id)
# Validate all provided IDs exist
@ -4234,7 +4713,7 @@ async def reorder_folders(folder_ids: List[int]) -> str:
async def _main() -> None:
try:
# Start the Telethon client non-interactively
print("Starting Telegram client...")
print("Starting Telegram client...", file=sys.stderr)
await client.start()
# Warm entity cache — StringSession has no persistent cache,
@ -4256,6 +4735,7 @@ async def _main() -> None:
def main() -> None:
_configure_allowed_roots_from_cli(sys.argv[1:])
nest_asyncio.apply()
asyncio.run(_main())

View file

@ -29,7 +29,8 @@ dependencies = [
"nest-asyncio>=1.6.0",
"python-dotenv>=1.1.0",
"python-json-logger>=3.3.0",
"telethon>=1.39.0"
"qrcode>=8.2",
"telethon>=1.42.0",
]
[project.urls]
@ -64,4 +65,6 @@ exclude = [
dev = [
"black>=25.9.0",
"flake8>=7.3.0",
"pytest>=9.0.2",
"pytest-asyncio>=1.3.0",
]

View file

@ -4,4 +4,5 @@ mcp[cli]>=1.4.1
nest-asyncio>=1.6.0
python-dotenv>=1.1.0
python-json-logger>=3.3.0
telethon>=1.39.0
qrcode>=8.2
telethon>=1.42.0

View file

@ -19,16 +19,76 @@ parameters support integer IDs, string representations of IDs (e.g., "123456"),
and usernames (e.g., "@mychannel").
"""
import asyncio
import io
import os
from telethon.sync import TelegramClient
from telethon.sessions import StringSession
from dotenv import load_dotenv
import sys
# Load environment variables from .env file
from dotenv import load_dotenv
from telethon import errors
from telethon.sessions import StringSession
from telethon.sync import TelegramClient
load_dotenv()
def _qr_login(client: TelegramClient) -> None:
import qrcode
qr = client.qr_login()
print("\n----- QR Code Login -----\n")
qr_obj = qrcode.QRCode(border=1)
qr_obj.add_data(qr.url)
qr_obj.make(fit=True)
f = io.StringIO()
qr_obj.print_ascii(out=f, invert=True)
print(f.getvalue())
print("Scan the QR code above with your Telegram app:")
print(" Open Telegram > Settings > Devices > Link Desktop Device\n")
print(f"Or open this link on a device where you're logged in:\n {qr.url}\n")
print(f"Expires at: {qr.expires.strftime('%H:%M:%S')}")
print("Waiting for you to scan...")
try:
client.loop.run_until_complete(qr.wait(timeout=120))
except asyncio.TimeoutError:
print("\nQR code expired. Please try again.")
client.disconnect()
sys.exit(1)
except errors.SessionPasswordNeededError:
pw = input("\nTwo-factor authentication enabled. Please enter your password: ")
client.sign_in(password=pw)
def _phone_login(client: TelegramClient) -> None:
phone = input("Please enter your phone (or bot token): ")
try:
client.send_code_request(phone)
except errors.FloodWaitError as e:
print(f"\nFlood wait error; you must wait {e.seconds} seconds before trying again.")
client.disconnect()
sys.exit(1)
except errors.PhoneNumberInvalidError:
print("\nThe phone number is invalid.")
client.disconnect()
sys.exit(1)
except Exception as e:
print(f"\nError sending code: {e}")
client.disconnect()
sys.exit(1)
code = input("\nPlease enter the code you received: ")
try:
client.sign_in(phone, code)
except errors.SessionPasswordNeededError:
pw = input("Two-factor authentication enabled. Please enter your password: ")
client.sign_in(password=pw)
def main() -> None:
API_ID = os.getenv("TELEGRAM_API_ID")
API_HASH = os.getenv("TELEGRAM_API_HASH")
@ -38,7 +98,6 @@ def main() -> None:
print("Create an .env file with your credentials from https://my.telegram.org/apps")
sys.exit(1)
# Convert API_ID to integer
try:
API_ID = int(API_ID)
except ValueError:
@ -47,56 +106,62 @@ def main() -> None:
print("\n----- Telegram Session String Generator -----\n")
print("This script will generate a session string for your Telegram account.")
print(
"You will be asked to enter your phone number and the verification code sent to your Telegram app."
)
print("The generated session string can be added to your .env file.")
print(
"\nYour credentials will NOT be stored on any server and are only used for local authentication.\n"
)
print("Choose login method:")
print(" 1) QR code login (recommended -- scan from your Telegram app)")
print(" 2) Phone number + verification code")
method = input("\nEnter 1 or 2 [default: 1]: ").strip() or "1"
try:
# Connect to Telegram and generate the session string
with TelegramClient(StringSession(), API_ID, API_HASH) as client:
# The client.session.save() function from StringSession returns the session string
session_string = StringSession.save(client.session)
client = TelegramClient(StringSession(), API_ID, API_HASH)
client.connect()
print("\nAuthentication successful!")
print("\n----- Your Session String -----")
print(f"\n{session_string}\n")
print("Add this to your .env file as:")
print(f"TELEGRAM_SESSION_STRING={session_string}")
print("\nIMPORTANT: Keep this string private and never share it with anyone!")
if not client.is_user_authorized():
if method == "1":
_qr_login(client)
else:
_phone_login(client)
# Optional: auto-update the .env file
choice = input(
"\nWould you like to automatically update your .env file with this session string? (y/N): "
)
if choice.lower() == "y":
try:
# Read the current .env file
with open(".env", "r") as file:
env_contents = file.readlines()
session_string = StringSession.save(client.session)
# Update or add the SESSION_STRING line
session_string_line_found = False
for i, line in enumerate(env_contents):
if line.startswith("TELEGRAM_SESSION_STRING="):
env_contents[i] = f"TELEGRAM_SESSION_STRING={session_string}\n"
session_string_line_found = True
break
print("\nAuthentication successful!")
print("\n----- Your Session String -----")
print(f"\n{session_string}\n")
print("Add this to your .env file as:")
print(f"TELEGRAM_SESSION_STRING={session_string}")
print("\nIMPORTANT: Keep this string private and never share it with anyone!")
if not session_string_line_found:
env_contents.append(f"TELEGRAM_SESSION_STRING={session_string}\n")
choice = input(
"\nWould you like to automatically update your .env file with this session string? (y/N): "
)
if choice.lower() == "y":
try:
with open(".env", "r") as file:
env_contents = file.readlines()
# Write back to the .env file
with open(".env", "w") as file:
file.writelines(env_contents)
session_string_line_found = False
for i, line in enumerate(env_contents):
if line.startswith("TELEGRAM_SESSION_STRING="):
env_contents[i] = f"TELEGRAM_SESSION_STRING={session_string}\n"
session_string_line_found = True
break
print("\n.env file updated successfully!")
except Exception as e:
print(f"\nError updating .env file: {e}")
print("Please manually add the session string to your .env file.")
if not session_string_line_found:
env_contents.append(f"TELEGRAM_SESSION_STRING={session_string}\n")
with open(".env", "w") as file:
file.writelines(env_contents)
print("\n.env file updated successfully!")
except Exception as e:
print(f"\nError updating .env file: {e}")
print("Please manually add the session string to your .env file.")
client.disconnect()
except Exception as e:
print(f"\nError: {e}")

249
test_file_path_security.py Normal file
View file

@ -0,0 +1,249 @@
import os
from pathlib import Path
import pytest
from mcp import types
from mcp.shared.exceptions import McpError
from mcp.types import ErrorData
os.environ.setdefault("TELEGRAM_API_ID", "12345")
os.environ.setdefault("TELEGRAM_API_HASH", "dummy_hash")
import main
class _DummySession:
def __init__(self, roots):
self._roots = roots
async def list_roots(self):
return types.ListRootsResult(roots=self._roots)
class _DummyContext:
def __init__(self, roots):
self.session = _DummySession(roots)
class _FailingSession:
def __init__(self, error):
self._error = error
async def list_roots(self):
raise self._error
class _FailingContext:
def __init__(self, error):
self.session = _FailingSession(error)
class _MissingRootsSession:
pass
class _MissingRootsContext:
def __init__(self):
self.session = _MissingRootsSession()
@pytest.mark.asyncio
async def test_readable_relative_path_resolves_inside_first_server_root(tmp_path, monkeypatch):
root = (tmp_path / "root").resolve()
root.mkdir(parents=True)
target = root / "document.txt"
target.write_text("ok", encoding="utf-8")
monkeypatch.setattr(main, "SERVER_ALLOWED_ROOTS", [root])
resolved, error = await main._resolve_readable_file_path(
raw_path="document.txt",
ctx=None,
tool_name="send_file",
)
assert error is None
assert resolved == target.resolve()
@pytest.mark.asyncio
async def test_readable_path_rejects_traversal(tmp_path, monkeypatch):
root = (tmp_path / "root").resolve()
root.mkdir(parents=True)
monkeypatch.setattr(main, "SERVER_ALLOWED_ROOTS", [root])
resolved, error = await main._resolve_readable_file_path(
raw_path="../etc/passwd",
ctx=None,
tool_name="send_file",
)
assert resolved is None
assert error == "Path traversal is not allowed."
@pytest.mark.asyncio
async def test_readable_path_rejects_outside_root(tmp_path, monkeypatch):
root = (tmp_path / "root").resolve()
outside_root = (tmp_path / "outside").resolve()
root.mkdir(parents=True)
outside_root.mkdir(parents=True)
outside_file = outside_root / "outside.txt"
outside_file.write_text("no", encoding="utf-8")
monkeypatch.setattr(main, "SERVER_ALLOWED_ROOTS", [root])
resolved, error = await main._resolve_readable_file_path(
raw_path=str(outside_file),
ctx=None,
tool_name="send_file",
)
assert resolved is None
assert error == "Path is outside allowed roots."
@pytest.mark.asyncio
async def test_client_roots_replace_server_allowlist(tmp_path, monkeypatch):
server_root = (tmp_path / "server_root").resolve()
client_root = (tmp_path / "client_root").resolve()
server_root.mkdir(parents=True)
client_root.mkdir(parents=True)
(server_root / "server.txt").write_text("server", encoding="utf-8")
client_file = client_root / "client.txt"
client_file.write_text("client", encoding="utf-8")
monkeypatch.setattr(main, "SERVER_ALLOWED_ROOTS", [server_root])
ctx = _DummyContext([types.Root(uri=client_root.as_uri())])
roots = await main._get_effective_allowed_roots(ctx)
assert roots == [client_root]
resolved, error = await main._resolve_readable_file_path(
raw_path="client.txt",
ctx=ctx,
tool_name="send_file",
)
assert error is None
assert resolved == client_file.resolve()
@pytest.mark.asyncio
async def test_empty_client_roots_disable_file_tools(tmp_path, monkeypatch):
server_root = (tmp_path / "server_root").resolve()
server_root.mkdir(parents=True)
monkeypatch.setattr(main, "SERVER_ALLOWED_ROOTS", [server_root])
ctx = _DummyContext([])
roots = await main._get_effective_allowed_roots(ctx)
assert roots == []
resolved, error = await main._resolve_readable_file_path(
raw_path="server.txt",
ctx=ctx,
tool_name="send_file",
)
assert resolved is None
assert error is not None
assert "empty MCP Roots list" in error
assert "deny-all" in error
@pytest.mark.asyncio
async def test_mcp_method_not_found_falls_back_to_server_allowlist(tmp_path, monkeypatch):
server_root = (tmp_path / "server_root").resolve()
server_root.mkdir(parents=True)
monkeypatch.setattr(main, "SERVER_ALLOWED_ROOTS", [server_root])
ctx = _FailingContext(McpError(ErrorData(code=-32601, message="Method not found")))
roots = await main._get_effective_allowed_roots(ctx)
assert roots == [server_root]
@pytest.mark.asyncio
async def test_missing_list_roots_method_falls_back_to_server_allowlist(tmp_path, monkeypatch):
server_root = (tmp_path / "server_root").resolve()
server_root.mkdir(parents=True)
monkeypatch.setattr(main, "SERVER_ALLOWED_ROOTS", [server_root])
ctx = _MissingRootsContext()
roots = await main._get_effective_allowed_roots(ctx)
assert roots == [server_root]
@pytest.mark.asyncio
async def test_unexpected_roots_error_disables_file_path_tools(tmp_path, monkeypatch):
server_root = (tmp_path / "server_root").resolve()
server_root.mkdir(parents=True)
monkeypatch.setattr(main, "SERVER_ALLOWED_ROOTS", [server_root])
ctx = _FailingContext(RuntimeError("transport failure"))
roots = await main._get_effective_allowed_roots(ctx)
assert roots == []
resolved, error = await main._resolve_readable_file_path(
raw_path="anything.txt",
ctx=ctx,
tool_name="send_file",
)
assert resolved is None
assert error is not None
assert "disabled" in error
@pytest.mark.asyncio
async def test_writable_default_path_uses_downloads_subdir(tmp_path, monkeypatch):
root = (tmp_path / "root").resolve()
root.mkdir(parents=True)
monkeypatch.setattr(main, "SERVER_ALLOWED_ROOTS", [root])
resolved, error = await main._resolve_writable_file_path(
raw_path=None,
default_filename="example.bin",
ctx=None,
tool_name="download_media",
)
assert error is None
assert resolved == (root / "downloads" / "example.bin").resolve()
assert resolved.parent.exists()
@pytest.mark.asyncio
async def test_extension_allowlist_is_enforced_for_sticker(tmp_path, monkeypatch):
root = (tmp_path / "root").resolve()
root.mkdir(parents=True)
file_path = root / "sticker.txt"
file_path.write_text("bad", encoding="utf-8")
monkeypatch.setattr(main, "SERVER_ALLOWED_ROOTS", [root])
resolved, error = await main._resolve_readable_file_path(
raw_path=str(file_path),
ctx=None,
tool_name="send_sticker",
)
assert resolved is None
assert error is not None
assert "extension is not allowed" in error
@pytest.mark.asyncio
async def test_file_tools_disabled_without_any_roots(monkeypatch):
monkeypatch.setattr(main, "SERVER_ALLOWED_ROOTS", [])
resolved, error = await main._resolve_readable_file_path(
raw_path="anything.txt",
ctx=None,
tool_name="send_file",
)
assert resolved is None
assert error is not None
assert "disabled" in error

85
uv.lock
View file

@ -40,6 +40,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/3a/2a/7cc015f5b9f5db42b7d48157e23356022889fc354a2813c15934b7cb5c0e/attrs-25.4.0-py3-none-any.whl", hash = "sha256:adcf7e2a1fb3b36ac48d97835bb6d8ade15b8dcce26aba8bf1d14847b57a3373", size = 67615, upload-time = "2025-10-06T13:54:43.17Z" },
]
[[package]]
name = "backports-asyncio-runner"
version = "1.2.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/8e/ff/70dca7d7cb1cbc0edb2c6cc0c38b65cba36cccc491eca64cabd5fe7f8670/backports_asyncio_runner-1.2.0.tar.gz", hash = "sha256:a5aa7b2b7d8f8bfcaa2b57313f70792df84e32a2a746f585213373f900b42162", size = 69893, upload-time = "2025-07-02T02:27:15.685Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/a0/59/76ab57e3fe74484f48a53f8e337171b4a2349e506eabe136d7e01d059086/backports_asyncio_runner-1.2.0-py3-none-any.whl", hash = "sha256:0da0a936a8aeb554eccb426dc55af3ba63bcdc69fa1a600b5bb305413a4477b5", size = 12313, upload-time = "2025-07-02T02:27:14.263Z" },
]
[[package]]
name = "black"
version = "25.9.0"
@ -394,6 +403,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/76/c6/c88e154df9c4e1a2a66ccf0005a88dfb2650c1dffb6f5ce603dfbd452ce3/idna-3.10-py3-none-any.whl", hash = "sha256:946d195a0d259cbba61165e88e65941f16e9b36ea6ddb97f00452bae8b1287d3", size = 70442, upload-time = "2024-09-15T18:07:37.964Z" },
]
[[package]]
name = "iniconfig"
version = "2.3.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/72/34/14ca021ce8e5dfedc35312d08ba8bf51fdd999c576889fc2c24cb97f4f10/iniconfig-2.3.0.tar.gz", hash = "sha256:c76315c77db068650d49c5b56314774a7804df16fee4402c1f19d6d15d8c4730", size = 20503, upload-time = "2025-10-18T21:55:43.219Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/cb/b1/3846dd7f199d53cb17f49cba7e651e9ce294d8497c8c150530ed11865bb8/iniconfig-2.3.0-py3-none-any.whl", hash = "sha256:f631c04d2c48c52b84d0d0549c99ff3859c98df65b3101406327ecc7d53fbf12", size = 7484, upload-time = "2025-10-18T21:55:41.639Z" },
]
[[package]]
name = "jsonschema"
version = "4.25.1"
@ -527,6 +545,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/73/cb/ac7874b3e5d58441674fb70742e6c374b28b0c7cb988d37d991cde47166c/platformdirs-4.5.0-py3-none-any.whl", hash = "sha256:e578a81bb873cbb89a41fcc904c7ef523cc18284b7e3b3ccf06aca1403b7ebd3", size = 18651, upload-time = "2025-10-08T17:44:47.223Z" },
]
[[package]]
name = "pluggy"
version = "1.6.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/f9/e2/3e91f31a7d2b083fe6ef3fa267035b518369d9511ffab804f839851d2779/pluggy-1.6.0.tar.gz", hash = "sha256:7dcc130b76258d33b90f61b658791dede3486c3e6bfb003ee5c9bfb396dd22f3", size = 69412, upload-time = "2025-05-15T12:30:07.975Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" },
]
[[package]]
name = "pyaes"
version = "1.6.1"
@ -708,6 +735,38 @@ crypto = [
{ name = "cryptography", version = "46.0.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.14' and platform_python_implementation != 'PyPy'" },
]
[[package]]
name = "pytest"
version = "9.0.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "colorama", marker = "sys_platform == 'win32'" },
{ name = "exceptiongroup", marker = "python_full_version < '3.11'" },
{ name = "iniconfig" },
{ name = "packaging" },
{ name = "pluggy" },
{ name = "pygments" },
{ name = "tomli", marker = "python_full_version < '3.11'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/d1/db/7ef3487e0fb0049ddb5ce41d3a49c235bf9ad299b6a25d5780a89f19230f/pytest-9.0.2.tar.gz", hash = "sha256:75186651a92bd89611d1d9fc20f0b4345fd827c41ccd5c299a868a05d70edf11", size = 1568901, upload-time = "2025-12-06T21:30:51.014Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/3b/ab/b3226f0bd7cdcf710fbede2b3548584366da3b19b5021e74f5bde2a8fa3f/pytest-9.0.2-py3-none-any.whl", hash = "sha256:711ffd45bf766d5264d487b917733b453d917afd2b0ad65223959f59089f875b", size = 374801, upload-time = "2025-12-06T21:30:49.154Z" },
]
[[package]]
name = "pytest-asyncio"
version = "1.3.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "backports-asyncio-runner", marker = "python_full_version < '3.11'" },
{ name = "pytest" },
{ name = "typing-extensions", marker = "python_full_version < '3.13'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/90/2c/8af215c0f776415f3590cac4f9086ccefd6fd463befeae41cd4d3f193e5a/pytest_asyncio-1.3.0.tar.gz", hash = "sha256:d7f52f36d231b80ee124cd216ffb19369aa168fc10095013c6b014a34d3ee9e5", size = 50087, upload-time = "2025-11-10T16:07:47.256Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/e5/35/f8b19922b6a25bc0880171a2f1a003eaeb93657475193ab516fd87cac9da/pytest_asyncio-1.3.0-py3-none-any.whl", hash = "sha256:611e26147c7f77640e6d0a92a38ed17c3e9848063698d5c93d5aa7aa11cebff5", size = 15075, upload-time = "2025-11-10T16:07:45.537Z" },
]
[[package]]
name = "python-dotenv"
version = "1.1.0"
@ -766,6 +825,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/c0/d2/21af5c535501a7233e734b8af901574572da66fcc254cb35d0609c9080dd/pywin32-311-cp314-cp314-win_arm64.whl", hash = "sha256:a508e2d9025764a8270f93111a970e1d0fbfc33f4153b388bb649b7eec4f9b42", size = 8932540, upload-time = "2025-07-14T20:13:36.379Z" },
]
[[package]]
name = "qrcode"
version = "8.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "colorama", marker = "sys_platform == 'win32'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/8f/b2/7fc2931bfae0af02d5f53b174e9cf701adbb35f39d69c2af63d4a39f81a9/qrcode-8.2.tar.gz", hash = "sha256:35c3f2a4172b33136ab9f6b3ef1c00260dd2f66f858f24d88418a015f446506c", size = 43317, upload-time = "2025-05-01T15:44:24.726Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/dd/b8/d2d6d731733f51684bbf76bf34dab3b70a9148e8f2cef2bb544fccec681a/qrcode-8.2-py3-none-any.whl", hash = "sha256:16e64e0716c14960108e85d853062c9e8bba5ca8252c0b4d0231b9df4060ff4f", size = 45986, upload-time = "2025-05-01T15:44:22.781Z" },
]
[[package]]
name = "referencing"
version = "0.37.0"
@ -982,6 +1053,7 @@ dependencies = [
{ name = "nest-asyncio" },
{ name = "python-dotenv" },
{ name = "python-json-logger" },
{ name = "qrcode" },
{ name = "telethon" },
]
@ -989,6 +1061,8 @@ dependencies = [
dev = [
{ name = "black" },
{ name = "flake8" },
{ name = "pytest" },
{ name = "pytest-asyncio" },
]
[package.metadata]
@ -999,26 +1073,29 @@ requires-dist = [
{ name = "nest-asyncio", specifier = ">=1.6.0" },
{ name = "python-dotenv", specifier = ">=1.1.0" },
{ name = "python-json-logger", specifier = ">=3.3.0" },
{ name = "telethon", specifier = ">=1.39.0" },
{ name = "qrcode", specifier = ">=8.2" },
{ name = "telethon", specifier = ">=1.42.0" },
]
[package.metadata.requires-dev]
dev = [
{ name = "black", specifier = ">=25.9.0" },
{ name = "flake8", specifier = ">=7.3.0" },
{ name = "pytest", specifier = ">=9.0.2" },
{ name = "pytest-asyncio", specifier = ">=1.3.0" },
]
[[package]]
name = "telethon"
version = "1.39.0"
version = "1.42.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "pyaes" },
{ name = "rsa" },
]
sdist = { url = "https://files.pythonhosted.org/packages/54/b2/6e48b593a0e4e678578a682332ef36e54648c78ef13a5d395e5cdd293c6d/telethon-1.39.0.tar.gz", hash = "sha256:35d4795d8c91deac515fb0bcb3723866b924de1c724e1d5c230460e96f284a63", size = 640634, upload-time = "2025-02-20T17:30:44.562Z" }
sdist = { url = "https://files.pythonhosted.org/packages/8c/10/8c8c9476bfce767a856d8aaf9eae8ea1869df4e970da16f1c5b638fd1b0c/telethon-1.42.0.tar.gz", hash = "sha256:032e95511261d5ead719f75494c6c85ece2ce71816b54f3c65d6ccc371d6994d", size = 672734, upload-time = "2025-11-05T19:15:19.849Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/32/f2/4e66a59c6d0cb6a25c0c2a630b4b184d81cfe8cb982bf3b65ce32b020fc8/Telethon-1.39.0-py3-none-any.whl", hash = "sha256:aa9f394b94be144799a6f6a93ab463867bc7c63503ede9631751940a98f6c703", size = 715851, upload-time = "2025-02-20T17:30:41.955Z" },
{ url = "https://files.pythonhosted.org/packages/e4/e4/8ce0ff55251381966a7c3f88bd5b34abda79b225a8e7fb51ddef3b849c94/telethon-1.42.0-py3-none-any.whl", hash = "sha256:cf361c94586bcacd6d0fc8959a2bce509d1bb37007fe6476a80c4fb4a2decc29", size = 748466, upload-time = "2025-11-05T19:15:18.241Z" },
]
[[package]]