Membuat template Dataflow klasik

Dalam dokumen ini, Anda akan mempelajari cara membuat template klasik kustom dari kode pipeline Dataflow. Template klasik memaketkan pipeline Dataflow yang ada untuk membuat template yang dapat digunakan kembali yang dapat Anda sesuaikan untuk setiap tugas dengan mengubah parameter pipeline tertentu. Daripada menulis template, Anda menggunakan perintah untuk membuat template dari pipeline yang ada.

Berikut adalah ringkasan singkat prosesnya. Detail proses ini diberikan di bagian berikutnya.

  1. Dalam kode pipeline, gunakan antarmuka ValueProvider untuk semua opsi pipeline yang ingin Anda tetapkan atau gunakan saat runtime. Gunakan objek DoFn yang menerima parameter runtime.
  2. Perluas template Anda dengan metadata tambahan sehingga parameter kustom divalidasi saat template klasik dijalankan. Contoh metadata tersebut mencakup nama template klasik kustom dan parameter opsional.
  3. Periksa apakah konektor I/O pipeline mendukung objek ValueProvider, dan buat perubahan sesuai kebutuhan.
  4. Buat dan siapkan template klasik kustom.
  5. Jalankan template klasik kustom.

Untuk mempelajari berbagai jenis template Dataflow, manfaatnya, dan kapan harus memilih template klasik, lihat Template Dataflow.

Izin yang diperlukan untuk menjalankan template klasik

Izin yang Anda perlukan untuk menjalankan template Dataflow klasik bergantung pada tempat Anda menjalankan template, dan apakah sumber dan sink untuk pipeline berada di project lain.

Untuk informasi selengkapnya tentang cara menjalankan pipeline Dataflow secara lokal atau menggunakan Google Cloud, lihat Keamanan dan izin Dataflow.

Untuk mengetahui daftar peran dan izin Dataflow, lihat Kontrol akses Dataflow.

Batasan

  • Opsi pipeline berikut tidak didukung dengan template klasik. Jika Anda perlu mengontrol jumlah thread harness pekerja, gunakan Template Flex.

    Java

    numberOfWorkerHarnessThreads
      

    Python

    number_of_worker_harness_threads
      
  • Runner Dataflow tidak mendukung opsi ValueProvider untuk topik Pub/Sub dan parameter langganan. Jika Anda memerlukan opsi Pub/Sub dalam parameter runtime, gunakan Template Flex.

Tentang parameter runtime dan antarmuka ValueProvider

Antarmuka ValueProvider memungkinkan pipeline menerima parameter runtime. Apache Beam menyediakan tiga jenis objek ValueProvider.

Nama Deskripsi
RuntimeValueProvider

RuntimeValueProvider adalah jenis ValueProvider default. RuntimeValueProvider memungkinkan pipeline Anda menerima nilai yang hanya tersedia selama eksekusi pipeline. Nilai ini tidak tersedia selama konstruksi pipeline, sehingga Anda tidak dapat menggunakan nilai tersebut untuk mengubah grafik alur kerja pipeline.

Anda dapat menggunakan isAccessible() untuk memeriksa apakah nilai ValueProvider tersedia. Jika Anda memanggil get() sebelum eksekusi pipeline, Apache Beam akan menampilkan error:
Value only available at runtime, but accessed from a non-runtime context.

Gunakan RuntimeValueProvider jika Anda tidak mengetahui nilainya terlebih dahulu. Untuk mengubah parameter value saat runtime, jangan tetapkan nilai untuk parameter dalam template. Tetapkan nilai untuk parameter saat Anda membuat tugas dari template.

StaticValueProvider

StaticValueProvider memungkinkan Anda memberikan nilai statis ke pipeline. Nilai ini tersedia selama konstruksi pipeline, sehingga Anda dapat menggunakan nilai tersebut untuk mengubah grafik alur kerja pipeline.

Gunakan StaticValueProvider jika Anda mengetahui nilainya terlebih dahulu. Lihat bagian StaticValueProvider untuk mengetahui contohnya.

NestedValueProvider

NestedValueProvider memungkinkan Anda menghitung nilai dari objek ValueProvider lainnya. NestedValueProvider menggabungkan ValueProvider, dan jenis ValueProvider yang digabungkan menentukan apakah nilai dapat diakses selama pembuatan pipeline.

Gunakan NestedValueProvider saat Anda ingin menggunakan nilai untuk menghitung nilai lain saat runtime. Lihat bagian NestedValueProvider untuk mengetahui contohnya.

