나쁜 DAG 생성 중지 – Python 코드를 개선하여 공기 흐름 환경을 최적화하십시오 | Alvaro Leandro Cavalcante Carneiro | 2025 년 1 월

나쁜 DAG 생성 중지 – Python 코드를 개선하여 공기 흐름 환경을 최적화하십시오 | Alvaro Leandro Cavalcante Carneiro | 2025 년 1 월

Apache Airflow는 데이터 필드에서 가장 인기있는 오케스트레이션 도구 중 하나이며 전 세계 회사의 워크 플로우를 구동합니다. 그러나 생산 환경, 특히 복잡한 환경에서 이미 공기 흐름을 사용한 사람은 때때로 몇 가지 문제와 이상한 버그를 제시 할 수 있다는 것을 알고 있습니다.

공기 흐름 환경에서 관리 해야하는 여러 측면 중 하나는 종종 레이더 아래에서 날아가는 중 하나의 메트릭이 종종 날아갑니다. 하루 종소리 시간. 성능 병목 현상을 피하고 오케스트레이션의 올바른 기능을 보장하기 위해서는 구문 분석 시간을 모니터링하고 최적화하는 것이 필수적입니다.

즉,이 튜토리얼은 소개하는 것을 목표로합니다 airflow-parse-bench데이터 엔지니어가 공기 흐름 환경을 모니터링하고 최적화하여 코드 복잡성과 시간을 줄이는 통찰력을 제공하기 위해 개발 한 오픈 소스 도구.

공기 흐름과 관련하여 Dag Parse Time은 종종 an입니다 간과 된 메트릭. 구문 분석은 공기 흐름이 파이썬 파일을 처리하여 DAGS를 동적으로 구축 할 때마다 발생합니다.

기본적으로 모든 DAG는 30 초마다 구문 분석됩니다. 구성 변수로 제어되는 주파수 min_file_process_interval. 이것은 30 초마다, 모든 파이썬 코드가 dags 폴더는 예약 할 작업을 포함하는 DAG 객체를 생성하도록 폴더를 읽고, 가져오고, 처리합니다. 그런 다음 성공적으로 처리 된 파일이 DAG 백에 추가됩니다.

두 가지 주요 공기 흐름 구성 요소 가이 프로세스를 처리합니다.

함께, 두 구성 요소 모두 (일반적으로라고합니다 데이 프로세서) 공기 흐름 스케줄러에 의해 실행되므로 트리거되기 전에 DAG 객체가 업데이트되도록합니다. 그러나 확장 성 및 보안상의 이유로 DAG 프로세서를 클러스터에서 별도의 구성 요소로 실행할 수도 있습니다.

환경에 수십 개의 DAG 만 있으면 구문 분석 프로세스가 모든 종류의 문제를 일으킬 가능성은 낮습니다. 그러나 수백 또는 수천 개의 DAG가있는 생산 환경을 찾는 것이 일반적입니다. 이 경우 구문 분석 시간이 너무 높으면 다음으로 이어질 수 있습니다.

  • DAG 일정을 지연하십시오.
  • 리소스 활용도를 높입니다.
  • 환경 심장 박동 문제.
  • 스케줄러 고장.
  • 과도한 CPU 및 메모리 사용, 자원 낭비.

이제 불필요하게 복잡한 구문 분석 논리가 들어있는 수백 개의 DAG가있는 환경이 있다고 상상해보십시오. 작은 비 효율성은 빠르게 중대한 문제로 바뀌어 전체 공기 흐름 설정의 안정성과 성능에 영향을 줄 수 있습니다.

공기 흐름 DAG를 작성할 때 최적화 된 코드를 만들기위한 몇 가지 중요한 모범 사례가 있습니다. DAG를 개선하는 방법에 대한 많은 튜토리얼을 찾을 수 있지만 DAG 성능을 크게 향상시킬 수있는 몇 가지 주요 원칙을 요약하겠습니다.

최상위 코드를 제한합니다

