SPECIALIST

多様な専門性を持つNRIデジタル社員のコラム、インタビューやインサイトをご紹介します。

BACK

バッチジョブワークフロー実行基盤としての「Google Cloud Composer」実用性検証【前編】

こんにちは、NRIデジタルの島です。

過日、「AWS Step Functions」を使用したバッチジョブの実行基盤の検証を実施し、その内容を以下記事に投稿しました。
AWS Step Functionsでバッチジョブワークフローの実行基盤を構築する

また、「AWS Step Functions」を効率的にテストできるよう、ローカル環境におけるテスト方法について以下記事にてご紹介させていただきました。
AWS Step Functions Local モック統合機能を利用したローカルテスト

本記事では、 Google Cloud 上でのバッチジョブワークフローの実行基盤について考えていきたいと思います。今回は【前半】として、バッチジョブのワークフローの構築と実行ついて紹介します。


Google Cloudの場合もAWSと同様、定番のバッチジョブワークフロー実行サービスがあるわけではなく、バッチのユースケースに応じて、要件を満たせるサービス(1つ or 複数サービスの組み合わせ)を選定していくようになりそうです。選択肢はいくつかありそうですが、筆者が考える選択肢は以下の通りです。

サービス

概要

備考

①Cloud Scheduler + WorkFlows + Cloud Run Jobs

ジョブスケジューリング(Cloud Scheduler)、ワークフロー(WorkFlows)、プロビジョニング(Cloud Run Jobs)の各サービスを組み合わせたバッチ実行基盤。

ワークロード実行が「Cloud Run Jobs」となる為、以下の制約事項がある。

・デフォルトで10分間、最長で1時間までしか実行できない
・GPUインスタンスは実行できない

②Cloud Scheduler + WorkFlows + Batch for Google Cloud

上記①構成のワークロード実行を「Batch for Google Cloud」にした構成。
Batch を使ってみる|Google Cloud

「Batch for Google Cloud」は昨年(2022年)GAされたばかりの新しいサービスで利用実績が少ない。東京リージョンでも最近利用可能になったばかり。

Cloud Composer

スケジューリング、ワークフロー、プロビジョニング等のワークフロー実行に必要な機能を全て包含したサービス。
ワークフロー(DAG)を取り込み、Google Kubernetes Engine(GKE)上のデータプレーンにてスケジューラーやワーカーのPodが実行される。

Apache Airflowのマネージドサービスで、ワークフローの全ての構成や機能を記述し、DAGファイルのみで完結する為管理しやすいが、学習コストは高い。また、環境を維持しているだけでそれなりのコストがかかる。

④Dataflow

大量データをサーバレス(Google Compute Engine(GCE)インスタンス)で高速処理(並列実行、自動スケーリング)可能なサービスで、ストリーミングやETLのユースケースで利用されることが多い。
Apach Beamフレームワークを使用。
Dataflow|Google Cloud

ユースケースが一般的なバッチ処理というよりは、ストリーミングやETLの色が強い。Apache Beamフレームワークのお作法に準拠する必要がある為、学習コストが高く、また別プラットフォームへの移行時のハードルは高い。

⑤ジョブマネージャ or Cron + Google Compute Engine(GCE)

マネージドサービスを使用せず、自前でIaaS(Google Compute Engine(GCE))上にスクリプトやCronを格納して実行。ジョブマネージャが必要な場合は3rd Partyのジョブマネージャ製品と連携する。

自由度は高いが、構築・運用管理コストが莫大。

※MLOpsの意味合いになると「Vertex AI」等のAI Platform関連のサービスも候補となりますが、ここでは一般的なバッチジョブフローの実行をスコープとしています

この中から、筆者は「Cloud Composer」 がバッチジョブワークフロー実行基盤としての有力候補と考えました。
Cloud Composerのドキュメント|Google Cloud

主な理由は以下の通りです。

・Kubernetes(以下 k8s)のマネージドサービスである「Google Kubernetes Engine(以下GKE)」と「Apache Airflow(以下Airflow)」というOSSプロダクトベースで構成されたサービスであり、単一のクラウドに依存した技術に縛られることなく、他のプラットフォームでもその技術ノウハウを流用可能

