Dataflow-Pipelineoptionen festlegen

Auf dieser Seite wird erläutert, wie Sie Pipelineoptionen für Ihre Dataflow-Jobs festlegen. Mit diesen Pipelineoptionen wird konfiguriert, wie und wo die Pipeline ausgeführt wird und welche Ressourcen sie verwendet.

Die Pipelineausführung ist von der Ausführung Ihres Apache Beam-Programms getrennt. Das von Ihnen geschriebene Apache Beam-Programm erstellt eine Pipeline für die verzögerte Ausführung. Dies bedeutet, dass das Programm eine Reihe von Schritten generiert, die jeder unterstützte Apache Beam-Runner ausführen kann. Kompatible Runner umfassen den Dataflow-Runner in Google Cloud und den direkten Runner, der die Pipeline direkt in einer lokalen Umgebung ausführt.

Sie können Parameter zur Laufzeit an einen Dataflow-Job übergeben. Weitere Informationen zum Festlegen von Pipelineoptionen zur Laufzeit finden Sie unter Pipelineoptionen konfigurieren.

Pipelineoptionen mit Apache Beam SDKs verwenden

Mit den folgenden SDKs können Sie Pipelineoptionen für Dataflow-Jobs festlegen:

  • Apache Beam SDK für Python
  • Apache Beam SDK für Java
  • Apache Beam SDK für Go

Wenn Sie die SDKs verwenden möchten, legen Sie den Pipeline-Runner und andere Ausführungsparameter mit der Klasse PipelineOptions des Apache Beam SDK fest.

Es gibt zwei Methoden zum Angeben von Pipelineoptionen:

  • Sie können sie programmatisch festlegen, indem Sie eine Liste der Pipelineoptionen bereitstellen.
  • Legen Sie die Pipelineoptionen direkt in der Befehlszeile fest, wenn Sie den Pipelinecode ausführen.

Pipelineoptionen programmatisch festlegen

Sie können Pipelineoptionen programmatisch festlegen, indem Sie ein PipelineOptions-Objekt erstellen und ändern.

Java

Erstellen Sie ein PipelineOptions-Objekt mit der Methode PipelineOptionsFactory.fromArgs.

Ein Beispiel finden Sie auf dieser Seite im Abschnitt Launch on Dataflow-Beispiel.

Python

Erstellen Sie ein PipelineOptions-Objekt.

Ein Beispiel finden Sie auf dieser Seite im Abschnitt Launch on Dataflow-Beispiel.

Go

Das programmatische Festlegen von Pipelineoptionen mit PipelineOptions wird im Apache Beam SDK for Go nicht unterstützt. Verwenden Sie Go-Befehlszeilenargumente.

Ein Beispiel finden Sie auf dieser Seite im Abschnitt Launch on Dataflow-Beispiel.

Pipelineoptionen in der Befehlszeile festlegen

Sie können Pipelineoptionen mithilfe von Befehlszeilenargumenten festlegen.

Java

Die folgende Beispielsyntax stammt aus der WordCount-Pipeline in der Java-Kurzanleitung.

mvn -Pdataflow-runner compile exec:java \
  -Dexec.mainClass=org.apache.beam.examples.WordCount \
  -Dexec.args="--project=PROJECT_ID \
  --gcpTempLocation=gs://BUCKET_NAME/temp/ \
  --output=gs://BUCKET_NAME/output \
  --runner=DataflowRunner \
  --region=REGION"

Ersetzen Sie Folgendes:

  • PROJECT_ID: Ihre Google Cloud-Projekt-ID
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets
  • REGION: eine Dataflow-Region, us-central1

Python

Die folgende Beispielsyntax stammt aus der WordCount-Pipeline in der Python-Kurzanleitung.

python -m apache_beam.examples.wordcount \
  --region DATAFLOW_REGION \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://STORAGE_BUCKET/results/outputs \
  --runner DataflowRunner \
  --project PROJECT_ID \
  --temp_location gs://STORAGE_BUCKET/tmp/

