Airflow: Dynamic SubDag creation
I have a use case where I have a list of clients. A client can be added to or removed from the list, and clients can have different start dates and different initial parameters.
I want to use Airflow to backfill the data for each client based on its initial start date, and rerun if something fails. I am thinking about creating a SubDag for each client. Will this address my problem?
How can I dynamically create SubDags based on the client_id?
You can create DAG objects dynamically:
    from airflow import DAG

    def make_client_dag(parent_dag, client):
        # The subdag id has the form <parent dag_id>.client_<name>
        return DAG(
            '%s.client_%s' % (parent_dag.dag_id, client.name),
            start_date=client.start_date
        )
You can then use that method in a SubDagOperator in the main DAG:
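    # Airflow 1.x import path; in Airflow 2.x the module is airflow.operators.subdag
    from airflow.operators.subdag_operator import SubDagOperator

    for client in clients:
        # task_id matches the suffix of the subdag's dag_id
        SubDagOperator(
            task_id='client_%s' % client.name,
            dag=main_dag,
            subdag=make_client_dag(main_dag, client)
        )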
This will create a subdag specific to each member of the collection clients, and each will run on the next invocation of the main DAG. I'm not sure if you'll get the backfill behavior you want.
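The naming used above matters: SubDagOperator expects the subdag's dag_id to be the parent's dag_id, a dot, and the operator's task_id. As a rough sketch of how the per-client settings could be derived in one place, the following builds the task_id and the keyword arguments for DAG() from a client list without importing Airflow. The Client class, its field names, and the helper names are hypothetical, made up for illustration; only dag_id, start_date and params are real DAG arguments.

```python
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class Client:
    # Hypothetical client record; these field names are assumptions.
    name: str
    start_date: datetime
    params: dict = field(default_factory=dict)


def client_task_id(client):
    # task_id of the SubDagOperator that wraps this client's subdag
    return "client_%s" % client.name


def client_dag_kwargs(parent_dag_id, client):
    # Keyword arguments intended for DAG(**kwargs).
    # dag_id is built as "<parent dag_id>.<task_id>".
    return {
        "dag_id": "%s.%s" % (parent_dag_id, client_task_id(client)),
        "start_date": client.start_date,
        "params": dict(client.params),  # copy so clients do not share state
    }


# Example client list; in practice this might come from a database or config file.
clients = [
    Client("acme", datetime(2016, 1, 1), {"region": "us"}),
    Client("globex", datetime(2016, 3, 15)),
]

# Map each task_id to the kwargs for that client's subdag.
specs = {client_task_id(c): client_dag_kwargs("main_dag", c) for c in clients}
```

Each entry in specs could then be passed as DAG(**kwargs) inside make_client_dag, so adding or removing a client from the list changes which subdags exist the next time the DAG file is parsed.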