Google Cloudでシステム構築を行う際に、使用するコンピューティングリソースとしてGKEを選択するケースも多く、そういったケースにおいては技術スタックを揃えられる可能性が高い

・大規模データ処理のワークロード(AIやビックデータ等)を実行可能なGPUインスタンスをオンデマンドでプロビジョニング可能
※GKEのAutopilotモードで実行すれば、ワーカーノードのプーリング定義も不要
Autopilot の概要|Google Cloud

・自前で複数サービスを組み合わせる必要がなく、一つのサービスとして統合されている為、運用管理がシンプル

本記事ではComposerを試用して、バッチジョブワークフロー実行基盤としての実用性を検証致しましたので、その内容を共有していきたいと思います。

Cloud Composerとは

「Cloud Composer(以下Composer)」はAirflowをベースに、ワークフローの構築、スケジューリング、モニタリング等を支援するフルマネージドなワークフローサービスです。Airflowでは、DAG(「有向非巡回グラフ」を定義するPythonコード)をベースにワークフローが構築 されます。Composer環境を作成すると、各AirflowコンポーネントはGKEクラスタ上でPodとして稼働します。

簡易的な構成イメージは以下のようになります。

DAGを登録すると、DAGのワークフローがGKEクラスタへロードされ、その定義に応じてスケジューラ(上図「airflow-scheduler」Pod)が実行のスケジューリングをします。具体的には、作成したDAGをCloud Storage(以下GCS)上の特定のフォルダに登録することで、Composerが定期的にDAGの内容を取り込む仕組みになっております。
また、Airflowの管理コンソールで各ワークフロー制御やステータス確認を行いますが、そのWebサーバもPodで稼働しています(上図「airflow-webserver」Pod)。アクセス制御はGoogle Cloud認証に加え、IPアドレス等で制御することも可能です。
UI/Screenshots-Airflow Documentation

※上記ドキュメントより抜粋

なお、スケジュール設定等DAGの内容を管理コンソール上から変更するなどは出来ませんので注意してください。

Operator

AirflowにはOperator という概念があり、DAGに定義されているOperatorに応じてスケジューラがワークロードをスケジューリングします。
Operators-Airflow Documentation


例えば、「PythonOperator」 であれば、そのコードが上図「airflow-worker」Pod上で実行され、「KubernetesPodOperator」 であれば、指定したネームスペース上(例えば上図の「composer-user-workloads」)へPodを作成して実行します。
PythonOperator-Airflow Documentation
KubernetesPodOperator-Airflow Documentation

Composerバージョン

執筆時点では、 Composerはバージョン1バージョン2 を選択可能です。バージョン2となって、GKEクラスタがAutopilotベースになったことでプロビジョニングやリソース管理が柔軟 になったことや、全てのコンポーネントがGKE上で稼働 するようになってよりシンプルになったなど、改善点が多いですので、新規に利用する場合には、特別な理由がない限りバージョン2 を選択してしまって問題ないかと思います。なお、バージョン1でもバージョン2でもAirflow2.xは使用可能です。

両バージョン差異の詳細は以下をご参照ください。
Cloud Composer のバージョニングの概要|Google Cloud

実用性検証

では、実際に実機にてComposerを使用したバッチジョブワークフローの実行基盤としての実用性について検証していきたいと思います。
※なお、本検証は「Composer-2.1.14-Airflow-2.4.3」 で実施します

1. 検証用のサンプルと構成

1.1 サンプルバッチジョブワークフロー

今回、検証として定義したバッチジョブワークフローのユースケースは以下のようなものです。

①データ取得処理
GCSからCSVファイルをダウンロードし、加工後のファイルをGCS及びk8sのPVC(PersistentVolumeClaim)でNFSマウントしたFilestoreへ保存する。

②GPU処理
①でGCSへ保存されたCSVファイルをダウンロードし、そのファイルの情報をもとにGPUインスタンス上でGPU処理を実行する 。並列性を確認するために、本バッチジョブは2ジョブ並列で実行する。
※本処理は「GPUインスタンス上で動作しているか」を確認するだけの処理です

③マスタ登録処理
FilestoreをPVCでNFSマウントし、そこに保存されているデータを加工してAlloyDB(Postgres)に書き込む。並列性を確認するために、本バッチジョブは2ジョブ並列で実行する。

