Template Cloud Storage ke Elasticsearch

Template Cloud Storage ke Elasticsearch adalah pipeline batch yang membaca data dari file CSV yang disimpan di bucket Cloud Storage dan menulis data ke Elasticsearch sebagai dokumen JSON.

Persyaratan pipeline

  • Bucket Cloud Storage harus ada.
  • Host Elasticsearch di instance Google Cloud atau di Elasticsearch Cloud yang dapat diakses dari Dataflow harus ada.
  • Tabel BigQuery untuk output error harus ada.

Skema CSV

Jika file CSV berisi header, tetapkan parameter template containsHeaders ke true.

Jika tidak, buat file skema JSON yang mendeskripsikan data. Tentukan URI Cloud Storage dari file skema di parameter template jsonSchemaPath. Contoh berikut menunjukkan skema JSON:

[{"name":"id", "type":"text"}, {"name":"age", "type":"integer"}]

Atau, Anda dapat menyediakan fungsi yang ditentukan pengguna (UDF) yang mengurai teks CSV dan menghasilkan dokumen Elasticsearch.

Parameter template

Parameter Deskripsi
inputFileSpec Pola file Cloud Storage untuk menelusuri file CSV. Contoh: gs://mybucket/test-*.csv.
connectionUrl URL Elasticsearch dalam format https://hostname:[port] atau tentukan CloudID jika menggunakan Elastic Cloud.
apiKey Kunci API yang Dienkode Base64 yang digunakan untuk autentikasi.
index Indeks Elasticsearch tempat permintaan akan dikeluarkan, seperti my-index.
deadletterTable Tabel BigQuery Deadsheet yang akan menerima penyisipan yang gagal. Contoh: <your-project>:<your-dataset>.<your-table-name>.
containsHeaders (Opsional) Boolean untuk menunjukkan apakah header disertakan dalam CSV. false default.
delimiter (Opsional) Pemisah yang digunakan CSV. Contoh: ,
csvFormat (Opsional) Format CSV sesuai dengan format CSV Apache Commons. Default: Default.
jsonSchemaPath (Opsional) Jalur ke skema JSON. Default: null.
largeNumFiles (Opsional) Tetapkan ke true jika jumlah file puluhan ribu. Default: false.
javascriptTextTransformGcsPath (Opsional) URI Cloud Storage dari file .js yang menentukan fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan. Misalnya, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Opsional) Nama fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan. Misalnya, jika kode fungsi JavaScript Anda adalah myTransform(inJson) { /*...do stuff...*/ }, nama fungsi adalah myTransform. Untuk contoh UDF JavaScript, lihat Contoh UDF.
batchSize (Opsional) Ukuran tumpukan dalam jumlah dokumen. Default: 1000.
batchSizeBytes (Opsional) Ukuran tumpukan dalam jumlah byte. Default: 5242880 (5mb).
maxRetryAttempts (Opsional) Percobaan ulang maksimum, harus > 0. Default: tidak ada percobaan ulang.
maxRetryDuration (Opsional) Durasi percobaan ulang maksimum dalam milidetik, harus > 0. Default: tidak ada percobaan ulang.
csvFileEncoding (Opsional) Encoding file CSV.
propertyAsIndex (Opsional) Properti dalam dokumen yang diindeks yang nilainya akan menentukan metadata _index untuk disertakan dengan dokumen dalam permintaan massal (lebih diutamakan daripada UDF _index). Default: tidak ada.
propertyAsId (Opsional) Properti dalam dokumen yang diindeks yang nilainya akan menentukan metadata _id untuk disertakan dengan dokumen dalam permintaan massal (lebih diutamakan daripada UDF _id). Default: tidak ada.
javaScriptIndexFnGcsPath (Opsional) Jalur Cloud Storage ke sumber UDF JavaScript untuk fungsi yang akan menentukan metadata _index yang akan disertakan dengan dokumen dalam permintaan massal. Default: tidak ada.
javaScriptIndexFnName (Opsional) Nama fungsi UDF JavaScript untuk fungsi yang akan menentukan metadata _index untuk disertakan dengan dokumen dalam permintaan massal. Default: tidak ada.
javaScriptIdFnGcsPath (Opsional) Jalur Cloud Storage ke sumber UDF JavaScript untuk fungsi yang akan menentukan metadata _id yang akan disertakan dengan dokumen dalam permintaan massal. Default: tidak ada.
javaScriptIdFnName (Opsional) Nama fungsi UDF JavaScript untuk fungsi yang akan menentukan metadata _id untuk disertakan dengan dokumen dalam permintaan massal. Default: tidak ada.
javaScriptTypeFnGcsPath (Opsional) Jalur Cloud Storage ke sumber UDF JavaScript untuk fungsi yang akan menentukan metadata _type yang akan disertakan dengan dokumen dalam permintaan massal. Default: tidak ada.
javaScriptTypeFnName (Opsional) Nama fungsi UDF JavaScript untuk fungsi yang akan menentukan metadata _type untuk disertakan dengan dokumen dalam permintaan massal. Default: tidak ada.
javaScriptIsDeleteFnGcsPath (Opsional) Jalur Cloud Storage ke sumber UDF JavaScript untuk fungsi yang akan menentukan apakah dokumen harus dihapus, bukan disisipkan atau diperbarui. Fungsi ini harus menampilkan nilai string "true" atau "false". Default: tidak ada.
javaScriptIsDeleteFnName (Opsional) Nama fungsi UDF JavaScript untuk fungsi yang akan menentukan apakah dokumen harus dihapus, bukan disisipkan atau diperbarui. Fungsi ini harus menampilkan nilai string "true" atau "false". Default: tidak ada.
usePartialUpdate (Opsional) Apakah akan menggunakan update sebagian (update, bukan membuat atau mengindeks, mengizinkan dokumen parsial) dengan permintaan Elasticsearch. Default: false.
bulkInsertMethod (Opsional) Apakah akan menggunakan INDEX (indeks, mengizinkan pembaruan dan penyisipan) atau CREATE (membuat, error pada _id duplikat) dengan permintaan massal Elasticsearch. Default: CREATE.

