Dataflow로 일괄 데이터 처리
이 페이지에서는 Dataflow를 사용하여 Apache Beam 파이프라인에서 일괄 Firestore 작업을 수행하는 방법에 대한 예시를 보여줍니다. Apache Beam은 Firestore용 커넥터를 지원합니다. 이 커넥터를 사용하여 Dataflow에서 일괄 및 스트리밍 작업을 실행할 수 있습니다.
대규모 데이터 처리 워크로드에는 Dataflow 및 Apache Beam을 사용하는 것이 좋습니다.
Apache Beam용 Firestore 커넥터는 자바로 제공됩니다. Firestore 커넥터에 대한 자세한 내용은 자바용 Apache Beam SDK를 참조하세요.
시작하기 전에
이 페이지를 읽기 전에 Apache Beam을 위한 프로그래밍 모델을 숙지해야 합니다.
샘플을 실행하려면 Dataflow API를 사용 설정해야 합니다.
Firestore 파이프라인 예시
아래 예시는 데이터를 기록하는 파이프라인과 데이터를 읽고 필터링하는 파이프라인을 보여줍니다. 이 샘플을 자체 파이프라인의 시작점으로 사용할 수 있습니다.
샘플 파이프라인 실행
샘플의 소스 코드는 googleapis/java-firestore GitHub 저장소에서 제공됩니다. 다음 샘플을 실행하려면 소스 코드를 다운로드하고 리드미를 참조하세요.
Write
파이프라인 예시
다음은 cities-beam-sample
컬렉션에 문서를 만드는 예시입니다.
public class ExampleFirestoreBeamWrite { private static final FirestoreOptions FIRESTORE_OPTIONS = FirestoreOptions.getDefaultInstance(); public static void main(String[] args) { runWrite(args, "cities-beam-sample"); } public static void runWrite(String[] args, String collectionId) { // create pipeline options from the passed in arguments PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class); Pipeline pipeline = Pipeline.create(options); RpcQosOptions rpcQosOptions = RpcQosOptions.newBuilder() .withHintMaxNumWorkers(options.as(DataflowPipelineOptions.class).getMaxNumWorkers()) .build(); // create some writes Write write1 = Write.newBuilder() .setUpdate( Document.newBuilder() // resolves to // projects/<projectId>/databases/<databaseId>/documents/<collectionId>/NYC .setName(createDocumentName(collectionId, "NYC")) .putFields("name", Value.newBuilder().setStringValue("New York City").build()) .putFields("state", Value.newBuilder().setStringValue("New York").build()) .putFields("country", Value.newBuilder().setStringValue("USA").build())) .build(); Write write2 = Write.newBuilder() .setUpdate( Document.newBuilder() // resolves to // projects/<projectId>/databases/<databaseId>/documents/<collectionId>/TOK .setName(createDocumentName(collectionId, "TOK")) .putFields("name", Value.newBuilder().setStringValue("Tokyo").build()) .putFields("country", Value.newBuilder().setStringValue("Japan").build()) .putFields("capital", Value.newBuilder().setBooleanValue(true).build())) .build(); // batch write the data pipeline .apply(Create.of(write1, write2)) .apply(FirestoreIO.v1().write().batchWrite().withRpcQosOptions(rpcQosOptions).build()); // run the pipeline pipeline.run().waitUntilFinish(); } private static String createDocumentName(String collectionId, String cityDocId) { String documentPath = String.format( "projects/%s/databases/%s/documents", FIRESTORE_OPTIONS.getProjectId(), FIRESTORE_OPTIONS.getDatabaseId()); return documentPath + "/" + collectionId + "/" + cityDocId; } }
이 예시에서는 다음 인수를 사용하여 파이프라인을 구성하고 실행합니다.
GOOGLE_CLOUD_PROJECT=project-id REGION=region TEMP_LOCATION=gs://temp-bucket/temp/ NUM_WORKERS=number-workers MAX_NUM_WORKERS=max-number-workers
Read
파이프라인 예시
다음은 cities-beam-sample
컬렉션에서 문서를 읽고, country
필드가 USA
로 설정된 문서에 대해 필터를 적용하고, 일치하는 문서 이름을 반환하는 예시 파이프라인입니다.
public class ExampleFirestoreBeamRead { public static void main(String[] args) { runRead(args, "cities-beam-sample"); } public static void runRead(String[] args, String collectionId) { FirestoreOptions firestoreOptions = FirestoreOptions.getDefaultInstance(); PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class); Pipeline pipeline = Pipeline.create(options); RpcQosOptions rpcQosOptions = RpcQosOptions.newBuilder() .withHintMaxNumWorkers(options.as(DataflowPipelineOptions.class).getMaxNumWorkers()) .build(); pipeline .apply(Create.of(collectionId)) .apply( new FilterDocumentsQuery( firestoreOptions.getProjectId(), firestoreOptions.getDatabaseId())) .apply(FirestoreIO.v1().read().runQuery().withRpcQosOptions(rpcQosOptions).build()) .apply( ParDo.of( // transform each document to its name new DoFn<RunQueryResponse, String>() { @ProcessElement public void processElement(ProcessContext c) { c.output(Objects.requireNonNull(c.element()).getDocument().getName()); } })) .apply( ParDo.of( // print the document name new DoFn<String, Void>() { @ProcessElement public void processElement(ProcessContext c) { System.out.println(c.element()); } })); pipeline.run().waitUntilFinish(); } private static final class FilterDocumentsQuery extends PTransform<PCollection<String>, PCollection<RunQueryRequest>> { private final String projectId; private final String databaseId; public FilterDocumentsQuery(String projectId, String databaseId) { this.projectId = projectId; this.databaseId = databaseId; } @Override public PCollection<RunQueryRequest> expand(PCollection<String> input) { return input.apply( ParDo.of( new DoFn<String, RunQueryRequest>() { @ProcessElement public void processElement(ProcessContext c) { // select from collection "cities-collection-<uuid>" StructuredQuery.CollectionSelector collection = StructuredQuery.CollectionSelector.newBuilder() .setCollectionId(Objects.requireNonNull(c.element())) .build(); // filter where country is equal to USA StructuredQuery.Filter countryFilter = StructuredQuery.Filter.newBuilder() .setFieldFilter( StructuredQuery.FieldFilter.newBuilder() .setField( StructuredQuery.FieldReference.newBuilder() .setFieldPath("country") .build()) .setValue(Value.newBuilder().setStringValue("USA").build()) .setOp(StructuredQuery.FieldFilter.Operator.EQUAL)) .buildPartial(); RunQueryRequest runQueryRequest = RunQueryRequest.newBuilder() .setParent(DocumentRootName.format(projectId, databaseId)) .setStructuredQuery( StructuredQuery.newBuilder() .addFrom(collection) .setWhere(countryFilter) .build()) .build(); c.output(runQueryRequest); } })); } } }
이 예시에서는 다음 인수를 사용하여 파이프라인을 구성하고 실행합니다.
GOOGLE_CLOUD_PROJECT=project-id REGION=region TEMP_LOCATION=gs://temp-bucket/temp/ NUM_WORKERS=number-workers MAX_NUM_WORKERS=max-number-workers
가격 책정
Dataflow에서 Firestore 워크로드를 실행하면 Firestore 사용량 및 Dataflow 사용에 대한 비용이 발생합니다. Dataflow 사용에 대해서는 작업에 사용되는 리소스 비용이 청구됩니다. 자세한 내용은 Dataflow 가격 책정 페이지를 참조하세요. Firestore 가격 책정에 대해서는 가격 책정 페이지를 참조하세요.
다음 단계
- 다른 파이프라인 예시는 데이터 처리를 위해 Firestore 및 Apache Beam 사용을 참조하세요.
- Dataflow 및 Apache Beam에 대한 자세한 내용은 Dataflow 문서를 참조하세요.