Esegui un modello di esempio

Il modello WordCount è una pipeline batch che legge il testo da Cloud Storage, tokenizza le righe di testo in parole singole ed esegue un conteggio della frequenza per ciascuna parola. Per ulteriori informazioni su WordCount, consulta la pipeline di esempio WordCount.

Se il bucket Cloud Storage si trova al di fuori del perimetro di servizio, crea una regola di uscita che consenta l'accesso al bucket.

Parametri del modello

Parametro Descrizione
inputFile Il percorso del file di input di Cloud Storage.
outputFile Il percorso e il prefisso del file di output di Cloud Storage.

Esegui il modello Conteggio parole

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Nel menu a discesa Modello di flusso di dati, seleziona the WordCount template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

Nella shell o nel terminale, esegui il modello:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Word_Count \
    --region REGION_NAME \
    --parameters \
    inputFile=gs://dataflow-samples/shakespeare/kinglear.txt,output=gs://BUCKET_NAME/output/my_output

Sostituisci quanto segue:

  • JOB_NAME: un nome di job univoco a tua scelta

  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1

  • BUCKET_NAME: il nome del bucket Cloud Storage

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/latest/Word_Count
{
    "jobName": "JOB_NAME",
    "parameters": {
       "inputFile" : "gs://dataflow-samples/shakespeare/kinglear.txt",
       "output": "gs://BUCKET_NAME/output/my_output"
    },
    "environment": { "zone": "us-central1-f" }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta

  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1

  • BUCKET_NAME: il nome del bucket Cloud Storage

Java
/*
 * Copyright (C) 2016 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.templates.WordCount.WordCountOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/**
 * A template that counts words in text files.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Word_Count.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Word_Count",
    category = TemplateCategory.GET_STARTED,
    displayName = "Word Count",
    description =
        "Batch pipeline. Reads text from Cloud Storage, tokenizes text lines into individual words,"
            + " and performs frequency count on each of the words.",
    optionsClass = WordCountOptions.class,
    contactInformation = "https://cloud.google.com/support")
public class WordCount {

  static class ExtractWordsFn extends DoFn<String, String> {
    private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");

    @ProcessElement
    public void processElement(ProcessContext c) {
      // Check if the line is empty.
      if (c.element().trim().isEmpty()) {
        emptyLines.inc();
        return;
      }

      // Split the line into words.
      String[] words = c.element().split("[^a-zA-Z']+");

      // Output each word encountered into the output PCollection.
      for (String word : words) {
        if (!word.isEmpty()) {
          c.output(word);
        }
      }
    }
  }

  /** A SimpleFunction that converts a Word and Count into a printable string. */
  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }

  /**
   * A PTransform that converts a PCollection containing lines of text into a PCollection of
   * formatted word counts.
   */
  public static class CountWords
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts = words.apply(Count.<String>perElement());

      return wordCounts;
    }
  }

  /**
   * Options supported by {@link com.google.cloud.teleport.templates.WordCount}.
   *
   * <p>Inherits standard configuration options.
   */
  public interface WordCountOptions extends PipelineOptions {

    @TemplateParameter.GcsReadFile(
        order = 1,
        description = "Input file(s) in Cloud Storage",
        helpText =
            "The input file pattern Dataflow reads from. Use the example file "
                + "(gs://dataflow-samples/shakespeare/kinglear.txt) or enter the path to your own "
                + "using the same format: gs://your-bucket/your-file.txt")
    ValueProvider<String> getInputFile();

    void setInputFile(ValueProvider<String> value);

    @TemplateParameter.GcsWriteFolder(
        order = 2,
        description = "Output Cloud Storage file prefix",
        helpText = "Path and filename prefix for writing output files. Ex: gs://your-bucket/counts")
    ValueProvider<String> getOutput();

    void setOutput(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);
    p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
        .apply(new CountWords())
        .apply(MapElements.via(new FormatAsTextFn()))
        .apply("WriteCounts", TextIO.write().to(options.getOutput()));

    p.run();
  }
}