④キャッシュ処理
Google Compute Engine(GCE)にデプロイされているWebアプリケーション へHTTPリクエストし、FilestoreとAlloyDBに保存されているデータを取得し、そのデータをMemorystoreへ保存する。オンラインAPI(架空です)が利用する想定。

※各バッチアプリケーションは全てPythonで実装しておりますが、処理の内容自体に特別な意味はありませんので、細かい内容は解説を割愛します


1.2 Composer構成

さて、上記サンプルバッチジョブワークフローにて検証を実施するComposer環境ですが、もともとは前述した「KubernetesPodOperator」を使用して、以下のようにComposer環境のGKEクラスタ内で各バッチジョブワークフローのワークロードを実行する想定でした。


しかしながら、執筆時点では、 Composerが稼働するGKEクラスタ上ではGPUインスタンスのプロビジョニングができない ようです。その為、本検証では、以下の GKEStartPodOperator を使用して、コンテナ化された上記バッチアプリケーションを別クラスタ にて実行する構成にします。
Google Kubernetes Engine Operator を使用する|Cloud Composer|Google Cloud
Google Kubernetes Engine Operators-Airflow

※「KubernetesPodOperator」はComposerと同一クラスタでの実行のみサポートしており、別クラスタでの実行ができません


検証構成


※参考までに、「KubernetesPodOperator」を使用して、Composerのクラスタ上でGPUインスタンスをプロビジョニングしようとすると、以下のようなエラーになります。

・・・{},"status ":"Failure ","message ":"admission webhook \"gkepolicy.common-webhooks.networking.gke.io \"denied the request: GKE Warden rejected the request because it violates one or more constraints.\nViolations details: {\"[denied by autopilot-composer-limitations]\":[\"Container ‘base’ specifies GPU resources. Cloud Composer 2 does not support creating workloads with GPUs. \"]}\nRequested by user: ‘*****@*****.iam.gserviceaccount.com’, groups: ‘system:authenticated’.","reason ":"GKE Warden constraints violations ","code ":400}

では、次項よりDAGコードを適宜修正しながら、検証を進めていきたいと思います。

なお、DAGの概要や記述方法についての詳細には触れませんので、以下本家AirflowやComposerのドキュメント等を参考にしてください。
DAGs-Airflow Documentation
Airflow DAG を書き込む|Cloud Composer|Google Cloud

2. バッチジョブワークフローの構築と実行

2.1 バッチジョブワークフローのトリガー

まずはバッチジョブワークフローのトリガー、つまりDAGの実行タイミングについてです。DAGをトリガーする方法はいくつかありますが、本検証では以下の2つについて検証します。

スケジュール実行(定時・定期)
手動実行

その他のトリガーについては以下をご参照ください。
DAG をトリガーする|Cloud Composer|Google Cloud


スケジュール実行(定時・定期)

スケジュール設定

DAGコードで 「start_date」 パラメータと「schedule_interval」 パラメータを指定することで、スケジュール実行可能となります。schedule_intervalにはPythonのdatetimeモジュールでの指定やCron式が使用でき、定時(日次や月次、日時指定等)、定期(何分毎や毎時等)のスケジュール指定が可能です。


e.g. 日時ジョブの実行の場合

  • datetime.timedeltaでの指定
    schedule_interval=datetime.timedelta(days=1)
  • cron式での指定
    schedule_interval=’0 0 * * *’
  • プロセットアノテーションでの指定
    schedule_interval=’@daily’

datetime — 基本的な日付型および時間型¶
DAG Runs-Airflow Documentation


DAGを登録後の初回実行タイミングは「start_dateを起点として、最初のschedule_interval の完了時点」 で実行されます。ポイントは「schedule_intervalの開始時点ではない」 ことです。例えば、現在日時が「2023/4/4 11:00」で、以下のようなDAGを定義して実行するとします。

・・・・
開始日時
start_date=datetime(2023, 4, 4, 13, 0)
スケジュール間隔(=毎日15時に実行)
schedule_interval='0 15 * * *'
・・・・

この場合、開始時間が日時実行時間よりも前の為、直感的には4/4 15:00に初回実行されそうですが、前述の通り「schedule_interval の完了時点で実行される」仕様の為、初回実行日時は4/5 15:00となります。

