50 changes: 50 additions & 0 deletions dags/README.md
@@ -0,0 +1,50 @@
# Airflow Log Cleanup

A maintenance workflow that you can deploy into Airflow to periodically clean out task logs and keep them from growing too large.

- **airflow-log-cleanup.py**: Deletes logs by specifying the **number** of worker nodes. Does not guarantee that logs are deleted on every node.
- **airflow-log-cleanup-pwdless-ssh.py**: Deletes logs by specifying the worker nodes by hostname. Requires the `airflow` user to have passwordless SSH access to all nodes.

## Deploy

1. Log in to the machine running Airflow
2. Navigate to the dags directory
3. Select the DAG to deploy (with or without SSH access) and follow the instructions

### airflow-log-cleanup.py

1. Copy the airflow-log-cleanup.py file to this dags directory

a. Here's a fast way:

$ wget https://raw.githubusercontent.com/teamclairvoyant/airflow-maintenance-dags/master/log-cleanup/airflow-log-cleanup.py

2. Update the global variables (SCHEDULE_INTERVAL, DAG_OWNER_NAME, ALERT_EMAIL_ADDRESSES, ENABLE_DELETE and NUMBER_OF_WORKERS) in the DAG with the desired values

3. Create and set the following Variables in the Airflow Web Server (Admin -> Variables); a CLI sketch for this step follows the list below

- airflow_log_cleanup__max_log_age_in_days - integer - Number of days to retain the log files if not already provided in the DAG run conf. If this is set to 30, the job will remove files that are 30 days old or older.
- airflow_log_cleanup__enable_delete_child_log - boolean (True/False) - Whether to also delete files from the child process log directory defined under [scheduler] in the airflow.cfg file
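
A minimal CLI sketch for this step, assuming an Airflow 2.x installation (on Airflow 1.10.x the equivalent form is `airflow variables --set KEY VALUE`):

    $ airflow variables set airflow_log_cleanup__max_log_age_in_days 30
    $ airflow variables set airflow_log_cleanup__enable_delete_child_log False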

4. Enable the DAG in the Airflow Webserver (or from the CLI, as sketched below)
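
A sketch of the CLI alternative for enabling the DAG, assuming an Airflow 2.x installation (on Airflow 1.10.x the command is `airflow unpause airflow-log-cleanup`):

    $ airflow dags unpause airflow-log-cleanup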

### airflow-log-cleanup-pwdless-ssh.py

1. Copy the airflow-log-cleanup-pwdless-ssh.py file to this dags directory

a. Here's a fast way:

$ wget https://raw.githubusercontent.com/teamclairvoyant/airflow-maintenance-dags/master/log-cleanup/airflow-log-cleanup-pwdless-ssh.py

2. Update the global variables (SCHEDULE_INTERVAL, DAG_OWNER_NAME, ALERT_EMAIL_ADDRESSES, ENABLE_DELETE and AIRFLOW_HOSTS) in the DAG with the desired values

3. Create and set the following Variables in the Airflow Web Server (Admin -> Variables)

- airflow_log_cleanup__max_log_age_in_days - integer - Number of days to retain the log files if not already provided in the DAG run conf. If this is set to 30, the job will remove files that are 30 days old or older.
- airflow_log_cleanup__enable_delete_child_log - boolean (True/False) - Whether to also delete files from the child process log directory defined under [scheduler] in the airflow.cfg file

4. Ensure the `airflow` user can SSH without a password to the hosts listed in `AIRFLOW_HOSTS` (see the key-setup sketch after this list)
1. Create an SSH key pair on all the worker nodes. You can follow these instructions: https://www.digitalocean.com/community/tutorials/how-to-set-up-ssh-keys--2
2. Add the public key content to the `~/.ssh/authorized_keys` file on all the other machines
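
A minimal sketch of this key setup with standard OpenSSH tooling, run as the `airflow` user on a worker node; `worker2.example.com` is a hypothetical hostname standing in for an entry from `AIRFLOW_HOSTS`:

    $ ssh-keygen -t rsa -b 4096 -N "" -f ~/.ssh/id_rsa
    $ ssh-copy-id airflow@worker2.example.com
    $ ssh airflow@worker2.example.com hostname   # should succeed without a password prompt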

5. Enable the DAG in the Airflow Webserver
230 changes: 230 additions & 0 deletions dags/airflow-log-cleanup.py
@@ -0,0 +1,230 @@
"""
A maintenance workflow that you can deploy into Airflow to periodically clean
out task logs and keep them from growing too large.
airflow trigger_dag --conf '{"maxLogAgeInDays":30}' airflow-log-cleanup
--conf options:
maxLogAgeInDays:<INT> - Optional
"""
import logging
import os
from datetime import timedelta

import airflow
import jinja2
from airflow.configuration import conf
from airflow.models import DAG, Variable
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

# airflow-log-cleanup
DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "")
START_DATE = airflow.utils.dates.days_ago(1)
# Airflow >= 2.0 moved BASE_LOG_FOLDER from the [core] section to [logging]
try:
    BASE_LOG_FOLDER = conf.get("core", "BASE_LOG_FOLDER").rstrip("/")
except Exception:
    BASE_LOG_FOLDER = conf.get("logging", "BASE_LOG_FOLDER").rstrip("/")
# How often to Run. @daily - Once a day at Midnight
SCHEDULE_INTERVAL = "@daily"
# Who is listed as the owner of this DAG in the Airflow Web Server
DAG_OWNER_NAME = "operations"
# List of email address to send email alerts to if this job fails
ALERT_EMAIL_ADDRESSES = []
# Length to retain the log files if not already provided in the conf. If this
# is set to 30, the job will remove those files that are 30 days old or older
DEFAULT_MAX_LOG_AGE_IN_DAYS = Variable.get(
"airflow_log_cleanup__max_log_age_in_days", 30
)
# Whether the job should delete the logs or not. Included if you want to
# temporarily avoid deleting the logs
ENABLE_DELETE = True
# The number of worker nodes you have in Airflow. Will attempt to run this
# process for however many workers there are so that each worker gets its
# logs cleared.
NUMBER_OF_WORKERS = 1
DIRECTORIES_TO_DELETE = [BASE_LOG_FOLDER]
ENABLE_DELETE_CHILD_LOG = Variable.get(
"airflow_log_cleanup__enable_delete_child_log", "False"
)
LOG_CLEANUP_PROCESS_LOCK_FILE = "/tmp/airflow_log_cleanup_worker.lock"
logging.info("ENABLE_DELETE_CHILD_LOG " + ENABLE_DELETE_CHILD_LOG)

if not BASE_LOG_FOLDER or BASE_LOG_FOLDER.strip() == "":
raise ValueError(
"BASE_LOG_FOLDER variable is empty in airflow.cfg. It can be found "
"under the [core] (<2.0.0) section or [logging] (>=2.0.0) in the cfg file. "
"Kindly provide an appropriate directory path."
)

if ENABLE_DELETE_CHILD_LOG.lower() == "true":
try:
CHILD_PROCESS_LOG_DIRECTORY = conf.get(
"scheduler", "CHILD_PROCESS_LOG_DIRECTORY"
)
        # Skip blank values so an empty path is not added to the delete list
        if CHILD_PROCESS_LOG_DIRECTORY.strip():
            DIRECTORIES_TO_DELETE.append(CHILD_PROCESS_LOG_DIRECTORY)
except Exception as e:
logging.exception(
"Could not obtain CHILD_PROCESS_LOG_DIRECTORY from " +
"Airflow Configurations: " + str(e)
)

default_args = {
'owner': DAG_OWNER_NAME,
'depends_on_past': False,
'email': ALERT_EMAIL_ADDRESSES,
'email_on_failure': True,
'email_on_retry': False,
'start_date': START_DATE,
'retries': 1,
'retry_delay': timedelta(minutes=1)
}

dag = DAG(
DAG_ID,
default_args=default_args,
schedule_interval=SCHEDULE_INTERVAL,
start_date=START_DATE,
tags=['teamclairvoyant', 'airflow-maintenance-dags'],
template_undefined=jinja2.Undefined
)
if hasattr(dag, 'doc_md'):
dag.doc_md = __doc__
if hasattr(dag, 'catchup'):
dag.catchup = False

start = DummyOperator(
task_id='start',
dag=dag)

log_cleanup = """

echo "Getting Configurations..."
BASE_LOG_FOLDER="{{params.directory}}"
WORKER_SLEEP_TIME="{{params.sleep_time}}"

sleep ${WORKER_SLEEP_TIME}s

MAX_LOG_AGE_IN_DAYS="{{dag_run.conf.maxLogAgeInDays}}"
if [ "${MAX_LOG_AGE_IN_DAYS}" == "" ]; then
echo "maxLogAgeInDays conf variable isn't included. Using Default '""" + str(DEFAULT_MAX_LOG_AGE_IN_DAYS) + """'."
MAX_LOG_AGE_IN_DAYS='""" + str(DEFAULT_MAX_LOG_AGE_IN_DAYS) + """'
fi
ENABLE_DELETE=""" + str("true" if ENABLE_DELETE else "false") + """
echo "Finished Getting Configurations"
echo ""

echo "Configurations:"
echo "BASE_LOG_FOLDER: '${BASE_LOG_FOLDER}'"
echo "MAX_LOG_AGE_IN_DAYS: '${MAX_LOG_AGE_IN_DAYS}'"
echo "ENABLE_DELETE: '${ENABLE_DELETE}'"

cleanup() {
echo "Executing Find Statement: $1"
FILES_MARKED_FOR_DELETE=`eval $1`
echo "Process will be Deleting the following File(s)/Directory(s):"
echo "${FILES_MARKED_FOR_DELETE}"
echo "Process will be Deleting `echo "${FILES_MARKED_FOR_DELETE}" | \
grep -v '^$' | wc -l` File(s)/Directory(s)" \
# "grep -v '^$'" - removes empty lines.
# "wc -l" - Counts the number of lines
echo ""
if [ "${ENABLE_DELETE}" == "true" ];
then
if [ "${FILES_MARKED_FOR_DELETE}" != "" ];
then
echo "Executing Delete Statement: $2"
eval $2
DELETE_STMT_EXIT_CODE=$?
if [ "${DELETE_STMT_EXIT_CODE}" != "0" ]; then
echo "Delete process failed with exit code \
'${DELETE_STMT_EXIT_CODE}'"

echo "Removing lock file..."
rm -f """ + str(LOG_CLEANUP_PROCESS_LOCK_FILE) + """
if [ "${REMOVE_LOCK_FILE_EXIT_CODE}" != "0" ]; then
echo "Error removing the lock file. \
Check file permissions.\
To re-run the DAG, ensure that the lock file has been \
deleted (""" + str(LOG_CLEANUP_PROCESS_LOCK_FILE) + """)."
exit ${REMOVE_LOCK_FILE_EXIT_CODE}
fi
exit ${DELETE_STMT_EXIT_CODE}
fi
else
echo "WARN: No File(s)/Directory(s) to Delete"
fi
else
echo "WARN: You're opted to skip deleting the File(s)/Directory(s)!!!"
fi
}


if [ ! -f """ + str(LOG_CLEANUP_PROCESS_LOCK_FILE) + """ ]; then

echo "Lock file not found on this node! \
Creating it to prevent collisions..."
touch """ + str(LOG_CLEANUP_PROCESS_LOCK_FILE) + """
CREATE_LOCK_FILE_EXIT_CODE=$?
if [ "${CREATE_LOCK_FILE_EXIT_CODE}" != "0" ]; then
echo "Error creating the lock file. \
Check if the airflow user can create files under tmp directory. \
Exiting..."
exit ${CREATE_LOCK_FILE_EXIT_CODE}
fi

echo ""
echo "Running Cleanup Process..."

FIND_STATEMENT="find ${BASE_LOG_FOLDER}/*/* -type f -mtime \
+${MAX_LOG_AGE_IN_DAYS}"
DELETE_STMT="${FIND_STATEMENT} -exec rm -f {} \;"

cleanup "${FIND_STATEMENT}" "${DELETE_STMT}"
CLEANUP_EXIT_CODE=$?

FIND_STATEMENT="find ${BASE_LOG_FOLDER}/*/* -type d -empty"
DELETE_STMT="${FIND_STATEMENT} -prune -exec rm -rf {} \;"

cleanup "${FIND_STATEMENT}" "${DELETE_STMT}"
CLEANUP_EXIT_CODE=$?

FIND_STATEMENT="find ${BASE_LOG_FOLDER}/* -type d -empty"
DELETE_STMT="${FIND_STATEMENT} -prune -exec rm -rf {} \;"

cleanup "${FIND_STATEMENT}" "${DELETE_STMT}"
CLEANUP_EXIT_CODE=$?

echo "Finished Running Cleanup Process"

echo "Deleting lock file..."
rm -f """ + str(LOG_CLEANUP_PROCESS_LOCK_FILE) + """
REMOVE_LOCK_FILE_EXIT_CODE=$?
if [ "${REMOVE_LOCK_FILE_EXIT_CODE}" != "0" ]; then
echo "Error removing the lock file. Check file permissions. To re-run the DAG, ensure that the lock file has been deleted (""" + str(LOG_CLEANUP_PROCESS_LOCK_FILE) + """)."
exit ${REMOVE_LOCK_FILE_EXIT_CODE}
fi

else
echo "Another task is already deleting logs on this worker node. \
Skipping it!"
echo "If you believe you're receiving this message in error, kindly check \
if """ + str(LOG_CLEANUP_PROCESS_LOCK_FILE) + """ exists and delete it."
exit 0
fi

"""

for log_cleanup_id in range(1, NUMBER_OF_WORKERS + 1):

for dir_id, directory in enumerate(DIRECTORIES_TO_DELETE):

log_cleanup_op = BashOperator(
task_id='log_cleanup_worker_num_' +
str(log_cleanup_id) + '_dir_' + str(dir_id),
bash_command=log_cleanup,
params={
"directory": str(directory),
"sleep_time": int(log_cleanup_id)*3},
dag=dag)

log_cleanup_op.set_upstream(start)