데이터 엔지니어링에 유용한 유용한 파이썬 원 라이너

데이터 엔지니어링에 유용한 유용한 파이썬 원 라이너

데이터 엔지니어링에 유용한 파이썬 원 라이너데이터 엔지니어링에 유용한 파이썬 원 라이너
편집자에 의한 이미지 | chatgpt

소개

데이터 엔지니어링에는 대규모 데이터 세트 처리, ETL 파이프 라인 구축 및 데이터 품질 유지가 포함됩니다. 데이터 엔지니어는 스트리밍 데이터와 협력하고 시스템 성능을 모니터링하며 스키마 변경을 처리하며 분산 시스템에서 데이터 일관성을 보장합니다.

Python One-Liners는 복잡한 작업을 단일의 읽기 쉬운 문으로 압축하여 이러한 작업을 단순화하는 데 도움이 될 수 있습니다. 이 기사는 일반적인 데이터 엔지니어링 문제를 해결하는 실용적인 원 라이너에 중점을 둡니다.

여기에 제시된 한 라이너는 다양한 구조로 이벤트 데이터를 처리, 성능 문제에 대한 시스템 로그 분석, 다양한 스키마로 API 응답 처리 및 데이터 품질 검사 구현과 같은 실제 작업을 다룹니다. 시작합시다.

🔗 Github의 코드 링크

샘플 데이터

하나의 라이너를 실행하기 위해 일부 샘플 데이터를 돌리겠습니다.

import pandas as pd
import numpy as np
import json
from datetime import datetime, timedelta

# Create streaming event data
np.random.seed(42)
events = []
for i in range(1000):
    properties = {
        'device_type': np.random.choice(['mobile', 'desktop', 'tablet']),
        'page_path': np.random.choice(['/home', '/products', '/checkout']),
        'session_length': np.random.randint(60, 3600)
    }
    if np.random.random() > 0.7:
        properties['purchase_value'] = round(np.random.uniform(20, 300), 2)

    event = {
        'event_id': f'evt_{i}',
        'timestamp': (datetime.now() - timedelta(hours=np.random.randint(0, 72))).isoformat(),
        'user_id': f'user_{np.random.randint(100, 999)}',
        'event_type': np.random.choice(['view', 'click', 'purchase']),
        'metadata': json.dumps(properties)
    }
    events.append(event)

# Create database performance logs
db_logs = pd.DataFrame({
    'timestamp': pd.date_range('2024-01-01', periods=5000, freq='1min'),
    'operation': np.random.choice(['SELECT', 'INSERT', 'UPDATE'], 5000, p=[0.7, 0.2, 0.1]),
    'duration_ms': np.random.lognormal(mean=4, sigma=1, size=5000),
    'table_name': np.random.choice(['users', 'orders', 'products'], 5000),
    'rows_processed': np.random.poisson(lam=25, size=5000),
    'connection_id': np.random.randint(1, 20, 5000)
})

# Create API log data
api_logs = []
for i in range(800):
    log_entry = {
        'timestamp': datetime.now() - timedelta(minutes=np.random.randint(0, 1440)),
        'endpoint': np.random.choice(['/api/users', '/api/orders', '/api/metrics']),
        'status_code': np.random.choice([200, 400, 500], p=[0.8, 0.15, 0.05]),
        'response_time': np.random.exponential(150)
    }
    if log_entry['status_code'] == 200:
        log_entry['payload_size'] = np.random.randint(100, 5000)
    api_logs.append(log_entry)

1. JSON 필드를 데이터 프레임 열로 추출합니다

이벤트 로그에서 JSON 메타 데이터 필드를 분석을 위해 별도의 데이터 프레임 열로 변환하십시오.

events_df = pd.DataFrame([{**event, **json.loads(event['metadata'])} for event in events]).drop('metadata', axis=1)