「schedule_interval の完了時点」 というのは、「ひとまとまりのインターバル期間が完了したとき」 ということです。つまり、この場合では1回目のインターバルは 4/4 15:00~4/5 15:00 となるため、初回稼働は 4/5 15:00 となります


なお、スケジューリングさせたくない(手動のみ)の場合はDAGレベルで明示的に「None」を指定してください。(schedule_interval=None)

キャッチアップ

DAGにはCatchUpというパラメータがあり、これにより過去分のジョブを実行するかどうかを制御することができます。
DAG Runs-Airflow Documentation

CatchUpを「True」 にした場合は、開始日時(start_date)以降〜現在日時までで実行対象となる過去日時のバッチジョブワークフローを実行します。ただ、仮に「False」 を設定しても、直近のバッチジョブワークフローのみは実行されてしまう仕様の為、過去分を実行したくない場合は、初回実行タイミングが到来しないようにstart_dateを設定する 必要があります。

例えば、前項の例であればCatchUpの値に関わらず、過去分対象のバッチジョブワークフローは存在しない為、何も実行されません。

ただ、意図せず実行されることにリスクを感じる場合は、DAGコードにて「is_paused_upon_creation」 パラメータを「True」に設定(デフォルトFalse)することで休止状態としてDAGを登録できます。


デプロイ時点でトグルがOFFとなり、休止(Paused)状態

※なお、「LatestOnlyOperator」 というOperatorを使用すると、全く実行しないようにすることが可能とのことですが、今回の検証では試していません
airflow.operators.latest_only-Airflow Documentation

タイムゾーン

Airflowは日時情報を内部的にUTCで管理しており、変更は非奨励のようです。その為、デフォルトタイムゾーンを変更せずにDAGの実装時に考慮する必要があります。具体的には標準ライブラリのtimezoneは使用せず、pendulumモジュールを使用してタイムゾーンを明示的に指定するようにします。
Time Zones-Airflow Documentation
pendulum 2.1.2

・・・
local_tz = pendulum.timezone("Asia/Tokyo")
・・・
"start_date": pendulum.datetime(2023, 4, 4, 13, 0, tz=local_tz)
・・・


では、仕様を理解したところで、実際に動かしてみましょう。



シナリオ①
開始日を過去日、インターバルを日次、キャッチアップをFalseでDAGをアップロードします。

DAG設定

# DAG
・・・・
start_date=pendulum.datetime(2023, 4, 17, 13, 0, tz=LOCAL_TZ) # 開始日時
schedule_interval='0 13 * * *',   # インターバル
catchup=False,   # キャッチアップ
・・・・


DAGデプロイ日時:4/19 14:15

期待結果
直近の「4/19 13:00実行分」のみ過去分として実行され、「4/20 13:00」に初回実行されること。
※過去分対象として「4/18 13:00実行分(インターバル=4/17 13:00〜 4/18 13:00)」と「4/19 13:00実行分(インターバル=4/18 13:00〜 4/19 13:00)」が対象となるが、キャッチアップが「False」の為直近4/19 13:00実行分のみが過去分実行対象となる

結果

以下の実行履歴画面の通り、想定通りの結果となりました。

下段が過去実行分、上段が初回実行分となります。ややこしいのは「Logical Date(論理日付)」 列の日時で、Airflowは前述した通り、「schedule_interval の完了時点」 で実行される仕様の為、以下のようになります。

過去分
4/18 13:00 〜 4/19 13:00のインターバル完了時点での実行になりますが、「Logical Data」はインターバル開始日時である「4/18 13:00」、実行日時(「Queued At」列を確認)はインターバル完了日時を過ぎている為、DAGデプロイ日時(4/19 14:25)以降の日時となります。

初回実行分
4/19 13:00 〜 4/20 13:00のインターバル完了時点での実行になりますが、「Logical Data」はインターバル開始日時「4/19 13:00」、実行日時はインターバル完了日時である「4/20 13:00」となります。

なお、GKEOperatorを使用している為、以下の通りGKEクラスタ内のPodにて実行されていることが確認出来ました。

