SPECIALIST

多様な専門性を持つNRIデジタル社員のコラム、インタビューやインサイトをご紹介します。

BACK

AWS Glue for Spark ジョブ開発の手引き(中編)

こんにちは、NRIデジタルの松村です。
本記事は「AWS Glue for Sparkジョブ開発の手引き」の中編になります。前編では、AWS上でどのようにジョブ開発を進めていくべきかという議論からスタートし、その中で AWS Glue for Sparkジョブ(以下、Sparkジョブ)に着目して、その機能や構造について簡単に説明しました。

中編では、実際に開発を進めていくにあたり、一番の根幹となる DynamicFrame や DataFrame の説明から行います。その後に、一番シンプルなパターンのETLジョブを作りながら、それらがどのような仕組みで実現されているのかといった技術構造をひも解いていきます。それでは早速本文へ移りましょう。

DynamicFrame(glue)とDataFrame(PySpark)とDataFrame(Pandas)

実装を進める上では、まずはGlue拡張ライブラリを使用するところから始める方が多いのではないかと思います。これは、各種AWSサービスとの連結をシームレスにするために構築されているからです。例えば、DynamoDBから読み取る場合、Glue拡張ライブラリでは1処理で読み取れますが、PySparkで使用するためには、まず boto3 というAWS APIのPythonラッパーを使用してデータを取得後に変換する必要があります。また、 boto3 を使用すると内部的に並列処理することができないため、性能面でも劣ります。

Glue拡張ライブラリでは、データを格納するために DynamicFrame というクラスを使用します。Glue拡張ライブラリ内でデータを取り扱う際には DynamicFrame を使用する、という程度でご理解ください。

PySparkでは、同様に、データを格納するために DataFrame というクラスを使用します。これは、Glue拡張ライブラリでデータを格納する DynamicFrame と名前が似ていますが、クラス上の依存関係はありません。一定量の類似の機能(count, show など)が実装されていますが、仕様が異なるケースも見受けられますので、各ドキュメントを確認し、実装されている機能については都度確認することを推奨します。

DynamicFrame, DataFrame は異なるクラスであり、そのままそれぞれの機能を取り扱うことはできません。使いたい機能に合わせて変換する必要があります。DynamicFrame と相互に変換するためには、 DynamicFrame クラスに付属する、 toDF, fromDF 関数を使用します。前述の通り、PySparkでは SparkSession というセッション内で処理を進めることになり、各 DataFrame は SparkSession から派生して作成されます。同様に、 DynamicFrame も、glue版SparkSessionとでもいうべき、glueContext を使用します。glueContext はSparkSession上に構築されます。そのため、 glue DynamicFrame → pyspark DataFrame の変換をする際には glueContext 内に内包される SparkSession を暗黙的に使用し、 toDF では特に他の引数なく変換可能ですが、 pyspark DataFrame → glue DynamicFrame では明示的に glueContext を指定して fromDF 関数を使用する必要があります。

ここまでの内容を実装したサンプルコードはこちらになります。雰囲気を見ていただければ幸いです。

from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import SparkSession

# GlueContextを作成
sc = SparkContext()
glueContext = GlueContext(sc)
# これが SparkSession
spark = glueContext.spark_session

# データソースがS3上にあると仮定し、パスを指定
s3_path = "s3://your_bucket/your_data/"

# Glue DynamicFrameにデータを読み込む
dyf = glueContext.create_dynamic_frame.from_catalog(
    database="your_database",
    table_name="your_table",
    transformation_ctx="dyf"
)

# DynamicFrameをDataFrameに変換
df = dynamic_frame.toDF()

# DataFrameをDynamicFrameに再度変換
dyf2 = DynamicFrame.fromDF(data_frame, glueContext, "dyf2")

上記の中でdyfdf というシノニムを使用していますが、glue DynamicFrame と pyspark DataFrame は同じコード内でもよく併用し、かつ間違いやすいので、これらのシノニムを使うことで判別できるようにしていることが多いようです。例えば、 my_database_dyf や、aggregated_df などのように postfix として使用するとわかりやすいかもしれません。類似の名前を付けて混在すると、当然ながら動きませんので、注意しましょう。