이 1 라이너는 사전 포장 풀기와 함께 목록 이해력을 사용하여 각 이벤트의 기본 필드를 구문 분석 된 JSON 메타 데이터와 병합합니다. 그만큼 drop() 원본을 제거합니다 metadata 내용은 이제 별도의 열에 있기 때문에 열입니다.

산출:

Extract-JSON-2-COLSExtract-JSON-2-COLS

이것은 JSON 필드가 좋아하는 1000 행과 8 개의 열로 데이터 프레임을 만듭니다. device_type 그리고 purchase_value 직접 쿼리하고 집계 할 수있는 개별 열이됩니다.

2. 작동 유형별 성능 특이점 식별

유사한 작업에 비해 비정상적으로 오래 걸리는 데이터베이스 작업을 찾으십시오.

outliers = db_logs.groupby('operation').apply(lambda x: x[x['duration_ms'] > x['duration_ms'].quantile(0.95)]).reset_index(drop=True)

이 데이터베이스 로그를 작동 유형별로 그룹화 한 다음 각 그룹을 95 번째 백분위 수 시간을 초과하는 레코드를 필터링합니다.

잘린 출력 :

특이 치특이 치

이로 인해 각 작업은 유사한 작업의 95%보다 상당히 느리게 수행되는 약 250 개의 이상치 운영 (총 5000 개)을 반환합니다.

3. API 엔드 포인트의 롤링 평균 응답 시간 계산

슬라이딩 윈도우를 사용하여 다양한 API 엔드 포인트에 대한 시간이 지남에 따라 성능 추세를 모니터링합니다.

api_response_trends = pd.DataFrame(api_logs).set_index('timestamp').sort_index().groupby('endpoint')['response_time'].rolling('1H').mean().reset_index()

이것은 API 로그를 데이터 프레임, 세트로 변환합니다 timestamp 시간 기반 운영의 색인으로서, 단조로운 순서를 보장하기 위해 시간순으로 정렬합니다. 그런 다음 그룹 endpoint 응답 시간에 롤링 1 시간 창을 적용합니다.

각 슬라이딩 창 내에서 mean() 함수 평균 응답 시간을 계산합니다. 롤링 윈도우는 시간이 지남에 따라 이동하여 고립 된 측정이 아닌 성능 추세 분석을 제공합니다.

잘린 출력 :

롤링-avg롤링-avg

우리는 각 API 엔드 포인트의 성능이 시간이 지남에 따라 밀리 초의 값으로 어떻게 변하는지를 보여주는 응답 시간 추세를 얻습니다. 값이 높을수록 성능이 느리다는 것을 나타냅니다.

4. 이벤트 데이터의 스키마 감지

이전 이벤트에 존재하지 않은 이벤트 메타 데이터에 새 필드가 나타나는 시점을 식별하십시오.

schema_evolution = pd.DataFrame([{k: type(v).__name__ for k, v in json.loads(event['metadata']).items()} for event in events]).fillna('missing').nunique()

이것은 각 이벤트에서 JSON 메타 데이터를 구문 분석하고 type(v).__name__.

결과 데이터 프레임에는 모든 이벤트에서 발견 된 이벤트 당 1 행과 고유 필드 당 하나의 열이 있습니다. 그만큼 fillna('missing') 특정 필드가없는 이벤트를 처리합니다 nunique() 얼마나 많은 다른 값 (포함 missing) 각 열에 나타납니다.

산출:

device_type       1
page_path         1
session_length    1
purchase_value    2
dtype: int64

5. 다단계 데이터베이스 연결 성능 ​​집계

리소스 모니터링을위한 작업 유형 및 연결별로 그룹화 된 요약 통계를 만듭니다.

connection_perf = db_logs.groupby(['operation', 'connection_id']).agg({'duration_ms': ['mean', 'count'], 'rows_processed': ['sum', 'mean']}).round(2)

