Memproses data secara massal dengan Dataflow
Halaman ini memberikan contoh cara menggunakan Dataflow untuk melakukan operasi Firestore massal di pipeline Apache Beam. Apache Beam mendukung konektor untuk Firestore. Anda dapat menggunakan konektor ini untuk menjalankan operasi batch dan streaming di Dataflow.
Sebaiknya gunakan Dataflow dan Apache Beam untuk beban kerja pemrosesan data skala besar.
Konektor Firestore untuk Apache Beam tersedia di Java. Untuk selengkapnya tentang konektor Firestore, lihat Apache Beam SDK untuk Java.
Sebelum memulai
Sebelum membaca halaman ini, Anda harus sudah memahami Model pemrograman untuk Apache Beam.
Untuk menjalankan sampel, Anda harus mengaktifkan Dataflow API:
Contoh pipeline Firestore
Contoh di bawah menunjukkan pipeline yang menulis data serta pipeline yang membaca dan memfilter data. Anda dapat menggunakan contoh ini sebagai titik awal untuk pipeline Anda sendiri.
Menjalankan contoh pipeline
Kode sumber untuk contoh tersedia di repositori GitHub googleapis/java-firestore. Untuk menjalankan contoh ini, download kode sumber dan lihat README.
Contoh pipeline Write
Contoh berikut membuat dokumen dalam koleksi cities-beam-sample
:
public class ExampleFirestoreBeamWrite { private static final FirestoreOptions FIRESTORE_OPTIONS = FirestoreOptions.getDefaultInstance(); public static void main(String[] args) { runWrite(args, "cities-beam-sample"); } public static void runWrite(String[] args, String collectionId) { // create pipeline options from the passed in arguments PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class); Pipeline pipeline = Pipeline.create(options); RpcQosOptions rpcQosOptions = RpcQosOptions.newBuilder() .withHintMaxNumWorkers(options.as(DataflowPipelineOptions.class).getMaxNumWorkers()) .build(); // create some writes Write write1 = Write.newBuilder() .setUpdate( Document.newBuilder() // resolves to // projects/<projectId>/databases/<databaseId>/documents/<collectionId>/NYC .setName(createDocumentName(collectionId, "NYC")) .putFields("name", Value.newBuilder().setStringValue("New York City").build()) .putFields("state", Value.newBuilder().setStringValue("New York").build()) .putFields("country", Value.newBuilder().setStringValue("USA").build())) .build(); Write write2 = Write.newBuilder() .setUpdate( Document.newBuilder() // resolves to // projects/<projectId>/databases/<databaseId>/documents/<collectionId>/TOK .setName(createDocumentName(collectionId, "TOK")) .putFields("name", Value.newBuilder().setStringValue("Tokyo").build()) .putFields("country", Value.newBuilder().setStringValue("Japan").build()) .putFields("capital", Value.newBuilder().setBooleanValue(true).build())) .build(); // batch write the data pipeline .apply(Create.of(write1, write2)) .apply(FirestoreIO.v1().write().batchWrite().withRpcQosOptions(rpcQosOptions).build()); // run the pipeline pipeline.run().waitUntilFinish(); } private static String createDocumentName(String collectionId, String cityDocId) { String documentPath = String.format( "projects/%s/databases/%s/documents", FIRESTORE_OPTIONS.getProjectId(), FIRESTORE_OPTIONS.getDatabaseId()); return documentPath + "/" + collectionId + "/" + cityDocId; } }
Contoh ini menggunakan argumen berikut untuk mengonfigurasi dan menjalankan pipeline:
GOOGLE_CLOUD_PROJECT=project-id REGION=region TEMP_LOCATION=gs://temp-bucket/temp/ NUM_WORKERS=number-workers MAX_NUM_WORKERS=max-number-workers
Contoh Pipeline Read
Contoh pipeline berikut membaca dokumen dari koleksi cities-beam-sample
, menerapkan filter untuk dokumen dengan kolom country
ditetapkan ke USA
, dan menampilkan nama dokumen yang cocok.
public class ExampleFirestoreBeamRead { public static void main(String[] args) { runRead(args, "cities-beam-sample"); } public static void runRead(String[] args, String collectionId) { FirestoreOptions firestoreOptions = FirestoreOptions.getDefaultInstance(); PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class); Pipeline pipeline = Pipeline.create(options); RpcQosOptions rpcQosOptions = RpcQosOptions.newBuilder() .withHintMaxNumWorkers(options.as(DataflowPipelineOptions.class).getMaxNumWorkers()) .build(); pipeline .apply(Create.of(collectionId)) .apply( new FilterDocumentsQuery( firestoreOptions.getProjectId(), firestoreOptions.getDatabaseId())) .apply(FirestoreIO.v1().read().runQuery().withRpcQosOptions(rpcQosOptions).build()) .apply( ParDo.of( // transform each document to its name new DoFn<RunQueryResponse, String>() { @ProcessElement public void processElement(ProcessContext c) { c.output(Objects.requireNonNull(c.element()).getDocument().getName()); } })) .apply( ParDo.of( // print the document name new DoFn<String, Void>() { @ProcessElement public void processElement(ProcessContext c) { System.out.println(c.element()); } })); pipeline.run().waitUntilFinish(); } private static final class FilterDocumentsQuery extends PTransform<PCollection<String>, PCollection<RunQueryRequest>> { private final String projectId; private final String databaseId; public FilterDocumentsQuery(String projectId, String databaseId) { this.projectId = projectId; this.databaseId = databaseId; } @Override public PCollection<RunQueryRequest> expand(PCollection<String> input) { return input.apply( ParDo.of( new DoFn<String, RunQueryRequest>() { @ProcessElement public void processElement(ProcessContext c) { // select from collection "cities-collection-<uuid>" StructuredQuery.CollectionSelector collection = StructuredQuery.CollectionSelector.newBuilder() .setCollectionId(Objects.requireNonNull(c.element())) .build(); // filter where country is equal to USA StructuredQuery.Filter countryFilter = StructuredQuery.Filter.newBuilder() .setFieldFilter( StructuredQuery.FieldFilter.newBuilder() .setField( StructuredQuery.FieldReference.newBuilder() .setFieldPath("country") .build()) .setValue(Value.newBuilder().setStringValue("USA").build()) .setOp(StructuredQuery.FieldFilter.Operator.EQUAL)) .buildPartial(); RunQueryRequest runQueryRequest = RunQueryRequest.newBuilder() .setParent(DocumentRootName.format(projectId, databaseId)) .setStructuredQuery( StructuredQuery.newBuilder() .addFrom(collection) .setWhere(countryFilter) .build()) .build(); c.output(runQueryRequest); } })); } } }
Contoh ini menggunakan argumen berikut untuk mengonfigurasi dan menjalankan pipeline:
GOOGLE_CLOUD_PROJECT=project-id REGION=region TEMP_LOCATION=gs://temp-bucket/temp/ NUM_WORKERS=number-workers MAX_NUM_WORKERS=max-number-workers
Harga
Menjalankan workload Firestore di Dataflow akan menimbulkan biaya untuk penggunaan Firestore dan Dataflow. Penggunaan Dataflow dikenai biaya untuk resource yang digunakan oleh tugas Anda. Baca halaman harga Dataflow untuk mengetahui detailnya. Untuk mengetahui harga Firestore, lihat Halaman harga.
Langkah selanjutnya
- Lihat Menggunakan Firestore dan Apache Beam untuk pemrosesan data untuk contoh pipeline lainnya.
- Untuk informasi selengkapnya tentang Dataflow dan Apache Beam, lihat dokumentasi Dataflow.