Elabora i dati in blocco con Dataflow
Questa pagina fornisce esempi di come utilizzare Dataflow per eseguire operazioni Firestore in blocco in una pipeline Apache Beam. Apache Beam supporta un connettore per Firestore. Puoi utilizzare questo collegatore per eseguire operazioni in batch e in streaming in Dataflow.
Consigliamo di utilizzare Dataflow e Apache Beam per dati su larga scala per l'elaborazione dei carichi di lavoro.
Il connettore Firestore per Apache Beam è disponibile in Java. Per ulteriori informazioni sul connettore Firestore, consulta l'SDK Apache Beam per Java.
Prima di iniziare
Prima di leggere questa pagina, dovresti conoscere le Modello di programmazione per Apache Beam.
Per eseguire i sample, devi attivare l'API Dataflow:
Pipeline di Firestore di esempio
Gli esempi seguenti mostrano una pipeline che scrive i dati e una che legge e filtra i dati. Puoi utilizzare questi esempi come punto di partenza per le tue pipeline.
Esecuzione delle pipeline di esempio
Il codice sorgente per gli esempi è disponibile nella Repository GitHub googleapis/java-firestore. Per eseguire questi esempi, scarica il codice sorgente e consulta il file README.
Esempio di pipeline Write
L'esempio seguente crea documenti nella raccolta cities-beam-sample
:
public class ExampleFirestoreBeamWrite { private static final FirestoreOptions FIRESTORE_OPTIONS = FirestoreOptions.getDefaultInstance(); public static void main(String[] args) { runWrite(args, "cities-beam-sample"); } public static void runWrite(String[] args, String collectionId) { // create pipeline options from the passed in arguments PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class); Pipeline pipeline = Pipeline.create(options); RpcQosOptions rpcQosOptions = RpcQosOptions.newBuilder() .withHintMaxNumWorkers(options.as(DataflowPipelineOptions.class).getMaxNumWorkers()) .build(); // create some writes Write write1 = Write.newBuilder() .setUpdate( Document.newBuilder() // resolves to // projects/<projectId>/databases/<databaseId>/documents/<collectionId>/NYC .setName(createDocumentName(collectionId, "NYC")) .putFields("name", Value.newBuilder().setStringValue("New York City").build()) .putFields("state", Value.newBuilder().setStringValue("New York").build()) .putFields("country", Value.newBuilder().setStringValue("USA").build())) .build(); Write write2 = Write.newBuilder() .setUpdate( Document.newBuilder() // resolves to // projects/<projectId>/databases/<databaseId>/documents/<collectionId>/TOK .setName(createDocumentName(collectionId, "TOK")) .putFields("name", Value.newBuilder().setStringValue("Tokyo").build()) .putFields("country", Value.newBuilder().setStringValue("Japan").build()) .putFields("capital", Value.newBuilder().setBooleanValue(true).build())) .build(); // batch write the data pipeline .apply(Create.of(write1, write2)) .apply(FirestoreIO.v1().write().batchWrite().withRpcQosOptions(rpcQosOptions).build()); // run the pipeline pipeline.run().waitUntilFinish(); } private static String createDocumentName(String collectionId, String cityDocId) { String documentPath = String.format( "projects/%s/databases/%s/documents", FIRESTORE_OPTIONS.getProjectId(), FIRESTORE_OPTIONS.getDatabaseId()); return documentPath + "/" + collectionId + "/" + cityDocId; } }
L'esempio utilizza i seguenti argomenti per configurare ed eseguire una pipeline:
GOOGLE_CLOUD_PROJECT=project-id REGION=region TEMP_LOCATION=gs://temp-bucket/temp/ NUM_WORKERS=number-workers MAX_NUM_WORKERS=max-number-workers
Pipeline Read
di esempio
La pipeline di esempio seguente legge i documenti da cities-beam-sample
raccolta, applica un filtro per i documenti in cui il campo country
è impostato su
USA
, mentre restituisce i nomi dei documenti corrispondenti.
public class ExampleFirestoreBeamRead { public static void main(String[] args) { runRead(args, "cities-beam-sample"); } public static void runRead(String[] args, String collectionId) { FirestoreOptions firestoreOptions = FirestoreOptions.getDefaultInstance(); PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class); Pipeline pipeline = Pipeline.create(options); RpcQosOptions rpcQosOptions = RpcQosOptions.newBuilder() .withHintMaxNumWorkers(options.as(DataflowPipelineOptions.class).getMaxNumWorkers()) .build(); pipeline .apply(Create.of(collectionId)) .apply( new FilterDocumentsQuery( firestoreOptions.getProjectId(), firestoreOptions.getDatabaseId())) .apply(FirestoreIO.v1().read().runQuery().withRpcQosOptions(rpcQosOptions).build()) .apply( ParDo.of( // transform each document to its name new DoFn<RunQueryResponse, String>() { @ProcessElement public void processElement(ProcessContext c) { c.output(Objects.requireNonNull(c.element()).getDocument().getName()); } })) .apply( ParDo.of( // print the document name new DoFn<String, Void>() { @ProcessElement public void processElement(ProcessContext c) { System.out.println(c.element()); } })); pipeline.run().waitUntilFinish(); } private static final class FilterDocumentsQuery extends PTransform<PCollection<String>, PCollection<RunQueryRequest>> { private final String projectId; private final String databaseId; public FilterDocumentsQuery(String projectId, String databaseId) { this.projectId = projectId; this.databaseId = databaseId; } @Override public PCollection<RunQueryRequest> expand(PCollection<String> input) { return input.apply( ParDo.of( new DoFn<String, RunQueryRequest>() { @ProcessElement public void processElement(ProcessContext c) { // select from collection "cities-collection-<uuid>" StructuredQuery.CollectionSelector collection = StructuredQuery.CollectionSelector.newBuilder() .setCollectionId(Objects.requireNonNull(c.element())) .build(); // filter where country is equal to USA StructuredQuery.Filter countryFilter = StructuredQuery.Filter.newBuilder() .setFieldFilter( StructuredQuery.FieldFilter.newBuilder() .setField( StructuredQuery.FieldReference.newBuilder() .setFieldPath("country") .build()) .setValue(Value.newBuilder().setStringValue("USA").build()) .setOp(StructuredQuery.FieldFilter.Operator.EQUAL)) .buildPartial(); RunQueryRequest runQueryRequest = RunQueryRequest.newBuilder() .setParent(DocumentRootName.format(projectId, databaseId)) .setStructuredQuery( StructuredQuery.newBuilder() .addFrom(collection) .setWhere(countryFilter) .build()) .build(); c.output(runQueryRequest); } })); } } }
L'esempio utilizza i seguenti argomenti per configurare ed eseguire una pipeline:
GOOGLE_CLOUD_PROJECT=project-id REGION=region TEMP_LOCATION=gs://temp-bucket/temp/ NUM_WORKERS=number-workers MAX_NUM_WORKERS=max-number-workers
Prezzi
L'esecuzione di un carico di lavoro Firestore in Dataflow comporta dei costi per l'utilizzo di Firestore e di Dataflow. L'utilizzo di Dataflow viene fatturato in base alle risorse utilizzate dai tuoi job. Per informazioni dettagliate, consulta la pagina dei prezzi di Dataflow. Per i prezzi di Firestore, consulta Pagina dei prezzi.
Passaggi successivi
- Per un altro esempio di pipeline, consulta Utilizzare Firestore e Apache Beam per l'elaborazione dei dati.
- Per ulteriori informazioni su Dataflow e Apache Beam, consulta la documentazione di Dataflow.