Mengalirkan pesan dari Pub/Sub menggunakan Dataflow

Dataflow adalah layanan terkelola sepenuhnya untuk mengubah dan memperkaya data dalam mode streaming (real-time) dan batch dengan keandalan dan ekspresi yang sama. Platform ini menyediakan lingkungan pengembangan pipeline yang disederhanakan menggunakan Apache Beam SDK, yang memiliki beragam rangkaian dasar windowing dan analisis sesi, serta ekosistem konektor sumber dan sink. Panduan memulai ini menunjukkan cara menggunakan Dataflow untuk:

  • Membaca pesan yang dipublikasikan ke topik Pub/Sub
  • Jendela (atau kelompokkan) pesan menurut stempel waktu
  • Menulis pesan ke Cloud Storage

Panduan memulai ini memperkenalkan penggunaan Dataflow dalam Java dan Python. SQL juga didukung. Panduan memulai ini juga ditawarkan sebagai tutorial Google Cloud Skills Boost yang menawarkan kredensial sementara untuk membantu Anda memulai.

Anda juga dapat memulai dengan menggunakan template Dataflow berbasis UI jika tidak ingin melakukan pemrosesan data kustom.

Sebelum memulai

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Buat atau pilih project Google Cloud.

    • Membuat project Google Cloud:

      gcloud projects create PROJECT_ID

      Ganti PROJECT_ID dengan nama untuk project Google Cloud yang Anda buat.

    • Pilih project Google Cloud yang Anda buat:

      gcloud config set project PROJECT_ID

      Ganti PROJECT_ID dengan nama project Google Cloud Anda.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Aktifkan API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler:

    gcloud services enable dataflow.googleapis.com  compute.googleapis.com  logging.googleapis.com  storage-component.googleapis.com  storage-api.googleapis.com  pubsub.googleapis.com  cloudresourcemanager.googleapis.com  cloudscheduler.googleapis.com
  7. Menyiapkan autentikasi:

    1. Buat akun layanan:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Ganti SERVICE_ACCOUNT_NAME dengan nama untuk akun layanan.

    2. Memberikan peran ke akun layanan. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Ganti kode berikut:

      • SERVICE_ACCOUNT_NAME: nama dari akun layanan.
      • PROJECT_ID: project ID dimana Anda membuat akun layanan
      • ROLE: peran yang akan diberikan
    3. Memberi Akun Google Anda peran yang memungkinkan Anda menggunakan peran akun layanan dan tambahkan akun layanan tersebut ke resource lain:

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Ganti kode berikut:

      • SERVICE_ACCOUNT_NAME: nama dari akun layanan.
      • PROJECT_ID: project ID dimana Anda membuat akun layanan
      • USER_EMAIL: alamat email untuk Akun Google Anda
  8. Install the Google Cloud CLI.
  9. To initialize the gcloud CLI, run the following command:

    gcloud init
  10. Buat atau pilih project Google Cloud.

    • Membuat project Google Cloud:

      gcloud projects create PROJECT_ID

      Ganti PROJECT_ID dengan nama untuk project Google Cloud yang Anda buat.

    • Pilih project Google Cloud yang Anda buat:

      gcloud config set project PROJECT_ID

      Ganti PROJECT_ID dengan nama project Google Cloud Anda.

  11. Make sure that billing is enabled for your Google Cloud project.

  12. Aktifkan API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler:

    gcloud services enable dataflow.googleapis.com  compute.googleapis.com  logging.googleapis.com  storage-component.googleapis.com  storage-api.googleapis.com  pubsub.googleapis.com  cloudresourcemanager.googleapis.com  cloudscheduler.googleapis.com
  13. Menyiapkan autentikasi:

    1. Buat akun layanan:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Ganti SERVICE_ACCOUNT_NAME dengan nama untuk akun layanan.

    2. Memberikan peran ke akun layanan. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Ganti kode berikut:

      • SERVICE_ACCOUNT_NAME: nama dari akun layanan.
      • PROJECT_ID: project ID dimana Anda membuat akun layanan
      • ROLE: peran yang akan diberikan
    3. Memberi Akun Google Anda peran yang memungkinkan Anda menggunakan peran akun layanan dan tambahkan akun layanan tersebut ke resource lain:

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Ganti kode berikut:

      • SERVICE_ACCOUNT_NAME: nama dari akun layanan.
      • PROJECT_ID: project ID dimana Anda membuat akun layanan
      • USER_EMAIL: alamat email untuk Akun Google Anda
  14. Buat kredensial autentikasi lokal untuk Akun Google Anda:

    gcloud auth application-default login

