Komponen Flink opsional Dataproc

Anda dapat mengaktifkan komponen tambahan seperti Flink saat membuat cluster Dataproc menggunakan fitur Komponen opsional. Halaman ini menunjukkan cara membuat cluster Dataproc dengan komponen opsional Apache Flink yang diaktifkan (cluster Flink), lalu menjalankan tugas Flink di cluster.

Anda dapat menggunakan cluster Flink untuk:

  1. Jalankan tugas Flink menggunakan resource Jobs Dataproc dari konsol Google Cloud, Google Cloud CLI, atau Dataproc API.

  2. Jalankan tugas Flink menggunakan flink CLI yang berjalan di node master cluster Flink.

  3. Menjalankan tugas Apache Beam di Flink

  4. Menjalankan Flink di cluster Kerberized

Anda dapat menggunakan konsol Google Cloud, Google Cloud CLI, atau Dataproc API untuk membuat cluster Dataproc yang mengaktifkan komponen Flink di cluster.

Rekomendasi: Gunakan cluster VM 1 master standar dengan komponen Flink. Cluster mode Ketersediaan Tinggi Dataproc (dengan 3 VM master) tidak mendukung mode ketersediaan tinggi Flink.

Anda dapat menjalankan tugas Flink menggunakan resource Jobs Dataproc dari Konsol Google Cloud, Google Cloud CLI, atau Dataproc API.

Konsol

