Menetapkan opsi pipeline Dataflow

Halaman ini menjelaskan cara menetapkan opsi pipeline untuk tugas Dataflow Anda. Opsi pipeline ini mengonfigurasi cara dan tempat pipeline Anda dijalankan, serta resource mana yang digunakannya.

Eksekusi pipeline terpisah dari pelaksanaan program Apache Beam Anda. Program Apache Beam yang Anda tulis membuat pipeline untuk eksekusi yang ditangguhkan. Artinya, program tersebut menghasilkan serangkaian langkah yang dapat dijalankan oleh runner Apache Beam yang didukung. Runner yang kompatibel mencakup runner Dataflow di Google Cloud dan runner langsung yang menjalankan pipeline langsung di lingkungan lokal.

Anda dapat meneruskan parameter ke tugas Dataflow saat runtime. Untuk mengetahui informasi tambahan tentang menyetel opsi pipeline saat runtime, lihat Mengonfigurasi opsi pipeline.

Menggunakan opsi pipeline dengan Apache Beam SDK

Anda dapat menggunakan SDK berikut untuk menetapkan opsi pipeline untuk tugas Dataflow:

  • Apache Beam SDK untuk Python
  • Apache Beam SDK untuk Java
  • Apache Beam SDK untuk Go

Untuk menggunakan SDK ini, tetapkan runner pipeline dan parameter eksekusi lainnya menggunakan class Apache Beam SDK PipelineOptions.

Ada dua metode untuk menentukan opsi pipeline:

  • Menetapkan opsi pipeline secara terprogram dengan menyediakan daftar opsi pipeline.
  • Tetapkan opsi pipeline langsung di command line saat Anda menjalankan kode pipeline.

Menetapkan opsi pipeline secara terprogram

Anda dapat menetapkan opsi pipeline secara terprogram dengan membuat dan mengubah objek PipelineOptions.

Java

Buat objek PipelineOptions menggunakan metode PipelineOptionsFactory.fromArgs.

Sebagai contoh, lihat bagian Peluncuran pada contoh Dataflow di halaman ini.

Python

Buat objek PipelineOptions.

Sebagai contoh, lihat bagian Peluncuran pada contoh Dataflow di halaman ini.

Go

Menetapkan opsi pipeline secara terprogram menggunakan PipelineOptions tidak didukung di Apache Beam SDK untuk Go. Gunakan argumen command line Go.

Sebagai contoh, lihat bagian Peluncuran pada contoh Dataflow di halaman ini.

Menetapkan opsi pipeline pada command line

Anda dapat menetapkan opsi pipeline dengan menggunakan argumen command line.

Java

Contoh sintaksis berikut adalah dari pipeline WordCount di panduan memulai Java.

mvn -Pdataflow-runner compile exec:java \
  -Dexec.mainClass=org.apache.beam.examples.WordCount \
  -Dexec.args="--project=PROJECT_ID \
  --gcpTempLocation=gs://BUCKET_NAME/temp/ \
  --output=gs://BUCKET_NAME/output \
  --runner=DataflowRunner \
  --region=REGION"

Ganti kode berikut:

  • PROJECT_ID: project ID Google Cloud Anda
  • BUCKET_NAME: nama bucket Cloud Storage Anda
  • REGION: endpoint regional Dataflow, us-central1

Python

Contoh sintaksis berikut adalah dari pipeline WordCount di panduan memulai Python.

python -m apache_beam.examples.wordcount \
  --region DATAFLOW_REGION \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://STORAGE_BUCKET/results/outputs \
  --runner DataflowRunner \
  --project PROJECT_ID \
  --temp_location gs://STORAGE_BUCKET/tmp/

Ganti kode berikut:

  • DATAFLOW_REGION: endpoint regional tempat Anda ingin men-deploy tugas Dataflow—misalnya, europe-west1

    Flag --region menggantikan region default yang ditetapkan dalam server metadata, klien lokal, atau variabel lingkungan.

  • STORAGE_BUCKET: nama bucket Cloud Storage

  • PROJECT_ID: project ID Google Cloud

Go

Contoh sintaksis berikut adalah dari pipeline WordCount di panduan memulai Go.