Menyiapkan project Pub/Sub

  1. Buat variabel untuk bucket, project, dan region Anda. Nama bucket Cloud Storage harus unik secara global. Pilih region Dataflow yang dekat dengan tempat Anda menjalankan perintah di panduan memulai ini. Nilai variabel REGION harus berupa nama wilayah yang valid. Untuk mengetahui informasi selengkapnya tentang region dan lokasi, lihat Lokasi Dataflow.

    BUCKET_NAME=BUCKET_NAME
    PROJECT_ID=$(gcloud config get-value project)
    TOPIC_ID=TOPIC_ID
    REGION=DATAFLOW_REGION
    SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
  2. Buat bucket Cloud Storage yang dimiliki project ini:

    gsutil mb gs://$BUCKET_NAME
  3. Buat topik Pub/Sub dalam project ini:

    gcloud pubsub topics create $TOPIC_ID
  4. Buat tugas Cloud Scheduler di project ini. Tugas ini memublikasikan pesan ke topik Pub/Sub pada interval satu menit.

    Jika aplikasi App Engine tidak ada untuk project, langkah ini akan membuat aplikasi.

    gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
        --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION

    Mulai tugas.

    gcloud scheduler jobs run publisher-job --location=$REGION
  5. Gunakan perintah berikut untuk meng-clone repositori panduan memulai dan membuka direktori kode contoh:

    Java

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
    cd java-docs-samples/pubsub/streaming-analytics

    Python

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsub/streaming-analytics
    pip install -r requirements.txt  # Install Apache Beam dependencies

Mengalirkan pesan dari Pub/Sub ke Cloud Storage

Contoh kode

Kode contoh ini menggunakan Dataflow untuk:

  • Membaca pesan Pub/Sub.
  • Jendela (atau kelompokkan) pesan ke dalam interval ukuran tetap dengan memublikasikan stempel waktu.
  • Tulis pesan di setiap jendela ke file di Cloud Storage.

Java


import java.io.IOException;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class PubSubToGcs {
  /*
   * Define your own configuration options. Add your own arguments to be processed
   * by the command-line parser, and specify default values for them.
   */
  public interface PubSubToGcsOptions extends StreamingOptions {
    @Description("The Cloud Pub/Sub topic to read from.")
    @Required
    String getInputTopic();

    void setInputTopic(String value);

    @Description("Output file's window size in number of minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Path of the output file including its filename prefix.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  public static void main(String[] args) throws IOException {
    // The maximum number of shards when writing output.
    int numShards = 1;

    PubSubToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class);

    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        // 1) Read string messages from a Pub/Sub topic.
        .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
        // 2) Group the messages into fixed-sized minute intervals.
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        // 3) Write one file to GCS for every window of messages.
        .apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline and wait until it finishes running.
    pipeline.run().waitUntilFinish();
  }
}

Python

import argparse
from datetime import datetime
import logging
import random

from apache_beam import (
    DoFn,
    GroupByKey,
    io,
    ParDo,
    Pipeline,
    PTransform,
    WindowInto,
    WithKeys,
)
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows

class GroupMessagesByFixedWindows(PTransform):
    """A composite transform that groups Pub/Sub messages based on publish time
    and outputs a list of tuples, each containing a message and its publish time.
    """

    def __init__(self, window_size, num_shards=5):
        # Set window size to 60 seconds.
        self.window_size = int(window_size * 60)
        self.num_shards = num_shards

    def expand(self, pcoll):
        return (
            pcoll
            # Bind window info to each element using element timestamp (or publish time).
            | "Window into fixed intervals"
            >> WindowInto(FixedWindows(self.window_size))
            | "Add timestamp to windowed elements" >> ParDo(AddTimestamp())
            # Assign a random key to each windowed element based on the number of shards.
            | "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1))
            # Group windowed elements by key. All the elements in the same window must fit
            # memory for this. If not, you need to use `beam.util.BatchElements`.
            | "Group by key" >> GroupByKey()
        )

class AddTimestamp(DoFn):
    def process(self, element, publish_time=DoFn.TimestampParam):
        """Processes each windowed element by extracting the message body and its
        publish time into a tuple.
        """
        yield (
            element.decode("utf-8"),
            datetime.utcfromtimestamp(float(publish_time)).strftime(
                "%Y-%m-%d %H:%M:%S.%f"
            ),
        )

