Amazon Kinesis Data Firehose Dynamic Partitoning으로 분석하기 전에 데이터 뼈맞추기

Amazon Kinesis Data Firehose를 통해 로그를 수집하면서 생긴 문제를 Dynamic Partitioning을 통해 해결했던 기록입니다.
2023.12.05

로그를 수집할 일이 생겼는데 아래와 같은 시스템이었다.

Auth0: 인증, 인가 SaaS
Amazon Eventbridge: 메시징 처리
Amazon Kinesis Data Firehose: 메시지를 일정 크기 혹은 시간이 지난 후에 저장해줌
Amazon S3: 오브젝트 스토리지
Amazon Athena: Amazon S3의 데이터들을 따로 추출할 필요없이 바로 검색가능하게 해줌

외부에서 오는 로그를 수집해서 저장하고 Amazon S3에 저장해서 Amazon Athena로 쿼리를 통해 분석해야 하는 상황이었다.

처음에는 여차저차해서 잘 만들었고, 쿼리를 통해 데이터가 검색이 되는 것을 확인했다.

기분좋게 잘 마무리 한줄만 알았다.

tl;dr

  1. Amazon Kinesis Data Firehose를 사용해 저장한 로그가 한줄의 JSON으로 저장되어 문제가 생김

  2. 한줄로 저장되는 JSON을 여러줄을 만들려면 New line delimiter 설정을 활성화 시켜야 함

  3. New line delimiter 설정을 활성화 시키려면 Dynamic Partitoning 설정이 필요함

문제 발생

그런데, 나중에 보니 Auth0에서 발생한 일부 로그들이 Amazon Athena로 검색이 안되는 걸 알게된다.

분명 어느 부분에서도 에러가 발생하지 않았고, 서로의 시스템은 잘 연결되있는 것 처럼 보였고, 실제로 쿼리를 날려보면 아예 검색이 안되는 것도 아니었다.

AWS 부분은 많이 알려진 방법이라 문제가 없다 생각하여, Auth0와의 연결이 잘못되었나 확인해보아도 잘 처리되고 있는걸 확인할 수 있었다...

그러면 Amazon Athena의 테이블 DDL 문제인가 싶어 변경해보아도 똑같고, 검색하는 쿼리를 수정해보아도 똑같았다.

결국 마지막으로 들어온 데이터들을 좀 더 확인하게 된다.

한줄로 나란히 있는 JSON

처음에 시스템을 구성하고 테스트 했을 때는 로그가 검색이 되는 것만을 확인했다. 근데 지금 발생한 원인은 실제 발생한 모든 로그가 일정 부분 제대로 검색이 안되는 문제다.

그렇다보니, 저장한 데이터와 쿼리에서 검색되는 결과값을 비교해보고 싶어졌다.

Amazon Kinesis Data Firehose는 특정한 설정이 없다면 JSON을 저장하기전에 이전에 온 JSON들과 합쳐서 저장하게 되는 경우가 있다. (타 AWS 서비스와 연계한다던지 경우에는 아닌 경우도 있는 것 같다.)

예를 들면 이런거다.

{"a": 1}{"b": 2}{"c": 3}

각각의 값들을 한줄로 저장하게 되어진다. 처음 테스트시에 이런 상황을 예상했어야 했는데, 여러 파일들이 생기는 것을 확인하고 검색이 되는 것을 확인하다 보니 이러한 특성이 있다는 것을 눈치채지 못했다. 그리고 테스트시에 파일을 열어보기도 했었는데, 위처럼 간단한 데이터가 아니다 보니 그냥 JSON이 들어있는지만 확인하고 제대로 신경쓰지 못 했다.

위같은 한줄짜리 JSON을 Amazon Athena로 검색을 하게 되면 아래처럼 나오게 된다.

id count
a      1

우리가 원하는 값은 a, b, c 였지만 한줄로 이루어진 JSON을 하나의 값으로 이해한 것이다.

그래서 여러줄로 변경해서 데이터를 넣고 테스트 해보니 잘 동작했다.

이런식으로 데이터가 존재하면,

{"a": 1}
{"b": 2}
{"c": 3}

실제 원하는 값을 다 얻을 수 있었다.

id count
a      1
b      2
c      3

원인과 해결책은 알았지만, 해결책을 어떻게 해야할까?

Amazon Kinesis Data Firehose -> Amazon S3 이 사이에 어떠한 데이터를 변경시켜주는 시스템이 없다면 어려워보였다.

실제로 AWS Lambda를 통한 방법이 많이 검색되고 Amazon Kinesis Data Firehose에서도 자체적으로 연동을 지원하는 것 같았다.

하지만, 단순히 여러줄을 만들기 위해 데이터를 파싱하고 \n 넣어주고, 나중에 AWS Lambda의 언어 런타임을 변경해준다던지, 앞으로 여러 설정을 변경해야 된다던지, 코드 변경 및 관리 등등... 유지보수 할거리를 더 늘리고 싶지 않았다.

좀 더 간단한 방법이 있을 것 같았다.

Dynamic Partitioning 기능 사용하기

검색을 하다 Amazon Kinesis Data Firehose의 Dynamic Partitioning라는 기능이 자주보여 자세히 들여다보니, Dynamic Partitioning을 활성화 시키면, 하위 옵션 중에 내가 하고 싶은 한줄을 여러줄로 바꾸는 옵션이 있었다!

