杭州醫(yī)療器械網(wǎng)站制作app開(kāi)發(fā)價(jià)格表
01
背景
Apache DolphinScheduler 是一個(gè)分布式的可視化 DAG 工作流任務(wù)調(diào)度開(kāi)源系統(tǒng),具有簡(jiǎn)單易用、高可靠、高擴(kuò)展性、?持豐富的使用場(chǎng)景、提供多租戶(hù)模式等特性。適用于企業(yè)級(jí)場(chǎng)景,提供了一個(gè)可視化操作任務(wù)、工作流和全生命周期數(shù)據(jù)處理過(guò)程的解決方案。
隨著企業(yè)規(guī)模的擴(kuò)大,業(yè)務(wù)數(shù)據(jù)的激增,以及 Apache DolphinScheduler 產(chǎn)品的完善、社區(qū)的日益火爆,越來(lái)越多的 EMR 客戶(hù),使用其進(jìn)行集群任務(wù)的日常調(diào)度。相關(guān)安裝、集成實(shí)踐,本文不做詳述,可以參考博客《使用 DolphinScheduler 進(jìn)行 EMR 任務(wù)調(diào)度》。
使用 DolphinScheduler 進(jìn)行 EMR 任務(wù)調(diào)度
https://aws.amazon.com/cn/blogs/china/emr-task-scheduling-with-dolphinscheduler/
Amazon EMR Serverless 是 EMR?中的無(wú)服務(wù)器選項(xiàng),數(shù)據(jù)分析師和工程師可借助其輕松運(yùn)行開(kāi)源大數(shù)據(jù)分析框架(例如 Apache Spark 和 Apache Hive ),而無(wú)需配置、管理和擴(kuò)展集群或服務(wù)器,使得數(shù)據(jù)工程師和分析師能夠進(jìn)一步聚焦業(yè)務(wù)價(jià)值的創(chuàng)造,最終實(shí)現(xiàn)降本增效。因此,越來(lái)越多的客戶(hù),開(kāi)始嘗試從 EMR on EC2 切換到 EMR Serverless,或者說(shuō)從 DolphinScheduler + EMR 切換到 DolphinScheduler + EMR Serverless。
但在實(shí)踐過(guò)程中,如下問(wèn)題往往成為了攔路虎:
異步執(zhí)行:在使用 EMR on Amazon EC2 + DolphinScheduler 時(shí),很多客戶(hù)選擇 beeline、PyHive 或者 Spark-Submit 的方式,讓任務(wù)提交后同步執(zhí)行,以便調(diào)度引擎的正常工作與進(jìn)度的監(jiān)控。但 EMR?Serverless 僅支持任務(wù)提交后的異步執(zhí)行,這對(duì)于使用 DolphinScheduler 的客戶(hù)來(lái)講是很難接受的。
日志獲取:切換到 EMR Serverless 后,獲取任務(wù)日志的方式也發(fā)生了變化。由于任務(wù)的異步執(zhí)行,導(dǎo)致在 DolphinScheduler 提交任務(wù)后,往往需要到 EMR Serverless 的 Job 列表頁(yè)面查看日志,影響工作效率。
混合調(diào)度:很多客戶(hù)經(jīng)過(guò)實(shí)際評(píng)估后,往往需要將一部分任務(wù)放到 EMR on EC2 上運(yùn)行,將另一部分任務(wù)放到 EMR Serverless,以達(dá)到最佳的性?xún)r(jià)比。但兩類(lèi)群集的任務(wù)執(zhí)行與監(jiān)控方式區(qū)別較大,將兩種任務(wù)放到調(diào)度系統(tǒng)中混合調(diào)度的維護(hù)成本比較高。
任務(wù)形態(tài):客戶(hù)實(shí)際案例中,有的任務(wù)是執(zhí)行一段 SQL 語(yǔ)句,有的任務(wù)是執(zhí)行一個(gè) Spark 腳本文件。但在 EMR Serverless 中默認(rèn)僅支持提交腳本文件,無(wú)形中又給客戶(hù)多設(shè)置了一道使用障礙。
02
解決方案
整體介紹與示例
本文將以 Python 語(yǔ)言提交 Spark 任務(wù)為例,探索針對(duì)上述問(wèn)題的解決方案。如下圖所示,通過(guò)封裝一個(gè) Python 類(lèi)庫(kù),將 EMR On EC2 與 EMR Serverless 兩種形態(tài)下的 Spark 任務(wù)提交、執(zhí)行與監(jiān)控細(xì)節(jié)進(jìn)行抽象,面向 DolphinScheduler 提供統(tǒng)一的接口來(lái)進(jìn)行調(diào)用,簡(jiǎn)化用戶(hù)使用 EMR Serverless 的門(mén)檻。
我們先通過(guò)代碼演示如何使用封裝的 Python 類(lèi)庫(kù)提交 Spark 任務(wù),代碼示例如下。其中 emr_common.Session 是抽象出來(lái)的 Python 類(lèi)。
from emr_common import Session
#jobtype=0時(shí),表示 EMR On EC2。可以手動(dòng)設(shè)置集群 ID, 若不設(shè)置則默認(rèn)會(huì)獲取活動(dòng)集群中的第 1 個(gè)。
session_emr=Session(jobtype=0)
#提交 SQL 語(yǔ)句,執(zhí)行過(guò)程中,會(huì)持續(xù)打印狀態(tài)并在任務(wù)完成時(shí),打印日志
session_emr.submit_sql("sql-task","SELECT * FROM xxtable LIMIT 10"
#提交腳本文件,spark-test.py 是一個(gè) pysark 或者 pyspark.sql 的程序腳本,執(zhí)行過(guò)程中,會(huì)持續(xù)打印狀態(tài)并在任務(wù)完成時(shí),打印日志
session_emr.submit_file("script-task","spark-test.py")#jobtype=1 時(shí),表示 EMR Serverless??梢允謩?dòng)設(shè)置應(yīng)用 ID,若不設(shè)置則默認(rèn)會(huì)獲取 spark 應(yīng)用程序中的第 1 個(gè)。
session_emrserverless=Session(jobtype=1,logs_s3_path='s3://xxx/xx')
#提交 SQL 語(yǔ)句,執(zhí)行過(guò)程中,會(huì)持續(xù)打印狀態(tài)并在任務(wù)完成時(shí),打印日志
session_emrserverless.submit_sql("sql-task","SELECT * FROM xxtable LIMIT 10")
#提交腳本文件,spark-test.py 是一個(gè) pysark 或者 pyspark.sql 的程序腳本,執(zhí)行過(guò)程中,會(huì)持續(xù)打印狀態(tài)并在任務(wù)完成時(shí),打印日志
session_emrserverless.submit_file("script-task","spark-test.py")
原理 & 細(xì)節(jié)闡述
整體的類(lèi)結(jié)構(gòu)設(shè)計(jì),采用的是面向?qū)ο蟮拇砟J?。面向客?hù)使用的類(lèi)是 Session 類(lèi),在 Session 類(lèi)的構(gòu)造函數(shù)中,會(huì)根據(jù)傳入 jobtype 字段值來(lái)進(jìn)一步構(gòu)建內(nèi)部類(lèi):EMRSession 或者 EMRServerlessSession。而真正的 Spark 任務(wù)提交、監(jiān)控、日志查詢(xún)邏輯則是封裝在 EMRSession 或者 EMRServerlessSession 的對(duì)應(yīng)方法中。
EMRSession 的實(shí)現(xiàn)邏輯
當(dāng)調(diào)用 submit_sql(jobname,sql) 方法來(lái)提交任務(wù),則會(huì)先讀取 sql_template.py 文件,使用參數(shù) sql 來(lái)替換文件中的${query}占位符,并生成一個(gè)臨時(shí)文件上傳至?Amazon S3;若是通過(guò) submit_file(jobname,file) 方法來(lái)提交任務(wù),則需要提前將腳本文件通過(guò) DolphinScheduler 的資源中心進(jìn)行上傳,DolphinScheduler 后臺(tái)會(huì)將文件上傳至 S3 的指定目錄。
當(dāng)腳本文件上傳至 S3 后,再通過(guò) EMR Steps 中的 add_job_flow_steps 命令來(lái)遠(yuǎn)程提交 Spark 任務(wù)。這里有兩點(diǎn)需要指出:若設(shè)置了 Python 虛擬環(huán)境,則在提交 Spark 任務(wù)時(shí),會(huì)在 dd_job_flow_steps 命令的 spark-submit 配置部分設(shè)置相關(guān)參數(shù)來(lái)使用這個(gè)虛擬環(huán)境;同時(shí)也會(huì)使用默認(rèn)的或者用戶(hù)自定義的 spark_conf 參數(shù)來(lái)設(shè)置 spark 的 driver、executor 配置參數(shù)。
在任務(wù)執(zhí)行過(guò)程中,會(huì)每隔 10 秒獲取一次任務(wù)狀態(tài),并打印至控制臺(tái)。在失敗狀態(tài)時(shí)失敗時(shí),會(huì)到約定的 S3 路徑上獲取 Driver 的 stderr 與 stdout 日志文件。
EMRServerless 的實(shí)現(xiàn)邏輯
原理與 EMRSession 大同小異,只是各步驟具體的接口調(diào)用不同。
若調(diào)用 submit_sql(jobname,sql) 方法來(lái)提交任務(wù),則會(huì)先讀取 sql_template.py 文件,使用參數(shù) sql 來(lái)替換文件中的${query}占位符,并生成一個(gè)臨時(shí)文件上傳至 S3;若是通過(guò) submit_file(jobname,file) 方法來(lái)提交任務(wù),則需要提前將腳本文件通過(guò) DolphinScheduler 的資源中心進(jìn)行上傳,DolphinScheduler 后臺(tái)會(huì)將文件上傳至 S3 的指定目錄。
當(dāng)腳本文件上傳至 S3 后,再通過(guò) start_job_run 命令來(lái)遠(yuǎn)程提交 Spark 任務(wù)。這里有兩點(diǎn)需要指出:若設(shè)置了 Python 虛擬環(huán)境,則在提交 Spark 任務(wù)時(shí),會(huì)在 start_job_run 中 spark-submit 配置中設(shè)置相關(guān)參數(shù)來(lái)使用這個(gè)虛擬環(huán)境;同時(shí)也會(huì)使用默認(rèn)的或者用戶(hù)自定義的 spark_conf 參數(shù)來(lái)設(shè)置 Spark 的 driver、executor 配置參數(shù)。
在任務(wù)執(zhí)行過(guò)程中,會(huì)每隔 10 秒獲取一次任務(wù)狀態(tài),并打印至控制臺(tái)。在失敗狀態(tài)時(shí)失敗時(shí),會(huì)到約定的 S3 路徑上獲取 Driver 的 stderr 與 stdout 日志文件。
接下來(lái),我們通過(guò)時(shí)序圖來(lái)表示 submit_sql(jobname,sql) 的調(diào)用邏輯,如下圖所示:
完整代碼
下面將展示完整的代碼。其中,Session 類(lèi)構(gòu)造函數(shù)的參數(shù),大多設(shè)置了默認(rèn)值,以減少調(diào)用時(shí)的反復(fù)設(shè)置。在實(shí)際使用時(shí),需根據(jù)真實(shí)場(chǎng)景來(lái)替換這些參數(shù)的默認(rèn)值。接下來(lái),將逐一解釋 Session 類(lèi)構(gòu)造函數(shù)的每個(gè)參數(shù)。
application_id:若是 serverless,則設(shè)置應(yīng)用程序的 ID; 若是 emr on ec2,則設(shè)置集群 ID;若不設(shè)置,則自動(dòng)其第一個(gè) active 的 app 或者 cluster 的 ID
jobtype:0: EMR on EC2;1: serverless;默認(rèn)值為 0
job_role:EMR On EC2 的集群角色或者 EMRServerless 的 Job 角色??紤]到兩者都需要 S3、Glue 等服務(wù)的訪問(wèn)權(quán)限,可以統(tǒng)一使用一個(gè)角色
dolphin_s3_path:DolphinScheduler 中配置的用于存儲(chǔ)文件的 S3 路徑。在 DolphinScheduler 中調(diào)度的 Python 任務(wù)代碼中,可以直接通過(guò)相對(duì)路徑引用其它 python 文件
logs_s3_path:對(duì)于 EMR on EC2 來(lái)說(shuō),就是集群級(jí)別的保存日志的 S3 路徑;對(duì)于 EMR Serverless 來(lái)講是 Job 級(jí)別的保存日志的 S3 路徑,但通??梢越y(tǒng)一使用一個(gè)路徑
tempfile_s3_path:類(lèi)庫(kù)中會(huì)創(chuàng)建一些臨時(shí)文件并保存在 S3 上
python_venv_s3_path:有的客戶(hù)在編寫(xiě) pyspark 時(shí),還會(huì)引用一些其它的 Python 庫(kù)。這時(shí)就需要準(zhǔn)備一個(gè) Python 虛擬環(huán)境,提前預(yù)置各類(lèi)所需要的 Python 第三方庫(kù),并將虛擬環(huán)境打包并上傳至 S3
spark_conf:這將會(huì)是一個(gè)常用的參數(shù),用于設(shè)置 spark 的 driver 與 executor 的相關(guān)參數(shù)
import gzip
import os
from string import Template
import time
import boto3
from datetime import datetime
class EMRResult:def __init__(self,job_run_id,status):self.job_run_id=job_run_idself.status=status
class Session:def __init__(self,application_id='', #若是 serverless,則設(shè)置 應(yīng)用的 ID; 若是emr on ec2,則設(shè)置集群 ID;若不設(shè)置,則自動(dòng)其第一個(gè)active的 app 或者clusterjobtype=0, #0:EMR on EC2; 1: serverless ?job_role='arn:aws:iam::******:role/AmazonEMR-ExecutionRole-1694412227712',dolphin_s3_path='s3://*****/dolphinscheduler/ec2-user/resources/',logs_s3_path='s3://aws-logs-****-ap-southeast-1/elasticmapreduce/',tempfile_s3_path='s3://****/tmp/',python_venv_s3_path='s3://****/python/pyspark_venv.tar.gz',spark_conf='--conf spark.executor.cores=4 --conf spark.executor.memory=16g --conf spark.driver.cores=4 --conf spark.driver.memory=16g'):self.jobtype=jobtypeself.application_id = application_idself.region='ap-southeast-1'self.job_role = job_roleself.dolphin_s3_path = dolphin_s3_pathself.logs_s3_path=logs_s3_pathself.tempfile_s3_path=tempfile_s3_pathself.spark_conf=spark_confself.python_venv_s3_path=python_venv_s3_pathself.client = boto3.client('emr', region_name=self.region)self.client_serverless = boto3.client('emr-serverless', region_name=self.region)#如果未設(shè)置application_id,則查詢(xún)當(dāng)前第一個(gè) active 的 EMR 集群/或者 EMR Serverless 應(yīng)用的 IDif self.application_id == '':self.application_id=self.getDefaultApplicaitonId()if jobtype == 0 : ?#EMR on EC2self.session=EmrSession(region=self.region,application_id=self.application_id,job_role=self.job_role,dolphin_s3_path=self.dolphin_s3_path,logs_s3_path=self.logs_s3_path,tempfile_s3_path=self.tempfile_s3_path,python_venv_s3_path=self.python_venv_s3_path,spark_conf=self.spark_conf)elif jobtype ==1 : #EMR Serverlessself.session=EmrServerlessSession(region=self.region,application_id=self.application_id,job_role=self.job_role,dolphin_s3_path=self.dolphin_s3_path,logs_s3_path=self.logs_s3_path,tempfile_s3_path=self.tempfile_s3_path,python_venv_s3_path=self.python_venv_s3_path,spark_conf=self.spark_conf)else: #Pyhive ,used on-premiseself.session=PyHiveSession(host_ip="172.31.25.171",port=10000)self.initTemplateSQLFile()def submit_sql(self,jobname, sql):result= self.session.submit_sql(jobname,sql)if result.status == "FAILED" :raise Exception("ERROR:任務(wù)失敗")def submit_file(self,jobname, filename):result= ?self.session.submit_file(jobname,filename)if result.status == "FAILED":raise Exception("ERROR:任務(wù)失敗")def getDefaultApplicaitonId(self):if self.jobtype == 0: #EMR on EC2emr_clusters = self.client.list_clusters(ClusterStates=['STARTING', 'BOOTSTRAPPING', 'RUNNING', 'WAITING'])if emr_clusters['Clusters']:app_id= emr_clusters['Clusters'][0]['Id']print(f"選擇默認(rèn)的集群(或EMR Serverless 的應(yīng)用程序)ID:{app_id}")return app_idelse:raise Exception("沒(méi)有找到活躍的EMR集群")elif self.jobtype == 1: #EMR Serverlessemr_applications = self.client_serverless.list_applications()spark_applications = [app for app in emr_applications['applications'] if app['type'] == 'Spark']if spark_applications:app_id = spark_applications[0]['id']print(f"選擇默認(rèn)的應(yīng)用ID:{app_id}")return app_idelse:raise Exception("沒(méi)有找到活躍的 EMR Serverless 應(yīng)用")def initTemplateSQLFile(self):with open('sql_template.py', 'w') as f:f.write('''
from pyspark.sql import SparkSessionspark = (SparkSession.builder.enableHiveSupport().appName("Python Spark SQL basic example").getOrCreate()
)df = spark.sql("$query")
df.show()''')
class EmrSession:def __init__(self,region,application_id, ?#若是EMR on EC2,則設(shè)置集群 ID;若不設(shè)置,則自動(dòng)其第一個(gè)active的 app 或者clusterjob_role,dolphin_s3_path,logs_s3_path,tempfile_s3_path,python_venv_s3_path,spark_conf):self.s3_client = boto3.client("s3")self.region=regionself.client = boto3.client('emr', region_name=self.region)self.application_id = application_idself.job_role = job_roleself.dolphin_s3_path = dolphin_s3_pathself.logs_s3_path=logs_s3_pathself.tempfile_s3_path=tempfile_s3_pathself.python_venv_s3_path=python_venv_s3_pathself.spark_conf=spark_confself.client.modify_cluster(ClusterId=self.application_id,StepConcurrencyLevel=256)def submit_sql(self,jobname, sql):# temporary file for the sql parameterprint(f"RUN SQL:{sql}")self.python_venv_conf=''with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), "sql_template.py")) as f:query_file = Template(f.read()).substitute(query=sql.replace('"', '\\"'))script_bucket = self.tempfile_s3_path.split('/')[2]script_key = '/'.join(self.tempfile_s3_path.split('/')[3:])current_time = datetime.now().strftime("%Y%m%d%H%M%S")script_key = script_key+"sql_template_"+current_time+".py"self.s3_client.put_object(Body=query_file, Bucket=script_bucket, Key=script_key)script_file=f"s3://{script_bucket}/{script_key}"result= self._submit_job_emr(jobname, script_file)self.s3_client.delete_object(Bucket=script_bucket, Key=script_key)return resultdef submit_file(self,jobname, filename):# temporary file for the sql parameterprint(f"Run File :{filename}")self.python_venv_conf=''if self.python_venv_s3_path and self.python_venv_s3_path != '':self.python_venv_conf = f"--conf spark.yarn.dist.archives={self.python_venv_s3_path}#environment --conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./environment/bin/python --conf spark.executorEnv.PYSPARK_PYTHON=./environment/bin/python"script_file=f"{self.dolphin_s3_path}{filename}"result= self._submit_job_emr(jobname, script_file)return resultdef _submit_job_emr(self, jobname, script_file):spark_conf_args = self.spark_conf.split()#設(shè)置虛擬環(huán)境的地址,用于支持 pyspark 以外的庫(kù)python_venv_args=[]if self.python_venv_conf and self.python_venv_conf != '':python_venv_args=self.python_venv_conf.split()jobconfig=[{'Name': f"{jobname}",'ActionOnFailure': 'CONTINUE','HadoopJarStep': {'Jar': 'command-runner.jar','Args': ['spark-submit','--deploy-mode','cluster','--master','yarn','--conf','spark.yarn.submit.waitAppCompletion=true'] + spark_conf_args + python_venv_args + [script_file]}}]response = self.client.add_job_flow_steps(JobFlowId=self.application_id,Steps=jobconfig)print(jobconfig)if response['ResponseMetadata']['HTTPStatusCode'] != 200:print('task failed:')print(response)job_run_id = response['StepIds'][0]print(f"Submit job on EMR ,job id: {job_run_id}")job_done = Falsestatus='PENDING'while not job_done:status = self.get_job_run(job_run_id)print(f"current status:{status}")job_done = status in ["SUCCESS","FAILED","CANCELLING","CANCELLED","COMPLETED"]time.sleep(10)if status == "FAILED":self.print_driver_log(job_run_id,log_type="stderr")self.print_driver_log(job_run_id,log_type="stdout")return EMRResult(job_run_id,status)def get_job_run(self, job_run_id: str) -> dict:step_status = self.client.describe_step(ClusterId=self.application_id,StepId=job_run_id)['Step']['Status']['State']return step_status.upper()def print_driver_log(self, job_run_id: str, log_type: str = "stderr") -> str:print("starting download the driver logs")s3_client = boto3.client("s3")logs_location = f"{self.logs_s3_path}{self.application_id}/steps/{job_run_id}/{log_type}.gz"logs_bucket = logs_location.split('/')[2]logs_key = '/'.join(logs_location.split('/')[3:])print(f"Fetching {log_type} from {logs_location}")try:#日志生成需要一段時(shí)間,最長(zhǎng) 100 秒for _ in range(10):try:s3_client.head_object(Bucket=logs_bucket, Key=logs_key)breakexcept Exception:print("等待日志生成中...")time.sleep(10)response = s3_client.get_object(Bucket=logs_bucket, Key=logs_key)file_content = gzip.decompress(response["Body"].read()).decode("utf-8")except s3_client.exceptions.NoSuchKey:file_content = ""print( f"等待超時(shí),請(qǐng)稍后到 EMR 集群的步驟中查看錯(cuò)誤日志或者手動(dòng)前往: {logs_location} 下載")print(file_content)class EmrServerlessSession:def __init__(self,region,application_id, #若是 serverless, 則設(shè)置 應(yīng)用的 ID;若不設(shè)置,則自動(dòng)其第一個(gè)active的 app job_role,dolphin_s3_path,logs_s3_path,tempfile_s3_path,python_venv_s3_path,spark_conf):self.s3_client = boto3.client("s3")self.region=regionself.client = boto3.client('emr-serverless', region_name=self.region)self.application_id = application_idself.job_role = job_roleself.dolphin_s3_path = dolphin_s3_pathself.logs_s3_path=logs_s3_pathself.tempfile_s3_path=tempfile_s3_pathself.python_venv_s3_path=python_venv_s3_pathself.spark_conf=spark_confdef submit_sql(self,jobname, sql): #serverless# temporary file for the sql parameterprint(f"RUN SQL:{sql}")self.python_venv_conf=''with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), "sql_template.py")) as f:query_file = Template(f.read()).substitute(query=sql.replace('"', '\\"'))script_bucket = self.tempfile_s3_path.split('/')[2]script_key = '/'.join(self.tempfile_s3_path.split('/')[3:])current_time = datetime.now().strftime("%Y%m%d%H%M%S")script_key = script_key+"sql_template_"+current_time+".py"self.s3_client.put_object(Body=query_file, Bucket=script_bucket, Key=script_key)script_file=f"s3://{script_bucket}/{script_key}"result= self._submit_job_emr(jobname, script_file)#delete the temp fileself.s3_client.delete_object(Bucket=script_bucket, Key=script_key)return resultdef submit_file(self,jobname, filename): ?#serverless# temporary file for the sql parameterprint(f"RUN Script :{filename}")self.python_venv_conf=''if self.python_venv_s3_path and self.python_venv_s3_path != '':self.python_venv_conf = f"--conf spark.archives={self.python_venv_s3_path}#environment --conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python --conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python --conf spark.executorEnv.PYSPARK_PYTHON=./environment/bin/python"script_file=f"{self.dolphin_s3_path}{filename}"result= self._submit_job_emr(jobname, script_file)return resultdef _submit_job_emr(self, name, script_file):#serverlessjob_driver = {"sparkSubmit": {"entryPoint": f"{script_file}","sparkSubmitParameters": f"{self.spark_conf} --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory {self.python_venv_conf}",}}print(f"job_driver:{job_driver}")response = self.client.start_job_run(applicationId=self.application_id,executionRoleArn=self.job_role,name=name,jobDriver=job_driver,configurationOverrides={"monitoringConfiguration": {"s3MonitoringConfiguration": {"logUri": self.logs_s3_path,}}},)job_run_id = response.get("jobRunId")print(f"Emr Serverless Job submitted, job id: {job_run_id}")job_done = Falsestatus="PENDING"while not job_done:status = self.get_job_run(job_run_id).get("state")print(f"current status:{status}")job_done = status in ["SUCCESS","FAILED","CANCELLING","CANCELLED",]time.sleep(10)if status == "FAILED":self.print_driver_log(job_run_id,log_type="stderr")self.print_driver_log(job_run_id,log_type="stdout")raise Exception(f"EMR Serverless job failed:{job_run_id}")return EMRResult(job_run_id,status)def get_job_run(self, job_run_id: str) -> dict:response = self.client.get_job_run(applicationId=self.application_id, jobRunId=job_run_id)return response.get("jobRun")def print_driver_log(self, job_run_id: str, log_type: str = "stderr") -> str:s3_client = boto3.client("s3")logs_location = f"{self.logs_s3_path}applications/{self.application_id}/jobs/{job_run_id}/SPARK_DRIVER/{log_type}.gz"logs_bucket = logs_location.split('/')[2]logs_key = '/'.join(logs_location.split('/')[3:])print(f"Fetching {log_type} from {logs_location}")try:response = s3_client.get_object(Bucket=logs_bucket, Key=logs_key)file_content = gzip.decompress(response["Body"].read()).decode("utf-8")except Exception:file_content = ""print(file_content)
在 DolphinScheduler 上的應(yīng)用
經(jīng)過(guò)以上類(lèi)庫(kù)抽象與封裝后,在 DolphinScheduler 上使用該類(lèi)庫(kù),可以簡(jiǎn)單且靈活的向 EMR on EC2 和 EMR Serverless 提交 Spark 任務(wù)。
首先,將上述代碼上傳至 DolphinScheduler 的資源中心,文件名為 “emr_common.py”,如下圖所示。
然后在工作流程中插入 Python節(jié)點(diǎn),按照 Demo 代碼示例,提交 Spark 任務(wù)。通過(guò) Session 的構(gòu)造函數(shù)參數(shù) jobtype 來(lái)控制,是向 EMR on EC2 提交 Spark 任務(wù),還是向 EMR Serverless 提交 Spark 任務(wù)。需要注意的是,填寫(xiě)完 Python 代碼后,為了讓節(jié)點(diǎn)中的 Python 代碼能正確地引用類(lèi)庫(kù) “emr_common.py”,一定要在節(jié)點(diǎn)的資源設(shè)置中添加 “emr_common.py”,如下兩圖所示(注:需要提前在 DolphinScheduler 的節(jié)點(diǎn)上安裝 emr_common.py 所引用的第三方 Python 庫(kù))。
當(dāng)任務(wù)執(zhí)行結(jié)束后,如果出現(xiàn)錯(cuò)誤,就可以在 DolphinScheduler 中直接查看日志,無(wú)需到 Yarn、Spark UI 或者 EMRServerless 的 Job 頁(yè)面去下載與查看日志了,如下圖所示。
03
總結(jié)
本文通過(guò)對(duì) EMR on EC2 與 EMRServerless 中 Spark 任務(wù)的提交、監(jiān)控、下載日志過(guò)程進(jìn)行抽象并封裝成 Python 類(lèi)庫(kù),極大的簡(jiǎn)化了使用 Spark 的門(mén)檻,以及從 EMR on EC2 切換至 EMRServerless 的改造成本,優(yōu)化了 EMRServerless+DolphinScheduler 的集成實(shí)踐,消除了客戶(hù)對(duì)于使用 EMRServerless 的一些疑惑以及擔(dān)憂。最終幫助客戶(hù)逐漸從集群運(yùn)維的工作負(fù)擔(dān)中解脫出來(lái),更加專(zhuān)注于應(yīng)用邏輯的開(kāi)發(fā)與業(yè)務(wù)價(jià)值的創(chuàng)造。
本篇作者
張盼富
亞馬遜云科技解決方案架構(gòu)師,從業(yè)十三年,先后經(jīng)過(guò)歷云計(jì)算、供應(yīng)鏈金融、電商等多個(gè)行業(yè),擔(dān)任過(guò)高級(jí)開(kāi)發(fā)、架構(gòu)師、產(chǎn)品經(jīng)理、開(kāi)發(fā)總監(jiān)等多種角色,有豐富的大數(shù)據(jù)應(yīng)用與數(shù)據(jù)治理經(jīng)驗(yàn)。加入亞馬遜云科技后,致力于通過(guò)大數(shù)據(jù)+AI 技術(shù),幫助企業(yè)加速數(shù)字化轉(zhuǎn)型。
劉元元
亞馬遜云科技解決方案架構(gòu)師,負(fù)責(zé)基于亞馬遜云科技的云計(jì)算方案架構(gòu)設(shè)計(jì)、咨詢(xún)、實(shí)施等工作。曾擔(dān)任研發(fā)經(jīng)理、架構(gòu)師的崗位并擁有多年的互聯(lián)網(wǎng)系統(tǒng)的架構(gòu)設(shè)計(jì)、系統(tǒng)開(kāi)發(fā)的經(jīng)驗(yàn),覆蓋金融、文旅、交通等行業(yè),在 SaaS 系統(tǒng)和 Serverless 領(lǐng)域有著豐富的經(jīng)驗(yàn)。
莊穎勤
亞馬遜云科技解決方案架構(gòu)師,負(fù)責(zé)基于亞馬遜云科技的云計(jì)算方案架構(gòu)設(shè)計(jì)、咨詢(xún)、實(shí)施等工作。在 DevOps、CI/CD 和容器等領(lǐng)域擁有豐富的技術(shù)和支持經(jīng)驗(yàn),致力于幫助客戶(hù)實(shí)現(xiàn)技術(shù)創(chuàng)新和業(yè)務(wù)發(fā)展。
星標(biāo)不迷路,開(kāi)發(fā)更極速!
關(guān)注后記得星標(biāo)「亞馬遜云開(kāi)發(fā)者」
聽(tīng)說(shuō),點(diǎn)完下面4個(gè)按鈕
就不會(huì)碰到bug了!