이 데이터베이스는 작업 유형 및 연결 ID별로 동시에 로그를 그룹화하여 다양한 연결이 다양한 작업을 처리하는 방법에 대한 계층 적 분석을 만듭니다.

그만큼 agg() 함수는 여러 집계 함수를 적용합니다. mean 그리고 count 평균 성능과 쿼리 빈도를 모두 표시하는 동안 sum 그리고 mean ~을 위한 rows_processed 처리량 패턴을 보여줍니다. 그만큼 round(2) 읽을 수있는 소수점 정밀도를 보장합니다.

산출:

골재골재

이는 각 연결이 다른 작업을 수행하는 방법을 보여주는 다중 인덱스 데이터 프레임을 만듭니다.

6. 시간별 이벤트 유형 분포 패턴 생성

사용자 행동주기를 이해하기 위해 다른 시간에 걸쳐 이벤트 유형 분포 패턴을 계산합니다.

hourly_patterns = pd.DataFrame(events).assign(hour=lambda x: pd.to_datetime(x['timestamp']).dt.hour).groupby(['hour', 'event_type']).size().unstack(fill_value=0).div(pd.DataFrame(events).assign(hour=lambda x: pd.to_datetime(x['timestamp']).dt.hour).groupby('hour').size(), axis=0).round(3)

이것은 타임 스탬프에서 시간을 추출합니다 assign() 그리고 Lambda는 시간 대 이벤트 유형을 사용하여 groupby 그리고 unstack.

그만큼 div() 운영은 시간당 총 이벤트에 의해 정규화되어 원시 카운트가 아닌 비례 분포를 보여줍니다.

잘린 출력 :

시간별 다이스트시간별 다이스트

각 이벤트 유형의 비율을 보여주는 행렬을 반환합니다 (view,,, click,,, purchase) 하루 각 시간 동안, 다른 행동에 대한 사용자 행동 패턴과 피크 활동 기간을 드러냅니다.

7. 상태 코드 별 API 오류율 요약 계산

모든 엔드 포인트에서 오류 분포 패턴을 분석하여 API 건강을 모니터링하십시오.

error_breakdown = pd.DataFrame(api_logs).groupby(['endpoint', 'status_code']).size().unstack(fill_value=0).div(pd.DataFrame(api_logs).groupby('endpoint').size(), axis=0).round(3)

이것은 둘 다 API 로그를 그룹화합니다 endpoint 그리고 status_code그런 다음 사용합니다 size() 발생을 계산합니다 unstack() 상태 코드를 열로 피벗합니다. 그만큼 div() 작업은 원시 카운트가 아닌 비율을 보여주기 위해 엔드 포인트 당 총 요청에 의해 정규화되어 어떤 엔드 포인트가 가장 높은 오류율을 갖는 지, 어떤 오류가 생성되는지를 나타냅니다.

산출:

status_code     200    400    500
endpoint                         
/api/metrics  0.789  0.151  0.060
/api/orders   0.827  0.140  0.033
/api/users    0.772  0.167  0.061

각 엔드 포인트에 대해 각 상태 코드 (200, 400, 500)의 비율을 보여주는 행렬을 작성하여 문제가있는 엔드 포인트를 쉽게 찾을 수 있으며 클라이언트 오류 (4xx) 또는 서버 오류 (5xx)가 실패하는지 여부를 쉽게 찾을 수 있습니다.

8. 슬라이딩 윈도우 이상 탐지 구현

현재의 성능을 최근의 역사적 공연과 비교하여 비정상적인 패턴을 감지하십시오.

anomaly_flags = db_logs.sort_values('timestamp').assign(rolling_mean=lambda x: x['duration_ms'].rolling(window=100, min_periods=10).mean()).assign(is_anomaly=lambda x: x['duration_ms'] > 2 * x['rolling_mean'])

