Utilizza flussi di lavoro Dataproc in linea

A differenza dei flussi di lavoro standard che creano un'istanza di una risorsa modello di flusso di lavoro creata in precedenza, i flussi di lavoro in linea utilizzano un file YAML o una definizione WorkflowTemplate incorporata per eseguire un flusso di lavoro.

Creazione ed esecuzione di un flusso di lavoro in linea

gcloud

Vedi Crea un'istanza di un flusso di lavoro utilizzando un file YAML.

REST e riga di comando

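Prima di utilizzare i dati della richiesta, effettua le seguenti sostituzioni:

  * project-id: l'ID del tuo progetto Google Cloud
  * region: la regione in cui eseguire il flusso di lavoro
  * zone: la zona del cluster gestito
  * cluster-name: il nome del cluster gestito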

Metodo HTTP e URL:

POST https://dataproc.googleapis.com/v1/projects/project-id/regions/region/workflowTemplates:instantiateInline

Corpo JSON richiesta:

{
  "jobs": [
    {
      "hadoopJob": {
        "mainJarFileUri": "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar",
        "args": [
          "teragen",
          "1000",
          "hdfs:///gen/"
        ]
      },
      "stepId": "teragen"
    },
    {
      "hadoopJob": {
        "mainJarFileUri": "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar",
        "args": [
          "terasort",
          "hdfs:///gen/",
          "hdfs:///sort/"
        ]
      },
      "stepId": "terasort",
      "prerequisiteStepIds": [
        "teragen"
      ]
    }
  ],
  "placement": {
    "managedCluster": {
      "clusterName": "cluster-name",
      "config": {
        "gceClusterConfig": {
          "zoneUri": "zone"
        }
      }
    }
  }
}

Per inviare la richiesta, espandi una delle seguenti opzioni:

Dovresti ricevere una risposta JSON simile alla seguente:

{
  "name": "projects/project-id/regions/region/operations/2fbd0dad-...",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.dataproc.v1.WorkflowMetadata",
    "graph": {
      "nodes": [
        {
          "stepId": "teragen",
          "state": "RUNNABLE"
        },
        {
          "stepId": "terasort",
          "prerequisiteStepIds": [
            "teragen"
          ],
          "state": "BLOCKED"
        }
      ]
    },
    "state": "PENDING",
    "startTime": "2020-04-02T22:50:44.826Z"
  }
}

Console

Attualmente, la creazione di flussi di lavoro in linea non è supportata nella console Google Cloud. Puoi visualizzare i modelli di flusso di lavoro e i flussi di lavoro istanziati nella pagina Flussi di lavoro di Dataproc.

Go

  1. Installa la libreria client
  2. Configura le credenziali predefinite dell'applicazione
  3. Esegui il codice
    import (
    	"context"
    	"fmt"
    	"io"
    
    	dataproc "cloud.google.com/go/dataproc/apiv1"
    	"google.golang.org/api/option"
    	dataprocpb "google.golang.org/genproto/googleapis/cloud/dataproc/v1"
    )
    
    // instantiateInlineWorkflowTemplate submits an inline Dataproc workflow
    // template made of a "teragen" step followed by a dependent "terasort" step
    // on a managed cluster, waits for the resulting long-running operation to
    // finish, and writes a success message to w.
    //
    // projectID and region are used to build the request parent; region also
    // selects the regional API endpoint. Any returned error wraps the underlying
    // client error, so callers can inspect it with errors.Is / errors.As.
    func instantiateInlineWorkflowTemplate(w io.Writer, projectID, region string) error {
    	// projectID := "your-project-id"
    	// region := "us-central1"

    	ctx := context.Background()

    	// Create the workflow template client against the regional endpoint.
    	endpoint := region + "-dataproc.googleapis.com:443"
    	workflowTemplateClient, err := dataproc.NewWorkflowTemplateClient(ctx, option.WithEndpoint(endpoint))
    	if err != nil {
    		return fmt.Errorf("dataproc.NewWorkflowTemplateClient: %w", err)
    	}
    	defer workflowTemplateClient.Close()

    	// Create jobs for the workflow.
    	teragenJob := &dataprocpb.OrderedJob{
    		JobType: &dataprocpb.OrderedJob_HadoopJob{
    			HadoopJob: &dataprocpb.HadoopJob{
    				Driver: &dataprocpb.HadoopJob_MainJarFileUri{
    					MainJarFileUri: "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar",
    				},
    				Args: []string{
    					"teragen",
    					"1000",
    					"hdfs:///gen/",
    				},
    			},
    		},
    		StepId: "teragen",
    	}

    	// terasort reads the teragen output, so it lists teragen as a prerequisite.
    	terasortJob := &dataprocpb.OrderedJob{
    		JobType: &dataprocpb.OrderedJob_HadoopJob{
    			HadoopJob: &dataprocpb.HadoopJob{
    				Driver: &dataprocpb.HadoopJob_MainJarFileUri{
    					MainJarFileUri: "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar",
    				},
    				Args: []string{
    					"terasort",
    					"hdfs:///gen/",
    					"hdfs:///sort/",
    				},
    			},
    		},
    		StepId: "terasort",
    		PrerequisiteStepIds: []string{
    			"teragen",
    		},
    	}

    	// Create the cluster placement.
    	clusterPlacement := &dataprocpb.WorkflowTemplatePlacement{
    		Placement: &dataprocpb.WorkflowTemplatePlacement_ManagedCluster{
    			ManagedCluster: &dataprocpb.ManagedCluster{
    				ClusterName: "my-managed-cluster",
    				Config: &dataprocpb.ClusterConfig{
    					GceClusterConfig: &dataprocpb.GceClusterConfig{
    						// Leave "ZoneUri" empty for "Auto Zone Placement"
    						// ZoneUri: ""
    						ZoneUri: "us-central1-a",
    					},
    				},
    			},
    		},
    	}

    	// Create the Instantiate Inline Workflow Template Request.
    	req := &dataprocpb.InstantiateInlineWorkflowTemplateRequest{
    		Parent: fmt.Sprintf("projects/%s/regions/%s", projectID, region),
    		Template: &dataprocpb.WorkflowTemplate{
    			Jobs: []*dataprocpb.OrderedJob{
    				teragenJob,
    				terasortJob,
    			},
    			Placement: clusterPlacement,
    		},
    	}

    	// Submit the inline template; this starts a long-running operation.
    	op, err := workflowTemplateClient.InstantiateInlineWorkflowTemplate(ctx, req)
    	if err != nil {
    		return fmt.Errorf("InstantiateInlineWorkflowTemplate: %w", err)
    	}

    	// Block until the operation completes.
    	if err := op.Wait(ctx); err != nil {
    		return fmt.Errorf("InstantiateInlineWorkflowTemplate.Wait: %w", err)
    	}

    	// Output a success message. Fprint is used because the message has no
    	// formatting verbs.
    	fmt.Fprint(w, "Workflow created successfully.")
    	return nil
    }
    

Java

  1. Installa la libreria client
  2. Configura le credenziali predefinite dell'applicazione
  3. Esegui il codice
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.cloud.dataproc.v1.ClusterConfig;
    import com.google.cloud.dataproc.v1.GceClusterConfig;
    import com.google.cloud.dataproc.v1.HadoopJob;
    import com.google.cloud.dataproc.v1.ManagedCluster;
    import com.google.cloud.dataproc.v1.OrderedJob;
    import com.google.cloud.dataproc.v1.RegionName;
    import com.google.cloud.dataproc.v1.WorkflowMetadata;
    import com.google.cloud.dataproc.v1.WorkflowTemplate;
    import com.google.cloud.dataproc.v1.WorkflowTemplatePlacement;
    import com.google.cloud.dataproc.v1.WorkflowTemplateServiceClient;
    import com.google.cloud.dataproc.v1.WorkflowTemplateServiceSettings;
    import com.google.protobuf.Empty;
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    
    public class InstantiateInlineWorkflowTemplate {
    
      public static void instantiateInlineWorkflowTemplate() throws IOException, InterruptedException {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String region = "your-project-region";
        instantiateInlineWorkflowTemplate(projectId, region);
      }
    
      public static void instantiateInlineWorkflowTemplate(String projectId, String region)
          throws IOException, InterruptedException {
        String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
    
        // Configure the settings for the workflow template service client.
        WorkflowTemplateServiceSettings workflowTemplateServiceSettings =
            WorkflowTemplateServiceSettings.newBuilder().setEndpoint(myEndpoint).build();
    
        // Create a workflow template service client with the configured settings. The client only
        // needs to be created once and can be reused for multiple requests. Using a try-with-resources
        // closes the client, but this can also be done manually with the .close() method.
        try (WorkflowTemplateServiceClient workflowTemplateServiceClient =
            WorkflowTemplateServiceClient.create(workflowTemplateServiceSettings)) {
    
          // Configure the jobs within the workflow.
          HadoopJob teragenHadoopJob =
              HadoopJob.newBuilder()
                  .setMainJarFileUri("file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar")
                  .addArgs("teragen")
                  .addArgs("1000")
                  .addArgs("hdfs:///gen/")
                  .build();
          OrderedJob teragen =
              OrderedJob.newBuilder().setHadoopJob(teragenHadoopJob).setStepId("teragen").build();
    
          HadoopJob terasortHadoopJob =
              HadoopJob.newBuilder()
                  .setMainJarFileUri("file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar")
                  .addArgs("terasort")
                  .addArgs("hdfs:///gen/")
                  .addArgs("hdfs:///sort/")
                  .build();
          OrderedJob terasort =
              OrderedJob.newBuilder()
                  .setHadoopJob(terasortHadoopJob)
                  .addPrerequisiteStepIds("teragen")
                  .setStepId("terasort")
                  .build();
    
          // Configure the cluster placement for the workflow.
          // Leave "ZoneUri" empty for "Auto Zone Placement".
          // GceClusterConfig gceClusterConfig =
          //     GceClusterConfig.newBuilder().setZoneUri("").build();
          GceClusterConfig gceClusterConfig =
              GceClusterConfig.newBuilder().setZoneUri("us-central1-a").build();
          ClusterConfig clusterConfig =
              ClusterConfig.newBuilder().setGceClusterConfig(gceClusterConfig).build();
          ManagedCluster managedCluster =
              ManagedCluster.newBuilder()
                  .setClusterName("my-managed-cluster")
                  .setConfig(clusterConfig)
                  .build();
          WorkflowTemplatePlacement workflowTemplatePlacement =
              WorkflowTemplatePlacement.newBuilder().setManagedCluster(managedCluster).build();
    
          // Create the inline workflow template.
          WorkflowTemplate workflowTemplate =
              WorkflowTemplate.newBuilder()
                  .addJobs(teragen)
                  .addJobs(terasort)
                  .setPlacement(workflowTemplatePlacement)
                  .build();
    
          // Submit the instantiated inline workflow template request.
          String parent = RegionName.format(projectId, region);
          OperationFuture<Empty, WorkflowMetadata> instantiateInlineWorkflowTemplateAsync =
              workflowTemplateServiceClient.instantiateInlineWorkflowTemplateAsync(
                  parent, workflowTemplate);
          instantiateInlineWorkflowTemplateAsync.get();
    
          // Print out a success message.
          System.out.printf("Workflow ran successfully.");
    
        } catch (ExecutionException e) {
          System.err.println(String.format("Error running workflow: %s ", e.getMessage()));
        }
      }
    }

Node.js

  1. Installa la libreria client
  2. Configura le credenziali predefinite dell'applicazione
  3. Esegui il codice
const dataproc = require('@google-cloud/dataproc');

// TODO(developer): Uncomment and set the following variables.
// Both must be defined before the client below is constructed, because the
// constructor reads them when this module is loaded.
// const projectId = 'YOUR_PROJECT_ID';
// const region = 'YOUR_REGION';

// Create a client with the endpoint set to the desired region.
// The regional host name is derived from `region`.
const client = new dataproc.v1.WorkflowTemplateServiceClient({
  apiEndpoint: `${region}-dataproc.googleapis.com`,
  projectId: projectId,
});

/**
 * Instantiates an inline Dataproc workflow template with two Hadoop steps
 * ("teragen", then "terasort" which depends on it) on a managed cluster,
 * waits for the long-running operation to finish and logs a success message.
 *
 * Reads `client`, `projectId` and `region` from the enclosing module scope.
 *
 * @returns {Promise<void>} resolves once the operation has completed.
 */
async function instantiateInlineWorkflowTemplate() {
  // Create the formatted parent.
  const parent = client.regionPath(projectId, region);

  // Create the template
  const template = {
    jobs: [
      {
        hadoopJob: {
          mainJarFileUri:
            'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar',
          args: ['teragen', '1000', 'hdfs:///gen/'],
        },
        stepId: 'teragen',
      },
      {
        hadoopJob: {
          mainJarFileUri:
            'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar',
          args: ['terasort', 'hdfs:///gen/', 'hdfs:///sort/'],
        },
        stepId: 'terasort',
        // terasort consumes the teragen output, so it must run afterwards.
        prerequisiteStepIds: ['teragen'],
      },
    ],
    placement: {
      managedCluster: {
        clusterName: 'my-managed-cluster',
        config: {
          gceClusterConfig: {
            // Leave 'zoneUri' empty for 'Auto Zone Placement'
            // zoneUri: ''
            zoneUri: 'us-central1-a',
          },
        },
      },
    },
  };

  const request = {
    parent: parent,
    template: template,
  };

  // Submit the request to instantiate the workflow from an inline template.
  const [operation] = await client.instantiateInlineWorkflowTemplate(request);
  // Wait for the long-running operation to complete.
  await operation.promise();

  // Output a success message
  console.log('Workflow ran successfully.');
}

// Run the sample; report a failure instead of leaving the rejection unhandled.
instantiateInlineWorkflowTemplate().catch(err => {
  console.error(err);
  process.exitCode = 1;
});

Python

  1. Installa la libreria client
  2. Configura le credenziali predefinite dell'applicazione
  3. Esegui il codice
    from google.cloud import dataproc_v1 as dataproc
    
    def instantiate_inline_workflow_template(project_id, region):
        """Instantiate an inline Dataproc workflow template and wait for it.

        The template has two Hadoop steps, "teragen" and a dependent
        "terasort", placed on a managed cluster. A success message is printed
        once the long-running operation has finished.

        Args:
            project_id (string): Project to use for running the workflow.
            region (string): Region where the workflow resources should live.
        """
        # Hadoop examples jar used by both steps.
        examples_jar = "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"

        teragen_step = {
            "hadoop_job": {
                "main_jar_file_uri": examples_jar,
                "args": ["teragen", "1000", "hdfs:///gen/"],
            },
            "step_id": "teragen",
        }

        # terasort reads the teragen output, so it declares teragen as a prerequisite.
        terasort_step = {
            "hadoop_job": {
                "main_jar_file_uri": examples_jar,
                "args": ["terasort", "hdfs:///gen/", "hdfs:///sort/"],
            },
            "step_id": "terasort",
            "prerequisite_step_ids": ["teragen"],
        }

        gce_cluster_config = {
            # Leave 'zone_uri' empty for 'Auto Zone Placement'
            # 'zone_uri': ''
            "zone_uri": "us-central1-a"
        }
        placement = {
            "managed_cluster": {
                "cluster_name": "my-managed-cluster",
                "config": {"gce_cluster_config": gce_cluster_config},
            }
        }

        inline_template = {
            "jobs": [teragen_step, terasort_step],
            "placement": placement,
        }

        # Create a client with the endpoint set to the desired region.
        client_options = {"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
        workflow_template_client = dataproc.WorkflowTemplateServiceClient(
            client_options=client_options
        )

        # Submit the request to instantiate the workflow from an inline template,
        # then block until the long-running operation finishes.
        request = {
            "parent": f"projects/{project_id}/regions/{region}",
            "template": inline_template,
        }
        workflow_template_client.instantiate_inline_workflow_template(
            request=request
        ).result()

        # Output a success message.
        print("Workflow ran successfully.")