diff --git a/main.py b/main.py index dc6c49c..cd7a094 100644 --- a/main.py +++ b/main.py @@ -37,6 +37,7 @@ import telethon.errors.rpcerrorlist class ValidationError(Exception): """Custom exception for validation errors.""" + pass @@ -146,7 +147,6 @@ def log_and_format_error( else: error_code = f"{prefix}-ERR-{abs(hash(function_name)) % 1000:03d}" - # Format the additional context parameters context = ", ".join(f"{k}={v}" for k, v in kwargs.items()) @@ -166,6 +166,7 @@ def validate_id(*param_names_to_validate): It checks for valid integer ranges, string representations of integers, and username formats. """ + def decorator(func): @wraps(func) async def wrapper(*args, **kwargs): @@ -178,22 +179,31 @@ def validate_id(*param_names_to_validate): def validate_single_id(value, p_name): # Handle integer IDs if isinstance(value, int): - if not (-2**63 <= value <= 2**63 - 1): - return None, f"Invalid {p_name}: {value}. ID is out of the valid integer range." + if not (-(2**63) <= value <= 2**63 - 1): + return ( + None, + f"Invalid {p_name}: {value}. ID is out of the valid integer range.", + ) return value, None # Handle string IDs if isinstance(value, str): try: int_value = int(value) - if not (-2**63 <= int_value <= 2**63 - 1): - return None, f"Invalid {p_name}: {value}. ID is out of the valid integer range." + if not (-(2**63) <= int_value <= 2**63 - 1): + return ( + None, + f"Invalid {p_name}: {value}. ID is out of the valid integer range.", + ) return int_value, None except ValueError: - if re.match(r'^@?[a-zA-Z0-9_]{5,}$', value): + if re.match(r"^@?[a-zA-Z0-9_]{5,}$", value): return value, None else: - return None, f"Invalid {p_name}: '{value}'. Must be a valid integer ID, or a username string." + return ( + None, + f"Invalid {p_name}: '{value}'. Must be a valid integer ID, or a username string.", + ) # Handle other invalid types return None, f"Invalid {p_name}: {value}. Type must be an integer or a string." @@ -208,7 +218,7 @@ def validate_id(*param_names_to_validate): ValidationError(error_msg), prefix="VALIDATION-001", user_message=error_msg, - **{param_name: param_value} + **{param_name: param_value}, ) validated_list.append(validated_item) kwargs[param_name] = validated_list @@ -220,12 +230,14 @@ def validate_id(*param_names_to_validate): ValidationError(error_msg), prefix="VALIDATION-001", user_message=error_msg, - **{param_name: param_value} + **{param_name: param_value}, ) kwargs[param_name] = validated_value return await func(*args, **kwargs) + return wrapper + return decorator @@ -315,7 +327,7 @@ async def get_chats(page: int = 1, page_size: int = 20) -> str: @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def get_messages(chat_id: Union[int, str], page: int = 1, page_size: int = 20) -> str: """ Get paginated messages from a specific chat. @@ -347,7 +359,7 @@ async def get_messages(chat_id: Union[int, str], page: int = 1, page_size: int = @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def send_message(chat_id: Union[int, str], message: str) -> str: """ Send a message to a specific chat. @@ -432,7 +444,7 @@ async def get_contact_ids() -> str: @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def list_messages( chat_id: Union[int, str], limit: int = 20, @@ -588,7 +600,7 @@ async def list_chats(chat_type: str = None, limit: int = 20) -> str: @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def get_chat(chat_id: Union[int, str]) -> str: """ Get detailed information about a specific chat. @@ -724,7 +736,7 @@ async def get_direct_chat_by_contact(contact_query: str) -> str: @mcp.tool() -@validate_id('contact_id') +@validate_id("contact_id") async def get_contact_chats(contact_id: Union[int, str]) -> str: """ List all chats involving a specific contact. @@ -777,7 +789,7 @@ async def get_contact_chats(contact_id: Union[int, str]) -> str: @mcp.tool() -@validate_id('contact_id') +@validate_id("contact_id") async def get_last_interaction(contact_id: Union[int, str]) -> str: """ Get the most recent message with a contact. @@ -814,8 +826,10 @@ async def get_last_interaction(contact_id: Union[int, str]) -> str: @mcp.tool() -@validate_id('chat_id') -async def get_message_context(chat_id: Union[int, str], message_id: int, context_size: int = 3) -> str: +@validate_id("chat_id") +async def get_message_context( + chat_id: Union[int, str], message_id: int, context_size: int = 3 +) -> str: """ Retrieve context around a specific message. @@ -932,7 +946,7 @@ async def add_contact(phone: str, first_name: str, last_name: str = "") -> str: @mcp.tool() -@validate_id('user_id') +@validate_id("user_id") async def delete_contact(user_id: Union[int, str]) -> str: """ Delete a contact by user ID. @@ -948,7 +962,7 @@ async def delete_contact(user_id: Union[int, str]) -> str: @mcp.tool() -@validate_id('user_id') +@validate_id("user_id") async def block_user(user_id: Union[int, str]) -> str: """ Block a user by user ID. @@ -964,7 +978,7 @@ async def block_user(user_id: Union[int, str]) -> str: @mcp.tool() -@validate_id('user_id') +@validate_id("user_id") async def unblock_user(user_id: Union[int, str]) -> str: """ Unblock a user by user ID. @@ -992,7 +1006,7 @@ async def get_me() -> str: @mcp.tool() -@validate_id('user_ids') +@validate_id("user_ids") async def create_group(title: str, user_ids: List[Union[int, str]]) -> str: """ Create a new group or supergroup and add users. @@ -1051,7 +1065,7 @@ async def create_group(title: str, user_ids: List[Union[int, str]]) -> str: @mcp.tool() -@validate_id('group_id', 'user_ids') +@validate_id("group_id", "user_ids") async def invite_to_group(group_id: Union[int, str], user_ids: List[Union[int, str]]) -> str: """ Invite users to a group or channel. @@ -1101,7 +1115,7 @@ async def invite_to_group(group_id: Union[int, str], user_ids: List[Union[int, s @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def leave_chat(chat_id: Union[int, str]) -> str: """ Leave a group or channel by chat ID. @@ -1181,7 +1195,7 @@ async def leave_chat(chat_id: Union[int, str]) -> str: @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def get_participants(chat_id: Union[int, str]) -> str: """ List all participants in a group or channel. @@ -1200,7 +1214,7 @@ async def get_participants(chat_id: Union[int, str]) -> str: @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def send_file(chat_id: Union[int, str], file_path: str, caption: str = None) -> str: """ Send a file to a chat. @@ -1224,7 +1238,7 @@ async def send_file(chat_id: Union[int, str], file_path: str, caption: str = Non @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def download_media(chat_id: Union[int, str], message_id: int, file_path: str) -> str: """ Download media from a message in a chat. @@ -1326,9 +1340,11 @@ async def get_privacy_settings() -> str: @mcp.tool() -@validate_id('allow_users', 'disallow_users') +@validate_id("allow_users", "disallow_users") async def set_privacy_settings( - key: str, allow_users: Optional[List[Union[int, str]]] = None, disallow_users: Optional[List[Union[int, str]]] = None + key: str, + allow_users: Optional[List[Union[int, str]]] = None, + disallow_users: Optional[List[Union[int, str]]] = None, ) -> str: """ Set privacy settings (e.g., last seen, phone, etc.). @@ -1483,7 +1499,7 @@ async def create_channel(title: str, about: str = "", megagroup: bool = False) - @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def edit_chat_title(chat_id: Union[int, str], title: str) -> str: """ Edit the title of a chat, group, or channel. @@ -1503,7 +1519,7 @@ async def edit_chat_title(chat_id: Union[int, str], title: str) -> str: @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def edit_chat_photo(chat_id: Union[int, str], file_path: str) -> str: """ Edit the photo of a chat, group, or channel. Requires a file path to an image. @@ -1537,7 +1553,7 @@ async def edit_chat_photo(chat_id: Union[int, str], file_path: str) -> str: @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def delete_chat_photo(chat_id: Union[int, str]) -> str: """ Delete the photo of a chat, group, or channel. @@ -1566,8 +1582,10 @@ async def delete_chat_photo(chat_id: Union[int, str]) -> str: @mcp.tool() -@validate_id('group_id', 'user_id') -async def promote_admin(group_id: Union[int, str], user_id: Union[int, str], rights: dict = None) -> str: +@validate_id("group_id", "user_id") +async def promote_admin( + group_id: Union[int, str], user_id: Union[int, str], rights: dict = None +) -> str: """ Promote a user to admin in a group/channel. @@ -1631,7 +1649,7 @@ async def promote_admin(group_id: Union[int, str], user_id: Union[int, str], rig @mcp.tool() -@validate_id('group_id', 'user_id') +@validate_id("group_id", "user_id") async def demote_admin(group_id: Union[int, str], user_id: Union[int, str]) -> str: """ Demote a user from admin in a group/channel. @@ -1680,7 +1698,7 @@ async def demote_admin(group_id: Union[int, str], user_id: Union[int, str]) -> s @mcp.tool() -@validate_id('chat_id', 'user_id') +@validate_id("chat_id", "user_id") async def ban_user(chat_id: Union[int, str], user_id: Union[int, str]) -> str: """ Ban a user from a group or channel. @@ -1727,7 +1745,7 @@ async def ban_user(chat_id: Union[int, str], user_id: Union[int, str]) -> str: @mcp.tool() -@validate_id('chat_id', 'user_id') +@validate_id("chat_id", "user_id") async def unban_user(chat_id: Union[int, str], user_id: Union[int, str]) -> str: """ Unban a user from a group or channel. @@ -1774,7 +1792,7 @@ async def unban_user(chat_id: Union[int, str], user_id: Union[int, str]) -> str: @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def get_admins(chat_id: Union[int, str]) -> str: """ Get all admins in a group or channel. @@ -1793,7 +1811,7 @@ async def get_admins(chat_id: Union[int, str]) -> str: @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def get_banned_users(chat_id: Union[int, str]) -> str: """ Get all banned users in a group or channel. @@ -1814,7 +1832,7 @@ async def get_banned_users(chat_id: Union[int, str]) -> str: @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def get_invite_link(chat_id: Union[int, str]) -> str: """ Get the invite link for a group or channel. @@ -1918,7 +1936,7 @@ async def join_chat_by_link(link: str) -> str: @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def export_chat_invite(chat_id: Union[int, str]) -> str: """ Export a chat invite link. @@ -2008,7 +2026,7 @@ async def import_chat_invite(hash: str) -> str: @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def send_voice(chat_id: Union[int, str], file_path: str) -> str: """ Send a voice message to a chat. File must be an OGG/OPUS voice note. @@ -2039,8 +2057,10 @@ async def send_voice(chat_id: Union[int, str], file_path: str) -> str: @mcp.tool() -@validate_id('from_chat_id', 'to_chat_id') -async def forward_message(from_chat_id: Union[int, str], message_id: int, to_chat_id: Union[int, str]) -> str: +@validate_id("from_chat_id", "to_chat_id") +async def forward_message( + from_chat_id: Union[int, str], message_id: int, to_chat_id: Union[int, str] +) -> str: """ Forward a message from one chat to another. """ @@ -2060,7 +2080,7 @@ async def forward_message(from_chat_id: Union[int, str], message_id: int, to_cha @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def edit_message(chat_id: Union[int, str], message_id: int, new_text: str) -> str: """ Edit a message you sent. @@ -2076,7 +2096,7 @@ async def edit_message(chat_id: Union[int, str], message_id: int, new_text: str) @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def delete_message(chat_id: Union[int, str], message_id: int) -> str: """ Delete a message by ID. @@ -2090,7 +2110,7 @@ async def delete_message(chat_id: Union[int, str], message_id: int) -> str: @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def pin_message(chat_id: Union[int, str], message_id: int) -> str: """ Pin a message in a chat. @@ -2104,7 +2124,7 @@ async def pin_message(chat_id: Union[int, str], message_id: int) -> str: @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def unpin_message(chat_id: Union[int, str], message_id: int) -> str: """ Unpin a message in a chat. @@ -2118,7 +2138,7 @@ async def unpin_message(chat_id: Union[int, str], message_id: int) -> str: @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def mark_as_read(chat_id: Union[int, str]) -> str: """ Mark all messages as read in a chat. @@ -2132,7 +2152,7 @@ async def mark_as_read(chat_id: Union[int, str]) -> str: @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def reply_to_message(chat_id: Union[int, str], message_id: int, text: str) -> str: """ Reply to a specific message in a chat. @@ -2148,7 +2168,7 @@ async def reply_to_message(chat_id: Union[int, str], message_id: int, text: str) @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def get_media_info(chat_id: Union[int, str], message_id: int) -> str: """ Get info about media in a message. @@ -2179,7 +2199,7 @@ async def search_public_chats(query: str) -> str: @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def search_messages(chat_id: Union[int, str], query: str, limit: int = 20) -> str: """ Search for messages in a chat by text. @@ -2216,7 +2236,7 @@ async def resolve_username(username: str) -> str: @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def mute_chat(chat_id: Union[int, str]) -> str: """ Mute notifications for a chat. @@ -2255,7 +2275,7 @@ async def mute_chat(chat_id: Union[int, str]) -> str: @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def unmute_chat(chat_id: Union[int, str]) -> str: """ Unmute notifications for a chat. @@ -2294,7 +2314,7 @@ async def unmute_chat(chat_id: Union[int, str]) -> str: @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def archive_chat(chat_id: Union[int, str]) -> str: """ Archive a chat. @@ -2311,7 +2331,7 @@ async def archive_chat(chat_id: Union[int, str]) -> str: @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def unarchive_chat(chat_id: Union[int, str]) -> str: """ Unarchive a chat. @@ -2340,7 +2360,7 @@ async def get_sticker_sets() -> str: @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def send_sticker(chat_id: Union[int, str], file_path: str) -> str: """ Send a sticker to a chat. File must be a valid .webp sticker file. @@ -2418,7 +2438,7 @@ async def get_gif_search(query: str, limit: int = 10) -> str: @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def send_gif(chat_id: Union[int, str], gif_id: int) -> str: """ Send a GIF to a chat by Telegram GIF document ID (not a file path). @@ -2521,7 +2541,7 @@ async def set_bot_commands(bot_username: str, commands: list) -> str: @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def get_history(chat_id: Union[int, str], limit: int = 100) -> str: """ Get full chat history (up to limit). @@ -2544,7 +2564,7 @@ async def get_history(chat_id: Union[int, str], limit: int = 100) -> str: @mcp.tool() -@validate_id('user_id') +@validate_id("user_id") async def get_user_photos(user_id: Union[int, str], limit: int = 10) -> str: """ Get profile photos of a user. @@ -2560,7 +2580,7 @@ async def get_user_photos(user_id: Union[int, str], limit: int = 10) -> str: @mcp.tool() -@validate_id('user_id') +@validate_id("user_id") async def get_user_status(user_id: Union[int, str]) -> str: """ Get the online status of a user. @@ -2573,7 +2593,7 @@ async def get_user_status(user_id: Union[int, str]) -> str: @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def get_recent_actions(chat_id: Union[int, str]) -> str: """ Get recent admin actions (admin log) in a group or channel. @@ -2596,7 +2616,7 @@ async def get_recent_actions(chat_id: Union[int, str]) -> str: @mcp.tool() -@validate_id('chat_id') +@validate_id("chat_id") async def get_pinned_messages(chat_id: Union[int, str]) -> str: """ Get all pinned messages in a chat. diff --git a/test_validation.py b/test_validation.py index 6e7354b..15da5ad 100644 --- a/test_validation.py +++ b/test_validation.py @@ -1,5 +1,6 @@ import pytest import os + os.environ["TELEGRAM_API_ID"] = "12345" os.environ["TELEGRAM_API_HASH"] = "dummy_hash" from main import validate_id, ValidationError, log_and_format_error @@ -7,46 +8,54 @@ from functools import wraps import asyncio from typing import Union, List, Optional + # A simple async function to be decorated for testing -@validate_id('user_id', 'chat_id', 'user_ids') +@validate_id("user_id", "chat_id", "user_ids") async def dummy_function(**kwargs): return "success", kwargs + @pytest.mark.asyncio async def test_valid_integer_id(): result, kwargs = await dummy_function(user_id=12345) assert result == "success" - assert kwargs['user_id'] == 12345 + assert kwargs["user_id"] == 12345 + @pytest.mark.asyncio async def test_valid_negative_integer_id(): result, kwargs = await dummy_function(chat_id=-100123456) assert result == "success" - assert kwargs['chat_id'] == -100123456 + assert kwargs["chat_id"] == -100123456 + @pytest.mark.asyncio async def test_valid_string_integer_id(): result, kwargs = await dummy_function(user_id="12345") assert result == "success" - assert kwargs['user_id'] == 12345 + assert kwargs["user_id"] == 12345 + @pytest.mark.asyncio async def test_valid_username(): result, kwargs = await dummy_function(user_id="@test_user") assert result == "success" - assert kwargs['user_id'] == "@test_user" + assert kwargs["user_id"] == "@test_user" + @pytest.mark.asyncio async def test_valid_username_without_at(): result, kwargs = await dummy_function(user_id="test_user_long_enough") assert result == "success" - assert kwargs['user_id'] == "test_user_long_enough" + assert kwargs["user_id"] == "test_user_long_enough" + @pytest.mark.asyncio async def test_valid_list_of_ids(): result, kwargs = await dummy_function(user_ids=[123, "456", "@test_user"]) assert result == "success" - assert kwargs['user_ids'] == [123, 456, "@test_user"] + assert kwargs["user_ids"] == [123, 456, "@test_user"] + @pytest.mark.asyncio async def test_invalid_float_id(): @@ -54,29 +63,34 @@ async def test_invalid_float_id(): assert "Invalid user_id" in result assert "Type must be an integer or a string" in result + @pytest.mark.asyncio async def test_invalid_string_id(): - result = await dummy_function(user_id="inv") # too short + result = await dummy_function(user_id="inv") # too short assert "Invalid user_id" in result assert "Must be a valid integer ID, or a username string" in result + @pytest.mark.asyncio async def test_integer_out_of_range(): result = await dummy_function(user_id=2**64) assert "Invalid user_id" in result assert "out of the valid integer range" in result + @pytest.mark.asyncio async def test_invalid_item_in_list(): result = await dummy_function(user_ids=[123, "456", 123.45]) assert "Invalid user_ids" in result assert "Type must be an integer or a string" in result + @pytest.mark.asyncio async def test_no_id_provided(): result, kwargs = await dummy_function() assert result == "success" + @pytest.mark.asyncio async def test_none_id_provided(): result, kwargs = await dummy_function(user_id=None)