데이터 엔지니어링에 유용한 유용한 파이썬 원 라이너


편집자에 의한 이미지 | chatgpt
틀 소개
데이터 엔지니어링에는 대규모 데이터 세트 처리, ETL 파이프 라인 구축 및 데이터 품질 유지가 포함됩니다. 데이터 엔지니어는 스트리밍 데이터와 협력하고 시스템 성능을 모니터링하며 스키마 변경을 처리하며 분산 시스템에서 데이터 일관성을 보장합니다.
Python One-Liners는 복잡한 작업을 단일의 읽기 쉬운 문으로 압축하여 이러한 작업을 단순화하는 데 도움이 될 수 있습니다. 이 기사는 일반적인 데이터 엔지니어링 문제를 해결하는 실용적인 원 라이너에 중점을 둡니다.
여기에 제시된 한 라이너는 다양한 구조로 이벤트 데이터를 처리, 성능 문제에 대한 시스템 로그 분석, 다양한 스키마로 API 응답 처리 및 데이터 품질 검사 구현과 같은 실제 작업을 다룹니다. 시작합시다.
🔗 Github의 코드 링크
틀 샘플 데이터
하나의 라이너를 실행하기 위해 일부 샘플 데이터를 돌리겠습니다.
import pandas as pd
import numpy as np
import json
from datetime import datetime, timedelta
# Create streaming event data
np.random.seed(42)
events = []
for i in range(1000):
properties = {
'device_type': np.random.choice(['mobile', 'desktop', 'tablet']),
'page_path': np.random.choice(['/home', '/products', '/checkout']),
'session_length': np.random.randint(60, 3600)
}
if np.random.random() > 0.7:
properties['purchase_value'] = round(np.random.uniform(20, 300), 2)
event = {
'event_id': f'evt_{i}',
'timestamp': (datetime.now() - timedelta(hours=np.random.randint(0, 72))).isoformat(),
'user_id': f'user_{np.random.randint(100, 999)}',
'event_type': np.random.choice(['view', 'click', 'purchase']),
'metadata': json.dumps(properties)
}
events.append(event)
# Create database performance logs
db_logs = pd.DataFrame({
'timestamp': pd.date_range('2024-01-01', periods=5000, freq='1min'),
'operation': np.random.choice(['SELECT', 'INSERT', 'UPDATE'], 5000, p=[0.7, 0.2, 0.1]),
'duration_ms': np.random.lognormal(mean=4, sigma=1, size=5000),
'table_name': np.random.choice(['users', 'orders', 'products'], 5000),
'rows_processed': np.random.poisson(lam=25, size=5000),
'connection_id': np.random.randint(1, 20, 5000)
})
# Create API log data
api_logs = []
for i in range(800):
log_entry = {
'timestamp': datetime.now() - timedelta(minutes=np.random.randint(0, 1440)),
'endpoint': np.random.choice(['/api/users', '/api/orders', '/api/metrics']),
'status_code': np.random.choice([200, 400, 500], p=[0.8, 0.15, 0.05]),
'response_time': np.random.exponential(150)
}
if log_entry['status_code'] == 200:
log_entry['payload_size'] = np.random.randint(100, 5000)
api_logs.append(log_entry)
틀 1. JSON 필드를 데이터 프레임 열로 추출합니다
이벤트 로그에서 JSON 메타 데이터 필드를 분석을 위해 별도의 데이터 프레임 열로 변환하십시오.
events_df = pd.DataFrame([{**event, **json.loads(event['metadata'])} for event in events]).drop('metadata', axis=1)
이 1 라이너는 사전 포장 풀기와 함께 목록 이해력을 사용하여 각 이벤트의 기본 필드를 구문 분석 된 JSON 메타 데이터와 병합합니다. 그만큼 drop()
원본을 제거합니다 metadata
내용은 이제 별도의 열에 있기 때문에 열입니다.
산출:
이것은 JSON 필드가 좋아하는 1000 행과 8 개의 열로 데이터 프레임을 만듭니다. device_type
그리고 purchase_value
직접 쿼리하고 집계 할 수있는 개별 열이됩니다.
틀 2. 작동 유형별 성능 특이점 식별
유사한 작업에 비해 비정상적으로 오래 걸리는 데이터베이스 작업을 찾으십시오.
outliers = db_logs.groupby('operation').apply(lambda x: x[x['duration_ms'] > x['duration_ms'].quantile(0.95)]).reset_index(drop=True)
이 데이터베이스 로그를 작동 유형별로 그룹화 한 다음 각 그룹을 95 번째 백분위 수 시간을 초과하는 레코드를 필터링합니다.
잘린 출력 :
이로 인해 각 작업은 유사한 작업의 95%보다 상당히 느리게 수행되는 약 250 개의 이상치 운영 (총 5000 개)을 반환합니다.
틀 3. API 엔드 포인트의 롤링 평균 응답 시간 계산
슬라이딩 윈도우를 사용하여 다양한 API 엔드 포인트에 대한 시간이 지남에 따라 성능 추세를 모니터링합니다.
api_response_trends = pd.DataFrame(api_logs).set_index('timestamp').sort_index().groupby('endpoint')['response_time'].rolling('1H').mean().reset_index()
이것은 API 로그를 데이터 프레임, 세트로 변환합니다 timestamp
시간 기반 운영의 색인으로서, 단조로운 순서를 보장하기 위해 시간순으로 정렬합니다. 그런 다음 그룹 endpoint
응답 시간에 롤링 1 시간 창을 적용합니다.
각 슬라이딩 창 내에서 mean()
함수 평균 응답 시간을 계산합니다. 롤링 윈도우는 시간이 지남에 따라 이동하여 고립 된 측정이 아닌 성능 추세 분석을 제공합니다.
잘린 출력 :
우리는 각 API 엔드 포인트의 성능이 시간이 지남에 따라 밀리 초의 값으로 어떻게 변하는지를 보여주는 응답 시간 추세를 얻습니다. 값이 높을수록 성능이 느리다는 것을 나타냅니다.
틀 4. 이벤트 데이터의 스키마 감지
이전 이벤트에 존재하지 않은 이벤트 메타 데이터에 새 필드가 나타나는 시점을 식별하십시오.
schema_evolution = pd.DataFrame([{k: type(v).__name__ for k, v in json.loads(event['metadata']).items()} for event in events]).fillna('missing').nunique()
이것은 각 이벤트에서 JSON 메타 데이터를 구문 분석하고 type(v).__name__
.
결과 데이터 프레임에는 모든 이벤트에서 발견 된 이벤트 당 1 행과 고유 필드 당 하나의 열이 있습니다. 그만큼 fillna('missing')
특정 필드가없는 이벤트를 처리합니다 nunique()
얼마나 많은 다른 값 (포함 missing
) 각 열에 나타납니다.
산출:
device_type 1
page_path 1
session_length 1
purchase_value 2
dtype: int64
틀 5. 다단계 데이터베이스 연결 성능 집계
리소스 모니터링을위한 작업 유형 및 연결별로 그룹화 된 요약 통계를 만듭니다.
connection_perf = db_logs.groupby(['operation', 'connection_id']).agg({'duration_ms': ['mean', 'count'], 'rows_processed': ['sum', 'mean']}).round(2)
이 데이터베이스는 작업 유형 및 연결 ID별로 동시에 로그를 그룹화하여 다양한 연결이 다양한 작업을 처리하는 방법에 대한 계층 적 분석을 만듭니다.
그만큼 agg()
함수는 여러 집계 함수를 적용합니다. mean
그리고 count
평균 성능과 쿼리 빈도를 모두 표시하는 동안 sum
그리고 mean
~을 위한 rows_processed
처리량 패턴을 보여줍니다. 그만큼 round(2)
읽을 수있는 소수점 정밀도를 보장합니다.
산출:
이는 각 연결이 다른 작업을 수행하는 방법을 보여주는 다중 인덱스 데이터 프레임을 만듭니다.
틀 6. 시간별 이벤트 유형 분포 패턴 생성
사용자 행동주기를 이해하기 위해 다른 시간에 걸쳐 이벤트 유형 분포 패턴을 계산합니다.
hourly_patterns = pd.DataFrame(events).assign(hour=lambda x: pd.to_datetime(x['timestamp']).dt.hour).groupby(['hour', 'event_type']).size().unstack(fill_value=0).div(pd.DataFrame(events).assign(hour=lambda x: pd.to_datetime(x['timestamp']).dt.hour).groupby('hour').size(), axis=0).round(3)
이것은 타임 스탬프에서 시간을 추출합니다 assign()
그리고 Lambda는 시간 대 이벤트 유형을 사용하여 groupby
그리고 unstack
.
그만큼 div()
운영은 시간당 총 이벤트에 의해 정규화되어 원시 카운트가 아닌 비례 분포를 보여줍니다.
잘린 출력 :
각 이벤트 유형의 비율을 보여주는 행렬을 반환합니다 (view
,,, click
,,, purchase
) 하루 각 시간 동안, 다른 행동에 대한 사용자 행동 패턴과 피크 활동 기간을 드러냅니다.
틀 7. 상태 코드 별 API 오류율 요약 계산
모든 엔드 포인트에서 오류 분포 패턴을 분석하여 API 건강을 모니터링하십시오.
error_breakdown = pd.DataFrame(api_logs).groupby(['endpoint', 'status_code']).size().unstack(fill_value=0).div(pd.DataFrame(api_logs).groupby('endpoint').size(), axis=0).round(3)
이것은 둘 다 API 로그를 그룹화합니다 endpoint
그리고 status_code
그런 다음 사용합니다 size()
발생을 계산합니다 unstack()
상태 코드를 열로 피벗합니다. 그만큼 div()
작업은 원시 카운트가 아닌 비율을 보여주기 위해 엔드 포인트 당 총 요청에 의해 정규화되어 어떤 엔드 포인트가 가장 높은 오류율을 갖는 지, 어떤 오류가 생성되는지를 나타냅니다.
산출:
status_code 200 400 500
endpoint
/api/metrics 0.789 0.151 0.060
/api/orders 0.827 0.140 0.033
/api/users 0.772 0.167 0.061
각 엔드 포인트에 대해 각 상태 코드 (200, 400, 500)의 비율을 보여주는 행렬을 작성하여 문제가있는 엔드 포인트를 쉽게 찾을 수 있으며 클라이언트 오류 (4xx) 또는 서버 오류 (5xx)가 실패하는지 여부를 쉽게 찾을 수 있습니다.
틀 8. 슬라이딩 윈도우 이상 탐지 구현
현재의 성능을 최근의 역사적 공연과 비교하여 비정상적인 패턴을 감지하십시오.
anomaly_flags = db_logs.sort_values('timestamp').assign(rolling_mean=lambda x: x['duration_ms'].rolling(window=100, min_periods=10).mean()).assign(is_anomaly=lambda x: x['duration_ms'] > 2 * x['rolling_mean'])
이 정렬은 연대순으로 로그를 사용하여 사용하여 마지막 100 개의 작업의 롤링 평균을 계산합니다. rolling()
그런 다음 현재 지속 시간이 롤링 평균의 두 배를 초과하는 작업을 플래그합니다. 그만큼 min_periods=10
충분한 데이터를 사용할 수있는 후에 만 계산이 시작되도록합니다.
잘린 출력 :
각 데이터베이스 작업에 이상 플래그를 추가하여 정적 임계 값을 사용하지 않고 최근 성능에 비해 비정상적으로 느린 작업을 식별합니다.
틀 9. 메모리 효율적인 데이터 유형 최적화
숫자 유형을 가장 작은 표현으로 다운 캐스팅하여 데이터 프레임 메모리 사용량을 자동으로 최적화합니다.
optimized_df = db_logs.assign(**{c: (pd.to_numeric(db_logs[c], downcast="integer") if pd.api.types.is_integer_dtype(db_logs[c]) else pd.to_numeric(db_logs[c], downcast="float")) for c in db_logs.select_dtypes(include=['int', 'float']).columns})
이것은 숫자 열만 선택하고 원본에서 대체합니다. db_logs
다운 캐스트 된 버전을 사용하여 pd.to_numeric()
. 정수 열의 경우 시도합니다 int8
,,, int16
그리고 int32
머무르기 전에 int64
. 플로트 칼럼의 경우 시도합니다 float32
~ 전에 float64
.
그렇게하면 큰 데이터 세트의 메모리 사용량이 줄어 듭니다.
틀 10. 시간당 이벤트 처리 메트릭 계산
이벤트 볼륨 및 사용자 참여 패턴을 추적하여 스트리밍 파이프 라인 건강을 모니터링합니다.
pipeline_metrics = pd.DataFrame(events).assign(hour=lambda x: pd.to_datetime(x['timestamp']).dt.hour).groupby('hour').agg({'event_id': 'count', 'user_id': 'nunique', 'event_type': lambda x: (x == 'purchase').mean()}).rename(columns={'event_id': 'total_events', 'user_id': 'unique_users', 'event_type': 'purchase_rate'}).round(3)
시간마다 타임 스탬프 및 그룹 이벤트에서 시간을 추출한 다음 세 가지 주요 메트릭을 계산합니다. count()
사용하는 고유 한 사용자 nunique()
구매 이벤트의 비율을 계산하는 람다를 사용한 구매 전환율. 그만큼 rename()
메소드는 최종 출력에 대한 설명 열 이름을 제공합니다.
산출:
이는 이벤트 볼륨, 사용자 참여 수준 및 하루 종일 전환율을 나타내는 시간별 메트릭을 보여줍니다.
틀 마무리
이 1 라이너는 데이터 엔지니어링 작업에 유용합니다. 그들은 Pandas 운영, 통계 분석 및 데이터 변환 기술을 결합하여 실제 시나리오를 효율적으로 처리합니다.
각 패턴은 특정 요구 사항에 따라 적응하고 확장 할 수 있으며 생산 사용에 효과적인 핵심 논리를 유지할 수 있습니다.
행복한 코딩!
발라 프리 야 c 인도의 개발자이자 기술 작가입니다. 그녀는 수학, 프로그래밍, 데이터 과학 및 컨텐츠 제작의 교차점에서 일하는 것을 좋아합니다. 그녀의 관심 분야와 전문 지식에는 DevOps, 데이터 과학 및 자연어 처리가 포함됩니다. 그녀는 독서, 쓰기, 코딩 및 커피를 즐깁니다! 현재 그녀는 자습서, 방법 안내, 의견 조각 등을 통해 개발자 커뮤니티와 지식을 배우고 공유하는 작업을하고 있습니다. Bala는 또한 매력적인 리소스 개요 및 코딩 자습서를 만듭니다.
Post Comment