また、これまた似た名前ですが、pandasというライブラリにもDataFrameというクラスがあります。pandas はPython上でデータ解析を行うために便利なフレームワークで、データサイエンティストの方々が御用達のものです。行列演算を簡単に行える他、データ分析でよく使われる処理が一式ライブラリに含まれています。ちなみに、これまた PySpark の DataFrame とはクラス的な依存関係はありません。しかし、相互の変換は同様に可能で、 PySpark DataFrame → pandas DataFrame は PySpark DataFrame.toPandas 関数で、pandas DataFrame → PySpark DataFrame は PySpark SparkSession.createDataFrame 関数で実現できます。なお、PySparkおよびGlueでは並列処理実行のための仕組みを用意しておく必要があるためSessionオブジェクトが必要ですが、Pandasはそのような処理系での準備を行うものではないため、それ相当のものはありません。

ここまでで見てきた辺関係をまとめると、

Glue(awsglue) ⇔ PySpark ⇔ pandas

という変換が可能、とまとめることができます。

なお、こう見るとそれぞれ等価なライブラリであるように見えますが、Glue + PySpark と pandas で、ライブラリの目的は大きく異なります。 Glue + PySpark は、大規模データ処理を分散して実行するためのフレームワークです。一方で pandas はデータ分析処理を効率的に開発するためのライブラリです。このスタンスの違いは、それぞれの処理を理解するために重要ですので覚えておきましょう。

また、以降の説明では pandas が出てくることはほとんどないため、単に DataFrame と記載した場合は PySpark のものを指し示すこととします。

開発その1:DynamicFrameのみを使用する

それでは、前提となるサービス・ライブラリを一通り頭に入れたところで、実際の開発に入りましょう。様々なパターンが存在しますが、わかりやすい方から順にステップ・バイ・ステップで開発を進め、その後に関連するTipsのまとめやテスト方式などについて説明していきます。

一番初めになるのは、最もシンプルなパターンで、glue DynamicFrame のみを使用するパターンです。この方式は、基本的には悩みどころはほとんどないのですが、その中で唯一あるのは「何ができて何ができないのかわからない」という問題です。サンプルコードについては本家ドキュメントである程度記載されていますので、本稿ではサンプルは最小限として、どのように調べていけばよいのかという点を中心に、開発のセクションを進めていきます。

DynamicFrame関連のドキュメントとしては、以下の2つを中心に見ていくことになります。

  • PySpark拡張機能:DynamicFrame関連のクラスについての説明
  • PySpark変換:DynamicFrameを使った変換処理を行うクラスの説明。DynamicFrameにメソッドとして実装されているものと、ここで記載されているクラスを使用する必要があるものがある

以降は、Script Editorという、手動でコード開発をするベースとなるテンプレートからジョブを作成した状態を想定して進めます。Script Editorは、GlueのAWSコンソールで提供される機能の一つです。Glueジョブをコンソール上で開発するための方法はいくつかありますが、そのうち最もシンプルな、テンプレートコードのみを提供するものが、Script Editorになります。

以下のようなソースコードがテンプレートとして提供されます。以後、この前提で説明を進めます。(2024/2時点の状態になります)

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
job.commit()

ジョブのコードは、上記のうち job.init()job.commit() の間に記述していきます。なお、以後はこのテンプレート部分については省略し、出自が分かりにくいライブラリについてのみ、 import 文を追加することとします。

Read/Write(AWSネイティブリソース)

初めに行う必要があるのは、データの読み書きになるでしょう。データソースから取得し、別のデータソースへ書き込みます。初期時点で glueContext, sparkSession は存在していますので、これらは使えるものとします。

まず、データソースからの取得ですが、ここから既にドキュメントベースで追いかけるのが難しくなります。サンプルを確認すると、glueContext.create_dynamic_frame.from_catalog() という関数を呼び、DynamicFrameを作成しています。一方で、 glueContext のドキュメントにはこの関数/変数の記載がありません。実際の実装から取得した情報は以下のようになります。

pprint(vars(glueContext))

