주제
데이터 웨어하우스와 ETL 소개
1. 데이터 웨어하우스와 데이터 레이크
2. ETL와 ELT
3. 데이터 플랫폼의 발전 단계
4. 데이터 파이프라인
데이터 웨어하우스와 데이터 레이크
데이터 레이크
- 구조화 데이터 + 비구조화 데이터 (로그 파일)
- 보존 기한이 없는 모든 데이터를 원래 형태대로 보존하는 스토리지에 가까움
- 데이터 레이크가 있는 환경에서 ETL과 ELT
- 데이터 레이크와 데이터 웨어하우스 밖에서 안으로 데이터를 가져오는 것 : ETL
- 데이터 레이크와 데이터 웨어하우스 안에 있는 데이터를 처리하는 것 : ELT
데이터 웨어하우스
- 기본적으로 클라우드가 대세 옵션
- 데이터가 커져도 문제가 없는 확장가능성(scalable)과 적절한 비용이 중요한 포인트
- 고정비용 옵션과 가변비용 옵션이 존재, 후자가 좀 더 확장가능한 옵션
ETL와 ELT
- ETL(Extract, Transform, Load) :
- Extract : 데이터 소스에서 데이터를 추출하는 과정 (보통 API 호출을 이용)
- Transform : 추출한 데이터를 원하는 포맷으로 변환하는 과정
- Load : 요약된 데이터를 데이터 웨어하우스의 테이블로 삽입하는 과정
- 중요한 데이터를 다루는 ETL이 실패했을 경우 이를 빨리 고쳐서 다시 실행하는 것이 중요
- 이를 적절하게 스케줄하고 관리하는 것이 중요해지며 프레임워크가 중요해짐(Airflow)
- 다수의 ETL이 존재할 경우 이들 간의 의존관계를 정의하는 기능 필요
- 특정 ETL이 실패할 경우 이에 관환 에러 메세지를 받고 재실행해주는 기능도 중요해짐(Backfill)
- ELT :
- 데이터 웨어하우스 내부 데이터를 조작해서 새로운 데이터를 만드는 프로세스 → Analytics Engineering
- 이 프로세스 전용 기술들이 존재 : dbt가 가장 유명
- 데이터 분석가가 수행
데이터 플랫폼의 발전 단계
1. 발전 단계 : 데이터 양 증가
- Spark 같은 빅데이터 처리시스템 도입
- 데이터 레이크 도입 : 보통 로그 데이터와 같은 대용량 비구조화 데이터 대상
- 데이터 소스 → 데이터 파이프라인 → 데이터 웨어하우스
- 데이터 소스 → 데이터 파이프라인 → 데이터 레이크
- 데이터 레이크 → 데이터 파이프라인 → 데이터 웨어하우스
- 이 때 Spark / Hadoop 등이 사용됨
2. 성숙 단계 : 현업단의 데이터 활용 가속화
- ELT 단이 더 중요해지면서 dbt 등의 Analytics engineering 도입
- 데이터 레이크 to 데이터 레이크, 데이터 레이크 to 데이터 웨어하우스, 데이터 웨어하우스 to 데이터 웨어하우스
- MLOps 등 머신러닝 개발 운영 관련 효율성 증대 노력
데이터 파이프라인
- 데이터 소스로부터 목적지로 복사하는 작업
- 보통 코딩 혹은 SQL을 통해 이루어짐
- 대부분의 경우 목적지는 데이터 웨어하우스가 됨
- 데이터 소스의 예 :
- 프로덕션 데이터베이스, 로그 파일, API 등
- 데이터의 예 : Click stream, call data, ads performance data, transactions 등
- 데이터 목적지의 예 :
- 데이터 웨어하우스, 캐시 시스템, S3 등
데이터 파이프라인의 종류
1. Raw Data ETL Jobs → 데이터 엔지니어가 수행
- 외부와 내부 데이터 소스에서 데이터를 읽어다가 (많은 경우 API를 통함)
- 적당한 데이터 포맷 변환 후
- 데이터 웨어하우스 로드
2. ELT(Summary / Report) Jobs → Analytics Engineer(dbt)
- DW(혹은 DL)로부터 데이터를 읽어 다시 DW에 쓰는 ETL
- Raw Data를 읽어서 일종의 리포트 형태나 써머리 형태의 테이블을 다시 만드는 용도
- 특수한 형태로는 AB 테스트 결과를 분석하는 데이터 파이프라인도 존재
- 요약 테이블의 경우 SQL(CTAS를 통해) 만으로 만들고 이는 데이터 분석가가 하는 것이 맞음
3. Production Data Jobs
- DW로부터 데이터를 읽어 다른 Storage로 쓰는 ETL
- 써머리 정보가 프로덕션 환경에서 성능 이유로 필요한 경우
- 혹은 머신러닝 모델에서 필요한 피쳐들을 미리 계산하는 경우
- 이 경우 흔한 타겟 스토리지 :
- Cassandra / HBase / DynamoDB와 같은 NoSQL
- MySQL과 같은 관계형 데이터베이스 (OLTP)
- Redis/Memcache와 같은 캐시
- ElasticSearch와 같은 검색엔진
데이터 파이프라인을 만들 때 고려할 점
이상과 현실 간의 괴리
- 이상 혹은 환상
- 내가 만든 데이터 파이프라인은 문제 없이 동작할 것이다
- 내가 만든 데이터 파이프라인을 관리하는 것은 어렵지 않을 것이다
- 현실 혹은 실상
- 데이터 파이프라인은 많은 이유로 실패함
- 버그
- 데이터 소스상의 이유 : 동작하지 않거나 포맷이 바뀐다면?
- 데이터 파이프라인들 간의 의존도의 이해도 부족
- 데이터 파이프라인의 수가 늘어나면 유지보수 비용이 기하급수적으로 늘어남
- 데이터 소스 간의 의존도가 생기면서 이는 더 복잡해짐. 만일 마케팅 채널 정보가 업데이트가 안된다면 마케팅 관련 다른 모든 정보들이 갱신되지 않음
- 관리해야하는 DW 상의 테이블도 늘어남 → 인프라 비용과 검색 비용도 늘어남
- 데이터 파이프라인은 많은 이유로 실패함
Best Practices
1. 가능하면 데이터가 작을 경우 매번 통채로 복사해서 테이블 만들기 (Full Refresh)
- Incremental update만이 가능하다면, 대상 데이터 소스가 갖춰야할 몇 가지 조건이 있음
- 데이터 소스가 프로덕션 데이터베이스 테이블이라면 다음 필드가 필요 :
- created(데이터 업데이트 관점에서는 필요하지는 않음)
- modified
- deleted
- 데이터 소스가 API라면 특정 날짜를 기준으로 새로 생성되거나 업데이트된 레코드들을 읽어올 수 있어야함
- 데이터 소스가 프로덕션 데이터베이스 테이블이라면 다음 필드가 필요 :
2. 멱등성(Idempotency)을 보장하는 것이 중요
- 멱등성이란?
- 동일한 입력 데이터로 데이터 파이프라인을 다수 실행해도 최종 테이블의 내용이 달라지지 않아야 함
- 예를 들면 중복 데이터가 생기지 말아야 함
- 중요한 포인트는 critical point들이 모두 one atomic action으로 실행이 되어야 한다는 점
- SQL transaction이 꼭 필요한 기술
- 동일한 입력 데이터로 데이터 파이프라인을 다수 실행해도 최종 테이블의 내용이 달라지지 않아야 함
3. 실패한 데이터 파이프라인은 재실행이 쉬워야 함
- 과거 데이터를 다시 채우는 과정(Backfill)이 쉬워야 함
- Airflow는 이 부분에 강점을 갖고 있음
4. 데이터 파이프라인의 입력과 출력을 명확히 하고 문서화
- 비즈니스 오너 명시 : 누가 이 데이터를 요청했는지를 기록으로 남기 것!
- 이게 나중에 데이터 카탈로그로 들어가서 데이터 디스커버리에 사용 가능함
- 데이터 리니지가 중요해짐 → 이걸 이해하지 못하면 온갖 종류의 사고 발생
5. 주기적으로 쓸모없는 데이터들 삭제
6. 데이터 파이프라인 사고 시마다 사고 리포트(post-mortem) 쓰기
- 목적은 동일한 혹은 아주 비숫한 사고가 또 발생하는 것을 막기 위함
- 사고 원인 (root-cause)을 이해하고 이를 방지하기 위한 액션 아이템들의 실행이 중요해짐
- 기술 부채의 정도를 이야기해주는 바로미터
7. 중요 데이터 파이프라인의 입력과 출력을 체크하기
- 아주 간단하게 입력 레코드의 수와 출력 레코드의 수가 몇 개인지 체크하는 것부터 시작
- 써머리 테이블을 만들어내고 Primary key가 존재한다면 Primary key uniqueness가 보장되는지 체크하는 것이 필요함
- 중복 레코드 확인
ETL 실습
- Extract 함수
- url을 이용하여 raw data 가져오기
def extract(url):
f = requests.get(url)
return (f.text)
- Transform 함수
- 데이터를 가져와 원하는 포맷으로 변경
- 이 경우에는 \n을 기준으로 텍스트만을 뽑고, 쉼표를 기준으로 이름과 성별을 각각 name과 gender라는 변수에 정의
def transform(text):
lines = text.strip().split("\n")
records = []
for l in lines[1:]:
(name, gender) = l.split(",") # l = "Ben,M" -> [ 'Ben', 'M' ]
records.append([name, gender])
return records
- Load 함수
- 리스트 안에 리스트 형태로 바뀐 데이터를 Redshift에 연결하여 테이블을 생성하고, 해당 테이블에 삽입
- try exception을 이용하여 만약 실패한다면 Rollback
def load(records):
"""
records = [
[ "Ben", "M" ],
[ "Claire", "F" ],
...
]
"""
cur = get_Redshift_connection()
cur.execute("BEGIN;")
try:
cur.execute("""DROP TABLE IF EXISTS jyunghye.name_gender;
CREATE TABLE jyunghye.name_gender (
name varchar(32) primary key,
gender varchar(8)
);""")
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = "INSERT INTO jyunghye.name_gender VALUES ('{n}', '{g}')".format(n=name, g=gender)
cur.execute(sql)
cur.execute("END") # cur.execute("COMMIT"); conn.commit()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
cur.execute("ROLLBACK")
'Data Science > TIL (Today I Learned)' 카테고리의 다른 글
| 프로그래머스 데이터분석 데브코스 1기 - 43일차 (0) | 2024.01.24 |
|---|---|
| 프로그래머스 데이터분석 데브코스 1기 - 42일차 (0) | 2024.01.23 |
| 프로그래머스 데이터분석 데브코스 1기 - 40일차 (0) | 2024.01.19 |
| 프로그래머스 데이터분석 데브코스 1기 - 39일차 (0) | 2024.01.18 |
| 프로그래머스 데이터분석 데브코스 1기 - 38일차 (0) | 2024.01.17 |