Airflow: Dynamic SubDag creation


I have a use case where I have a list of clients. Clients can be added to or removed from the list, and they can have different start dates and different initial parameters.

I want to use Airflow to backfill the data for each client based on its initial start date, and to rerun if something fails. I am thinking about creating a SubDag for each client. Will this address my problem?

How can I dynamically create SubDags based on the client_id?

You can create DAG objects dynamically:

    from airflow import DAG

    def make_client_dag(parent_dag, client):
        # The subdag's dag_id must be '<parent_dag_id>.<task_id>'.
        return DAG(
            '%s.client_%s' % (parent_dag.dag_id, client.name),
            start_date=client.start_date,
        )

You could then use that method in a SubDagOperator from your main DAG:

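    # Airflow 1.x import path; in Airflow 2.x the module is airflow.operators.subdag.
    from airflow.operators.subdag_operator import SubDagOperator

    for client in clients:
        SubDagOperator(
            task_id='client_%s' % client.name,  # matches the suffix of the subdag's dag_id
            dag=main_dag,
            subdag=make_client_dag(main_dag, client),
        )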

This will create a subdag specific to each member of the collection clients, and each will run on the next invocation of the main DAG. I'm not sure if you'll get the backfill behavior you want.
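Since the question also mentions per-client start dates and initial parameters, here is a rough sketch of how the client list might be turned into one spec per subdag before any Airflow objects are built. The Client record, its field names, the sample clients, and the subdag_specs helper are all made up for illustration; the only Airflow-specific fact it relies on is that SubDagOperator expects the subdag's dag_id to be the parent dag_id, a dot, and the task_id. It uses only the standard library, so it runs without Airflow installed.

```python
from collections import namedtuple
from datetime import datetime

# Hypothetical client record; the field names are assumptions for illustration.
Client = namedtuple("Client", ["name", "start_date", "params"])


def subdag_specs(parent_dag_id, clients):
    """Return one spec per client with a consistent task_id and dag_id."""
    specs = []
    for client in clients:
        task_id = "client_%s" % client.name
        specs.append({
            "task_id": task_id,
            # SubDagOperator expects '<parent_dag_id>.<task_id>' here.
            "dag_id": "%s.%s" % (parent_dag_id, task_id),
            "start_date": client.start_date,
            # Copy so later edits to a spec do not touch the client record.
            "params": dict(client.params),
        })
    return specs


# Made-up sample data standing in for the real client list.
clients = [
    Client("acme", datetime(2016, 1, 1), {"region": "us"}),
    Client("globex", datetime(2016, 6, 15), {"region": "eu"}),
]

for spec in subdag_specs("main_dag", clients):
    print(spec["dag_id"], spec["start_date"].date())
```

Each spec could then feed the factory above, for example by passing spec["dag_id"] and spec["start_date"] to DAG and spec["task_id"] to SubDagOperator. Adding or removing a client from the list changes which subdags exist the next time the DAG file is parsed.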

