Vorlage für Spanner to BigQuery

Die Vorlage „Cloud Spanner für BigQuery“ ist eine Batchpipeline, die Daten aus einer Spanner-Tabelle liest und in BigQuery schreibt.

Pipelineanforderungen

  • Die Spanner-Quelltabelle muss vor der Pipelineausführung vorhanden sein.
  • Das BigQuery-Dataset muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Eine JSON-Datei, die Ihr BigQuery-Schema beschreibt.

    Die Datei muss ein JSON-Array der obersten Ebene mit dem Namen fields enthalten. Der Inhalt des fields-Arrays muss das folgende Muster haben:
    {"name": "COLUMN_NAME", "type": "DATA_TYPE"}.

    Der folgende JSON-Code beschreibt ein BigQuery-Beispielschema:

    {
      "fields": [
        {
          "name": "location",
          "type": "STRING"
        },
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "STRING"
        },
        {
          "name": "color",
          "type": "STRING"
        },
        {
          "name": "coffee",
          "type": "STRING"
        }
      ]
    }

    Die Batchvorlage „Spanner für BigQuery“ unterstützt nicht den Import von Daten in Felder des Typs STRUCT (Eintrag) in der BigQuery-Zieltabelle.

Vorlagenparameter

Erforderliche Parameter

  • spannerInstanceId: Die Instanz-ID der Spanner-Datenbank, aus der gelesen werden soll.
  • spannerDatabaseId: Die Datenbank-ID der Spanner-Datenbank, die exportiert werden soll.
  • outputTableSpec: Der Speicherort der BigQuery-Ausgabetabelle, in die die Ausgabe geschrieben werden soll. Beispiel: <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.Abhängig von der angegebenen createDisposition kann die Ausgabetabelle automatisch mit dem vom Nutzer angegebenen Avro-Schema erstellt werden.

Optionale Parameter

Führen Sie die Vorlage aus.

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Spanner to BigQuery templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Spanner_to_BigQuery_Flex \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       spannerInstanceId=SPANNER_INSTANCE_ID,\
       spannerDatabaseId=SPANNER_DATABASE_ID,\
       spannerTableId=SPANNER_TABLE_ID,\
       sqlQuery=SQL_QUERY,\
       outputTableSpec=OUTPUT_TABLE_SPEC,\

Ersetzen Sie dabei Folgendes:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • SPANNER_INSTANCE_ID: Die Spanner-Instanz-ID
  • SPANNER_DATABASE_ID: Die Spanner-Datenbank-ID
  • SPANNER_TABLE_ID: der Name der Spanner-Tabelle
  • SQL_QUERY: Die SQL-Abfrage.
  • OUTPUT_TABLE_SPEC: den Speicherort der BigQuery-Tabelle

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "spannerInstanceId": "SPANNER_INSTANCE_ID",
       "spannerDatabaseId": "SPANNER_DATABASE_ID",
       "spannerTableId": "SPANNER_TABLE_ID",
       "sqlQuery": "SQL_QUERY",
       "outputTableSpec": "OUTPUT_TABLE_SPEC",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Spanner_to_BigQuery_Flex",
     "environment": { "maxWorkers": "10" }
  }
}

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • SPANNER_INSTANCE_ID: Die Spanner-Instanz-ID
  • SPANNER_DATABASE_ID: Die Spanner-Datenbank-ID
  • SPANNER_TABLE_ID: der Name der Spanner-Tabelle
  • SQL_QUERY: Die SQL-Abfrage.
  • OUTPUT_TABLE_SPEC: den Speicherort der BigQuery-Tabelle
Java
/*
 * Copyright (C) 2022 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.utils.GCSUtils.getGcsFileAsString;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition.CREATE_NEVER;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.SpannerToBigQueryOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters;
import com.google.cloud.teleport.v2.transforms.SpannerToBigQueryTransform.StructToJson;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.common.base.Strings;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

/** Template to read data from a Spanner table and write into a BigQuery table. */
@Template(
    name = "Cloud_Spanner_to_BigQuery_Flex",
    category = TemplateCategory.BATCH,
    displayName = "Spanner to BigQuery",
    description =
        "The Spanner to BigQuery template is a batch pipeline that reads data from a Spanner table, and writes them to a BigQuery table.",
    optionsClass = SpannerToBigQueryOptions.class,
    flexContainerName = "spanner-to-bigquery",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/spanner-to-bigquery",
    contactInformation = "https://cloud.google.com/support")
public final class SpannerToBigQuery {

  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    PipelineOptionsFactory.register(SpannerToBigQueryOptions.class);
    SpannerToBigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(SpannerToBigQueryOptions.class);

    BigQueryIOUtils.validateBQStorageApiOptionsBatch(options);

    Pipeline pipeline = Pipeline.create(options);

    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withProjectId(
                options.getSpannerProjectId().isEmpty()
                    ? options.getProject()
                    : options.getSpannerProjectId())
            .withDatabaseId(options.getSpannerDatabaseId())
            .withInstanceId(options.getSpannerInstanceId())
            .withRpcPriority(options.getSpannerRpcPriority());

    SpannerIO.Read read = SpannerIO.read().withSpannerConfig(spannerConfig);

    if (!Strings.isNullOrEmpty(options.getSqlQuery())) {
      read = read.withQuery(options.getSqlQuery());
    } else if (!Strings.isNullOrEmpty(options.getSpannerTableId())) {
      read = read.withTable(options.getSpannerTableId());
    } else {
      throw new IllegalArgumentException("either sqlQuery or spannerTableId required");
    }
    if (Strings.isNullOrEmpty(options.getBigQuerySchemaPath())
        && CreateDisposition.valueOf(options.getCreateDisposition()) != CREATE_NEVER) {
      throw new IllegalArgumentException(
          "bigQuerySchemaPath is required if CreateDisposition is not CREATE_NEVER");
    }
    pipeline
        .apply(read)
        .apply(new StructToJson())
        .apply("Write To BigQuery", writeToBigQuery(options));

    pipeline.run();
  }

  private static Write<String> writeToBigQuery(SpannerToBigQueryOptions options) {
    if (CreateDisposition.valueOf(options.getCreateDisposition()) == CREATE_NEVER) {
      return BigQueryIO.<String>write()
          .to(options.getOutputTableSpec())
          .withWriteDisposition(WriteDisposition.valueOf(options.getWriteDisposition()))
          .withCreateDisposition(CreateDisposition.valueOf(options.getCreateDisposition()))
          .withExtendedErrorInfo()
          .withFormatFunction(BigQueryConverters::convertJsonToTableRow);
    }
    return BigQueryIO.<String>write()
        .to(options.getOutputTableSpec())
        .withWriteDisposition(WriteDisposition.valueOf(options.getWriteDisposition()))
        .withCreateDisposition(CreateDisposition.valueOf(options.getCreateDisposition()))
        .withExtendedErrorInfo()
        .withFormatFunction(BigQueryConverters::convertJsonToTableRow)
        .withJsonSchema(getGcsFileAsString(options.getBigQuerySchemaPath()));
  }
}

Nächste Schritte