↓ (抜粋)
...
 'create_data_frame': <awsglue.dataframereader.DataFrameReader object at 0x7fb6d4a5b6a0>,
 'create_dynamic_frame': <awsglue.dynamicframe.DynamicFrameReader object at 0x7fb6e23b69e0>,
 'sparkSession': <pyspark.sql.session.SparkSession object at 0x7fb6d4a5ba90>,
 'spark_session': <pyspark.sql.session.SparkSession object at 0x7fb6d4a5ba90>,
 'write_data_frame': <awsglue.dataframewriter.DataFrameWriter object at 0x7fb6d4a5b310>,
 'write_dynamic_frame': <awsglue.dynamicframe.DynamicFrameWriter object at 0x7fb6d4a5b4f0>}

ここからわかることとしては、 glueContext の中身としては、 SparkSession オブジェクト(2つありますが、IDを見ればわかるように同じものです)以外に、DynamicFrame および DataFrame の Reader/Writer を持っている構造だということが分かります。つまり、 DynamicFrame を使用する場合は、

の2種類を使って読み書きする、と理解するのがよさそうです。つまり、例えば上述の読み出しにおける glueContext.create_dynamic_frame.from_catalog() は、 DynamicFrameReader の from_catalog() 関数を参照すれば使用することができます。

DynamicFrameReaderには from_rdd, from_options, from_catalogの3種類が、 DynamicFrameWriter には from_options, from_catalog, from_jdbc_conf の3種類がメソッドとして定義されているかと思います。この中で最もよく使うのは、様々なデータソースとの接続で使用される from_options になるでしょう。 from_catalog だけ少し説明をしておきますと、これは Glue DataCatalog で管理されているテーブルへアクセスする際に使用します。使用する時に確認すればわかると思いますので、ここでは割愛します。

なお、 glueContext のドキュメントには、create_dynamic_frame_from_options などの関数が定義されています。実は DynamicFrameReader / Writer はこれらのシノニムになっており、挙動としては同じです。 awsglue モジュールは awslabs の github 上で公開されており、例えば DynamicFrameReader の挙動はこちらで確認することが可能です。構造がどうなっているかが分かりにくい場合は、githubのソースやGlue上でのテスト稼働による検証などで確かめてみるとよいでしょう。

では、 from_options を使用して各データソースへどのように接続すればよいか、です。これは、 connection_type に応じて設定するべきパラメータが異なります。種別及びそれぞれで設定可能なパラメータについては、接続パラメータのページおよびその配下でまとめられています。コネクション種別ごとに、サポートされている Glue バージョンが異なりますので、接続したい対象のデータソースについて確認するように注意してください。基本的には最新バージョンにしておけばサポートされますが、一部マーケットプレイスで提供されているものなどは最新バージョンでは使用できない(検証されていない)場合があります。

以下はDynamoDBから読み取り、「ttl」項目を削除し、別のDynamoDBへ書き込むサンプルスクリプトです。

dyf = glue_context.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={"dynamodb.input.tableName": test_source,
        "dynamodb.throughput.read.percent": "1.0",
        "dynamodb.splits": "100"
    }
)
dyf = dyf.drop_fields(paths=["ttl"])

glue_context.write_dynamic_frame_from_options(
    frame=dyf,
    connection_type="dynamodb",
    connection_options={"dynamodb.output.tableName": test_sink,
        "dynamodb.throughput.write.percent": "1.0"
    }
)

ちなみに、上記で特徴的な処理/表記が2つあるので、解説しておきます。

1つは、 sink です。見慣れない方も多いと思いますが、これは恐らくデータフロー図などの概念から来ていると思われます。データフローを表現する時に、データの発生元と収束先を表す用語の1つとして、 sourcesink があります。キッチンのシンクと同じ意味で、吸い込まれていく先、ということですね。AWSの公式ページにあるETLの説明には、データソースの対義語として、データを流し込む先のデータレイク/データウェアハウスなどを総称して、データシンクと表現されています。そのため、書き込み先として時々 sink という言葉が出てくる場合があります。