>k get po -n composer-user-workloads -w
NAME                 READY   STATUS      RESTARTS   AGE
pod-job01-35vxebnc   0/1     Pending     0          1s
pod-job01-35vxebnc   0/1     Pending     0          1s
pod-job01-35vxebnc   0/1     ContainerCreating   0          1s
pod-job01-35vxebnc   1/1     Running             0          5s
pod-job01-35vxebnc   0/1     Completed           0          6s
pod-job01-35vxebnc   0/1     Completed           0          8s
pod-job01-35vxebnc   0/1     Terminating         0          8s
pod-job01-35vxebnc   0/1     Terminating         0          8s

シナリオ②

シナリオ①と同様の条件で、キャッチアップ(CatchUp)を「True」 にした場合を確認します。

DAG設定

# DAG
・・・・
start_date=pendulum.datetime(2023, 4, 17, 13, 0, tz=LOCAL_TZ) # 開始日時
schedule_interval='0 13 * * *',   # インターバル
catchup=True,   # キャッチアップ
・・・・

DAGデプロイ日時:4/19 14:20

期待結果
18、19日分が過去分として実行され、4/20 1300に初回実行されること。

結果

以下の実行履歴画面の通り、シナリオ①で実行されなかった4/18分も実行されている ことが確認できました。

特に指定がない場合は並列で起動するようです。起動を直列にしたい場合などは、「3. 運用・監視」 >「3.1 実行制御」 >「同時実行制御」項等で後述する「並列度」を設定することを検討下さい。

シナリオ③

最後にスケジュールを毎時にして、定期実行されるかを確認します。なお、初回実行日時(開始日時+インターバル完了日時)が未到達の設定とします。

DAG設定

# DAG
・・・・
start_date=pendulum.datetime(2023, 4, 19, 14, 0, tz=LOCAL_TZ) # 開始日時
schedule_interval='0 * * * *',   # インターバル
catchup=False,   # キャッチアップ
・・・・

DAGデプロイ日時:4/19 14:40

期待結果
DAGデプロイ日時が初回実行インターバルの完了日時に達していない為、デプロイ時点では何も実行されず、4/19 15:00に初回実行され、その後は1時間置きに毎時実行されること。

結果
DAGデプロイ時点では実行されず、その後は毎時されることが確認できました。

DAGデプロイタイミングでは未実行。


15時の時点から毎時実行となる。

手動実行

手動で実行したい場合は、管理コンソールから以下のようにトリガー することで、以下の通り、実行することができました。

デプロイしたDAGの開始日時が現在日時より未来日の場合は手動でも実行不可となりますので注意してください。

2.2 ジョブフロー制御

次にジョブフロー制御について確認します。ジョブフローはDAGに以下のように記述します。

Job01 >>Job02
※Job01.set_downstream(Job02)やJob02.set_upstream(Job01)のような記述方法も可能です
※Airflow2になって、より依存関係が可視化しやすい記述方法があるようですが本検証では未使用です

DAGコードの記述を変えることで、以下の通りのバッチジョブワークフローになることを確認できました。


シーケンシャル
DAG

・・・・
job01 >> job03_1 >> job03_2
・・・・

パラレル

DAG

・・・・
job01 >> [job03_1, job03_2]
・・・・

待ち合わせ

DAG

・・・・
job01 >> [job03_1, job03_2, job03_3]
[job03_1, job03_2] >> job04
・・・・

2.3 プロビジョニング

GKEStartPodOperatorでは、一時的にPodを作成し、そのPod上でバッチジョブのワークロードを実行します。そのPodがデプロイされるインスタンスのプロビジョニングについて確認します。

実行ノード

各ジョブの実行は基本的には事前にプールされているノードが割り当てられますが、何も指定しない限り通常のCPUノードが割り当てられます。

>k get node -l '!cloud.google.com/gke-accelerator'
NAME                                                  STATUS   ROLES    AGE     VERSION
・・・・
gk3-asia-northeast1-sear-nap-s7c7mx26-359fad2f-znbb   Ready    <none>   7h24m   v1.24.10-gke.2300
・・・・

