Halaman ini menjelaskan cara menetapkan opsi pipeline untuk tugas Dataflow Anda. Opsi pipeline ini mengonfigurasi cara dan tempat pipeline Anda berjalan serta resource yang digunakannya.
Eksekusi pipeline terpisah dari eksekusi program Apache Beam Anda. Program Apache Beam yang telah Anda tulis akan membuat pipeline untuk eksekusi yang ditangguhkan. Artinya, program menghasilkan serangkaian langkah yang dapat dijalankan oleh runner Apache Beam yang didukung. Runner yang kompatibel mencakup runner Dataflow di Google Cloud dan runner langsung yang mengeksekusi pipeline langsung di lingkungan lokal.
Anda dapat meneruskan parameter ke tugas Dataflow saat runtime. Untuk informasi tambahan tentang cara menetapkan opsi pipeline saat runtime, lihat Mengonfigurasi opsi pipeline.
Menggunakan opsi pipeline dengan Apache Beam SDK
Anda dapat menggunakan SDK berikut untuk menetapkan opsi pipeline untuk tugas Dataflow:
- Apache Beam SDK untuk Python
- Apache Beam SDK untuk Java
- Apache Beam SDK untuk Go
Untuk menggunakan SDK, Anda menetapkan runner pipeline dan parameter eksekusi lainnya dengan menggunakan class Apache Beam SDK PipelineOptions
.
Ada dua metode untuk menentukan opsi pipeline:
- Tetapkan opsi pipeline secara terprogram dengan memberikan daftar opsi pipeline.
- Tetapkan opsi pipeline langsung di command line saat Anda menjalankan kode pipeline.
Menetapkan opsi pipeline secara terprogram
Anda dapat menetapkan opsi pipeline secara terprogram dengan membuat dan mengubah objek PipelineOptions
.
Java
Buat objek
PipelineOptions
menggunakan metode PipelineOptionsFactory.fromArgs
.
Sebagai contoh, lihat bagian Contoh peluncuran di Dataflow di halaman ini.
Python
Buat objek
PipelineOptions
.
Sebagai contoh, lihat bagian Contoh peluncuran di Dataflow di halaman ini.
Go
Menetapkan opsi pipeline secara terprogram menggunakan PipelineOptions
tidak
didukung di Apache Beam SDK untuk Go. Gunakan argumen command line Go.
Sebagai contoh, lihat bagian Contoh peluncuran di Dataflow di halaman ini.
Menetapkan opsi pipeline di command line
Anda dapat menetapkan opsi pipeline menggunakan argumen command line.
Java
Contoh sintaksis berikut berasal dari pipeline WordCount
di
panduan memulai Java.
mvn -Pdataflow-runner compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=PROJECT_ID \
--gcpTempLocation=gs://BUCKET_NAME/temp/ \
--output=gs://BUCKET_NAME/output \
--runner=DataflowRunner \
--region=REGION"
Ganti kode berikut:
PROJECT_ID
: project ID Google Cloud AndaBUCKET_NAME
: nama bucket Cloud Storage AndaREGION
: Region dataflow,us-central1
Python
Contoh sintaksis berikut berasal dari pipeline WordCount
di
panduan memulai Python.
python -m apache_beam.examples.wordcount \
--region DATAFLOW_REGION \
--input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://STORAGE_BUCKET/results/outputs \
--runner DataflowRunner \
--project PROJECT_ID \
--temp_location gs://STORAGE_BUCKET/tmp/
Ganti kode berikut:
DATAFLOW_REGION
: region tempat Anda ingin men-deploy tugas Dataflow—misalnya,europe-west1
Flag
--region
akan menggantikan region default yang ditetapkan di server metadata, klien lokal, atau variabel lingkungan.STORAGE_BUCKET
: nama bucket Cloud StoragePROJECT_ID
: project ID Google Cloud
Go
Contoh sintaksis berikut berasal dari pipeline WordCount
di
panduan memulai Go.
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://BUCKET_NAME/results/outputs \
--runner dataflow \
--project PROJECT_ID \
--region DATAFLOW_REGION \
--staging_location gs://BUCKET_NAME/binaries/
Ganti kode berikut:
BUCKET_NAME
: nama bucket Cloud StoragePROJECT_ID
: project ID Google CloudDATAFLOW_REGION
: Region tempat Anda ingin men-deploy tugas Dataflow. Misalnya,europe-west1
. Flag--region
akan menggantikan region default yang ditetapkan di server metadata, klien lokal, atau variabel lingkungan.
Menetapkan opsi pipeline eksperimental
Di SDK Java, Python, dan Go, opsi pipeline experiments
mengaktifkan fitur Dataflow eksperimental atau pra-GA.
Menetapkan secara terprogram
Untuk menetapkan opsi experiments
secara terprogram, gunakan sintaksis berikut.
Java
Dalam objek
PipelineOptions
, sertakan opsi experiments
menggunakan sintaksis berikut.
Contoh ini menetapkan ukuran disk booting ke 80 GB dengan flag eksperimen.
options.setExperiments("streaming_boot_disk_size_gb=80")
Untuk contoh yang menunjukkan cara membuat objek PipelineOptions
, lihat bagian Contoh peluncuran di Dataflow di halaman ini.
Python
Dalam objek
PipelineOptions
, sertakan opsi experiments
menggunakan sintaksis berikut.
Contoh ini menetapkan ukuran disk booting ke 80 GB dengan flag eksperimen.
beam_options = PipelineOptions(
beam_args,
experiments=['streaming_boot_disk_size_gb=80'])
Untuk contoh yang menunjukkan cara membuat objek PipelineOptions
, lihat bagian Contoh peluncuran di Dataflow di halaman ini.
Go
Menetapkan opsi pipeline secara terprogram menggunakan PipelineOptions
tidak
didukung di Apache Beam SDK untuk Go. Gunakan argumen command line Go.
Menetapkan di command line
Untuk menetapkan opsi experiments
di command line, gunakan sintaksis berikut.
Java
Contoh ini menetapkan ukuran disk booting ke 80 GB dengan flag eksperimen.
--experiments=streaming_boot_disk_size_gb=80
Python
Contoh ini menetapkan ukuran disk booting ke 80 GB dengan flag eksperimen.
--experiments=streaming_boot_disk_size_gb=80
Go
Contoh ini menetapkan ukuran disk booting ke 80 GB dengan flag eksperimen.
--experiments=streaming_boot_disk_size_gb=80
Menetapkan di template
Untuk mengaktifkan fitur eksperimental saat menjalankan template Dataflow, gunakan flag --additional-experiments
.
Template klasik
gcloud dataflow jobs run JOB_NAME --additional-experiments=EXPERIMENT[,...]
Template fleksibel
gcloud dataflow flex-template run JOB_NAME --additional-experiments=EXPERIMENT[,...]
Mengakses objek opsi pipeline
Saat Anda membuat objek Pipeline
dalam program Apache Beam, teruskan PipelineOptions
. Saat layanan Dataflow menjalankan pipeline, layanan tersebut akan mengirimkan salinan PipelineOptions
ke setiap pekerja.
Java
Akses PipelineOptions
di dalam instance DoFn
transform ParDo
apa pun menggunakan
metode ProcessContext.getPipelineOptions
.
Python
Fitur ini tidak didukung di Apache Beam SDK untuk Python.
Go
Akses opsi pipeline menggunakan beam.PipelineOptions
.
Meluncurkan di Dataflow
Jalankan tugas Anda di resource Google Cloud terkelola menggunakan layanan runner Dataflow. Menjalankan pipeline dengan Dataflow akan membuat tugas Dataflow, yang menggunakan resource Compute Engine dan Cloud Storage dalam project Google Cloud Anda. Untuk informasi tentang izin Dataflow, lihat Keamanan dan izin Dataflow.
Tugas Dataflow menggunakan Cloud Storage untuk menyimpan file sementara selama eksekusi pipeline. Untuk menghindari tagihan atas biaya penyimpanan yang tidak perlu, nonaktifkan fitur penghapusan sementara di bucket yang digunakan oleh tugas Dataflow Anda untuk penyimpanan sementara. Untuk mengetahui informasi selengkapnya, lihat Menghapus kebijakan penghapusan sementara dari bucket.
Menetapkan opsi yang diperlukan
Untuk menjalankan pipeline menggunakan Dataflow, tetapkan opsi pipeline berikut:
Java
project
: ID project Google Cloud Anda.runner
: runner pipeline yang mengeksekusi pipeline Anda. Untuk eksekusi Google Cloud, nilai ini harusDataflowRunner
.gcpTempLocation
: jalur Cloud Storage untuk Dataflow guna melakukan staging pada sebagian besar file sementara. Jika ingin menentukan bucket, Anda harus membuat bucket terlebih dahulu. Jika tidak menetapkangcpTempLocation
, Anda dapat menetapkan opsi pipelinetempLocation
, lalugcpTempLocation
ditetapkan ke nilaitempLocation
. Jika tidak ditentukan,gcpTempLocation
default akan dibuat.stagingLocation
: jalur Cloud Storage untuk Dataflow guna melakukan staging file biner Anda. Jika Anda menggunakan Apache Beam SDK 2.28 atau yang lebih tinggi, jangan tetapkan opsi ini. Untuk Apache Beam SDK 2.28 atau yang lebih rendah, jika Anda tidak menetapkan opsi ini, apa yang Anda tentukan untuktempLocation
akan digunakan untuk lokasi staging.gcpTempLocation
default dibuat jikagcpTempLocation
atautempLocation
tidak ditentukan. JikatempLocation
ditentukan dangcpTempLocation
tidak ditentukan,tempLocation
harus berupa jalur Cloud Storage, dangcpTempLocation
akan ditetapkan secara default. JikatempLocation
tidak ditentukan dangcpTempLocation
ditentukan,tempLocation
tidak akan diisi.
Python
project
: project ID Google Cloud Anda.region
: region untuk tugas Dataflow Anda.runner
: runner pipeline yang mengeksekusi pipeline Anda. Untuk eksekusi Google Cloud, nilai ini harusDataflowRunner
.temp_location
: jalur Cloud Storage untuk Dataflow guna melakukan staging file tugas sementara yang dibuat selama eksekusi pipeline.
Go
project
: project ID Google Cloud Anda.region
: region untuk tugas Dataflow Anda.runner
: runner pipeline yang mengeksekusi pipeline Anda. Untuk eksekusi Google Cloud, nilai ini harusdataflow
.staging_location
: jalur Cloud Storage untuk Dataflow guna membuat file tugas sementara yang dibuat selama eksekusi pipeline.
Menetapkan opsi pipeline secara terprogram
Contoh kode berikut menunjukkan cara membuat pipeline dengan menetapkan runner dan opsi lain yang diperlukan secara terprogram untuk menjalankan pipeline menggunakan Dataflow.
Java
// Create and set your PipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
// For cloud execution, set the Google Cloud project, staging location,
// and set DataflowRunner.
options.setProject("my-project-id");
options.setStagingLocation("gs://my-bucket/binaries");
options.setRunner(DataflowRunner.class);
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg', help='description')
args, beam_args = parser.parse_known_args()
# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
beam_options = PipelineOptions(
beam_args,
runner='DataflowRunner',
project='my-project-id',
job_name='unique-job-name',
temp_location='gs://my-bucket/temp',
region='us-central1')
# Note: Repeatable options like dataflow_service_options or experiments must
# be specified as a list of string(s).
# e.g. dataflow_service_options=['enable_prime']
# Create the Pipeline with the specified options.
with beam.Pipeline(options=beam_options) as pipeline:
pass # build your pipeline here.
Go
Apache Beam SDK untuk Go menggunakan argumen command line Go. Gunakan
flag.Set()
untuk menetapkan nilai flag.
// Use the Go flag package to parse custom options.
flag.Parse()
// Set the required options programmatically.
// For Cloud execution, specify the Dataflow runner, Google Cloud
// project ID, region, and staging location.
// For more information about regions, see
// https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
flag.Set("runner", "dataflow")
flag.Set("project", "my-project-id")
flag.Set("region", "us-central1")
flag.Set("staging_location", "gs://my-bucket/binaries")
beam.Init()
// Create the Pipeline.
p := beam.NewPipeline()
s := p.Root()
Setelah membuat pipeline, tentukan semua pembacaan, transformasi, dan penulisan pipeline, lalu jalankan pipeline.
Menggunakan opsi pipeline dari command line
Contoh berikut menunjukkan cara menggunakan opsi pipeline yang ditentukan di command line. Contoh ini tidak menetapkan opsi pipeline secara terprogram.
Java
// Set your PipelineOptions to the specified command-line options
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
Gunakan modul argparse Python untuk mengurai opsi command line.
# Use Python argparse module to parse custom arguments
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# For more details on how to use argparse, take a look at:
# https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser()
parser.add_argument(
'--input-file',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='The file path for the input text to process.')
parser.add_argument(
'--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()
# Create the Pipeline with remaining arguments.
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
lines = (
pipeline
| 'Read files' >> beam.io.ReadFromText(args.input_file)
| 'Write files' >> beam.io.WriteToText(args.output_path))
Go
Gunakan paket Go flag
untuk mengurai
opsi command line. Anda harus mengurai opsi sebelum memanggil
beam.Init()
. Dalam contoh ini, output
adalah opsi command line.
// Define configuration options
var (
output = flag.String("output", "", "Output file (required).")
)
// Parse options before beam.Init()
flag.Parse()
beam.Init()
// Input validation must be done after beam.Init()
if *output == "" {
log.Fatal("No output provided!")
}
p := beam.NewPipeline()
Setelah membuat pipeline, tentukan semua pembacaan, transformasi, dan penulisan pipeline, lalu jalankan pipeline.
Mengontrol mode eksekusi
Saat program Apache Beam menjalankan pipeline di layanan seperti Dataflow, program dapat menjalankan pipeline secara asinkron, atau dapat memblokir hingga pipeline selesai. Anda dapat mengubah perilaku ini menggunakan panduan berikut.
Java
Saat program Java Apache Beam menjalankan pipeline di layanan seperti
Dataflow, program tersebut biasanya dieksekusi secara asinkron. Untuk menjalankan
pipeline dan menunggu hingga tugas selesai, tetapkan DataflowRunner
sebagai
runner pipeline dan panggil pipeline.run().waitUntilFinish()
secara eksplisit.
Saat Anda menggunakan DataflowRunner
dan memanggil waitUntilFinish()
pada objek PipelineResult
yang ditampilkan dari pipeline.run()
, pipeline akan dieksekusi di Google Cloud, tetapi kode lokal menunggu tugas cloud selesai dan menampilkan objek DataflowPipelineJob
akhir. Saat tugas berjalan, layanan Dataflow akan mencetak update status tugas dan pesan konsol
selama menunggu.
Python
Saat program Python Apache Beam menjalankan pipeline di layanan seperti
Dataflow, program tersebut biasanya dieksekusi secara asinkron. Untuk memblokir
hingga pipeline selesai, gunakan metode wait_until_finish()
dari
objek PipelineResult
, yang ditampilkan dari metode run()
dari runner.
Go
Saat program Apache Beam Go menjalankan pipeline di Dataflow, program tersebut bersifat sinkron secara default dan memblokir hingga pipeline selesai. Jika Anda tidak ingin memblokir, ada dua opsi:
Mulai tugas dalam rutinitas Go.
go func() { pr, err := beamx.Run(ctx, p) if err != nil { // Handle the error. } // Send beam.PipelineResult into a channel. results <- pr }() // Do other operations while the pipeline runs.
Gunakan flag command line
--async
, yang ada dalam paketjobopts
.
Untuk melihat detail eksekusi, memantau progres, dan memverifikasi status penyelesaian tugas, gunakan antarmuka pemantauan Dataflow atau antarmuka command line Dataflow.
Menggunakan sumber streaming
Java
Jika pipeline Anda membaca dari sumber data yang tidak terbatas, seperti Pub/Sub, pipeline akan otomatis dieksekusi dalam mode streaming.
Python
Jika pipeline menggunakan sumber data yang tidak terbatas, seperti Pub/Sub, Anda
harus menetapkan opsi streaming
ke benar.
Go
Jika pipeline Anda membaca dari sumber data yang tidak terbatas, seperti Pub/Sub, pipeline akan otomatis dieksekusi dalam mode streaming.
Tugas streaming menggunakan jenis mesin Compute Engine
n1-standard-2
atau yang lebih tinggi secara default.
Meluncurkan secara lokal
Daripada menjalankan pipeline di resource cloud terkelola, Anda dapat memilih untuk menjalankan pipeline secara lokal. Eksekusi lokal memiliki keunggulan tertentu untuk menguji, men-debug, atau menjalankan pipeline Anda pada set data kecil. Misalnya, eksekusi lokal menghapus dependensi pada layanan Dataflow jarak jauh dan project Google Cloud terkait.
Saat menggunakan eksekusi lokal, Anda harus menjalankan pipeline dengan set data yang cukup
kecil agar sesuai dengan memori lokal. Anda dapat membuat set data dalam memori
kecil menggunakan transformasi Create
, atau Anda dapat menggunakan transformasi Read
untuk
bekerja dengan file lokal atau jarak jauh yang kecil. Eksekusi lokal biasanya memberikan
cara yang lebih cepat dan mudah untuk melakukan pengujian dan proses debug dengan lebih sedikit dependensi
eksternal, tetapi dibatasi oleh memori yang tersedia di lingkungan lokal
Anda.
Contoh kode berikut menunjukkan cara membuat pipeline yang dijalankan di lingkungan lokal Anda.
Java
// Create and set our Pipeline Options.
PipelineOptions options = PipelineOptionsFactory.create();
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg')
args, beam_args = parser.parse_known_args()
# Create and set your Pipeline Options.
beam_options = PipelineOptions(beam_args)
args = beam_options.view_as(MyOptions)
with beam.Pipeline(options=beam_options) as pipeline:
lines = (
pipeline
| beam.io.ReadFromText(args.input)
| beam.io.WriteToText(args.output))
Go
// Parse options before beam.Init()
flag.Parse()
beam.Init()
p := beam.NewPipeline()
Setelah membuat pipeline, jalankan pipeline.
Membuat opsi pipeline kustom
Anda dapat menambahkan opsi kustom Anda sendiri selain PipelineOptions
standar. Command line Apache Beam juga dapat mengurai opsi
kustom menggunakan argumen command line yang ditentukan dalam format yang sama.
Java
Untuk menambahkan opsi Anda sendiri, tentukan antarmuka dengan metode pengambil dan penyetel untuk setiap opsi, seperti dalam contoh berikut:
public interface MyOptions extends PipelineOptions {
String getMyCustomOption();
void setMyCustomOption(String myCustomOption);
}
Python
Untuk menambahkan opsi Anda sendiri, gunakan metode add_argument()
(yang berperilaku
sama persis dengan
modul argparse standar
Python),
seperti dalam contoh berikut:
from apache_beam.options.pipeline_options import PipelineOptions
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument('--input')
parser.add_argument('--output')
Go
Untuk menambahkan opsi Anda sendiri, gunakan paket flag Go seperti yang ditunjukkan pada contoh berikut:
var (
input = flag.String("input", "", "")
output = flag.String("output", "", "")
)
Anda juga dapat menentukan deskripsi, yang muncul saat pengguna meneruskan --help
sebagai
argumen command line, dan nilai default.
Java
Anda menetapkan deskripsi dan nilai default menggunakan anotasi, sebagai berikut:
public interface MyOptions extends PipelineOptions {
@Description("My custom command line argument.")
@Default.String("DEFAULT")
String getMyCustomOption();
void setMyCustomOption(String myCustomOption);
}
Sebaiknya daftarkan antarmuka Anda dengan PipelineOptionsFactory
,
lalu teruskan antarmuka saat membuat objek PipelineOptions
. Saat
Anda mendaftarkan antarmuka dengan PipelineOptionsFactory
, --help
dapat
menemukan antarmuka opsi kustom dan menambahkannya ke output perintah
--help
. PipelineOptionsFactory
memvalidasi bahwa opsi kustom Anda
kompatibel dengan semua opsi terdaftar lainnya.
Contoh kode berikut menunjukkan cara mendaftarkan antarmuka opsi kustom
dengan PipelineOptionsFactory
:
PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(MyOptions.class);
Sekarang pipeline Anda dapat menerima --myCustomOption=value
sebagai argumen
command line.
Python
Anda menetapkan deskripsi dan nilai default sebagai berikut:
from apache_beam.options.pipeline_options import PipelineOptions
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--input',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='The file path for the input text to process.')
parser.add_argument(
'--output', required=True, help='The path prefix for output files.')
Go
Anda menetapkan deskripsi dan nilai default sebagai berikut:
var (
input = flag.String("input", "gs://MY_STORAGE_BUCKET/input", "Input for the pipeline")
output = flag.String("output", "gs://MY_STORAGE_BUCKET/output", "Output for the pipeline")
)