Spark と SparkSQL のカスタムタスクのスケジュールを設定する

Dataplex では、1 回限りの実行、定期的なスケジュール、またはオンデマンドのいずれかで、カスタムコードの実行をスケジュールできます。オンデマンドはプレビュー版で、API でのみ利用できます。Spark(Java)、PySpark(Spark バージョン 3.2 に限定)、SparkSQL を使用して顧客データ変換をスケジュール設定できます。Dataplex は、サーバーレス Spark 処理と組み込みのサーバーレス スケジューラを使用してコードを実行します。

用語

タスク
Dataplex タスクは、Dataplex がスケジュールに従って行う作業を表します。コード、パラメータ、スケジュールがカプセル化されます。
Job

ジョブは、Dataplex タスクの 1 回の実行を表します。たとえば、タスクが毎日実行されるようにスケジュール設定されている場合、Dataplex はジョブを毎日作成します。

2023 年 5 月 10 日以降に作成されたジョブの場合、[トリガー] フィールドにジョブの実行トリガータイプが表示されます。

ジョブ実行のトリガーの種類は次のとおりです。

  • RUN_REQUEST: RunTask API の呼び出しによってジョブが実行されたことを示します。

  • TASK_CONFIG: タスクの TriggerSpec 構成によってジョブが実行されたことを示します。

スケジュール モード

Dataplex では、次のスケジュール モードがサポートされています。

1 回だけ実行
このモードは、タスクを 1 回だけ実行するために使用します。すぐに実行することも、後で実行することもできます。タスクをすぐに実行した場合、実行が始まるまで最大 2 分かかる場合があります。
スケジュールに従って実行する
このモードは、タスクを繰り返し実行するために使用します。サポートされている繰り返しパターンは、毎日、毎週、毎月、またはカスタムです。
オンデマンドで実行

このモードは、以前に作成したタスクをオンデマンドで実行するために使用します。オンデマンド実行モードは、RunTask API でのみサポートされています。ジョブがオンデマンドで実行される場合、Dataplex は既存のパラメータを使用してジョブを作成します。ExecutionSpec 引数と、ジョブを実行するラベルを指定できます。

始める前に

  1. Dataproc API を有効にします。

    Dataproc API を有効にする

  2. ネットワークやサブネットワークで限定公開の Google アクセスを有効にします。Dataplex タスクで使用するネットワークで、限定公開の Google アクセスを有効にします。Dataplex タスクの作成時にネットワークまたはサブネットワークを指定しない場合、Dataplex はデフォルトのサブネットワークを使用します。デフォルトのサブネットワークで限定公開の Google アクセスを有効にする必要があります。

  3. サービス アカウントを作成します。 Dataplex タスクをスケジュール設定するには、サービス アカウントが必要です。サービス アカウントは、タスクを実行するプロジェクトに属している必要があります。このサービス アカウントには次の権限が必要です。

    • 処理中の BigQuery や Cloud Storage データへのアクセス権。

    • タスクを実行するプロジェクトに対する Dataproc ワーカーロール

    • タスクがレイクに接続された Dataproc Metastore インスタンスの読み取りまたは更新を必要とする場合、サービス アカウントには Dataproc Metastore 閲覧者または編集者のロールが必要です。このロールは、Dataplex レイクが設定されているプロジェクトで付与する必要があります。

    • タスクが Spark SQL ジョブの場合は、サービス アカウントに Dataplex デベロッパー ロールを付与する必要があります。このロールは、Dataplex レイクが設定されているプロジェクトで付与する必要があります。

    • タスクが Spark SQL ジョブの場合は、結果が書き込まれるバケットに対する Cloud Storage 管理者権限が必要です。

    • Spark SQL タスクとカスタム Spark タスクのスケジュールを設定して実行するには、Dataplex メタデータ リーダー(roles/dataplex.metadataReader)、Dataplex 閲覧者(roles/dataplex.viewer)、Dataproc Metastore メタデータ ユーザー(roles/metastore.metadataUser)の IAM ロールがサービス アカウントに対して付与されている必要があります。

  4. ジョブを送信するユーザーに、サービス アカウントのサービス アカウント ユーザー ロール(roles/iam.serviceAccountUser)を付与します。手順については、サービス アカウントに対するアクセス権の管理をご覧ください。

  5. Dataplex Lake サービス アカウントにサービス アカウントを使用するための権限を付与します。Dataplex レイクのサービス アカウントは、Google Cloud コンソールの [レイクの詳細] ページで確認できます。

  6. Dataplex レイクを含むプロジェクトが、タスクが実行されるプロジェクトと異なる場合は、Dataplex レイクのサービス アカウントに、タスクを実行するプロジェクトの Dataproc 編集者のロールを付与します。

  7. 必要なコード アーティファクト(JAR、Python、SQL スクリプト ファイル)またはアーカイブ ファイル(.jar.tar.tar.gz.tgz.zip)を Cloud Storage パスに配置します。

  8. サービス アカウントに、これらのコード アーティファクトを保存する Cloud Storage バケットに対する必要な storage.objects.get 権限が付与されていることを確認してください。

