インライン Dataproc ワークフローを使用する

以前に作成したワークフロー テンプレート リソースをインスタンス化する標準のワークフローとは異なり、インライン ワークフローでは YAML ファイルか、埋め込みの WorkflowTemplate 定義を使用して、ワークフローを実行します。

インライン ワークフローの作成と実行

gcloud

「YAML ファイルを使用してワークフローをインスタンス化する」をご覧ください。

REST

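リクエストのデータを使用する前に、次のプレースホルダを実際の値に置き換えます。

  * project-id: Google Cloud プロジェクト ID
  * region: ワークフローを実行する Dataproc リージョン
  * cluster-name: ワークフロー用のマネージド クラスタの名前
  * zone: クラスタを配置する Compute Engine ゾーン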

HTTP メソッドと URL:

POST https://dataproc.googleapis.com/v1/projects/project-id/regions/region/workflowTemplates:instantiateInline

リクエストの本文(JSON):

{
  "jobs": [
    {
      "hadoopJob": {
        "mainJarFileUri": "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar",
        "args": [
          "teragen",
          "1000",
          "hdfs:///gen/"
        ]
      },
      "stepId": "teragen"
    },
    {
      "hadoopJob": {
        "mainJarFileUri": "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar",
        "args": [
          "terasort",
          "hdfs:///gen/",
          "hdfs:///sort/"
        ]
      },
      "stepId": "terasort",
      "prerequisiteStepIds": [
        "teragen"
      ]
    }
  ],
  "placement": {
    "managedCluster": {
      "clusterName": "cluster-name",
      "config": {
        "gceClusterConfig": {
          "zoneUri": "zone"
        }
      }
    }
  }
}

リクエストを送信するには、次のいずれかのオプションを展開します。

次のような JSON レスポンスが返されます。

{
  "name": "projects/project-id/regions/region/operations/2fbd0dad-...",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.dataproc.v1.WorkflowMetadata",
    "graph": {
      "nodes": [
        {
          "stepId": "teragen",
          "state": "RUNNABLE"
        },
        {
          "stepId": "terasort",
          "prerequisiteStepIds": [
            "teragen"
          ],
          "state": "BLOCKED"
        }
      ]
    },
    "state": "PENDING",
    "startTime": "2020-04-02T22:50:44.826Z"
  }
}

コンソール

現在、Google Cloud コンソールでは、インライン ワークフローの作成はサポートされていません。ワークフロー テンプレートとインスタンス化されたワークフローは、Dataproc の [ワークフロー] ページで表示できます。

Go

  1. クライアント ライブラリのインストール
  2. アプリケーションのデフォルト認証情報を設定します。
  3. コードを実行します。
    import (
    	"context"
    	"fmt"
    	"io"
    
    	dataproc "cloud.google.com/go/dataproc/apiv1"
    	"cloud.google.com/go/dataproc/apiv1/dataprocpb"
    	"google.golang.org/api/option"
    )
    
    // instantiateInlineWorkflowTemplate instantiates a workflow from a template
    // that is embedded in the request itself. The template holds two Hadoop steps
    // ("teragen", then "terasort", which lists "teragen" as a prerequisite) and a
    // managed-cluster placement. The function waits for the returned operation and
    // then writes a confirmation message to w.
    //
    // It returns a wrapped error if the client cannot be created, if the request
    // is rejected, or if waiting on the operation fails.
    func instantiateInlineWorkflowTemplate(w io.Writer, projectID, region string) error {
    	// projectID := "your-project-id"
    	// region := "us-central1"

    	// Both steps run the same examples jar with different arguments.
    	const examplesJar = "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"

    	ctx := context.Background()

    	// Build a workflow template client that talks to the regional endpoint.
    	client, err := dataproc.NewWorkflowTemplateClient(
    		ctx,
    		option.WithEndpoint(region+"-dataproc.googleapis.com:443"),
    	)
    	if err != nil {
    		return fmt.Errorf("dataproc.NewWorkflowTemplateClient: %w", err)
    	}
    	defer client.Close()

    	// First step: no prerequisites.
    	teragenStep := &dataprocpb.OrderedJob{
    		JobType: &dataprocpb.OrderedJob_HadoopJob{
    			HadoopJob: &dataprocpb.HadoopJob{
    				Driver: &dataprocpb.HadoopJob_MainJarFileUri{MainJarFileUri: examplesJar},
    				Args:   []string{"teragen", "1000", "hdfs:///gen/"},
    			},
    		},
    		StepId: "teragen",
    	}

    	// Second step: declared to depend on the "teragen" step.
    	terasortStep := &dataprocpb.OrderedJob{
    		JobType: &dataprocpb.OrderedJob_HadoopJob{
    			HadoopJob: &dataprocpb.HadoopJob{
    				Driver: &dataprocpb.HadoopJob_MainJarFileUri{MainJarFileUri: examplesJar},
    				Args:   []string{"terasort", "hdfs:///gen/", "hdfs:///sort/"},
    			},
    		},
    		StepId:              "terasort",
    		PrerequisiteStepIds: []string{"teragen"},
    	}

    	// Placement: a managed cluster described inline.
    	gce := &dataprocpb.GceClusterConfig{
    		// Leave "ZoneUri" empty for "Auto Zone Placement"
    		// ZoneUri: ""
    		ZoneUri: "us-central1-a",
    	}
    	placement := &dataprocpb.WorkflowTemplatePlacement{
    		Placement: &dataprocpb.WorkflowTemplatePlacement_ManagedCluster{
    			ManagedCluster: &dataprocpb.ManagedCluster{
    				ClusterName: "my-managed-cluster",
    				Config:      &dataprocpb.ClusterConfig{GceClusterConfig: gce},
    			},
    		},
    	}

    	// Assemble the inline template and the request that carries it.
    	template := &dataprocpb.WorkflowTemplate{
    		Jobs:      []*dataprocpb.OrderedJob{teragenStep, terasortStep},
    		Placement: placement,
    	}
    	req := &dataprocpb.InstantiateInlineWorkflowTemplateRequest{
    		Parent:   fmt.Sprintf("projects/%s/regions/%s", projectID, region),
    		Template: template,
    	}

    	// Submit the request; the call hands back an operation to wait on.
    	op, err := client.InstantiateInlineWorkflowTemplate(ctx, req)
    	if err != nil {
    		return fmt.Errorf("InstantiateInlineWorkflowTemplate: %w", err)
    	}
    	if waitErr := op.Wait(ctx); waitErr != nil {
    		return fmt.Errorf("InstantiateInlineWorkflowTemplate.Wait: %w", waitErr)
    	}

    	// Report success to the caller-supplied writer.
    	fmt.Fprintf(w, "Workflow created successfully.")
    	return nil
    }
    

Java

  1. クライアント ライブラリのインストール
  2. アプリケーションのデフォルト認証情報を設定します。
  3. コードを実行します。
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.cloud.dataproc.v1.ClusterConfig;
    import com.google.cloud.dataproc.v1.GceClusterConfig;
    import com.google.cloud.dataproc.v1.HadoopJob;
    import com.google.cloud.dataproc.v1.ManagedCluster;
    import com.google.cloud.dataproc.v1.OrderedJob;
    import com.google.cloud.dataproc.v1.RegionName;
    import com.google.cloud.dataproc.v1.WorkflowMetadata;
    import com.google.cloud.dataproc.v1.WorkflowTemplate;
    import com.google.cloud.dataproc.v1.WorkflowTemplatePlacement;
    import com.google.cloud.dataproc.v1.WorkflowTemplateServiceClient;
    import com.google.cloud.dataproc.v1.WorkflowTemplateServiceSettings;
    import com.google.protobuf.Empty;
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    
    public class InstantiateInlineWorkflowTemplate {
    
      public static void instantiateInlineWorkflowTemplate() throws IOException, InterruptedException {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String region = "your-project-region";
        instantiateInlineWorkflowTemplate(projectId, region);
      }
    
      public static void instantiateInlineWorkflowTemplate(String projectId, String region)
          throws IOException, InterruptedException {
        String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
    
        // Configure the settings for the workflow template service client.
        WorkflowTemplateServiceSettings workflowTemplateServiceSettings =
            WorkflowTemplateServiceSettings.newBuilder().setEndpoint(myEndpoint).build();
    
        // Create a workflow template service client with the configured settings. The client only
        // needs to be created once and can be reused for multiple requests. Using a try-with-resources
        // closes the client, but this can also be done manually with the .close() method.
        try (WorkflowTemplateServiceClient workflowTemplateServiceClient =
            WorkflowTemplateServiceClient.create(workflowTemplateServiceSettings)) {
    
          // Configure the jobs within the workflow.
          HadoopJob teragenHadoopJob =
              HadoopJob.newBuilder()
                  .setMainJarFileUri("file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar")
                  .addArgs("teragen")
                  .addArgs("1000")
                  .addArgs("hdfs:///gen/")
                  .build();
          OrderedJob teragen =
              OrderedJob.newBuilder().setHadoopJob(teragenHadoopJob).setStepId("teragen").build();
    
          HadoopJob terasortHadoopJob =
              HadoopJob.newBuilder()
                  .setMainJarFileUri("file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar")
                  .addArgs("terasort")
                  .addArgs("hdfs:///gen/")
                  .addArgs("hdfs:///sort/")
                  .build();
          OrderedJob terasort =
              OrderedJob.newBuilder()
                  .setHadoopJob(terasortHadoopJob)
                  .addPrerequisiteStepIds("teragen")
                  .setStepId("terasort")
                  .build();
    
          // Configure the cluster placement for the workflow.
          // Leave "ZoneUri" empty for "Auto Zone Placement".
          // GceClusterConfig gceClusterConfig =
          //     GceClusterConfig.newBuilder().setZoneUri("").build();
          GceClusterConfig gceClusterConfig =
              GceClusterConfig.newBuilder().setZoneUri("us-central1-a").build();
          ClusterConfig clusterConfig =
              ClusterConfig.newBuilder().setGceClusterConfig(gceClusterConfig).build();
          ManagedCluster managedCluster =
              ManagedCluster.newBuilder()
                  .setClusterName("my-managed-cluster")
                  .setConfig(clusterConfig)
                  .build();
          WorkflowTemplatePlacement workflowTemplatePlacement =
              WorkflowTemplatePlacement.newBuilder().setManagedCluster(managedCluster).build();
    
          // Create the inline workflow template.
          WorkflowTemplate workflowTemplate =
              WorkflowTemplate.newBuilder()
                  .addJobs(teragen)
                  .addJobs(terasort)
                  .setPlacement(workflowTemplatePlacement)
                  .build();
    
          // Submit the instantiated inline workflow template request.
          String parent = RegionName.format(projectId, region);
          OperationFuture<Empty, WorkflowMetadata> instantiateInlineWorkflowTemplateAsync =
              workflowTemplateServiceClient.instantiateInlineWorkflowTemplateAsync(
                  parent, workflowTemplate);
          instantiateInlineWorkflowTemplateAsync.get();
    
          // Print out a success message.
          System.out.printf("Workflow ran successfully.");
    
        } catch (ExecutionException e) {
          System.err.println(String.format("Error running workflow: %s ", e.getMessage()));
        }
      }
    }

Node.js

  1. クライアント ライブラリのインストール
  2. アプリケーションのデフォルト認証情報を設定します。
  3. コードを実行します。
const dataproc = require('@google-cloud/dataproc');

// TODO(developer): Uncomment and set the following variables
// projectId = 'YOUR_PROJECT_ID'
// region = 'YOUR_REGION'

// Workflow template client bound to the regional endpoint derived from `region`.
const client = new dataproc.v1.WorkflowTemplateServiceClient({
  projectId,
  apiEndpoint: `${region}-dataproc.googleapis.com`,
});

/**
 * Instantiates a workflow from a template embedded in the request and waits
 * for the returned operation to settle.
 *
 * The template has two Hadoop steps ("teragen", then "terasort", which lists
 * "teragen" as a prerequisite) and a managed-cluster placement. It relies on
 * the module-level `client`, `projectId` and `region`.
 *
 * @returns {Promise<void>} resolves after the success message is logged.
 */
async function instantiateInlineWorkflowTemplate() {
  // Create the formatted parent.
  const parent = client.regionPath(projectId, region);

  // Create the template
  const template = {
    jobs: [
      {
        hadoopJob: {
          mainJarFileUri:
            'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar',
          args: ['teragen', '1000', 'hdfs:///gen/'],
        },
        stepId: 'teragen',
      },
      {
        hadoopJob: {
          mainJarFileUri:
            'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar',
          args: ['terasort', 'hdfs:///gen/', 'hdfs:///sort/'],
        },
        stepId: 'terasort',
        // This step is declared to depend on the 'teragen' step.
        prerequisiteStepIds: ['teragen'],
      },
    ],
    placement: {
      managedCluster: {
        clusterName: 'my-managed-cluster',
        config: {
          gceClusterConfig: {
            // Leave 'zoneUri' empty for 'Auto Zone Placement'
            // zoneUri: ''
            zoneUri: 'us-central1-a',
          },
        },
      },
    },
  };

  const request = {
    parent: parent,
    template: template,
  };

  // Submit the request to instantiate the workflow from an inline template.
  const [operation] = await client.instantiateInlineWorkflowTemplate(request);
  await operation.promise();

  // Output a success message
  console.log('Workflow ran successfully.');
}

Python

  1. クライアント ライブラリのインストール
  2. アプリケーションのデフォルト認証情報を設定します。
  3. コードを実行します。
    from google.cloud import dataproc_v1 as dataproc
    
    def instantiate_inline_workflow_template(project_id, region):
        """Instantiate a Dataproc workflow from an inline template and wait for it.

        The template contains two Hadoop steps ("teragen", then "terasort", which
        lists "teragen" as a prerequisite) and a managed-cluster placement.

        Args:
            project_id (string): Project used to build the request parent.
            region (string): Region used for the API endpoint and the request parent.
        """
        # Both steps run the same examples jar with different arguments.
        examples_jar = "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"

        teragen_step = {
            "hadoop_job": {
                "main_jar_file_uri": examples_jar,
                "args": ["teragen", "1000", "hdfs:///gen/"],
            },
            "step_id": "teragen",
        }
        terasort_step = {
            "hadoop_job": {
                "main_jar_file_uri": examples_jar,
                "args": ["terasort", "hdfs:///gen/", "hdfs:///sort/"],
            },
            "step_id": "terasort",
            "prerequisite_step_ids": ["teragen"],
        }

        gce_cluster_config = {
            # Leave 'zone_uri' empty for 'Auto Zone Placement'
            # 'zone_uri': ''
            "zone_uri": "us-central1-a"
        }
        managed_cluster = {
            "cluster_name": "my-managed-cluster",
            "config": {"gce_cluster_config": gce_cluster_config},
        }

        inline_template = {
            "jobs": [teragen_step, terasort_step],
            "placement": {"managed_cluster": managed_cluster},
        }

        # The client targets the regional endpoint for the requested region.
        endpoint = f"{region}-dataproc.googleapis.com:443"
        client = dataproc.WorkflowTemplateServiceClient(
            client_options={"api_endpoint": endpoint}
        )

        # Send the inline template, then wait on the returned operation.
        pending = client.instantiate_inline_workflow_template(
            request={
                "parent": f"projects/{project_id}/regions/{region}",
                "template": inline_template,
            }
        )
        pending.result()

        # Output a success message.
        print("Workflow ran successfully.")