from agno.agent import Agent
from agno.tools.airflow import AirflowTools
agent = Agent(
tools=[AirflowTools(dags_dir="tmp/dags", save_dag=True, read_dag=True)],
show_tool_calls=True,
markdown=True,
)
dag_content = """
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# 使用 'schedule' 代替已弃用的 'schedule_interval'
with DAG(
'example_dag',
default_args=default_args,
description='一个简单的示例 DAG',
schedule='@daily', # 从 schedule_interval 更改
catchup=False
) as dag:
def print_hello():
print("Hello from Airflow!")
return "Hello 任务已完成"
task = PythonOperator(
task_id='hello_task',
python_callable=print_hello,
dag=dag,
)
"""
agent.run(f"将此 DAG 文件保存为 'example_dag.py': {dag_content}")
agent.print_response("读取 'example_dag.py' 的内容")
创建虚拟环境
打开 Terminal
并创建一个 python 虚拟环境。
python3 -m venv .venv
source .venv/bin/activate
安装库
pip install -U apache-airflow openai agno
设置你的 API 密钥
export OPENAI_API_KEY=xxx
运行 Agent
python cookbook/tools/airflow_tools.py