Dalam dokumen ini, Anda akan mempelajari cara membuat template klasik kustom dari kode pipeline Dataflow Anda. Template klasik mengemas pipeline Dataflow yang ada untuk membuat template yang dapat digunakan kembali yang dapat disesuaikan untuk setiap tugas dengan mengubah parameter pipeline tertentu. Alih-alih menulis template, Anda menggunakan perintah untuk membuat template dari pipeline yang ada.
Berikut adalah ikhtisar singkat prosesnya. Detail proses ini diberikan di bagian berikutnya.
- Di kode pipeline, gunakan antarmuka
ValueProvider
untuk semua opsi pipeline yang ingin Anda tetapkan atau gunakan saat runtime. Gunakan objekDoFn
yang menerima parameter runtime. - Perluas template Anda dengan metadata tambahan sehingga parameter kustom divalidasi saat template klasik dijalankan. Contoh metadata tersebut mencakup nama template klasik kustom dan parameter opsional Anda.
- Periksa apakah konektor I/O pipeline mendukung objek
ValueProvider
, dan buat perubahan sesuai kebutuhan. - Membuat dan mengatur template klasik kustom.
- Jalankan template klasik kustom.
Untuk mempelajari berbagai jenis template Dataflow, manfaatnya, dan kapan harus memilih template klasik, lihat Template Dataflow.
Izin yang diperlukan untuk menjalankan template klasik
Izin yang Anda perlukan untuk menjalankan template klasik Dataflow bergantung pada tempat Anda menjalankan template, dan apakah sumber dan sink untuk pipeline berada di project lain.
Untuk mengetahui informasi selengkapnya tentang menjalankan pipeline Dataflow baik secara lokal atau menggunakan Google Cloud, lihat Keamanan dan izin Dataflow.
Untuk mengetahui daftar peran dan izin Dataflow, lihat Kontrol akses Dataflow.
Batasan
- Opsi pipeline berikut tidak didukung dengan template klasik. Jika Anda perlu mengontrol jumlah
thread harness pekerja, gunakan
Template Fleksibel.
Java
numberOfWorkerHarnessThreads
Python
number_of_worker_harness_threads
- Runner Dataflow tidak mendukung opsi
ValueProvider
untuk topik Pub/Sub dan parameter langganan. Jika Anda memerlukan opsi Pub/Sub dalam parameter runtime, gunakan Template Flex.
Tentang parameter runtime dan antarmuka ValueProvider
Antarmuka ValueProvider
memungkinkan pipeline menerima parameter runtime. Apache Beam menyediakan tiga jenis objek ValueProvider
.
Nama | Deskripsi |
---|---|
RuntimeValueProvider |
Anda dapat menggunakan Gunakan |
StaticValueProvider |
Gunakan |
NestedValueProvider |
Gunakan |
Menggunakan parameter runtime di kode pipeline
Bagian ini memandu cara menggunakan ValueProvider
,
StaticValueProvider
, dan NestedValueProvider
.
Menggunakan ValueProvider
di opsi pipeline
Gunakan ValueProvider
untuk semua opsi pipeline yang ingin Anda tetapkan
atau gunakan pada runtime.
Misalnya, cuplikan kode WordCount
berikut tidak mendukung parameter runtime. Kode ini menambahkan opsi file input, membuat pipeline, dan membaca baris dari file input tersebut:
Java
public interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") String getInputFile(); void setInputFile(String value); } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); ...
Python
class WordcountOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', required=True, help='Output file to write results to.') pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input)
Untuk menambahkan dukungan parameter runtime, ubah opsi file input untuk menggunakan
ValueProvider
.
Java
Gunakan ValueProvider<String>
, bukan String
,
untuk opsi jenis file input.
public interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") ValueProvider<String> getInputFile(); void setInputFile(ValueProvider<String> value); } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); ...
Python
Ganti add_argument
dengan add_value_provider_argument
.
class WordcountOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): # Use add_value_provider_argument for arguments to be templatable # Use add_argument as usual for non-templatable arguments parser.add_value_provider_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', required=True, help='Output file to write results to.') pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input)
Menggunakan ValueProvider
dalam fungsi Anda
Untuk menggunakan parameter value runtime dalam fungsi Anda sendiri, update fungsi
untuk menggunakan parameter ValueProvider
.
Contoh berikut berisi opsi ValueProvider
bilangan bulat,
dan fungsi sederhana yang menambahkan bilangan bulat. Fungsi ini bergantung pada
bilangan bulat ValueProvider
. Selama eksekusi, pipeline menerapkan MySumFn
ke setiap bilangan bulat dalam PCollection
yang berisi [1, 2, 3]
. Jika nilai runtime adalah 10, PCollection
yang dihasilkan
akan berisi [11, 12, 13]
.
Java
public interface SumIntOptions extends PipelineOptions { // New runtime parameter, specified by the --int // option at runtime. ValueProvider<Integer> getInt(); void setInt(ValueProvider<Integer> value); } class MySumFn extends DoFn<Integer, Integer> { ValueProvider<Integer> mySumInteger; MySumFn(ValueProvider<Integer> sumInt) { // Store the value provider this.mySumInteger = sumInt; } @ProcessElement public void processElement(ProcessContext c) { // Get the value of the value provider and add it to // the element's value. c.output(c.element() + mySumInteger.get()); } } public static void main(String[] args) { SumIntOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(SumIntOptions.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(1, 2, 3)) // Get the value provider and pass it to MySumFn .apply(ParDo.of(new MySumFn(options.getInt()))) .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString())) .apply("OutputNums", TextIO.write().to("numvalues")); p.run(); }
Python
import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.value_provider import StaticValueProvider from apache_beam.io import WriteToText class UserOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_value_provider_argument('--templated_int', type=int) class MySumFn(beam.DoFn): def __init__(self, templated_int): self.templated_int = templated_int def process(self, an_int): yield self.templated_int.get() + an_int pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) user_options = pipeline_options.view_as(UserOptions) sum = (p | 'ReadCollection' >> beam.io.ReadFromText( 'gs://some/integer_collection') | 'StringToInt' >> beam.Map(lambda w: int(w)) | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int)) | 'WriteResultingCollection' >> WriteToText('some/output_path'))
Gunakan StaticValueProvider
Untuk memberikan nilai statis ke pipeline Anda, gunakan
StaticValueProvider
.
Contoh ini menggunakan MySumFn
, yang merupakan DoFn
yang menggunakan ValueProvider<Integer>
. Jika sudah mengetahui nilai parameter sebelumnya, Anda dapat menggunakan StaticValueProvider
untuk menentukan nilai statis sebagai ValueProvider
.
Java
Kode ini mendapatkan nilai pada runtime pipeline:
.apply(ParDo.of(new MySumFn(options.getInt())))
Sebagai gantinya, Anda dapat menggunakan StaticValueProvider
dengan nilai statis:
.apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))
Python
Kode ini mendapatkan nilai pada runtime pipeline:
beam.ParDo(MySumFn(user_options.templated_int))
Sebagai gantinya, Anda dapat menggunakan StaticValueProvider
dengan nilai statis:
beam.ParDo(MySumFn(StaticValueProvider(int,10)))
Anda juga dapat menggunakan StaticValueProvider
saat mengimplementasikan
modul I/O yang mendukung parameter reguler dan parameter runtime.
StaticValueProvider
mengurangi duplikasi kode agar tidak menerapkan dua metode serupa.
Java
Kode sumber untuk contoh ini adalah dari TextIO.java on GitHub Apache Beam.
// Create a StaticValueProvider<String> from a regular String parameter // value, and then call .from() with this new StaticValueProvider. public Read from(String filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return from(StaticValueProvider.of(filepattern)); } // This method takes a ValueProvider parameter. public Read from(ValueProvider<String> filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return toBuilder().setFilepattern(filepattern).build(); }
Python
Dalam contoh ini, ada satu konstruktor yang menerima
argumen string
atau ValueProvider
. Jika
argumennya adalah string
, argumen tersebut akan dikonversi menjadi
StaticValueProvider
.
class Read(): def __init__(self, filepattern): if isinstance(filepattern, str): # Create a StaticValueProvider from a regular string parameter filepattern = StaticValueProvider(str, filepattern) self.filepattern = filepattern
Gunakan NestedStaticValueProvider
Untuk menghitung nilai dari objek ValueProvider
lain, gunakan NestedValueProvider
.
NestedValueProvider
menggunakan penerjemah ValueProvider
dan
SerializableFunction
sebagai input. Saat Anda memanggil
.get()
di NestedValueProvider
, penerjemah
akan membuat nilai baru berdasarkan nilai ValueProvider
. Dengan terjemahan ini, Anda dapat menggunakan nilai ValueProvider
untuk membuat nilai akhir yang diinginkan.
Pada contoh berikut, pengguna memberikan nama file file.txt
. Transformasi ini menambahkan jalur gs://directory_name/
ke nama file. Memanggil .get()
akan menampilkan
gs://directory_name/file.txt
.
Java
public interface WriteIntsOptions extends PipelineOptions { // New runtime parameter, specified by the --fileName // option at runtime. ValueProvider<String> getFileName(); void setFileName(ValueProvider<String> value); } public static void main(String[] args) { WriteIntsOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WriteIntsOptions.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(1, 2, 3)) // Write to the computed complete file path. .apply("OutputNums", TextIO.write().to(NestedValueProvider.of( options.getFileName(), new SerializableFunction<String, String>() { @Override public String apply(String file) { return "gs://directoryname/" + file; } }))); p.run(); }
Menggunakan metadata di kode pipeline
Anda dapat memperluas template dengan metadata tambahan sehingga parameter kustom divalidasi saat template dijalankan. Jika ingin membuat metadata untuk template Anda, ikuti langkah-langkah berikut:
- Buat file berformat JSON bernama
TEMPLATE_NAME_metadata
menggunakan parameter dalam Parameter metadata dan formatnya dalam Contoh file metadata. GantiTEMPLATE_NAME
dengan nama template Anda.Pastikan file metadata tidak memiliki ekstensi nama file. Misalnya, jika nama template Anda adalah
myTemplate
, file metadatanya harusmyTemplate_metadata
. - Simpan file metadata di Cloud Storage dalam folder yang sama dengan template.
Parameter metadata
Kunci parameter | Diperlukan | Deskripsi nilai | |
---|---|---|---|
name |
Ya | Nama template Anda. | |
description |
Tidak | Paragraf singkat berisi teks yang menjelaskan template. | |
streaming |
Tidak | Jika true , template ini mendukung streaming. Nilai defaultnya adalah
false . |
|
supportsAtLeastOnce |
Tidak | Jika true , template ini mendukung pemrosesan setidaknya satu kali. Nilai defaultnya
adalah false . Tetapkan parameter ini ke true jika template dirancang untuk berfungsi dengan mode streaming minimal satu kali.
|
|
supportsExactlyOnce |
Tidak | Jika true , template ini mendukung pemrosesan tepat satu kali. Nilai default-nya
adalah true . |
|
defaultStreamingMode |
Tidak | Mode streaming default, untuk template yang mendukung mode minimal satu kali dan mode tepat satu kali. Gunakan salah satu nilai berikut: "AT_LEAST_ONCE" ,
"EXACTLY_ONCE" . Jika tidak ditentukan, mode streaming default adalah tepat satu kali.
|
|
parameters |
Tidak | Array parameter tambahan yang digunakan template. Array kosong digunakan secara default. | |
name |
Ya | Nama parameter yang digunakan di template Anda. | |
label |
Ya | String yang dapat dibaca manusia, yang digunakan di Konsol Google Cloud untuk memberi label pada parameter. | |
helpText |
Ya | Paragraf singkat berisi teks yang menjelaskan parameter. | |
isOptional |
Tidak | false jika parameter diperlukan dan true jika parameter bersifat opsional. Kecuali ditetapkan dengan nilai, isOptional ditetapkan secara default ke false .
Jika Anda tidak menyertakan kunci parameter ini untuk metadata, metadata akan menjadi parameter yang diperlukan. |
|
regexes |
Tidak | Array ekspresi reguler POSIX-egrep dalam bentuk string yang digunakan untuk memvalidasi nilai parameter. Misalnya, ["^[a-zA-Z][a-zA-Z0-9]+"] adalah satu
ekspresi reguler yang memvalidasi bahwa nilai diawali dengan huruf, lalu memiliki satu atau
beberapa karakter. Array kosong digunakan secara default. |
Contoh file metadata
Java
Layanan Dataflow menggunakan metadata berikut untuk memvalidasi parameter kustom template WordCount:
{ "description": "An example pipeline that counts words in the input file.", "name": "Word Count", "streaming": false, "parameters": [ { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "inputFile", "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt", "label": "Input Cloud Storage file(s)" }, { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "output", "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts", "label": "Output Cloud Storage file(s)" } ] }
Python
Layanan Dataflow menggunakan metadata berikut untuk memvalidasi parameter kustom template WordCount:
{ "description": "An example pipeline that counts words in the input file.", "name": "Word Count", "streaming": false, "parameters": [ { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "input", "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt", "label": "Input Cloud Storage file(s)" }, { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "output", "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts", "label": "Output Cloud Storage file(s)" } ] }
Anda dapat mendownload file metadata untuk template yang disediakan Google dari direktori template Dataflow.
Konektor I/O pipeline yang didukung dan ValueProvider
Java
Beberapa konektor I/O berisi metode yang menerima
objek ValueProvider
. Untuk menentukan dukungan bagi konektor dan metode tertentu, lihat dokumentasi referensi API untuk konektor I/O. Metode yang didukung memiliki overload dengan
ValueProvider
. Jika suatu metode tidak memiliki overload, metode tersebut tidak akan
mendukung parameter runtime. Konektor I/O berikut memiliki setidaknya
dukungan ValueProvider
sebagian:
- IO berbasis file:
TextIO
,AvroIO
,FileIO
,TFRecordIO
,XmlIO
BigQueryIO
*BigtableIO
(memerlukan SDK 2.3.0 atau yang lebih baru)PubSubIO
SpannerIO
Python
Beberapa konektor I/O berisi metode yang menerima
objek ValueProvider
. Untuk menentukan dukungan bagi konektor I/O dan metodenya, lihat dokumentasi referensi API untuk konektor tersebut. Konektor I/O berikut menerima parameter runtime:
- IO berbasis file:
textio
,avroio
,tfrecordio
Membuat dan menyusun template klasik
Setelah menulis pipeline, Anda harus membuat dan mengatur file template. Saat Anda membuat dan melakukan staging sebuah template, lokasi staging berisi file tambahan yang diperlukan untuk menjalankan template Anda. Jika Anda menghapus lokasi staging, template akan gagal dijalankan. Tugas Dataflow tidak langsung berjalan setelah Anda menyiapkan template. Untuk menjalankan tugas Dataflow berbasis template kustom, Anda dapat menggunakan Google Cloud Console, Dataflow REST API, atau gcloud CLI.
Contoh berikut menunjukkan cara melakukan stage file template:
Java
Perintah Maven ini membuat dan menempatkan template di lokasi Cloud Storage yang ditentukan dengan --templateLocation
.
mvn compile exec:java \ -Dexec.mainClass=com.example.myclass \ -Dexec.args="--runner=DataflowRunner \ --project=PROJECT_ID \ --stagingLocation=gs://BUCKET_NAME/staging \ --templateLocation=gs://BUCKET_NAME/templates/TEMPLATE_NAME \ --region=REGION" \ -P dataflow-runner
Verifikasi bahwa jalur templateLocation
sudah benar. Ganti kode berikut:
com.example.myclass
: class Java AndaPROJECT_ID
: project ID AndaBUCKET_NAME
: nama bucket Cloud Storage AndaTEMPLATE_NAME
: nama template AndaREGION
: region untuk men-deploy tugas Dataflow Anda di
Python
Perintah Python ini membuat dan menempatkan template di lokasi Cloud Storage
yang ditentukan dengan --template_location
.
python -m examples.mymodule \ --runner DataflowRunner \ --project PROJECT_ID \ --staging_location gs://BUCKET_NAME/staging \ --template_location gs://BUCKET_NAME/templates/TEMPLATE_NAME \ --region REGION
Verifikasi bahwa jalur template_location
sudah benar. Ganti kode berikut:
examples.mymodule
: modul Python AndaPROJECT_ID
: project ID AndaBUCKET_NAME
: nama bucket Cloud Storage AndaTEMPLATE_NAME
: nama template AndaREGION
: region untuk men-deploy tugas Dataflow Anda di
Setelah membuat dan menata template, langkah selanjutnya adalah menjalankan template.