class WriteToGCS(DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, key_value, window=DoFn.WindowParam):
        """Write messages in a batch to Google Cloud Storage."""

        ts_format = "%H:%M"
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        shard_id, batch = key_value
        filename = "-".join([self.output_path, window_start, window_end, str(shard_id)])

        with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for message_body, publish_time in batch:
                f.write(f"{message_body},{publish_time}\n".encode())

def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
    # Set `save_main_session` to True so DoFns can access globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )

    with Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            # Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam
            # binds the publish time returned by the Pub/Sub server for each message
            # to the element's timestamp parameter, accessible via `DoFn.TimestampParam`.
            # https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub
            | "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
            | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)
            | "Write to GCS" >> ParDo(WriteToGCS(output_path))
        )

if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_topic",
        help="The Cloud Pub/Sub topic to read from."
        '"projects/<PROJECT_ID>/topics/<TOPIC_ID>".',
    )
    parser.add_argument(
        "--window_size",
        type=float,
        default=1.0,
        help="Output file's window size in minutes.",
    )
    parser.add_argument(
        "--output_path",
        help="Path of the output GCS file including the prefix.",
    )
    parser.add_argument(
        "--num_shards",
        type=int,
        default=5,
        help="Number of shards to use when writing windowed elements to GCS.",
    )
    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.input_topic,
        known_args.output_path,
        known_args.window_size,
        known_args.num_shards,
        pipeline_args,
    )

Memulai pipeline

Untuk memulai pipeline, jalankan perintah berikut:

Java

mvn compile exec:java \
  -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \
  -Dexec.cleanupDaemonThreads=false \
  -Dexec.args=" \
    --project=$PROJECT_ID \
    --region=$REGION \
    --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \
    --output=gs://$BUCKET_NAME/samples/output \
    --gcpTempLocation=gs://$BUCKET_NAME/temp \
    --runner=DataflowRunner \
    --windowSize=2 \
    --serviceAccount=$SERVICE_ACCOUNT"

Python

python PubSubToGCS.py \
  --project=$PROJECT_ID \
  --region=$REGION \
  --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \
  --output_path=gs://$BUCKET_NAME/samples/output \
  --runner=DataflowRunner \
  --window_size=2 \
  --num_shards=2 \
  --temp_location=gs://$BUCKET_NAME/temp \
  --service_account_email=$SERVICE_ACCOUNT

Perintah sebelumnya berjalan secara lokal dan meluncurkan tugas Dataflow yang berjalan di cloud. Saat perintah menampilkan JOB_MESSAGE_DETAILED: Workers have started successfully, keluar dari program lokal menggunakan Ctrl+C.

Mengamati progres tugas dan pipeline

Anda dapat mengamati progres tugas di konsol Dataflow.

Buka konsol Dataflow

Mengamati kemajuan pekerjaan

Buka tampilan detail pekerjaan untuk melihat:

  • Struktur pekerjaan
  • Log tugas
  • Metrik stage

Mengamati kemajuan pekerjaan

Anda mungkin perlu menunggu beberapa menit untuk melihat file output di Cloud Storage.

Mengamati kemajuan pekerjaan

Atau, gunakan command line di bawah untuk memeriksa file mana yang telah ditulis.

gsutil ls gs://${BUCKET_NAME}/samples/

Outputnya akan terlihat seperti berikut ini:

Java

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1

Python

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1

Pembersihan

Agar tidak menimbulkan biaya pada akun Google Cloud Anda untuk resource yang digunakan pada halaman ini, hapus project Google Cloud yang berisi resource tersebut.

  1. Hapus tugas Cloud Scheduler.

    gcloud scheduler jobs delete publisher-job --location=$REGION
  2. Di konsol Dataflow, hentikan tugas. Membatalkan pipeline tanpa menghabiskannya.

  3. Hapus topik.

    gcloud pubsub topics delete $TOPIC_ID
  4. Hapus file yang dibuat oleh pipeline.

    gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*"
    gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
  5. Hapus bucket Cloud Storage.

    gsutil rb gs://${BUCKET_NAME}

  6. Hapus akun layanan:
    gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
  7. Opsional: Cabut kredensial autentikasi yang Anda buat, dan hapus file kredensial lokal.

    gcloud auth application-default revoke
  8. Opsional: Cabut kredensial dari gcloud CLI.

    gcloud auth revoke

Langkah selanjutnya