Google mette a disposizione una serie di modelli Dataflow open source. Per informazioni generali sui modelli, consulta Modelli di Dataflow. Per un elenco di tutti i modelli forniti da Google, consulta la Guida introduttiva ai modelli forniti da Google.
Questa guida documenta i modelli di utilità.
Conversione del formato file (Avro, Parquet, CSV)
Il modello di conversione del formato file è una pipeline batch che converte i file archiviati in Cloud Storage da un formato supportato a un altro.
Sono supportate le seguenti conversioni di formato:
- CSV ad Avro
- Da CSV a Parquet
- Da Avro a Parquet
- Da Parquet ad Avro
Requisiti per questa pipeline:
- Il bucket Cloud Storage di output deve esistere prima dell'esecuzione della pipeline.
Parametri del modello
Parametro | Descrizione |
---|---|
inputFileFormat |
Il formato del file di input. Il valore deve essere [csv, avro, parquet] . |
outputFileFormat |
Il formato file di output. Il valore deve essere [avro, parquet] . |
inputFileSpec |
Il pattern del percorso di Cloud Storage per i file di input. Ad esempio, gs://bucket-name/path/*.csv |
outputBucket |
La cartella Cloud Storage per scrivere i file di output. Questo percorso deve terminare con una barra.
Ad esempio, gs://bucket-name/output/ |
schema |
Percorso Cloud Storage del file di schema Avro. Ad esempio, gs://bucket-name/schema/my-schema.avsc |
containsHeaders |
(Facoltativo) I file CSV di input contengono un record di intestazione (true/false). Il valore predefinito è false . Obbligatorio solo durante la lettura dei file CSV. |
csvFormat |
(Facoltativo) La specifica del formato CSV da utilizzare per analizzare i record. Il valore predefinito è Default .
Per saperne di più, consulta la pagina Formato CSV Apache. |
delimiter |
(Facoltativo) Il delimitatore di campo utilizzato dai file CSV di input. |
outputFilePrefix |
(Facoltativo) Il prefisso del file di output. Il valore predefinito è output . |
numShards |
(Facoltativo) Il numero di shard del file di output. |
Esecuzione del modello di conversione Formato file
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome univoco per il job.
- (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. L'endpoint della regione predefinito è
us-central1
.Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta le località di Dataflow.
- Dal menu a discesa Modello Dataflow, seleziona the Convert file formats template.
- Nei campi dei parametri forniti, inserisci i valori dei parametri.
- Fai clic su Esegui job.
gcloud
Nella shell o nel terminale, esegui il modello:
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/File_Format_Conversion \ --parameters \ inputFileFormat=INPUT_FORMAT,\ outputFileFormat=OUTPUT_FORMAT,\ inputFileSpec=INPUT_FILES,\ schema=SCHEMA,\ outputBucket=OUTPUT_FOLDER
Sostituisci quanto segue:
PROJECT_ID
: l'ID del progetto Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome job univoco a tua sceltaREGION_NAME
: l'endpoint a livello di area geografica in cui vuoi eseguire il deployment del tuo job Dataflow, ad esempious-central1
VERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates/latest/- Il nome della versione, come
2021-09-20-00_RC00
, per utilizzare una versione specifica del modello, che può essere nidificata nella rispettiva cartella padre con data nel bucket: gs://dataflow-templates/
INPUT_FORMAT
: il formato del file di input; deve essere[csv, avro, parquet]
OUTPUT_FORMAT
: il formato file dei file di output; deve essere[avro, parquet]
INPUT_FILES
: pattern del percorso per i file di inputOUTPUT_FOLDER
: la tua cartella Cloud Storage per i file di outputSCHEMA
: percorso al file di schema Avro
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni
sull'API e sui suoi ambiti di autorizzazione, consulta
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFileFormat": "INPUT_FORMAT", "outputFileFormat": "OUTPUT_FORMAT", "inputFileSpec": "INPUT_FILES", "schema": "SCHEMA", "outputBucket": "OUTPUT_FOLDER" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/File_Format_Conversion", } }
Sostituisci quanto segue:
PROJECT_ID
: l'ID del progetto Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome job univoco a tua sceltaLOCATION
: l'endpoint a livello di area geografica in cui vuoi eseguire il deployment del tuo job Dataflow, ad esempious-central1
VERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates/latest/- Il nome della versione, come
2021-09-20-00_RC00
, per utilizzare una versione specifica del modello, che può essere nidificata nella rispettiva cartella padre con data nel bucket: gs://dataflow-templates/
INPUT_FORMAT
: il formato del file di input; deve essere[csv, avro, parquet]
OUTPUT_FORMAT
: il formato file dei file di output; deve essere[avro, parquet]
INPUT_FILES
: pattern del percorso per i file di inputOUTPUT_FOLDER
: la tua cartella Cloud Storage per i file di outputSCHEMA
: percorso al file di schema Avro
Comprimi in blocco i file di Cloud Storage
Il modello di compressione collettiva dei file Cloud Storage è una pipeline batch che comprime i file in Cloud Storage in una posizione specificata. Questo modello può essere utile quando devi comprimere
gruppi di file di grandi dimensioni come parte di un processo di archiviazione periodico. Le modalità di compressione supportate sono:
BZIP2
, DEFLATE
e GZIP
.
I file in uscita nella posizione di destinazione seguiranno uno schema di denominazione del nome file originale aggiunto
con l'estensione della modalità di compressione. Le estensioni aggiunte saranno una delle seguenti:
.bzip2
, .deflate
, .gz
.
Requisiti per questa pipeline:
- La compressione deve essere in uno dei seguenti formati:
BZIP2
,DEFLATE
eGZIP
. - La directory di output deve esistere prima dell'esecuzione della pipeline.
Parametri del modello
Parametro | Descrizione |
---|---|
inputFilePattern |
Il pattern del file di input da cui leggere. Ad esempio, gs://bucket-name/uncompressed/*.txt . |
outputDirectory |
La posizione di output in cui scrivere. Ad esempio, gs://bucket-name/compressed/ . |
outputFailureFile |
Il file di output del log degli errori da utilizzare per gli errori di scrittura che si verificano durante il processo di compressione. Ad esempio, gs://bucket-name/compressed/failed.csv . Se non sono presenti errori, il file viene comunque creato, ma sarà vuoto. I contenuti del file sono in formato CSV (nome file, errore) e sono composti da una riga per ogni file che non viene compresso. |
compression |
L'algoritmo di compressione utilizzato per comprimere i file abbinati. Il valore deve essere uno dei seguenti:
BZIP2 , DEFLATE , GZIP
|
Esecuzione del modello di compressione collettiva dei file Cloud Storage
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome univoco per il job.
- (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. L'endpoint della regione predefinito è
us-central1
.Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta le località di Dataflow.
- Dal menu a discesa Modello Dataflow, seleziona the Bulk Compress Files on Cloud Storage template.
- Nei campi dei parametri forniti, inserisci i valori dei parametri.
- Fai clic su Esegui job.
gcloud
Nella shell o nel terminale, esegui il modello:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Bulk_Compress_GCS_Files \ --region REGION_NAME \ --parameters \ inputFilePattern=gs://BUCKET_NAME/uncompressed/*.txt,\ outputDirectory=gs://BUCKET_NAME/compressed,\ outputFailureFile=gs://BUCKET_NAME/failed/failure.csv,\ compression=COMPRESSION
Sostituisci quanto segue:
JOB_NAME
: un nome job univoco a tua sceltaREGION_NAME
: l'endpoint a livello di area geografica in cui vuoi eseguire il deployment del tuo job Dataflow, ad esempious-central1
VERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates/latest/- Il nome della versione, come
2021-09-20-00_RC00
, per utilizzare una versione specifica del modello, che può essere nidificata nella rispettiva cartella padre con data nel bucket: gs://dataflow-templates/
BUCKET_NAME
: nome del bucket Cloud StorageCOMPRESSION
: la tua scelta di algoritmo di compressione
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni
sull'API e sui suoi ambiti di autorizzazione, consulta
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Bulk_Compress_GCS_Files { "jobName": "JOB_NAME", "parameters": { "inputFilePattern": "gs://BUCKET_NAME/uncompressed/*.txt", "outputDirectory": "gs://BUCKET_NAME/compressed", "outputFailureFile": "gs://BUCKET_NAME/failed/failure.csv", "compression": "COMPRESSION" }, "environment": { "zone": "us-central1-f" } }
Sostituisci quanto segue:
PROJECT_ID
: l'ID del progetto Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome job univoco a tua sceltaLOCATION
: l'endpoint a livello di area geografica in cui vuoi eseguire il deployment del tuo job Dataflow, ad esempious-central1
VERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates/latest/- Il nome della versione, come
2021-09-20-00_RC00
, per utilizzare una versione specifica del modello, che può essere nidificata nella rispettiva cartella padre con data nel bucket: gs://dataflow-templates/
BUCKET_NAME
: nome del bucket Cloud StorageCOMPRESSION
: la tua scelta di algoritmo di compressione
Decomprimi in blocco i file di Cloud Storage
Il modello di decompressione collettiva di Cloud Storage Files è una pipeline batch che decomprime i file su Cloud Storage in una posizione specificata. Questa funzionalità è utile quando vuoi utilizzare dati compressi per ridurre al minimo i costi della larghezza di banda di rete durante una migrazione, ma vuoi massimizzare la velocità di elaborazione analitica operando su dati non compressi dopo la migrazione. La pipeline gestisce automaticamente più modalità di compressione durante una singola esecuzione e determina la modalità di decompressione da utilizzare in base all'estensione del file (.bzip2
, .deflate
, .gz
, .zip
).
Requisiti per questa pipeline:
- I file da decomprimere devono avere uno dei seguenti formati:
Bzip2
,Deflate
,Gzip
eZip
. - La directory di output deve esistere prima dell'esecuzione della pipeline.
Parametri del modello
Parametro | Descrizione |
---|---|
inputFilePattern |
Il pattern del file di input da cui leggere. Ad esempio, gs://bucket-name/compressed/*.gz . |
outputDirectory |
La posizione di output in cui scrivere. Ad esempio, gs://bucket-name/decompressed . |
outputFailureFile |
Il file di output del log degli errori da utilizzare per gli errori di scrittura che si verificano durante il processo di decompressione. Ad esempio, gs://bucket-name/decompressed/failed.csv . Se non sono presenti errori, il file viene comunque creato, ma sarà vuoto. I contenuti del file sono in formato CSV (nome file, errore) e sono composti da una riga per ogni file che non viene decompresso. |
Esecuzione del modello di file decompresso Cloud Storage in blocco
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome univoco per il job.
- (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. L'endpoint della regione predefinito è
us-central1
.Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta le località di Dataflow.
- Dal menu a discesa Modello Dataflow, seleziona the Bulk Decompress Files on Cloud Storage template.
- Nei campi dei parametri forniti, inserisci i valori dei parametri.
- Fai clic su Esegui job.
gcloud
Nella shell o nel terminale, esegui il modello:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files \ --region REGION_NAME \ --parameters \ inputFilePattern=gs://BUCKET_NAME/compressed/*.gz,\ outputDirectory=gs://BUCKET_NAME/decompressed,\ outputFailureFile=OUTPUT_FAILURE_FILE_PATH
Sostituisci quanto segue:
JOB_NAME
: un nome job univoco a tua sceltaREGION_NAME
: l'endpoint a livello di area geografica in cui vuoi eseguire il deployment del tuo job Dataflow, ad esempious-central1
VERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates/latest/- Il nome della versione, come
2021-09-20-00_RC00
, per utilizzare una versione specifica del modello, che può essere nidificata nella rispettiva cartella padre con data nel bucket: gs://dataflow-templates/
BUCKET_NAME
: nome del bucket Cloud StorageOUTPUT_FAILURE_FILE_PATH
: la tua scelta di percorso nel file contenente le informazioni sull'errore
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni
sull'API e sui suoi ambiti di autorizzazione, consulta
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files { "jobName": "JOB_NAME", "parameters": { "inputFilePattern": "gs://BUCKET_NAME/compressed/*.gz", "outputDirectory": "gs://BUCKET_NAME/decompressed", "outputFailureFile": "OUTPUT_FAILURE_FILE_PATH" }, "environment": { "zone": "us-central1-f" } }
Sostituisci quanto segue:
PROJECT_ID
: l'ID del progetto Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome job univoco a tua sceltaLOCATION
: l'endpoint a livello di area geografica in cui vuoi eseguire il deployment del tuo job Dataflow, ad esempious-central1
VERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates/latest/- Il nome della versione, come
2021-09-20-00_RC00
, per utilizzare una versione specifica del modello, che può essere nidificata nella rispettiva cartella padre con data nel bucket: gs://dataflow-templates/
BUCKET_NAME
: nome del bucket Cloud StorageOUTPUT_FAILURE_FILE_PATH
: la tua scelta di percorso nel file contenente le informazioni sull'errore
Eliminazione collettiva Datastore [deprecata]
Questo modello è deprecato e verrà rimosso nel primo trimestre del 2022. Esegui la migrazione al modello Eliminazione collettiva Firestore.
Il modello di eliminazione collettiva di Datastore è una pipeline che legge in Entità da Datastore con una determinata query GQL e poi elimina tutte le Entità corrispondenti nel progetto di destinazione selezionato. La pipeline può facoltativamente trasferire le entità Datastore codificate JSON all'UDF JavaScript, che puoi utilizzare per filtrare le entità restituendo valori nulli.
Requisiti per questa pipeline:
- Datastore deve essere configurato nel progetto prima di eseguire il modello.
- Se viene letta ed eliminata da istanze Datastore separate, l'account di servizio worker di Dataflow deve disporre dell'autorizzazione per leggere da un'istanza ed eliminare dall'altra.
Parametri del modello
Parametro | Descrizione |
---|---|
datastoreReadGqlQuery |
Query GQL che specifica a quali entità corrispondere per l'eliminazione. L'utilizzo di una query di sole chiavi può migliorare le prestazioni. Ad esempio: "SELECT__key__ FROM MyKind". |
datastoreReadProjectId |
ID progetto GCP dell'istanza Datastore da cui vuoi leggere le entità (utilizzando la query GQL) utilizzate per la corrispondenza. |
datastoreDeleteProjectId |
ID progetto GCP dell'istanza di Datastore da cui eliminare le entità corrispondenti. Può essere uguale a datastoreReadProjectId se vuoi leggere ed eliminare all'interno della stessa istanza di Datastore. |
datastoreReadNamespace |
(Facoltativo) Spazio dei nomi delle entità richieste. Imposta come "" per lo spazio dei nomi predefinito. |
datastoreHintNumWorkers |
(Facoltativo) Suggerimento per il numero previsto di worker nel passaggio di limitazione del datastore. Il valore predefinito è 500 . |
javascriptTextTransformGcsPath |
(Facoltativo)
L'URI Cloud Storage del file .js che definisce la funzione definita dall'utente JavaScript (UDF) che vuoi utilizzare. Ad esempio: gs://my-bucket/my-udfs/my_file.js .
|
javascriptTextTransformFunctionName |
(Facoltativo)
Il nome della funzione definita dall'utente JavaScript che vuoi utilizzare.
Ad esempio, se il codice della funzione JavaScript è myTransform(inJson) { /*...do stuff...*/ } , il nome della funzione è myTransform . Per un esempio di UDF JavaScript, consulta
Esempi di funzioni UDF.
Se questa funzione restituisce un valore non definito o nullo per
una determinata entità Datastore, l'entità non viene eliminata. |
Esecuzione del modello di eliminazione collettiva di Datastore
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome univoco per il job.
- (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. L'endpoint della regione predefinito è
us-central1
.Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta le località di Dataflow.
- Dal menu a discesa Modello Dataflow, seleziona the Bulk Delete Entities in Datastore template.
- Nei campi dei parametri forniti, inserisci i valori dei parametri.
- Fai clic su Esegui job.
gcloud
Nella shell o nel terminale, esegui il modello:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete \ --region REGION_NAME \ --parameters \ datastoreReadGqlQuery="GQL_QUERY",\ datastoreReadProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID,\ datastoreDeleteProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID
Sostituisci quanto segue:
JOB_NAME
: un nome job univoco a tua sceltaREGION_NAME
: l'endpoint a livello di area geografica in cui vuoi eseguire il deployment del tuo job Dataflow, ad esempious-central1
VERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates/latest/- Il nome della versione, come
2021-09-20-00_RC00
, per utilizzare una versione specifica del modello, che può essere nidificata nella rispettiva cartella padre con data nel bucket: gs://dataflow-templates/
GQL_QUERY
: la query che utilizzerai per trovare una corrispondenza con le entità per l'eliminazioneDATASTORE_READ_AND_DELETE_PROJECT_ID
: l'ID progetto dell'istanza del datastore. Questo esempio legge ed elimina dalla stessa istanza Datastore.
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni
sull'API e sui suoi ambiti di autorizzazione, consulta
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete { "jobName": "JOB_NAME", "parameters": { "datastoreReadGqlQuery": "GQL_QUERY", "datastoreReadProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID", "datastoreDeleteProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID" }, "environment": { "zone": "us-central1-f" } } }
Sostituisci quanto segue:
PROJECT_ID
: l'ID del progetto Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome job univoco a tua sceltaLOCATION
: l'endpoint a livello di area geografica in cui vuoi eseguire il deployment del tuo job Dataflow, ad esempious-central1
VERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates/latest/- Il nome della versione, come
2021-09-20-00_RC00
, per utilizzare una versione specifica del modello, che può essere nidificata nella rispettiva cartella padre con data nel bucket: gs://dataflow-templates/
GQL_QUERY
: la query che utilizzerai per trovare una corrispondenza con le entità per l'eliminazioneDATASTORE_READ_AND_DELETE_PROJECT_ID
: l'ID progetto dell'istanza del datastore. Questo esempio legge ed elimina dalla stessa istanza Datastore.
Eliminazione collettiva Firestore
Il modello di eliminazione collettiva Firestore è una pipeline che legge in Entità da Firestore con una determinata query GQL e poi elimina tutte le Entità corrispondenti nel progetto di destinazione selezionato. La pipeline può facoltativamente trasferire le entità Firestore con codifica JSON all'UDF JavaScript, che puoi utilizzare per filtrare le entità restituendo valori nulli.
Requisiti per questa pipeline:
- Firestore deve essere configurato nel progetto prima di eseguire il modello.
- Se leggi ed elimina da istanze Firestore separate, l'account di servizio worker di Dataflow deve avere l'autorizzazione per leggere da un'istanza ed eliminare dall'altra.
Parametri del modello
Parametro | Descrizione |
---|---|
firestoreReadGqlQuery |
Query GQL che specifica a quali entità corrispondere per l'eliminazione. L'utilizzo di una query di sole chiavi può migliorare le prestazioni. Ad esempio: "SELECT__key__ FROM MyKind". |
firestoreReadProjectId |
ID progetto GCP dell'istanza Firestore da cui vuoi leggere le entità (utilizzando la query GQL) utilizzate per la corrispondenza. |
firestoreDeleteProjectId |
ID progetto GCP dell'istanza Firestore da cui eliminare le entità corrispondenti. Può essere uguale a firestoreReadProjectId se vuoi leggere ed eliminare all'interno della stessa istanza di Firestore. |
firestoreReadNamespace |
(Facoltativo) Spazio dei nomi delle entità richieste. Imposta come "" per lo spazio dei nomi predefinito. |
firestoreHintNumWorkers |
(Facoltativo) Suggerimento per il numero previsto di worker nel passaggio di limitazione dell'applicazione graduale di Firestore. Il valore predefinito è 500 . |
javascriptTextTransformGcsPath |
(Facoltativo)
L'URI Cloud Storage del file .js che definisce la funzione definita dall'utente JavaScript (UDF) che vuoi utilizzare. Ad esempio: gs://my-bucket/my-udfs/my_file.js .
|
javascriptTextTransformFunctionName |
(Facoltativo)
Il nome della funzione definita dall'utente JavaScript che vuoi utilizzare.
Ad esempio, se il codice della funzione JavaScript è myTransform(inJson) { /*...do stuff...*/ } , il nome della funzione è myTransform . Per un esempio di UDF JavaScript, consulta
Esempi di funzioni UDF.
Se questa funzione restituisce un valore non definito o nullo per
una determinata entità Firestore, l'entità non viene eliminata. |
Esecuzione del modello di eliminazione collettiva Firestore
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome univoco per il job.
- (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. L'endpoint della regione predefinito è
us-central1
.Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta le località di Dataflow.
- Dal menu a discesa Modello Dataflow, seleziona the Bulk Delete Entities in Firestore template.
- Nei campi dei parametri forniti, inserisci i valori dei parametri.
- Fai clic su Esegui job.
gcloud
Nella shell o nel terminale, esegui il modello:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Firestore_to_Firestore_Delete \ --region REGION_NAME \ --parameters \ firestoreReadGqlQuery="GQL_QUERY",\ firestoreReadProjectId=FIRESTORE_READ_AND_DELETE_PROJECT_ID,\ firestoreDeleteProjectId=FIRESTORE_READ_AND_DELETE_PROJECT_ID
Sostituisci quanto segue:
JOB_NAME
: un nome job univoco a tua sceltaREGION_NAME
: l'endpoint a livello di area geografica in cui vuoi eseguire il deployment del tuo job Dataflow, ad esempious-central1
VERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates/latest/- Il nome della versione, come
2021-09-20-00_RC00
, per utilizzare una versione specifica del modello, che può essere nidificata nella rispettiva cartella padre con data nel bucket: gs://dataflow-templates/
GQL_QUERY
: la query che utilizzerai per trovare una corrispondenza con le entità per l'eliminazioneFIRESTORE_READ_AND_DELETE_PROJECT_ID
: il tuo ID progetto dell'istanza di Firestore. Questo esempio legge ed elimina dalla stessa istanza Firestore.
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni
sull'API e sui suoi ambiti di autorizzazione, consulta
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Firestore_to_Firestore_Delete { "jobName": "JOB_NAME", "parameters": { "firestoreReadGqlQuery": "GQL_QUERY", "firestoreReadProjectId": "FIRESTORE_READ_AND_DELETE_PROJECT_ID", "firestoreDeleteProjectId": "FIRESTORE_READ_AND_DELETE_PROJECT_ID" }, "environment": { "zone": "us-central1-f" } } }
Sostituisci quanto segue:
PROJECT_ID
: l'ID del progetto Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome job univoco a tua sceltaLOCATION
: l'endpoint a livello di area geografica in cui vuoi eseguire il deployment del tuo job Dataflow, ad esempious-central1
VERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates/latest/- Il nome della versione, come
2021-09-20-00_RC00
, per utilizzare una versione specifica del modello, che può essere nidificata nella rispettiva cartella padre con data nel bucket: gs://dataflow-templates/
GQL_QUERY
: la query che utilizzerai per trovare una corrispondenza con le entità per l'eliminazioneFIRESTORE_READ_AND_DELETE_PROJECT_ID
: il tuo ID progetto dell'istanza di Firestore. Questo esempio legge ed elimina dalla stessa istanza Firestore.
Streaming di flussi di dati su Pub/Sub, BigQuery e Cloud Storage
Il modello di generatore di dati in streaming viene utilizzato per generare un numero illimitato o fisso di record o messaggi sintetici in base allo schema fornito dall'utente alla frequenza specificata. Le destinazioni compatibili includono argomenti Pub/Sub, tabelle BigQuery e bucket Cloud Storage.
Di seguito sono riportati alcuni esempi di casi d'uso possibili:
- Simula la pubblicazione di eventi in tempo reale su larga scala in un argomento Pub/Sub per misurare e determinare il numero e le dimensioni dei consumatori necessari per elaborare gli eventi pubblicati.
- Genera dati sintetici in una tabella BigQuery o in un bucket Cloud Storage per valutare i benchmark di prestazioni o servire come proof of concept.
Sink e formati di codifica supportati
La tabella seguente descrive i sink e i formati di codifica supportati da questo modello:JSON | Avro | Parquet | |
---|---|---|---|
Pub/Sub | Sì | Yes | No |
BigQuery | Sì | No | No |
Cloud Storage | Sì | Sì | Sì |
La libreria Generatore di dati JSON utilizzata dalla pipeline consente di utilizzare varie funzioni faker per ogni campo dello schema. Per ulteriori informazioni sulle funzioni faker e sul formato dello schema, consulta la documentazione relativa a json-data-generator.
Requisiti per questa pipeline:
- Crea un file di schema dei messaggi e archivialo in una posizione di Cloud Storage.
- Il target di output deve esistere prima dell'esecuzione. La destinazione deve essere un argomento Pub/Sub, una tabella BigQuery o un bucket Cloud Storage a seconda del tipo di sink.
- Se la codifica di output è Avro o Parquet, crea un file di schema Avro e archivialo in un percorso Cloud Storage.
Parametri del modello
Parametro | Descrizione |
---|---|
schemaLocation |
Posizione del file di schema. Ad esempio: gs://mybucket/filename.json . |
qps |
Numero di messaggi da pubblicare al secondo. Ad esempio: 100 . |
sinkType |
(Facoltativo) Tipo di sink di output. I valori possibili sono PUBSUB , BIGQUERY , GCS . Il valore predefinito è PUBSUB. |
outputType |
(Facoltativo) Tipo di codifica di output. I valori possibili sono JSON , AVRO , PARQUET . Il valore predefinito è JSON. |
avroSchemaLocation |
(Facoltativo) Posizione del file di schema AVRO. Obbligatorio quando outputType è AVRO o PARQUET. Ad esempio: gs://mybucket/filename.avsc . |
topic |
(Facoltativo) Nome dell'argomento Pub/Sub in cui la pipeline deve pubblicare i dati. Obbligatorio quando sinkType è Pub/Sub. Ad esempio: . |
outputTableSpec |
(Facoltativo) Nome della tabella BigQuery di output. Obbligatorio quando sinkType è BigQuery. Ad esempio: . |
writeDisposition |
(Facoltativo) BigQuery Write Disposition. I valori possibili sono WRITE_APPEND , WRITE_EMPTY o WRITE_TRUNCATE . L'impostazione predefinita è WRITE_APPEND. |
outputDeadletterTable |
(Facoltativo) Nome della tabella BigQuery di output in cui conservare i record non riusciti. Se non viene fornito, la pipeline crea una tabella durante l'esecuzione con il nome {output_table_name}_error_records. Ad esempio: . |
outputDirectory |
(Facoltativo) Percorso della posizione di Cloud Storage di output. Obbligatorio quando sinkType è Cloud Storage. Ad esempio: gs://mybucket/pathprefix/ . |
outputFilenamePrefix |
(Facoltativo) Il prefisso del nome file dei file di output scritti in Cloud Storage. Il valore predefinito è output--. |
windowDuration |
(Facoltativo) Intervallo di finestra in cui l'output viene scritto in Cloud Storage. Il valore predefinito è 1 m (ovvero 1 minuto). |
numShards |
(Facoltativo) Numero massimo di shard dell'output. Obbligatorio quando sinkType è Cloud Storage e deve essere impostato su 1 o su un numero superiore. |
messagesLimit |
(Facoltativo) Numero massimo di messaggi di output. Il valore predefinito è 0 che indica illimitato. |
autoscalingAlgorithm |
(Facoltativo) Algoritmo utilizzato per la scalabilità automatica dei worker. I valori possibili sono THROUGHPUT_BASED per abilitare la scalabilità automatica o NONE per disabilitare. |
maxNumWorkers |
(Facoltativo) Numero massimo di macchine worker. Ad esempio: 10 . |
Esecuzione del modello di generatore di dati in streaming
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome univoco per il job.
- (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. L'endpoint della regione predefinito è
us-central1
.Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta le località di Dataflow.
- Dal menu a discesa Modello Dataflow, seleziona the Streaming Data Generator template.
- Nei campi dei parametri forniti, inserisci i valori dei parametri.
- Fai clic su Esegui job.
gcloud
Nella shell o nel terminale, esegui il modello:
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator \ --parameters \ schemaLocation=SCHEMA_LOCATION,\ qps=QPS,\ topic=PUBSUB_TOPIC
Sostituisci quanto segue:
PROJECT_ID
: l'ID del progetto Cloud in cui vuoi eseguire il job DataflowREGION_NAME
: l'endpoint a livello di area geografica in cui vuoi eseguire il deployment del tuo job Dataflow, ad esempious-central1
JOB_NAME
: un nome job univoco a tua sceltaVERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates/latest/- Il nome della versione, come
2021-09-20-00_RC00
, per utilizzare una versione specifica del modello, che può essere nidificata nella rispettiva cartella padre con data nel bucket: gs://dataflow-templates/
SCHEMA_LOCATION
: percorso del file di schema in Cloud Storage. Ad esempio:gs://mybucket/filename.json
.QPS
: numero di messaggi da pubblicare al secondoPUBSUB_TOPIC
: l'argomento Pub/Sub di output. Ad esempio:projects/my-project-ID/topics/my-topic-ID
.
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni
sull'API e sui suoi ambiti di autorizzazione, consulta
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "schemaLocation": "SCHEMA_LOCATION", "qps": "QPS", "topic": "PUBSUB_TOPIC" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator", } }
Sostituisci quanto segue:
PROJECT_ID
: l'ID del progetto Cloud in cui vuoi eseguire il job DataflowLOCATION
: l'endpoint a livello di area geografica in cui vuoi eseguire il deployment del tuo job Dataflow, ad esempious-central1
JOB_NAME
: un nome job univoco a tua sceltaVERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates/latest/- Il nome della versione, come
2021-09-20-00_RC00
, per utilizzare una versione specifica del modello, che può essere nidificata nella rispettiva cartella padre con data nel bucket: gs://dataflow-templates/
SCHEMA_LOCATION
: percorso del file di schema in Cloud Storage. Ad esempio:gs://mybucket/filename.json
.QPS
: numero di messaggi da pubblicare al secondoPUBSUB_TOPIC
: l'argomento Pub/Sub di output. Ad esempio:projects/my-project-ID/topics/my-topic-ID
.