バッチジョブワークフロー実行基盤としての「Google Cloud Composer」実用性検証【後編】
こんにちは、NRIデジタルの島です。
バッチジョブワークフロー実行基盤としての「Google Cloud Composer」実用性検証【前編】では、GPUワークロードも含めたバッチジョブのワークフローを実行することができました。
【後編】は実運用で必要になりそうな以下3つの運用事項
について検証を進めていきたいと思います。
3. 運用・監視
実行制御
運用上、ジョブを強制停止したり、エラー時にリトライしたり、多くの操作が必要となります。手動(Airflow管理コンソール)及び自動(DAG)にてどのような制御ができるのか、を確認します。
バッチ運用日時
バッチアプリケーションは、過去日時での実行など、論理的な運用日時として振る舞うことが必要になるケースがあります。この運用日時を各バッチジョブへどのように連携していくのか、を確認します。
監視
異常発生時の運用担当へのアラート通知等、能動的に検知する仕組みをどのように実現できるのか、を確認します。
3.1 実行制御
手動実行制御(Airflow管理コンソール)
まずはAirflow管理コンソールでの手動制御について確認します。これは運用担当によるオペレーションを想定しています。
強制停止
手動で強制停止操作を実施し、依存関係のある後続ジョブが実行されないことが確認できました。
「Graph」画面にて実行中の「pod-job03-1」ボックスを押下

「Actions」画面にて「Mark Failed」を押下

「pod-job03-1」が失敗し、後続の「Job04」が「upstream_failed」で未実行となる。

スキップ
手動でのスキップボタン等はなく、強制的に「Success」とすることでスキップされることが確認できました。
前項(強制停止)の状態(「pod-job03-1」が失敗している状態)でスキップ(「Mark Success」を押下)

「pod-job03-1」がスキップされ、「pod-job04」が即実行

休止
手動でその各DAGのジョブフロー全体を休止する場合は、以下のようにトグルボタンで休止にすることで実行されなくなります。

ただし、実行中のジョブを休止することはできず、休止対象は次のジョブからとなります。また、ジョブ単位で一時的に休止して、一定時間後再開のような制御をすることはできなそうです。
リトライ
失敗したタスクから以下の通りリトライができました。
「pod-job03-1」ジョブが失敗している状態

失敗した「pod-job03-1」からリトライ(「Clear」を押下)

正常に完了

自動実行制御(DAG)
次に運用担当による手動制御ではなく、DAGで自動的に制御可能な仕組みについて確認していきます。
自動リトライ
DAGにおける自動リトライ関連の設定箇所、パラメータは以下の通りです。よりスコープの狭い範囲の設定値が優先されます。なお、検証時点では、 リトライが効くのはあくまでバッチジョブ(タスク)単位のみ です。バッチジョブワークフロー(DAG)全体でリトライX回のようにはできませんので注意してください。
Airflow全体
Airflow設定ファイル(airflow.cfg)の以下デフォルト設定値は全DAGに影響します。
default_task_retries リトライ回数
default_task_retry_delay リトライ時の遅延時間
ワークフロー(DAG)全体
DAG内の以下設定値はDAG内の全バッチジョブ(タスク)に影響します。
retries リトライ回数
retry_delay リトライ時の遅延時間
retry_exponential_backoff 指数関数的後退アルゴリズムの適用有無
max_retry_delay リトライ時の最大遅延時間
on_retry_callback リトライ時のコールバック関数
バッチジョブ(タスク)単位
DAG内のバッチジョブ(タスク)レベルの設定値は、個々のバッチジョブ(タスク)にのみ影響します。
※設定値は「DAG全体」と同様
以下のジョブフローで検証。

① Airflow全体にのみ設定
以下のようにComposer管理コンソールから「30秒毎に3回リトライ」 で設定します。

