약 30 줄의 파이썬에서 데이터 과학 워크 플로우를위한 ETL 파이프 라인 구축

약 30 줄의 파이썬에서 데이터 과학 워크 플로우를위한 ETL 파이프 라인 구축

약 30 줄의 파이썬에서 데이터 과학 워크 플로우를위한 ETL 파이프 라인 구축약 30 줄의 파이썬에서 데이터 과학 워크 플로우를위한 ETL 파이프 라인 구축
저자의 이미지 | 표의 문자

당신은 다른 형식과 소스에 데이터를 산란시킬 때의 느낌을 알고 있으며, 그것을 모두 이해해야합니까? 그것이 바로 오늘 우리가 해결하고있는 것입니다. 지저분한 데이터를 가져 와서 실제로 유용한 것으로 바꾸는 ETL 파이프 라인을 구축합시다.

이 기사에서는 전자 상거래 트랜잭션을 처리하는 파이프 라인 만들기를 안내합니다. 공상은없고 작업을 완료하는 실용적인 코드입니다.

CSV 파일 (전자 상거래 플랫폼에서 다운로드 한 것처럼)에서 데이터를 가져 와서 정리하고 분석을 위해 적절한 데이터베이스에 저장합니다.

github의 코드 링크

ETL (Extract, Transform, Load) 파이프 라인은 무엇입니까?

모든 ETL 파이프 라인은 동일한 패턴을 따릅니다. 어딘가 (추출)에서 데이터를 가져 와서 정리하고 더 나은 (변환) 한 다음 유용한 곳 (로드)을 넣습니다.

ETL-PIPELINEETL-PIPELINE
ETL 파이프 라인 | 저자의 이미지 | diagrams.net (draw.io)

프로세스는 발췌 데이터가 데이터베이스, API, 파일 또는 스트리밍 플랫폼과 같은 다양한 소스 시스템에서 데이터를 검색하는 단계. 이 단계에서 파이프 라인은 다른 일정과 형식에서 작동 할 수있는 이질적인 시스템에 대한 연결을 유지하면서 관련 데이터를 식별하고 가져옵니다.

다음 변환 단계는 추출 된 데이터가 청소, 검증 및 구조 조정을 겪는 핵심 처리 단계를 나타냅니다. 이 단계는 데이터 품질 문제를 해결하고, 비즈니스 규칙을 적용하며, 계산을 수행하며, 데이터를 필요한 형식 및 구조로 변환합니다. 일반적인 변환에는 데이터 유형 변환, 필드 매핑, 집계 및 복제 또는 유효하지 않은 레코드 제거가 포함됩니다.

마지막으로, 위상은 현재 변환 된 데이터를 대상 시스템으로 전송합니다. 이 단계는 전체 부하를 통해 발생할 수 있습니다. 전체 데이터 세트가 교체되거나 새로 변경된 데이터 또는 변경된 데이터 만 추가되는 증분로드를 통해 발생할 수 있습니다. 로딩 전략은 데이터 볼륨, 시스템 성능 요구 사항 및 비즈니스 요구와 같은 요소에 따라 다릅니다.

1 단계 : 추출

“추출”단계는 데이터를 손에 넣는 곳입니다. 실제 세계에서는 전자 상거래 플랫폼의보고 대시 보드 에서이 CSV를 다운로드하거나 FTP 서버에서 가져 오거나 API를 통해 얻을 수 있습니다. 여기에서 사용 가능한 CSV 파일에서 읽고 있습니다.

def extract_data_from_csv(csv_file_path):
    try:
        print(f"Extracting data from {csv_file_path}...")
        df = pd.read_csv(csv_file_path)
        print(f"Successfully extracted {len(df)} records")
        return df
    except FileNotFoundError:
        print(f"Error: {csv_file_path} not found. Creating sample data...")
        csv_file = create_sample_csv_data()
        return pd.read_csv(csv_file)

이제 소스 (raw_transactions.csv)의 원시 데이터를 보유하고 있으므로이를 사용 가능한 것으로 변환해야합니다.

2 단계 : 변환

이곳은 데이터를 실제로 유용하게 만드는 곳입니다.

def transform_data(df):
    print("Transforming data...")
    
    df_clean = df.copy()
    
    # Remove records with missing emails
    initial_count = len(df_clean)
    df_clean = df_clean.dropna(subset=['customer_email'])
    removed_count = initial_count - len(df_clean)
    print(f"Removed {removed_count} records with missing emails")
    
    # Calculate derived fields
    df_clean['total_amount'] = df_clean['price'] * df_clean['quantity']
    
    # Extract date components
    df_clean['transaction_date'] = pd.to_datetime(df_clean['transaction_date'])
    df_clean['year'] = df_clean['transaction_date'].dt.year
    df_clean['month'] = df_clean['transaction_date'].dt.month
    df_clean['day_of_week'] = df_clean['transaction_date'].dt.day_name()
    
    # Create customer segments
    df_clean['customer_segment'] = pd.cut(df_clean['total_amount'], 
                                        bins=[0, 50, 200, float('inf')], 
                                        labels=['Low', 'Medium', 'High'])
    
    return df_clean

