[MWAA] GlueJobOperator 트러블슈팅기 (with. v2.2.2 -> v2.5.1 마이그레이션)
MWAA 를 2.2.2 에서 2.5.1로 업그레이드하는 중에, 기존의 GlueJobOperator 사용 시 기존 구성된 값을 모두 초기화 시키는 이슈 발생했다.
이슈
MWAA를 2.5.1로 업그레이드하고 나서 stage 환경에서 테스트를 진행하는데.... 이전에 구성해두었던 glue job config가 완전히 초기화되었다. 심지어 spark 로 설정해둔 것들이 python으로 변경되면서 정상적으로 실행되던 코드들이 아예 실행되지 않았다.
의심
- boto3 버전의 문제
- amazon providers 버전 문제 (기존 3.x.x -> 7.x.x)
원인 파악
우선, boto3 버전은 많이 변경되지 않았기 때문에 (해봤자 버그 픽스 정도.. ), 2번 amazon providers 의 문제일 거라 판단했다.
MWAA(v2.5.1)에서 제시하는 amazon providers 7.1.0 버전과 기존 사용 중이던 3.1.1 버전의 GlueJobOperator 코드를 비교했다.
GlueJobOperator 가 GlueJobHook으로 구현된 것을 소스 코드를 통해 파악하고, GlueJobHook 코드를 버전별로 비교했다.(아래 사진)
코드 분석
3.1.1에서는 get_or_create_glue_job을 실행
7.1.0에서는 create_or_update_glue_job을 실행
7.4.0에서는 update_config 값에 따른 조건문으로 create_or_update_glue_job을 실행하거나 get_or_create_glue_job을 실행한다.
그렇다.. 원래 get_or_create_glue_job(v3.1.1) 으로 실행되던 코드가 create_or_update_glue_job(v7.1.0)을 호출하면서 값이 몽땅 초기화 되버린 것이다. ㅜㅜ
그래도, 혹시 모르니 amazon providers 버전을 변경해보자.. !
MWAA를 배포할 때, amazon providers 버전을 바꿔서 적용할 수 있지 않을까라는 생각이 들어, local runner 에서 이미지를 빌드하여 test-requirements
를 진행하였다. (MWAA 서비스에 바로 적용할 경우, 적용되는 데 시간이 소요되어 local runner을 자주 사용한다.)
다른 버전도 열심히 시도해봤지만, (다운그레이드하기엔 추후 메이저 버전과 차이가 많이 나면 힘들 것 같고,) 심지어 애초에 다운그레이드한 버전도 conflict 로 받아지지 않았다.
왜냐면, MWAA Version(AWS 공식문서) 문서의 constraints-2.5.1 에 명시된 것처럼, 우리의 amazon-providers 버전이 7.1.0으로 고정되어 있기 때문이다.
해결책
기존 파이프라인을 깨뜨리지 않고, aws providers 7.1.0 버전을 적용할 방법을 생각했다.
(A) 기존 코드의 GlueJobOperator 가 사용되는 곳마다 glue config 값을 설정하여 적용한다.
(B) 7.1.0 버전의 코드를 상속 받아 수정한다.
A의 경우 기존 코드를 수정해야하는 단점과 glue 가 실행되는 모든 job 의 config 값을 코드에 박아넣어야한다는 단점이 있지만, 라이브러리 버전이 업그레이드 되어도 에러가 발생하지 않을 것으로 예상했다.
B의 경우 기존 라이브러리를 수정하기 때문에 버그의 위험이 있지만, 기존 코드를 수정하지 않아서 유지보수가 쉬울 것으로 판단했다.
MWAA 버전을 업그레이드하지 않는 이상, aws provider의 버전이 변경될 일이 없으므로 기존 코드 수정이 많이 필요하지 않는 B를 택하기로했다.
아래는 수정한 소스 코드이다.
class CustomGlueJobOperator(GlueJobOperator):
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
def execute(self, context: Context):
"""
# GlueJobHook -> CustomGlueJobHook 변경
Executes AWS Glue Job from Airflow
:return: the id of the current glue job.
"""
....
glue_job = CustomGlueJobHook(
job_name=self.job_name,
desc=self.job_desc,
concurrent_run_limit=self.concurrent_run_limit,
script_location=s3_script_location,
retry_limit=self.retry_limit,
num_of_dpus=self.num_of_dpus,
aws_conn_id=self.aws_conn_id,
region_name=self.region_name,
s3_bucket=self.s3_bucket,
iam_role_name=self.iam_role_name,
create_job_kwargs=self.create_job_kwargs,
)
self.log.info(
"Initializing AWS Glue Job: %s. Wait for completion: %s",
self.job_name,
self.wait_for_completion,
)
glue_job_run = glue_job.initialize_job(self.script_args, self.run_job_kwargs)
if self.wait_for_completion:
glue_job_run = glue_job.job_completion(self.job_name, glue_job_run["JobRunId"], self.verbose)
self.log.info(
"AWS Glue Job: %s status: %s. Run Id: %s",
self.job_name,
glue_job_run["JobRunState"],
glue_job_run["JobRunId"],
)
else:
self.log.info("AWS Glue Job: %s. Run Id: %s", self.job_name, glue_job_run["JobRunId"])
return glue_job_run["JobRunId"]
class CustomGlueJobHook(GlueJobHook):
"""
apache-airflow-providers-amazon 7.4.0 상속
* issue: glue script 값을 모두 초기화 시키는 이슈
# TODO: MWAA 2.5.1 이상 버전업 시 operator 제거 필요
"""
def __init__(
self,
update_config: bool = False,
*args,
**kwargs
):
super().__init__(*args, **kwargs)
self.update_config = update_config
def initialize_job(
self,
script_arguments: dict | None = None,
run_kwargs: dict | None = None,
) -> dict[str, str]:
"""
Initializes connection with AWS Glue to run job.
.. seealso::
- :external+boto3:py:meth:`Glue.Client.start_job_run`
"""
script_arguments = script_arguments or {}
run_kwargs = run_kwargs or {}
try:
if self.update_config:
job_name = self.create_or_update_glue_job()
else:
job_name = self.get_or_create_glue_job()
return self.conn.start_job_run(JobName=job_name, Arguments=script_arguments, **run_kwargs)
except Exception as general_error:
self.log.error("Failed to run aws glue job, error: %s", general_error)
raise
def get_or_create_glue_job(self) -> str | None:
"""
Get (or creates) and returns the Job name.
.. seealso::
- :external+boto3:py:meth:`Glue.Client.create_job`
:return:Name of the Job
"""
if self.has_job(self.job_name):
return self.job_name
config = self.create_glue_job_config()
self.log.info("Creating job: %s", self.job_name)
self.conn.create_job(**config)
return self.job_name
이제, glue job task를 실행시켜도 config 값이 날아가지 않는다.
반성
- 라이브러리 메이저 버전업은 당장당장 해주자... ( 그런데 amazon providers 릴리즈 진도가 너무 빨라요 선생님 )