go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
   --output gs://BUCKET_NAME/results/outputs \
   --runner dataflow \
   --project PROJECT_ID \
   --region DATAFLOW_REGION \
   --staging_location gs://BUCKET_NAME/binaries/

Ganti kode berikut:

  • BUCKET_NAME: nama bucket Cloud Storage

  • PROJECT_ID: project ID Google Cloud

  • DATAFLOW_REGION: Endpoint regional tempat Anda ingin men-deploy tugas Dataflow. Misalnya, europe-west1. Untuk mengetahui daftar lokasi yang tersedia, lihat Lokasi Dataflow. Flag --region menggantikan region default yang ditetapkan dalam server metadata, klien lokal, atau variabel lingkungan Anda.

Menetapkan opsi pipeline eksperimental

Di SDK Java, Python, dan Go, opsi pipeline experiments mengaktifkan fitur Dataflow eksperimental atau pra-GA.

Tetapkan secara terprogram

Untuk menetapkan opsi experiments secara terprogram, gunakan sintaksis berikut.

Java

Pada objek PipelineOptions Anda, sertakan opsi experiments dengan menggunakan sintaksis berikut. Contoh ini menetapkan ukuran boot disk menjadi 80 GB dengan tanda eksperimen.

options.setExperiments("streaming_boot_disk_size_gb=80")

Untuk contoh yang menunjukkan cara membuat objek PipelineOptions, lihat bagian Peluncuran pada contoh Dataflow di halaman ini.

Python

Pada objek PipelineOptions Anda, sertakan opsi experiments dengan menggunakan sintaksis berikut. Contoh ini menetapkan ukuran boot disk menjadi 80 GB dengan tanda eksperimen.

beam_options = PipelineOptions(
  beam_args,
  experiments='streaming_boot_disk_size_gb=80')

Untuk contoh yang menunjukkan cara membuat objek PipelineOptions, lihat bagian Peluncuran pada contoh Dataflow di halaman ini.

Go

Menetapkan opsi pipeline secara terprogram menggunakan PipelineOptions tidak didukung di Apache Beam SDK untuk Go. Gunakan argumen command line Go.

Menetapkan pada command line

Untuk menetapkan opsi experiments pada command line, gunakan sintaksis berikut.

Java

Contoh ini menetapkan ukuran boot disk menjadi 80 GB dengan tanda eksperimen.

--experiments=streaming_boot_disk_size_gb=80

Python

Contoh ini menetapkan ukuran boot disk menjadi 80 GB dengan tanda eksperimen.

--experiments=streaming_boot_disk_size_gb=80

Go

Contoh ini menetapkan ukuran boot disk menjadi 80 GB dengan tanda eksperimen.

--experiments=streaming_boot_disk_size_gb=80

Tetapkan dalam template

Untuk mengaktifkan fitur eksperimental saat menjalankan template Dataflow, gunakan flag --additional-experiments.

Template klasik

gcloud dataflow jobs run JOB_NAME --additional-experiments=EXPERIMENT[,...]

template flex

gcloud dataflow flex-template run JOB_NAME --additional-experiments=EXPERIMENT[,...]

Mengakses objek opsi pipeline

Saat Anda membuat objek Pipeline dalam program Apache Beam, teruskan PipelineOptions. Saat menjalankan pipeline Anda, layanan Dataflow akan mengirimkan salinan PipelineOptions ke setiap pekerja.

Java

Akses PipelineOptions di dalam instance DoFn transformasi ParDo menggunakan metode ProcessContext.getPipelineOptions.

Python

Fitur ini tidak didukung di Apache Beam SDK untuk Python.

Go

Akses opsi pipeline menggunakan beam.PipelineOptions.

Luncurkan di Dataflow

Jalankan tugas Anda di resource Google Cloud terkelola menggunakan layanan runner Dataflow. Menjalankan pipeline dengan Dataflow akan membuat tugas Dataflow, yang menggunakan resource Compute Engine dan Cloud Storage di project Google Cloud Anda. Untuk mengetahui informasi tentang izin Dataflow, baca Keamanan dan izin Dataflow.

Tetapkan opsi yang diperlukan

Untuk menjalankan pipeline menggunakan Dataflow, tetapkan opsi pipeline berikut:

Java

  • project: ID project Google Cloud Anda.
  • runner: runner pipeline yang menjalankan pipeline Anda. Untuk eksekusi Google Cloud, nilai ini harus DataflowRunner.
  • gcpTempLocation: jalur Cloud Storage bagi Dataflow untuk mengatur sebagian besar file sementara. Jika ingin menentukan bucket, Anda harus membuat bucket terlebih dahulu. Jika tidak menetapkan gcpTempLocation, Anda dapat menetapkan opsi pipeline tempLocation, lalu gcpTempLocation akan disetel ke nilai tempLocation. Jika keduanya tidak ditentukan, gcpTempLocation default akan dibuat.
  • stagingLocation: jalur Cloud Storage bagi Dataflow untuk men-staging file biner Anda. Jika Anda menggunakan Apache Beam SDK 2.28 atau versi yang lebih baru, jangan tetapkan opsi ini. Untuk Apache Beam SDK 2.28 atau yang lebih rendah, jika Anda tidak menetapkan opsi ini, apa yang Anda tentukan untuk tempLocation akan digunakan untuk lokasi staging.

    gcpTempLocation default akan dibuat jika atau tempLocation tidak ditentukan. Jika tempLocation ditentukan dan gcpTempLocation tidak, tempLocation harus berupa jalur Cloud Storage, dan gcpTempLocation ditetapkan secara default ke jalur tersebut. Jika tempLocation tidak ditentukan dan gcpTempLocation ditetapkan, tempLocation tidak akan diisi.

Python

  • project: ID project Google Cloud Anda.
  • region: endpoint regional untuk tugas Dataflow Anda.
  • runner: runner pipeline yang menjalankan pipeline Anda. Untuk eksekusi Google Cloud, nilai ini harus DataflowRunner.
  • temp_location: jalur Cloud Storage untuk Dataflow guna melakukan file tugas sementara yang dibuat selama eksekusi pipeline.

Go

  • project: ID project Google Cloud Anda.
  • region: endpoint regional untuk tugas Dataflow Anda.
  • runner: runner pipeline yang menjalankan pipeline Anda. Untuk eksekusi Google Cloud, nilai ini harus dataflow.
  • staging_location: jalur Cloud Storage untuk Dataflow guna melakukan file tugas sementara yang dibuat selama eksekusi pipeline.

Menetapkan opsi pipeline secara terprogram

Kode contoh berikut menunjukkan cara membuat pipeline dengan menetapkan runner secara terprogram dan opsi lain yang diperlukan untuk menjalankan pipeline menggunakan Dataflow.

Java

// Create and set your PipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

// For cloud execution, set the Google Cloud project, staging location,
// and set DataflowRunner.
options.setProject("my-project-id");
options.setStagingLocation("gs://my-bucket/binaries");
options.setRunner(DataflowRunner.class);

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg', help='description')
args, beam_args = parser.parse_known_args()

# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
beam_options = PipelineOptions(
    beam_args,
    runner='DataflowRunner',
    project='my-project-id',
    job_name='unique-job-name',
    temp_location='gs://my-bucket/temp',
    region='us-central1')
# Note: Repeatable options like dataflow_service_options or experiments must
# be specified as a list of string(s).
# e.g. dataflow_service_options=['enable_prime']

# Create the Pipeline with the specified options.
with beam.Pipeline(options=beam_options) as pipeline:
  pass  # build your pipeline here.

Go

Apache Beam SDK untuk Go menggunakan argumen command line Go. Gunakan flag.Set() untuk menetapkan nilai flag.

// Use the Go flag package to parse custom options.
flag.Parse()

// Set the required options programmatically.
// For Cloud execution, specify the Dataflow runner, Google Cloud
// project ID, region, and staging location.
// For more information about regions, see
// https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
flag.Set("runner", "dataflow")
flag.Set("project", "my-project-id")
flag.Set("region", "us-central1")
flag.Set("staging_location", "gs://my-bucket/binaries")

beam.Init()

// Create the Pipeline.
p := beam.NewPipeline()
s := p.Root()

Setelah Anda membuat pipeline, tentukan semua pembacaan, transformasi, dan penulisan pipeline, serta jalankan pipeline tersebut.

Menggunakan opsi pipeline dari command line

Contoh berikut menunjukkan cara menggunakan opsi pipeline yang ditentukan pada command line. Contoh ini tidak menetapkan opsi pipeline secara terprogram.