Spark(Java または Python)タスクをスケジュール設定する

コンソール

  1. Google Cloud コンソールで [Dataplex] ページに移動します。

    Dataplex に移動

  2. [プロセス] ビューに移動します。

  3. [タスクの作成] をクリックします。

  4. [カスタム Spark タスクの作成] で、[タスクの作成] をクリックします。

  5. Dataplex レイクを選択します。

  6. タスク名を入力します。

  7. タスクの [ID] を作成します。

  8. [Task configuration] セクションの [タイプ] で、[Spark] または [PySpark] を選択します。

  9. 関連する引数を入力します。

  10. [サービス アカウント] フィールドに、カスタム Spark タスクを実行可能なユーザー サービス アカウントを入力します。

  11. [続行] をクリックします。

  12. 省略可: [スケジュールを設定]: [1 回だけ実行] または [繰り返し] を選択します。必須フィールドに入力します。

  13. [続行] をクリックします。

  14. 省略可:[リソースのカスタマイズ] と [その他の設定の追加] を行います。

  15. [作成] をクリックします。

gcloud

gcloud CLI コマンドを使用して Spark(Java / Python)タスクをスケジュール設定できます。次の表では、使用する必須パラメータとオプション パラメータを示します。

パラメータ 説明
--lake Dataplex サービスのレイクリソースのレイク ID。
--location Dataplex サービスのロケーション。
--spark-main-class ドライバのメインクラス。クラスを含む jar ファイルは、デフォルトの CLASSPATH に存在する必要があります。
--spark-main-jar-file-uri メインクラスを含む jar ファイルの Cloud Storage URI。
--spark-archive-uris 省略可:各エグゼキュータの作業ディレクトリに解凍されるアーカイブの Cloud Storage URI。サポートされているファイル形式: .jar.tar.tar.gz.tgz.zip
--spark-file-uris 省略可: 各エグゼキュータの作業ディレクトリに配置されるファイルの Cloud Storage URI。
--batch-executors-count 省略可: ジョブ エグゼキュータの合計数。デフォルト値は 2 です。
--batch-max-executors-count 省略可: 構成可能なエグゼキュータの最大数。デフォルト値は 1000 です。batch-max-executors-countbatch-executors-count より大きい場合、Dataplex は自動スケーリングを有効にします。
--container-image-java-jars 省略可: クラスパスに追加する Java JARS のリスト。有効な入力には、Jar バイナリへの Cloud Storage URI が含まれます。
例:gs://bucket-name/my/path/to/file.jar
--container-image-properties 省略可:プロパティキー(prefix:property 形式で指定します)。
例: core:hadoop.tmp.dir
詳細については、クラスタ プロパティをご覧ください。
--vpc-network-tags 省略可: ジョブに適用するネットワーク タグのリスト。
--vpc-network-name 省略可:ジョブが実行される Virtual Private Cloud ネットワーク。デフォルトでは、Dataplex はプロジェクト内で Default という VPC ネットワークを使用します。
--vpc-network-name または --vpc-sub-network-name のいずれか 1 つだけを使用する必要があります。
--vpc-sub-network-name 省略可:ジョブが実行される VPC サブネットワーク。
--vpc-sub-network-name または --vpc-network-name のいずれか 1 つだけを使用する必要があります。
--trigger-type ユーザー指定タスクのトリガータイプ。値は次のいずれかにする必要があります。
ON_DEMAND - タスクの作成後 1 回だけタスクが実行されます。
RECURRING - タスクはスケジュールに従って定期的に実行されます。
--trigger-start-time 省略可: タスクが最初に実行された時刻。形式は、「{year}-{month}-{day}T{hour}:{min}:{sec}Z」で、タイムゾーンは UTC です。たとえば、「2017-01-15T01:30:00Z」は 2017 年 1 月 15 日 01:30 UTC をエンコードしたものです。この値が指定されていない場合、タスクは、送信後(トリガータイプが ON_DEMAND の場合)、または指定されたスケジュール(トリガータイプが RECURRING の場合)で実行されます。
--trigger-disabled 省略可:タスクの実行を止めます。このパラメータは、すでに実行中のタスクをキャンセルするのではなく、RECURRING タスクを一時的に無効にします。
--trigger-max-retires 省略可: 中止するまでの再試行回数。失敗したタスクを再試行しない場合は、値を 0 に設定します。
--trigger-schedule タスクを定期的に実行するための cron スケジュール
--description 省略可: タスクの説明。
--display-name 省略可: タスクの表示名。
--labels 省略可:追加するラベルの KEY=VALUE ペアのリスト。
--execution-args 省略可:タスクに渡す引数。引数には、Key-Value ペアを混在させることが可能です。実行時の引数として、Key-Value ペアのカンマ区切りのリストを渡すことができます。位置引数を渡すには、キーを TASK_ARGS に設定し、値をすべての位置引数のカンマ区切り文字列に設定します。カンマ以外の区切り文字を使用するには、エスケープをご覧ください。
key-value と位置引数が一緒に渡される場合は、TASK_ARGS が最後の引数として渡されます。
--execution-service-account タスクの実行に使用するサービス アカウント。
--max-job-execution-lifetime 省略可: ジョブ実行の有効期限が切れるまでの最長時間。
--container-image 省略可:ジョブ ランタイム環境のカスタム コンテナ イメージ。指定しない場合は、デフォルトのコンテナ イメージが使用されます。
--kms-key 省略可:暗号化に使用する Cloud KMS 鍵。形式は次のとおりです。
projects/{project_number}/locations/{location_id}/keyRings/{key-ring-name}/cryptoKeys/{key-name}

