SPECIALIST

多様な専門性を持つNRIデジタル社員のコラム、インタビューやインサイトをご紹介します。

BACK

AWS Glue for Spark ジョブ開発の手引き(前編)

はじめに

こんにちは、NRIデジタルの松村です。
最近、サーバレスでジョブ環境を構築するプロダクトである、Job Atelierを公開しました。Job Atelierでは、ジョブをAWS GlueやAWS Lambda、ワークフローをAWS Step Functionsを使うことで、サーバレスでのジョブ環境を実現しています。筆者も2023年には複数のプロジェクトでGlueを使ったジョブを開発しました。
AWS Glueは様々な機能が盛り込まれてきていますが、その中心となっているのは、最も古くに開発された機能の一つである、ジョブ機能、特に Apache Spark を利用できる AWS Glue for Spark ジョブ(以下、Sparkジョブ)ではないかと考えています。Sparkジョブは、PySpark を通じてApache Sparkの機能を最大限に活用できるコンピューティングプラットフォームであり、AWSがGlue拡張機能を開発していることにより各種AWSサービスともシームレスに結合することができる、パワフルなツールです(SparkやPySparkの説明は後述します)。
初期リリース時点の2017年から、ある程度の処理プロセスについては自動的にコードを生成する機能が付属しており、2020年に発表されたGlue Studioを用いるとGUIでの処理開発もかなりの範囲で実現できるようになってきました。2022年にはGlue Studioにカスタムビジュアル変換が追加され、ユーザ作成機能もGUI上で追加可能となりました。

AWS Glue のドキュメント履歴 – AWS Glueより筆者抜粋

一方で、Sparkの様々な機能を利用したり、少し変わった処理を開発しようと思うとコードで実装することが必要になります。例えば筆者も、DynamoDB内へ構造化したデータをロードするために、Glue Studio では完結することができず、Glue上での開発を実施していました。この開発、公式・非公式含めて様々なドキュメントが公開されているのですが、様々な技術が組み合わされていること、そのバックボーンで必要となる知識も多いことから、中々難易度が高く、よく困ります。そのため、Sparkジョブ開発をスムーズに進めていくことができるよう、本稿を用意しました。

本稿では、Sparkジョブ開発に関連する各ライブラリとその関係性をはじめとして、利用方法、処理パターン、テスト方式、エラー調査方法、などを、筆者の経験を交えて説明します。もちろん、実際の機能量から比べるとごく一部の内容にはなってしまいますが、少しでも開発の助けになれれば幸いです。

なお、本文内でAWS関連のアイコンを使用します。AWS シンプルアイコン – AWS アーキテクチャーセンター | AWSで提供されるアイコンを使用しています。ご認識おきください。

また、各参照先ドキュメントは 2024/2 時点の情報になります。そのため、今後の更新により記事と実態がずれる可能性があります。ご認識おきください。

記事は全体で、前編、中編後編の3編構成になります。前編ではGlueやSparkに関する説明を行います。中編では実際の開発を行うための基礎となる部分について、実コードを交えて説明します。後編では、実践的な開発を行うためのライブラリを活用した説明や機能調査の進め方、テスト実行方法などの、開発を進めていくために不可欠な要素について述べていきます。間をあけずに公開していく予定ですので、是非最後まで読んでみてください。

対象者と前提知識

本稿では以下の対象者を想定しています。

  • PythonでGlueのSparkジョブ(PySpark)を開発しようとしている方
  • Sparkジョブ開発にあたり、何を知っておかなければいけないかを知りたい方

以下の内容については詳しくは説明しません。必要に応じて、別途調べてみてください。なお、知らなかったとしてもある程度読み進めることはできるように構成していますので、その点はご安心ください。

  • Pythonの基礎知識
  • ETLに関する基礎(ETL(Extract/Transform/Load)とは何か、など)
  • AWS Glue自体のサービスラインナップ、位置づけ、機能説明
  • Spark/PySpark
  • ストリーミングジョブ
  • ワークフローなど、ジョブ群を一連の流れとして動かすための仕組み

また、本稿では概念的な理解を優先するため、あえて正確ではない表現をしている個所がいくつかあります。詳しく調べていくと本稿記載と矛盾する部分があるかもしれませんが、その場合は「この記事ではざっくりした概念についての説明をしているので、詳細は異なる場合がある」と考え、各原典の記載を正しいものとしてご確認ください。