Ersetzen Sie Folgendes:

  • DATAFLOW_REGION: Die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. europe-west1

    Das Flag --region überschreibt die Standardregion, die auf dem Metadatenserver, auf Ihrem lokalen Client oder in Umgebungsvariablen festgelegt ist.

  • STORAGE_BUCKET: der Name des Cloud Storage-Buckets

  • PROJECT_ID: die Google Cloud-Projekt-ID

Go

Die folgende Beispielsyntax stammt aus der WordCount-Pipeline in der Go-Kurzanleitung.

go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
   --output gs://BUCKET_NAME/results/outputs \
   --runner dataflow \
   --project PROJECT_ID \
   --region DATAFLOW_REGION \
   --staging_location gs://BUCKET_NAME/binaries/

Ersetzen Sie Folgendes:

  • BUCKET_NAME: der Name des Cloud Storage-Buckets

  • PROJECT_ID: die Google Cloud-Projekt-ID

  • DATAFLOW_REGION: die Region, in der Sie den Dataflow-Job bereitstellen möchten. Beispiel: europe-west1 Das Flag --region überschreibt die Standardregion, die auf dem Metadatenserver, auf Ihrem lokalen Client oder in Umgebungsvariablen festgelegt ist.

Experimentelle Pipelineoptionen festlegen

In den Java-, Python- und Go-SDKs können Sie mit der experiments-Pipeline-Option experimentelle oder Vorab-GA-Dataflow-Features aktivieren.

Programmatisch festlegen

Verwenden Sie die folgende Syntax, um die Option experiments programmatisch festzulegen.

Java

Fügen Sie die Option experiments mit der folgenden Syntax in Ihr PipelineOptions-Objekt ein. In diesem Beispiel wird die Größe des Bootlaufwerks mit dem Flag „experiment“ auf 80 GB festgelegt.

options.setExperiments("streaming_boot_disk_size_gb=80")

Ein Beispiel zum Erstellen des PipelineOptions-Objekts finden Sie auf dieser Seite im Abschnitt Launch on Dataflow-Beispiel.

Python

Fügen Sie die Option experiments mit der folgenden Syntax in Ihr PipelineOptions-Objekt ein. In diesem Beispiel wird die Größe des Bootlaufwerks mit dem Flag „experiment“ auf 80 GB festgelegt.

beam_options = PipelineOptions(
  beam_args,
  experiments=['streaming_boot_disk_size_gb=80'])

Ein Beispiel zum Erstellen des PipelineOptions-Objekts finden Sie auf dieser Seite im Abschnitt Launch on Dataflow-Beispiel.

Go

Das programmatische Festlegen von Pipelineoptionen mit PipelineOptions wird im Apache Beam SDK for Go nicht unterstützt. Verwenden Sie Go-Befehlszeilenargumente.

Über die Befehlszeile festgelegt

Verwenden Sie die folgende Syntax, um die Option experiments in der Befehlszeile festzulegen.

Java

In diesem Beispiel wird die Größe des Bootlaufwerks mit dem Flag „experiment“ auf 80 GB festgelegt.

--experiments=streaming_boot_disk_size_gb=80

Python

In diesem Beispiel wird die Größe des Bootlaufwerks mit dem Flag „experiment“ auf 80 GB festgelegt.

--experiments=streaming_boot_disk_size_gb=80

Go

In diesem Beispiel wird die Größe des Bootlaufwerks mit dem Flag „experiment“ auf 80 GB festgelegt.

--experiments=streaming_boot_disk_size_gb=80

In einer Vorlage festlegen

Verwenden Sie das Flag --additional-experiments, um eine experimentelle Funktion beim Ausführen einer Dataflow-Vorlage zu aktivieren.

Klassische Vorlage

gcloud dataflow jobs run JOB_NAME --additional-experiments=EXPERIMENT[,...]

Flexible Vorlage

gcloud dataflow flex-template run JOB_NAME --additional-experiments=EXPERIMENT[,...]

