Vorlage „Bigtable-Änderungsstreams zu BigQuery“

Die Vorlage „Bigtable-Änderungsstreams zu BigQuery“ ist eine Streamingpipeline, die Bigtable-Datenänderungseinträge streamt und mithilfe von Dataflow in BigQuery-Tabellen schreibt.

Mit einem Bigtable-Änderungsstream können Sie Datenmutationen auf Tabellenbasis abonnieren. Wenn Sie Änderungsstreams für Tabellen abonnieren, gelten die folgenden Einschränkungen:

  • Es werden nur geänderte Zellen und Deskriptoren von Löschvorgängen zurückgegeben.
  • Es wird nur der neue Wert einer geänderten Zelle zurückgegeben.

Wenn Datenänderungsdatensätze in BigQuery geschrieben werden, können Zeilen in der falschen Reihenfolge eingefügt werden im Vergleich zur ursprünglichen Bigtable-Commit-Zeitstempelreihenfolge.

Änderungslog-Zeilen, die aufgrund eines persistenten Fehlers nicht in BigQuery geschrieben werden können, werden dauerhaft in ein Verzeichnis für unzustellbare Nachrichten (Cloud Storage für nicht verarbeitete Nachrichten) in Cloud Storage geschrieben, damit sie von Nutzern analysiert oder weiter verarbeitet werden können.

Wenn die erforderliche BigQuery-Tabelle nicht vorhanden ist, wird sie von der Pipeline erstellt. Andernfalls wird eine vorhandene BigQuery-Tabelle verwendet. Das Schema vorhandener BigQuery-Tabellen muss die Spalten in der folgenden Tabelle enthalten.

Jede neue BigQuery-Zeile enthält einen Datenänderungsdatensatz, der vom Änderungsstream aus der entsprechenden Zeile in Ihrer Bigtable-Tabelle zurückgegeben wird.

BigQuery-Ausgabetabellenschema

Spaltenname Typ Nullwerte zulässig Beschreibung
row_key STRING oder BYTES Nein Der Zeilenschlüssel der geänderten Zeile. Wenn die Pipelineoption writeRowkeyAsBytes auf true gesetzt ist, muss der Typ der Spalte BYTES sein. Andernfalls verwenden Sie den Typ STRING.
mod_type STRING Nein Der Typ der Zeilenmutation. Verwenden Sie einen der folgenden Werte: SET_CELL, DELETE_CELLS oder DELETE_FAMILY.
column_family STRING Nein Die von der Zeilenmutation betroffene Spaltenfamilie
column STRING Ja Der von der Zeilenmutation betroffene Spaltenqualifizierer. Legen Sie für den Mutationstyp DELETE_FAMILY den Wert NULL fest.
commit_timestamp TIMESTAMP Nein Der Zeitpunkt, an dem Bigtable die Mutation anwendet.
big_query_commit_timestamp TIMESTAMP Ja Optional: Gibt die Zeit an, zu der BigQuery die Zeile in eine Ausgabetabelle schreibt. Das Feld wird nicht ausgefüllt, wenn der Spaltenname im Wert der Pipelineoption bigQueryChangelogTableFieldsToIgnore vorhanden ist.
timestamp TIMESTAMP oder INT64 Ja Der Zeitstempelwert der Zelle, die von der Mutation betroffen ist. Wenn die Pipelineoption writeNumericTimestamps auf true gesetzt ist, muss der Typ der Spalte INT64 sein. Andernfalls verwenden Sie den Typ TIMESTAMP. Legen Sie für die Mutationstypen DELETE_CELLS und DELETE_FAMILY den Wert NULL fest.
timestamp_from TIMESTAMP oder INT64 Ja Beschreibt einen inklusiven Start des Zeitstempelintervalls für alle Zellen, die von der Mutation DELETE_CELLS gelöscht wurden. Legen Sie für andere Mutationstypen NULL fest.
timestamp_to TIMESTAMP oder INT64 Ja Beschreibt ein exklusives Ende des Zeitstempelintervalls für alle Zellen, die mit der Mutation DELETE_CELLS gelöscht werden. Legen Sie für andere Mutationstypen NULL fest.
is_gc BOOL Nein Optional: Wenn die Mutation durch eine Richtlinie für die automatische Speicherbereinigung ausgelöst wird, legen Sie true fest. In allen anderen Fällen legen Sie false fest. Das Feld wird nicht ausgefüllt, wenn der Spaltenname im Wert der Pipelineoption bigQueryChangelogTableFieldsToIgnore enthalten ist.
source_instance STRING Nein Optional: Beschreibt den Namen der Bigtable-Instanz, von der die Mutation stammt. Das Feld wird nicht ausgefüllt, wenn der Spaltenname im Wert der Pipelineoption bigQueryChangelogTableFieldsToIgnore enthalten ist.
source_cluster STRING Nein Optional: Beschreibt den Namen des Bigtable-Clusters, aus dem die Mutation stammt. Das Feld wird nicht ausgefüllt, wenn der Spaltenname im Wert der Pipelineoption bigQueryChangelogTableFieldsToIgnore enthalten ist.
source_table STRING Nein Optional: Beschreibt den Namen der Bigtable-Tabelle, auf die die Mutation angewendet wird. Der Wert in dieser Spalte kann nützlich sein, wenn mehrere Bigtable-Tabellen Änderungen an derselben BigQuery-Tabelle streamen. Das Feld wird nicht ausgefüllt, wenn der Spaltenname im Wert der Pipelineoption bigQueryChangelogTableFieldsToIgnore enthalten ist.
tiebreaker INT64 Nein Optional: Wenn zwei Mutationen von verschiedenen Bigtable-Clustern gleichzeitig registriert werden, wird die Mutation mit dem höchsten tiebreaker-Wert auf die Quelltabelle angewendet. Mutationen mit niedrigeren tiebreaker-Werten werden verworfen. Das Feld wird nicht ausgefüllt, wenn der Spaltenname im Wert der Pipelineoption bigQueryChangelogTableFieldsToIgnore enthalten ist.
value STRING oder BYTES Ja Der neue Wert, der von der Mutation festgelegt wird. Wenn die Pipelineoption writeValuesAsBytes auf true gesetzt ist, muss der Typ der Spalte BYTES sein. Andernfalls verwenden Sie den Typ STRING. Der Wert wird für SET_CELL-Mutationen festgelegt. Bei anderen Mutationstypen wird der Wert auf NULL festgelegt.

