I’m building an automation script that monitors the notcoin Telegram bot for newly launched pools and sends alerts to my channel. The goal is to automatically detect when fresh pools become available and forward this info to subscribers.
Here’s my current approach:
config.py:
from dotenv import load_dotenv
import asyncio
import logging
import telegram_monitor
load_dotenv()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(telegram_monitor.initialize())
telegram_monitor.py:
import asyncio
import aiohttp
from os import environ
from aiogram import Bot, Dispatcher, types
from aiogram.client.default import DefaultBotProperties
from aiogram.enums import ParseMode
from aiogram.filters import CommandStart
from aiogram.types import Message
from bs4 import BeautifulSoup
dp = Dispatcher()
notification_suffix = "Don't miss out on this opportunity!"
target_channel = ""
processed_pools = []
scan_frequency = 3600
async def initialize():
global telegram_bot
telegram_bot = Bot(
environ.get("BOT_TOKEN"),
default=DefaultBotProperties(parse_mode=ParseMode.HTML),
)
await telegram_bot.delete_webhook(drop_pending_updates=True)
await dp.start_polling(telegram_bot)
@dp.message(CommandStart())
async def handle_start_command(message: Message):
await message.answer(f"Welcome! Your user ID: {message.from_user.id}")
async def scrape_pool_information():
async with aiohttp.ClientSession() as session:
async with session.get("https://t.me/notcoin_bot") as response:
parser = BeautifulSoup(await response.text(), "html.parser")
pool_elements = parser.find_all("div", class_="pool")
for element in pool_elements:
pool_title = element.find("h3").text
reward_amount = element.find("span", class_="reward").text
if pool_title not in processed_pools:
processed_pools.append(pool_title)
message_content = f"**New Pool Alert:** {pool_title} | **Reward Rate:** {reward_amount} per hour.\n{notification_suffix}"
await telegram_bot.send_message(target_channel, message_content)
async def run_periodic_check(wait_time):
await asyncio.sleep(wait_time)
event_loop = asyncio.get_event_loop()
event_loop.create_task(scrape_pool_information())
event_loop.create_task(run_periodic_check(scan_frequency))
The bot responds to the start command correctly but the pool monitoring functionality isn’t working. What could be causing this issue with the data extraction logic?