Auf das Pipelineoptionsobjekt zugreifen

Wenn Sie das Pipeline-Objekt in Ihrem Apache Beam-Programm erstellen, übergeben Sie PipelineOptions. Wenn der Dataflow-Dienst Ihre Pipeline ausführt, sendet er eine Kopie der PipelineOptions an jeden Worker.

Java

Mit der Methode ProcessContext.getPipelineOptions können Sie in jeder DoFn-Instanz einer ParDo-Transformation auf PipelineOptions zugreifen.

Python

Diese Funktion wird im Apache Beam SDK für Python nicht unterstützt.

Go

Mit beam.PipelineOptions können Sie auf Pipelineoptionen zugreifen.

In Dataflow starten

Sie können Ihren Job mit dem Dataflow-Runner-Dienst auf verwalteten Google Cloud-Ressourcen ausführen. Wenn Sie die Pipeline mit Dataflow ausführen, wird ein Dataflow-Job erstellt, der Compute Engine- und Cloud Storage-Ressourcen in Ihrem Google Cloud-Projekt verwendet. Informationen zu Dataflow-Berechtigungen finden Sie unter Sicherheit und Berechtigungen in Dataflow.

Dataflow-Jobs speichern temporäre Daten während der Pipelineausführung in Cloud Storage. Damit Ihnen keine unnötigen Speicherkosten entstehen, sollten Sie die Funktion für vorläufiges Löschen für Buckets, die von den Dataflow-Jobs als temporärer Speicher verwendet werden, deaktivieren. Weitere Informationen finden Sie unter Richtlinie für das Soft-Löschen aus einem Bucket entfernen.

Erforderliche Optionen festlegen

Legen Sie zum Ausführen Ihrer Pipeline mit Dataflow die folgenden Pipelineoptionen fest:

Java

  • project: Die ID Ihres Google Cloud-Projekts.
  • runner: Der Pipeline-Runner, der Ihre Pipeline ausführt. Für eine Google Cloud-basierte Ausführung müssen Sie DataflowRunner verwenden.
  • gcpTempLocation: Ein Cloud Storage-Pfad, den Dataflow für das Staging der meisten temporärer Dateien verwendet. Wenn Sie einen Bucket angeben möchten, müssen Sie ihn vorab erstellen. Wenn Sie gcpTempLocation nicht festlegen, können Sie die Pipelineoption tempLocation festlegen und dann gcpTempLocation auf den Wert tempLocation festlegen. Wenn Sie keines von beidem angeben, wird gcpTempLocation mit einem Standardwert erstellt.
  • stagingLocation: Ein Cloud Storage-Bucket, den Dataflow für das Staging Ihrer Binärdateien verwendet. Wenn Sie das Apache Beam SDK 2.28 oder höher verwenden, legen Sie diese Option nicht fest. Wenn Sie diese Option im Apache Beam SDK 2.28 oder niedriger nicht festgelegt haben, wird der für tempLocation angegebene Wert für den Staging-Ort verwendet.

    Eine standardmäßige gcpTempLocation wird erstellt, wenn weder dies noch tempLocation angegeben wird. Wenn tempLocation angegeben wird, gcpTempLocation jedoch nicht, muss tempLocation ein Cloud Storage-Pfad sein und gcpTempLocation verwendet ihn standardmäßig. Ist tempLocation nicht angegeben, dafür aber gcpTempLocation, wird tempLocation nicht ausgefüllt.

Python

  • project: Ihre Google Cloud-Projekt-ID.
  • region: Die Region für Ihren Dataflow-Job.
  • runner: Der Pipeline-Runner, der Ihre Pipeline ausführt. Für eine Google Cloud-basierte Ausführung müssen Sie DataflowRunner verwenden.
  • temp_location: Ein Cloud Storage-Pfad, den Dataflow für das Staging temporärer Jobdateien verwendet, die während der Pipelineausführung erstellt werden.

