I’m having trouble with my Python Telegram Bot when I try to run it through an Airflow DAG. The DAG seems to work and starts the bot, but it doesn’t respond to any commands. Oddly enough, when I run the file on its own, it works fine.
I’m using Astro CLI 1.31.0 to run Airflow in a Docker container. I’m pretty new to Airflow, so I’m not sure what’s going wrong.
Here’s a simplified version of my code:
import telegram_lib
import environment_settings
from airflow.client import AirflowClient

environment_settings.set_proxy('*')
BOT_TOKEN = 'SECRET_TOKEN'
bot = telegram_lib.Bot(BOT_TOKEN)
airflow_client = AirflowClient()

def start_bot(bot_instance=bot):
    print("Bot is starting")

    @bot_instance.command_handler(['info', 'begin'])
    def greet_user(message):
        bot_instance.send_message(message.chat.id, "Welcome! This bot helps manage Geospatial data.")

    # More command handlers here...
    bot_instance.run_continuously()

if __name__ == '__main__':
    start_bot()
I tried setting NO_PROXY to ‘*’, but it didn’t help. Any ideas on what might be causing this issue?
Have you considered the possibility that the bot’s polling mechanism might be conflicting with Airflow’s task execution? When running in Airflow, long-running processes like continuous polling can be problematic.
Instead of using run_continuously(), you might want to implement a time-bound execution. For example:
def start_bot(bot_instance=bot, timeout=300):
    print("Bot is starting")
    # Set up command handlers
    # Run the bot for a specified time
    bot_instance.idle(timeout=timeout)
This approach allows the bot to run for a set duration, after which the task can complete. You can then schedule the DAG to run periodically, effectively restarting the bot at regular intervals.
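If your bot library does not offer a timeout on its polling call, the same idea can be sketched with a plain deadline loop. This is only an illustration: `poll_once` is a hypothetical callable standing in for whatever "fetch and handle pending updates once" call your library provides, and `run_for`, `timeout` and `pause` are names made up for this example.

```python
import time
from typing import Callable


def run_for(poll_once: Callable[[], None], timeout: float = 300.0,
            pause: float = 1.0) -> int:
    """Call poll_once repeatedly until `timeout` seconds have elapsed.

    Returns the number of polling iterations performed.
    """
    # monotonic() is unaffected by system clock changes inside the container.
    deadline = time.monotonic() + timeout
    iterations = 0
    while time.monotonic() < deadline:
        poll_once()        # hypothetical: handle pending updates once
        iterations += 1
        time.sleep(pause)  # avoid hammering the API between polls
    return iterations
```

Because the function returns once the deadline passes, the Airflow task can finish cleanly instead of blocking a worker slot indefinitely.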
Also, ensure that your Airflow worker has the necessary network access to communicate with the Telegram API. Docker networking can sometimes cause unexpected issues in this regard.
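One quick way to check this is to run a small reachability probe from inside the worker container (for example as its own task). The sketch below uses only the standard library; `can_reach` and its parameters are names chosen for this example, and the `opener` argument exists only so the function can be exercised without a real network.

```python
import urllib.error
import urllib.request


def can_reach(url: str = "https://api.telegram.org", timeout: float = 5.0,
              opener=urllib.request.urlopen) -> bool:
    """Return True if an HTTP(S) request to `url` gets any response."""
    try:
        opener(url, timeout=timeout).close()
        return True
    except urllib.error.HTTPError:
        # The server answered with an error status, so the network path works.
        return True
    except (urllib.error.URLError, OSError):
        # DNS failure, refused connection, proxy problem, or timeout.
        return False
```

If this returns False inside the container but True on your host, the problem is more likely Docker networking or proxy settings than the bot code itself.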
I’ve encountered similar issues when running bots through Airflow DAGs. The problem might be related to how Airflow executes tasks in a distributed environment.
One solution that worked for me was to use a PythonOperator instead of directly running the bot in the DAG. Here’s a rough example:
from airflow import DAG
# Airflow 2.x import path; airflow.operators.python_operator is deprecated
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def run_bot():
    # Your bot code here; start_bot must be imported from the module that defines it
    start_bot()

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'telegram_bot_dag',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    catchup=False,  # don't backfill a run for every day since start_date
)

run_bot_task = PythonOperator(
    task_id='run_telegram_bot',
    python_callable=run_bot,
    dag=dag
)
With this approach the bot starts inside the task’s process when the task executes, not when the scheduler parses the DAG file, which might resolve the issue you’re facing. Additionally, make sure your Airflow environment has all the necessary dependencies installed for your bot to function correctly.
hey Samuel87, have u checked if ur airflow worker has internet access? sometimes docker can mess with network stuff. also, try adding some logging to see if the bot actually starts in the DAG. might help pinpoint where it’s failing. good luck!
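For the logging part, something like this rough sketch could work. It just wraps whatever function starts the bot and logs when it begins, returns, or raises. `run_with_logging` and the logger name are made up for this example, nothing here comes from the bot library.

```python
import logging

log = logging.getLogger("telegram_bot_task")


def run_with_logging(start, *args, **kwargs):
    """Call `start` and log when it begins, returns, or raises."""
    log.info("Starting bot via %s", getattr(start, "__name__", start))
    try:
        result = start(*args, **kwargs)
    except Exception:
        # Logs the full traceback, then re-raises so the task is marked failed.
        log.exception("Bot crashed")
        raise
    log.info("Bot function returned")
    return result
```

In the DAG you could pass `run_with_logging` as the `python_callable` with `op_args=[start_bot]`, and the messages should then show up in the task log.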