Merge pull request #32 from Latand/feature/inline-buttons-subscribe
Add inline button helpers and channel subscription tool
This commit is contained in:
commit
ac284bcd4e
2 changed files with 272 additions and 0 deletions
73
README.md
73
README.md
|
|
@ -56,6 +56,7 @@ This MCP server exposes a huge suite of Telegram tools. **Every major Telegram/T
|
|||
- **export_chat_invite(chat_id)**: Export invite link
|
||||
- **import_chat_invite(hash)**: Join chat by invite hash
|
||||
- **join_chat_by_link(link)**: Join chat by invite link
|
||||
- **subscribe_public_channel(channel)**: Subscribe to a public channel or supergroup by username or ID
|
||||
|
||||
### Messaging
|
||||
- **get_messages(chat_id, page, page_size)**: Paginated messages
|
||||
|
|
@ -74,6 +75,8 @@ This MCP server exposes a huge suite of Telegram tools. **Every major Telegram/T
|
|||
- **get_pinned_messages(chat_id)**: List pinned messages
|
||||
- **get_last_interaction(contact_id)**: Most recent message with a contact
|
||||
- **create_poll(chat_id, question, options, multiple_choice, quiz_mode, public_votes, close_date)**: Create a poll
|
||||
- **list_inline_buttons(chat_id, message_id, limit)**: Inspect inline keyboards to discover button text/index
|
||||
- **press_inline_button(chat_id, message_id, button_text, button_index)**: Trigger inline keyboard callbacks by label or index
|
||||
|
||||
### Contact Management
|
||||
- **list_contacts()**: List all contacts
|
||||
|
|
@ -313,6 +316,76 @@ Example output:
|
|||
Message sent successfully.
|
||||
```
|
||||
|
||||
### Listing Inline Buttons
|
||||
|
||||
```python
|
||||
@mcp.tool()
|
||||
async def list_inline_buttons(
|
||||
chat_id: Union[int, str],
|
||||
message_id: Optional[int] = None,
|
||||
limit: int = 20,
|
||||
) -> str:
|
||||
"""
|
||||
Discover inline keyboard layout, including button indices, callback availability, and URLs.
|
||||
"""
|
||||
```
|
||||
|
||||
Example usage:
|
||||
```
|
||||
list_inline_buttons(chat_id="@sample_tasks_bot")
|
||||
```
|
||||
|
||||
This returns something like:
|
||||
```
|
||||
Buttons for message 42 (date 2025-01-01 12:00:00+00:00):
|
||||
[0] text='📋 View tasks', callback=yes
|
||||
[1] text='ℹ️ Help', callback=yes
|
||||
[2] text='🌐 Visit site', callback=no, url=https://example.org
|
||||
```
|
||||
|
||||
### Pressing Inline Buttons
|
||||
|
||||
```python
|
||||
@mcp.tool()
|
||||
async def press_inline_button(
|
||||
chat_id: Union[int, str],
|
||||
message_id: Optional[int] = None,
|
||||
button_text: Optional[str] = None,
|
||||
button_index: Optional[int] = None,
|
||||
) -> str:
|
||||
"""
|
||||
Press an inline keyboard button by label or zero-based index.
|
||||
If message_id is omitted, the server searches recent messages for the latest inline keyboard.
|
||||
"""
|
||||
```
|
||||
|
||||
Example usage:
|
||||
```
|
||||
press_inline_button(chat_id="@sample_tasks_bot", button_text="📋 View tasks")
|
||||
```
|
||||
|
||||
Use `list_inline_buttons` first if you need to inspect available buttons—pass a bogus `button_text`
|
||||
to quickly list options or call `list_inline_buttons` directly. Once you know the text or index,
|
||||
`press_inline_button` sends the callback, just like tapping the button in a native Telegram client.
|
||||
|
||||
### Subscribing to Public Channels
|
||||
|
||||
```python
|
||||
@mcp.tool()
|
||||
async def subscribe_public_channel(channel: Union[int, str]) -> str:
|
||||
"""
|
||||
Join a public channel or supergroup by username (e.g., "@examplechannel") or ID.
|
||||
"""
|
||||
```
|
||||
|
||||
Example usage:
|
||||
```
|
||||
subscribe_public_channel(channel="@daily_updates_feed")
|
||||
```
|
||||
|
||||
If the account is already a participant, the tool reports that instead of failing, making it safe to
|
||||
run repeatedly in workflows that need idempotent joins.
|
||||
|
||||
### Getting Chat Invite Links
|
||||
|
||||
The `get_invite_link` function is particularly robust with multiple fallback methods:
|
||||
|
|
|
|||
199
main.py
199
main.py
|
|
@ -391,6 +391,205 @@ async def send_message(chat_id: Union[int, str], message: str) -> str:
|
|||
return log_and_format_error("send_message", e, chat_id=chat_id)
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
@validate_id("channel")
|
||||
async def subscribe_public_channel(channel: Union[int, str]) -> str:
|
||||
"""
|
||||
Subscribe (join) to a public channel or supergroup by username or ID.
|
||||
"""
|
||||
try:
|
||||
entity = await client.get_entity(channel)
|
||||
await client(functions.channels.JoinChannelRequest(channel=entity))
|
||||
title = getattr(entity, "title", getattr(entity, "username", "Unknown channel"))
|
||||
return f"Subscribed to {title}."
|
||||
except telethon.errors.rpcerrorlist.UserAlreadyParticipantError:
|
||||
title = getattr(entity, "title", getattr(entity, "username", "this channel"))
|
||||
return f"Already subscribed to {title}."
|
||||
except telethon.errors.rpcerrorlist.ChannelPrivateError:
|
||||
return "Cannot subscribe: this channel is private or requires an invite link."
|
||||
except Exception as e:
|
||||
return log_and_format_error("subscribe_public_channel", e, channel=channel)
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
@validate_id("chat_id")
|
||||
async def list_inline_buttons(
|
||||
chat_id: Union[int, str], message_id: Optional[Union[int, str]] = None, limit: int = 20
|
||||
) -> str:
|
||||
"""
|
||||
Inspect inline buttons on a recent message to discover their indices/text/URLs.
|
||||
"""
|
||||
try:
|
||||
if isinstance(message_id, str):
|
||||
if message_id.isdigit():
|
||||
message_id = int(message_id)
|
||||
else:
|
||||
return "message_id must be an integer."
|
||||
|
||||
entity = await client.get_entity(chat_id)
|
||||
target_message = None
|
||||
|
||||
if message_id is not None:
|
||||
target_message = await client.get_messages(entity, ids=message_id)
|
||||
if isinstance(target_message, list):
|
||||
target_message = target_message[0] if target_message else None
|
||||
else:
|
||||
recent_messages = await client.get_messages(entity, limit=limit)
|
||||
target_message = next(
|
||||
(msg for msg in recent_messages if getattr(msg, "buttons", None)), None
|
||||
)
|
||||
|
||||
if not target_message:
|
||||
return "No message with inline buttons found."
|
||||
|
||||
buttons_attr = getattr(target_message, "buttons", None)
|
||||
if not buttons_attr:
|
||||
return f"Message {target_message.id} does not contain inline buttons."
|
||||
|
||||
buttons = [btn for row in buttons_attr for btn in row]
|
||||
if not buttons:
|
||||
return f"Message {target_message.id} does not contain inline buttons."
|
||||
|
||||
lines = [
|
||||
f"Buttons for message {target_message.id} (date {target_message.date}):",
|
||||
]
|
||||
for idx, btn in enumerate(buttons):
|
||||
raw_button = getattr(btn, "button", None)
|
||||
text = getattr(btn, "text", "") or "<no text>"
|
||||
url = getattr(raw_button, "url", None) if raw_button else None
|
||||
has_callback = bool(getattr(btn, "data", None))
|
||||
parts = [f"[{idx}] text='{text}'"]
|
||||
parts.append("callback=yes" if has_callback else "callback=no")
|
||||
if url:
|
||||
parts.append(f"url={url}")
|
||||
lines.append(", ".join(parts))
|
||||
|
||||
return "\n".join(lines)
|
||||
except Exception as e:
|
||||
return log_and_format_error(
|
||||
"list_inline_buttons",
|
||||
e,
|
||||
chat_id=chat_id,
|
||||
message_id=message_id,
|
||||
limit=limit,
|
||||
)
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
@validate_id("chat_id")
|
||||
async def press_inline_button(
|
||||
chat_id: Union[int, str],
|
||||
message_id: Optional[Union[int, str]] = None,
|
||||
button_text: Optional[str] = None,
|
||||
button_index: Optional[int] = None,
|
||||
) -> str:
|
||||
"""
|
||||
Press an inline button (callback) in a chat message.
|
||||
|
||||
Args:
|
||||
chat_id: Chat or bot where the inline keyboard exists.
|
||||
message_id: Specific message ID to inspect. If omitted, searches recent messages for one containing buttons.
|
||||
button_text: Exact text of the button to press (case-insensitive).
|
||||
button_index: Zero-based index among all buttons if you prefer positional access.
|
||||
"""
|
||||
try:
|
||||
if button_text is None and button_index is None:
|
||||
return "Provide button_text or button_index to choose a button."
|
||||
|
||||
# Normalize message_id if provided as a string
|
||||
if isinstance(message_id, str):
|
||||
if message_id.isdigit():
|
||||
message_id = int(message_id)
|
||||
else:
|
||||
return "message_id must be an integer."
|
||||
|
||||
if isinstance(button_index, str):
|
||||
if button_index.isdigit():
|
||||
button_index = int(button_index)
|
||||
else:
|
||||
return "button_index must be an integer."
|
||||
|
||||
entity = await client.get_entity(chat_id)
|
||||
|
||||
target_message = None
|
||||
if message_id is not None:
|
||||
target_message = await client.get_messages(entity, ids=message_id)
|
||||
if isinstance(target_message, list):
|
||||
target_message = target_message[0] if target_message else None
|
||||
else:
|
||||
recent_messages = await client.get_messages(entity, limit=20)
|
||||
target_message = next(
|
||||
(msg for msg in recent_messages if getattr(msg, "buttons", None)), None
|
||||
)
|
||||
|
||||
if not target_message:
|
||||
return "No message with inline buttons found. Specify message_id to target a specific message."
|
||||
|
||||
buttons_attr = getattr(target_message, "buttons", None)
|
||||
if not buttons_attr:
|
||||
return f"Message {target_message.id} does not contain inline buttons."
|
||||
|
||||
buttons = [btn for row in buttons_attr for btn in row]
|
||||
if not buttons:
|
||||
return f"Message {target_message.id} does not contain inline buttons."
|
||||
|
||||
target_button = None
|
||||
if button_text:
|
||||
normalized = button_text.strip().lower()
|
||||
target_button = next(
|
||||
(
|
||||
btn
|
||||
for btn in buttons
|
||||
if (getattr(btn, "text", "") or "").strip().lower() == normalized
|
||||
),
|
||||
None,
|
||||
)
|
||||
|
||||
if target_button is None and button_index is not None:
|
||||
if button_index < 0 or button_index >= len(buttons):
|
||||
return f"button_index out of range. Valid indices: 0-{len(buttons) - 1}."
|
||||
target_button = buttons[button_index]
|
||||
|
||||
if not target_button:
|
||||
available = ", ".join(
|
||||
f"[{idx}] {getattr(btn, 'text', '') or '<no text>'}"
|
||||
for idx, btn in enumerate(buttons)
|
||||
)
|
||||
return f"Button not found. Available buttons: {available}"
|
||||
|
||||
if not getattr(target_button, "data", None):
|
||||
raw_button = getattr(target_button, "button", None)
|
||||
url = getattr(raw_button, "url", None) if raw_button else None
|
||||
if url:
|
||||
return f"Selected button opens a URL instead of sending a callback: {url}"
|
||||
return "Selected button does not provide callback data to press."
|
||||
|
||||
callback_result = await client(
|
||||
functions.messages.GetBotCallbackAnswerRequest(
|
||||
peer=entity, msg_id=target_message.id, data=target_button.data
|
||||
)
|
||||
)
|
||||
|
||||
response_parts = []
|
||||
if getattr(callback_result, "message", None):
|
||||
response_parts.append(callback_result.message)
|
||||
if getattr(callback_result, "alert", None):
|
||||
response_parts.append("Telegram displayed an alert to the user.")
|
||||
if not response_parts:
|
||||
response_parts.append("Button pressed successfully.")
|
||||
|
||||
return " ".join(response_parts)
|
||||
except Exception as e:
|
||||
return log_and_format_error(
|
||||
"press_inline_button",
|
||||
e,
|
||||
chat_id=chat_id,
|
||||
message_id=message_id,
|
||||
button_text=button_text,
|
||||
button_index=button_index,
|
||||
)
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def list_contacts() -> str:
|
||||
"""
|
||||
|
|
|
|||
Loading…
Reference in a new issue