높은 DAG 파싱 시간의 가장 일반적인 원인 중 하나는 비효율적이거나 복잡한 최상위 코드입니다. 공기 흐름 DAG 파일의 최상위 코드는 스케줄러가 파일을 구문 분석 할 때마다 실행됩니다. 이 코드에 데이터베이스 쿼리, API 호출 또는 동적 작업 생성과 같은 리소스 집약적 인 작업이 포함되어 있으면 구문 분석 성능에 크게 영향을 줄 수 있습니다.

다음 코드는 a의 예를 보여줍니다 최적화되지 않은 DAG:

이 경우, 스케줄러가 파일을 구문 분석 할 때마다 최상위 코드가 실행되어 API 요청을 작성하고 데이터 프레임을 처리하여 구문 분석 시간에 크게 영향을 줄 수 있습니다.

구문 분석 느린에 기여하는 또 다른 중요한 요소는 최상위 수입입니다. 최상위 레벨에서 수입 된 모든 라이브러리는 구문 분석 중에 메모리에로드되므로 시간이 많이 걸릴 수 있습니다. 이를 피하기 위해 가져 오기를 함수 또는 작업 정의로 옮길 수 있습니다.

다음 코드는 동일한 DAG의 더 나은 버전을 보여줍니다.

최상위 코드에서 XCOM 및 변수를 피하십시오

여전히 같은 주제에 대해 이야기하는 것은 특히 최상위 코드에서 Xcom과 변수를 사용하지 않도록하는 것이 특히 흥미 롭습니다. Google 문서에서 언급 한 바와 같이 :

최상위 코드에서 variable.get ()를 사용하는 경우 .py 파일이 구문 분석 될 때마다 Airflow는 DB에 세션을 엽니 다. 이것은 구문 분석 시간을 크게 느리게 할 수 있습니다.

이를 해결하려면 사용을 고려하십시오 JSON 사전 여러 가지를 만들지 않고 단일 데이터베이스 쿼리에서 여러 변수를 검색합니다. Variable.get() 전화. 또는 사용하십시오 Jinja 템플릿이 방식으로 검색된 변수는 DAG 구문 분석 중에는 작업 실행 중에 만 처리됩니다.

불필요한 DAG를 제거하십시오

분명해 보이지만 항상 환경에서 불필요한 DAG 및 파일을 정기적으로 정리하는 것이 중요합니다.

  • 사용하지 않은 DAG를 제거하십시오: 당신을 확인하십시오 dags 더 이상 필요하지 않은 파일을 폴더로 삭제하십시오.
  • 사용 .airflowignore: 파일을 지정하여 공기 흐름이 의도적으로 무시하고 구문 분석을 건너 뛰는 것을 지정합니다.
  • 일시 정지 DAG를 검토하십시오: 일시 정지 된 DAG는 여전히 스케줄러에 의해 구문 분석되어 리소스를 소비합니다. 더 이상 필요하지 않은 경우 제거 또는 보관을 고려하십시오.

공기 흐름 구성을 변경합니다

마지막으로 스케줄러 리소스 사용을 줄이기 위해 일부 공기 흐름 구성을 변경할 수 있습니다.

  • min_file_process_interval:이 설정은 (초) 공기 흐름이 얼마나 자주 DAG 파일을 구문 분석하는지 제어합니다. 기본 30 초에서 늘리면 DAG 업데이트 느린 비용으로 스케줄러의 부하가 줄어 듭니다.
  • dag_dir_list_interval: 이것은 (초) 공기 흐름을 얼마나 자주 스캔하는지 결정합니다. dags 새로운 DAGS를위한 디렉토리. 새 DAGS를 적게 배치하는 경우이 간격을 높이면 CPU 사용을 줄입니다.

우리는 건강한 공기 흐름 환경을 유지하기 위해 최적화 된 DAG를 만드는 것의 중요성에 대해 많이 논의했습니다. 그러나 실제로 DAG의 구문 분석 시간을 어떻게 측정합니까? 다행히도 공기 흐름 배치 또는 운영 체제에 따라이를 수행하는 몇 가지 방법이 있습니다.

