Membuat template Dataflow klasik

Dalam dokumen ini, Anda akan mempelajari cara membuat template klasik kustom dari kode pipeline Dataflow Anda. Template klasik mengemas pipeline Dataflow yang ada untuk membuat template yang dapat digunakan kembali yang dapat disesuaikan untuk setiap tugas dengan mengubah parameter pipeline tertentu. Alih-alih menulis template, Anda menggunakan perintah untuk membuat template dari pipeline yang ada.

Berikut adalah ikhtisar singkat prosesnya. Detail proses ini diberikan di bagian berikutnya.

  1. Di kode pipeline, gunakan antarmuka ValueProvider untuk semua opsi pipeline yang ingin Anda tetapkan atau gunakan saat runtime. Gunakan objek DoFn yang menerima parameter runtime.
  2. Perluas template Anda dengan metadata tambahan sehingga parameter kustom divalidasi saat template klasik dijalankan. Contoh metadata tersebut mencakup nama template klasik kustom dan parameter opsional Anda.
  3. Periksa apakah konektor I/O pipeline mendukung objek ValueProvider, dan buat perubahan sesuai kebutuhan.
  4. Membuat dan mengatur template klasik kustom.
  5. Jalankan template klasik kustom.

Untuk mempelajari berbagai jenis template Dataflow, manfaatnya, dan kapan harus memilih template klasik, lihat Template Dataflow.

Izin yang diperlukan untuk menjalankan template klasik

Izin yang Anda perlukan untuk menjalankan template klasik Dataflow bergantung pada tempat Anda menjalankan template, dan apakah sumber dan sink untuk pipeline berada di project lain.

Untuk mengetahui informasi selengkapnya tentang menjalankan pipeline Dataflow baik secara lokal atau menggunakan Google Cloud, lihat Keamanan dan izin Dataflow.

Untuk mengetahui daftar peran dan izin Dataflow, lihat Kontrol akses Dataflow.

Batasan

  • Opsi pipeline berikut tidak didukung dengan template klasik. Jika Anda perlu mengontrol jumlah thread harness pekerja, gunakan Template Fleksibel.

    Java

    numberOfWorkerHarnessThreads
      

    Python

    number_of_worker_harness_threads
      
  • Runner Dataflow tidak mendukung opsi ValueProvider untuk topik Pub/Sub dan parameter langganan. Jika Anda memerlukan opsi Pub/Sub dalam parameter runtime, gunakan Template Flex.

Tentang parameter runtime dan antarmuka ValueProvider

Antarmuka ValueProvider memungkinkan pipeline menerima parameter runtime. Apache Beam menyediakan tiga jenis objek ValueProvider.

Nama Deskripsi
RuntimeValueProvider

RuntimeValueProvider adalah jenis ValueProvider default. RuntimeValueProvider memungkinkan pipeline Anda menerima nilai yang hanya tersedia selama eksekusi pipeline. Nilai ini tidak tersedia selama konstruksi pipeline, sehingga Anda tidak dapat menggunakan nilai tersebut untuk mengubah grafik alur kerja pipeline.

Anda dapat menggunakan isAccessible() untuk memeriksa apakah nilai ValueProvider tersedia atau tidak. Jika Anda memanggil get() sebelum eksekusi pipeline, Apache Beam akan menampilkan error:
Value only available at runtime, but accessed from a non-runtime context.

Gunakan RuntimeValueProvider jika Anda tidak mengetahui nilainya sebelumnya. Untuk mengubah nilai parameter saat runtime, jangan menetapkan nilai untuk parameter dalam template. Tetapkan nilai untuk parameter saat Anda membuat tugas dari template.

StaticValueProvider

StaticValueProvider memungkinkan Anda memberikan nilai statis ke pipeline Anda. Nilai ini tersedia selama konstruksi pipeline, sehingga Anda dapat menggunakan nilai tersebut untuk mengubah grafik alur kerja pipeline.

Gunakan StaticValueProvider jika Anda mengetahui nilainya sebelumnya. Lihat bagian StaticValueProvider untuk mengetahui contohnya.

NestedValueProvider

NestedValueProvider memungkinkan Anda menghitung nilai dari objek ValueProvider lainnya. NestedValueProvider menggabungkan ValueProvider, dan jenis ValueProvider yang digabungkan menentukan apakah nilai dapat diakses selama konstruksi pipeline.

Gunakan NestedValueProvider jika Anda ingin menggunakan nilai tersebut untuk menghitung nilai lain saat runtime. Lihat bagian NestedValueProvider untuk mengetahui contohnya.

Menggunakan parameter runtime di kode pipeline

Bagian ini memandu cara menggunakan ValueProvider, StaticValueProvider, dan NestedValueProvider.

Menggunakan ValueProvider di opsi pipeline

Gunakan ValueProvider untuk semua opsi pipeline yang ingin Anda tetapkan atau gunakan pada runtime.

Misalnya, cuplikan kode WordCount berikut tidak mendukung parameter runtime. Kode ini menambahkan opsi file input, membuat pipeline, dan membaca baris dari file input tersebut:

Java

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

  class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Untuk menambahkan dukungan parameter runtime, ubah opsi file input untuk menggunakan ValueProvider.

Java

Gunakan ValueProvider<String>, bukan String, untuk opsi jenis file input.

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

Ganti add_argument dengan add_value_provider_argument.

 class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      # Use add_value_provider_argument for arguments to be templatable
      # Use add_argument as usual for non-templatable arguments
      parser.add_value_provider_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

Menggunakan ValueProvider dalam fungsi Anda

Untuk menggunakan parameter value runtime dalam fungsi Anda sendiri, update fungsi untuk menggunakan parameter ValueProvider.

Contoh berikut berisi opsi ValueProvider bilangan bulat, dan fungsi sederhana yang menambahkan bilangan bulat. Fungsi ini bergantung pada bilangan bulat ValueProvider. Selama eksekusi, pipeline menerapkan MySumFn ke setiap bilangan bulat dalam PCollection yang berisi [1, 2, 3]. Jika nilai runtime adalah 10, PCollection yang dihasilkan akan berisi [11, 12, 13].

Java

  public interface SumIntOptions extends PipelineOptions {
      // New runtime parameter, specified by the --int
      // option at runtime.
      ValueProvider<Integer> getInt();
      void setInt(ValueProvider<Integer> value);
  }

  class MySumFn extends DoFn<Integer, Integer> {
      ValueProvider<Integer> mySumInteger;

      MySumFn(ValueProvider<Integer> sumInt) {
          // Store the value provider
          this.mySumInteger = sumInt;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
         // Get the value of the value provider and add it to
         // the element's value.
         c.output(c.element() + mySumInteger.get());
      }
  }

  public static void main(String[] args) {
    SumIntOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(SumIntOptions.class);

    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
      // Get the value provider and pass it to MySumFn
     .apply(ParDo.of(new MySumFn(options.getInt())))
     .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString()))
     .apply("OutputNums", TextIO.write().to("numvalues"));

    p.run();
  }

Python

  import apache_beam as beam
  from apache_beam.options.pipeline_options import PipelineOptions
  from apache_beam.options.value_provider import StaticValueProvider
  from apache_beam.io import WriteToText

  class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_value_provider_argument('--templated_int', type=int)

  class MySumFn(beam.DoFn):
    def __init__(self, templated_int):
      self.templated_int = templated_int

    def process(self, an_int):
      yield self.templated_int.get() + an_int

  pipeline_options = PipelineOptions()
  p = beam.Pipeline(options=pipeline_options)

  user_options = pipeline_options.view_as(UserOptions)
  sum = (p
         | 'ReadCollection' >> beam.io.ReadFromText(
             'gs://some/integer_collection')
         | 'StringToInt' >> beam.Map(lambda w: int(w))
         | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int))
         | 'WriteResultingCollection' >> WriteToText('some/output_path'))

Gunakan StaticValueProvider

Untuk memberikan nilai statis ke pipeline Anda, gunakan StaticValueProvider.

Contoh ini menggunakan MySumFn, yang merupakan DoFn yang menggunakan ValueProvider<Integer>. Jika sudah mengetahui nilai parameter sebelumnya, Anda dapat menggunakan StaticValueProvider untuk menentukan nilai statis sebagai ValueProvider.

Java

Kode ini mendapatkan nilai pada runtime pipeline:

  .apply(ParDo.of(new MySumFn(options.getInt())))

Sebagai gantinya, Anda dapat menggunakan StaticValueProvider dengan nilai statis:

  .apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))

Python

Kode ini mendapatkan nilai pada runtime pipeline:

  beam.ParDo(MySumFn(user_options.templated_int))

Sebagai gantinya, Anda dapat menggunakan StaticValueProvider dengan nilai statis:

  beam.ParDo(MySumFn(StaticValueProvider(int,10)))