ETLとAWS Glue for Sparkジョブ

AWS Glueは、AWSにおいて分析系の仕組みの中で中核的位置づけとなるサービスです。もちろん、分析系の仕組みで前半戦に位置づけられるETL処理についてのサポートも行っています。

AWS上でETLを実行するためには、以下のアプローチが考えられます。

  1. AWS Glueを利用してジョブを構築する
  2. EC2や、ECS/EKSをベースとしたコンテナサービスなどのコンピューティングリソースを活用し、その上で処理する仕組みを構築する
  3. AWS Lambdaを利用してジョブを構築する
  4. Amazon Athenaを利用して処理する
  5. AWS Batchを利用してジョブを構築する


それぞれの項目についてもう少し詳しく説明します。

  1. AWS Glueを利用してジョブを構築する

    AWS Glue

    AWS Glue は、サーバレスでETLジョブを構築する際の有力な候補です。実装言語はPythonに限定されますが、シェルジョブ(シンプルなPythonプログラム)、PySpark、Ray、といった複数のフレームワーク及び稼働させるための環境がセットで提供され、利用可能です。AWSの他サービスはもちろんのこと、他クラウド等のデータソースに接続するためのコネクタも豊富に用意されているため、複数のクラウドにデータソースが散らばっている場合でも採用できます。本稿のメインコンテンツでもあるため、詳細は後述します。

  2. EC2や、ECS/EKSをベースとしたコンテナサービスなどのコンピューティングリソースを活用し、その上で処理する仕組みを構築する

    Amazon EC2
    Amazon ECS

    Amazon EC2, ECS などは、純粋にコンピューティングリソースの提供を受けてアプリケーションを開発するアプローチです。非常に柔軟性が高く様々な処理系やDockerイメージなどを利用可能ですが、その分、自分たちで環境構築をする必要があります。

  3. AWS Lambdaを利用してジョブを構築する

    AWS Lambda

    AWS Lambda は、ともすればGlueよりよく知られているサービスかもしれません。Lambdaは、FaaS(Function as a Service)とも呼ばれる、関数を実行するためのサービスです。様々な言語プラットフォームを持ち、単純な、インプットに対する処理を定義することで様々なユースケースに対応可能です。一方で、「特定の1つの処理をシンプルに実行する」ことを重視してサービスが設計されているため、ETLのような1処理が重くなりがちで実行時間も読めないような処理とはあまり相性がよくありません。その1つの例として挙げられますが、Lambdaの実行時間は最大15分という上限が決まっており、拡張不可です。リクエストごとに独立して処理をさせるなどのシーンでは有用ですが、まとめて大量に処理を行う場合にはこれらの特性を理解した利用が必要です。

  4. Amazon Athenaを利用して処理する

    Amazon Athena

    Amazon Athena は、S3を中心としたデータレイク/データソースの間で、SQLクエリベースの処理を実行できる基盤になります。CTAS(Create Table As Select)と呼ばれる、クエリ結果を利用して新しいテーブルを作成する、といった方法を併用することで、ETLを一部実現することができます。SQLクエリという、多くの人になじみがあるツールを使用できることから、弊社内でも常に一定量のユーザがいます。とても便利ではありますが、クエリで表現が難しい処理が実現できない、基本的にはS3上でデータが取り扱われるために他データストアとのやり取りは一定の工夫が必要、コンピューティングリソースが全て自動で割り当てられるために性能の保証やチューニングが難しい、などの点があります。これらの点から、ユースケースに応じて利用を吟味する必要があります。また、処理自体も非同期で動くため、ジョブで使用する場合には終了を待ち合わせるための仕組みを別途用意する必要があります。

  5. AWS Batchを利用してジョブを構築する

    AWS Batch

    AWS Batch は、日本でよく言う「バッチ処理」という用語から並べられやすいためここに挙げていますが、実際は他の4つとは種類が異なります。以下で、AWS Batchにおける「バッチ」と、日本でよく言う「バッチ処理」の違いについて簡単に説明します。

    特定の処理について、大量のコンピューティングリソースを使用して処理を進めたい場合には、この処理環境を統一的に構築する必要がありますが、これはある程度手間のかかる仕事になります。AWS Batchは、こういう「特定の、一度に大量のコンピューティングリソースを使用する処理」を行う際に効率的にリソースの設定、調達、実行、完了確認などを行うために便利な仕組みを提供するサービスになっています。実態としては、コンピューティングリソース自体は、EC2やFargateなどを利用し、直接インスタンスを立ち上げたり、この上でECSやEKSなどのコンテナ管理に基づいてコンテナを立ち上げたりして調達を行います。
    一方、よく言われる「バッチ処理」は、ワークフローとジョブの組み合わせで表現されることが多いです。個々でどのような処理をするのかについては「ジョブ」として定義し、これらをどの順番で実行するのか、およびどのタイミングで実行するのか、また、異常終了時にどこで異常終了し、どこから再開すればよいのかなどを制御する仕組みとして「ワークフロー」がある、という考え方になります。こちらの考え方に従うと、「バッチ」は「ジョブ」の一形態、ということになります。日本国内でも用語のブレが大きいため、「ジョブ」と「バッチ」は上記それぞれの意味でお互い使用されることも多く、文脈に応じて何を意味しているかを理解する必要があります。本稿では上述の定義に従うものとし、以降のAWS Glueで実装する対象は「ジョブ」として統一表記します。