예를 들어, 클라우드 작곡가 배포가있는 경우 Google CLI에서 다음 명령을 실행하여 DAG 구문 분석 보고서를 쉽게 검색 할 수 있습니다.

gcloud composer environments run $ENVIRONMENT_NAME \
— location $LOCATION \
dags report

구문 분석 메트릭을 검색하는 것은 간단하지만 코드 최적화의 효과를 측정하는 것은 덜 가능합니다. 코드를 수정할 때마다 업데이트 된 Python 파일을 클라우드 제공 업체에 재배치하고 DAG를 구문 분석 할 때까지 기다린 다음 느리고 시간이 많이 걸리는 프로세스 인 새 보고서를 추출해야합니다.

Linux 또는 Mac에있는 경우 또 다른 가능한 접근 방식은이 명령을 실행하여 컴퓨터에서 로컬로 구문 분석 시간을 측정하는 것입니다.

time python airflow/example_dags/example.py

그러나 간단하지만이 접근법은 여러 DAG의 구문 분석 시간을 체계적으로 측정하고 비교할 때 실용적이지 않습니다.

이러한 도전을 해결하기 위해 airflow-parse-bench공기 흐름의 기본 구문 분석 방법을 사용하여 DAG의 구문 분석 시간을 단순화하고 비교하는 파이썬 라이브러리.

그만큼 airflow-parse-bench 도구를 사용하면 구문 분석 시간을 쉽게 저장하고 결과를 비교하며 DAG의 비교를 표준화 할 수 있습니다.

라이브러리 설치

설치하기 전에 라이브러리 충돌을 피하기 위해 VirtualEnV를 사용하는 것이 좋습니다. 설정되면 다음 명령을 실행하여 패키지를 설치할 수 있습니다.

pip install airflow-parse-bench

메모: 이 명령은 필수 종속성 (공기 흐름 및 공기 흐름 제공자와 관련) 만 설치합니다. DAG가 의존하는 추가 라이브러리를 수동으로 설치해야합니다.

예를 들어, DAG가 사용하는 경우 boto3 AWS와 상호 작용하려면이를 확인하십시오 boto3 환경에 설치됩니다. 그렇지 않으면 구문 분석 오류가 발생합니다.

그런 다음 공기 흐름 데이터베이스를 초기화해야합니다. 다음 명령을 실행하여 수행 할 수 있습니다.

airflow db init

또한 DAG가 사용하는 경우 공기 흐름 변수로컬로도 정의해야합니다. 그러나 실제 값이 구문 분석 목적으로 필요하지 않기 때문에 변수에 실제 값을 넣을 필요는 없습니다.

airflow variables set MY_VARIABLE 'ANY TEST VALUE'

이것 없이는 다음과 같은 오류가 발생합니다.

error: 'Variable MY_VARIABLE does not exist'

도구 사용

라이브러리를 설치 한 후 구문 분석 시간 측정을 시작할 수 있습니다. 예를 들어, DAG 파일이 이름이 있다고 가정하십시오 dag_test.py 위의 예제에 사용 된 최적화되지 않은 DAG 코드를 포함합니다.

구문 분석 시간을 측정하려면 간단히 실행하십시오.

airflow-parse-bench --path dag_test.py

이 실행은 다음과 같은 출력을 생성합니다.

실행 결과. 저자의 이미지.

관찰 한 바와 같이, 우리의 Dag는 구문 분석 시간을 제시했습니다 0.61 초. 명령을 다시 실행하면 시스템 및 환경 요인으로 인해 구문 분석 시간이 약간 달라질 수 있으므로 약간의 차이가 나타납니다.

동일한 DAG의 다른 실행 결과. 저자의 이미지.

보다 간결한 수를 제시하려면 반복 횟수를 지정하여 여러 실행을 집계 할 수 있습니다.

airflow-parse-bench --path dag_test.py --num-iterations 5

마무리하는 데 시간이 조금 더 걸리지 만 이것은 평균 구문 분석 시간 5 개의 처형에 걸쳐.