Java の例

glcoud dataplex tasks create --project=<project-name> --location=<location> --lake=<lake-id> --trigger-type=ON_DEMAND –spark-main-jar-file-uri=<gcs location to java file> --execution-service-account=<service-account-email> --trigger-start-time=<timestamp after which job starts ex. 2099-01-01T00:00:00Z> --labels=key1=value1,key2=value3,key3=value3 --execution-args=arg1=value1,arg2=value3,arg3=value3 <task-id>

PySpark の例

gcloud dataplex tasks create --project=<project-name> --location=<location> --lake=<lake-id> --trigger-type=RECURRING --trigger-schedule=<Cron schedule https://en.wikipedia.org/wiki/Cron> --spark-python-script-file=<gcs location to python script> --execution-service-account=<service-account-email> --execution-args=^::^arg1=value1::arg2=value2::TASK_ARGS="pos-arg1, pos-arg2" <task-id>

REST

タスクを作成するには、API Explorer を使用します。

Spark SQL タスクのスケジュールを設定する

gcloud

Spark SQL タスクのスケジュールを設定するには、Spark (Java または Python)タスクのスケジュールと同じ gcloud CLI コマンドを実行します。次の追加パラメータを指定します。

パラメータ 説明
--spark-sql-script SQL クエリテキスト。spark-sql-script または spark-sql-script-file のどちらかが必要です。
--spark-sql-script-file クエリファイルへの参照。この値は、クエリファイルの Cloud Storage URI か、SQL スクリプト コンテンツへのパスです。spark-sql-script または spark-sql-script-file のどちらかが必要です。
--execution-args Spark SQL タスクの場合、次の引数は必須であり、位置引数として渡す必要があります。
--output_location, <GCS uri of the output directory>
--output_format, <output file format>
サポートされている形式は、CSV ファイル、JSON ファイル、parquet、orc です。
gcloud dataplex tasks create --project=<project-name> --location=<location> --lake=<lake-id> --execution-service-account=<service-account-email> --trigger-type=ON_DEMAND --spark-sql-script=<sql-script> --execution-args=^::^TASK_ARGS="--output_location, <gcs folder location>, --output_format, json" <sql-task-id>

REST

タスクを作成するには、API Explorer を使用します。

タスクをモニタリングする

コンソール

  1. Google Cloud コンソールで [Dataplex] ページに移動します。

    Dataplex に移動

  2. [Process] ビューに移動します。

  3. [Tasks] タブには、タスク テンプレート タイプでフィルタされたタスクのリストが表示されます。

  4. [名前] 列で、表示するタスクをクリックします。

  5. 表示するタスクの [ジョブ ID] をクリックします。

    Google Cloud コンソールで Dataproc ページが開き、モニタリングと出力の詳細を表示できます。

gcloud

次の表では、タスクをモニタリングするための gcloud CLI コマンドを示します。

操作 gcloud CLI コマンド
タスクの一覧表示 gcloud dataplex tasks list --project=<project-name> --location=<location> --lake=<lake-id>
タスクの詳細の表示 gcloud dataplex tasks describe --project=<project-name> --location=<location> --lake=<lake-id> <task-id>
タスクのジョブを一覧表示する gcloud dataplex tasks jobs list --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id>
ジョブの詳細の表示 gcloud dataplex tasks jobs describe --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> <job-id>

Dataplex は Dataproc Serverless(バッチ)でジョブを実行します。Dataplex ジョブの実行ログを表示するには、次の手順を行います。

  1. Dataproc サーバーレス(バッチ)ジョブ ID を取得します。次のコマンドを実行します。

    gcloud dataplex tasks jobs describe --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> <job-id>
    
  2. ログを確認します。 前のコマンドで取得したジョブ ID を使用して、次のコマンドを実行します。

    gcloud beta dataproc batches wait --project=<project-name> --region=<location> <job-id>
    

REST

タスクまたはジョブget または list するには、API Explorer を使用します。

スケジュールを管理する

Google Cloud コンソールでは、Dataplex 内でタスクのスケジュールの編集、タスクの削除、進行中のジョブのキャンセルができます。次の表では、これらのアクションに対する gcloud CLI コマンドを示します。

操作 gcloud CLI コマンド
タスク スケジュールの編集 gcloud dataplex tasks update --project=<project-name> --location=<location> --lake=<lake-id> --trigger-schedule=<updated-schedule> <task-id>
タスクを削除する gcloud dataplex tasks delete --project=<project-name> --location=<location> --lake=<lake-id> <task-id>
ジョブのキャンセル gcloud dataplex tasks jobs cancel --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> <job-id>

次のステップ