Anda juga dapat menggunakan StaticValueProvider saat mengimplementasikan modul I/O yang mendukung parameter reguler dan parameter runtime. StaticValueProvider mengurangi duplikasi kode agar tidak menerapkan dua metode serupa.

Java

Kode sumber untuk contoh ini adalah dari TextIO.java on GitHub Apache Beam.

  // Create a StaticValueProvider<String> from a regular String parameter
  // value, and then call .from() with this new StaticValueProvider.
  public Read from(String filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return from(StaticValueProvider.of(filepattern));
  }

  // This method takes a ValueProvider parameter.
  public Read from(ValueProvider<String> filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return toBuilder().setFilepattern(filepattern).build();
  }

Python

Dalam contoh ini, ada satu konstruktor yang menerima argumen string atau ValueProvider. Jika argumennya adalah string, argumen tersebut akan dikonversi menjadi StaticValueProvider.

class Read():

  def __init__(self, filepattern):
    if isinstance(filepattern, str):
      # Create a StaticValueProvider from a regular string parameter
      filepattern = StaticValueProvider(str, filepattern)

    self.filepattern = filepattern

Gunakan NestedStaticValueProvider

Untuk menghitung nilai dari objek ValueProvider lain, gunakan NestedValueProvider.

NestedValueProvider menggunakan penerjemah ValueProvider dan SerializableFunction sebagai input. Saat Anda memanggil .get() di NestedValueProvider, penerjemah akan membuat nilai baru berdasarkan nilai ValueProvider. Dengan terjemahan ini, Anda dapat menggunakan nilai ValueProvider untuk membuat nilai akhir yang diinginkan.

Pada contoh berikut, pengguna memberikan nama file file.txt. Transformasi ini menambahkan jalur gs://directory_name/ ke nama file. Memanggil .get() akan menampilkan gs://directory_name/file.txt.

Java

  public interface WriteIntsOptions extends PipelineOptions {
      // New runtime parameter, specified by the --fileName
      // option at runtime.
      ValueProvider<String> getFileName();
      void setFileName(ValueProvider<String> value);
  }

  public static void main(String[] args) {
     WriteIntsOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WriteIntsOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
     // Write to the computed complete file path.
     .apply("OutputNums", TextIO.write().to(NestedValueProvider.of(
        options.getFileName(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String file) {
            return "gs://directoryname/" + file;
          }
        })));

    p.run();
  }

Menggunakan metadata di kode pipeline

Anda dapat memperluas template dengan metadata tambahan sehingga parameter kustom divalidasi saat template dijalankan. Jika ingin membuat metadata untuk template Anda, ikuti langkah-langkah berikut:

  1. Buat file berformat JSON bernama TEMPLATE_NAME_metadata menggunakan parameter dalam Parameter metadata dan formatnya dalam Contoh file metadata. Ganti TEMPLATE_NAME dengan nama template Anda.

    Pastikan file metadata tidak memiliki ekstensi nama file. Misalnya, jika nama template Anda adalah myTemplate, file metadatanya harus myTemplate_metadata.

  2. Simpan file metadata di Cloud Storage dalam folder yang sama dengan template.

Parameter metadata

Kunci parameter Diperlukan Deskripsi nilai
name Ya Nama template Anda.
description Tidak Paragraf singkat berisi teks yang menjelaskan template.
streaming Tidak Jika true, template ini mendukung streaming. Nilai defaultnya adalah false.
supportsAtLeastOnce Tidak Jika true, template ini mendukung pemrosesan setidaknya satu kali. Nilai defaultnya adalah false. Tetapkan parameter ini ke true jika template dirancang untuk berfungsi dengan mode streaming minimal satu kali.
supportsExactlyOnce Tidak Jika true, template ini mendukung pemrosesan tepat satu kali. Nilai default-nya adalah true.
defaultStreamingMode Tidak Mode streaming default, untuk template yang mendukung mode minimal satu kali dan mode tepat satu kali. Gunakan salah satu nilai berikut: "AT_LEAST_ONCE", "EXACTLY_ONCE". Jika tidak ditentukan, mode streaming default adalah tepat satu kali.
parameters Tidak Array parameter tambahan yang digunakan template. Array kosong digunakan secara default.
name Ya Nama parameter yang digunakan di template Anda.
label Ya String yang dapat dibaca manusia, yang digunakan di Konsol Google Cloud untuk memberi label pada parameter.
helpText Ya Paragraf singkat berisi teks yang menjelaskan parameter.
isOptional Tidak false jika parameter diperlukan dan true jika parameter bersifat opsional. Kecuali ditetapkan dengan nilai, isOptional ditetapkan secara default ke false. Jika Anda tidak menyertakan kunci parameter ini untuk metadata, metadata akan menjadi parameter yang diperlukan.
regexes Tidak Array ekspresi reguler POSIX-egrep dalam bentuk string yang digunakan untuk memvalidasi nilai parameter. Misalnya, ["^[a-zA-Z][a-zA-Z0-9]+"] adalah satu ekspresi reguler yang memvalidasi bahwa nilai diawali dengan huruf, lalu memiliki satu atau beberapa karakter. Array kosong digunakan secara default.