Go

  • project: Ihre Google Cloud-Projekt-ID.
  • region: Die Region für Ihren Dataflow-Job.
  • runner: Der Pipeline-Runner, der Ihre Pipeline ausführt. Für eine Google Cloud-basierte Ausführung müssen Sie dataflow verwenden.
  • staging_location: Ein Cloud Storage-Pfad, den Dataflow für das Staging temporärer Jobdateien verwendet, die während der Pipelineausführung erstellt werden.

Pipelineoptionen programmatisch festlegen

Der folgende Beispielcode zeigt, wie eine Pipeline erstellt wird, indem Sie den Runner und andere erforderliche Optionen programmatisch zum Ausführen der Pipeline mit Dataflow festlegen.

Java

// Create and set your PipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

// For cloud execution, set the Google Cloud project, staging location,
// and set DataflowRunner.
options.setProject("my-project-id");
options.setStagingLocation("gs://my-bucket/binaries");
options.setRunner(DataflowRunner.class);

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg', help='description')
args, beam_args = parser.parse_known_args()

# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
beam_options = PipelineOptions(
    beam_args,
    runner='DataflowRunner',
    project='my-project-id',
    job_name='unique-job-name',
    temp_location='gs://my-bucket/temp',
    region='us-central1')
# Note: Repeatable options like dataflow_service_options or experiments must
# be specified as a list of string(s).
# e.g. dataflow_service_options=['enable_prime']

# Create the Pipeline with the specified options.
with beam.Pipeline(options=beam_options) as pipeline:
  pass  # build your pipeline here.

Go

Das Apache Beam SDK für Go verwendet Go-Befehlszeilenargumente. Mit flag.Set() können Sie Flag-Werte festlegen.

// Use the Go flag package to parse custom options.
flag.Parse()

// Set the required options programmatically.
// For Cloud execution, specify the Dataflow runner, Google Cloud
// project ID, region, and staging location.
// For more information about regions, see
// https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
flag.Set("runner", "dataflow")
flag.Set("project", "my-project-id")
flag.Set("region", "us-central1")
flag.Set("staging_location", "gs://my-bucket/binaries")

beam.Init()

// Create the Pipeline.
p := beam.NewPipeline()
s := p.Root()

Nachdem Sie Ihre Pipeline konstruiert haben, legen Sie alle Lese- und Schreibvorgänge sowie Transformationen fest und führen die Pipeline aus.

Pipelineoptionen über die Befehlszeile verwenden

Das folgende Beispiel zeigt, wie Pipelineoptionen verwendet werden, die in der Befehlszeile angegeben sind. In diesem Beispiel werden die Pipelineoptionen nicht programmatisch festgelegt.

Java

// Set your PipelineOptions to the specified command-line options
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

Verwenden Sie das Python-Modul argparse, um Befehlszeilenoptionen zu parsen.

# Use Python argparse module to parse custom arguments
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# For more details on how to use argparse, take a look at:
#   https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser()
parser.add_argument(
    '--input-file',
    default='gs://dataflow-samples/shakespeare/kinglear.txt',
    help='The file path for the input text to process.')
parser.add_argument(
    '--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()

# Create the Pipeline with remaining arguments.
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | 'Read files' >> beam.io.ReadFromText(args.input_file)
      | 'Write files' >> beam.io.WriteToText(args.output_path))

Go

Mit dem Go-Paket flag können Sie Befehlszeilenoptionen parsen. Sie müssen die Optionen vor dem Aufruf von beam.Init() parsen. In diesem Beispiel ist output eine Befehlszeilenoption.

// Define configuration options
var (
  output = flag.String("output", "", "Output file (required).")
)

// Parse options before beam.Init()
flag.Parse()

beam.Init()

// Input validation must be done after beam.Init()
if *output == "" {
  log.Fatal("No output provided!")
}

p := beam.NewPipeline()

Nachdem Sie Ihre Pipeline konstruiert haben, legen Sie alle Lese- und Schreibvorgänge sowie Transformationen fest und führen die Pipeline aus.

