Add inline button helpers and channel subscription tool

This commit is contained in:
latand 2025-11-04 20:51:04 +02:00
parent 66ae9f6f8e
commit c3594795cd
2 changed files with 274 additions and 0 deletions

View file

@ -56,6 +56,7 @@ This MCP server exposes a huge suite of Telegram tools. **Every major Telegram/T
- **export_chat_invite(chat_id)**: Export invite link
- **import_chat_invite(hash)**: Join chat by invite hash
- **join_chat_by_link(link)**: Join chat by invite link
- **subscribe_public_channel(channel)**: Subscribe to a public channel or supergroup by username or ID
### Messaging
- **get_messages(chat_id, page, page_size)**: Paginated messages
@ -74,6 +75,8 @@ This MCP server exposes a huge suite of Telegram tools. **Every major Telegram/T
- **get_pinned_messages(chat_id)**: List pinned messages
- **get_last_interaction(contact_id)**: Most recent message with a contact
- **create_poll(chat_id, question, options, multiple_choice, quiz_mode, public_votes, close_date)**: Create a poll
- **list_inline_buttons(chat_id, message_id, limit)**: Inspect inline keyboards to discover button text/index
- **press_inline_button(chat_id, message_id, button_text, button_index)**: Trigger inline keyboard callbacks by label or index
### Contact Management
- **list_contacts()**: List all contacts
@ -313,6 +316,76 @@ Example output:
Message sent successfully.
```
### Listing Inline Buttons
```python
@mcp.tool()
async def list_inline_buttons(
chat_id: Union[int, str],
message_id: Optional[int] = None,
limit: int = 20,
) -> str:
"""
Discover inline keyboard layout, including button indices, callback availability, and URLs.
"""
```
Example usage:
```
list_inline_buttons(chat_id="@sample_tasks_bot")
```
This returns something like:
```
Buttons for message 42 (date 2025-01-01 12:00:00+00:00):
[0] text='📋 View tasks', callback=yes
[1] text=' Help', callback=yes
[2] text='🌐 Visit site', callback=no, url=https://example.org
```
### Pressing Inline Buttons
```python
@mcp.tool()
async def press_inline_button(
chat_id: Union[int, str],
message_id: Optional[int] = None,
button_text: Optional[str] = None,
button_index: Optional[int] = None,
) -> str:
"""
Press an inline keyboard button by label or zero-based index.
If message_id is omitted, the server searches recent messages for the latest inline keyboard.
"""
```
Example usage:
```
press_inline_button(chat_id="@sample_tasks_bot", button_text="📋 View tasks")
```
Use `list_inline_buttons` first if you need to inspect available buttons—pass a bogus `button_text`
to quickly list options or call `list_inline_buttons` directly. Once you know the text or index,
`press_inline_button` sends the callback, just like tapping the button in a native Telegram client.
### Subscribing to Public Channels
```python
@mcp.tool()
async def subscribe_public_channel(channel: Union[int, str]) -> str:
"""
Join a public channel or supergroup by username (e.g., "@examplechannel") or ID.
"""
```
Example usage:
```
subscribe_public_channel(channel="@daily_updates_feed")
```
If the account is already a participant, the tool reports that instead of failing, making it safe to
run repeatedly in workflows that need idempotent joins.
### Getting Chat Invite Links
The `get_invite_link` function is particularly robust with multiple fallback methods:

201
main.py
View file

@ -391,6 +391,207 @@ async def send_message(chat_id: Union[int, str], message: str) -> str:
return log_and_format_error("send_message", e, chat_id=chat_id)
@mcp.tool()
@validate_id("channel")
async def subscribe_public_channel(channel: Union[int, str]) -> str:
"""
Subscribe (join) to a public channel or supergroup by username or ID.
"""
try:
entity = await client.get_entity(channel)
await client(functions.channels.JoinChannelRequest(channel=entity))
title = getattr(entity, "title", getattr(entity, "username", "Unknown channel"))
return f"Subscribed to {title}."
except telethon.errors.rpcerrorlist.UserAlreadyParticipantError:
title = getattr(entity, "title", getattr(entity, "username", "this channel"))
return f"Already subscribed to {title}."
except telethon.errors.rpcerrorlist.ChannelPrivateError:
return "Cannot subscribe: this channel is private or requires an invite link."
except Exception as e:
return log_and_format_error("subscribe_public_channel", e, channel=channel)
@mcp.tool()
@validate_id("chat_id")
async def list_inline_buttons(
chat_id: Union[int, str], message_id: Optional[Union[int, str]] = None, limit: int = 20
) -> str:
"""
Inspect inline buttons on a recent message to discover their indices/text/URLs.
"""
try:
if isinstance(message_id, str):
if message_id.isdigit():
message_id = int(message_id)
else:
return "message_id must be an integer."
entity = await client.get_entity(chat_id)
target_message = None
if message_id is not None:
target_message = await client.get_messages(entity, ids=message_id)
if isinstance(target_message, list):
target_message = target_message[0] if target_message else None
else:
recent_messages = await client.get_messages(entity, limit=limit)
target_message = next(
(msg for msg in recent_messages if getattr(msg, "buttons", None)), None
)
if not target_message:
return "No message with inline buttons found."
buttons_attr = getattr(target_message, "buttons", None)
if not buttons_attr:
return f"Message {target_message.id} does not contain inline buttons."
buttons = [btn for row in buttons_attr for btn in row]
if not buttons:
return f"Message {target_message.id} does not contain inline buttons."
lines = [
f"Buttons for message {target_message.id} (date {target_message.date}):",
]
for idx, btn in enumerate(buttons):
raw_button = getattr(btn, "button", None)
text = getattr(btn, "text", "") or "<no text>"
url = getattr(raw_button, "url", None) if raw_button else None
has_callback = bool(getattr(btn, "data", None))
parts = [f"[{idx}] text='{text}'"]
parts.append("callback=yes" if has_callback else "callback=no")
if url:
parts.append(f"url={url}")
lines.append(", ".join(parts))
return "\n".join(lines)
except Exception as e:
return log_and_format_error(
"list_inline_buttons",
e,
chat_id=chat_id,
message_id=message_id,
limit=limit,
)
@mcp.tool()
@validate_id("chat_id")
async def press_inline_button(
chat_id: Union[int, str],
message_id: Optional[Union[int, str]] = None,
button_text: Optional[str] = None,
button_index: Optional[int] = None,
) -> str:
"""
Press an inline button (callback) in a chat message.
Args:
chat_id: Chat or bot where the inline keyboard exists.
message_id: Specific message ID to inspect. If omitted, searches recent messages for one containing buttons.
button_text: Exact text of the button to press (case-insensitive).
button_index: Zero-based index among all buttons if you prefer positional access.
"""
try:
if button_text is None and button_index is None:
return "Provide button_text or button_index to choose a button."
# Normalize message_id if provided as a string
if isinstance(message_id, str):
if message_id.isdigit():
message_id = int(message_id)
else:
return "message_id must be an integer."
if isinstance(button_index, str):
if button_index.isdigit():
button_index = int(button_index)
else:
return "button_index must be an integer."
entity = await client.get_entity(chat_id)
target_message = None
if message_id is not None:
target_message = await client.get_messages(entity, ids=message_id)
if isinstance(target_message, list):
target_message = target_message[0] if target_message else None
else:
recent_messages = await client.get_messages(entity, limit=20)
target_message = next(
(msg for msg in recent_messages if getattr(msg, "buttons", None)), None
)
if not target_message:
return (
"No message with inline buttons found. Specify message_id to target a specific message."
)
buttons_attr = getattr(target_message, "buttons", None)
if not buttons_attr:
return f"Message {target_message.id} does not contain inline buttons."
buttons = [btn for row in buttons_attr for btn in row]
if not buttons:
return f"Message {target_message.id} does not contain inline buttons."
target_button = None
if button_text:
normalized = button_text.strip().lower()
target_button = next(
(
btn
for btn in buttons
if (getattr(btn, "text", "") or "").strip().lower() == normalized
),
None,
)
if target_button is None and button_index is not None:
if button_index < 0 or button_index >= len(buttons):
return f"button_index out of range. Valid indices: 0-{len(buttons) - 1}."
target_button = buttons[button_index]
if not target_button:
available = ", ".join(
f"[{idx}] {getattr(btn, 'text', '') or '<no text>'}"
for idx, btn in enumerate(buttons)
)
return f"Button not found. Available buttons: {available}"
if not getattr(target_button, "data", None):
raw_button = getattr(target_button, "button", None)
url = getattr(raw_button, "url", None) if raw_button else None
if url:
return f"Selected button opens a URL instead of sending a callback: {url}"
return "Selected button does not provide callback data to press."
callback_result = await client(
functions.messages.GetBotCallbackAnswerRequest(
peer=entity, msg_id=target_message.id, data=target_button.data
)
)
response_parts = []
if getattr(callback_result, "message", None):
response_parts.append(callback_result.message)
if getattr(callback_result, "alert", None):
response_parts.append("Telegram displayed an alert to the user.")
if not response_parts:
response_parts.append("Button pressed successfully.")
return " ".join(response_parts)
except Exception as e:
return log_and_format_error(
"press_inline_button",
e,
chat_id=chat_id,
message_id=message_id,
button_text=button_text,
button_index=button_index,
)
@mcp.tool()
async def list_contacts() -> str:
"""