공기 흐름을 사용하여 “삭제되지 않는”버킷에서 파일을 삭제할 수 없음을 의미하는 s3 파일을 GCS로 이동하려고합니다. 새 파일이 매일있을 것이라고 보장 할 수는 없지만 매일 새 파일을 확인해야합니다.
내 문제는 subdags의 동적 생성입니다. 파일이 있으면 서브 덤이 필요합니다. 파일이 없으면 서브 덤이 필요하지 않습니다. 내 문제는 업스트림 / 다운 스트림 설정입니다. 내 코드에서는 파일을 감지하지만 예상대로 하위 코드를 시작하지는 않습니다. 뭔가 빠졌습니다.
내 코드는 다음과 같습니다.
from airflow import models
from airflow.utils.helpers import chain
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.contrib.operators.s3_to_gcs_operator import S3ToGoogleCloudStorageOperator
from airflow.utils import dates
from airflow.models import Variable
import logging
args = {
'owner': 'Airflow',
'start_date': dates.days_ago(1),
'email': ['sinistersparrow1701@gmail.com'],
'email_on_failure': True,
'email_on_success': True,
}
bucket = 'mybucket'
prefix = 'myprefix/'
LastBDEXDate = int(Variable.get("last_publish_date"))
maxdate = LastBDEXDate
files = []
parent_dag = models.DAG(
dag_id='My_Ingestion',
default_args=args,
schedule_interval='@daily',
catchup=False
)
def Check_For_Files(**kwargs):
s3 = S3Hook(aws_conn_id='S3_BOX')
s3.get_conn()
bucket = bucket
LastBDEXDate = int(Variable.get("last_publish_date"))
maxdate = LastBDEXDate
files = s3.list_keys(bucket_name=bucket, prefix='myprefix/file')
for file in files:
print(file)
print(file.split("_")[-2])
print(file.split("_")[-2][-8:]) ##proves I can see a date in the file name is ok.
maxdate = maxdate if maxdate > int(file.split("_")[-2][-8:]) else int(file.split("_")[-2][-8:])
if maxdate > LastBDEXDate:
return 'Start_Process'
return 'finished'
def create_subdag(dag_parent, dag_id_child_prefix, file_name):
# dag params
dag_id_child = '%s.%s' % (dag_parent.dag_id, dag_id_child_prefix)
# dag
subdag = models.DAG(dag_id=dag_id_child,
default_args=args,
schedule_interval=None)
# operators
s3_to_gcs_op = S3ToGoogleCloudStorageOperator(
task_id=dag_id_child,
bucket=bucket,
prefix=file_name,
dest_gcs_conn_id='GCP_Account',
dest_gcs='gs://my_files/To_Process/',
replace=False,
gzip=True,
dag=subdag)
return subdag
def create_subdag_operator(dag_parent, filename, index):
tid_subdag = 'file_{}'.format(index)
subdag = create_subdag(dag_parent, tid_subdag, filename)
sd_op = SubDagOperator(task_id=tid_subdag, dag=dag_parent, subdag=subdag)
return sd_op
def create_subdag_operators(dag_parent, file_list):
subdags = [create_subdag_operator(dag_parent, file, file_list.index(file)) for file in file_list]
# chain subdag-operators together
chain(*subdags)
return subdags
check_for_files = BranchPythonOperator(
task_id='Check_for_s3_Files',
provide_context=True,
python_callable=Check_For_Files,
dag=parent_dag
)
finished = DummyOperator(
task_id='finished',
dag=parent_dag
)
decision_to_continue = DummyOperator(
task_id='Start_Process',
dag=parent_dag
)
if len(files) > 0:
subdag_ops = create_subdag_operators(parent_dag, files)
check_for_files >> decision_to_continue >> subdag_ops[0] >> subdag_ops[-1] >> finished
check_for_files >> finished
답변
다른 방법도 있지만 공기 흐름에서 동적 DAG 또는 하위 DAG를 만드는 권장 방법은 다음과 같지만 문제에 크게 적용 할 수 있습니다.
먼저 (yaml/csv)
모든 s3
파일 및 위치 목록이 포함 된 파일 을 작성 하십시오. 목록에 저장하는 기능을 작성한 경우 별도의 yaml
파일 에 저장하고 런타임에 airflow env에로드 한다고 말한 다음 작성하십시오. DAG.
아래는 샘플 yaml
파일입니다.
dynamicDagConfigFile.yaml
job: dynamic-dag
bucket_name: 'bucket-name'
prefix: 'bucket-prefix'
S3Files:
- File1: 'S3Loc1'
- File2: 'S3Loc2'
- File3: 'S3Loc3'
파일 Check_For_Files
에 저장하도록 함수를 수정할 수 있습니다 yaml
.
이제 동적 dag 생성으로 넘어갈 수 있습니다.
먼저 더미 연산자를 사용하여 두 가지 작업, 즉 시작 및 종료 작업을 정의하십시오. 이러한 작업은 다음과 같이 작업 DAG
을 동적으로 생성 하여 구축 할 작업입니다.
start = DummyOperator(
task_id='start',
dag=dag
)
end = DummyOperator(
task_id='end',
dag=dag)
동적 DAG : PythonOperators
공기 흐름에 사용할 것 입니다. 함수는 작업 ID를 인수로 받아야합니다. 실행될 파이썬 함수, 즉 파이썬 연산자를위한 python_callable; 및 실행 동안 사용될 일련의 인수들.
인수를 포함하십시오 task id
. 따라서을 통해 동적으로 생성 된 작업간에 데이터를 교환 할 수 있습니다 XCOM
.
이 동적 dag 내에서 작동 기능을 지정할 수 있습니다. s3_to_gcs_op
.
def createDynamicDAG(task_id, callableFunction, args):
task = PythonOperator(
task_id = task_id,
provide_context=True,
#Eval is used since the callableFunction var is of type string
#while the python_callable argument for PythonOperators only receives objects of type callable not strings.
python_callable = eval(callableFunction),
op_kwargs = args,
xcom_push = True,
dag = dag,
)
return task
마지막으로 yaml 파일에있는 위치를 기반으로 먼저 동적 dags를 만들 수 있습니다. yaml
아래와 같이 파일을 동적 dag를 생성 .
with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
# use safe_load instead to load the YAML file
configFile = yaml.safe_load(f)
#Extract file list
S3Files = configFile['S3Files']
#In this loop tasks are created for each table defined in the YAML file
for S3File in S3Files:
for S3File, fieldName in S3File.items():
#Remember task id is provided in order to exchange data among tasks generated in dynamic way.
get_s3_files = createDynamicDAG('{}-getS3Data'.format(S3File),
'getS3Data',
{}) #your configs here.
#Second step is upload S3 to GCS
upload_s3_toGCS = createDynamicDAG('{}-uploadDataS3ToGCS'.format(S3File), 'uploadDataS3ToGCS', {'previous_task_id':'{}-'})
#write your configs again here like S3 bucket name prefix extra or read from yaml file, and other GCS config.
최종 DAG 정의 :
아이디어는
#once tasks are generated they should linked with the
#dummy operators generated in the start and end tasks.
start >> get_s3_files
get_s3_files >> upload_s3_toGCS
upload_s3_toGCS >> end
전체 기류 코드 순서 :
import yaml
import airflow
from airflow import DAG
from datetime import datetime, timedelta, time
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
start = DummyOperator(
task_id='start',
dag=dag
)
def createDynamicDAG(task_id, callableFunction, args):
task = PythonOperator(
task_id = task_id,
provide_context=True,
#Eval is used since the callableFunction var is of type string
#while the python_callable argument for PythonOperators only receives objects of type callable not strings.
python_callable = eval(callableFunction),
op_kwargs = args,
xcom_push = True,
dag = dag,
)
return task
end = DummyOperator(
task_id='end',
dag=dag)
with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
configFile = yaml.safe_load(f)
#Extract file list
S3Files = configFile['S3Files']
#In this loop tasks are created for each table defined in the YAML file
for S3File in S3Files:
for S3File, fieldName in S3File.items():
#Remember task id is provided in order to exchange data among tasks generated in dynamic way.
get_s3_files = createDynamicDAG('{}-getS3Data'.format(S3File),
'getS3Data',
{}) #your configs here.
#Second step is upload S3 to GCS
upload_s3_toGCS = createDynamicDAG('{}-uploadDataS3ToGCS'.format(S3File), 'uploadDataS3ToGCS', {'previous_task_id':'{}-'})
#write your configs again here like S3 bucket name prefix extra or read from yaml file, and other GCS config.
start >> get_s3_files
get_s3_files >> upload_s3_toGCS
upload_s3_toGCS >> end