Kafka to CSV

Kafka에 저장한 JSON 포맷의 레코드를 모두 추출해 CSV 파일로 변환하는 일련의 과정을 기술한다.

{
  "data": {
    "row": [
      "좋아",
      "아주 좋아",
      "몰라"
    ],
    "title": "survey 171225"
  },
  "event_name": "my-event",
  "timestamp": "2017-12-26T13:31:52.03",
  "labels": null,
  "version": "1.0"
}

더불어 한국어 윈도우용 엑셀을 사용하는 사람이 CSV 파일을 여는데 아무 문제가 없도록 파일 인코딩을 euc-kr로 맞추는 작업을 한다. Kafka 브로커에서 수집한 데이터를 시간별로 정렬하는 작업도 잊지 않는다.

kubectl run myworkspace -i -t --rm --image=golang -- bash로 kafka 클러스터에 접속가능한 도커 컨테이너를 생성하고 터미널 접속을 한다.

apt-get update && apt-get install -y kafkacat python3-pip && pip3 install csvkit

# Kafka에서 데이터를 내려 받는다.
MSGS=t.txt
echo '[' > "${MSGS}"
kafkacat -C  -b kafka-0.broker:9092,kafka-1.broker:9092,kafka-2.broker:9092,kafka-3.broker:9092,kafka-4.broker:9092 -o beginning -t exitpolls -e -D ',\n' -q >> "${MSGS}"
echo '{} ]' >> "${MSGS}"


TMPFILE=$(mktemp)
OUTPUT="${MSGS}.csv"

# JSON 포맷의 데이터를 .csv로 변환한다.
PYTHONIOENCODING=utf8 in2csv --format=json "${MSGS}" > "${TMPFILE}"

# 최근 데이터를 위로 올려서 정렬한다.
head -n -1 "${TMPFILE}" |  PYTHONIOENCODING=utf8 csvsort -c timestamp --reverse > "${OUTPUT}"

head -n 3 "${OUTPUT}"

# 파일 인코딩을 euc-kr로 바꾼다.
OUTPUT_KR="${MSGS}.euc-kr.csv"
iconv -f utf-8 -t euc-kr//TRANSLIT "${OUTPUT}" > "${OUTPUT_KR}"

head -n 3 "${OUTPUT_KR}"

이렇게 만든 결과 파일을 로컬 머신으로 내려 받으면 일이 끝난다.

kubectl cp myworkspace-3798134014-kgnpl:t.txt.euc-kr.csv t.txt.euc-kr.csv

참고자료

최 재훈

블로그, 페이스북, 트위터 고성능 서버 엔진, 데이터베이스, 지속적인 통합 등 다양한 주제에 관심이 많다.
Close Menu
%d bloggers like this: