주제

데이터 웨어하우스와 ETL 소개
1. 데이터 웨어하우스와 데이터 레이크
2. ETL와 ELT
3. 데이터 플랫폼의 발전 단계
4. 데이터 파이프라인

 

 

데이터 웨어하우스와 데이터 레이크

데이터 레이크

  • 구조화 데이터 + 비구조화 데이터 (로그 파일)
  • 보존 기한이 없는 모든 데이터를 원래 형태대로 보존하는 스토리지에 가까움
  • 데이터 레이크가 있는 환경에서 ETL과 ELT
    • 데이터 레이크와 데이터 웨어하우스 밖에서 안으로 데이터를 가져오는 것 : ETL
    • 데이터 레이크와 데이터 웨어하우스 안에 있는 데이터를 처리하는 것 : ELT

 

데이터 웨어하우스

  • 기본적으로 클라우드가 대세 옵션
  • 데이터가 커져도 문제가 없는 확장가능성(scalable)과 적절한 비용이 중요한 포인트
  • 고정비용 옵션과 가변비용 옵션이 존재, 후자가 좀 더 확장가능한 옵션

 

 

ETL와 ELT

  • ETL(Extract, Transform, Load) :
    • Extract : 데이터 소스에서 데이터를 추출하는 과정 (보통 API 호출을 이용)
    • Transform : 추출한 데이터를 원하는 포맷으로 변환하는 과정
    • Load : 요약된 데이터를 데이터 웨어하우스의 테이블로 삽입하는 과정
  • 중요한 데이터를 다루는 ETL이 실패했을 경우 이를 빨리 고쳐서 다시 실행하는 것이 중요
  • 이를 적절하게 스케줄하고 관리하는 것이 중요해지며 프레임워크가 중요해짐(Airflow)
    • 다수의 ETL이 존재할 경우 이들 간의 의존관계를 정의하는 기능 필요
    • 특정 ETL이 실패할 경우 이에 관환 에러 메세지를 받고 재실행해주는 기능도 중요해짐(Backfill)

 

  • ELT :
    • 데이터 웨어하우스 내부 데이터를 조작해서 새로운 데이터를 만드는 프로세스 → Analytics Engineering
    • 이 프로세스 전용 기술들이 존재 : dbt가 가장 유명
    • 데이터 분석가가 수행

 

 

데이터 플랫폼의 발전 단계

1. 발전 단계 : 데이터 양 증가

  • Spark 같은 빅데이터 처리시스템 도입
  • 데이터 레이크 도입 : 보통 로그 데이터와 같은 대용량 비구조화 데이터 대상
    • 데이터 소스 → 데이터 파이프라인 → 데이터 웨어하우스
    • 데이터 소스 → 데이터 파이프라인 → 데이터 레이크
    • 데이터 레이크 → 데이터 파이프라인 → 데이터 웨어하우스
      • 이 때 Spark / Hadoop 등이 사용됨

2. 성숙 단계 : 현업단의 데이터 활용 가속화

  • ELT 단이 더 중요해지면서 dbt 등의 Analytics engineering 도입
    • 데이터 레이크 to 데이터 레이크, 데이터 레이크 to 데이터 웨어하우스, 데이터 웨어하우스 to 데이터 웨어하우스
  • MLOps 등 머신러닝 개발 운영 관련 효율성 증대 노력

 

 

데이터 파이프라인

  • 데이터 소스로부터 목적지로 복사하는 작업
    • 보통 코딩 혹은 SQL을 통해 이루어짐
    • 대부분의 경우 목적지는 데이터 웨어하우스가 됨
  • 데이터 소스의 예 :
    • 프로덕션 데이터베이스, 로그 파일, API 등
    • 데이터의 예 : Click stream, call data, ads performance data, transactions 등
  • 데이터 목적지의 예 :
    • 데이터 웨어하우스, 캐시 시스템, S3 등

 

데이터 파이프라인의 종류

1. Raw Data ETL Jobs → 데이터 엔지니어가 수행

  1. 외부와 내부 데이터 소스에서 데이터를 읽어다가 (많은 경우 API를 통함)
  2. 적당한 데이터 포맷 변환 후
  3. 데이터 웨어하우스 로드

 

2. ELT(Summary / Report) Jobs → Analytics Engineer(dbt)

  1. DW(혹은 DL)로부터 데이터를 읽어 다시 DW에 쓰는 ETL
  2. Raw Data를 읽어서 일종의 리포트 형태나 써머리 형태의 테이블을 다시 만드는 용도
  3. 특수한 형태로는 AB 테스트 결과를 분석하는 데이터 파이프라인도 존재
  • 요약 테이블의 경우 SQL(CTAS를 통해) 만으로 만들고 이는 데이터 분석가가 하는 것이 맞음

 

3. Production Data Jobs

  1. DW로부터 데이터를 읽어 다른 Storage로 쓰는 ETL
    • 써머리 정보가 프로덕션 환경에서 성능 이유로 필요한 경우
    • 혹은 머신러닝 모델에서 필요한 피쳐들을 미리 계산하는 경우
  2. 이 경우 흔한 타겟 스토리지 :
    • Cassandra / HBase / DynamoDB와 같은 NoSQL
    • MySQL과 같은 관계형 데이터베이스 (OLTP)
    • Redis/Memcache와 같은 캐시
    • ElasticSearch와 같은 검색엔진

 

 

