Vorlage „Cloud Pub/Sub für Cloud Storage“

Mit der Vorlage „Serverless for Apache Spark Cloud Pub/Sub to Cloud Storage“ können Sie Daten aus Pub/Sub in Cloud Storage extrahieren.

Vorlage verwenden

Führen Sie die Vorlage mit der gcloud CLI oder der Dataproc API aus.

gcloud

Ersetzen Sie folgende Werte, bevor sie einen der Befehlsdaten verwenden:

  • PROJECT_ID: erforderlich. Die Google Cloud Projekt-ID, die in den IAM-Einstellungen aufgeführt ist.
  • REGION: erforderlich. Compute Engine-Region.
  • SUBNET: Optional. Wenn kein Subnetz angegeben ist, wird das Subnetz in der angegebenen REGION im Netzwerk default ausgewählt.

    Beispiel: projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME

  • TEMPLATE_VERSION: erforderlich. Geben Sie latest für die neueste Vorlagenversion oder das Datum einer bestimmten Version an, z. B. 2023-03-17_v0.1.0-beta. Rufen Sie gs://dataproc-templates-binaries auf oder führen Sie gcloud storage ls gs://dataproc-templates-binaries aus, um die verfügbaren Vorlagenversionen aufzulisten.
  • PUBSUB_SUBSCRIPTION_PROJECT_ID: erforderlich. Die Google Cloud Projekt-ID, die in den IAM-Einstellungen aufgeführt ist und das Pub/Sub-Eingabeabo enthält, das gelesen werden soll.
  • SUBSCRIPTION: erforderlich. Name des Pub/Sub-Abos.
  • CLOUD_STORAGE_OUTPUT_BUCKET_NAME: erforderlich. Name des Cloud Storage-Bucket, in dem die Ausgabe gespeichert wird.

    Hinweis:Die Ausgabedateien werden im Ordner output/ im Bucket gespeichert.

  • FORMAT: erforderlich. Ausgabedatenformat. Optionen: avro oder json.

    Hinweis:Wenn avro, müssen Sie dem gcloud CLI-Flag oder API-Feld jarsfile:///usr/lib/spark/connector/spark-avro.jar“ hinzufügen.

    Beispiel (das Präfix file:// verweist auf eine Serverless for Apache Spark-JAR-Datei):

    --jars=file:///usr/lib/spark/connector/spark-avro.jar, [ ... other jars]
  • TIMEOUT: Optional. Zeit in Millisekunden vor dem Beenden des Streams. Der Standardwert ist 60000.
  • DURATION: Optional. Häufigkeit der Schreibvorgänge in Cloud Storage in Sekunden. Standardmäßig 15 Sekunden.
  • NUM_RECEIVERS: Optional. Anzahl der Streams, die parallel aus einem Pub/Sub-Abo gelesen werden. Der Standardwert ist 5.
  • BATCHSIZE: Optional. Anzahl der Datensätze, die in einem Roundtrip in Cloud Storage eingefügt werden sollen. Der Standardwert ist 1000.
  • SERVICE_ACCOUNT: Optional. Wenn nicht angegeben, wird das Compute Engine-Standarddienstkonto verwendet.
  • PROPERTY und PROPERTY_VALUE: Optional. Durch Kommas getrennte Liste von Spark-Eigenschaft=value-Paaren.
  • LABEL und LABEL_VALUE: Optional. Durch Kommas getrennte Liste von label=value-Paaren.
  • LOG_LEVEL: Optional. Ebene der Protokollierung. Kann einer der folgenden Werte sein: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE oder WARN. Standard: INFO.
  • KMS_KEY: Optional. Der Cloud Key Management Service-Schlüssel, der für die Verschlüsselung verwendet werden soll. Wenn kein Schlüssel angegeben ist, werden Daten mit einem Google-owned and Google-managed encryption keyim Ruhezustand verschlüsselt.

    Beispiel: projects/PROJECT_ID/regions/REGION/keyRings/KEY_RING_NAME/cryptoKeys/KEY_NAME

Führen Sie folgenden Befehl aus:

Linux, macOS oder Cloud Shell

gcloud dataproc batches submit spark \
    --class=com.google.cloud.dataproc.templates.main.DataProcTemplate \
    --version="1.2" \
    --project="PROJECT_ID" \
    --region="REGION" \
    --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" \
    --subnet="SUBNET" \
    --kms-key="KMS_KEY" \
    --service-account="SERVICE_ACCOUNT" \
    --properties="PROPERTY=PROPERTY_VALUE" \
    --labels="LABEL=LABEL_VALUE" \
    -- --template=PUBSUBTOGCS \
    --templateProperty log.level="LOG_LEVEL" \
    --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" \
    --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" \
    --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" \
    --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" \
    --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" \
    --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" \
    --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" \
    --templateProperty pubsubtogcs.batch.size="BATCHSIZE"

Windows (PowerShell)

gcloud dataproc batches submit spark `
    --class=com.google.cloud.dataproc.templates.main.DataProcTemplate `
    --version="1.2" `
    --project="PROJECT_ID" `
    --region="REGION" `
    --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" `
    --subnet="SUBNET" `
    --kms-key="KMS_KEY" `
    --service-account="SERVICE_ACCOUNT" `
    --properties="PROPERTY=PROPERTY_VALUE" `
    --labels="LABEL=LABEL_VALUE" `
    -- --template=PUBSUBTOGCS `
    --templateProperty log.level="LOG_LEVEL" `
    --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" `
    --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" `
    --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" `
    --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" `
    --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" `
    --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" `
    --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" `
    --templateProperty pubsubtogcs.batch.size="BATCHSIZE"

Windows (cmd.exe)

gcloud dataproc batches submit spark ^
    --class=com.google.cloud.dataproc.templates.main.DataProcTemplate ^
    --version="1.2" ^
    --project="PROJECT_ID" ^
    --region="REGION" ^
    --jars="gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar" ^
    --subnet="SUBNET" ^
    --kms-key="KMS_KEY" ^
    --service-account="SERVICE_ACCOUNT" ^
    --properties="PROPERTY=PROPERTY_VALUE" ^
    --labels="LABEL=LABEL_VALUE" ^
    -- --template=PUBSUBTOGCS ^
    --templateProperty log.level="LOG_LEVEL" ^
    --templateProperty pubsubtogcs.input.project.id="PUBSUB_SUBSCRIPTION_PROJECT_ID" ^
    --templateProperty pubsubtogcs.input.subscription="SUBSCRIPTION" ^
    --templateProperty pubsubtogcs.gcs.bucket.name="CLOUD_STORAGE_OUTPUT_BUCKET_NAME" ^
    --templateProperty pubsubtogcs.gcs.output.data.format="FORMAT" ^
    --templateProperty pubsubtogcs.timeout.ms="TIMEOUT" ^
    --templateProperty pubsubtogcs.streaming.duration.seconds="DURATION" ^
    --templateProperty pubsubtogcs.total.receivers="NUM_RECEIVERS" ^
    --templateProperty pubsubtogcs.batch.size="BATCHSIZE"

REST

Ersetzen Sie diese Werte in den folgenden Anfragedaten:

  • PROJECT_ID: erforderlich. Die Google Cloud Projekt-ID, die in den IAM-Einstellungen aufgeführt ist.
  • REGION: erforderlich. Compute Engine-Region.
  • SUBNET: Optional. Wenn kein Subnetz angegeben ist, wird das Subnetz in der angegebenen REGION im Netzwerk default ausgewählt.

    Beispiel: projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME

  • TEMPLATE_VERSION: erforderlich. Geben Sie latest für die neueste Vorlagenversion oder das Datum einer bestimmten Version an, z. B. 2023-03-17_v0.1.0-beta. Rufen Sie gs://dataproc-templates-binaries auf oder führen Sie gcloud storage ls gs://dataproc-templates-binaries aus, um die verfügbaren Vorlagenversionen aufzulisten.
  • PUBSUB_SUBSCRIPTION_PROJECT_ID: erforderlich. Die Google Cloud Projekt-ID, die in den IAM-Einstellungen aufgeführt ist und das Pub/Sub-Eingabeabo enthält, das gelesen werden soll.
  • SUBSCRIPTION: erforderlich. Name des Pub/Sub-Abos.
  • CLOUD_STORAGE_OUTPUT_BUCKET_NAME: erforderlich. Name des Cloud Storage-Bucket, in dem die Ausgabe gespeichert wird.

    Hinweis:Die Ausgabedateien werden im Ordner output/ im Bucket gespeichert.

  • FORMAT: erforderlich. Ausgabedatenformat. Optionen: avro oder json.

    Hinweis:Wenn avro, müssen Sie dem gcloud CLI-Flag oder API-Feld jarsfile:///usr/lib/spark/connector/spark-avro.jar“ hinzufügen.

    Beispiel (das Präfix file:// verweist auf eine Serverless for Apache Spark-JAR-Datei):

    --jars=file:///usr/lib/spark/connector/spark-avro.jar, [ ... other jars]
  • TIMEOUT: Optional. Zeit in Millisekunden vor dem Beenden des Streams. Der Standardwert ist 60000.
  • DURATION: Optional. Häufigkeit der Schreibvorgänge in Cloud Storage in Sekunden. Standardmäßig 15 Sekunden.
  • NUM_RECEIVERS: Optional. Anzahl der Streams, die parallel aus einem Pub/Sub-Abo gelesen werden. Der Standardwert ist 5.
  • BATCHSIZE: Optional. Anzahl der Datensätze, die in einem Roundtrip in Cloud Storage eingefügt werden sollen. Der Standardwert ist 1000.
  • SERVICE_ACCOUNT: Optional. Wenn nicht angegeben, wird das Compute Engine-Standarddienstkonto verwendet.
  • PROPERTY und PROPERTY_VALUE: Optional. Durch Kommas getrennte Liste von Spark-Eigenschaft=value-Paaren.
  • LABEL und LABEL_VALUE: Optional. Durch Kommas getrennte Liste von label=value-Paaren.
  • LOG_LEVEL: Optional. Ebene der Protokollierung. Kann einer der folgenden Werte sein: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE oder WARN. Standard: INFO.
  • KMS_KEY: Optional. Der Cloud Key Management Service-Schlüssel, der für die Verschlüsselung verwendet werden soll. Wenn kein Schlüssel angegeben ist, werden Daten mit einem Google-owned and Google-managed encryption keyim Ruhezustand verschlüsselt.

    Beispiel: projects/PROJECT_ID/regions/REGION/keyRings/KEY_RING_NAME/cryptoKeys/KEY_NAME

HTTP-Methode und URL:

POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/batches

JSON-Text anfordern:


{
  "environmentConfig":{
    "executionConfig":{
      "subnetworkUri":"SUBNET",
      "kmsKey": "KMS_KEY",
      "serviceAccount": "SERVICE_ACCOUNT"
    }
  },
  "labels": {
    "LABEL": "LABEL_VALUE"
  },
  "runtimeConfig": {
    "version": "1.2",
    "properties": {
      "PROPERTY": "PROPERTY_VALUE"
    }
  },
  "sparkBatch":{
    "mainClass":"com.google.cloud.dataproc.templates.main.DataProcTemplate",
    "args":[
      "--template","PUBSUBTOGCS",
      "--templateProperty","log.level=LOG_LEVEL",
      "--templateProperty","pubsubtogcs.input.project.id=PUBSUB_SUBSCRIPTION_PROJECT_ID",
      "--templateProperty","pubsubtogcs.input.subscription=SUBSCRIPTION",
      "--templateProperty","pubsubtogcs.gcs.bucket.name=CLOUD_STORAGE_OUTPUT_BUCKET_NAME",
      "--templateProperty","pubsubtogcs.gcs.output.data.format=FORMAT",
      "--templateProperty","pubsubtogcs.timeout.ms=TIMEOUT",
      "--templateProperty","pubsubtogcs.streaming.duration.seconds=DURATION",
      "--templateProperty","pubsubtogcs.total.receivers=NUM_RECEIVERS",
      "--templateProperty","pubsubtogcs.batch.size=BATCHSIZE"
    ],
    "jarFileUris":[
      "file:///usr/lib/spark/connector/spark-avro.jar", "gs://dataproc-templates-binaries/TEMPLATE_VERSION/java/dataproc-templates.jar"
    ]
  }
}

Wenn Sie die Anfrage senden möchten, maximieren Sie eine der folgenden Optionen:

Sie sollten eine JSON-Antwort ähnlich wie diese erhalten:


{
  "name": "projects/PROJECT_ID/regions/REGION/operations/OPERATION_ID",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.dataproc.v1.BatchOperationMetadata",
    "batch": "projects/PROJECT_ID/locations/REGION/batches/BATCH_ID",
    "batchUuid": "de8af8d4-3599-4a7c-915c-798201ed1583",
    "createTime": "2023-02-24T03:31:03.440329Z",
    "operationType": "BATCH",
    "description": "Batch"
  }
}