>k get po -n composer-user-workloads -o wide -w
NAME                 READY   STATUS              RESTARTS   AGE   IP       NODE                                                  NOMINATED NODE   READINESS GATES
pod-job01-umaaq5fl   0/1     ContainerCreating   0          3s    <none>   gk3-asia-northeast1-sear-nap-s7c7mx26-359fad2f-znbb   <none>           <none>
pod-job01-umaaq5fl   1/1     Running             0          7s    10.3.1.37   gk3-asia-northeast1-sear-nap-s7c7mx26-359fad2f-znbb   <none>           <none>
pod-job01-umaaq5fl   0/1     Completed           0          9s    10.3.1.37   gk3-asia-northeast1-sear-nap-s7c7mx26-359fad2f-znbb   <none>           <none>
pod-job01-umaaq5fl   0/1     Completed           0          11s   10.3.1.37   gk3-asia-northeast1-sear-nap-s7c7mx26-359fad2f-znbb   <none>           <none>
pod-job01-umaaq5fl   0/1     Terminating         0          12s   10.3.1.37   gk3-asia-northeast1-sear-nap-s7c7mx26-359fad2f-znbb   <none>           <none>

※ノードやリソースが不足している場合はAutopilotの自動スケールにより追加されます


GPUインスタンス

GPUインスタンスはGKEのStandardクラスタのように事前にプールしておく必要はなく、DAGのGKEStartPodOperatorのNodeSelectorかPodAffinityにてラベルを指定することで、Autopilotによりプロビジョニングされます。ただしAutopilotではGPUインスタンスでの制限事項がある為、ワークロードの実行要件を満たすか確認が必要です。

Autopilot に GPU ワークロードをデプロイする|Google Cloud


また、前述した通り、Composerクラスタ上でGPUインスタンスのプロビジョニングができない為、別クラスタで実行する必要がありますが、その際はGCS等のサービスへのアクセス権をPodに付与する必要がありますで注意してください。
※実行するクラスタが本検証のように「Autopilotモード」の場合「Workload Identity」が有効化されている為です
Workload Identity を使用する|Google Cloud

以下DAG設定にて、正常にGPUインスタンスがプロビジョニングされ、GPUを使用した処理が実行できることが確認できました。

DAG

・・・・
job02_1 = GKEStartPodOperator(
        ・・・・
        node_selector={
            'cloud.google.com/gke-accelerator': 'nvidia-tesla-t4'
        },
        ・・・・
    )
・・・・


以下の通り、GPUインスタンス上で実行されている。

>k get po -n composer-user-workloads -o wide -w
NAME                   READY   STATUS              RESTARTS   AGE   IP       NODE                                               NOMINATED NODE   READINESS GATES
pod-job02-1-gy6en4rg   0/1     ContainerCreating   0          22m   <none>   gk3-gke-sch-poc-online-a-nap-c2cz41xa-0d5157d7-mh9n   <none>           <none>
pod-job02-1-gy6en4rg   1/1     Running             0          27m   172.25.0.196   gk3-gke-sch-poc-online-a-nap-c2cz41xa-0d5157d7-mh9n   <none>           <none>
pod-job02-1-gy6en4rg   0/1     Completed           0          28m   172.25.0.196   gk3-gke-sch-poc-online-a-nap-c2cz41xa-0d5157d7-mh9n   <none>           <none>
pod-job02-1-gy6en4rg   0/1     Terminating         0          28m   172.25.0.196   gk3-gke-sch-poc-online-a-nap-c2cz41xa-0d5157d7-mh9n   <none>           <none>

>k  get node -l "cloud.google.com/gke-accelerator=nvidia-tesla-t4" -L=cloud.google.com/gke-accelerator
NAME                                                  STATUS   ROLES    AGE    VERSION             GKE-ACCELERATOR
gk3-gke-sch-poc-online-a-nap-c2cz41xa-0d5157d7-mh9n   Ready    <none>   46m    v1.24.10-gke.2300   nvidia-tesla-t4

以下の通り、CUDA利用してPyTorchが実行できている。

・・・
[2023-04-26, 14:15:32 JST] {pod_manager.py:235} INFO - torch.cuda.is_available(): OK
[2023-04-26, 14:15:32 JST] {pod_manager.py:235} INFO - torch.cuda.device_count(): 1
[2023-04-26, 14:15:32 JST] {pod_manager.py:235} INFO - tensor(0., device='cuda:0')
・・・

Pod設定

繰り返しになりますが、バッチジョブのワークロードはPodで実行されることになります。そのPodの各種設定 はDAGコードの「GKEStartPodOperator」タスク生成時に指定します。ここではリソース割り当て設定とセキュリティコンテキスト設定について参考に載せておきます。


