I am trying to use the current datetime as a unique identifier for an S3 folder that I create in an Airflow DAG. However, the datetime string seems to change between tasks, even though nothing in my code modifies it.
dag.py
import logging
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

import utility

# Folder ID meant to be shared by every task; computed when this module is executed
folder = utility.get_id()


def print_params_fn(**kwargs):
    # Log everything passed to the task (context plus op_kwargs)
    logging.info(kwargs)
    return None


DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 10, 13, 0, 0, 0),
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

with DAG(
        dag_id='mydag',
        default_args=DEFAULT_ARGS,
        max_active_runs=1,
        schedule_interval='0 17 * * *',
        catchup=False) as dag:
    step1 = PythonOperator(task_id='step1',
                           python_callable=utility.do_something,
                           op_kwargs={'read_dir': folder})
    step2 = PythonOperator(task_id='step2',
                           python_callable=utility.do_something,
                           op_kwargs={'read_dir': folder})
    print_params0 = PythonOperator(task_id='print_params0',
                                   python_callable=print_params_fn,
                                   provide_context=True,
                                   op_kwargs={'folder_name': folder})
    print_params = PythonOperator(task_id='print_params',
                                  python_callable=print_params_fn,
                                  provide_context=True,
                                  op_kwargs={'folder_name': folder})

    print_params0 >> step1 >> print_params >> step2
utility.py
import os
from datetime import datetime


def get_id():
    # Current date and time, returned as a Unix timestamp string
    now = datetime.now()
    return str(int(datetime.timestamp(now)))


def do_something(read_dir):
    # os.path.exists needs the path it should check
    print('dir exists: {}'.format(os.path.exists(read_dir)))
    return None
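A minimal sketch of why two evaluations can disagree: `get_id()` reads the wall clock on every call, so if the module-level line `folder = utility.get_id()` runs more than once (for example, if the DAG file is parsed separately by the scheduler and again by each task's process), each evaluation produces a different ID. The optional `now` argument is a hypothetical addition used only to pin the clock; the two UTC instants happen to map exactly to the two IDs seen in the logs.

```python
from datetime import datetime, timezone
from typing import Optional


def get_id(now: Optional[datetime] = None) -> str:
    """Same logic as utility.get_id, plus a hypothetical `now` argument
    so the clock can be fixed for illustration."""
    if now is None:
        now = datetime.now()  # wall-clock time at the moment of the call
    return str(int(datetime.timestamp(now)))


# Two evaluations of `folder = utility.get_id()` made 30 seconds apart,
# e.g. two separate loads of the DAG file.
first_eval = datetime(2020, 12, 17, 21, 17, 20, tzinfo=timezone.utc)
second_eval = datetime(2020, 12, 17, 21, 17, 50, tzinfo=timezone.utc)

print(get_id(first_eval))   # 1608239840
print(get_id(second_eval))  # 1608239870
```

The function itself is deterministic for a given instant; the variation comes entirely from *when* it is called.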
When I log `folder` to the Airflow logs before step1 (task print_params0), I see 1608239870, but when I log it before step2 (task print_params), I see 1608239840.
I expect the `folder` variable to stay constant for the whole DAG run, but for some reason it changes. Does anyone know why this is happening?
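A sketch of a deterministic alternative, assuming the folder only needs to be unique per DAG run: derive the ID from the run's logical (execution) date, which is the same for every task in a run, instead of from `datetime.now()`. `folder_id_for_run` is a hypothetical helper. In Airflow the logical date is available to Jinja templates (for example as `{{ ts_nodash }}`), and `PythonOperator` renders templates in `op_kwargs`, so something like `op_kwargs={'read_dir': '{{ ts_nodash }}'}` is one possible way to wire this up; that wiring is not shown or tested here.

```python
from datetime import datetime, timezone


def folder_id_for_run(logical_date: datetime) -> str:
    """Hypothetical helper: build the folder ID from the DAG run's logical
    date rather than from the current wall-clock time."""
    return str(int(logical_date.timestamp()))


# Every task in one DAG run shares the same logical date, so each task
# computes the same folder ID regardless of when it actually executes.
run_date = datetime(2020, 12, 17, 17, 0, 0, tzinfo=timezone.utc)
ids = [folder_id_for_run(run_date) for _task in ("step1", "print_params", "step2")]
print(ids)  # ['1608224400', '1608224400', '1608224400']
```

The trade-off is that the ID is now tied to the schedule rather than to the moment of execution, so re-running the same DAG run reuses the same folder.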