Pipelineanforderungen

  • Die angegebene Bigtable-Quellinstanz.
  • Die angegebene Bigtable-Quelltabelle In der Tabelle müssen Änderungsstreams aktiviert sein.
  • Das angegebene Bigtable-Anwendungsprofil.
  • Das angegebene BigQuery-Ziel-Dataset.

Vorlagenparameter

Parameter Beschreibung
bigtableReadInstanceId Die Bigtable-Quellinstanz-ID.
bigtableReadTableId Die Bigtable-Quelltabellen-ID.
bigtableChangeStreamAppProfile Die Bigtable-Anwendungsprofil-ID. Das Anwendungsprofil muss Single-Cluster-Routing verwenden und Transaktionen für einzelne Zeilen zulassen.
bigQueryDataset Der Dataset-Name der BigQuery-Zieltabelle.
writeNumericTimestamps Optional: Schreiben Sie den Bigtable-Zeitstempel als BigQuery-INT64. Wenn true festgelegt ist, werden Werte in die Spalte INT64 geschrieben. Andernfalls werden Werte in die Spalte TIMESTAMP geschrieben. Betroffene Spalten: timestamp, timestamp_from und timestamp_to Die Standardeinstellung ist false. Wenn dieser Wert auf true gesetzt ist, wird die Zeit seit der Unix-Epoche (1. Januar 1970 um UTC) in Mikrosekunden gemessen.
writeRowkeyAsBytes Optional: Zeilenschlüssel als BigQuery-BYTES schreiben. Wenn true festgelegt ist, werden Zeilenschlüssel in die Spalte BYTES geschrieben. Andernfalls werden Zeilenschlüssel in die Spalte STRING geschrieben. Die Standardeinstellung ist false.
writeValuesAsBytes Optional: Werte als BigQuery-BYTES schreiben. Wenn true festgelegt ist, werden Werte in die Spalte BYTES geschrieben. Andernfalls werden Werte in die Spalte STRING geschrieben. Die Standardeinstellung ist false.
bigQueryChangelogTableName Optional: BigQuery-Zieltabellenname. Wenn nicht angegeben, wird der Wert bigtableReadTableId + "_changelog" verwendet.
bigQueryProjectId Optional: Die Projekt-ID des BigQuery-Datasets. Der Standardwert ist das Projekt für den Dataflow-Job.
bigtableReadProjectId Optional: Die Bigtable-Projekt-ID. Der Standardwert ist das Projekt für den Dataflow-Job.
bigtableChangeStreamMetadataInstanceId Optional: Die Bigtable-Änderung streamt die Metadateninstanz-ID.
bigtableChangeStreamMetadataTableTableId Optional: Die Bigtable-Änderung streamt Metadatentabellen-ID.
bigtableChangeStreamCharset Optional: Die Bigtable-Änderung streamt den Namen des Zeichensatzes beim Lesen von Werten und Spaltenqualifizierern.
bigtableChangeStreamStartTimestamp Optional: Der Start-Zeitstempel (einschließlich), der zum Lesen von Änderungsstreams verwendet wird. Beispiel: 2022-05-05T07:59:59Z. Die Standardeinstellung ist der Zeitstempel der Pipeline-Startzeit.
bigtableChangeStreamIgnoreColumnFamilies Optional: Eine durch Kommas getrennte Liste von Änderungen der Spaltenfamilien, die ignoriert werden sollen.
bigtableChangeStreamIgnoreColumns Optional: Eine durch Kommas getrennte Liste mit Spaltennamen, die ignoriert werden sollen.
bigtableChangeStreamName Optional: Eindeutiger Name der Clientpipeline. Ermöglicht die Fortsetzung der Verarbeitung ab dem Punkt, an dem eine zuvor ausgeführte Pipeline beendet wurde. Standardmäßig wird der automatisch generierte Name verwendet. Weitere Informationen finden Sie in den Dataflow-Joblogs.
bigtableChangeStreamResume Optional: Wenn dieser Wert auf true gesetzt ist, wird eine neue Pipeline ab dem Punkt, an dem eine zuvor ausgeführte Pipeline mit demselben bigtableChangeStreamName-Wert beendet wird, mit der Verarbeitung fortgesetzt. Wenn die Pipeline mit dem angegebenen bigtableChangeStreamName-Wert noch nie ausgeführt wurde, wird eine neue Pipeline nicht gestartet. Wenn false festgelegt ist, wird eine neue Pipeline gestartet. Wenn eine Pipeline mit demselben bigtableChangeStreamName-Wert bereits für die angegebene Quelle ausgeführt wurde, wird eine neue Pipeline nicht gestartet. Die Standardeinstellung ist false.
bigQueryChangelogTableFieldsToIgnore Optional: Eine durch Kommas getrennte Liste der Änderungslogspalten, die, wenn angegeben, nicht erstellt und ausgefüllt werden. Verwenden Sie einen der folgenden unterstützten Werte: is_gc, source_instance, source_cluster, source_table, tiebreaker oder big_query_commit_timestamp. Standardmäßig werden alle Spalten ausgefüllt.
bigQueryChangelogTablePartitionExpirationMs Optional: Legt die Ablaufzeit der Partition des Änderungslogs in Millisekunden fest. Wenn true festgelegt ist, werden Partitionen gelöscht, die älter als die angegebene Anzahl von Millisekunden sind. Standardmäßig ist kein Ablauf festgelegt.
bigQueryChangelogTablePartitionGranularity Optional: Gibt einen Detaillierungsgrad zur Partitionierung der Änderungslogtabelle an. Wenn dieses Flag festgelegt ist, wird die Tabelle partitioniert. Verwenden Sie einen der folgenden unterstützten Werte: HOUR, DAY, MONTH oder YEAR. Standardmäßig ist die Tabelle nicht partitioniert.
dlqDirectory Optional: Das Verzeichnis für die Warteschlange für unzustellbare Nachrichten. Einträge, die nicht verarbeitet werden können, werden in diesem Verzeichnis gespeichert. Der Standardwert ist ein Verzeichnis unter dem temporären Speicherort des Dataflow-Jobs. In den meisten Fällen können Sie den Standardpfad verwenden.

