Date Partitioning with Airflow
How can we quickly partition data by date in something like an S3 bucket?
A common way to do this is with a function that returns a date-partitioned file path based on the execution date of the DAG. This makes the DAG idempotent: runs with the same execution_date produce the same result every time.
import os

from airflow import DAG
from airflow.macros import ds_format
from airflow.operators.python_operator import PythonOperator

dag = DAG(...)

def get_date_part(ds):
    # Convert the execution date ('YYYY-MM-DD') into a
    # date-partitioned path prefix ('YYYY/MM/DD/')
    return ds_format(ds, '%Y-%m-%d', '%Y/%m/%d/')

def write_to_s3_bucket(ds):
    date_part = get_date_part(ds)
    file_path = os.path.join(date_part, 'my_file.csv')
    # do something with boto3 ...

write_to_bucket = PythonOperator(
    task_id='write_to_bucket',
    python_callable=write_to_s3_bucket,
    # the {{ ds }} macro renders to the DAG run's execution date
    op_kwargs={'ds': '{{ ds }}'},
    dag=dag,
)
Written on June 15, 2020