Il modello Apache Kafka to BigQuery è una pipeline di inserimento flussi che importa i dati di testo dai cluster Google Cloud Managed Service per Apache Kafka e poi restituisce i record risultanti alle tabelle BigQuery. Eventuali errori che si verificano durante l'inserimento dei dati nella tabella di output vengono inseriti in una tabella degli errori separata in BigQuery.
Puoi utilizzare il modello Apache Kafka to BigQuery anche con Kafka autogestito o esterno.
Requisiti della pipeline
- Il server broker Apache Kafka deve essere in esecuzione ed essere raggiungibile dalle macchine worker di Dataflow.
- Gli argomenti Apache Kafka devono esistere.
- Devi abilitare le API Dataflow, BigQuery e Cloud Storage. Se è richiesta l'autenticazione, devi anche attivare l'API Secret Manager.
- Crea un set di dati e una tabella BigQuery con lo schema appropriato per l'argomento di input Kafka. Se utilizzi più schemi nello stesso argomento e vuoi scrivere in più tabelle, non è necessario creare la tabella prima di configurare la pipeline.
- Quando la coda dei messaggi non recapitabili (non elaborati) per il modello è attivata, crea una tabella vuota che non abbia uno schema per la coda dei messaggi non recapitabili.
Formato dei messaggi Kafka
Il modello Apache Kafka to BigQuery supporta la lettura dei messaggi da Kafka nei seguenti formati:
CONFLUENT_AVRO_WIRE_FORMAT
, AVRO_BINARY_FORMAT
e JSON
.
Autenticazione
Il modello Apache Kafka to BigQuery supporta l'autenticazione SASL/PLAIN ai broker Kafka.
Parametri del modello
Parametri obbligatori
- readBootstrapServerAndTopic : l'argomento Kafka da cui leggere l'input.
- writeMode : modalità di scrittura: consente di scrivere record in una o più tabelle (in base allo schema). La modalità DYNAMIC_TABLE_NAMES è supportata solo per il formato del messaggio di origine AVRO_CONFLUENT_WIRE_FORMAT e l'origine dello schema SCHEMA_REGISTRY. Il nome della tabella di destinazione verrà generato automaticamente in base al nome dello schema Avro di ciascun messaggio. Può essere un singolo schema (creazione di una singola tabella) o più schemi (creazione di più tabelle). La modalità SINGLE_TABLE_NAME scrive in una singola tabella (singolo schema) specificata dall'utente. Il valore predefinito è SINGLE_TABLE_NAME.
- kafkaReadAuthenticationMode : la modalità di autenticazione da utilizzare con il cluster Kafka. Utilizza NONE per l'autenticazione senza password, SASL_PLAIN per nome utente e password SASL/PLAIN e TLS per l'autenticazione basata su certificato. APPLICATION_DEFAULT_CREDENTIALS deve essere utilizzato solo per il cluster Apache Kafka per BigQuery di Google Cloud, in quanto ti consente di autenticarti con Apache Kafka per BigQuery di Google Cloud utilizzando le credenziali predefinite dell'applicazione.
- messageFormat : il formato dei messaggi Kafka da leggere. I valori supportati sono AVRO_CONFLUENT_WIRE_FORMAT (Avro codificato da Confluent Schema Registry), AVRO_BINARY_ENCODING (Avro binario semplice) e JSON. Il valore predefinito è: AVRO_CONFLUENT_WIRE_FORMAT.
- useBigQueryDLQ : se true, i messaggi non riusciti verranno scritti in BigQuery con informazioni aggiuntive sugli errori. Il valore predefinito è false.
Parametri facoltativi
- outputTableSpec : posizione della tabella BigQuery in cui scrivere l'output. Il nome deve essere nel formato
<project>:<dataset>.<table_name>
. Lo schema della tabella deve corrispondere agli oggetti di input. - persistKafkaKey : se true, la pipeline manterrà la chiave del messaggio Kafka nella tabella BigQuery, in un campo
_key
di tipoBYTES
. Il valore predefinito è false (la chiave viene ignorata). - outputProject : progetto di output BigQuery in cui risiede il set di dati. Le tabelle verranno create dinamicamente nel set di dati. Il valore predefinito è vuoto.
- outputDataset : il set di dati BigQuery di output in cui scrivere l'output. Le tabelle verranno create dinamicamente nel set di dati. Se le tabelle vengono create in precedenza, i nomi devono seguire la convenzione di denominazione specificata. Il nome deve essere
bqTableNamePrefix + Avro Schema FullName
e ogni parola deve essere separata da un trattino "-". Il valore predefinito è vuoto. - bqTableNamePrefix : prefisso dei nomi da utilizzare durante la creazione delle tabelle di output BigQuery. Applicabile solo quando si utilizza il registry dello schema. Il valore predefinito è vuoto.
- createDisposition : BigQuery CreateDisposition. Ad esempio, CREATE_IF_NEEDED, CREATE_NEVER. Il valore predefinito è CREATE_IF_NEEDED.
- writeDisposition : BigQuery WriteDisposition. Ad esempio, WRITE_APPEND, WRITE_EMPTY o WRITE_TRUNCATE. Il valore predefinito è WRITE_APPEND.
- useAutoSharding : se true, la pipeline utilizza lo sharding automatico durante la scrittura in BigQuery. Il valore predefinito è
true
. - numStorageWriteApiStreams : specifica il numero di stream di scrittura. Questo parametro deve essere impostato. Il valore predefinito è 0.
- storageWriteApiTriggeringFrequencySec : specifica la frequenza di attivazione in secondi. Questo parametro deve essere impostato. Il valore predefinito è 5 secondi.
- useStorageWriteApiAtLeastOnce : questo parametro viene applicato solo se è attivata l'opzione "Utilizza l'API BigQuery Storage Write". Se è attivata, per l'API Storage Write verrà utilizzata la semantica almeno una volta, altrimenti verrà utilizzata la semantica esattamente una volta. Il valore predefinito è false.
- enableCommitOffsets : esegui il commit degli offset dei messaggi elaborati in Kafka. Se questa opzione è attivata, le lacune o l'elaborazione duplicata dei messaggi vengono ridotte al minimo al riavvio della pipeline. Richiede la specifica dell'ID gruppo di consumatori. Il valore predefinito è false.
- consumerGroupId : l'identificatore univoco del gruppo di consumatori a cui appartiene questa pipeline. Obbligatorio se è attivata l'opzione Commit Offsets to Kafka. Il valore predefinito è vuoto.
- kafkaReadOffset : il punto di partenza per la lettura dei messaggi quando non esistono offset committati. La prima inizia dall'inizio, l'ultima dal messaggio più recente. Il valore predefinito è latest.
- kafkaReadUsernameSecretId : l'ID segreto di Secret Manager di Google Cloud che contiene il nome utente Kafka da utilizzare con l'autenticazione SASL_PLAIN. (ad esempio, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). Il valore predefinito è vuoto.
- kafkaReadPasswordSecretId : l'ID segreto di Secret Manager di Google Cloud che contiene la password di Kafka da utilizzare con l'autenticazione SASL_PLAIN. (ad esempio, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). Il valore predefinito è vuoto.
- kafkaReadKeystoreLocation : il percorso di Google Cloud Storage del file dell'archivio chiavi Java (JKS) contenente il certificato TLS e la chiave privata da utilizzare per l'autenticazione con il cluster Kafka. (ad es. gs://your-bucket/keystore.jks).
- kafkaReadTruststoreLocation : il percorso di Google Cloud Storage del file dell'archivio attendibilità Java (JKS) contenente i certificati attendibili da utilizzare per verificare l'identità del broker Kafka.
- kafkaReadTruststorePasswordSecretId : l'ID segreto di Secret Manager di Google Cloud che contiene la password da utilizzare per accedere al file Java TrustStore (JKS) per l'autenticazione TLS di Kafka (esempio: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
- kafkaReadKeystorePasswordSecretId : l'ID secret di Secret Manager di Google Cloud che contiene la password da utilizzare per accedere al file Java KeyStore (JKS) per l'autenticazione TLS di Kafka. (esempio: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
- kafkaReadKeyPasswordSecretId : l'ID secret di Secret Manager di Google Cloud che contiene la password da utilizzare per accedere alla chiave privata all'interno del file dell'archivio chiavi Java (JKS) per l'autenticazione TLS di Kafka. (esempio: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
- schemaFormat : il formato dello schema Kafka. Può essere fornito come SINGLE_SCHEMA_FILE o SCHEMA_REGISTRY. Se è specificato SINGLE_SCHEMA_FILE, tutti i messaggi devono avere lo schema menzionato nel file dello schema avro. Se è specificato SCHEMA_REGISTRY, i messaggi possono avere uno o più schemi. Il valore predefinito è SINGLE_SCHEMA_FILE.
- confluentAvroSchemaPath : il percorso di Google Cloud Storage del singolo file dello schema Avro utilizzato per decodificare tutti i messaggi di un argomento. Il valore predefinito è vuoto.
- schemaRegistryConnectionUrl : l'URL dell'istanza Confluent Schema Registry utilizzata per gestire gli schemi Avro per la decodifica dei messaggi. Il valore predefinito è vuoto.
- binaryAvroSchemaPath : il percorso di Google Cloud Storage al file dello schema Avro utilizzato per decodificare i messaggi Avro con codifica binaria. Il valore predefinito è vuoto.
- schemaRegistryAuthenticationMode : modalità di autenticazione di Schema Registry. Può essere NONE, TLS o OAUTH. Il valore predefinito è NESSUNO.
- schemaRegistryTruststoreLocation : posizione del certificato SSL in cui sono archiviati l'archivio di attendibilità per l'autenticazione a Schema Registry. (esempio: /your-bucket/truststore.jks).
- schemaRegistryTruststorePasswordSecretId : SecretId in Secret Manager in cui è archiviata la password per accedere al secret nel truststore. (esempio: progetti/numero-del-tuo-progetto/secret/nome-del-tuo-secret/versioni/versione-del-tuo-secret).
- schemaRegistryKeystoreLocation : posizione dell'archivio chiavi contenente il certificato SSL e la chiave privata. (ad esempio /your-bucket/keystore.jks).
- schemaRegistryKeystorePasswordSecretId: SecretId in Secret Manager dove si trova la password per accedere al file del keystore (ad es. projects/your-project-number/secrets/your-secret-name/versions/your-secret-version).
- schemaRegistryKeyPasswordSecretId : SecretId della password richiesta per accedere alla chiave privata del client archiviata nel keystore (esempio: projects/your-project-number/secrets/your-secret-name/versions/your-secret-version).
- schemaRegistryOauthClientId : l'ID client utilizzato per autenticare il client Schema Registry in modalità OAUTH. Obbligatorio per il formato del messaggio AVRO_CONFLUENT_WIRE_FORMAT.
- schemaRegistryOauthClientSecretId : l'ID segreto di Secret Manager di Google Cloud che contiene il segreto client da utilizzare per autenticare il client Schema Registry in modalità OAUTH. Obbligatorio per il formato del messaggio AVRO_CONFLUENT_WIRE_FORMAT. (esempio: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
- schemaRegistryOauthScope : l'ambito del token di accesso utilizzato per autenticare il client Schema Registry in modalità OAUTH. Questo campo è facoltativo, poiché la richiesta può essere effettuata senza che venga passato un parametro di ambito. (ad es. openid).
- schemaRegistryOauthTokenEndpointUrl : l'URL basato su HTTP(S) per il provider di identità OAuth/OIDC utilizzato per autenticare il client Schema Registry in modalità OAUTH. Obbligatorio per il formato del messaggio AVRO_CONFLUENT_WIRE_FORMAT.
- outputDeadletterTable : nome completo della tabella BigQuery per i messaggi non riusciti. I messaggi che non sono riusciti a raggiungere la tabella di output per diversi motivi (ad es. schema non corrispondente, JSON in formato errato) vengono scritti in questa tabella. La tabella verrà creata dal modello. (esempio: your-project-id:your-dataset.your-table-name).
- javascriptTextTransformGcsPath : l'URI Cloud Storage del file .js che definisce la funzione definita dall'utente (UDF) JavaScript da utilizzare. (ad es. gs://my-bucket/my-udfs/my_file.js).
- javascriptTextTransformFunctionName : il nome della funzione definita dall'utente (UDF) JavaScript da utilizzare. Ad esempio, se il codice della funzione JavaScript è
myTransform(inJson) { /*...do stuff...*/ }
, il nome della funzione èmyTransform
. Per esempi di funzioni JavaScript definite dall'utente, consulta Esempi di funzioni UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). - javascriptTextTransformReloadIntervalMinutes : specifica la frequenza con cui ricaricare la UDF, in minuti. Se il valore è maggiore di 0, Dataflow controlla periodicamente il file UDF in Cloud Storage e lo ricarica se il file viene modificato. Questo parametro ti consente di aggiornare la UDF durante l'esecuzione della pipeline, senza dover riavviare il job. Se il valore è 0, il ricaricamento dei DFF è disattivato. Il valore predefinito è 0.
Esegui il modello
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome univoco per il job.
- (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è
us-central1
.Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.
- Nel menu a discesa Modello di flusso di dati, seleziona the Kafka to BigQuery template.
- Nei campi dei parametri forniti, inserisci i valori dei parametri.
- (Facoltativo) Per passare dall'elaborazione exactly-once alla modalità flusso di dati Almeno una volta, seleziona Almeno una volta.
- Fai clic su Esegui job.
gcloud
Nella shell o nel terminale, esegui il modello:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_BigQuery_Flex \ --parameters \ outputTableSpec=BIGQUERY_TABLE,\ inputTopics=KAFKA_TOPICS,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ bootstrapServers=KAFKA_SERVER_ADDRESSES
Sostituisci quanto segue:
PROJECT_ID
: l'ID del progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome di job univoco a tua sceltaREGION_NAME
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
VERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale datata nel bucket: gs://dataflow-templates-REGION_NAME/
BIGQUERY_TABLE
: il nome della tabella BigQueryKAFKA_TOPICS
: l'elenco di argomenti Apache Kafka. Se vengono forniti più argomenti, devi eseguire la fuga delle virgole. Consultagcloud topic escaping
.PATH_TO_JAVASCRIPT_UDF_FILE
: l'URI Cloud Storage del file.js
che definisce la funzione JavaScript definita dall'utente (UDF) che vuoi utilizzare, ad esempiogs://my-bucket/my-udfs/my_file.js
JAVASCRIPT_FUNCTION
: il nome della funzione definita dall'utente (UDF) JavaScript che vuoi utilizzareAd esempio, se il codice della funzione JavaScript è
myTransform(inJson) { /*...do stuff...*/ }
, il nome della funzione èmyTransform
. Per esempi di funzioni JavaScript definite dall'utente, consulta Esempi di funzioni definite dall'utente.KAFKA_SERVER_ADDRESSES
: l'elenco di indirizzi IP del server broker Apache Kafka. Ogni indirizzo IP richiede il numero di porta da cui è accessibile il server. Ad esempio:35.70.252.199:9092
. Se vengono forniti più indirizzi, devi eseguire la fuga delle virgole. Consultagcloud topic escaping
.
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "outputTableSpec": "BIGQUERY_TABLE", "inputTopics": "KAFKA_TOPICS", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "bootstrapServers": "KAFKA_SERVER_ADDRESSES" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_BigQuery_Flex", } }
Sostituisci quanto segue:
PROJECT_ID
: l'ID del progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome di job univoco a tua sceltaLOCATION
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
VERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale datata nel bucket: gs://dataflow-templates-REGION_NAME/
BIGQUERY_TABLE
: il nome della tabella BigQueryKAFKA_TOPICS
: l'elenco di argomenti Apache Kafka. Se vengono forniti più argomenti, devi eseguire la fuga delle virgole. Consultagcloud topic escaping
.PATH_TO_JAVASCRIPT_UDF_FILE
: l'URI Cloud Storage del file.js
che definisce la funzione JavaScript definita dall'utente (UDF) che vuoi utilizzare, ad esempiogs://my-bucket/my-udfs/my_file.js
JAVASCRIPT_FUNCTION
: il nome della funzione definita dall'utente (UDF) JavaScript che vuoi utilizzareAd esempio, se il codice della funzione JavaScript è
myTransform(inJson) { /*...do stuff...*/ }
, il nome della funzione èmyTransform
. Per esempi di funzioni JavaScript definite dall'utente, consulta Esempi di funzioni definite dall'utente.KAFKA_SERVER_ADDRESSES
: l'elenco di indirizzi IP del server broker Apache Kafka. Ogni indirizzo IP richiede il numero di porta da cui è accessibile il server. Ad esempio:35.70.252.199:9092
. Se vengono forniti più indirizzi, devi eseguire la fuga delle virgole. Consultagcloud topic escaping
.
Per ulteriori informazioni, consulta Scrivere dati da Kafka in BigQuery con Dataflow.
Passaggi successivi
- Scopri di più sui modelli Dataflow.
- Consulta l'elenco dei modelli forniti da Google.