Java

// Set your PipelineOptions to the specified command-line options
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

Gunakan modul argparse Python untuk mengurai opsi command line.

# Use Python argparse module to parse custom arguments
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# For more details on how to use argparse, take a look at:
#   https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser()
parser.add_argument(
    '--input-file',
    default='gs://dataflow-samples/shakespeare/kinglear.txt',
    help='The file path for the input text to process.')
parser.add_argument(
    '--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()

# Create the Pipeline with remaining arguments.
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | 'Read files' >> beam.io.ReadFromText(args.input_file)
      | 'Write files' >> beam.io.WriteToText(args.output_path))

Go

Gunakan paket Go flag untuk mengurai opsi command line. Anda harus mengurai opsi sebelum memanggil beam.Init(). Dalam contoh ini, output adalah opsi command line.

// Define configuration options
var (
  output = flag.String("output", "", "Output file (required).")
)

// Parse options before beam.Init()
flag.Parse()

beam.Init()

// Input validation must be done after beam.Init()
if *output == "" {
  log.Fatal("No output provided!")
}

p := beam.NewPipeline()

Setelah Anda membuat pipeline, tentukan semua pembacaan, transformasi, dan penulisan pipeline, lalu jalankan pipeline tersebut.

Mengontrol mode eksekusi

Saat program Apache Beam menjalankan pipeline pada layanan seperti Dataflow, program tersebut dapat menjalankan pipeline secara asinkron, atau dapat memblokirnya hingga pipeline selesai. Anda dapat mengubah perilaku ini menggunakan panduan berikut.

Java

Saat program Java Apache Beam menjalankan pipeline pada layanan seperti Dataflow, pipeline ini biasanya dijalankan secara asinkron. Untuk menjalankan pipeline dan menunggu hingga tugas selesai, tetapkan DataflowRunner sebagai runner pipeline dan panggil pipeline.run().waitUntilFinish() secara eksplisit.

Saat Anda menggunakan DataflowRunner dan memanggil waitUntilFinish() pada objek PipelineResult yang ditampilkan dari pipeline.run(), pipeline akan dijalankan di Google Cloud, tetapi kode lokal akan menunggu tugas cloud selesai dan menampilkan objek DataflowPipelineJob akhir. Saat tugas berjalan, layanan Dataflow mencetak update status tugas dan pesan konsol saat menunggu.

Python

Saat program Apache Beam Python menjalankan pipeline pada layanan seperti Dataflow, pipeline ini biasanya dijalankan secara asinkron. Untuk memblokir hingga pipeline selesai, gunakan metode wait_until_finish() dari objek PipelineResult, yang ditampilkan dari metode run() runner.

Go

Saat program Apache Beam Go menjalankan pipeline di Dataflow, pipeline tersebut bersifat sinkron secara default dan melakukan pemblokiran hingga pipeline selesai. Jika Anda tidak ingin memblokir, ada dua opsi:

  1. Memulai tugas dengan rutinitas Go.

    go func() {
      pr, err := beamx.Run(ctx, p)
      if err != nil {
        // Handle the error.
      }
      // Send beam.PipelineResult into a channel.
      results <- pr
    }()
    // Do other operations while the pipeline runs.
    
  2. Gunakan flag command line --async, yang ada dalam paket jobopts.

Untuk melihat detail eksekusi, memantau progres, dan memverifikasi status penyelesaian tugas, gunakan Antarmuka pemantauan Dataflow atau antarmuka command line Dataflow.

Menggunakan sumber streaming

Java

Jika pipeline Anda membaca dari sumber data tidak terbatas, misalnya Pub/Sub, pipeline akan otomatis dijalankan dalam mode streaming.

Python

Jika pipeline Anda menggunakan sumber data tidak terbatas, seperti Pub/Sub, Anda harus menetapkan opsi streaming ke benar (true).

Go

Jika pipeline Anda membaca dari sumber data tidak terbatas, misalnya Pub/Sub, pipeline akan otomatis dijalankan dalam mode streaming.

Tugas streaming menggunakan jenis mesin Compute Engine n1-standard-2 atau yang lebih tinggi secara default.

Luncurkan secara lokal