「pod-job01」を失敗させ、設定値通り合計4回(初回 + リトライ3回)実行されることが確認できました。
>k get po -n composer-user-workloads -w NAME READY STATUS RESTARTS AGE pod-job01-ni5yv128 0/1 Pending 0 0s pod-job01-ni5yv128 0/1 Pending 0 0s pod-job01-ni5yv128 0/1 ContainerCreating 0 pod-job01-ni5yv128 1/1 Running 0 pod-job01-ni5yv128 0/1 Error 0 pod-job01-ni5yv128 0/1 Error 0 pod-job01-ni5yv128 0/1 Error 0 pod-job01-ni5yv128 0/1 Terminating 0 pod-job01-ni5yv128 0/1 Terminating 0 pod-job01-h9t8mlsf 0/1 Pending 0 pod-job01-h9t8mlsf 0/1 Pending 0 pod-job01-h9t8mlsf 0/1 ContainerCreating 0 pod-job01-h9t8mlsf 1/1 Running 0 ←リトライ1回目 pod-job01-h9t8mlsf 0/1 Error 0 pod-job01-h9t8mlsf 0/1 Error 0 pod-job01-h9t8mlsf 0/1 Error 0 pod-job01-h9t8mlsf 0/1 Terminating 0 pod-job01-h9t8mlsf 0/1 Terminating 0 pod-job01-ygamrsmh 0/1 Pending 0 pod-job01-ygamrsmh 0/1 Pending 0 pod-job01-ygamrsmh 0/1 ContainerCreating 0 pod-job01-ygamrsmh 1/1 Running 0 ←リトライ2回目 pod-job01-ygamrsmh 0/1 Error 0 pod-job01-ygamrsmh 0/1 Error 0 pod-job01-ygamrsmh 0/1 Error 0 pod-job01-ygamrsmh 0/1 Terminating 0 pod-job01-ygamrsmh 0/1 Terminating 0 pod-job01-m7hlm75i 0/1 Pending 0 pod-job01-m7hlm75i 0/1 Pending 0 pod-job01-m7hlm75i 0/1 ContainerCreating 0 pod-job01-m7hlm75i 1/1 Running 0 ←リトライ3回目 pod-job01-m7hlm75i 0/1 Completed 0 pod-job01-m7hlm75i 0/1 Completed 0 pod-job01-m7hlm75i 0/1 Terminating 0 pod-job01-m7hlm75i 0/1 Terminating 0
②DAG全体に設定
DAGのデフォルト値を5回に設定。
DAG
・・・・ default_args = { ・・・ "retries": 5, # 失敗時の再試行回数 "retry_delay": timedelta(seconds=30), # 再試行間隔 "retry_exponential_backoff ": False # Backoffの有無 } ・・・・
全体設定の3回を超え、5回リトライされていることが確認できました。
>k get po -n composer-user-workloads -w NAME READY STATUS RESTARTS AGE pod-job01-1ozw1eo1 0/1 Pending 0 0s pod-job01-1ozw1eo1 0/1 Pending 0 0s pod-job01-1ozw1eo1 0/1 ContainerCreating 0 0s pod-job01-1ozw1eo1 1/1 Running 0 8s pod-job01-1ozw1eo1 0/1 Error 0 9s pod-job01-1ozw1eo1 0/1 Error 0 11s pod-job01-1ozw1eo1 0/1 Error 0 11s pod-job01-1ozw1eo1 0/1 Terminating 0 11s pod-job01-1ozw1eo1 0/1 Terminating 0 11s pod-job01-2ah12guy 0/1 Pending 0 1s pod-job01-2ah12guy 0/1 Pending 0 1s pod-job01-2ah12guy 0/1 ContainerCreating 0 1s pod-job01-2ah12guy 1/1 Running 0 8s ←リトライ1回目 pod-job01-2ah12guy 0/1 Error 0 9s pod-job01-2ah12guy 0/1 Error 0 11s pod-job01-2ah12guy 0/1 Error 0 11s pod-job01-2ah12guy 0/1 Terminating 0 11s pod-job01-2ah12guy 0/1 Terminating 0 11s pod-job01-10p8p5ft 0/1 Pending 0 0s pod-job01-10p8p5ft 0/1 Pending 0 0s pod-job01-10p8p5ft 0/1 ContainerCreating 0 1s pod-job01-10p8p5ft 1/1 Running 0 8s ←リトライ2回目 pod-job01-10p8p5ft 0/1 Error 0 9s pod-job01-10p8p5ft 0/1 Error 0 11s pod-job01-10p8p5ft 0/1 Error 0 12s pod-job01-10p8p5ft 0/1 Terminating 0 12s pod-job01-10p8p5ft 0/1 Terminating 0 12s pod-job01-b4luxjj9 0/1 Pending 0 0s pod-job01-b4luxjj9 0/1 Pending 0 0s pod-job01-b4luxjj9 0/1 ContainerCreating 0 0s pod-job01-b4luxjj9 1/1 Running 0 7s ←リトライ3回目 pod-job01-b4luxjj9 0/1 Error 0 8s pod-job01-b4luxjj9 0/1 Error 0 10s pod-job01-b4luxjj9 0/1 Error 0 11s pod-job01-b4luxjj9 0/1 Terminating 0 11s pod-job01-b4luxjj9 0/1 Terminating 0 11s pod-job01-nbgkh4cj 0/1 Pending 0 0s pod-job01-nbgkh4cj 0/1 Pending 0 0s pod-job01-nbgkh4cj 0/1 ContainerCreating 0 0s pod-job01-nbgkh4cj 1/1 Running 0 6s ←リトライ4回目 pod-job01-nbgkh4cj 0/1 Error 0 8s pod-job01-nbgkh4cj 0/1 Error 0 11s pod-job01-nbgkh4cj 0/1 Error 0 11s pod-job01-nbgkh4cj 0/1 Terminating 0 11s pod-job01-nbgkh4cj 0/1 Terminating 0 11s pod-job01-36mxtb8e 0/1 Pending 0 0s pod-job01-36mxtb8e 0/1 Pending 0 0s pod-job01-36mxtb8e 0/1 ContainerCreating 0 0s pod-job01-36mxtb8e 1/1 Running 0 8s ←リトライ5回目 pod-job01-36mxtb8e 0/1 Completed 0 10s pod-job01-36mxtb8e 0/1 Completed 0 12s pod-job01-36mxtb8e 0/1 Terminating 0 13s pod-job01-36mxtb8e 0/1 Terminating 0 13s
③バッチジョブ(タスク)個別に設定
「pod-job03-1」のみ「2回」で設定。
DAG
・・・・ job03_1 = GKEStartPodOperator( task_id="pod-job03-1", ・・・ retries=2, retry_delay=timedelta(seconds=30), ・・・
「pod-job01」が「5回」(リトライ5回目で成功)、「pod-job03-1」が「2回」のリトライ失敗でジョブ全体が「Failed」になることが確認できました。
>k get po -n composer-user-workloads -w NAME READY STATUS RESTARTS AGE pod-job01-if7zoh7p 0/1 Pending 0 0s pod-job01-if7zoh7p 0/1 Pending 0 0s pod-job01-if7zoh7p 0/1 ContainerCreating 0 0s pod-job01-if7zoh7p 1/1 Running 0 8s pod-job01-if7zoh7p 0/1 Error 0 9s pod-job01-if7zoh7p 0/1 Error 0 11s pod-job01-if7zoh7p 0/1 Error 0 11s pod-job01-if7zoh7p 0/1 Terminating 0 11s pod-job01-if7zoh7p 0/1 Terminating 0 11s pod-job01-9ualrb1g 0/1 Pending 0 0s pod-job01-9ualrb1g 0/1 Pending 0 0s pod-job01-9ualrb1g 0/1 ContainerCreating 0 0s pod-job01-9ualrb1g 1/1 Running 0 7s ←リトライ1回目 pod-job01-9ualrb1g 0/1 Error 0 8s pod-job01-9ualrb1g 0/1 Error 0 10s pod-job01-9ualrb1g 0/1 Error 0 11s pod-job01-9ualrb1g 0/1 Terminating 0 11s pod-job01-9ualrb1g 0/1 Terminating 0 11s pod-job01-z1iygw78 0/1 Pending 0 0s pod-job01-z1iygw78 0/1 Pending 0 0s pod-job01-z1iygw78 0/1 ContainerCreating 0 0s pod-job01-z1iygw78 1/1 Running 0 7s ←リトライ2回目 pod-job01-z1iygw78 0/1 Error 0 8s pod-job01-z1iygw78 0/1 Error 0 10s pod-job01-z1iygw78 0/1 Error 0 11s pod-job01-z1iygw78 0/1 Terminating 0 11s pod-job01-z1iygw78 0/1 Terminating 0 11s pod-job01-88d4l14g 0/1 Pending 0 0s pod-job01-88d4l14g 0/1 Pending 0 0s pod-job01-88d4l14g 0/1 ContainerCreating 0 0s pod-job01-88d4l14g 1/1 Running 0 8s ←リトライ3回目 pod-job01-88d4l14g 0/1 Error 0 9s pod-job01-88d4l14g 0/1 Error 0 11s pod-job01-88d4l14g 0/1 Error 0 11s pod-job01-88d4l14g 0/1 Terminating 0 11s pod-job01-88d4l14g 0/1 Terminating 0 11s pod-job01-2epa2bet 0/1 Pending 0 0s pod-job01-2epa2bet 0/1 Pending 0 0s pod-job01-2epa2bet 0/1 ContainerCreating 0 0s pod-job01-2epa2bet 1/1 Running 0 7s ←リトライ4回目 pod-job01-2epa2bet 0/1 Error 0 8s pod-job01-2epa2bet 0/1 Error 0 10s pod-job01-2epa2bet 0/1 Error 0 10s pod-job01-2epa2bet 0/1 Terminating 0 10s pod-job01-2epa2bet 0/1 Terminating 0 10s pod-job01-ui84ztlb 0/1 Pending 0 0s pod-job01-ui84ztlb 0/1 Pending 0 0s pod-job01-ui84ztlb 0/1 ContainerCreating 0 1s pod-job01-ui84ztlb 1/1 Running 0 8s ←リトライ5回目 pod-job01-ui84ztlb 0/1 Completed 0 9s pod-job01-ui84ztlb 0/1 Completed 0 11s pod-job01-ui84ztlb 0/1 Terminating 0 12s pod-job01-ui84ztlb 0/1 Terminating 0 12s pod-job03-2-v7rn2n4v 0/1 Pending 0 0s pod-job03-2-v7rn2n4v 0/1 Pending 0 0s pod-job03-2-v7rn2n4v 0/1 ContainerCreating 0 0s pod-job03-1-q78qz460 0/1 Pending 0 0s pod-job03-1-q78qz460 0/1 Pending 0 0s pod-job03-1-q78qz460 0/1 ContainerCreating 0 0s pod-job03-2-v7rn2n4v 1/1 Running 0 8s pod-job03-1-q78qz460 1/1 Running 0 8s pod-job03-1-q78qz460 0/1 Error 0 9s pod-job03-2-v7rn2n4v 0/1 Completed 0 10s pod-job03-2-v7rn2n4v 0/1 Completed 0 12s pod-job03-1-q78qz460 0/1 Error 0 11s pod-job03-2-v7rn2n4v 0/1 Terminating 0 12s pod-job03-2-v7rn2n4v 0/1 Terminating 0 12s pod-job03-1-q78qz460 0/1 Error 0 12s pod-job03-1-q78qz460 0/1 Terminating 0 12s pod-job03-1-q78qz460 0/1 Terminating 0 12s pod-job03-1-dhyv0fam 0/1 Pending 0 0s pod-job03-1-dhyv0fam 0/1 Pending 0 1s pod-job03-1-dhyv0fam 0/1 ContainerCreating 0 1s pod-job03-1-dhyv0fam 1/1 Running 0 8s ←リトライ1回目 pod-job03-1-dhyv0fam 0/1 Error 0 9s pod-job03-1-dhyv0fam 0/1 Error 0 11s pod-job03-1-dhyv0fam 0/1 Error 0 12s pod-job03-1-dhyv0fam 0/1 Terminating 0 12s pod-job03-1-dhyv0fam 0/1 Terminating 0 12s pod-job03-1-mgfs8wqy 0/1 Pending 0 0s pod-job03-1-mgfs8wqy 0/1 Pending 0 0s pod-job03-1-mgfs8wqy 0/1 ContainerCreating 0 0s pod-job03-1-mgfs8wqy 1/1 Running 0 7s ←リトライ2回目 pod-job03-1-mgfs8wqy 0/1 Error 0 8s pod-job03-1-mgfs8wqy 0/1 Error 0 10s pod-job03-1-mgfs8wqy 0/1 Error 0 11s pod-job03-1-mgfs8wqy 0/1 Terminating 0 11s pod-job03-1-mgfs8wqy 0/1 Terminating 0 11s

同時実行制御
バッチジョブワークフローを運用する上で、前回バッチジョブワークフローがまだ実行中の場合に、多重実行されないように実行の排他制御をしたい場合があります。Airflowではそのような制御が可能かを確認します。
以下の単純なバッチジョブワークフローを5分間隔複数回スケジューリングし、「pod-job02」の初回実行分のみ10分間スリープ
させてみます。

結果、デフォルトでは以下の通り、初回のバッチジョブワークフローが実行中の状態でも待つことなく、2回目以降が実行されてしまうようです。(追い越しが発生してしまう)
>k get po -n composer-user-workloads -w NAME READY STATUS RESTARTS AGE pod-job01-r5v5iegn 0/1 Pending 0 0s pod-job01-r5v5iegn 0/1 Pending 0 0s pod-job01-r5v5iegn 0/1 ContainerCreating 0 0s pod-job01-r5v5iegn 1/1 Running 0 3s pod-job01-r5v5iegn 0/1 Completed 0 5s pod-job01-r5v5iegn 0/1 Completed 0 7s pod-job01-r5v5iegn 0/1 Terminating 0 7s pod-job01-r5v5iegn 0/1 Terminating 0 7s pod-job02-a54509pa 0/1 Pending 0 0s pod-job02-a54509pa 0/1 Pending 0 0s pod-job02-a54509pa 0/1 ContainerCreating 0 0s pod-job02-a54509pa 1/1 Running 0 3s ←このバッチジョブを10分間スリープ pod-job01-6fam9jvz 0/1 Pending 0 0s ←まだ先行が実行中(スリープ中)にもかかわらず、後発のバッチジョブの実行が開始される pod-job01-6fam9jvz 0/1 Pending 0 0s pod-job01-6fam9jvz 0/1 ContainerCreating 0 0s pod-job01-6fam9jvz 1/1 Running 0 3s pod-job01-6fam9jvz 0/1 Completed 0 5s pod-job01-6fam9jvz 0/1 Completed 0 6s pod-job01-6fam9jvz 0/1 Terminating 0 7s pod-job01-6fam9jvz 0/1 Terminating 0 7s pod-job02-zzrullkf 0/1 Pending 0 0s pod-job02-zzrullkf 0/1 Pending 0 0s pod-job02-zzrullkf 0/1 ContainerCreating 0 0s pod-job02-zzrullkf 1/1 Running 0 3s pod-job02-zzrullkf 0/1 Completed 0 5s pod-job02-zzrullkf 0/1 Completed 0 6s pod-job02-zzrullkf 0/1 Terminating 0 7s pod-job02-zzrullkf 0/1 Terminating 0 7s ・・・・ pod-job02-a54509pa 0/1 Completed 0 10m ←初回分がようやく終了 pod-job02-a54509pa 0/1 Completed 0 10m pod-job02-a54509pa 0/1 Terminating 0 10m pod-job02-a54509pa 0/1 Terminating 0 10m ・・・・

Airflowには処理実行の並列度を制御するパラメータも多く存在し、以下のパラメータにより同一バッチジョブワークフローの同時実行制御が実現できます。
max_active_runs_per_dag
DAG単位での同時実行可能最大数。
Configuration Reference — Airflow Documentation
ただ、上記パラメータはAirflow全体に影響するパラメータの為、各DAG単位で設定可能なようにしたい場合は、DAGレベルで「max_active_runs
」パラメータを設定することにより上書き可能です。
では、「max_active_runs 」パラメータを設定して再度実行してみます。
DAG
・・・ with models.DAG( dag_id="sch-job-flows48", schedule_interval=timedelta(minutes=5), ・・・ max_active_runs=1 ← ) as dag: ・・・
今度は以下のように、実行中のバッチジョブワークフローを追い越すことなく、順次実行されることが確認できました。
k get po -n composer-user-workloads -w NAME READY STATUS RESTARTS AGE pod-job01-07uqwxdk 0/1 Pending 0 0s pod-job01-07uqwxdk 0/1 Pending 0 0s pod-job01-07uqwxdk 0/1 ContainerCreating 0 0s pod-job01-07uqwxdk 1/1 Running 0 3s pod-job01-07uqwxdk 0/1 Completed 0 5s pod-job01-07uqwxdk 0/1 Completed 0 7s pod-job01-07uqwxdk 0/1 Terminating 0 7s pod-job01-07uqwxdk 0/1 Terminating 0 7s pod-job02-wbspkjmv 0/1 Pending 0 0s pod-job02-wbspkjmv 0/1 Pending 0 0s pod-job02-wbspkjmv 0/1 ContainerCreating 0 0s pod-job02-wbspkjmv 1/1 Running 0 4s pod-job02-wbspkjmv 0/1 Completed 0 10m pod-job02-wbspkjmv 0/1 Completed 0 10m pod-job02-wbspkjmv 0/1 Terminating 0 10m pod-job02-wbspkjmv 0/1 Terminating 0 10m pod-job01-fvwbijp5 0/1 Pending 0 0s pod-job01-fvwbijp5 0/1 Pending 0 0s pod-job01-fvwbijp5 0/1 ContainerCreating 0 0s pod-job01-fvwbijp5 1/1 Running 0 3s pod-job01-fvwbijp5 0/1 Completed 0 5s pod-job01-fvwbijp5 0/1 Completed 0 7s pod-job01-fvwbijp5 0/1 Terminating 0 7s pod-job01-fvwbijp5 0/1 Terminating 0 7s pod-job02-9wh4hte4 0/1 Pending 0 0s pod-job02-9wh4hte4 0/1 Pending 0 0s pod-job02-9wh4hte4 0/1 ContainerCreating 0 0s pod-job02-9wh4hte4 1/1 Running 0 4s pod-job02-9wh4hte4 0/1 Completed 0 5s pod-job02-9wh4hte4 0/1 Completed 0 7s pod-job02-9wh4hte4 0/1 Terminating 0 8s pod-job02-9wh4hte4 0/1 Terminating 0 8s pod-job01-y21meafr 0/1 Pending 0 0s pod-job01-y21meafr 0/1 Pending 0 0s pod-job01-y21meafr 0/1 ContainerCreating 0 0s pod-job01-y21meafr 1/1 Running 0 3s pod-job01-y21meafr 0/1 Completed 0 5s pod-job01-y21meafr 0/1 Completed 0 7s pod-job01-y21meafr 0/1 Terminating 0 7s pod-job01-y21meafr 0/1 Terminating 0 7s pod-job02-bc0oajpw 0/1 Pending 0 0s pod-job02-bc0oajpw 0/1 Pending 0 0s pod-job02-bc0oajpw 0/1 ContainerCreating 0 0s pod-job02-bc0oajpw 1/1 Running 0 3s pod-job02-bc0oajpw 0/1 Completed 0 5s pod-job02-bc0oajpw 0/1 Completed 0 7s pod-job02-bc0oajpw 0/1 Terminating 0 7s pod-job02-bc0oajpw 0/1 Terminating 0 7s
なお、この場合は遅延したバッチジョブワークフロー終了日時を起点に再度スケジューリングされるようです。

また、複数バッチジョブワークフロー(DAG)間の実行を制御(DAG-Aが完了後にDAG-Bを実行のような制御)したい場合は、以下ページの「ExternalTaskSensor
」等を使用することで実現できますのでご確認ください。
airflow.sensors.external_task — Airflow Documentation
3.2 バッチ運用日時
前編の「2.1 バッチジョブワークフローのトリガー」にて記載の通り、キャッチアップ機能によりAirflowは実行期間内に未実行のフローを実行します。過去日時時点でのバッチジョブワークフローとして実行することとなるので、バッチジョブワークフローは実際の実行日時として処理するのではなく、論理的なバッチ運用日時(過去日時)として処理する必要があります。
論理的なバッチ実行日時は「logical_date(論理日時)」
変数(JINJAテンプレート
)で取得可能で、以下のような記述でバッチジョブへ連携できます。
※JSTで取得する場合はタイムゾーンを指定してください
# DAG ※JSTで取得 ・・・ job01 = GKEStartPodOperator( ・・・・ cmds=["python3", "job01.py", "{{ logical_date.in_tz('Asia/Tokyo') }}"], ←タイムゾーン指定で連携 ・・・ # Pythonコード ・・・・ if __name__ == '__main__': print("Operation date {}...".format(sys.argv[1])) ・・・ # ログ ・・・・ [2023-05-01, 16:42:49 JST] {pod_manager.py:235} INFO - Operation date 2023-05-01T16:42:25.093874+09:00... [2023-05-01, 16:42:49 JST] {pod_manager.py:235} INFO - Downloaded storage object input/query.csv from bucket searchip-poc-batch-bucket to local file /app/share/query.csv. [2023-05-01, 16:42:49 JST] {pod_manager.py:235} INFO - Uploaded storage object /app/share/query.csv from local file searchip-poc-batch-bucket to bucket output/query.csv. [2023-05-01, 16:42:50 JST] {pod_manager.py:288} INFO - Pod pod-job01-dnmwfv1b has phase Running ・・・・・
また、キャッチアップ機能による過去実行ではなく、明示的に過去日時を指定して実施したい場合には、管理コンソールから日付を指定して実行します。
以下の通り過去日時を指定し、バッチ運用日時として過去日時を取得することができました。


# ログ ・・・ [2023-05-01, 17:34:27 JST] {pod_manager.py:235} INFO - Operation date 2023-04-05T11:00:00+09:00... [2023-05-01, 17:34:27 JST] {pod_manager.py:235} INFO - Downloaded storage object input/query.csv from bucket searchip-poc-batch-bucket to local file /app/share/query.csv. [2023-05-01, 17:34:27 JST] {pod_manager.py:235} INFO - Uploaded storage object /app/share/query.csv from local file searchip-poc-batch-bucket to bucket output/query.csv. [2023-05-01, 17:34:28 JST] {pod_manager.py:288} INFO - Pod pod-job01-ewx9nofc has phase Running ・・・
3.3 監視
最後に運用上必須となる監視について確認します。
バッチジョブの実行状況等はAirflow管理コンソールから確認可能で、各ステータス画面やログ等の状況に応じて前項でご紹介したようなリトライや強制停止などの実行制御操作が可能ですが、運用担当が常に管理画面を参照しているとは限りません。エラー時や処理遅延時に運用担当へアラート通知するなどして能動的に検知する仕組みが必要になってきます。特に処理遅延は気付きにくいポイントでもあるので、ここでは「遅延監視」
にフォーカスして確認していきたいと思います。
まず、タイムアウトに関する設定には以下のようなものがあります。
dagrun_timeout
DAG(バッチジョブワークフロー)全体のタイムアウト値
※DAGレベルで指定
execution_timeout
バッチジョブ単位のタイムアウト値
※バッチジョブ(タスク)レベルで指定
startup_timeout_seconds
Podがアサインされて実行されるまでのタイムアウト値(秒)
※バッチジョブ(タスク)レベルで指定
タイムアウトがしっかりと機能するか確認してみます。
DAGに「dagrun_timeout」を「1分」で設定します。
DAG
・・・ dagrun_timeout=timedelta(minutes=1), # タイムアウト時間 ・・・
「pod-job01→pod-job02」のバッチジョブワークフローを作成して、「pod-job02」のみ数十秒スリープさせて、ジョブフローがトータル1分以上になるようにして実行します。
以下の通り、タイムアウトにて中断されることが確認できました。
# 実行ログ ・・・ [2023-04-27, 16:23:12 JST] {local_task_job.py:217} WARNING - DagRun timed out after 0:01:04.655821. [2023-04-27, 16:23:17 JST] {local_task_job.py:217} WARNING - DagRun timed out after 0:01:09.797571. [2023-04-27, 16:23:17 JST] {local_task_job.py:223} WARNING - State of this instance has been externally set to skipped. Terminating instance. [2023-04-27, 16:23:17 JST] {process_utils.py:129} INFO - Sending Signals.SIGTERM to group 8699. PIDs of all processes in the group: [8699] [2023-04-27, 16:23:17 JST] {process_utils.py:84} INFO - Sending the signal Signals.SIGTERM to group 8699 [2023-04-27, 16:23:17 JST] {taskinstance.py:1564} ERROR - Received SIGTERM. Terminating subprocesses. [2023-04-27, 16:23:17 JST] {kubernetes_pod.py:690} INFO - Deleting pod: pod-job02-n7cfv5rs [2023-04-27, 16:23:18 JST] {process_utils.py:79} INFO - Process psutil.Process(pid=8699, status='terminated', exitcode=0, started='07:22:45') (8699) terminated with exit code 0 # Podステータス遷移 >k get po -n composer-user-workloads -w NAME READY STATUS RESTARTS AGE pod-job01-1h3m9mp3 0/1 Pending 0 0s pod-job01-1h3m9mp3 0/1 Pending 0 0s pod-job01-1h3m9mp3 0/1 ContainerCreating 0 0s pod-job01-1h3m9mp3 1/1 Running 0 7s pod-job01-1h3m9mp3 0/1 Completed 0 10s pod-job01-1h3m9mp3 0/1 Completed 0 12s pod-job01-1h3m9mp3 0/1 Terminating 0 12s pod-job01-1h3m9mp3 0/1 Terminating 0 12s pod-job02-n7cfv5rs 0/1 Pending 0 0s pod-job02-n7cfv5rs 0/1 Pending 0 0s pod-job02-n7cfv5rs 0/1 ContainerCreating 0 0s pod-job02-n7cfv5rs 1/1 Running 0 7s pod-job02-n7cfv5rs 1/1 Terminating 0 18s ←中断 pod-job02-n7cfv5rs 1/1 Terminating 0 18s pod-job02-n7cfv5rs 0/1 Terminating 0 49s pod-job02-n7cfv5rs 0/1 Terminating 0 49s pod-job02-n7cfv5rs 0/1 Terminating 0 49s

※この場合はスキップ扱い(ピンク色枠)となるようです
次にバッチジョブ(タスク)単位のタイムアウト時間の検証。
DAGで「pod-job01」の「execution_timeout」を「10秒」で設定してみます。
DAG
・・・ execution_timeout=timedelta(seconds=10), ・・・
「pod-job01」→「pod-job02」のバッチジョブワークフローにて、「pod-job01」のみ数十秒スリープさせて、バッチジョブ単位のタイムアウト時間10秒を超過するようにして実行。
以下の通り、タイムアウトにて中断されることが確認できました。
# 実行ログ ・・・ airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 9241 [2023-04-27, 16:41:24 JST] {taskinstance.py:1402} INFO - Marking task as FAILED. dag_id=dag-sch-batch-flows52, task_id=pod-job01, execution_date=20230427T074010, start_date=20230427T074110, end_date=20230427T074124 [2023-04-27, 16:41:24 JST] {logging_mixin.py:137} INFO - Task dag-sch-batch-flows52__pod-job01__20230427 failed! ・・・ # Podステータス遷移 >k get po -n composer-user-workloads -w NAME READY STATUS RESTARTS AGE pod-job01-h05ghqgt 0/1 Pending 0 0s pod-job01-h05ghqgt 0/1 Pending 0 0s pod-job01-h05ghqgt 0/1 ContainerCreating 0 0s pod-job01-h05ghqgt 1/1 Running 0 7s ←開始 pod-job01-h05ghqgt 1/1 Running 0 17s pod-job01-h05ghqgt 1/1 Terminating 0 17s ←中断 pod-job01-h05ghqgt 0/1 Terminating 0 41s pod-job01-h05ghqgt 0/1 Terminating 0 41s pod-job01-h05ghqgt 0/1 Terminating 0 41s

SLA
上記タイムアウトはあくまで処理実行時間の制限値であり、それを超えた場合にバッチジョブは異常終了扱いとなります。しかしながら、処理は継続させたまま、遅延アラートを発報し、何らかのアクションを講じたいシーンも多々存在すると思います。Airflowには「SLA」という機能があり、処理は止めないが「SLA違反」として遅延を通知する仕組みになります。
SLA
Tasks — Airflow Documentation
SLAに関する設定には以下のようなものがあります。
sla
バッチジョブ単位のSLA違反までのタイムアウト値
※バッチジョブ(タスク)レベルで指定
sla_miss_callback
SLA違反になった場合にトリガーされるコールバックメソッド
SLAを30秒に設定し、SLA違反時に呼び出されるコールバックメソッドにより出力される文言をCloud Loggingアラートにより発報できるかを確認します。実行されるジョブはSLA違反になるように60秒間スリープするように設定します。
※この文言にてアラート通知をするようにCloud Loggingアラートには設定済み
DAG
・・・ def sla_callback(dag, task_list, blocking_task_list, slas, blocking_tis): print("***SLA Alert!!***") ・・・ with models.DAG( ・・・・ dagrun_timeout=timedelta(minutes=10), sla_miss_callback=sla_callback ) as dag: job01 = GKEStartPodOperator( ・・・ cmds=["sh", "-c", "sleep 60s"], sla=timedelta(seconds=30) ) ・・・・ job01 >> job02
しかしながら、Airflow管理コンソール上ではSLA違反として記録されるようですが、一向にアラートが通知されません。Cloud Loggingコンソール上でもコールバックメソッドが出力するメッセージが見つかりませんでした。
その後調査を続けたところ、そもそもの仕様が望んでいたものと異なることがわかりました。望んでいる仕様は「SLA違反のタイミングでアラート通知される」
でしたが、実際の仕様は「タスクが終了したタイミングで事後判定される」
ようです。これは以下のGitHub Issueでも言及されており、「一般的に望んでいる仕様ではない為、新しい仕様で作り替えるべき」のようなコメントもされています。
Alert if a task misses deadline · Issue #18031 · apache/airflow
なお、このような現状から以下のようにSLAを外部で監視する実装を推奨しているようです。
The SLAyer your Data Pipeline Needs
残念ながら、検証で使用したAirflowバージョンでのSLA機能では要件は満たせない為、機能のアップデートを期待しながら別の方法で遅延監視実装するしかなさそうです。
4. 実機検証を通して
Composer(Airflow)は確認観点が多く、全ては検証しきれませんでしたが、最低限確認すべき項目は試せたのではないかと思います。結果、できること、できないことありましたが、バッチジョブワークフローの実行基盤として必須な機能自体はほぼ揃っているように感じました。ただ、色々と考慮しなければいけない事項もあり、筆者が検証を通して思った懸念事項や使用するにあたっての考慮事項を最後にまとめておきたいと思います。
①実行タイミング
前編の「2.1 バッチジョブワークフローのトリガー」にて記載した通り、実行タイミングのクセがかなり強く、しっかりと仕様を理解した上で使用しないと、想定外の結果を招く可能性があります。直感的な実行タイミングになるよう本家に改善を要望したいですが、少なくとも執筆時点では仕様の理解と訓練を繰り返すしかなさそうです。開発・運用体制次第では、担当者には直感的に記述してもらい、デプロイ時に裏で変換するような共通機能の検討も必要になってくるかもしれません。
②DAGの更新
検証内容としては掲載しませんでしたが、バッチジョブワークフローの更新、つまりDAGの更新についても考慮が必要です。執筆時点では、Airflowにはバージョン管理という考え方がなく 、DAGに対する「一意性」はあくまで「ファイル単位」 の為、DagIDが異なっていてもファイル名が同一の場合は同一視されます。古いDagIDのDAGは削除されることになってしまう 為、運用には注意が必要です。何も考えずに同一ファイル名で更新してしまうと、仮に途中経過(実行中や停止中、リトライ待ち等)のバッチジョブワークフローがあっても強制的に洗い替えられてしまいます。このような事象が発生するのを避ける為、筆者としては、以下のようなルール付けをするのが好ましいのではないかと思います。
・DagIDをバージョンとすること
・DAG内のコードを変更するたびにDagID(=バージョン)を更新すること
・基本的にファイル名とDagIDは一致させること(こうすることで一意性問題の発生を防げる)
※ただし、上記は1DAG=1バッチジョブワークフローのようなシンプルな構成の場合で、DAG同士を連携する(DAGからDAGを呼ぶ)ような構成の場合はこの限りではありません
このバージョン管理問題は、運用上の課題になってくると思いますので、本家のアップデートに期待したいところです。
また、速度的なところで言うと、DAG更新(GCSへアップロード)してから反映されるまでが非常に遅いです。Airflowへの反映のタイミングはairflow.cfgの「schedule → dag_dir_list_interval」
」パラメータに依存します。変更したい場合は以下のようにComposerの管理コンソールからオーバライドしていただくのが良いかと思います。
「30秒」への変更例

③実行クラスタ
本検証での構成においては、ワークロードは別のGKEクラスタで実行しました。この場合、Composer用のクラスタに加え、ワークロード実行用のクラスタも常時起動しておかなければならない為、維持コストがかかります。バッチジョブワークフロー自体は常時動くものではない為、「GKECreateClusterOperator」、「GKEDeleteClusterOperator」 を使用して、実行時にワークロード実行用のクラスタを一時的に作成し、終わったら削除するようなバッチジョブワークフローをご検討いただくのが良いかもしれません。
(再掲)Google Kubernetes Engine Operator を使用する | Cloud Composer | Google Cloud
(再掲)Google Kubernetes Engine Operators — apache-airflow-providers-google Documentation
④同時実行制御
「3.1 実行制御 → 同時実行制御」の項にて記述した通り、AirflowにはDAG単位での同時実行数制御の他にも並列度を制御できるパラメータが多くあります。
※以下に一部のみ掲載します
parallelism
Airflow全体での同時実行最大数。
task_concurrency
タスク単位での同時実行最大数。
pool / pool_slots
タスク単位のプールと消費スロット数。割り当てられたプールでタスクを実行することで同時実行制御が可能。
※未指定の場合は、既存のデフォルトプールに割り当てられる
これらのパラメータをチューニングすることによる効果には、全体の処理性能向上もありますが、多重実行されてしまった場合などに外部サービスや連携先システムへの負荷に対する流量制御としての効果もあります。また、「①実行タイミング」にて言及した通り、Airflowのスケジューリングは難易度が高く、意図せず実行されてしまうリスクも少なからずあります。よって、実運用していく際にはこの辺のパラメータも忘れずに設計していくことをお勧めします。
また、同時実行の並列度ではなく、排他制御という観点ですと、以下のように先行バッチジョブ(タスク)の結果によって実行有無を制御するパラメータも存在します。制御は各バッチジョブ(タスク)単位となるようですが、必要に応じて適用をご検討いただければと思います。
depends_on_past
前回のバッチジョブ(タスク)実行結果がエラーの場合、該当のバッチジョブ(タスク)は実行しない。
wait_for_downstream
前回のバッチジョブ(タスク)直下のバッチジョブ(タスク)の実行結果がエラーの場合、該当のバッチジョブ(タスク)は実行しない。
なお、実行制御関連では上記のように比較的柔軟な設定が可能ですが、不測の事態に備える意味でも、バッチジョブアプリケーション自体は、並列に何度実行されても同じ結果となるよう冪等に実装することを意識していただいたほうがよいと思います。
⑤遅延監視
「3.3 監視 → SLA」の検証結果の通り、SLA機能の仕様がイマイチで遅延監視としては使用できなそうです。本家のアップデートに期待したいですが、それまでは別の方法を考える必要があります。Cloud Monitoringのメトリクスでバッチジョブワークフロー単位(DAG全体)、バッチジョブ(タスク)単位の実行時間が取得できるので、そこにアラート設定する方法があります。以下の通り、想定通りアラート通知が機能することを確認しています。遅延監視実装時の参考にしていただければと思います。
Cloud Monitoringでアラート設定

バッチジョブワークフロー(DAG全体)遅延アラートメール

バッチジョブ(タスク)レベル遅延アラートメール

さいごに
Comoserの検証を進めてきましたが、多機能でかつAirflowの仕様のクセが強い為、学習コストは比較的高くなりそうな印象を受けました。ただ、Airflowは他クラウドでも採用されており(e.g. AWS Amazon Managed Workflows for Apache Airflow (MWAA))、今後Cloud Native時代のワークフロー実行基盤のデファクトになる可能性も高く、一度覚えて使いこなせるようになればエンジニアとしての強みも増すのではないでしょうか。
今回は機能メインで検証してきましたが、引き続き非機能(可用性や性能面等)も含めた検証を進めていきたいと考えております。
以上