Spark

[Glue Job] File already exists

sol-hee 2023. 7. 2. 15:52

💫 개요

  • Glue Job 사용, 데이터 Transform 완료 후 Redshift에 Load
  • Connection을 사용해서 특정 VPC, subnet 안에서 실행되도록 설정
  • Schedule 관리는 MWAA(Managed Airflow)의 GlueJobOperator을 사용

👀 이슈

An error occurred while calling o436.pyWriteDynamicFrame. File already exists:s3://<bucket>/folder1/sample.csv

실행하다가 이런 에러가 반복적으로 발생했다. 물론, retry도 설정하지 않았다.

 

▶ 세부 설정

  • fail 시 retry 지정하지 않음.
  • glueContext.write_dynamic_frame.from_options 메서드를 사용하여 redshift에 적재
    • redshift_tmp_dir 값으로 지정한 경로에 저장할 데이터를 csv 형태로 적재한 다음, redshift copy command를 사용하여 적재하는 메서드이다.
    • 따라서, 이 메서드로 인해 지정한 redshift_tmp_dir에 csv 파일이 떨어진다.
  • 코드 내에 temp 디렉토리에 write하는 코드 존재하지 않음.

Trouble Shooting

처음엔 내가 작성한 코드 상에 redshift_tmp_dir path에 저장하는 로직이 있는지 확인하였다. -> 없다ㅎ..

retry도 설정하지 않았다.

심각하게 무언가 잘못된 거 같다. 커신이 왔다 갔나..

우선 "File already exists in glue" 를 검색하였다.

첨부 링크에서 이 에러가 실제 원인이 아니며, 로그를 자세히 보면 hidden reason이 있는 것을 알게되었다. (stackoverflow 링크)

 

CloudWatch 로그 확인

마스킹한 것처럼, ParquetDecodingException 으로 에러가 발생하였다. 또한, 아래 사진과 같이 그 아래에는 ClassCastException도 있었다.

ParquetDecodingException

Parquet 파일 구조(Deep Dive)

위 링크 글을 아직 자세히 이해하지 못했다. 하지만, 이해한 부분만 간단히 정리하면,

  1.  Spark 2.0 부터 Vectorized Parquet file reader 기능이 추가되었다.
    • 한 번에 하나의 row를 읽어 디코딩하는 대신, Vectorized reader는 배치로 columnar 포맷의 여러 rows를 가져와 컬럼별로 일괄 처리한다. ( 아래는 이해한 부분을 간략히 사진으로 나타내어보았다.ㅎ 틀렸다면 태클 부탁드립니다!!!!!!!!!!1111111 )

columnar 포맷에러 여러 row를 배치로 읽음.
그리고 파란색처럼 배치 내 데이터를 컬럼별로 일괄 처리

2. 데이터브릭스에서 측정했을 때 Vecorized reader 방식이 non-vectorized 보다 9배정도 빨랐다.

 

나는 write_dynamic_frame.from_options 부분에서 에러가 발생하였기 때문에, 당연히 write할 때 발생하는 이슈인줄 알았다.

하지만, 위의 Parquet File 구조 문서에서 얻은 짧은 지식으로.. File을 read할 때 Decoding 에러가 발생하는 것을 알 수 있었다. 아래 사진은 조금 전 분석했던 CW로그이다. 빨간 박스 부분을 보면,  InternalParquetRecordReader 즉, 파일을 읽을 때 에러가 발생한 것을 알 수 있다. 

그렇다면, 왜 마지막 부분인 write 때 에러가 발생했는가?

내 코드 중간에는 Spark(rdd, df) 실행 계획에서 action을 일으키는 로직이 없었다. 따라서, 마지막 부분인 write를 실행할 때 action이 발생하면서 코드가 실행.

그리고, ClassCastException 로그 부분에서 long type에 무언가 문제가 있음을 알게되었다.

이마 탁-

