Kafka to CSV

  • Post author:
  • Post category:칼럼
  • Post comments:1 Comment
  • Post last modified:December 3, 2023

Kafka에 저장한 JSON 포맷의 레코드를 모두 추출해 CSV 파일로 변환하는 일련의 과정을 기술한다.

{
  "data": {
    "row": [
      "좋아",
      "아주 좋아",
      "몰라"
    ],
    "title": "survey 171225"
  },
  "event_name": "my-event",
  "timestamp": "2017-12-26T13:31:52.03",
  "labels": null,
  "version": "1.0"
}

더불어 한국어 윈도우용 엑셀을 사용하는 사람이 CSV 파일을 여는데 아무 문제가 없도록 파일 인코딩을 euc-kr로 맞추는 작업을 한다. Kafka 브로커에서 수집한 데이터를 시간별로 정렬하는 작업도 잊지 않는다.

kubectl run myworkspace -i -t --rm --image=golang -- bash로 kafka 클러스터에 접속가능한 도커 컨테이너를 생성하고 터미널 접속을 한다.

apt-get update && apt-get install -y kafkacat python3-pip && pip3 install csvkit

# Kafka에서 데이터를 내려 받는다.
MSGS=t.txt
echo '[' > "${MSGS}"
kafkacat -C  -b kafka-0.broker:9092,kafka-1.broker:9092,kafka-2.broker:9092,kafka-3.broker:9092,kafka-4.broker:9092 -o beginning -t exitpolls -e -D ',\n' -q >> "${MSGS}"
echo '{} ]' >> "${MSGS}"


TMPFILE=$(mktemp)
OUTPUT="${MSGS}.csv"

# JSON 포맷의 데이터를 .csv로 변환한다.
PYTHONIOENCODING=utf8 in2csv --format=json "${MSGS}" > "${TMPFILE}"

# 최근 데이터를 위로 올려서 정렬한다.
head -n -1 "${TMPFILE}" |  PYTHONIOENCODING=utf8 csvsort -c timestamp --reverse > "${OUTPUT}"

head -n 3 "${OUTPUT}"

# 파일 인코딩을 euc-kr로 바꾼다.
OUTPUT_KR="${MSGS}.euc-kr.csv"
iconv -f utf-8 -t euc-kr//TRANSLIT "${OUTPUT}" > "${OUTPUT_KR}"

head -n 3 "${OUTPUT_KR}"

이렇게 만든 결과 파일을 로컬 머신으로 내려 받으면 일이 끝난다.

kubectl cp myworkspace-3798134014-kgnpl:t.txt.euc-kr.csv t.txt.euc-kr.csv

참고자료

Author Details
Kubernetes, DevSecOps, AWS, 클라우드 보안, 클라우드 비용관리, SaaS 의 활용과 내재화 등 소프트웨어 개발 전반에 도움이 필요하다면 도움을 요청하세요. 지인이라면 가볍게 도와드리겠습니다. 전문적인 도움이 필요하다면 저의 현업에 방해가 되지 않는 선에서 협의가능합니다.
0 0 votes
Article Rating
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

1 Comment
Oldest
Newest Most Voted
Inline Feedbacks
View all comments