이제 위에서 언급 한 최적화의 영향을 평가하기 위해 코드를 내 코드를 대체했습니다.dag_test.py 최적화 된 버전이 앞서 공유됩니다. 동일한 명령을 실행 한 후 다음과 같은 결과를 얻었습니다.

최적화 된 코드의 결과를 구문 분석합니다. 저자의 이미지.

알다시피, 몇 가지 모범 사례를 적용하는 것만으로는 거의 감소 할 수있었습니다. 0.5 초 Dag Parse Time에서 우리가 만든 변화의 중요성을 강조하십시오!

공유와 관련이 있다고 생각하는 다른 흥미로운 기능이 있습니다.

알림으로, 도구를 사용하는 의심이나 문제가있는 경우 GitHub의 전체 문서에 액세스 할 수 있습니다.

게다가 라이브러리에서 지원하는 모든 매개 변수를 보려면 간단히 실행하십시오.

airflow-parse-bench --help

여러 DAG를 테스트합니다

대부분의 경우 구문 분석 시간을 테스트 할 수있는 수십 개의 DAG가있을 수 있습니다. 이 유스 케이스를 해결하기 위해 이름이 지정된 폴더를 만들었습니다. dags 4 개의 파이썬 파일을 넣습니다.

폴더의 모든 DAG에 대한 구문 분석 시간을 측정하려면 폴더 경로를 --path 매개 변수 :

airflow-parse-bench --path my_path/dags

이 명령을 실행하면 폴더의 모든 DAG에 대한 구문 분석 시간을 요약하는 테이블이 생성됩니다.

여러 DAG의 구문 분석 시간을 테스트합니다. 저자의 이미지.

기본적으로 테이블은 가장 빠른 곳에서 가장 느린 DAG로 정렬됩니다. 그러나 사용하여 순서를 뒤집을 수 있습니다 --order 매개 변수 :

airflow-parse-bench --path my_path/dags --order desc
역 정렬 순서. 저자의 이미지.

변하지 않은 Dags를 건너 뛰십시오

그만큼 --skip-unchanged 매개 변수는 개발 중에 특히 유용 할 수 있습니다. 이름에서 알 수 있듯이이 옵션은 마지막 실행 이후 수정되지 않은 DAG에 대한 구문 분석 실행을 건너 뜁니다.

airflow-parse-bench --path my_path/dags --skip-unchanged

아래에 표시된 바와 같이, DAG가 변경되지 않은 상태로 유지되면, 출력은 구문 분석 시간의 차이를 반영하지 않습니다.

변경되지 않은 파일에 차이가없는 출력. 저자의 이미지.

데이터베이스 재설정

지표 및 기록을 포함한 모든 DAG 정보는 로컬 SQLITE 데이터베이스에 저장됩니다. 저장된 모든 데이터를 지우고 새로 시작하려면 --reset-db 깃발:

airflow-parse-bench --path my_path/dags --reset-db

이 명령은 데이터베이스를 재설정하고 DAG를 첫 번째 실행 인 것처럼 처리합니다.

구문 분석 시간은 확장 가능하고 효율적인 공기 흐름 환경을 유지하는 데 중요한 메트릭입니다. 특히 오케스트레이션 요구 사항이 점점 복잡해지면서.

이런 이유로 airflow-parse-bench 라이브러리는 데이터 엔지니어가 더 나은 DAG를 만드는 데 도움이되는 중요한 도구가 될 수 있습니다. DAGS의 구문 분석 시간을 로컬로 테스트하면 코드 병목 현상을 쉽고 빠르게 찾아서 DAG를 더 빠르고 성능을 발휘할 수 있습니다.

코드는 로컬로 실행되므로 생성 된 구문 분석 시간은 공기 흐름 클러스터에있는 것과 동일하지 않습니다. 그러나 로컬 컴퓨터에서 구문 분석 시간을 줄일 수 있다면 클라우드 환경에서 동일하게 재현 할 수 있습니다.

마지막으로,이 프로젝트는 협업을 위해 열려 있습니다! 제안, 아이디어 또는 개선이 있으면 Github에 자유롭게 기여하십시오.

출처 참조

Post Comment

당신은 놓쳤을 수도 있습니다