안녕하세요, 박민재입니다. 오늘은 Airflow DB를 관리하는 방법에 대해서 이야기 나눠 보도록 하겠습니다.
Airflow에서 Backend Database는 어떤 역할을 할까요? Airflow에서 DAG을 실행 하기 위해서, Airflow는 다음과 같은 정보들을 Backend Database에 저장하여 정합성을 유지 합니다.
dag_id
, logical_date
, start_date
, status
등을 담고 있습니다.task_id
, logical_date
, status
등을 담고 있습니다.task_id
, logical_date
, value
로 이루어져 있습니다.이 외에도 Airflow를 운영하기 위한 Audit Log, DAG 정보, Role, Pool 등의 정보들을 담고 있습니다. 이 데이터들은 상호간의 정합성을 유지 하여야 하기 때문에, NoSQL이 아닌 Postgre, MySQL 등과 같은 RDBMS로 지원 하고 있죠.
하지만 시간이 지남에 따라서, RDBMS에 데이터가 계속 쌓이게 되면, 우리는 관리를 해줘야 하는 포인트에 도달하게 됩니다. 아까도 말씀 드렸다 싶이, Metadata의 정합성을 위해 RDBMS에 데이터를 쌓게 되는데요. DagRun, Task Instance 같은 정보들은 Airflow Scheduler에서 자주 사용 해야 하는 데이터인 이유로 Indexing을 수행 하는데요. 이는 DB에 더 많은 Disk 용량을 사용하게 됩니다.
저의 경우에는 광고 조직 전체에서 사용하는, 수백개의 DAG이 운용 되고 있는 Airflow의 운영 관리를 맡고 있다 보니, Metadata 규모에 대한 압박이 가해져 이렇게 Metadata를 정리하는 업무를 수행하게 되었어요.
만약 MySQL, PostgreSQL 같은 RDBMS를 사용하고 있다면, MySQL에는 binlog, PostgreSQL 같은 경우는 wal log를 확인 해 보세요. MySQL의 binlog는 MySQL 서버 인스턴스의 데이터 변경 사항 정보들을 담고 있는데요, Airflow는 DagRun, Task Instance, Log 등의 정보를 쉴 새 없이 변경 하기 때문에, 다른 사용 사례에 비해서 binlog가 매우 많이 쌓이게 됩니다. 그렇기 때문에, binlog / wal log 등이 현재 RDBMS Node에 많은 양이 쌓여 있는지 확인 할 필요가 있어요.
그러면 이제 데이터를 정리 해야할 시간이 온 것 입니다. Airflow 에서는 2.3 버전부터 airflow db clean
라는 기능을 제공 합니다. 요약 하자면, 특정 timestamp 이전에 생성 된 데이터에 대해서, 기간을 지정한 후 삭제를 진행 하는 거에요. ref
사용 예시는 다음과 같아요. --clean-before-timestamp
파라미터에는 삭제 하려고 하는 해당 시간을 삽입하고, --t
파라미터에는 Table을 삽입 하여 주는 거에요. --dry-run
옵션으로 실행 전 몇개의 레코드가 삭제 될 지 확인 할 수 있어요. --y
는 직접 데이터를 삽입 하는 거구요.
$ airflow db clean --clean-before-timestamp '2023-06-28 00:00:00' --t 'logs' --dry-run # dry-run
$ airflow db clean --clean-before-timestamp '2023-06-28 00:00:00' --t 'logs' --y # execute
Metadata를 정리 하기 위해서, 어떤 Table이 주로 사용되는 지, 어떤 Data를 삭제 하여도 무방한 지 알아야 겠죠? 대부분은 logs
, task_instance
, dag_run
, xcom
순으로 많이 사용 됩니다.
하지만 직접 확인 해보는 것이 좋겠죠? 각자의 Airflow 마다 사용 사례는 다르니까요.
SELECT table_name AS 'TableName',
ROUND(SUM(data_length+index_length)/(1024*1024), 2) AS 'All(MB)',
ROUND(data_length/(1024*1024), 2) AS 'Data(MB)',
ROUND(index_length/(1024*1024), 2) AS 'Index(MB)'
FROM information_schema.tables
GROUP BY table_name
ORDER BY data_length DESC;
먼저 logs
테이블 입니다. 해당 Table은 Audit Log에 해당 하는 데이터입니다. DAG, Task의 추가/수정/삭제의 기록, Task의 success, failed, clear 여부 등등을 가지고 있어, 보안 위반 사항등을 감지 하거나, 문제가 발생 시 해당 문제를 파악하는 데도 도움이 돼요. 하지만, 이는 Airflow Scheduling에서 정합성을 맞추는데 사용하지 않기 때문에, 너무 오래 된 log들은 삭제 하여도 무방합니다.
그 다음은 xcom
테이블 입니다. 해당 Table은 XCom에 해당 합니다. 동일 DAG내의 Task간 Variable을 공유 하는데 사용 하죠. 하지만, 사용자가 XCom에 큰 용량 (물론 64kb 제한이 기본으로 되어 있지만)의 변수 공유를 주기적으로 하게 된다면, 이 또한 문제가 될 것입니다. 그렇기 때문에 Task 단위 재처리가 필요하지 않은, 오래된 DAG Run에 있는 XCom 데이터인 경우 삭제 해 줘도 무방합니다. 당연히, 실행 중인 DAG Run의 XCom을 삭제한다면 문제가 있겠죠? 만약 사용하는 XCom의 크기를 어쩔 수 없이 크게 운영 되어야 한다면, Custom XCom Backend를 사용 하는 것도 방법입니다.
마지막으로 task_instance
, dag_run
테이블 입니다. 각각 Task Instance 정보, DAG Run 정보에 해당하며, Airflow Scheduling에 직접적인 영향을 주는 친구이기 때문에 삭제 하는데 조심 하여야 합니다.
하지만 task_instance
가 DB에서 차지하는 Size가 많기 때문에, 삭제를 하는 것은 필연적인데요, 어떻게 하면 좋을까요? 이를 위해 몇 개의 Test를 진행 해 보았습니다. Task가 하나만 존재 하는 Test DAG을 생성 후, 30개의 DAG Run을 수행 해 보았습니다.
우선, DagRun을 삭제하지 않은 채로, Task Instance를 airflow db clean
으로 삭제 해 보았습니다.
결과는 다음과 같이, Task에 대한 정보는 삭제 되었지만, 삭제 된 Task에 대해서 재수행을 하지 않았습니다. DAG Run은 Success 상태이기 때문에, Task가 다시 Trigger 되지 않은 모습입니다.
그 다음으로, DAG Run 또한 airflow db clean
으로 삭제를 진행 해 보았는데요. Airflow Scheduler는 DAG Run이 Clear가 되지 않는 이상, 가장 최신의 DAG Run을 바탕으로 다음에 수행할 DAG Run을 결정합니다. 그렇기 때문에, 과거의 DAG Run이 다시 Trigger 되지 않는 모습을 보입니다.
그렇다면, 전체 DAG Run을 지우려고 시도 하면 어떻게 될까요? airflow db clean
은 가장 최신의 DAG Run만 남기고 전체 삭제를 수행 합니다. 그렇기 때문에, 지금 ON 상태인 DAG의 Scheduling은 문제가 없이 작동 합니다.
아 물론, airflow db clean
은 Airflow Scheduler가 동작하는 DB에 직접 수행하는 것이니, DB Backup을 가능한 반드시 수행 해 주고 실행 하는것을 추천 드립니다. 또한, DB에 부하가 갈 수도 있으니, Airflow Task가 많이 Trigger 되지 않는 시간에 수행 하는 것을 추천 드립니다.