Fungsi yang ditentukan pengguna (UDF)

Template ini mendukung fungsi yang ditentukan pengguna (UDF) di beberapa titik dalam pipeline, yang dijelaskan di bawah. Untuk mengetahui informasi selengkapnya, lihat Membuat fungsi yang ditentukan pengguna untuk template Dataflow.

Fungsi transformasi teks

Mengubah data CSV menjadi dokumen Elasticsearch.

Parameter template:

  • javascriptTextTransformGcsPath: URI Cloud Storage file JavaScript.
  • javascriptTextTransformFunctionName: nama fungsi JavaScript.

Spesifikasi fungsi:

  • Input: satu baris dari file CSV input.
  • Output: dokumen JSON string yang akan disisipkan ke Elasticsearch.

Fungsi indeks

Menampilkan indeks yang memiliki dokumen tersebut.

Parameter template:

  • javaScriptIndexFnGcsPath: URI Cloud Storage file JavaScript.
  • javaScriptIndexFnName: nama fungsi JavaScript.

Spesifikasi fungsi:

  • Input: dokumen Elasticsearch, yang diserialisasi sebagai string JSON.
  • Output: nilai kolom metadata _index dokumen.

Fungsi ID dokumen

Menampilkan ID dokumen.

Parameter template:

  • javaScriptIdFnGcsPath: URI Cloud Storage file JavaScript.
  • javaScriptIdFnName: nama fungsi JavaScript.

Spesifikasi fungsi:

  • Input: dokumen Elasticsearch, yang diserialisasi sebagai string JSON.
  • Output: nilai kolom metadata _id dokumen.

Fungsi penghapusan dokumen

Menentukan apakah akan menghapus dokumen. Untuk menggunakan fungsi ini, tetapkan mode sisipkan massal ke INDEX dan sediakan fungsi ID dokumen.

Parameter template:

  • javaScriptIsDeleteFnGcsPath: URI Cloud Storage file JavaScript.
  • javaScriptIsDeleteFnName: nama fungsi JavaScript.

Spesifikasi fungsi:

  • Input: dokumen Elasticsearch, yang diserialisasi sebagai string JSON.
  • Output: menampilkan string "true" untuk menghapus dokumen, atau "false" untuk memperbarui dan menyisipkan dokumen.

Fungsi jenis pemetaan

Menampilkan jenis pemetaan dokumen.

Parameter template:

  • javaScriptTypeFnGcsPath: URI Cloud Storage file JavaScript.
  • javaScriptTypeFnName: nama fungsi JavaScript.

Spesifikasi fungsi:

  • Input: dokumen Elasticsearch, yang diserialisasi sebagai string JSON.
  • Output: nilai kolom metadata _type dokumen.

Menjalankan template

Konsol

  1. Buka halaman Create job from template Dataflow.
  2. Buka Buat tugas dari template
  3. Di kolom Job name, masukkan nama pekerjaan yang unik.
  4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region default-nya adalah us-central1.

    Untuk daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

  5. Dari menu drop-down Dataflow template, pilih the Cloud Storage to Elasticsearch template.
  6. Di kolom parameter yang disediakan, masukkan parameter value Anda.
  7. Klik Run job.

gcloud

Di shell atau terminal Anda, jalankan template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID\
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_to_Elasticsearch \
    --parameters \
inputFileSpec=INPUT_FILE_SPEC,\
connectionUrl=CONNECTION_URL,\
apiKey=APIKEY,\
index=INDEX,\
deadletterTable=DEADLETTER_TABLE,\

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama pekerjaan unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow, misalnya us-central1
  • INPUT_FILE_SPEC: pola file Cloud Storage Anda.
  • CONNECTION_URL: URL Elasticsearch Anda.
  • APIKEY: kunci API yang dienkode base64 untuk autentikasi.
  • INDEX: indeks Elasticsearch Anda.
  • DEADLETTER_TABLE: tabel BigQuery Anda.

API

Untuk menjalankan template menggunakan REST API, kirim permintaan HTTP POST. Untuk informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputFileSpec": "INPUT_FILE_SPEC",
          "connectionUrl": "CONNECTION_URL",
          "apiKey": "APIKEY",
          "index": "INDEX",
          "deadletterTable": "DEADLETTER_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_to_Elasticsearch",
   }
}

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama pekerjaan unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow, misalnya us-central1
  • INPUT_FILE_SPEC: pola file Cloud Storage Anda.
  • CONNECTION_URL: URL Elasticsearch Anda.
  • APIKEY: kunci API yang dienkode base64 untuk autentikasi.
  • INDEX: indeks Elasticsearch Anda.
  • DEADLETTER_TABLE: tabel BigQuery Anda.

Langkah selanjutnya