Dalam dokumen ini, Anda akan mempelajari cara membuat template klasik kustom dari kode pipeline Dataflow. Template klasik memaketkan pipeline Dataflow yang ada untuk membuat template yang dapat digunakan kembali yang dapat Anda sesuaikan untuk setiap tugas dengan mengubah parameter pipeline tertentu. Daripada menulis template, Anda menggunakan perintah untuk membuat template dari pipeline yang ada.
Berikut adalah ringkasan singkat prosesnya. Detail proses ini diberikan di bagian berikutnya.
- Dalam kode pipeline, gunakan antarmuka
ValueProvider
untuk semua opsi pipeline yang ingin Anda tetapkan atau gunakan saat runtime. Gunakan objekDoFn
yang menerima parameter runtime. - Perluas template Anda dengan metadata tambahan sehingga parameter kustom divalidasi saat template klasik dijalankan. Contoh metadata tersebut mencakup nama template klasik kustom dan parameter opsional.
- Periksa apakah konektor I/O pipeline mendukung objek
ValueProvider
, dan buat perubahan sesuai kebutuhan. - Buat dan siapkan template klasik kustom.
- Jalankan template klasik kustom.
Untuk mempelajari berbagai jenis template Dataflow, manfaatnya, dan kapan harus memilih template klasik, lihat Template Dataflow.
Izin yang diperlukan untuk menjalankan template klasik
Izin yang Anda perlukan untuk menjalankan template Dataflow klasik bergantung pada tempat Anda menjalankan template, dan apakah sumber dan sink untuk pipeline berada di project lain.
Untuk informasi selengkapnya tentang cara menjalankan pipeline Dataflow secara lokal atau menggunakan Google Cloud, lihat Keamanan dan izin Dataflow.
Untuk mengetahui daftar peran dan izin Dataflow, lihat Kontrol akses Dataflow.
Batasan
- Opsi pipeline
berikut tidak didukung dengan template klasik. Jika Anda perlu mengontrol jumlah
thread harness pekerja, gunakan
Template Flex.
Java
numberOfWorkerHarnessThreads
Python
number_of_worker_harness_threads
- Runner Dataflow tidak mendukung opsi
ValueProvider
untuk topik Pub/Sub dan parameter langganan. Jika Anda memerlukan opsi Pub/Sub dalam parameter runtime, gunakan Template Flex.
Tentang parameter runtime dan antarmuka ValueProvider
Antarmuka ValueProvider
memungkinkan pipeline menerima
parameter runtime. Apache Beam menyediakan tiga jenis objek ValueProvider
.
Nama | Deskripsi |
---|---|
RuntimeValueProvider |
Anda dapat menggunakan Gunakan |
StaticValueProvider |
Gunakan |
NestedValueProvider |
Gunakan |
Menggunakan parameter runtime dalam kode pipeline
Bagian ini menjelaskan cara menggunakan ValueProvider
,
StaticValueProvider
, dan NestedValueProvider
.
Menggunakan ValueProvider
dalam opsi pipeline
Gunakan ValueProvider
untuk semua opsi pipeline yang ingin Anda tetapkan atau gunakan saat runtime.
Misalnya, cuplikan kode WordCount
berikut tidak mendukung parameter runtime. Kode ini menambahkan opsi file input, membuat pipeline, dan
membaca baris dari file input:
Java
public interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") String getInputFile(); void setInputFile(String value); } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); ...
Python
class WordcountOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', required=True, help='Output file to write results to.') pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input)
Untuk menambahkan dukungan parameter runtime, ubah opsi file input untuk menggunakan
ValueProvider
.
Java
Gunakan ValueProvider<String>
, bukan String
,
untuk jenis opsi file input.
public interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") ValueProvider<String> getInputFile(); void setInputFile(ValueProvider<String> value); } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); ...
Python
Ganti add_argument
dengan add_value_provider_argument
.
class WordcountOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): # Use add_value_provider_argument for arguments to be templatable # Use add_argument as usual for non-templatable arguments parser.add_value_provider_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', required=True, help='Output file to write results to.') pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input)
Menggunakan ValueProvider
dalam fungsi Anda
Untuk menggunakan nilai parameter runtime dalam fungsi Anda sendiri, perbarui fungsi
untuk menggunakan parameter ValueProvider
.
Contoh berikut berisi opsi ValueProvider
bilangan bulat,
dan fungsi sederhana yang menambahkan bilangan bulat. Fungsi ini bergantung pada bilangan bulat
ValueProvider
. Selama eksekusi, pipeline
menerapkan MySumFn
ke setiap bilangan bulat dalam PCollection
yang berisi [1, 2, 3]
. Jika nilai runtime adalah 10, PCollection
yang dihasilkan akan berisi [11, 12, 13]
.
Java
public interface SumIntOptions extends PipelineOptions { // New runtime parameter, specified by the --int // option at runtime. ValueProvider<Integer> getInt(); void setInt(ValueProvider<Integer> value); } class MySumFn extends DoFn<Integer, Integer> { ValueProvider<Integer> mySumInteger; MySumFn(ValueProvider<Integer> sumInt) { // Store the value provider this.mySumInteger = sumInt; } @ProcessElement public void processElement(ProcessContext c) { // Get the value of the value provider and add it to // the element's value. c.output(c.element() + mySumInteger.get()); } } public static void main(String[] args) { SumIntOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(SumIntOptions.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(1, 2, 3)) // Get the value provider and pass it to MySumFn .apply(ParDo.of(new MySumFn(options.getInt()))) .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString())) .apply("OutputNums", TextIO.write().to("numvalues")); p.run(); }
Python
import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.value_provider import StaticValueProvider from apache_beam.io import WriteToText class UserOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_value_provider_argument('--templated_int', type=int) class MySumFn(beam.DoFn): def __init__(self, templated_int): self.templated_int = templated_int def process(self, an_int): yield self.templated_int.get() + an_int pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) user_options = pipeline_options.view_as(UserOptions) sum = (p | 'ReadCollection' >> beam.io.ReadFromText( 'gs://some/integer_collection') | 'StringToInt' >> beam.Map(lambda w: int(w)) | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int)) | 'WriteResultingCollection' >> WriteToText('some/output_path'))
Gunakan StaticValueProvider
Untuk memberikan nilai statis ke pipeline, gunakan
StaticValueProvider
.
Contoh ini menggunakan MySumFn
, yang merupakan DoFn
yang menggunakan ValueProvider<Integer>
. Jika mengetahui nilai parameter terlebih dahulu, Anda dapat menggunakan StaticValueProvider
untuk menentukan nilai statis sebagai ValueProvider
.
Java
Kode ini mendapatkan nilai pada runtime pipeline:
.apply(ParDo.of(new MySumFn(options.getInt())))
Sebagai gantinya, Anda dapat menggunakan StaticValueProvider
dengan nilai statis:
.apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))
Python
Kode ini mendapatkan nilai pada runtime pipeline:
beam.ParDo(MySumFn(user_options.templated_int))
Sebagai gantinya, Anda dapat menggunakan StaticValueProvider
dengan nilai statis:
beam.ParDo(MySumFn(StaticValueProvider(int,10)))
Anda juga dapat menggunakan StaticValueProvider
saat menerapkan
modul I/O yang mendukung parameter reguler dan parameter runtime.
StaticValueProvider
mengurangi duplikasi kode dari penerapan dua metode yang serupa.
Java
Kode sumber untuk contoh ini berasal dari TextIO.java Apache Beam di GitHub.
// Create a StaticValueProvider<String> from a regular String parameter // value, and then call .from() with this new StaticValueProvider. public Read from(String filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return from(StaticValueProvider.of(filepattern)); } // This method takes a ValueProvider parameter. public Read from(ValueProvider<String> filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return toBuilder().setFilepattern(filepattern).build(); }
Python
Dalam contoh ini, ada satu konstruktor yang menerima argumen
string
atau ValueProvider
. Jika argumen adalah string
, argumen tersebut akan dikonversi menjadi StaticValueProvider
.
class Read(): def __init__(self, filepattern): if isinstance(filepattern, str): # Create a StaticValueProvider from a regular string parameter filepattern = StaticValueProvider(str, filepattern) self.filepattern = filepattern
Gunakan NestedStaticValueProvider
Untuk menghitung nilai dari objek ValueProvider
lain, gunakan
NestedValueProvider
.
NestedValueProvider
menggunakan ValueProvider
dan penerjemah
SerializableFunction
sebagai input. Saat Anda memanggil
.get()
pada NestedValueProvider
, penerjemah
akan membuat nilai baru berdasarkan nilai ValueProvider
. Terjemahan
ini memungkinkan Anda menggunakan nilai ValueProvider
untuk membuat
nilai akhir yang Anda inginkan.
Dalam contoh berikut, pengguna memberikan nama file file.txt
. Transformasi
menambahkan jalur gs://directory_name/
ke
nama file. Memanggil .get()
akan menampilkan
gs://directory_name/file.txt
.
Java
public interface WriteIntsOptions extends PipelineOptions { // New runtime parameter, specified by the --fileName // option at runtime. ValueProvider<String> getFileName(); void setFileName(ValueProvider<String> value); } public static void main(String[] args) { WriteIntsOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WriteIntsOptions.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(1, 2, 3)) // Write to the computed complete file path. .apply("OutputNums", TextIO.write().to(NestedValueProvider.of( options.getFileName(), new SerializableFunction<String, String>() { @Override public String apply(String file) { return "gs://directoryname/" + file; } }))); p.run(); }
Menggunakan metadata dalam kode pipeline
Anda dapat memperluas template dengan metadata tambahan sehingga parameter kustom divalidasi saat template dijalankan. Jika Anda ingin membuat metadata untuk template, ikuti langkah-langkah berikut:
- Buat file berformat JSON bernama
TEMPLATE_NAME_metadata
menggunakan parameter di Parameter metadata dan format di Contoh file metadata. GantiTEMPLATE_NAME
dengan nama template Anda.Pastikan file metadata tidak memiliki ekstensi nama file. Misalnya, jika nama template Anda adalah
myTemplate
, file metadatanya harusmyTemplate_metadata
. - Simpan file metadata di Cloud Storage dalam folder yang sama dengan template.
Parameter metadata
Kunci parameter | Wajib | Deskripsi nilai | |
---|---|---|---|
name |
Ya | Nama template Anda. | |
description |
Tidak | Paragraf singkat yang menjelaskan template. | |
streaming |
Tidak | Jika true , template ini mendukung streaming. Nilai defaultnya adalah
false . |
|
supportsAtLeastOnce |
Tidak | Jika true , template ini mendukung pemrosesan setidaknya satu kali. Nilai defaultnya adalah false . Tetapkan parameter ini ke true jika template dirancang
untuk berfungsi dengan mode streaming setidaknya satu kali.
|
|
supportsExactlyOnce |
Tidak | Jika true , template ini mendukung
pemrosesan tepat satu kali. Nilai defaultnya adalah true . |
|
defaultStreamingMode |
Tidak | Mode streaming default, untuk template yang mendukung mode setidaknya satu kali dan
mode tepat satu kali. Gunakan salah satu nilai berikut: "AT_LEAST_ONCE" ,
"EXACTLY_ONCE" . Jika tidak ditentukan, mode streaming defaultnya adalah exactly-once.
|
|
parameters |
Tidak | Array parameter tambahan yang digunakan template. Array kosong digunakan secara default. | |
name |
Ya | Nama parameter yang digunakan dalam template Anda. | |
label |
Ya | String yang dapat dibaca manusia yang digunakan di konsol Google Cloud untuk melabeli parameter. | |
helpText |
Ya | Paragraf singkat yang menjelaskan parameter. | |
isOptional |
Tidak | false jika parameter wajib ada dan true jika parameter bersifat opsional. Kecuali jika ditetapkan dengan nilai, isOptional akan ditetapkan secara default ke false .
Jika Anda tidak menyertakan kunci parameter ini untuk metadata, metadata akan menjadi parameter
yang diperlukan. |
|
regexes |
Tidak | Array ekspresi reguler POSIX-egrep dalam bentuk string yang digunakan untuk memvalidasi
nilai parameter. Misalnya, ["^[a-zA-Z][a-zA-Z0-9]+"] adalah satu ekspresi reguler yang memvalidasi bahwa nilai dimulai dengan huruf, lalu memiliki satu atau beberapa karakter. Array kosong digunakan secara default. |
Contoh file metadata
Java
Layanan Dataflow menggunakan metadata berikut untuk memvalidasi parameter kustom template WordCount:
{ "description": "An example pipeline that counts words in the input file.", "name": "Word Count", "streaming": false, "parameters": [ { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "inputFile", "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt", "label": "Input Cloud Storage file(s)" }, { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "output", "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts", "label": "Output Cloud Storage file(s)" } ] }
Python
Layanan Dataflow menggunakan metadata berikut untuk memvalidasi parameter kustom template WordCount:
{ "description": "An example pipeline that counts words in the input file.", "name": "Word Count", "streaming": false, "parameters": [ { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "input", "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt", "label": "Input Cloud Storage file(s)" }, { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "output", "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts", "label": "Output Cloud Storage file(s)" } ] }
Anda dapat mendownload file metadata untuk template yang disediakan Google dari direktori template Dataflow.
Konektor I/O pipeline dan ValueProvider
yang didukung
Java
Beberapa konektor I/O berisi metode yang menerima objek ValueProvider
. Untuk menentukan dukungan untuk konektor dan metode
tertentu, lihat dokumentasi referensi API
untuk konektor I/O. Metode yang didukung memiliki overload dengan
ValueProvider
. Jika metode tidak memiliki overload, metode tersebut tidak
mendukung parameter runtime. Konektor I/O berikut memiliki setidaknya dukungan ValueProvider
parsial:
- IO berbasis file:
TextIO
,AvroIO
,FileIO
,TFRecordIO
,XmlIO
BigQueryIO
*BigtableIO
(memerlukan SDK 2.3.0 atau yang lebih baru)PubSubIO
SpannerIO
Python
Beberapa konektor I/O berisi metode yang menerima
objek ValueProvider
. Untuk menentukan dukungan untuk konektor I/O
dan metodenya, lihat dokumentasi referensi API
untuk konektor. Konektor I/O berikut menerima parameter runtime:
- IO berbasis file:
textio
,avroio
,tfrecordio
Membuat dan melakukan staging template klasik
Setelah menulis pipeline, Anda harus membuat dan menyiapkan file template. Saat Anda membuat dan melakukan staging template, lokasi staging akan berisi file tambahan yang diperlukan untuk menjalankan template. Jika Anda menghapus lokasi staging, template akan gagal dijalankan. Tugas Dataflow tidak langsung berjalan setelah Anda mem-build template. Untuk menjalankan tugas Dataflow berbasis template kustom, Anda dapat menggunakan konsol Google Cloud , Dataflow REST API, atau gcloud CLI.
Contoh berikut menunjukkan cara melakukan staging file template:
Java
Perintah Maven ini membuat dan menyiapkan template di lokasi Cloud Storage yang ditentukan dengan --templateLocation
.
mvn compile exec:java \ -Dexec.mainClass=com.example.myclass \ -Dexec.args="--runner=DataflowRunner \ --project=PROJECT_ID \ --stagingLocation=gs://BUCKET_NAME/staging \ --templateLocation=gs://BUCKET_NAME/templates/TEMPLATE_NAME \ --region=REGION" \ -P dataflow-runner
Pastikan jalur templateLocation
sudah benar. Ganti kode berikut:
com.example.myclass
: class Java AndaPROJECT_ID
: project ID AndaBUCKET_NAME
: nama bucket Cloud Storage AndaTEMPLATE_NAME
: nama template AndaREGION
: region tempat men-deploy tugas Dataflow Anda
Python
Perintah Python ini membuat dan menyiapkan template di lokasi Cloud Storage yang ditentukan dengan --template_location
.
python -m examples.mymodule \ --runner DataflowRunner \ --project PROJECT_ID \ --staging_location gs://BUCKET_NAME/staging \ --template_location gs://BUCKET_NAME/templates/TEMPLATE_NAME \ --region REGION
Pastikan jalur template_location
sudah benar. Ganti kode berikut:
examples.mymodule
: modul Python AndaPROJECT_ID
: project ID AndaBUCKET_NAME
: nama bucket Cloud Storage AndaTEMPLATE_NAME
: nama template AndaREGION
: region tempat men-deploy tugas Dataflow Anda
Setelah membuat dan menyiapkan template, langkah berikutnya adalah menjalankan template.