Ausführungsmodi steuern

Wenn ein Apache Beam-Programm eine Pipeline in einem Dienst wie Dataflow ausführt, kann das Programm die Pipeline entweder asynchron ausführen oder bis zum Abschluss der Pipeline blockieren. Sie können dieses Verhalten mithilfe der folgenden Anleitung ändern.

Java

Wenn ein Apache Beam Java-Programm eine Pipeline in einem Dienst wie Dataflow ausführt, wird es normalerweise asynchron ausgeführt. Wenn Sie eine Pipeline ausführen und warten möchten, bis der Job abgeschlossen ist, legen Sie DataflowRunner als den Pipeline-Runner fest und rufen Sie pipeline.run().waitUntilFinish() explizit auf.

Wenn Sie DataflowRunner verwenden und waitUntilFinish() für das von pipeline.run() zurückgegebene Objekt PipelineResult aufrufen, wird die Pipeline in Google Cloud ausgeführt, aber der lokale Code wartet auf den Abschluss des Cloudjobs und gibt das endgültige DataflowPipelineJob-Objekt zurück. Während der Jobausführung gibt der Dataflow-Dienst Aktualisierungen des Jobstatus sowie Konsolennachrichten aus.

Python

Wenn ein Apache Beam Python-Programm eine Pipeline in einem Dienst wie Dataflow ausführt, wird es normalerweise asynchron ausgeführt. Wenn das Hauptprogramm bis zum Abschluss der Pipeline blockiert werden soll, verwenden Sie die Methode wait_until_finish() des PipelineResult-Objekts, das von der run()-Methode des Runners zurückgegeben wird.

Go

Wenn ein Apache Beam Go-Programm eine Pipeline in Dataflow ausführt, ist diese standardmäßig synchron und wird bis zum Abschluss der Pipeline blockiert. Wenn Sie nicht blockieren möchten, gibt es zwei Möglichkeiten:

  1. Starten Sie den Job in einer Go-Routine.

    go func() {
      pr, err := beamx.Run(ctx, p)
      if err != nil {
        // Handle the error.
      }
      // Send beam.PipelineResult into a channel.
      results <- pr
    }()
    // Do other operations while the pipeline runs.
    
  2. Verwenden Sie das Befehlszeilen-Flag --async, das sich im Paket jobopts befindet.

Verwenden Sie die Dataflow-Monitoring-Oberfläche oder die Dataflow-Befehlszeile, um Ausführungsdetails aufzurufen, den Fortschritt zu beobachten oder den Jobabschlussstatus zu prüfen.

Streamingquellen verwenden

Java

Falls Ihre Pipeline aus einer unbegrenzten Datenquelle wie Pub/Sub liest, wird die Pipeline automatisch im Streamingmodus ausgeführt.

Python

Wenn die Pipeline unbegrenzte Datenquellen wie Pub/Sub verwendet, müssen Sie die Option streaming auf "true" festlegen.

Go

Falls Ihre Pipeline aus einer unbegrenzten Datenquelle wie Pub/Sub liest, wird die Pipeline automatisch im Streamingmodus ausgeführt.

Streamingjobs verwenden standardmäßig den Compute Engine-Maschinentyp n1-standard-2 oder höher.

Lokal starten

Anstatt auf verwalteten Cloudressourcen können Sie Ihre Pipeline auch lokal ausführen. Letzteres bietet gewisse Vorteile für das Testen, die Fehlerbehebung oder das Ausführen Ihrer Pipeline mit kleinen Datasets. Bei der lokalen Ausführung sind Sie beispielsweise nicht vom Dataflow-Remote-Dienst und dem zugehörigen Google Cloud-Projekt abhängig.

