Il modello da Apache Kafka a BigQuery è una pipeline di inserimento flussi che importa dati di testo dai cluster Google Cloud Managed Service per Apache Kafka e poi invia i record risultanti alle tabelle BigQuery. Eventuali errori che si verificano durante l'inserimento dei dati nella tabella di output vengono inseriti in una tabella di errori separata in BigQuery.
Puoi anche utilizzare il modello da Apache Kafka a BigQuery con Kafka autogestito o esterno.
Requisiti della pipeline
- Il server del broker Apache Kafka deve essere in esecuzione ed essere raggiungibile dalle macchine worker Dataflow.
- Gli argomenti Apache Kafka devono esistere.
- Devi abilitare le API Dataflow, BigQuery e Cloud Storage. Se è richiesta l'autenticazione, devi abilitare anche l'API Secret Manager.
- Crea un set di dati e una tabella BigQuery con lo schema appropriato per il tuo argomento di input Kafka. Se utilizzi più schemi nello stesso argomento e vuoi scrivere su più tabelle, non è necessario creare la tabella prima di configurare la pipeline.
- Quando la coda dei messaggi non recapitabili (messaggi non elaborati) per il modello è abilitata, crea una tabella vuota priva di uno schema per la coda dei messaggi non recapitabili.
Formato dei messaggi Kafka
Il modello da Apache Kafka a BigQuery supporta la lettura di messaggi da Kafka nei seguenti formati:
CONFLUENT_AVRO_WIRE_FORMAT
, AVRO_BINARY_FORMAT
e JSON
.
Autenticazione
Il modello da Apache Kafka a BigQuery supporta l'autenticazione SASL/PLAIN per i broker Kafka.
Parametri del modello
Parametri obbligatori
- readBootstrapServerAndTopic : argomento Kafka da cui leggere l'input.
- kafkaReadAuthenticationMode : la modalità di autenticazione da utilizzare con il cluster Kafka. Utilizza NONE per nessuna autenticazione o SASL_PLAIN per nome utente e password SASL/PLAIN. Apache Kafka per BigQuery supporta solo la modalità di autenticazione SASL_PLAIN. Il valore predefinito è: SASL_PLAIN.
- writeMode : modalità di scrittura: scrive i record in una o più tabelle (in base allo schema). La modalità DYNAMIC_TABLE_NAMES è supportata solo per il formato dei messaggi di origine AVRO_CONFLUENT_WIRE_FORMAT e l'origine dello schema SCHEMA_REGISTRY. Il nome della tabella di destinazione verrà generato automaticamente in base al nome dello schema Avro di ciascun messaggio. Può essere un singolo schema (creazione di un'unica tabella) o più schemi (creazione di più tabelle). La modalità SINGLE_TABLE_NAME scrive in una singola tabella (schema singolo) specificata dall'utente. Il valore predefinito è SINGLE_TABLE_NAME.
- useBigQueryDLQ : se true, i messaggi con errori verranno scritti in BigQuery con ulteriori informazioni sull'errore. La tabella messaggi non recapitabili deve essere creata senza schema. Il valore predefinito è false.
- messageFormat : il formato dei messaggi Kafka da leggere. I valori supportati sono AVRO_CONFLUENT_WIRE_FORMAT (Avro codificato da Confluent Schema Registry), AVRO_BINARY_ENCODING (Avro binario normale) e JSON. Il valore predefinito è: AVRO_CONFLUENT_WIRE_FORMAT.
Parametri facoltativi
- outputTableSpec : posizione della tabella BigQuery in cui scrivere l'output. Il nome deve essere nel formato
<project>:<dataset>.<table_name>
. Lo schema della tabella deve corrispondere agli oggetti di input. - persistKafkaKey : se true, la pipeline manterrà la chiave di messaggio Kafka nella tabella BigQuery, in un campo
_key
di tipoBYTES
. Il valore predefinito è false (la chiave viene ignorata). - outputProject : progetto di output BigQuery in ogni parte del set di dati. Le tabelle verranno create dinamicamente nel set di dati. Il campo predefinito è vuoto.
- outputDataset : set di dati di output BigQuery in cui scrivere l'output. Le tabelle verranno create dinamicamente nel set di dati. Se le tabelle vengono create in anticipo, i nomi delle tabelle devono seguire la convenzione di denominazione specificata. Il nome deve essere
bqTableNamePrefix + Avro Schema FullName
e ogni parola sarà separata da un trattino "-". Il campo predefinito è vuoto. - bqTableNamePrefix : Prefisso di denominazione da utilizzare durante la creazione delle tabelle di output BigQuery. Applicabile solo quando si utilizza il registro di schema. Il campo predefinito è vuoto.
- createDisposition : CreateDisposition di BigQuery. Ad esempio CREATE_IF_NEEDED, CREATE_NEVER. Il valore predefinito è: CREATE_IF_NEEDED.
- writeDisposition : ScriviDisposition di BigQuery. Ad esempio, WRITE_APPEND, WRITE_EMPTY o WRITE_TRUNCATE. Il valore predefinito è: WRITE_APPEND.
- useAutoSharding : se true, la pipeline utilizza il sharding automatico durante il passaggio a BigQuery. Il valore predefinito è
true
. - numStorageWriteApiStreams : specifica il numero di flussi di scrittura (questo parametro deve essere impostato). Il valore predefinito è 0.
- storageWriteApiTriggeringFrequencySec : specifica la frequenza di attivazione in secondi. È necessario impostare questo parametro. Il valore predefinito è 5 secondi.
- useStorageWriteApiAtLeastOnce : questo parametro viene applicato soltanto se viene selezionato "Utilizza API BigQuery Storage Scrivi" sia abilitato. Se l'opzione è abilitata, per l'API Storage scrivere verrà utilizzata la semantica "at-least-once", altrimenti verrà utilizzata la semantica "exactly-once". Il valore predefinito è false.
- outputDeadletterTable : tabella BigQuery per i messaggi con errori. I messaggi non sono riusciti a raggiungere la tabella di output per diversi motivi (ad es. schema non corrispondente, json in formato non valido) vengono scritti in questa tabella. ad esempio id-progetto:set-di-dati.nome-tabella.
- enableCommitOffsets : esegue il commit degli offset dei messaggi elaborati in Kafka. Se abilitata, questa opzione ridurrà al minimo le lacune o l'elaborazione duplicata dei messaggi quando riavvii la pipeline. Richiede la specifica dell'ID gruppo di consumatori. Il valore predefinito è false.
- consumerGroupId : l'identificatore univoco del gruppo di consumatori a cui appartiene questa pipeline. Obbligatorio se è abilitato il Commit Offsets in Kafka. Il campo predefinito è vuoto.
- kafkaReadOffset : il punto di partenza per la lettura dei messaggi quando non esistono offset di commit. La prima parte dell'email parte dall'inizio, la più recente dall'ultimo messaggio. Il valore predefinito è: più recente.
- kafkaReadUsernameSecretId : l'ID secret di Google Cloud Secret Manager che contiene il nome utente Kafka da utilizzare con l'autenticazione SASL_PLAIN. (Esempio: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). Il campo predefinito è vuoto.
- kafkaReadPasswordSecretId : l'ID secret di Google Cloud Secret Manager che contiene la password Kafka da utilizzare con l'autenticazione SASL_PLAIN. (Esempio: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). Il campo predefinito è vuoto.
- schemaFormat : il formato dello schema Kafka. Può essere fornito nel formato SINGLE_SCHEMA_FILE o SCHEMA_REGISTRY. Se viene specificato SINGLE_SCHEMA_FILE, tutti i messaggi devono avere lo schema indicato nel file di schema avro. Se viene specificato SCHEMA_REGISTRY, i messaggi possono avere un solo schema o più schemi. Il valore predefinito è: SINGLE_SCHEMA_FILE.
- confluentAvroSchemaPath : il percorso di Google Cloud Storage del singolo file di schema Avro utilizzato per decodificare tutti i messaggi in un argomento. Il campo predefinito è vuoto.
- schemaRegistryConnectionUrl : l'URL dell'istanza del Confluent Schema Registry utilizzato per gestire gli schemi Avro per la decodifica dei messaggi. Il campo predefinito è vuoto.
- binaryAvroSchemaPath : il percorso di Google Cloud Storage del file di schema Avro utilizzato per decodificare i messaggi Avro con codifica binaria. Il campo predefinito è vuoto.
Esegui il modello
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome univoco per il job.
- (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. Il valore predefinito
è
us-central1
.Per un elenco di regioni in cui è possibile eseguire un job Dataflow, consulta Località di Dataflow.
- Dal menu a discesa Modello Dataflow, seleziona the Kafka to BigQuery template.
- Inserisci i valori parametro negli appositi campi.
- (Facoltativo) Per passare dall'elaborazione "exactly-once" all'impostazione modalità flusso di dati almeno una volta, seleziona Almeno una volta.
- Fai clic su Esegui job.
gcloud
Nella shell o nel terminale, esegui il modello:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_BigQuery \ --parameters \ outputTableSpec=BIGQUERY_TABLE,\ inputTopics=KAFKA_TOPICS,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ bootstrapServers=KAFKA_SERVER_ADDRESSES
Sostituisci quanto segue:
PROJECT_ID
: L'ID progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome job univoco di tua sceltaREGION_NAME
: la regione in cui vuoi di eseguire il deployment del job Dataflow, ad esempious-central1
VERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile in cartella principale non-dated nel bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica , che puoi trovare nidificata nella rispettiva cartella principale con data del bucket: gs://dataflow-templates-REGION_NAME/
BIGQUERY_TABLE
: il nome della tua tabella BigQueryKAFKA_TOPICS
: elenco degli argomenti di Apache Kakfa. Se vengono forniti più argomenti, le virgole devono essere precedute dal carattere di escape. Vedigcloud topic escaping
.PATH_TO_JAVASCRIPT_UDF_FILE
: L'URI Cloud Storage del file.js
che definisce il codice JavaScript definito dall'utente che vuoi utilizzare, ad esempiogs://my-bucket/my-udfs/my_file.js
JAVASCRIPT_FUNCTION
: il nome della funzione definita dall'utente (UDF) JavaScript che vuoi utilizzareAd esempio, se il codice della funzione JavaScript è
myTransform(inJson) { /*...do stuff...*/ }
, il nome della funzione èmyTransform
. Per esempi di funzioni JavaScript definite dall'utente, consulta Esempi di funzioni definite dall'utente.KAFKA_SERVER_ADDRESSES
: l'elenco di indirizzi IP del server broker Apache Kafka. Ogni indirizzo IP richiede il numero di porta da cui è accessibile il server. Ad esempio:35.70.252.199:9092
. Se vengono forniti più indirizzi, devi utilizzare il carattere di escape per le virgole. Vedigcloud topic escaping
.
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sul
API e i relativi ambiti di autorizzazione, consulta
projects.templates.launch
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "outputTableSpec": "BIGQUERY_TABLE", "inputTopics": "KAFKA_TOPICS", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "bootstrapServers": "KAFKA_SERVER_ADDRESSES" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_BigQuery", } }
Sostituisci quanto segue:
PROJECT_ID
: L'ID progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome job univoco di tua sceltaLOCATION
: la regione in cui vuoi di eseguire il deployment del job Dataflow, ad esempious-central1
VERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile in cartella principale non-dated nel bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica , che puoi trovare nidificata nella rispettiva cartella principale con data del bucket: gs://dataflow-templates-REGION_NAME/
BIGQUERY_TABLE
: il nome della tua tabella BigQueryKAFKA_TOPICS
: elenco degli argomenti di Apache Kakfa. Se vengono forniti più argomenti, le virgole devono essere precedute dal carattere di escape. Vedigcloud topic escaping
.PATH_TO_JAVASCRIPT_UDF_FILE
: L'URI Cloud Storage del file.js
che definisce il codice JavaScript definito dall'utente che vuoi utilizzare, ad esempiogs://my-bucket/my-udfs/my_file.js
JAVASCRIPT_FUNCTION
: il nome della funzione definita dall'utente (UDF) JavaScript che vuoi utilizzareAd esempio, se il codice della funzione JavaScript è
myTransform(inJson) { /*...do stuff...*/ }
, il nome della funzione èmyTransform
. Per esempi di funzioni JavaScript definite dall'utente, consulta Esempi di funzioni definite dall'utente.KAFKA_SERVER_ADDRESSES
: l'elenco di indirizzi IP del server broker Apache Kafka. Ogni indirizzo IP richiede il numero di porta da cui è accessibile il server. Ad esempio:35.70.252.199:9092
. Se vengono forniti più indirizzi, devi utilizzare il carattere di escape per le virgole. Vedigcloud topic escaping
.
Per ulteriori informazioni, vedi Scrivere dati da Kafka a BigQuery con Dataflow.
Passaggi successivi
- Scopri di più sui modelli Dataflow.
- Consulta l'elenco dei modelli forniti da Google.