Daripada menjalankan pipeline di resource cloud terkelola, Anda dapat memilih untuk menjalankan pipeline secara lokal. Eksekusi lokal memiliki keuntungan tertentu untuk menguji, men-debug, atau menjalankan pipeline di atas set data yang kecil. Misalnya, eksekusi lokal menghapus dependensi pada layanan Dataflow jarak jauh dan project Google Cloud terkait.

Saat menggunakan eksekusi lokal, Anda harus menjalankan pipeline dengan set data yang cukup kecil agar muat dalam memori lokal. Anda dapat membuat set data dalam memori berukuran kecil menggunakan transformasi Create, atau Anda dapat menggunakan transformasi Read untuk bekerja dengan file lokal atau jarak jauh yang kecil. Eksekusi lokal biasanya memberikan cara yang lebih cepat dan mudah untuk melakukan pengujian dan proses debug dengan dependensi eksternal yang lebih sedikit, tetapi dibatasi oleh memori yang tersedia di lingkungan lokal Anda.

Kode contoh berikut menunjukkan cara membuat pipeline yang dijalankan di lingkungan lokal Anda.

Java

// Create and set our Pipeline Options.
PipelineOptions options = PipelineOptionsFactory.create();

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg')
args, beam_args = parser.parse_known_args()

# Create and set your Pipeline Options.
beam_options = PipelineOptions(beam_args)
args = beam_options.view_as(MyOptions)

with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | beam.io.ReadFromText(args.input)
      | beam.io.WriteToText(args.output))

Go

// Parse options before beam.Init()
flag.Parse()

beam.Init()

p := beam.NewPipeline()

Setelah Anda membuat pipeline, jalankan.

Membuat opsi pipeline kustom

Anda dapat menambahkan opsi kustom sendiri selain PipelineOptions standar. Command line Apache Beam juga dapat mengurai opsi kustom menggunakan argumen command line yang ditentukan dalam format yang sama.

Java

Untuk menambahkan opsi Anda sendiri, tentukan antarmuka dengan metode pengambil dan penyetel untuk setiap opsi, seperti dalam contoh berikut:

public interface MyOptions extends PipelineOptions {
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}

Python

Untuk menambahkan opsi Anda sendiri, gunakan metode add_argument() (yang berperilaku persis seperti modul argparse standar Python), seperti pada contoh berikut:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input')
    parser.add_argument('--output')

Go

Untuk menambahkan opsi Anda sendiri, gunakan paket Go flag seperti ditunjukkan dalam contoh berikut:

var (
  input  = flag.String("input", "", "")
  output = flag.String("output", "", "")
)

Anda juga dapat menentukan deskripsi, yang muncul saat pengguna meneruskan --help sebagai argumen command line, dan nilai default.

Java

Anda menetapkan deskripsi dan nilai default menggunakan anotasi, sebagai berikut:

public interface MyOptions extends PipelineOptions {
  @Description("My custom command line argument.")
  @Default.String("DEFAULT")
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}

Sebaiknya daftarkan antarmuka dengan PipelineOptionsFactory, lalu teruskan antarmuka saat membuat objek PipelineOptions. Saat Anda mendaftarkan antarmuka dengan PipelineOptionsFactory, --help dapat menemukan antarmuka opsi kustom Anda dan menambahkannya ke output perintah --help. PipelineOptionsFactory memvalidasi bahwa opsi kustom Anda kompatibel dengan semua opsi terdaftar lainnya.

Kode contoh berikut menunjukkan cara mendaftarkan antarmuka opsi kustom Anda dengan PipelineOptionsFactory:

PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                          .withValidation()
                                          .as(MyOptions.class);

Sekarang pipeline Anda dapat menerima --myCustomOption=value sebagai argumen command line.

Python

Anda menetapkan deskripsi dan nilai default sebagai berikut:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument(
        '--input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='The file path for the input text to process.')
    parser.add_argument(
        '--output', required=True, help='The path prefix for output files.')

Go

Anda menetapkan deskripsi dan nilai default sebagai berikut:

var (
  input  = flag.String("input", "gs://MY_STORAGE_BUCKET/input", "Input for the pipeline")
  output = flag.String("output", "gs://MY_STORAGE_BUCKET/output", "Output for the pipeline")
)