5252... 원인을 알아버렸잔하 ?! 하지만 몰랐다. 이게 삽질의 시작이라는 것을 .. ㅎ

 

Try 1: Long Type의 Null을 모두 0으로 치환

  • (idea) Long Type으로 스키마를 지정했는데, 하필이면 불러온 데이터가 모두 null이어서 에러가 발생하는 것이 아닐까라는 추측
  • (action) spark df when과 withColumn을 사용하여 Long type 컬럼의 null을 0으로 치환시킨 new column을 생성 
  • (결과): 어림도 없었다.

Try 2: spark.sql.parquet.writeLegacyFormat 설정

  • (idea) Hive와 Parquet 에서 데이터를 다루는 형식의 차이로 인해 발생한 것이다. ( 참고한 링크 )
  • (action) --conf "spark.sql.parquet.writeLegacyFormat=true" 로 변경
  • (결과) 어림도 없다.

Try 3: 지정한 Parquet schema 확인

  • (idea) 이쯤되니까, 휴먼에러 냄새가 살짝 나기 시작했다.. 혹시 내가 bulk해서 저장한 스키마와 read할 때 저장한 스키마가 다른 것이 아닌가?
  • (action) 저장한 스키마와 불러올 때 지정한 스키마를 비교한다.
  • (결과) 어림도 없다. 동일하다.

Try 4: spark.sql.hive.convertMetastoreParquet 설정

  • (idea) Hive 메타스토어 Parquet 테이블 변환 문서를 읽고... 하이브 메타스토어를 설정한 거에서 읽어오는 문제 때문인가? config 값을 바꿔봐야하나? 하고 설정 ( 참고한 링크 )
  • (action) --conf "spark.sql.hive.convertMetastoreParquet=false" 설정
  • (결과): 어림도 없다.

하지만, 지금 울고 있죠?


Try 5: 드디어.. !!

이때 갑자기 뇌리를 스치는 생각이 있었다. 작업일 전일에, parquet 파일 저장할 때 스키마의 데이터타입을 하나 변경한 것. (원본 데이터에서 sameField(LongType), Col#1 > sameField(StringType) 와 같이 같은 name이고, 같은 값을 저장하는 필드지만 depth에 따라 데이터타입이 달라 통일해주는 작업을 전일에 진행했었다. ....^_ㅜ

 

이제 모두 이해가 되었다.

read하려는 target s3 path 의 parquet 파일에 데이터 타입이 아래 사진처럼 여러개였던 것이다.

작업 전, depth가 다를 경우 sameField의 데이터 타입도 다름
작업 후, depth에 관계 없이 LongType을 가지도록 변경(두 필드 모두 같은 값을 나타냄)

하지만, 나는 불러올 때 Col#1 > sameField를 LongType으로 지정해서 Read 했고, Vectorized Parquet file reader 방식으로 배치로 파일을 read하다가 타입 불일치로 ClassCastException를 발생했던 것이었다... 하하.

실제로,  타입을 통일시켜 주고 코드를 실행하니 매우 잘 되었다.

 


반성 및 회고

이날, 정말 2~3시간 동안 트러블 슈팅해서 정신이 혼미했었다. 그래도, 이 날을 기점 삼아 얻게된 지식이나 교훈을 정리하자면,

  1. Spark DF 데이터 타입은 매우 민감하다. 수정할 때 주의하자.
  2. Paruqet File의 read 방식 (Vectorized)
  3. 단편적으로 보여주는 메인 에러 로그만 믿지말자... 히든 에러가 있을 수 있다. 
  4. spark 세부 config 설정: 이 때까지 딥하게 건드려 본적이 없었지만, 오늘을 기점으로 여러 config 값을 수정하고, 웹 UI에서 적용되었는지 확인해보는 경험이 생기게 되었다.

 

'Spark' 카테고리의 다른 글

[Spark] 실행 환경 (클러스터, 로컬) 설정  (0) 2022.02.25