Menggunakan parameter runtime dalam kode pipeline

Bagian ini menjelaskan cara menggunakan ValueProvider, StaticValueProvider, dan NestedValueProvider.

Menggunakan ValueProvider dalam opsi pipeline

Gunakan ValueProvider untuk semua opsi pipeline yang ingin Anda tetapkan atau gunakan saat runtime.

Misalnya, cuplikan kode WordCount berikut tidak mendukung parameter runtime. Kode ini menambahkan opsi file input, membuat pipeline, dan membaca baris dari file input:

Java

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

  class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Untuk menambahkan dukungan parameter runtime, ubah opsi file input untuk menggunakan ValueProvider.

Java

Gunakan ValueProvider<String>, bukan String, untuk jenis opsi file input.

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

Ganti add_argument dengan add_value_provider_argument.

 class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      # Use add_value_provider_argument for arguments to be templatable
      # Use add_argument as usual for non-templatable arguments
      parser.add_value_provider_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Menggunakan ValueProvider dalam fungsi Anda

Untuk menggunakan nilai parameter runtime dalam fungsi Anda sendiri, perbarui fungsi untuk menggunakan parameter ValueProvider.

Contoh berikut berisi opsi ValueProvider bilangan bulat, dan fungsi sederhana yang menambahkan bilangan bulat. Fungsi ini bergantung pada bilangan bulat ValueProvider. Selama eksekusi, pipeline menerapkan MySumFn ke setiap bilangan bulat dalam PCollection yang berisi [1, 2, 3]. Jika nilai runtime adalah 10, PCollection yang dihasilkan akan berisi [11, 12, 13].

Java

  public interface SumIntOptions extends PipelineOptions {
      // New runtime parameter, specified by the --int
      // option at runtime.
      ValueProvider<Integer> getInt();
      void setInt(ValueProvider<Integer> value);
  }

  class MySumFn extends DoFn<Integer, Integer> {
      ValueProvider<Integer> mySumInteger;

      MySumFn(ValueProvider<Integer> sumInt) {
          // Store the value provider
          this.mySumInteger = sumInt;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
         // Get the value of the value provider and add it to
         // the element's value.
         c.output(c.element() + mySumInteger.get());
      }
  }

  public static void main(String[] args) {
    SumIntOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(SumIntOptions.class);

    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
      // Get the value provider and pass it to MySumFn
     .apply(ParDo.of(new MySumFn(options.getInt())))
     .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString()))
     .apply("OutputNums", TextIO.write().to("numvalues"));

    p.run();
  }

Python

  import apache_beam as beam
  from apache_beam.options.pipeline_options import PipelineOptions
  from apache_beam.options.value_provider import StaticValueProvider
  from apache_beam.io import WriteToText

  class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_value_provider_argument('--templated_int', type=int)

  class MySumFn(beam.DoFn):
    def __init__(self, templated_int):
      self.templated_int = templated_int

    def process(self, an_int):
      yield self.templated_int.get() + an_int

  pipeline_options = PipelineOptions()
  p = beam.Pipeline(options=pipeline_options)

  user_options = pipeline_options.view_as(UserOptions)
  sum = (p
         | 'ReadCollection' >> beam.io.ReadFromText(
             'gs://some/integer_collection')
         | 'StringToInt' >> beam.Map(lambda w: int(w))
         | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int))
         | 'WriteResultingCollection' >> WriteToText('some/output_path'))

Gunakan StaticValueProvider

Untuk memberikan nilai statis ke pipeline, gunakan StaticValueProvider.

Contoh ini menggunakan MySumFn, yang merupakan DoFn yang menggunakan ValueProvider<Integer>. Jika mengetahui nilai parameter terlebih dahulu, Anda dapat menggunakan StaticValueProvider untuk menentukan nilai statis sebagai ValueProvider.

Java

Kode ini mendapatkan nilai pada runtime pipeline:

  .apply(ParDo.of(new MySumFn(options.getInt())))

Sebagai gantinya, Anda dapat menggunakan StaticValueProvider dengan nilai statis:

  .apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))

Python

Kode ini mendapatkan nilai pada runtime pipeline:

  beam.ParDo(MySumFn(user_options.templated_int))

Sebagai gantinya, Anda dapat menggunakan StaticValueProvider dengan nilai statis:

  beam.ParDo(MySumFn(StaticValueProvider(int,10)))