Führen Sie die Vorlage aus.

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Bigtable change streams to BigQuery template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
bigQueryDataset=BIGQUERY_DESTINATION_DATASET

Dabei gilt:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • BIGTABLE_INSTANCE_ID: Ihre Bigtable-Instanz-ID.
  • BIGTABLE_TABLE_ID: Ihre Bigtable-Tabellen-ID.
  • BIGTABLE_APPLICATION_PROFILE_ID: Ihre Bigtable-Anwendungsprofil-ID
  • BIGQUERY_DESTINATION_DATASET: der Name des BigQuery-Ziel-Datasets

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery",
    "parameters": {
        "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
        "bigtableReadTableId": "BIGTABLE_TABLE_ID",
        "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
        "bigQueryDataset": "BIGQUERY_DESTINATION_DATASET"
    }
  }
}

Dabei gilt:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • BIGTABLE_INSTANCE_ID: Ihre Bigtable-Instanz-ID.
  • BIGTABLE_TABLE_ID: Ihre Bigtable-Tabellen-ID.
  • BIGTABLE_APPLICATION_PROFILE_ID: Ihre Bigtable-Anwendungsprofil-ID
  • BIGQUERY_DESTINATION_DATASET: der Name des BigQuery-Ziel-Datasets

Nächste Schritte