Cloud Storage Text to Datastore 템플릿은 Cloud Storage에 저장된 텍스트 파일을 읽고 JSON으로 인코딩된 항목을 Datastore에 쓰는 일괄 파이프라인입니다.
입력 텍스트 파일의 각 줄은 지정된 JSON 형식이어야 합니다.
파이프라인 요구사항
대상 프로젝트에서 데이터 저장소를 활성화해야 합니다.
템플릿 매개변수
매개변수
설명
textReadPattern
텍스트 데이터 파일의 위치를 지정하는 Cloud Storage 경로 패턴입니다.
예를 들면 gs://mybucket/somepath/*.json입니다.
javascriptTextTransformGcsPath
(선택사항)
사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
javascriptTextTransformFunctionName
(선택사항)
사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다.
예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 JavaScript UDF는 UDF 예시를 참조하세요.
datastoreWriteProjectId
Datastore 항목을 쓸 위치의 Google Cloud 프로젝트 ID입니다.
datastoreHintNumWorkers
(선택사항) Datastore 증가 제한 단계의 예상 작업자 수에 대한 힌트입니다. 기본값은 500입니다.
errorWritePath
처리 중에 발생하는 쓰기 오류에 대해 사용할 오류 로그 출력 파일입니다. 예를 들면 gs://bucket-name/errors.txt입니다.
JAVASCRIPT_FUNCTION:
사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다.
예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 JavaScript UDF는 UDF 예시를 참조하세요.
PATH_TO_JAVASCRIPT_UDF_FILE:
사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
ERROR_FILE_WRITE_PATH: Cloud Storage에서 오류 파일에 사용할 경로
템플릿 소스 코드
Java
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.cloud.teleport.metadata.MultiTemplate;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.templates.TextToDatastore.TextToDatastoreOptions;
import com.google.cloud.teleport.templates.common.DatastoreConverters.DatastoreWriteOptions;
import com.google.cloud.teleport.templates.common.DatastoreConverters.WriteJsonEntities;
import com.google.cloud.teleport.templates.common.ErrorConverters.ErrorWriteOptions;
import com.google.cloud.teleport.templates.common.ErrorConverters.LogErrors;
import com.google.cloud.teleport.templates.common.FirestoreNestedValueProvider;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.TransformTextViaJavascript;
import com.google.cloud.teleport.templates.common.TextConverters.FilesystemReadOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.values.TupleTag;
/**
* Dataflow template which reads from a Text Source and writes JSON encoded Entities into Datastore.
* The JSON is expected to be in the format of: <a
* href="https://cloud.google.com/datastore/docs/reference/rest/v1/Entity">Entity</a>
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_GCS_Text_to_Datastore.md">README
* Datastore</a> or <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_GCS_Text_to_Firestore.md">README
* Firestore</a> for instructions on how to use or modify this template.
*/
@MultiTemplate({
@Template(
name = "GCS_Text_to_Datastore",
category = TemplateCategory.LEGACY,
displayName = "Text Files on Cloud Storage to Datastore [Deprecated]",
description =
"The Cloud Storage Text to Datastore template is a batch pipeline that reads from text files stored in "
+ "Cloud Storage and writes JSON encoded Entities to Datastore. "
+ "Each line in the input text files must be in the <a href=\"https://cloud.google.com/datastore/docs/reference/rest/v1/Entity\">specified JSON format</a>.",
optionsClass = TextToDatastoreOptions.class,
skipOptions = {
"firestoreWriteProjectId",
"firestoreWriteEntityKind",
"firestoreWriteNamespace",
"firestoreHintNumWorkers",
"javascriptTextTransformReloadIntervalMinutes",
// This template doesn't use neither firestoreWriteEntityKind/Namespace
// nor datastoreWriteEntityKind/Namespace pipeline options.
"datastoreWriteEntityKind",
"datastoreWriteNamespace"
},
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-to-datastore",
contactInformation = "https://cloud.google.com/support",
preview = true,
requirements = {"Datastore must be enabled in the destination project."}),
@Template(
name = "GCS_Text_to_Firestore",
category = TemplateCategory.BATCH,
displayName = "Text Files on Cloud Storage to Firestore (Datastore mode)",
description =
"The Cloud Storage Text to Firestore template is a batch pipeline that reads from text files stored in "
+ "Cloud Storage and writes JSON encoded Entities to Firestore. "
+ "Each line in the input text files must be in the <a href=\"https://cloud.google.com/datastore/docs/reference/rest/v1/Entity\">specified JSON format</a>.",
optionsClass = TextToDatastoreOptions.class,
skipOptions = {
"datastoreWriteProjectId",
"datastoreWriteEntityKind",
"datastoreWriteNamespace",
"datastoreHintNumWorkers",
"javascriptTextTransformReloadIntervalMinutes",
// This template doesn't use neither firestoreWriteEntityKind/Namespace
// nor datastoreWriteEntityKind/Namespace pipeline options.
"firestoreWriteEntityKind",
"firestoreWriteNamespace"
},
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-to-firestore",
contactInformation = "https://cloud.google.com/support",
requirements = {"Firestore must be enabled in the destination project."})
})
public class TextToDatastore {
public static <T> ValueProvider<T> selectProvidedInput(
ValueProvider<T> datastoreInput, ValueProvider<T> firestoreInput) {
return new FirestoreNestedValueProvider(datastoreInput, firestoreInput);
}
/** TextToDatastore Pipeline Options. */
public interface TextToDatastoreOptions
extends PipelineOptions,
FilesystemReadOptions,
JavascriptTextTransformerOptions,
DatastoreWriteOptions,
ErrorWriteOptions {}
/**
* Runs a pipeline which reads from a Text Source, passes the Text to a Javascript UDF, writes the
* JSON encoded Entities to a TextIO sink.
*
* <p>If your Text Source does not contain JSON encoded Entities, then you'll need to supply a
* Javascript UDF which transforms your data to be JSON encoded Entities.
*
* @param args arguments to the pipeline
*/
public static void main(String[] args) {
TextToDatastoreOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(TextToDatastoreOptions.class);
TupleTag<String> errorTag = new TupleTag<String>("errors") {};
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply(TextIO.read().from(options.getTextReadPattern()))
.apply(
TransformTextViaJavascript.newBuilder()
.setFileSystemPath(options.getJavascriptTextTransformGcsPath())
.setFunctionName(options.getJavascriptTextTransformFunctionName())
.build())
.apply(
WriteJsonEntities.newBuilder()
.setProjectId(
selectProvidedInput(
options.getDatastoreWriteProjectId(), options.getFirestoreWriteProjectId()))
.setHintNumWorkers(
selectProvidedInput(
options.getDatastoreHintNumWorkers(), options.getFirestoreHintNumWorkers()))
.setErrorTag(errorTag)
.build())
.apply(
LogErrors.newBuilder()
.setErrorWritePath(options.getErrorWritePath())
.setErrorTag(errorTag)
.build());
pipeline.run();
}
}