Wenn Sie die lokale Ausführung verwenden, müssen Sie Ihre Pipeline mit kleinen Datasets ausführen, damit sie in den lokalen Speicher passen. Ein kleines speicherinternes Dataset erstellen Sie mit einer Create-Transformation. Alternativ können Sie mit einer Read-Transformation mit kleinen lokalen oder Remote-Dateien arbeiten. Die lokale Ausführung bietet in der Regel eine schnellere und einfachere Möglichkeit, Tests und Fehlerbehebungen mit weniger externen Abhängigkeiten durchzuführen, wird aber durch den Speicher in Ihrer lokalen Umgebung begrenzt.

Der folgende Beispielcode zeigt, wie Sie eine Pipeline erstellen, die in Ihrer lokalen Umgebung ausgeführt wird.

Java

// Create and set our Pipeline Options.
PipelineOptions options = PipelineOptionsFactory.create();

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg')
args, beam_args = parser.parse_known_args()

# Create and set your Pipeline Options.
beam_options = PipelineOptions(beam_args)
args = beam_options.view_as(MyOptions)

with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | beam.io.ReadFromText(args.input)
      | beam.io.WriteToText(args.output))

Go

// Parse options before beam.Init()
flag.Parse()

beam.Init()

p := beam.NewPipeline()

Nachdem Sie Ihre Pipeline erstellt haben, führen Sie sie aus.

Benutzerdefinierte Pipelineoptionen erstellen

Zusätzlich zu den standardmäßigen PipelineOptions können Sie eigene benutzerdefinierte Optionen hinzufügen. Außerdem können Sie mit der Befehlszeile von Apache Beam benutzerdefinierte Optionen mithilfe von Befehlszeilenargumenten im selben Format parsen.

Java

Definieren Sie zum Hinzufügen Ihrer eigenen Optionen eine Schnittstelle mit Getter- und Setter-Methoden für jede Option wie im folgenden Beispiel:

public interface MyOptions extends PipelineOptions {
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}

Python

Verwenden Sie zum Hinzufügen Ihrer eigenen Optionen die Methode add_argument(), die sich genau wie das Python-Standardmodul argparse verhält, wie im folgenden Beispiel:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input')
    parser.add_argument('--output')

Go

Verwenden Sie zum Hinzufügen Ihrer eigenen Optionen das Go-Paket Flag, wie im folgenden Beispiel gezeigt:

var (
  input  = flag.String("input", "", "")
  output = flag.String("output", "", "")
)

Sie können auch eine Beschreibung angeben, die angezeigt wird, wenn ein Nutzer --help als Befehlszeilenargument übergibt, sowie einen Standardwert.

Java

Beschreibungen und Standardwerte legen Sie mithilfe von Annotationen so fest:

public interface MyOptions extends PipelineOptions {
  @Description("My custom command line argument.")
  @Default.String("DEFAULT")
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}

Registrieren Sie die Schnittstelle möglichst mit PipelineOptionsFactory und übergeben Sie diese, wenn Sie das Objekt PipelineOptions erstellen. Wenn Sie Ihre Schnittstelle bei PipelineOptionsFactory registrieren, kann --help Ihre Schnittstelle für benutzerdefinierte Optionen finden und der Ausgabe des Befehls --help hinzufügen. PipelineOptionsFactory prüft, ob Ihre benutzerdefinierten Optionen mit allen anderen registrierten Optionen kompatibel sind.

Am folgenden Beispielcode sehen Sie, wie Sie Ihre Schnittstelle für benutzerdefinierte Optionen bei PipelineOptionsFactory registrieren:

PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                          .withValidation()
                                          .as(MyOptions.class);

Nun kann Ihre Pipeline --myCustomOption=value als Befehlszeilenargument akzeptieren.

Python

Beschreibungen und Standardwerte legen Sie so fest:

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument(
        '--input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='The file path for the input text to process.')
    parser.add_argument(
        '--output', required=True, help='The path prefix for output files.')

Go

Beschreibungen und Standardwerte legen Sie so fest:

var (
  input  = flag.String("input", "gs://MY_STORAGE_BUCKET/input", "Input for the pipeline")
  output = flag.String("output", "gs://MY_STORAGE_BUCKET/output", "Output for the pipeline")
)