데이터 파이프라인을 만들 때 고려할 점

 

이상과 현실 간의 괴리

  • 이상 혹은 환상
    • 내가 만든 데이터 파이프라인은 문제 없이 동작할 것이다
    • 내가 만든 데이터 파이프라인을 관리하는 것은 어렵지 않을 것이다
  • 현실 혹은 실상
    • 데이터 파이프라인은 많은 이유로 실패함
      • 버그
      • 데이터 소스상의 이유 : 동작하지 않거나 포맷이 바뀐다면?
      • 데이터 파이프라인들 간의 의존도의 이해도 부족
    • 데이터 파이프라인의 수가 늘어나면 유지보수 비용이 기하급수적으로 늘어남
      • 데이터 소스 간의 의존도가 생기면서 이는 더 복잡해짐. 만일 마케팅 채널 정보가 업데이트가 안된다면 마케팅 관련 다른 모든 정보들이 갱신되지 않음
      • 관리해야하는 DW 상의 테이블도 늘어남 → 인프라 비용과 검색 비용도 늘어남

 

 

Best Practices

1. 가능하면 데이터가 작을 경우 매번 통채로 복사해서 테이블 만들기 (Full Refresh)

  • Incremental update만이 가능하다면, 대상 데이터 소스가 갖춰야할 몇 가지 조건이 있음
    • 데이터 소스가 프로덕션 데이터베이스 테이블이라면 다음 필드가 필요 :
      • created(데이터 업데이트 관점에서는 필요하지는 않음)
      • modified
      • deleted
    • 데이터 소스가 API라면 특정 날짜를 기준으로 새로 생성되거나 업데이트된 레코드들을 읽어올 수 있어야함

 

2. 멱등성(Idempotency)을 보장하는 것이 중요

  • 멱등성이란?
    • 동일한 입력 데이터로 데이터 파이프라인을 다수 실행해도 최종 테이블의 내용이 달라지지 않아야 함
      • 예를 들면 중복 데이터가 생기지 말아야 함
    • 중요한 포인트는 critical point들이 모두 one atomic action으로 실행이 되어야 한다는 점
      • SQL transaction이 꼭 필요한 기술

 

3. 실패한 데이터 파이프라인은 재실행이 쉬워야 함

  • 과거 데이터를 다시 채우는 과정(Backfill)이 쉬워야 함
  • Airflow는 이 부분에 강점을 갖고 있음

 

4. 데이터 파이프라인의 입력과 출력을 명확히 하고 문서화

  • 비즈니스 오너 명시 : 누가 이 데이터를 요청했는지를 기록으로 남기 것!
  • 이게 나중에 데이터 카탈로그로 들어가서 데이터 디스커버리에 사용 가능함
    • 데이터 리니지가 중요해짐 → 이걸 이해하지 못하면 온갖 종류의 사고 발생

 

5. 주기적으로 쓸모없는 데이터들 삭제

 

 

6. 데이터 파이프라인 사고 시마다 사고 리포트(post-mortem) 쓰기

  • 목적은 동일한 혹은 아주 비숫한 사고가 또 발생하는 것을 막기 위함
  • 사고 원인 (root-cause)을 이해하고 이를 방지하기 위한 액션 아이템들의 실행이 중요해짐
  • 기술 부채의 정도를 이야기해주는 바로미터

 

7. 중요 데이터 파이프라인의 입력과 출력을 체크하기

  • 아주 간단하게 입력 레코드의 수와 출력 레코드의 수가 몇 개인지 체크하는 것부터 시작
  • 써머리 테이블을 만들어내고 Primary key가 존재한다면 Primary key uniqueness가 보장되는지 체크하는 것이 필요함
  • 중복 레코드 확인

 

 

ETL 실습

  • Extract 함수
    • url을 이용하여 raw data 가져오기
def extract(url):
    f = requests.get(url)
    return (f.text)
  • Transform 함수
    • 데이터를 가져와 원하는 포맷으로 변경
    • 이 경우에는 \n을 기준으로 텍스트만을 뽑고, 쉼표를 기준으로 이름과 성별을 각각 name과 gender라는 변수에 정의
def transform(text):
    lines = text.strip().split("\n")
    records = []
    for l in lines[1:]:
      (name, gender) = l.split(",") # l = "Ben,M" -> [ 'Ben', 'M' ]
      records.append([name, gender])
    return records
  • Load 함수
    • 리스트 안에 리스트 형태로 바뀐 데이터를 Redshift에 연결하여 테이블을 생성하고, 해당 테이블에 삽입
    • try exception을 이용하여 만약 실패한다면 Rollback
def load(records):
    """
    records = [
      [ "Ben", "M" ],
      [ "Claire", "F" ],
      ...
    ]
    """
    cur = get_Redshift_connection()
    cur.execute("BEGIN;")
    try:
        cur.execute("""DROP TABLE IF EXISTS jyunghye.name_gender;
CREATE TABLE jyunghye.name_gender (
   name varchar(32) primary key,
   gender varchar(8)
);""")
        for r in records:
            name = r[0]
            gender = r[1]
            print(name, "-", gender)
            sql = "INSERT INTO jyunghye.name_gender VALUES ('{n}', '{g}')".format(n=name, g=gender)
            cur.execute(sql)
        cur.execute("END")   # cur.execute("COMMIT"); conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        cur.execute("ROLLBACK")

 

+ Recent posts