Contoh file metadata

Java

Layanan Dataflow menggunakan metadata berikut untuk memvalidasi parameter kustom template WordCount:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "streaming": false,
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "inputFile",
      "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

Python

Layanan Dataflow menggunakan metadata berikut untuk memvalidasi parameter kustom template WordCount:

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "streaming": false,
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "input",
      "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

Anda dapat mendownload file metadata untuk template yang disediakan Google dari direktori template Dataflow.

Konektor I/O pipeline yang didukung dan ValueProvider

Java

Beberapa konektor I/O berisi metode yang menerima objek ValueProvider. Untuk menentukan dukungan bagi konektor dan metode tertentu, lihat dokumentasi referensi API untuk konektor I/O. Metode yang didukung memiliki overload dengan ValueProvider. Jika suatu metode tidak memiliki overload, metode tersebut tidak akan mendukung parameter runtime. Konektor I/O berikut memiliki setidaknya dukungan ValueProvider sebagian:

  • IO berbasis file: TextIO, AvroIO, FileIO, TFRecordIO, XmlIO
  • BigQueryIO*
  • BigtableIO (memerlukan SDK 2.3.0 atau yang lebih baru)
  • PubSubIO
  • SpannerIO

Python

Beberapa konektor I/O berisi metode yang menerima objek ValueProvider. Untuk menentukan dukungan bagi konektor I/O dan metodenya, lihat dokumentasi referensi API untuk konektor tersebut. Konektor I/O berikut menerima parameter runtime:

  • IO berbasis file: textio, avroio, tfrecordio

Membuat dan menyusun template klasik

Setelah menulis pipeline, Anda harus membuat dan mengatur file template. Saat Anda membuat dan melakukan staging sebuah template, lokasi staging berisi file tambahan yang diperlukan untuk menjalankan template Anda. Jika Anda menghapus lokasi staging, template akan gagal dijalankan. Tugas Dataflow tidak langsung berjalan setelah Anda menyiapkan template. Untuk menjalankan tugas Dataflow berbasis template kustom, Anda dapat menggunakan Google Cloud Console, Dataflow REST API, atau gcloud CLI.

Contoh berikut menunjukkan cara melakukan stage file template:

Java

Perintah Maven ini membuat dan menempatkan template di lokasi Cloud Storage yang ditentukan dengan --templateLocation.

    mvn compile exec:java \
     -Dexec.mainClass=com.example.myclass \
     -Dexec.args="--runner=DataflowRunner \
                  --project=PROJECT_ID \
                  --stagingLocation=gs://BUCKET_NAME/staging \
                  --templateLocation=gs://BUCKET_NAME/templates/TEMPLATE_NAME \
                  --region=REGION" \
     -P dataflow-runner
    

Verifikasi bahwa jalur templateLocation sudah benar. Ganti kode berikut:

  • com.example.myclass: class Java Anda
  • PROJECT_ID: project ID Anda
  • BUCKET_NAME: nama bucket Cloud Storage Anda
  • TEMPLATE_NAME: nama template Anda
  • REGION: region untuk men-deploy tugas Dataflow Anda di

Python

Perintah Python ini membuat dan menempatkan template di lokasi Cloud Storage yang ditentukan dengan --template_location.

  python -m examples.mymodule \
    --runner DataflowRunner \
    --project PROJECT_ID \
    --staging_location gs://BUCKET_NAME/staging \
    --template_location gs://BUCKET_NAME/templates/TEMPLATE_NAME \
    --region REGION

Verifikasi bahwa jalur template_location sudah benar. Ganti kode berikut:

  • examples.mymodule: modul Python Anda
  • PROJECT_ID: project ID Anda
  • BUCKET_NAME: nama bucket Cloud Storage Anda
  • TEMPLATE_NAME: nama template Anda
  • REGION: region untuk men-deploy tugas Dataflow Anda di

Setelah membuat dan menata template, langkah selanjutnya adalah menjalankan template.