아주 나이스한 기분으로 콘솔창에 들어가 테스트 해보려 했지만, 사용중인 경우 Dynamic Partitioning 기능을 활성화/비활성화를 변경할 수 없었다...

물론 유저들이 사용하는 운영환경에 올라간 상황 전에 문제를 확인했기 때문에, 삭제하고 다시 만들면 되었다. 하지만, 그리 간단한 설정이 아니었다.

Dynamic Partitioning을 사용하기 위해 기본적인 추가 설정이 필요한데, bucket prefix라는 설정이 필요하게 된다.

이게 무엇인가 하면, Dynamic Partitioning 없이 데이터를 저장하게 되면 2023/12/05/08/helloworld.gz 같은 폴더구조로 파일이 생성되게 된다.

Dynamic Partitioning을 사용하게 되면 위와 같이 기본으로 만들어주는 prefix가 없어지고 직접 설정해야 한다!

다행히도 로그에서 오는 값들 중에 시간 값이 있어서 이 시간값을 기준으로 파싱을 해서 만들면 될 것 같았다.

물론 prefix를 간단하게 만들 수도 있지만, Amazon Athena에서도 검색이 쉽게 되게하고 싶어 기존의 날짜 형식의 폴더 구조를 따라가고 싶었다.

근데 이 설정 어떻게 하는 걸까...

Inline parsing for JSON & Bucket Prefix

앞에서 Bucket Prefix만 언급했지만, Inline parsing for JSON 설정 활성화도 필수다!..

요건 무엇이냐, 이름 그대로 각각의 JSON들을 jq를 통해 파싱하는 것이다. 그러면 jq가 뭐냐하면 JSON을 간단한 문법을 통해 파싱할 수 있는 cli다 아래 링크를 참조해보자.

jq 소개 페이지

물론 cli를 직접 사용하는 건 아니고, jq 엔진이라는 이름으로 jq의 문법을 입력하면 jq처럼 동작하는 형태로 이루어진다.

예를 들어 이런 시간값이 들어온다 해보자.

{"time": "2023-12-05T04:28:27Z"}

위 값을 jq로 파싱해보면 아래처럼 값을 잘 얻는 것을 알 수 있다.

❯ echo '{"time": "2023-12-05T04:28:27Z"}' | jq '.time'
"2023-12-05T04:28:27Z"

조금 더 해보자면 만약 년값의 2023을 얻어보고 싶다면 이렇게 할 수 있다.

❯ echo '{"time": "2023-12-05T04:28:27Z"}' | jq '.time | strptime("%Y-%m-%dT%H:%M:%SZ") | strftime("%Y")'
"2023"

strptime는 string parse time, strftime은 string from time이다. 값에서 시간타입으로 변환한 뒤에 년값만 스트링으로 출력하는 방식이다.

이러한 과정을 통해 시간값에서 bucket prefix로 사용할 값들을 모두 추출하면 된다!

# key: JQ expression
year: .time | strptime("%Y-%m-%dT%H:%M:%SZ") | strftime("%Y")
month: .time | strptime("%Y-%m-%dT%H:%M:%SZ") | strftime("%m")
day: .time | strptime("%Y-%m-%dT%H:%M:%SZ") | strftime("%d")
hour: .time | strptime("%Y-%m-%dT%H:%M:%SZ") | strftime("%H")

위 값들을 Inline parsing for JSON 설정에 키와 JQ expression 칸에 넣어주게 되면 여기서 파싱한 값들을 bucket prefix에서 사용할 수 있게 된다!

그러면, 위 파싱한 값으로 bucket prefix를 만들어보자.

!{partitionKeyFromQuery:year}/!{partitionKeyFromQuery:month}/!{partitionKeyFromQuery:day}/!{partitionKeyFromQuery:hour}/"

작성방식은 위처럼 !{partitionKeyFromQuery:${KEY}}, KEY 값을 위에서 파싱한 키값으로 넣어주게 되면 파싱된 값을 사용할 수 있게 된다.

예를 들어 '{"time": "2023-12-05T04:28:27Z"}' 값이 잘 파싱되었다면 2023/12/05/04/helloworld.gz 형식으로 파일이 잘 저장될 것이다.

마지막으로 여러줄로 변환하기

막상 여러줄로 변환하는 방법은 쉽다...

위에서 언급한 과정들을 다 적용하고 여러줄로 변환하려면, 가장 중요한 New line delimiter 기능을 활성화 시키면 한줄로 있던 JSON들이 여러줄로 저장되게 된다.

로그 검색도 잘 되는 것을 확인할 수 있었다!!!

마무리

Amazon Kinesis Data Firehose, Amazon Athena들과 매우 친해진 것 같다. 나랑은 관련없는 보라색 친구들인줄 알았는데 생각보다 재밌고 힘들었다.

사실 위 과정은 축약버전이고 AWS CloudFormation을 통해 구축해야했기 때문에 문서를 찾는 과정과, 트라이&에러 과정이 매우매우 힘들었다.

AWS CloudFormation 내용은 따로 정리해서 올려야 할 것 같다.

문제를 해결하는 와중에 한줄기 빛 같은 블로그가 있어서 남겨둔다. 없었다면 몇 배는 힘들었을 것 같다... 블로그 작성자 분에게 매우 감사하게 생각한다.

[AWS] Kinesis Data Data Firehose Dynamic Partitioning