이 정렬은 연대순으로 로그를 사용하여 사용하여 마지막 100 개의 작업의 롤링 평균을 계산합니다. rolling()그런 다음 현재 지속 시간이 롤링 평균의 두 배를 초과하는 작업을 플래그합니다. 그만큼 min_periods=10 충분한 데이터를 사용할 수있는 후에 만 ​​계산이 시작되도록합니다.

잘린 출력 :

슬라이딩 윈 -OP슬라이딩 윈 -OP

각 데이터베이스 작업에 이상 플래그를 추가하여 정적 임계 값을 사용하지 않고 최근 성능에 비해 비정상적으로 느린 작업을 식별합니다.

9. 메모리 효율적인 데이터 유형 최적화

숫자 유형을 가장 작은 표현으로 다운 캐스팅하여 데이터 프레임 메모리 사용량을 자동으로 최적화합니다.

optimized_df = db_logs.assign(**{c: (pd.to_numeric(db_logs[c], downcast="integer") if pd.api.types.is_integer_dtype(db_logs[c]) else pd.to_numeric(db_logs[c], downcast="float")) for c in db_logs.select_dtypes(include=['int', 'float']).columns})

이것은 숫자 열만 선택하고 원본에서 대체합니다. db_logs 다운 캐스트 된 버전을 사용하여 pd.to_numeric(). 정수 열의 경우 시도합니다 int8,,, int16그리고 int32 머무르기 전에 int64. 플로트 칼럼의 경우 시도합니다 float32 ~ 전에 float64.

그렇게하면 큰 데이터 세트의 메모리 사용량이 줄어 듭니다.

10. 시간당 이벤트 처리 메트릭 계산

이벤트 볼륨 및 사용자 참여 패턴을 추적하여 스트리밍 파이프 라인 건강을 모니터링합니다.

pipeline_metrics = pd.DataFrame(events).assign(hour=lambda x: pd.to_datetime(x['timestamp']).dt.hour).groupby('hour').agg({'event_id': 'count', 'user_id': 'nunique', 'event_type': lambda x: (x == 'purchase').mean()}).rename(columns={'event_id': 'total_events', 'user_id': 'unique_users', 'event_type': 'purchase_rate'}).round(3)

시간마다 타임 스탬프 및 그룹 이벤트에서 시간을 추출한 다음 세 가지 주요 메트릭을 계산합니다. count()사용하는 고유 한 사용자 nunique()구매 이벤트의 비율을 계산하는 람다를 사용한 구매 전환율. 그만큼 rename() 메소드는 최종 출력에 대한 설명 열 이름을 제공합니다.

산출:

이벤트 프로그램 출력이벤트 프로그램 출력

이는 이벤트 볼륨, 사용자 참여 수준 및 하루 종일 전환율을 나타내는 시간별 메트릭을 보여줍니다.

마무리

이 1 라이너는 데이터 엔지니어링 작업에 유용합니다. 그들은 Pandas 운영, 통계 분석 및 데이터 변환 기술을 결합하여 실제 시나리오를 효율적으로 처리합니다.

각 패턴은 특정 요구 사항에 따라 적응하고 확장 할 수 있으며 생산 사용에 효과적인 핵심 논리를 유지할 수 있습니다.

행복한 코딩!

발라 프리 야 c 인도의 개발자이자 기술 작가입니다. 그녀는 수학, 프로그래밍, 데이터 과학 및 컨텐츠 제작의 교차점에서 일하는 것을 좋아합니다. 그녀의 관심 분야와 전문 지식에는 DevOps, 데이터 과학 및 자연어 처리가 포함됩니다. 그녀는 독서, 쓰기, 코딩 및 커피를 즐깁니다! 현재 그녀는 자습서, 방법 안내, 의견 조각 등을 통해 개발자 커뮤니티와 지식을 배우고 공유하는 작업을하고 있습니다. Bala는 또한 매력적인 리소스 개요 및 코딩 자습서를 만듭니다.

출처 참조

Post Comment

당신은 놓쳤을 수도 있습니다