Membaca dari Cloud Storage ke Dataflow

Untuk membaca data dari Cloud Storage ke Dataflow, gunakan konektor I/O TextIO atau AvroIO Apache Beam.

Menyertakan dependensi library Google Cloud

Untuk menggunakan konektor TextIO atau AvroIO dengan Cloud Storage, sertakan dependensi berikut. Library ini menyediakan pengendali skema untuk nama file "gs://".

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>

Python

apache-beam[gcp]==VERSION

Go

import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

Untuk informasi selengkapnya, lihat Menginstal Apache Beam SDK.

Mengaktifkan gRPC di konektor I/O Apache Beam di Dataflow

Anda dapat terhubung ke Cloud Storage menggunakan gRPC melalui konektor I/O Apache Beam di Dataflow. gRPC adalah framework panggilan prosedur jarak jauh (RPC) open source berperforma tinggi yang dikembangkan oleh Google dan dapat Anda gunakan untuk berinteraksi dengan Cloud Storage.

Untuk mempercepat permintaan baca tugas Dataflow ke Cloud Storage, Anda dapat mengaktifkan konektor I/O Apache Beam di Dataflow untuk menggunakan gRPC.

Command line

  1. Pastikan Anda menggunakan Apache Beam SDK versi 2.55.0 atau yang lebih baru.
  2. Untuk menjalankan tugas Dataflow, gunakan opsi pipeline --additional-experiments=use_grpc_for_gcs. Untuk informasi tentang berbagai opsi pipeline, lihat Flag opsional.

Apache Beam SDK

  1. Pastikan Anda menggunakan Apache Beam SDK versi 2.55.0 atau yang lebih baru.
  2. Untuk menjalankan tugas Dataflow, gunakan opsi pipeline --experiments=use_grpc_for_gcs. Untuk informasi tentang berbagai opsi pipeline, lihat Opsi dasar.

Anda dapat mengonfigurasi konektor I/O Apache Beam di Dataflow untuk menghasilkan metrik terkait gRPC di Cloud Monitoring. Metrik terkait gRPC dapat membantu Anda melakukan hal berikut:

  • Pantau dan optimalkan performa permintaan gRPC ke Cloud Storage.
  • Memecahkan masalah dan men-debug masalah.
  • Dapatkan insight tentang penggunaan dan perilaku aplikasi Anda.

Untuk informasi tentang cara mengonfigurasi konektor I/O Apache Beam di Dataflow untuk menghasilkan metrik terkait gRPC, lihat Menggunakan metrik sisi klien. Jika pengumpulan metrik tidak diperlukan untuk kasus penggunaan Anda, Anda dapat memilih untuk tidak ikut pengumpulan metrik. Untuk mengetahui petunjuknya, lihat Memilih tidak menggunakan metrik sisi klien.

Keparalelan

Konektor TextIO dan AvroIO mendukung dua tingkat paralelisme:

  • Setiap file diberi kunci secara terpisah, sehingga beberapa pekerja dapat membacanya.
  • Jika file tidak dikompresi, konektor dapat membaca sub-rentang setiap file secara terpisah, sehingga menghasilkan tingkat paralelisme yang sangat tinggi. Pemisahan ini hanya dapat dilakukan jika setiap baris dalam file adalah kumpulan data yang bermakna. Misalnya, tidak tersedia secara default untuk file JSON.

Performa

Tabel berikut menunjukkan metrik performa untuk membaca dari Cloud Storage. Beban kerja dijalankan di satu pekerja e2-standard2, menggunakan Apache Beam SDK 2.49.0 untuk Java. Mereka tidak menggunakan Runner v2.

100 juta data | 1 kB | 1 kolom Throughput (byte) Throughput (elemen)
Melihat 320 MBps 320.000 elemen per detik

Metrik ini didasarkan pada pipeline batch sederhana. Pengujian ini dimaksudkan untuk membandingkan performa antara konektor I/O, dan tidak selalu mewakili pipeline di dunia nyata. Performa pipeline Dataflow bersifat kompleks, dan merupakan fungsi dari jenis VM, data yang sedang diproses, performa sumber dan sink eksternal, serta kode pengguna. Metrik didasarkan pada menjalankan Java SDK, dan tidak mewakili karakteristik performa SDK bahasa lainnya. Untuk mengetahui informasi selengkapnya, lihat Performa Beam IO.

Praktik terbaik

  • Hindari penggunaan watchForNewFiles dengan Cloud Storage. Pendekatan ini tidak dapat diskalakan dengan baik untuk pipeline produksi yang besar, karena konektor harus menyimpan daftar file yang dilihat dalam memori. Daftar tidak dapat dihapus dari memori, yang mengurangi memori kerja pekerja seiring waktu. Sebaiknya gunakan notifikasi Pub/Sub untuk Cloud Storage. Untuk informasi selengkapnya, lihat Pola pemrosesan file.

  • Jika nama file dan konten file adalah data yang berguna, gunakan class FileIO untuk membaca nama file. Misalnya, nama file mungkin berisi metadata yang berguna saat memproses data dalam file. Untuk informasi selengkapnya, lihat Mengakses nama file. Dokumentasi FileIO juga menunjukkan contoh pola ini.

Contoh

Contoh berikut menunjukkan cara membaca dari Cloud Storage.

Java

Untuk melakukan autentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class ReadFromStorage {
  public static Pipeline createPipeline(Options options) {
    var pipeline = Pipeline.create(options);
    pipeline
        // Read from a text file.
        .apply(TextIO.read().from(
            "gs://" + options.getBucket() + "/*.txt"))
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (x -> {
                      System.out.println(x);
                      return x;
                    })));
    return pipeline;
  }
}

Langkah selanjutnya