もう1つは、 dyf = dyf.drop_fields(paths=["ttl"])です。同じ名前に再代入しています。DynamicFrame でも DataFrame でもそうですが、戻り値として変換後のFrameを戻す、というI/FをしているAPIが多いです。これは、マーティン・ファウラーの言う「流れるようなインタフェース」日本語訳)の形をイメージするのが分かりやすいと思いますが、要はメソッドチェーンの形で変換を記述することができるということです。

dyf = dyf.drop_fields(paths=["useless"], ...)
         .join(frame2=another_dyf, ...)
         .select_fields(paths=["key", "val"], ...)

みたいなことですね。ただ、これは一般的な Python のフォーマットルールに反することが多いので(というかPythonはスクリプト言語なので、1行目を見たら続きがないからそこで終わったと判断してしまいます。なので ...).join( のように明確に途中であることがわからないといけません)、

dyf = dyf.drop_fields(paths=["useless"], ...)
dyf = dyf.join(frame2=another_dyf, ...)
dyf = dyf.select_fields(paths=["key", "val"], ...)

みたいに書く方がお勧めです。こっちはフォーマッタを適用しても崩れません。これを見ればわかりますが、要はメソッドチェーンを表現しているだけなわけで、いちいち変換のたびに新しい変数名を定義するのは無駄です。ということで、同じ変数名で再代入を繰り返す方式がわかりやすいです。もちろん、このように一連の処理を表しているわけではなくて混在する場合、例えば

dyf1 = _create_dyf_from_dynamodb()  # 別の所で定義した、DynamoDBから読み取りDynamicFrameを生成する関数
dyf2 = _create_dyf_from_S3()  # 別の所で定義した、S3から読み取りDynamicFrameを生成する関数
dyf1 = dyf1.drop_fields(...)
dyf2 = dyf2.dorp_fields(...)
dyf3 = dyf1.join(frame2=dyf2, ...)

のような場合は、それぞれ適した名前で変数を定義した方がよいことは言うまでもありません。

Read/Write(VPCリソース、外部リソース)

先ほどの接続では、特にネットワーク的な何かを設定していませんでした。Glueは、特に指定しない場合はパブリックネットワーク上で実行され、IAM権限に従って接続制御を行います。そのため、例えばDynamoDBにアクセスするなら、DynamoDBへのアクセス許可が含まれたロールを割り当てることでアクセスできるようになります。

それでは、VPCリソース(プライベートネットワーク)や外部リソース、例えばVPC内に配置されたRDSやGoogle BigQueryなどの他クラウド上のサービスなどに接続する場合にはどうすればよいのでしょうか。ここで、Glue接続(Connection)を使用します(以降、「接続」はわかりにくいので、「コネクション」と記載します)。Glueコネクションを作成し、ジョブに割り当てることで、様々なリソースへ接続するためのネットワーク的設定を行うことができます。VPCの場合であればENI作成、外部クラウドへの接続であれば接続のための認証情報を設定して接続確立まで、をGlueコネクションが提供してくれます。

コネクションを使用してデータソースに接続する場合、例えばJDBC接続であればこのページになりますが、コネクション名を指定して接続を行います。以下はMySQLへ接続する場合です。

dyf = glue_context.create_dynamic_frame.from_options(
    connection_type="mysql",
    connection_options={"useConnectionProperties": True,
        "connectionName": "PREPARED-CONNECTION-NAME"
    }
)

ここでは useConnectionProperties を使用しており、コネクションが持っている接続系情報を継承するように設定していますが、ネットワーク部分だけをコネクションで設定し、ユーザや接続先DB名などはスクリプト内で設定することも可能です。その場合は、 url などの必要なパラメータを connection_options に並べて定義することになります。

なお、注意していただきたいのは、Glueジョブには複数のコネクションを割り当てることができますが、使用可能なのは1つだけ、ということです(こちらのFAQ末尾に少しだけ記載があります)。複数設定が可能なのはマルチAZ構成での冗長化のためであり、複数のコネクションを併用できるわけではない、とのことのようです。特にVPCベースの接続を行う際にはご注意ください。仕組上、複数のコネクションを使用せざるを得ない場合、例えばGoogle BigQueryからデータを取得してRDSへ書き込む、といった場合には、一度どこかのストアへ書き込んで別のジョブで実行する構成を検討した方がよいかもしれません。