첫째, 불완전한 고객 데이터는 대부분의 분석에 도움이되지 않기 때문에 이메일이 누락 된 행으로 줄을 삭제하고 있습니다.

그런 다음 계산합니다 total_amount 가격과 수량을 곱하여. 이것은 분명해 보이지만 이와 같은 파생 필드가 원시 데이터에서 얼마나 자주 누락되었는지 놀랄 것입니다.

날짜 추출은 정말 편리합니다. 타임 스탬프를 갖는 대신 이제 우리는 별도의 해, 월 및 주일 열이 있습니다. 이로 인해 “주말에 더 많이 팔리나요?”와 같은 패턴을 쉽게 분석 할 수 있습니다.

고객 세분화 사용 pd.cut() 특히 유용 할 수 있습니다. 자동으로 고객을 지출 카테고리로 만들어냅니다. 이제 거래 금액 만있는 대신 의미있는 비즈니스 부문이 있습니다.

3 단계 :로드

실제 프로젝트에서는 데이터베이스에로드하거나 API로 보내거나 클라우드 스토리지로 밀어 넣을 수 있습니다.

여기에서는 깨끗한 데이터를 적절한 SQLITE 데이터베이스에로드하고 있습니다.

def load_data_to_sqlite(df, db_name="ecommerce_data.db", table_name="transactions"):
    print(f"Loading data to SQLite database '{db_name}'...")
    
    conn = sqlite3.connect(db_name)
    
    try:
        df.to_sql(table_name, conn, if_exists="replace", index=False)
        
        cursor = conn.cursor()
        cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
        record_count = cursor.fetchone()[0]
        
        print(f"Successfully loaded {record_count} records to '{table_name}' table")
        
        return f"Data successfully loaded to {db_name}"
        
    finally:
        conn.close()

이제 분석가는 SQL 쿼리를 실행하고 BI 도구를 연결하며 실제로 의사 결정 에이 데이터를 사용할 수 있습니다.

SQLITE는 가벼우 며 설정이 필요하지 않으며 쉽게 공유하거나 백업 할 수있는 단일 파일을 생성하기 때문에 이에 대해 잘 작동합니다. 그만큼 if_exists="replace" 매개 변수는 중복 데이터에 대해 걱정하지 않고이 파이프 라인을 여러 번 실행할 수 있음을 의미합니다.

부하가 성공했음을 알 수 있도록 검증 단계를 추가했습니다. 나중에 빈 테이블을 찾기 위해 데이터가 안전하게 저장된다고 생각하는 것보다 더 나쁜 것은 없습니다.

ETL 파이프 라인을 실행합니다

이것은 전체 추출, 변환,로드 워크 플로를 오케스트레이션합니다.

def run_etl_pipeline():
    print("Starting ETL Pipeline...")
    
    # Extract
    raw_data = extract_data_from_csv('raw_transactions.csv')
    
    # Transform  
    transformed_data = transform_data(raw_data)
    
    # Load
    load_result = load_data_to_sqlite(transformed_data)
    
    print("ETL Pipeline completed successfully!")
    
    return transformed_data

이것이 어떻게 모든 것을 함께 묶는 지 주목하십시오. 추출, 변환,로드, 완료. 이것을 실행하고 즉시 처리 된 데이터를 볼 수 있습니다.

GitHub에서 전체 코드를 찾을 수 있습니다.

마무리

이 파이프 라인은 원시 거래 데이터를 가져 와서 분석가 나 데이터 과학자가 실제로 작업 할 수있는 것으로 전환합니다. 깨끗한 레코드, 계산 된 필드 및 의미있는 세그먼트가 있습니다.

각 함수는 한 가지 일을 잘 수행하며 나머지를 깨지 않고 쉽게 수정하거나 확장 할 수 있습니다.

이제 직접 실행해보십시오. 또한 다른 사용 사례를 위해 수정하십시오. 행복한 코딩!

발라 프리 야 c 인도의 개발자이자 기술 작가입니다. 그녀는 수학, 프로그래밍, 데이터 과학 및 컨텐츠 제작의 교차점에서 일하는 것을 좋아합니다. 그녀의 관심 분야와 전문 지식에는 DevOps, 데이터 과학 및 자연어 처리가 포함됩니다. 그녀는 독서, 쓰기, 코딩 및 커피를 즐깁니다! 현재 그녀는 자습서, 방법 안내, 의견 조각 등을 통해 개발자 커뮤니티와 지식을 배우고 공유하는 작업을하고 있습니다. Bala는 또한 매력적인 리소스 개요 및 코딩 자습서를 만듭니다.

출처 참조

Post Comment

당신은 놓쳤을 수도 있습니다