Untuk mengirimkan contoh tugas penghitungan kata Flink dari konsol:

  1. Buka halaman Kirim tugas Dataproc di konsol Google Cloud di browser Anda.

  2. Isi kolom di halaman Mengirim tugas:

    1. Pilih nama Cluster dari daftar cluster.
    2. Tetapkan Jenis pekerjaan ke Flink.
    3. Tetapkan Main class or jar ke org.apache.flink.examples.java.wordcount.WordCount.
    4. Tetapkan File jar ke file:///usr/lib/flink/examples/batch/WordCount.jar.
      • file:/// menunjukkan file yang terletak di cluster. Dataproc menginstal WordCount.jar saat membuat cluster Flink.
      • Kolom ini juga menerima jalur Cloud Storage (gs://BUCKET/JARFILE) atau jalur Hadoop Distributed File System (HDFS) (hdfs://PATH_TO_JAR).
  3. Klik Kirim.

    • Output driver tugas ditampilkan di halaman Detail tugas.
    • Tugas Flink tercantum di halaman Jobs Dataproc di konsol Google Cloud.
    • Klik Hentikan atau Hapus dari halaman Tugas atau Detail tugas untuk menghentikan atau menghapus tugas.

gcloud

Untuk mengirimkan tugas Flink ke cluster Flink Dataproc, jalankan perintah gcloud CLI gcloud dataproc jobs submit secara lokal di jendela terminal atau di Cloud Shell.

gcloud dataproc jobs submit flink \
    --cluster=CLUSTER_NAME \
    --region=REGION \
    --class=MAIN_CLASS \
    --jar=JAR_FILE \
    -- JOB_ARGS

Catatan:

  • CLUSTER_NAME: Tentukan nama cluster Flink Dataproc yang akan menjadi tujuan pengiriman tugas.
  • REGION: Tentukan region Compute Engine tempat cluster berada.
  • MAIN_CLASS: Tentukan class main dari aplikasi Flink Anda, seperti:
    • org.apache.flink.examples.java.wordcount.WordCount
  • JAR_FILE: Menentukan file jar aplikasi Flink. Anda dapat menentukan:
    • File jar yang diinstal di cluster, menggunakan awalan file:///`:
      • file:///usr/lib/flink/examples/streaming/TopSpeedWindowing.jar
      • file:///usr/lib/flink/examples/batch/WordCount.jar
    • File jar di Cloud Storage: gs://BUCKET/JARFILE
    • File jar di HDFS: hdfs://PATH_TO_JAR
  • JOB_ARGS: Secara opsional, tambahkan argumen tugas setelah tanda hubung ganda (--).

  • Setelah mengirimkan tugas, output driver tugas akan ditampilkan di terminal lokal atau Cloud Shell.

    Program execution finished
    Job with JobID 829d48df4ebef2817f4000dfba126e0f has finished.
    Job Runtime: 13610 ms
    ...
    (after,1)
    (and,12)
    (arrows,1)
    (ay,1)
    (be,4)
    (bourn,1)
    (cast,1)
    (coil,1)
    (come,1)

REST

Bagian ini menunjukkan cara mengirimkan tugas Flink ke cluster Flink Dataproc menggunakan Dataproc jobs.submit API.

Sebelum menggunakan salah satu data permintaan, lakukan penggantian berikut:

  • PROJECT_ID: Google Cloud project ID
  • REGION: cluster region
  • CLUSTER_NAME: Menentukan nama cluster Flink Dataproc yang akan menerima tugas

Metode HTTP dan URL:

POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit

Meminta isi JSON:

{
  "job": {
    "placement": {
      "clusterName": "CLUSTER_NAME"
    },
    "flinkJob": {
      "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
      "jarFileUris": [
        "file:///usr/lib/flink/examples/batch/WordCount.jar"
      ]
    }
  }
}

Untuk mengirim permintaan Anda, perluas salah satu opsi berikut:

Anda akan melihat respons JSON seperti berikut:

{
  "reference": {
    "projectId": "PROJECT_ID",
    "jobId": "JOB_ID"
  },
  "placement": {
    "clusterName": "CLUSTER_NAME",
    "clusterUuid": "CLUSTER_UUID"
  },
  "flinkJob": {
    "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
    "args": [
      "1000"
    ],
    "jarFileUris": [
      "file:///usr/lib/flink/examples/batch/WordCount.jar"
    ]
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "2020-10-07T20:16:21.759Z"
  },
  "jobUuid": "JOB_UUID"
}
  • Tugas Flink tercantum di halaman Jobs Dataproc di konsol Google Cloud.
  • Anda dapat mengklik Stop atau Delete dari halaman Jobs atau Job details di konsol Google Cloud untuk menghentikan atau menghapus tugas.

Daripada menjalankan tugas Flink menggunakan resource Jobs Dataproc, Anda dapat menjalankan tugas Flink di node master cluster Flink menggunakan flink CLI.

Bagian berikut menjelaskan berbagai cara untuk menjalankan tugas CLI flink di cluster Flink Dataproc.

  1. SSH ke node master: Gunakan utilitas SSH untuk membuka jendela terminal di VM master cluster.

  2. Menetapkan classpath: Lakukan inisialisasi classpath Hadoop dari jendela terminal SSH di VM master cluster Flink:

    export HADOOP_CLASSPATH=$(hadoop classpath)
    
  3. Menjalankan tugas Flink: Anda dapat menjalankan tugas Flink dalam berbagai mode deployment di YARN: mode aplikasi, per tugas, dan sesi.

    1. Mode aplikasi: Mode Aplikasi Flink didukung oleh image Dataproc versi 2.0 dan yang lebih baru. Mode ini menjalankan metode main() tugas di Pengelola Tugas YARN. Cluster akan dinonaktifkan setelah tugas selesai.

      Contoh pengiriman tugas:

      flink run-application \
          -t yarn-application \
          -Djobmanager.memory.process.size=1024m \
          -Dtaskmanager.memory.process.size=2048m \
          -Djobmanager.heap.mb=820 \
          -Dtaskmanager.heap.mb=1640 \
          -Dtaskmanager.numberOfTaskSlots=2 \
          -Dparallelism.default=4 \
          /usr/lib/flink/examples/batch/WordCount.jar
      

      Mencantumkan tugas yang sedang berjalan:

      ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
      

      Membatalkan tugas yang sedang berjalan:

      ./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
      
    2. Mode per tugas: Mode Flink ini mengeksekusi metode main() tugas di sisi klien.

      Contoh pengiriman tugas:

      flink run \
          -m yarn-cluster \
          -p 4 \
          -ys 2 \
          -yjm 1024m \
          -ytm 2048m \
          /usr/lib/flink/examples/batch/WordCount.jar
      
    3. Mode sesi: Memulai sesi Flink YARN yang berjalan lama, lalu mengirimkan satu atau beberapa tugas ke sesi.

      1. Memulai sesi: Anda dapat memulai sesi Flink dengan salah satu cara berikut:

        1. Buat cluster Flink, dengan menambahkan tanda --metadata flink-start-yarn-session=true ke perintah gcloud dataproc clusters create (Lihat Membuat cluster Flink Dataproc). Dengan mengaktifkan flag ini, setelah cluster dibuat, Dataproc akan menjalankan /usr/bin/flink-yarn-daemon untuk memulai sesi Flink di cluster.

          ID aplikasi YARN sesi disimpan di /tmp/.yarn-properties-${USER}. Anda dapat menampilkan ID dengan perintah yarn application -list.

        2. Jalankan skrip yarn-session.sh Flink, yang telah diinstal sebelumnya di VM master cluster, dengan setelan kustom:

          Contoh dengan setelan kustom:

          /usr/lib/flink/bin/yarn-session.sh \
              -s 1 \
              -jm 1024m \
              -tm 2048m \
              -nm flink-dataproc \
              --detached
          
        3. Jalankan skrip wrapper /usr/bin/flink-yarn-daemon Flink dengan setelan default:

          . /usr/bin/flink-yarn-daemon
          
      2. Kirim tugas ke sesi: Jalankan perintah berikut untuk mengirimkan tugas Flink ke sesi.

        flink run -m <var>FLINK_MASTER_URL</var>/usr/lib/flink/examples/batch/WordCount.jar
        
        • FLINK_MASTER_URL: URL, termasuk host dan port, dari VM master Flink tempat tugas dieksekusi. Hapus http:// prefix dari URL. URL ini tercantum dalam output perintah saat Anda memulai sesi Flink. Anda dapat menjalankan perintah berikut untuk mencantumkan URL ini di kolom Tracking-URL:
        yarn application -list -appId=<yarn-app-id> | sed 's#http://##'
           ```
        
      3. Mencantumkan tugas dalam sesi: Untuk mencantumkan tugas Flink dalam sesi, lakukan salah satu hal berikut:

        • Jalankan flink list tanpa argumen. Perintah ini mencari ID aplikasi YARN sesi di /tmp/.yarn-properties-${USER}.

        • Dapatkan ID aplikasi YARN sesi dari /tmp/.yarn-properties-${USER} atau output yarn application -list, lalu jalankan <code>flink list -yid YARN_APPLICATION_ID.

        • Jalankan flink list -m FLINK_MASTER_URL.

      4. Hentikan sesi: Untuk menghentikan sesi, dapatkan ID aplikasi YARN sesi dari /tmp/.yarn-properties-${USER} atau output yarn application -list, lalu jalankan salah satu perintah berikut:

        echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
        
        yarn application -kill YARN_APPLICATION_ID
        

Anda dapat menjalankan tugas Apache Beam di Dataproc menggunakan FlinkRunner.

Anda dapat menjalankan tugas Beam di Flink dengan cara berikut:

  1. Tugas Java Beam
  2. Tugas Beam Portabel

Tugas Java Beam

Kemas tugas Beam Anda ke dalam file JAR. Berikan file JAR yang dipaketkan dengan dependensi yang diperlukan untuk menjalankan tugas.

Contoh berikut menjalankan tugas Java Beam dari node master cluster Dataproc.

  1. Buat cluster Dataproc dengan komponen Flink diaktifkan.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components: Flink.
    • --image-version: versi image cluster, yang menentukan versi Flink yang diinstal di cluster (misalnya, lihat versi komponen Apache Flink yang tercantum untuk empat versi rilis image 2.0.x terbaru dan sebelumnya).
    • --region: Region Dataproc yang didukung.
    • --enable-component-gateway: mengaktifkan akses ke UI Pengelola Tugas Flink.
    • --scopes: mengaktifkan akses ke API Google Cloud oleh cluster Anda (lihat Praktik terbaik cakupan). Cakupan cloud-platform diaktifkan secara default (Anda tidak perlu menyertakan setelan tanda ini) saat membuat cluster yang menggunakan image Dataproc versi 2.1 atau yang lebih baru.
  2. Gunakan utilitas SSH untuk membuka jendela terminal di node master cluster Flink.

  3. Mulai sesi Flink YARN di node master cluster Dataproc.

    . /usr/bin/flink-yarn-daemon
    

    Catat versi Flink di cluster Dataproc Anda.

    flink --version
    
  4. Di komputer lokal, buat contoh penghitungan kata Beam kanonis di Java.

    Pilih versi Beam yang kompatibel dengan versi Flink di cluster Dataproc Anda. Lihat tabel Kompatibilitas Versi Flink yang mencantumkan kompatibilitas versi Beam-Flink.

    Buka file POM yang dihasilkan. Periksa versi runner Beam Flink yang ditentukan oleh tag <flink.artifact.name>. Jika versi runner Beam Flink dalam nama artefak Flink tidak cocok dengan versi Flink di cluster Anda, perbarui nomor versi agar cocok.

    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=BEAM_VERSION \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    
  5. Kemas contoh jumlah kata.

    mvn package -Pflink-runner
    
  6. Upload file JAR uber yang dipaketkan, word-count-beam-bundled-0.1.jar (~135 MB) ke node master cluster Dataproc Anda. Anda dapat menggunakan gcloud storage cp untuk transfer file yang lebih cepat ke cluster Dataproc dari Cloud Storage.

    1. Di terminal lokal, buat bucket Cloud Storage, lalu upload JAR uber.

      gcloud storage buckets create BUCKET_NAME
      
      gcloud storage cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
      
    2. Di node master Dataproc, download JAR uber.

      gcloud storage cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
      
  7. Jalankan tugas Java Beam di node master cluster Dataproc.

    flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \
        --runner=FlinkRunner \
        --output=gs://BUCKET_NAME/java-wordcount-out
    
  8. Pastikan hasilnya ditulis ke bucket Cloud Storage Anda.

    gcloud storage cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
    
  9. Hentikan sesi YARN Flink.

    yarn application -list
    
    yarn application -kill YARN_APPLICATION_ID
    

Tugas Beam Portabel

Untuk menjalankan tugas Beam yang ditulis dalam Python, Go, dan bahasa lain yang didukung, Anda dapat menggunakan FlinkRunner dan PortableRunner seperti yang dijelaskan di halaman Flink Runner Beam (lihat juga Portability Framework Roadmap).

Contoh berikut menjalankan tugas Beam portabel di Python dari node master cluster Dataproc.

  1. Buat cluster Dataproc dengan mengaktifkan komponen Flink dan Docker.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK,DOCKER \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    

    Catatan:

    • --optional-components: Flink dan Docker.
    • --image-version: Versi image cluster, yang menentukan versi Flink yang diinstal di cluster (misalnya, lihat versi komponen Apache Flink yang tercantum untuk empat versi rilis image 2.0.x terbaru dan sebelumnya).
    • --region: Region Dataproc yang tersedia.
    • --enable-component-gateway: Mengaktifkan akses ke UI Pengelola Tugas Flink.
    • --scopes: Mengaktifkan akses ke API Google Cloud oleh cluster Anda (lihat Praktik terbaik cakupan). Cakupan cloud-platform diaktifkan secara default (Anda tidak perlu menyertakan setelan tanda ini) saat membuat cluster yang menggunakan image Dataproc versi 2.1 atau yang lebih baru.
  2. Gunakan gcloud CLI secara lokal atau di Cloud Shell untuk membuat bucket Cloud Storage. Anda akan menentukan BUCKET_NAME saat menjalankan contoh program penghitungan kata.

    gcloud storage buckets create BUCKET_NAME
    
  3. Di jendela terminal pada VM cluster, mulai sesi YARN Flink. Catat URL master Flink, alamat master Flink tempat tugas dijalankan. Anda akan menentukan FLINK_MASTER_URL saat menjalankan contoh program penghitungan kata.

    . /usr/bin/flink-yarn-daemon
    

    Tampilkan dan catat versi Flink yang menjalankan cluster Dataproc. Anda akan menentukan FLINK_VERSION saat menjalankan contoh program penghitungan kata.

    flink --version
    
  4. Instal library Python yang diperlukan untuk tugas di node master cluster.

  5. Instal versi Beam yang kompatibel dengan versi Flink di cluster.

    python -m pip install apache-beam[gcp]==BEAM_VERSION
    
  6. Jalankan contoh penghitungan kata di node master cluster.

    python -m apache_beam.examples.wordcount \
        --runner=FlinkRunner \
        --flink_version=FLINK_VERSION \
        --flink_master=FLINK_MASTER_URL
        --flink_submit_uber_jar \
        --output=gs://BUCKET_NAME/python-wordcount-out
    

    Catatan:

    • --runner: FlinkRunner.
    • --flink_version: FLINK_VERSION, yang dicatat sebelumnya.
    • --flink_master: FLINK_MASTER_URL, yang dicatat sebelumnya.
    • --flink_submit_uber_jar: Gunakan JAR uber untuk menjalankan tugas Beam.
    • --output: BUCKET_NAME, dibuat sebelumnya.
  7. Pastikan hasil ditulis ke bucket Anda.

    gcloud storage cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
    
  8. Hentikan sesi YARN Flink.

    1. Dapatkan ID aplikasi.
    yarn application -list
    
    1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.
    
    yarn application -kill 
    

Komponen Flink Dataproc mendukung cluster Kerberized. Tiket Kerberos yang valid diperlukan untuk mengirimkan dan mempertahankan tugas Flink atau memulai cluster Flink. Secara default, tiket Kerberos tetap berlaku selama tujuh hari.

Antarmuka web Pengelola Tugas Flink tersedia saat tugas Flink atau cluster sesi Flink sedang berjalan. Untuk menggunakan antarmuka web:

  1. Buat cluster Flink Dataproc.
  2. Setelah pembuatan cluster, klik Component Gateway link YARN ResourceManager di tab Web Interface pada halaman Cluster details di Konsol Google Cloud.
  3. Di UI YARN Resource Manager, identifikasi entri aplikasi cluster Flink. Bergantung pada status penyelesaian tugas, link ApplicationMaster atau History akan dicantumkan.
  4. Untuk tugas streaming yang berjalan lama, klik link ApplicationManager untuk membuka dasbor Flink; untuk tugas yang telah selesai, klik link History untuk melihat detail tugas.