I am working with the code at the bottom of this question.
My intent is to go to a SFTP site, check for new folders, and download the zip files (then extract). Sometimes there's no new folders, other times there could be one or more new folders.
The script appears to work normally, but the SFTP connection randomly disconnects without warnings. I'm looking for a way to keep the link open until all of the files are in.
import airflow
from airflow import models
from airflow.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator
from airflow.contrib.operators.file_to_gcs import FileToGoogleCloudStorageOperator
from airflow.operators.bash_operator import BashOperator
from airflow.gcp.hooks.gcs import GoogleCloudStorageHook
from airflow.gcp.operators.gcs import GoogleCloudStorageDeleteOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.mssql_operator import MsSqlOperator
from airflow.hooks.ftp_hook import FTPHook
from airflow.utils import dates
from airflow.models import Variable
import logging
from zipfile import ZipFile
import os, glob, subprocess
args = {
'owner': 'Airflow',
'start_date': dates.days_ago(1),
'email': ['[email protected]', '[email protected]'],
'email_on_failure': True,
'email_on_success': True,
'schedule_interval': '0 1 * * *',
}
def GetFiles(**kwargs):
foundfiles = False
FTPPATH = '/SmartZip/'
ftp = FTPHook(ftp_conn_id='FTP_DataTree_MLS')
ftp.get_conn()
folders = [x.split()[-1] for x in ftp.list_directory(FTPPATH, nlst=False) if x.startswith("d") and x.split()[-1].startswith("2")]
print(folders)
logging.info("NUMBER OF FOLDERS -> {}.".format(len(folders)))
if len(folders) > 0:
for folder in folders:
path = FTPPATH + folder
logging.info("looking in folder {}.".format(path))
if int(folder[0:8]) > int(Variable.get("mls_publish_date")):
Variable.set("mls_publish_date", int(folder[0:8]))
files = [x.split()[-1] for x in ftp.list_directory(path, nlst=False) if str(x.split()[-1]) != '.' and str(x.split()[-1]) != '..']
logging.info("number of files -> {}".format(len(files)))
if len(files) > 0:
foundfiles = True
for file in files:
localfile = Variable.get("temp_directory") + "MLS/" + file
logging.info("retrieving file {}/{}".format(path, file))
ftp.retrieve_file(path + '/' + file, localfile)
zf = ZipFile(localfile, 'r')
zf.extractall(Variable.get("temp_directory") + "MLS/")
zf.close()
os.remove(localfile)
if foundfiles:
return 'Upload_Files_to_GCS'
else:
return 'Populate_Valuation_LeadsMLS'
dag = models.DAG(
dag_id='DataTree_MLS_Ingestion',
default_args=args
)
check_for_file = BranchPythonOperator(
task_id='Check_FTP_and_Download',
provide_context=True,
python_callable=GetFiles,
dag=dag
)
How do I force SFTP to stay open until all is done?
Thanks!
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…