Anda juga dapat menggunakan StaticValueProvider saat menerapkan modul I/O yang mendukung parameter reguler dan parameter runtime. StaticValueProvider mengurangi duplikasi kode dari penerapan dua metode yang serupa.

Java

Kode sumber untuk contoh ini berasal dari TextIO.java Apache Beam di GitHub.

  // Create a StaticValueProvider<String> from a regular String parameter
  // value, and then call .from() with this new StaticValueProvider.
  public Read from(String filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return from(StaticValueProvider.of(filepattern));
  }

  // This method takes a ValueProvider parameter.
  public Read from(ValueProvider<String> filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return toBuilder().setFilepattern(filepattern).build();
  }

Python

Dalam contoh ini, ada satu konstruktor yang menerima argumen string atau ValueProvider. Jika argumen adalah string, argumen tersebut akan dikonversi menjadi StaticValueProvider.

class Read():

  def __init__(self, filepattern):
    if isinstance(filepattern, str):
      # Create a StaticValueProvider from a regular string parameter
      filepattern = StaticValueProvider(str, filepattern)

    self.filepattern = filepattern

Gunakan NestedStaticValueProvider

Untuk menghitung nilai dari objek ValueProvider lain, gunakan NestedValueProvider.

NestedValueProvider menggunakan ValueProvider dan penerjemah SerializableFunction sebagai input. Saat Anda memanggil .get() pada NestedValueProvider, penerjemah akan membuat nilai baru berdasarkan nilai ValueProvider. Terjemahan ini memungkinkan Anda menggunakan nilai ValueProvider untuk membuat nilai akhir yang Anda inginkan.

Dalam contoh berikut, pengguna memberikan nama file file.txt. Transformasi menambahkan jalur gs://directory_name/ ke nama file. Memanggil .get() akan menampilkan gs://directory_name/file.txt.

Java

  public interface WriteIntsOptions extends PipelineOptions {
      // New runtime parameter, specified by the --fileName
      // option at runtime.
      ValueProvider<String> getFileName();
      void setFileName(ValueProvider<String> value);
  }

  public static void main(String[] args) {
     WriteIntsOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WriteIntsOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
     // Write to the computed complete file path.
     .apply("OutputNums", TextIO.write().to(NestedValueProvider.of(
        options.getFileName(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String file) {
            return "gs://directoryname/" + file;
          }
        })));

    p.run();
  }

Menggunakan metadata dalam kode pipeline

Anda dapat memperluas template dengan metadata tambahan sehingga parameter kustom divalidasi saat template dijalankan. Jika Anda ingin membuat metadata untuk template, ikuti langkah-langkah berikut:

  1. Buat file berformat JSON bernama TEMPLATE_NAME_metadata menggunakan parameter di Parameter metadata dan format di Contoh file metadata. Ganti TEMPLATE_NAME dengan nama template Anda.

    Pastikan file metadata tidak memiliki ekstensi nama file. Misalnya, jika nama template Anda adalah myTemplate, file metadatanya harus myTemplate_metadata.

  2. Simpan file metadata di Cloud Storage dalam folder yang sama dengan template.

Parameter metadata

Kunci parameter Wajib Deskripsi nilai
name Ya Nama template Anda.
description Tidak Paragraf singkat yang menjelaskan template.
streaming Tidak Jika true, template ini mendukung streaming. Nilai defaultnya adalah false.
supportsAtLeastOnce Tidak Jika true, template ini mendukung pemrosesan setidaknya satu kali. Nilai defaultnya adalah false. Tetapkan parameter ini ke true jika template dirancang untuk berfungsi dengan mode streaming setidaknya satu kali.
supportsExactlyOnce Tidak Jika true, template ini mendukung pemrosesan tepat satu kali. Nilai defaultnya adalah true.
defaultStreamingMode Tidak Mode streaming default, untuk template yang mendukung mode setidaknya satu kali dan mode tepat satu kali. Gunakan salah satu nilai berikut: "AT_LEAST_ONCE", "EXACTLY_ONCE". Jika tidak ditentukan, mode streaming defaultnya adalah exactly-once.
parameters Tidak Array parameter tambahan yang digunakan template. Array kosong digunakan secara default.
name Ya Nama parameter yang digunakan dalam template Anda.
label Ya String yang dapat dibaca manusia yang digunakan di konsol Google Cloud untuk melabeli parameter.
helpText Ya Paragraf singkat yang menjelaskan parameter.
isOptional Tidak false jika parameter wajib ada dan true jika parameter bersifat opsional. Kecuali jika ditetapkan dengan nilai, isOptional akan ditetapkan secara default ke false. Jika Anda tidak menyertakan kunci parameter ini untuk metadata, metadata akan menjadi parameter yang diperlukan.
regexes Tidak Array ekspresi reguler POSIX-egrep dalam bentuk string yang digunakan untuk memvalidasi nilai parameter. Misalnya, ["^[a-zA-Z][a-zA-Z0-9]+"] adalah satu ekspresi reguler yang memvalidasi bahwa nilai dimulai dengan huruf, lalu memiliki satu atau beberapa karakter. Array kosong digunakan secara default.

