Da Apache Kafka a modello BigQuery

Il modello da Apache Kafka a BigQuery è una pipeline in modalità flusso che importa dati di testo da Apache Kafka, esegue una funzione definita dall'utente (UDF) e restituisce i record risultanti in BigQuery. Eventuali errori che si verificano nella trasformazione dei dati, nell'esecuzione della funzione definita dall'utente o nell'inserimento nella tabella di output vengono inseriti in una tabella degli errori separata in BigQuery. Se la tabella degli errori non esiste prima dell'esecuzione, viene creata.

Requisiti della pipeline

  • La tabella BigQuery di output deve esistere.
  • Il server di broker Apache Kafka deve essere in esecuzione ed essere raggiungibile dalle macchine worker Dataflow.
  • Gli argomenti Apache Kafka devono esistere e i messaggi devono essere codificati in un formato JSON valido.

Parametri del modello

Parametri obbligatori

  • outputTableSpec : la posizione della tabella di output BigQuery in cui scrivere l'output. Ad esempio, <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.In base al valore createDisposition specificato, la tabella di output potrebbe essere creata automaticamente utilizzando lo schema Avro fornito dall'utente.

Parametri facoltativi

  • readBootstrapServers : elenco di server Kafka Bootstrap, separato da virgole. Ad esempio: localhost:9092,127.0.0.1:9093.
  • bootstrapServers : l'indirizzo host dei server di broker Apache Kafka in esecuzione in un elenco separato da virgole. Ogni indirizzo host deve essere nel formato 35.70.252.199:9092. Ad esempio: localhost:9092,127.0.0.1:9093.
  • kafkaReadTopics : argomenti Kafka da cui leggere l'input. (Esempio: argomento1,argomento2).
  • inputTopics : gli argomenti di input di Apache Kafka da cui leggere in un elenco separato da virgole. (Esempio: argomento1,argomento2).
  • outputDeadletterTable : tabella BigQuery per i messaggi con errori. I messaggi non sono riusciti a raggiungere la tabella di output a causa di diversi motivi (ad es. schema non corrispondente, JSON non corretto) vengono scritti in questa tabella. Se non esiste, verrà creato durante l'esecuzione della pipeline. Se non specificato, viene utilizzato "outputTableSpec_error_records". Ad esempio: id-progetto:set-di-dati.nome-tabella.
  • messageFormat : il formato del messaggio. Può essere AVRO o JSON. Il valore predefinito è JSON.
  • avroSchemaPath : percorso Cloud Storage al file dello schema Avro. Ad esempio, gs://MyBucket/file.avsc.
  • useStorageWriteApiAtLeastOnce : questo parametro diventa effettivo solo se è abilitata l'opzione "Utilizza l'API BigQuery Storage Write". Se abilitata, verrà usata la semantica "at-least-once" per l'API Storage Write, altrimenti verrà usata la semantica "exactly-once". Il valore predefinito è: false.
  • javascriptTextTransformGcsPath : l'URI Cloud Storage del file .js che definisce la funzione JavaScript definita dall'utente da utilizzare. (Esempio: gs://my-bucket/my-udfs/my_file.js).
  • javascriptTextTransformFunctionName : il nome della funzione definita dall'utente di JavaScript da utilizzare. Ad esempio, se il codice della funzione JavaScript è myTransform(inJson) { /*...do stuff...*/ }, il nome della funzione è myTransform. Per esempi di funzioni JavaScript di esempio, consulta gli esempi di funzioni definite dall'utente (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes : specifica la frequenza con cui ricaricare la funzione definita dall'utente, in minuti. Se il valore è maggiore di 0, Dataflow controlla periodicamente il file della funzione definita dall'utente in Cloud Storage e, se il file viene modificato, la ricarica. Questo parametro consente di aggiornare la funzione definita dall'utente mentre la pipeline è in esecuzione, senza dover riavviare il job. Se il valore è 0, il ricaricamento delle funzioni definite dall'utente è disabilitato. Il valore predefinito è 0.
  • writeDisposition : il valore di WriteDisposition di BigQuery (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). Ad esempio, WRITE_APPEND, WRITE_EMPTY o WRITE_TRUNCATE. Il valore predefinito è WRITE_APPEND.
  • createDisposition : la classe CreateDisposition di BigQuery (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). Ad esempio, CREATE_IF_NEEDED e CREATE_NEVER. Il valore predefinito è CREATE_IF_NEEDED.
  • useStorageWriteApi : se il valore è true, la pipeline utilizza l'API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). Il valore predefinito è false. Per saperne di più, consulta la pagina relativa all'utilizzo dell'API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams : quando si utilizza l'API Storage Write, specifica il numero di flussi di scrittura. Se useStorageWriteApi è true e useStorageWriteApiAtLeastOnce è false, devi impostare questo parametro. Il valore predefinito è 0.
  • storageWriteApiTriggeringFrequencySec : quando utilizzi l'API Storage Write, specifica la frequenza di attivazione in secondi. Se useStorageWriteApi è true e useStorageWriteApiAtLeastOnce è false, devi impostare questo parametro.

Funzione definita dall'utente

Facoltativamente, puoi estendere questo modello scrivendo una funzione definita dall'utente (UDF). Il modello chiama la funzione definita dall'utente per ogni elemento di input. I payload degli elementi sono serializzati come stringhe JSON. Per ulteriori informazioni, consulta Creare funzioni definite dall'utente per i modelli Dataflow.

Specifica della funzione

La funzione definita dall'utente ha le seguenti specifiche:

  • Input: il valore del record Kafka, serializzato come stringa JSON.
  • Output: una stringa JSON corrispondente allo schema della tabella di destinazione BigQuery.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, vedi Località Dataflow.

  5. Nel menu a discesa Modello Dataflow, seleziona the Kafka to BigQuery template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. (Facoltativo) Per passare dall'elaborazione "exactly-once" alla modalità di streaming "at-least-once", seleziona Almeno una volta.
  8. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_BigQuery \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco a tua scelta
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • BIGQUERY_TABLE: nome della tua tabella BigQuery
  • KAFKA_TOPICS: l'elenco degli argomenti Apache Kakfa. Se vengono forniti più argomenti, segui le instructions su come utilizzare i caratteri di escape per le virgole.
  • PATH_TO_JAVASCRIPT_UDF_FILE: l'URI Cloud Storage del file .js che definisce la funzione JavaScript definita dall'utente che vuoi utilizzare, ad esempio gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: il nome della funzione definita dall'utente dall'utente JavaScript che vuoi utilizzare

    Ad esempio, se il codice della funzione JavaScript è myTransform(inJson) { /*...do stuff...*/ }, il nome della funzione è myTransform. Per esempi di funzioni JavaScript definite, consulta gli esempi di funzioni definite dall'utente.

  • KAFKA_SERVER_ADDRESSES: elenco di indirizzi IP del server di broker Apache Kafka. Ogni indirizzo IP deve avere con sé il numero di porta da cui è accessibile il server. Ad esempio: 35.70.252.199:9092. Se vengono forniti più indirizzi, segui le instructions per l'escape delle virgole.

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_BigQuery",
   }
}
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco a tua scelta
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • BIGQUERY_TABLE: nome della tua tabella BigQuery
  • KAFKA_TOPICS: l'elenco degli argomenti Apache Kakfa. Se vengono forniti più argomenti, segui le instructions su come utilizzare i caratteri di escape per le virgole.
  • PATH_TO_JAVASCRIPT_UDF_FILE: l'URI Cloud Storage del file .js che definisce la funzione JavaScript definita dall'utente che vuoi utilizzare, ad esempio gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: il nome della funzione definita dall'utente dall'utente JavaScript che vuoi utilizzare

    Ad esempio, se il codice della funzione JavaScript è myTransform(inJson) { /*...do stuff...*/ }, il nome della funzione è myTransform. Per esempi di funzioni JavaScript definite, consulta gli esempi di funzioni definite dall'utente.

  • KAFKA_SERVER_ADDRESSES: elenco di indirizzi IP del server di broker Apache Kafka. Ogni indirizzo IP deve avere con sé il numero di porta da cui è accessibile il server. Ad esempio: 35.70.252.199:9092. Se vengono forniti più indirizzi, segui le instructions per l'escape delle virgole.

Per maggiori informazioni, consulta Scrivere dati da Kafka a BigQuery con Dataflow.

Passaggi successivi