悩ましい……

以上より、AWS Glueは、開発したいETL処理に集中しやすく、特にPython環境に慣れ親しんでいる方たちからは使いやすい選択肢である、と言えます。

AWS Glue
Amazon EC2
Amazon ECS
AWS Lambda
Amazon Athena

言語

Python/Scala

自由

Java、Go、PowerShell、Node.js、C#、Python、Ruby

SQL

AWS管理度

追加検討事項

15分制限あり

非同期、性能チューニングが難しい

Glueジョブの種類

Glue上でジョブを開発する方法として、3種類の方法が提供されています。

  1. Pythonシェルジョブ
    単純に、Pythonのスクリプトを実行するだけのジョブです。とても分かりやすいですが、並列実行はできない1)厳密には、Python内での並列実行は可能ですが、複数のインスタンスを立ち上げて並列処理させることはできない、です。ため、大量のデータを処理する際にはスケーリングの観点から不安があります。
  2. Sparkジョブ
    PySparkを軸としたSparkシステム上で処理を行うジョブです。PySparkおよびその拡張版であるglueのライブラリを使用することで、Spark分散処理の機能を利用できます。そのため必要となるような複数のインスタンス展開やdriverとexecutorの構成などはGlueがすべて提供してくれます。ログやSpark Web UIなど、モニタリングや問題原因調査を行うための仕組みも併せて提供を受けられます。なお、Sparkはscala(Java VM上で稼働する、オブジェクト指向と関数型の双方に親和性がある言語)用のSDKも提供しているため、scalaでの実装も可能になっています。
  3. Rayジョブ
    2023/5提供とかなり新しい機能です。Ray自体も v1.0.0 リリースが 2020/9 と新しいライブラリで、Pythonプログラムの並列実行を実現するための仕組みを提供します。Sparkと比較して学習コストが低く、Pandasなど分析でよく使われているライブラリをそのまま並列処理化できる、ということで徐々に利用が進んでいるようです。ちなみに、「Ray」自体は一般用語のため、調べる場合は「python Ray」など、関連する用語を合わせて検索しましょう。

ETLジョブを実現するための手段としては、大量データ処理時にも問題なく並列でスケーリングできるSparkもしくはRayが選択されることが多いと思いますが、以降では、実績も多くGlue上での選択肢も多い、Sparkジョブで開発する前提で説明を進めます。

GlueとPySpark、Spark

Glueでは、PySparkをはじめ、ETLで使用する様々なPythonライブラリがデフォルトでインストールされた状態で提供されます。また、GlueではPySparkのライブラリを拡張したいくつかの機能が提供されます。これらを元に開発を進めていくわけですが、ここで改めてそれぞれの位置づけについて説明したいと思います。これらを理解することで、各ライブラリをどのように取り扱えばよいかについて理解が深まります。

Spark

出典)https://spark.apache.org/images/spark-logo.png

