代码

cookbook/tools/airflow_tools.py
from agno.agent import Agent
from agno.tools.airflow import AirflowTools

agent = Agent(
    tools=[AirflowTools(dags_dir="tmp/dags", save_dag=True, read_dag=True)],
    show_tool_calls=True,
    markdown=True,
)

dag_content = """
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# 使用 'schedule' 代替已弃用的 'schedule_interval'
with DAG(
    'example_dag',
    default_args=default_args,
    description='一个简单的示例 DAG',
    schedule='@daily',  # 从 schedule_interval 更改
    catchup=False
) as dag:

    def print_hello():
        print("Hello from Airflow!")
        return "Hello 任务已完成"

    task = PythonOperator(
        task_id='hello_task',
        python_callable=print_hello,
        dag=dag,
    )
"""

agent.run(f"将此 DAG 文件保存为 'example_dag.py': {dag_content}")
agent.print_response("读取 'example_dag.py' 的内容")

用法

1

创建虚拟环境

打开 Terminal 并创建一个 python 虚拟环境。

python3 -m venv .venv
source .venv/bin/activate
2

安装库

pip install -U apache-airflow openai agno
3

设置你的 API 密钥

export OPENAI_API_KEY=xxx
4

运行 Agent

python cookbook/tools/airflow_tools.py