リソース割り当て設定例

        ・・・・
job01 = GKEStartPodOperator(
   ・・・・
   container_resources=k8s_models.V1ResourceRequirements(
        requests={
            "cpu": "250m",
            "memory": "1G",
            "ephemeral-storage": "10G"
        },
        limits={
            "cpu": "250m",
            "memory": "1G",
            "ephemeral-storage": "10G"
        },
    )
    ・・・・
)
・・・・


なお、Autopilotではリソース設定の制約があり、CPUとメモリは1:4で、合わなければ小さい側の値を比率に合うように自動的に調整されます。

Warning: Autopilot set default resource requests for Deployment composer-user-workloads/job01-app, as resource requests were not specified. See http://g.co/gke/autopilot-defaults

Autopilot のリソース リクエスト| Google Kubernetes Engine(GKE)| Google Cloud

セキュリティコンテキスト(非ルート実行)設定例

・・・
job01 = GKEStartPodOperator(
   ・・・・
   security_context=k8s_models.V1PodSecurityContext(
        run_as_non_root=True
    )
    ・・・・
)
・・・・

2.4 サンプルバッチジョブワークフローの実行

実行に必要な確認が出来たので、サンプルジョブフローを全部つなげて実行してみたいと思います。

(再掲)


以下の通り、GPUワークロードも含め全サンプルバッチジョブワークフローが正常に実行できました。

実行したDAGイメージは以下です。

DAGコード(抜粋版)

from datetime import timedelta
import pendulum
from airflow import models
from airflow.models import Variable
from airflow.operators.dummy import DummyOperator
from airflow.providers.google.cloud.operators.kubernetes_engine import (
    GKEStartPodOperator,
)
from kubernetes.client import models as k8s_models

・・・

default_args = {
    ・・・
    "start_date": pendulum.datetime(2023, 4, 26, 13, 0, tz=LOCAL_TZ)
    ・・・
}

with models.DAG(
        dag_id="dag001"
        default_args=default_args,
        description="test",
        schedule_interval='0 14 * * *', 
        catchup=False,  
        is_paused_upon_creation=True, 
        tags=["test-batchflow"]
) as dag:

    start = DummyOperator(
        task_id='start'
    )

    # 通常(非GPU)のバッチジョブ
    job01 = GKEStartPodOperator(
        task_id="pod-job01",
        project_id=GCP_PROJECT_ID,
        location=GKE_LOCATION,
        cluster_name=GKE_CLUSTER_NAME,
        name="pod-job01",
        namespace="composer-user-workloads",
        service_account_name=job-workload-sa,
        image=IMAGE_NORMAL,
        cmds=["python3", "job01.py"],
        env_from=configmap('job01'),
        volumes=[share_volumes()],
        volume_mounts=[share_volume_mounts()],
        container_resources=resources(),
        security_context=security_context(),
        startup_timeout_seconds=300,
        is_delete_operator_pod=True
    )

    # GPU版のバッチジョブ
    job02_1 = GKEStartPodOperator(
        task_id="pod-job02-1",
        project_id=GCP_PROJECT_ID,
        location=GKE_LOCATION,
        cluster_name=GKE_CLUSTER_NAME,
        name="pod-job02-1",
        namespace="composer-user-workloads",
        service_account_name=job-workload-sa,
        image=IMAGE_GPU,
        node_selector={
            'cloud.google.com/gke-accelerator': 'nvidia-tesla-t4'
        },
        cmds=["python3", "job02.py"],
        env_from=configmap('job02-1'),
        container_resources=resources_gpu(),
        security_context=security_context(),
        startup_timeout_seconds=1800,
        is_delete_operator_pod=True,
    )

    job02_2 = ・・・
    job03_1 = ・・・
    job03_2 = ・・・
    job04 = ・・・

    end = DummyOperator(
        task_id='end'
    )


    # バッチジョブフローを定義
    start >> job01 >> [job02_1, job02_2, job03_1, job03_2]
    [job03_1, job03_2] >> job04 >> end
    [job02_1, job02_2] >> end

さて、ここまでで無事GPUワークロードも含めたバッチジョブのワークフローを実行することができました。【後編】では実運用で必要になりそうな運用事項 について検証を進めていきたいと思います。