Read対象を絞り込む

ここまで、一度も、テーブルより小さい単位にデータ取得先を絞り込むような情報を記載してこなかったと思います。DynamoDBであればインデックス、RDBであればクエリ、などです。これは、書けるけど書かなかったわけではなく、原則としてテーブル全体の読み込みしかサポートしないことが原因です。正確には、RDBへJDBC接続した場合の述語プッシュダウンの使用や、パーティションの適用など、いくつかの読み込み量削減に関するアプローチは存在します。が、デフォルトでは全体を読み込む、と考えてください。

そのため、一般的なアプローチとして読み込み量を削減する方法というのはありません。上記のようなSpark最適化を行う方法(このAWSベストプラクティスでまとまっています)以外では、以下のような方法が考えられます。

  • boto3 などのPythonクライアントツールを使用してデータを読み込み、手動でDynamicFrameを作成する。ただし、これはかなり面倒です。まず、PythonのネイティブオブジェクトからDynamicFrameを作成する方法はありません。そのため、DataFrameを作成し、変換する必要があります。また、当然ですが、Sparkのもつ並列処理のアーキテクチャが全く使用できません。そのため、大量データになると性能的な問題が発生しやすくなり、並列化などの検討を個別で行う必要が出てきます。
  • RDBなどであれば、Viewなどの仕組みを使ってそもそもアクセスする対象を絞っておくことができます。データベース側に処理負荷がよることになりますが、それが許容でき、データソースへDDLを投入することが可能な場合は一つの選択肢になります。

型推定

ところで、なぜAWSが PySpark の拡張機能を開発したのか、という説明が、DynamicFrame クラスのドキュメント冒頭に記載されています。PySpark では読み込む前にスキーマの指定が必要なため、型推定を入れること、および複数の型が混在するようなケースでも読み込めるようにすること、が主目的ということです。例えば、DynamoDB では同じテーブルに複数の facet (データ構造)を持つことができます。このようなデータを読み込む際に、事前にスキーマ定義が必要となると読み込めないので、こういう拡張を行ったのだと思われます。

そのため、DynamicFrame では読み込み時に型定義が必要なく、自動的に推定されます。昔は、件数が少ない場合に型推定できずにエラーになったり、 20201201 という文字列を数字型に変換して日付処理ができなくなったり、といったことが起きていましたが、今では筆者が使用している限りではほとんどこの手の問題は当たらなくなりました。ただ、確実に型定義をしてエラーを起こさせないようにするには逆に不向きである、ということはご理解ください。その場合は、直接 DataFrame を使用する方がよいです。Glue の拡張機能を使わなくとも、PySpark フレームワークで開発する限りにおいては Glue の実行環境による恩恵を受けることができますので、その点はご安心ください。

ただ、その場合でも、データ出力をする際だけは、AWSサービスに出力するのであれば DynamicFrame の方がツールが揃っているので、個人的にはお勧めです。

変換処理

DynamicFrame を使ってどのような変換ができるのか、については、おおよそは、DynamicFrame クラスの transform で記載されている内容だと思っていただいてもよいかと思います。前述の PySpark変換 部分ではもう少し対応している処理自体は多いのですが、 transform の亜種のようなものが拡張されているイメージですので、できることを把握する上では問題ないでしょう。また、この部分については、慣れてくるまでは後述する Glue Studio を使った検討を行うことをお勧めします。そのため、ここでは変換については詳述せず、Glue Studio に関する説明部分で DataFrame の変換処理と合わせてご説明します。

中編のまとめ

中編では、DynamicFrame や DataFrame の説明に始まり、 DynamicFrame を中心にその技術的背景を読み解いてきました。また、利用する上での注意点についても述べてきました。これらはあくまで筆者の所感であり、意見の分かれるポイントでもあるかと思います。他の記事や発表における意見も参考に、どの技術を使用していくかについては各人でご判断いただけるようにと、最後に添えておきます。

それでは、後編では DataFrame を使用した開発にも手を広げていきます。また、どのように自分が実現したい処理を実現すればよいかという観点からのライブラリ調査方法や、開発で必要不可欠となるテストについても述べていきます。お楽しみに。