샘플 템플릿 실행

WordCount 템플릿은 Cloud Storage에서 텍스트를 읽고, 텍스트 줄을 개별 단어로 토큰화하고, 각 단어의 출현 빈도를 세는 일괄 파이프라인입니다. WordCount에 대한 자세한 내용은 WordCount 파이프라인 예를 참조하세요.

Cloud Storage 버킷이 서비스 경계 외부에 있으면 버킷에 대한 액세스를 허용하는 이그레스 규칙을 만듭니다.

템플릿 매개변수

매개변수 설명
inputFile Cloud Storage 입력 파일 경로입니다.
outputFile Cloud Storage 출력 파일 경로 및 프리픽스입니다.

WordCount 템플릿 실행

콘솔gcloudAPI
  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the WordCount template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Word_Count \
    --region REGION_NAME \
    --parameters \
    inputFile=gs://dataflow-samples/shakespeare/kinglear.txt,output=gs://BUCKET_NAME/output/my_output

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름

  • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)

  • BUCKET_NAME: Cloud Storage 버킷 이름

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/latest/Word_Count
{
    "jobName": "JOB_NAME",
    "parameters": {
       "inputFile" : "gs://dataflow-samples/shakespeare/kinglear.txt",
       "output": "gs://BUCKET_NAME/output/my_output"
    },
    "environment": { "zone": "us-central1-f" }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름

  • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)

  • BUCKET_NAME: Cloud Storage 버킷 이름

Java
/*
 * Copyright (C) 2016 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.templates.WordCount.WordCountOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/**
 * A template that counts words in text files.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Word_Count.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Word_Count",
    category = TemplateCategory.GET_STARTED,
    displayName = "Word Count",
    description =
        "Batch pipeline. Reads text from Cloud Storage, tokenizes text lines into individual words,"
            + " and performs frequency count on each of the words.",
    optionsClass = WordCountOptions.class,
    contactInformation = "https://cloud.google.com/support")
public class WordCount {

  static class ExtractWordsFn extends DoFn<String, String> {
    private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");

    @ProcessElement
    public void processElement(ProcessContext c) {
      // Check if the line is empty.
      if (c.element().trim().isEmpty()) {
        emptyLines.inc();
        return;
      }

      // Split the line into words.
      String[] words = c.element().split("[^a-zA-Z']+");

      // Output each word encountered into the output PCollection.
      for (String word : words) {
        if (!word.isEmpty()) {
          c.output(word);
        }
      }
    }
  }

  /** A SimpleFunction that converts a Word and Count into a printable string. */
  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }

  /**
   * A PTransform that converts a PCollection containing lines of text into a PCollection of
   * formatted word counts.
   */
  public static class CountWords
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts = words.apply(Count.<String>perElement());

      return wordCounts;
    }
  }

  /**
   * Options supported by {@link com.google.cloud.teleport.templates.WordCount}.
   *
   * <p>Inherits standard configuration options.
   */
  public interface WordCountOptions extends PipelineOptions {

    @TemplateParameter.GcsReadFile(
        order = 1,
        description = "Input file(s) in Cloud Storage",
        helpText =
            "The input file pattern Dataflow reads from. Use the example file "
                + "(gs://dataflow-samples/shakespeare/kinglear.txt) or enter the path to your own "
                + "using the same format: gs://your-bucket/your-file.txt")
    ValueProvider<String> getInputFile();

    void setInputFile(ValueProvider<String> value);

    @TemplateParameter.GcsWriteFolder(
        order = 2,
        description = "Output Cloud Storage file prefix",
        helpText = "Path and filename prefix for writing output files. Ex: gs://your-bucket/counts")
    ValueProvider<String> getOutput();

    void setOutput(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);
    p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
        .apply(new CountWords())
        .apply(MapElements.via(new FormatAsTextFn()))
        .apply("WriteCounts", TextIO.write().to(options.getOutput()));

    p.run();
  }
}