Contoh file metadata

Java

Layanan Dataflow menggunakan metadata berikut untuk memvalidasi parameter kustom template WordCount:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "streaming": false,
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "inputFile",
      "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

Python

Layanan Dataflow menggunakan metadata berikut untuk memvalidasi parameter kustom template WordCount:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "streaming": false,
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "input",
      "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

Anda dapat mendownload file metadata untuk template yang disediakan Google dari direktori template Dataflow.

Konektor I/O pipeline dan ValueProvider yang didukung

Java

Beberapa konektor I/O berisi metode yang menerima objek ValueProvider. Untuk menentukan dukungan untuk konektor dan metode tertentu, lihat dokumentasi referensi API untuk konektor I/O. Metode yang didukung memiliki overload dengan ValueProvider. Jika metode tidak memiliki overload, metode tersebut tidak mendukung parameter runtime. Konektor I/O berikut memiliki setidaknya dukungan ValueProvider parsial:

  • IO berbasis file: TextIO, AvroIO, FileIO, TFRecordIO, XmlIO
  • BigQueryIO*
  • BigtableIO (memerlukan SDK 2.3.0 atau yang lebih baru)
  • PubSubIO
  • SpannerIO

Python

Beberapa konektor I/O berisi metode yang menerima objek ValueProvider. Untuk menentukan dukungan untuk konektor I/O dan metodenya, lihat dokumentasi referensi API untuk konektor. Konektor I/O berikut menerima parameter runtime:

  • IO berbasis file: textio, avroio, tfrecordio

Membuat dan melakukan staging template klasik

Setelah menulis pipeline, Anda harus membuat dan menyiapkan file template. Saat Anda membuat dan melakukan staging template, lokasi staging akan berisi file tambahan yang diperlukan untuk menjalankan template. Jika Anda menghapus lokasi staging, template akan gagal dijalankan. Tugas Dataflow tidak langsung berjalan setelah Anda mem-build template. Untuk menjalankan tugas Dataflow berbasis template kustom, Anda dapat menggunakan konsol Google Cloud , Dataflow REST API, atau gcloud CLI.

Contoh berikut menunjukkan cara melakukan staging file template:

Java

Perintah Maven ini membuat dan menyiapkan template di lokasi Cloud Storage yang ditentukan dengan --templateLocation.

    mvn compile exec:java \
     -Dexec.mainClass=com.example.myclass \
     -Dexec.args="--runner=DataflowRunner \
                  --project=PROJECT_ID \
                  --stagingLocation=gs://BUCKET_NAME/staging \
                  --templateLocation=gs://BUCKET_NAME/templates/TEMPLATE_NAME \
                  --region=REGION" \
     -P dataflow-runner
    

Pastikan jalur templateLocation sudah benar. Ganti kode berikut:

  • com.example.myclass: class Java Anda
  • PROJECT_ID: project ID Anda
  • BUCKET_NAME: nama bucket Cloud Storage Anda
  • TEMPLATE_NAME: nama template Anda
  • REGION: region tempat men-deploy tugas Dataflow Anda

Python

Perintah Python ini membuat dan menyiapkan template di lokasi Cloud Storage yang ditentukan dengan --template_location.

  python -m examples.mymodule \
    --runner DataflowRunner \
    --project PROJECT_ID \
    --staging_location gs://BUCKET_NAME/staging \
    --template_location gs://BUCKET_NAME/templates/TEMPLATE_NAME \
    --region REGION

Pastikan jalur template_location sudah benar. Ganti kode berikut:

  • examples.mymodule: modul Python Anda
  • PROJECT_ID: project ID Anda
  • BUCKET_NAME: nama bucket Cloud Storage Anda
  • TEMPLATE_NAME: nama template Anda
  • REGION: region tempat men-deploy tugas Dataflow Anda

Setelah membuat dan menyiapkan template, langkah berikutnya adalah menjalankan template.