💫 개요
- Glue Job 사용, 데이터 Transform 완료 후 Redshift에 Load
- Connection을 사용해서 특정 VPC, subnet 안에서 실행되도록 설정
- Schedule 관리는 MWAA(Managed Airflow)의 GlueJobOperator을 사용
👀 이슈
An error occurred while calling o436.pyWriteDynamicFrame. File already exists:s3://<bucket>/folder1/sample.csv
실행하다가 이런 에러가 반복적으로 발생했다. 물론, retry도 설정하지 않았다.
▶ 세부 설정
- fail 시 retry 지정하지 않음.
- glueContext.write_dynamic_frame.from_options 메서드를 사용하여 redshift에 적재
- redshift_tmp_dir 값으로 지정한 경로에 저장할 데이터를 csv 형태로 적재한 다음, redshift copy command를 사용하여 적재하는 메서드이다.
- 따라서, 이 메서드로 인해 지정한 redshift_tmp_dir에 csv 파일이 떨어진다.
- 코드 내에 temp 디렉토리에 write하는 코드 존재하지 않음.
Trouble Shooting
처음엔 내가 작성한 코드 상에 redshift_tmp_dir path에 저장하는 로직이 있는지 확인하였다. -> 없다ㅎ..
retry도 설정하지 않았다.
우선 "File already exists in glue" 를 검색하였다.
첨부 링크에서 이 에러가 실제 원인이 아니며, 로그를 자세히 보면 hidden reason이 있는 것을 알게되었다. (stackoverflow 링크)
CloudWatch 로그 확인
마스킹한 것처럼, ParquetDecodingException 으로 에러가 발생하였다. 또한, 아래 사진과 같이 그 아래에는 ClassCastException도 있었다.
ParquetDecodingException
위 링크 글을 아직 자세히 이해하지 못했다. 하지만, 이해한 부분만 간단히 정리하면,
- Spark 2.0 부터 Vectorized Parquet file reader 기능이 추가되었다.
- 한 번에 하나의 row를 읽어 디코딩하는 대신, Vectorized reader는 배치로 columnar 포맷의 여러 rows를 가져와 컬럼별로 일괄 처리한다. ( 아래는 이해한 부분을 간략히 사진으로 나타내어보았다.ㅎ 틀렸다면 태클 부탁드립니다!!!!!!!!!!1111111 )
2. 데이터브릭스에서 측정했을 때 Vecorized reader 방식이 non-vectorized 보다 9배정도 빨랐다.
나는 write_dynamic_frame.from_options 부분에서 에러가 발생하였기 때문에, 당연히 write할 때 발생하는 이슈인줄 알았다.
하지만, 위의 Parquet File 구조 문서에서 얻은 짧은 지식으로.. File을 read할 때 Decoding 에러가 발생하는 것을 알 수 있었다. 아래 사진은 조금 전 분석했던 CW로그이다. 빨간 박스 부분을 보면, InternalParquetRecordReader 즉, 파일을 읽을 때 에러가 발생한 것을 알 수 있다.
그렇다면, 왜 마지막 부분인 write 때 에러가 발생했는가?
내 코드 중간에는 Spark(rdd, df) 실행 계획에서 action을 일으키는 로직이 없었다. 따라서, 마지막 부분인 write를 실행할 때 action이 발생하면서 코드가 실행.
그리고, ClassCastException 로그 부분에서 long type에 무언가 문제가 있음을 알게되었다.
5252... 원인을 알아버렸잔하 ?! 하지만 몰랐다. 이게 삽질의 시작이라는 것을 .. ㅎ
Try 1: Long Type의 Null을 모두 0으로 치환
- (idea) Long Type으로 스키마를 지정했는데, 하필이면 불러온 데이터가 모두 null이어서 에러가 발생하는 것이 아닐까라는 추측
- (action) spark df when과 withColumn을 사용하여 Long type 컬럼의 null을 0으로 치환시킨 new column을 생성
- (결과): 어림도 없었다.
Try 2: spark.sql.parquet.writeLegacyFormat 설정
- (idea) Hive와 Parquet 에서 데이터를 다루는 형식의 차이로 인해 발생한 것이다. ( 참고한 링크 )
- (action) --conf "spark.sql.parquet.writeLegacyFormat=true" 로 변경
- (결과) 어림도 없다.
Try 3: 지정한 Parquet schema 확인
- (idea) 이쯤되니까, 휴먼에러 냄새가 살짝 나기 시작했다.. 혹시 내가 bulk해서 저장한 스키마와 read할 때 저장한 스키마가 다른 것이 아닌가?
- (action) 저장한 스키마와 불러올 때 지정한 스키마를 비교한다.
- (결과) 어림도 없다. 동일하다.
Try 4: spark.sql.hive.convertMetastoreParquet 설정
- (idea) Hive 메타스토어 Parquet 테이블 변환 문서를 읽고... 하이브 메타스토어를 설정한 거에서 읽어오는 문제 때문인가? config 값을 바꿔봐야하나? 하고 설정 ( 참고한 링크 )
- (action) --conf "spark.sql.hive.convertMetastoreParquet=false" 설정
- (결과): 어림도 없다.
Try 5: 드디어.. !!
이때 갑자기 뇌리를 스치는 생각이 있었다. 작업일 전일에, parquet 파일 저장할 때 스키마의 데이터타입을 하나 변경한 것. (원본 데이터에서 sameField(LongType), Col#1 > sameField(StringType) 와 같이 같은 name이고, 같은 값을 저장하는 필드지만 depth에 따라 데이터타입이 달라 통일해주는 작업을 전일에 진행했었다. ....^_ㅜ
이제 모두 이해가 되었다.
read하려는 target s3 path 의 parquet 파일에 데이터 타입이 아래 사진처럼 여러개였던 것이다.
하지만, 나는 불러올 때 Col#1 > sameField를 LongType으로 지정해서 Read 했고, Vectorized Parquet file reader 방식으로 배치로 파일을 read하다가 타입 불일치로 ClassCastException를 발생했던 것이었다... 하하.
실제로, 타입을 통일시켜 주고 코드를 실행하니 매우 잘 되었다.
반성 및 회고
이날, 정말 2~3시간 동안 트러블 슈팅해서 정신이 혼미했었다. 그래도, 이 날을 기점 삼아 얻게된 지식이나 교훈을 정리하자면,
- Spark DF 데이터 타입은 매우 민감하다. 수정할 때 주의하자.
- Paruqet File의 read 방식 (Vectorized)
- 단편적으로 보여주는 메인 에러 로그만 믿지말자... 히든 에러가 있을 수 있다.
- spark 세부 config 설정: 이 때까지 딥하게 건드려 본적이 없었지만, 오늘을 기점으로 여러 config 값을 수정하고, 웹 UI에서 적용되었는지 확인해보는 경험이 생기게 되었다.

'Spark' 카테고리의 다른 글
[Spark] 실행 환경 (클러스터, 로컬) 설정 (0) | 2022.02.25 |
---|