Sparkは、Apache Sparkプロジェクトで提供される、MapReduceの考え方を基底とした分散処理の仕組みです。Apache SparkのHPではmulti-language engineと記載されていますが、Javaでベースのエンジンが記述されており、これが様々な言語から利用できるようにSDKが提供されている、という表現が一番近しいかと思います。例えばPySparkで処理を実行した場合も、その後ろではJavaVM上で稼働して処理を遂行します。この仕組みは、問題発生時の調査を行う際に重要なポイントとなるため、覚えておいてください。
また、Sparkプロジェクトでは、Spark本体以外にも、HDFS、Hive、SparkStreamingなど、様々なライブラリを提供しています。これらはGlueでも使用できるものも多く、関連する部分ではSparkの知識が求められる場合も少なくありません。実装時、および実行時に問題となった場合に、これらのキーワードが出てきた時は、GlueだけでなくSparkのドキュメントをあたってみることで解決する場合もあります。なお、SparkはHadoop技術を基礎としてその課題点を解決するために組み上げられた仕組みでもあります。AWSがSparkとHadoopの比較を公式に提示していますので、興味がある方はご一読ください。

PySpark

PySparkは、Apache SparkにアクセスするためのPython APIです。pysparkというライブラリで提供されます。公式によると、Spark全ての機能をサポートしているとのことです。実行するためにはSparkの実行環境のようなものを立ち上げて利用する必要があり、 pyspark.sql.SparkSession というセッション内で処理を進めていくことになります。Spark処理で1点とても重要なのは、「記述した処理を読み込んだタイミングでは実行されない」ということです。何を言っているかわからない方も多いと思いますが、(Py)Sparkの処理は、コードを読み込んだタイミングでは構造を定義されているだけであり、出力(アクション)が定義されたタイミングで初めて実行される、という挙動をします。AWS Black Beltの、パフォーマンスチューニングの資料で詳しく解説されていますので、興味がある方はそちらをご参照ください。IaC(Infrastructure as Code)を実装されている方はなじみがある概念かもしれません。この点も、デバッグを行う際にはとても重要になります。

Glue

最後にGlueです。Glueが拡張する機能はawsglueというライブラリで提供されます。ここでは、GlueがPySparkを拡張したライブラリのことを指します。筆者の理解では、主に2つの点をサポートするべく、GlueはPySparkのライブラリを拡張しています。
1つは、読み込み時の型定義です。Glueは、Glue Studioなどからも透けて見えるように、「配置されているデータを手軽に処理したい」といった要求に応えるというのが一つのユースケースで存在します。また、DynamoDBなど、同じ項目だが複数の型種類が存在する、などのデータ構造に対しても対応したい、というニーズがあります。これらの型柔軟性を対応するために、読み込み時に型推定したり、特定のカラムで複数のデータ型を扱えたりという拡張がなされています。
もう1つはAWSサービスとの親和性です。Sparkは、基本的にApache Hadoopの技術をベースとしています。ここで重要な点だけ取り上げると、処理と処理の間は複数のファイル群によって媒介されるということです。もちろん、RDBなどの一般的なデータストアとの接続についてはサポートライブラリがありますが、Glue DataCatalog、DynamoDBなど、AWSの提供するデータストアに対するコネクタがあるわけではありません。これらの様々なデータストアに対応した接続オプションを、Glue拡張ライブラリでは提供しています。一方で、Glue拡張ライブラリではある程度の処理系もサポートしてはいるものの、Sparkが提供する処理系と比べると非常に限られています。そのため、Glue Studioでも、一定種別の処理についてはSparkを使用するように実装されています。ここでは、Glue拡張機能はそれ単体でのみ使用するわけではなく、PySparkと併用して使用するものだ、という点だけご理解ください。

Sparkジョブの構造

ここまでの話を図でまとめるとこのようになります。

  1. SparkジョブはGlueが提供する、Spark実行環境上で動作します。これらの環境管理はGlueサービスが実施し、利用者は直接制御することなく、利用したい環境の指示(例えば、処理スペック、並列度、リトライ回数など)をするだけで利用できます。
  2. Spark環境を利用するために、PySparkおよびこれをAWSが拡張したawsglueライブラリが提供されます。利用者はこの上で実現したい処理を記述します。

前編のまとめ

前編では、どのようにAWS上でジョブ開発を進めていくべきかといった議論からスタートし、Sparkジョブの構造までを説明してきました。次の中編では、実際のサンプルコードも交えながら、一番シンプルなパターンでのジョブ開発と、その背景にある技術構造に迫ります。ご期待ください。

References   [ + ]

1. 厳密には、Python内での並列実行は可能ですが、複数のインスタンスを立ち上げて並列処理させることはできない、です。