Menyisipkan data ke BigQuery menggunakan tugas Untuk Setiap Paralel

Dalam tutorial ini, Anda akan membuat Integrasi Apigee dan sub-integrasi untuk memproses serangkaian kumpulan data. Untuk setiap kumpulan data, integrasi utama secara asinkron memanggil sub-integrasi, yang mengambil data untuk setiap kumpulan data dan memasukkannya sebagai baris dalam tabel dalam set data BigQuery.

Dalam tutorial ini, Anda akan menyelesaikan tugas berikut:

Sebelum memulai

  • Pastikan Anda memiliki akses ke Integrasi Apigee.
  • Lakukan hal berikut di project Google Cloud Anda:

    • Berikan peran berikut ke akun layanan yang ingin Anda gunakan untuk membuat koneksi:
      • roles/bigquery.dataEditor
      • roles/bigquery.readSessionUser
      • roles/secretmanager.viewer
      • roles/secretmanager.secretAccessor
    • Aktifkan layanan berikut:
      • secretmanager.googleapis.com (Secret Manager API)
      • connectors.googleapis.com (API Konektor)

      Jika layanan ini belum diaktifkan untuk project Anda sebelumnya, Anda akan diminta untuk mengaktifkannya saat membuat koneksi di halaman {i>Create Connection<i}.

Menyiapkan koneksi BigQuery

Mulailah dengan membuat set data dan tabel BigQuery untuk digunakan dalam tutorial ini. Setelah Anda membuat set data dan tabel, buat koneksi BigQuery. Anda akan menggunakan koneksi ini dalam integrasi nanti dalam tutorial ini.

Menyiapkan set data dan tabel BigQuery

Untuk menyiapkan set data dan tabel BigQuery Anda, lakukan langkah-langkah berikut:

  1. Di halaman Cloud Console, pilih project Google Cloud Anda.
  2. Untuk meluncurkan sesi Cloud Shell dari Konsol Google Cloud, klik Ikon Activate Cloud Shell ikon Activate Cloud Shell di Konsol Cloud. Tindakan ini akan meluncurkan sesi di panel bawah Konsol Google Cloud.
  3. Untuk mengaktifkan BigQuery API, masukkan perintah berikut di terminal Cloud Shell Anda:
    export PROJECT_ID=project_id
    export REGION=region
    gcloud services enable --project "${PROJECT_ID}" \
        bigquery.googleapis.com \
        bigquerystorage.googleapis.com
    Dalam perintah ini, ganti:
    • project_id dengan project ID project Google Cloud Anda.
    • region dengan region yang ingin Anda gunakan untuk membuat set data BigQuery.
  4. Untuk membuat set data BigQuery dengan nama bq_tutorial, masukkan perintah berikut di terminal Cloud Shell Anda:
          bq  --project_id ${PROJECT_ID} --location ${REGION} mk bq_tutorial
        
  5. Untuk membuat tabel BigQuery dengan nama tutorial, masukkan perintah berikut di terminal Cloud Shell Anda:
          bq --project_id ${PROJECT_ID} \
            query  \
            --nouse_legacy_sql \
          'create table bq_tutorial.tutorial (
          unique_key STRING NOT NULL,
          created_date STRING,
          closed_date STRING,
          agency STRING,
          agency_name STRING,
          complaint_type STRING,
          descriptor STRING,
          location_type STRING,
          incident_zip STRING,
          incident_address STRING,
          street_name STRING,
          cross_street_1 STRING,
          cross_street_2 STRING,
          intersection_street_1 STRING,
          intersection_street_2 STRING,
          address_type STRING,
          city STRING,
          landmark STRING,
          facility_type STRING,
          status STRING,
          due_date STRING,
          resolution_action_updated_date STRING,
          community_board STRING,
          borough STRING,
          x_coordinate_state_plane STRING,
          y_coordinate_state_plane STRING,
          park_facility_name STRING,
          park_borough STRING,
          school_name STRING,
          school_number STRING,
          school_region STRING,
          school_code STRING,
          school_phone_number STRING,
          school_address STRING,
          school_city STRING,
          school_state STRING,
          school_zip STRING,
          school_not_found STRING,
          school_or_citywide_complaint STRING,
          vehicle_type STRING,
          taxi_company_borough STRING,
          taxi_pick_up_location STRING,
          bridge_highway_name STRING,
          bridge_highway_direction STRING,
          bridge_highway_segment STRING,
          road_ramp STRING,
          garage_lot_name STRING,
          ferry_direction STRING,
          ferry_terminal_name STRING,
          latitude STRING,
          longitude STRING,
          location STRING
          ) '
      
  6. Verify that your BigQuery table is created.
    1. In the Cloud console page, click the Navigation menu.
    2. In the Analytics section, click BigQuery.
    3. Expand your project and confirm that the bq_tutorial dataset is listed.
    4. Expand the bq_tutorial dataset and confirm that the tutorial table is listed.
    5. Click the documents table to view the schema.

Create a BigQuery connection

Next, you'll create a BigQuery connection. A BigQuery connection lets you insert, read, update and delete rows in a BigQuery table and use the resulting output in an integration. After creating the BigQuery connection, you'll use this connection in an integration later in this tutorial to add rows to the BigQuery table.

To create a BigQuery connection, complete the following steps:

  1. In the Cloud console page, select your Google Cloud project.
  2. Open the connections page.
  3. Click + CREATE NEW to open the Create Connection page.
  4. Configure the connection:
    1. In the Create Connection section, complete the following:
      • Connector: Select BigQuery from the drop down list of available Connectors.
      • Connector version: Select the latest Connector version from the drop down list of available versions.
      • In the Connection Name field, enter a name for the Connection instance. For this tutorial, enter connector-bq-tutorial.
      • Optionally, add a Description of the connection instance.
      • Service Account: Select a service account that has the required roles.
      • Project ID: Enter the ID of the Google Cloud project where the BigQuery data resides.
      • Dataset ID: Enter the ID of the BigQuery dataset that you want to use. For this tutorial, enter bq_tutorial.
      • Optionally, click + ADD LABEL to add a label in the form of a key/value pair.
      • Click Next.
    2. Location: Select a region from where the connection will run. Supported regions for connectors include:

        For the list of all the supported regions, see Locations.

      • Click Next.
    3. Authentication: The BigQuery connection does not require authentication configuration. Click Next.
    4. Review: Review your connection's configuration details. In this section, the connection and authentication details of the new connection are displayed for your review.
  5. Click Create.

Set up a sub-integration

In this tutorial, the sub-integration takes each record sent to it by the main integration and inserts it as a row in the tutorial table in the bq_tutorial dataset.

Create a sub-integration

To create the sub-integration, complete the following steps:

  1. In the Apigee UI, select your Apigee Organization.
  2. Click Develop > Integrations.
  3. Click Create integration.
  4. In the Create Integration dialog, do the following:
    • Enter a name, for example, enter Process-each-record
    • Optionally, enter a description. For example, enter API Trigger to process each record (sub-integration)
    • Select the region where you want to create your integration.
  5. Click Create to open the integration editor.

Add an API Trigger

To add an API Trigger to the integration, do the following:

  1. In the integration editor, select Add a task/trigger > Triggers to display a list of available triggers.
  2. Drag the API Trigger element to the integration editor.

Add a Data Mapping task

To add a Data Mapping task in the integration, complete the following steps:

  1. Select +Add a task/trigger > Tasks in the integration editor to display the list of available tasks.
  2. Drag the Data Mapping element to the integration editor.

Configure the BigQuery connection

Now you are ready to use the BigQuery connection that you created earlier in the sub-integration. To configure the BigQuery connection in this integration, complete the following steps:

  1. Select +Add a task/trigger > Tasks in the integration editor to display the list of available tasks.
  2. Drag the Connectors element to the integration editor.
  3. Click the Connectors task element on the designer to view the task configuration pane.
  4. Click the edit icon on the right panel and update the Label to Insert row to BigQuery.
  5. Click Configure task.

    The Configure connector task dialog appears.

  6. In the Configure connector task dialog, do the following:
    1. Select the connection region where you created your BigQuery connection.
    2. Select the BigQuery connection that you want to use. For this tutorial, select connector-bq-tutorial.
    3. Once a connection is chosen, the Type column appears. Select Entities and then tutorial from the list of available entities.
    4. Once a type is chosen, the Operation column appears. Select Create.
    5. Click Done to complete the connection configuration and close the dialog.

Connect the integration elements

Next, add edge connections to connect the API Trigger to the Data Mapping task and the Data Mapping task to the Connectors task. An edge connection is a connection between any two elements in an integration. For more information on edges and edge conditions, see Edges.

To add the edge connections, complete the following steps:

  1. Click the Fork control point at the bottom of the API Trigger element. Drag and drop the edge connection at the Join control point at the top of the Data Mapping element.
  2. Click the Fork control point at the bottom of the Data Mapping element. Drag and drop the edge connection at the Join control point at the top of the Connectors element.

Configure the Data Mapping task

To configure the Data Mapping task, complete the following steps:

  1. In the integration editor, click the Data Mapping task to view the task configuration pane.
  2. Click Open Data Mapping Editor.
  3. In the Data Mapping Editor, click Add to add a new variable.
  4. In the Create Variable dialog, enter the following information:
    • Name: Enter record.
    • Data Type: Select JSON.
    • Schema: Select Infer from a sample JSON payload. Enter the following sample JSON payload:
                  {
                    "unique_key":"304271",
                    "created_date":"02/06/2007 12:00:00 AM",
                    "closed_date":"03/01/2007 12:00:00 AM",
                    "agency":"TLC",
                    "agency_name":"Taxi and Limousine Commission",
                    "complaint_type":"Taxi Complaint",
                    "descriptor":"Driver Complaint",
                    "location_type":"Street",
                    "incident_zip":"10001",
                    "incident_address":"",
                    "street_name":"",
                    "cross_street_1":"",
                    "cross_street_2":"",
                    "intersection_street_1":"WEST 29 STREET",
                    "intersection_street_2":"7 AVENUE",
                    "address_type":"INTERSECTION",
                    "city":"NEW YORK",
                    "landmark":"",
                    "facility_type":"N/A",
                    "status":"Closed",
                    "due_date":"02/28/2007 12:00:00 AM",
                    "resolution_action_updated_date":"03/01/2007 12:00:00 AM",
                    "community_board":"05 MANHATTAN",
                    "borough":"MANHATTAN",
                    "x_coordinate_state_plane":"986215",
                    "y_coordinate_state_plane":"211740",
                    "park_facility_name":"",
                    "park_borough":"MANHATTAN",
                    "school_name":"",
                    "school_number":"",
                    "school_region":"",
                    "school_code":"",
                    "school_phone_number":"",
                    "school_address":"",
                    "school_city":"",
                    "school_state":"",
                    "school_zip":"",
                    "school_not_found":"",
                    "school_or_citywide_complaint":"",
                    "vehicle_type":"",
                    "taxi_company_borough":"",
                    "taxi_pick_up_location":"Other",
                    "bridge_highway_name":"",
                    "bridge_highway_direction":"",
                    "road_ramp":"",
                    "bridge_highway_segment":"",
                    "garage_lot_name":"",
                    "ferry_direction":"",
                    "ferry_terminal_name":"",
                    "latitude":"40.74785373937869",
                    "longitude":"-73.99290823133913",
                    "location":"(40.74785373937869, -73.99290823133913)"
                  }
                
    • Klik Create.
    • Saat variabel dibuat, di Editor Pemetaan Data, selesaikan langkah-langkah berikut:
      • Tarik variabel record baru ke kolom Input.
      • Tarik variabel connectorInputPayload ke kolom Output.
    • Tutup Editor Pemetaan Data untuk kembali ke editor integrasi.

Memublikasikan sub-integrasi

Untuk memublikasikan sub-integrasi, klik Publikasikan di editor integrasi.

Menyiapkan integrasi utama

Di bagian ini, Anda akan menyiapkan integrasi utama, yang menggunakan tugas For Setiap Paralel untuk memproses setiap data. Integrasi utama ini kemudian memanggil sub-integrasi satu kali untuk setiap kumpulan data.

Membuat integrasi utama

Untuk membuat integrasi utama, selesaikan langkah-langkah berikut:

  1. Di UI Apigee, pilih Organisasi Apigee Anda.
  2. Klik Develop > Integrasi.
  3. Klik Create integration.
  4. Dalam dialog Create Integration, lakukan hal berikut:
    • Masukkan nama, misalnya, masukkan data proses.
    • Masukkan deskripsi (opsional). Misalnya, masukkan Pemicu API untuk memproses data (integrasi utama)
    • Pilih wilayah tempat Anda ingin membuat integrasi.
  5. Klik Create untuk membuka editor integrasi.

Menambahkan Pemicu API

Untuk menambahkan Pemicu API ke integrasi, lakukan hal berikut:

  1. Di editor integrasi, pilih Add a task/trigger > Pemicu untuk menampilkan daftar pemicu yang tersedia.
  2. Tarik elemen Pemicu API ke editor integrasi.

Tambahkan Untuk Setiap tugas Paralel

Untuk menambahkan tugas Untuk Setiap Paralel dalam integrasi, selesaikan langkah-langkah berikut:

  1. Pilih +Tambahkan tugas/pemicu > Tasks di editor integrasi untuk menampilkan daftar tugas yang tersedia.
  2. Tarik elemen For Setiap Parallel ke editor integrasi.

Menghubungkan elemen integrasi

Selanjutnya, tambahkan koneksi edge untuk menghubungkan Pemicu API ke tugas Untuk Setiap Paralel.

Untuk menambahkan koneksi edge, klik titik kontrol Fork di bagian bawah elemen Pemicu API. Tarik lalu lepas koneksi edge pada titik kontrol Join di bagian atas elemen For Setiap Parallel task.

Mengonfigurasi Untuk Setiap tugas Paralel

Untuk mengonfigurasi tugas Untuk Setiap Paralel, selesaikan langkah-langkah berikut:

  1. Di editor integrasi, klik tugas For Every Parallel untuk melihat panel konfigurasi tugas.
  2. Di bawah Array Selection > Daftar yang akan diiterasi, klik Tambahkan variabel baru untuk menambahkan variabel baru.
  3. Dalam dialog Buat Variabel, masukkan informasi berikut:
    • Name: Masukkan records
    • Jenis Data: Pilih JSON.
    • Skema: Pilih Inferensi dari payload JSON sampel. Masukkan contoh payload JSON berikut:
                    [{
                      "unique_key":"304271",
                      "created_date":"02/06/2007 12:00:00 AM",
                      "closed_date":"03/01/2007 12:00:00 AM",
                      "agency":"TLC",
                      "agency_name":"Taxi and Limousine Commission",
                      "complaint_type":"Taxi Complaint",
                      "descriptor":"Driver Complaint",
                      "location_type":"Street",
                      "incident_zip":"10001",
                      "incident_address":"",
                      "street_name":"",
                      "cross_street_1":"",
                      "cross_street_2":"",
                      "intersection_street_1":"WEST 29 STREET",
                      "intersection_street_2":"7 AVENUE",
                      "address_type":"INTERSECTION",
                      "city":"NEW YORK",
                      "landmark":"",
                      "facility_type":"N/A",
                      "status":"Closed",
                      "due_date":"02/28/2007 12:00:00 AM",
                      "resolution_action_updated_date":"03/01/2007 12:00:00 AM",
                      "community_board":"05 MANHATTAN",
                      "borough":"MANHATTAN",
                      "x_coordinate_state_plane":"986215",
                      "y_coordinate_state_plane":"211740",
                      "park_facility_name":"",
                      "park_borough":"MANHATTAN",
                      "school_name":"",
                      "school_number":"",
                      "school_region":"",
                      "school_code":"",
                      "school_phone_number":"",
                      "school_address":"",
                      "school_city":"",
                      "school_state":"",
                      "school_zip":"",
                      "school_not_found":"",
                      "school_or_citywide_complaint":"",
                      "vehicle_type":"",
                      "taxi_company_borough":"",
                      "taxi_pick_up_location":"Other",
                      "bridge_highway_name":"",
                      "bridge_highway_direction":"",
                      "road_ramp":"",
                      "bridge_highway_segment":"",
                      "garage_lot_name":"",
                      "ferry_direction":"",
                      "ferry_terminal_name":"",
                      "latitude":"40.74785373937869",
                      "longitude":"-73.99290823133913",
                      "location":"(40.74785373937869, -73.99290823133913)"
                    }]
                  
  4. Klik Create.
  5. Di bagian Detail Sub-integrasi, masukkan informasi berikut:
    • ID Pemicu API: Pilih elemen Pemicu API di sub-integrasi. Misalnya, pilih Process-each-record_API_1.
    • Execution strategy: Pilih ASYNC.
    • Pilih Run a single integration.
  6. Di bagian On setiap execution, untuk Where to map individual array element, masukkan nama variabel dalam tugas pemetaan data di sub-integrasi. Dalam kasus ini, masukkan data. Variabel sub-integrasi hanya dicantumkan untuk integrasi yang dipublikasikan. Jika variabel tidak tercantum, muat ulang halaman, karena perlu waktu beberapa saat agar variabel terlihat setelah sub-integrasi dipublikasikan.

Memublikasikan integrasi utama

Untuk memublikasikan integrasi utama, di editor integrasi, klik Publikasikan.

Membuat Proxy API Apigee

Untuk memicu integrasi, buat proxy API Apigee dengan integrasi Anda sebagai target. Untuk melakukannya, selesaikan langkah-langkah berikut:

  1. Buat akun layanan di project Google Cloud Anda dan tetapkan peran yang diperlukan, Apigee Integration Invoker. Untuk mengetahui informasi tentang cara menetapkan peran IAM, lihat Peran dan izin IAM.
  2. Login ke UI Apigee.
  3. Di menu navigasi, pilih Develop > Proxy API.
  4. Klik Create New.
  5. Di wizard Create Proxy, klik Integration target.
  6. Di halaman Proxy details, masukkan informasi berikut:
    • Name: Masukkan process-records.
    • Base path: Masukkan /v1/process-records.
    • Wilayah integrasi: Pilih wilayah yang Anda gunakan untuk membuat integrasi.
    • Target integrasi: Pilih integrasi utama yang Anda buat. Untuk tutorial ini, pilih process-records.
    • Pemicu: Pilih Pemicu API yang diperlukan. Untuk tutorial ini, pilih process-records_API_1.
    • Jenis endpoint: Pilih Sinkronisasi.
  7. Klik Berikutnya dan di halaman Kebijakan umum, klik Berikutnya lagi.
  8. Di halaman Summary, klik Create.
  9. Saat proxy dibuat, klik Edit Proxy.
  10. Klik tab Mengembangkan.
  11. Di bagian Kebijakan, perbarui kebijakan Tetapkan Permintaan Integrasi dengan kebijakan berikut:
    <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <SetIntegrationRequest continueOnError="false" enabled="true" name="set-integration-request">
      <DisplayName>Set Integration Request</DisplayName>
      <ProjectId>project_id</ProjectId>
      <IntegrationName>process-records</IntegrationName>
      <IntegrationRegion>region</IntegrationRegion>
      <ApiTrigger>api_trigger/process-records_API_1</ApiTrigger>
      <Parameters>
        <Parameter name="records" type="json" ref="request.content"/>
      </Parameters>
    </SetIntegrationRequest>
    Ganti:
    • project_id dengan project ID project Google Cloud Anda.
    • region dengan wilayah tempat Anda membuat integrasi.
  12. Simpan perubahan Anda.
  13. Klik Deploy.
  14. Di Revision, pilih revisi yang telah diperbarui.
  15. Di Environment, pilih lingkungan yang ingin Anda gunakan untuk men-deploy integrasi.
  16. Saat men-deploy proxy API, berikan email akun layanan yang Anda buat sebelumnya dengan peran yang diperlukan.
  17. Klik Deploy.

Menguji integrasi Anda

Untuk menguji integrasi Anda, selesaikan langkah-langkah berikut:

  1. Download contoh data ke Cloud Shell Anda:
    1. Untuk meluncurkan sesi Cloud Shell dari Konsol Google Cloud, klik Ikon Activate Cloud Shell ikon Activate Cloud Shell di Konsol Cloud. Tindakan ini akan meluncurkan sesi di panel bawah Konsol Google Cloud.
    2. Masukkan perintah berikut di terminal Cloud Shell Anda:
      wget https://raw.githubusercontent.com/GoogleCloudPlatform/application-integration-samples/main/assets/bq-sample-dataset.json
              
    3. Untuk memastikan contoh data telah didownload, masukkan perintah berikut di terminal Cloud Shell Anda:
      ls -la bq-sample-dataset.json
      File yang didownload akan tercantum di terminal Cloud Shell.
  2. Untuk mengaktifkan proses debug di proxy Apigee, selesaikan langkah-langkah berikut:
    1. Beralih kembali ke Proxy API yang baru saja Anda buat ke UI Apigee.
    2. Klik tab Debug.
    3. Klik Start debug session dan masukkan informasi berikut:
      1. Pilih Environment tempat Anda ingin menjalankan sesi debug.
      2. (Opsional) Dari menu drop-down Filter, pilih filter yang akan diterapkan ke semua transaksi dalam sesi debug yang Anda buat.Defaultnya adalah None (All transactions), yang menyertakan semua transaksi dalam data debug.
      3. Klik Mulai.
  3. Untuk memulai pengujian, masukkan perintah berikut di terminal Cloud Shell Anda:
    export APIGEE_DOMAIN=<your-Apigee-domain>
    export SAMPLE_DOCS=$(jq  $(r=$((RANDOM % 1000)) ; echo ".[$r:$((r + 3))]") < bq-sample-dataset.json)
            
    curl -X POST https://$APIGEE_DOMAIN/v1/process-records \
      -H 'Content-Type: application/json' \
      -d "$SAMPLE_DOCS"
          
    Pengujian ini memilih tiga entri acak dari set data sampel dan meneruskannya ke integrasi utama. Integrasi utama meneruskan setiap entri ke sub-integrasi, yang menambahkan data sebagai baris di tabel BigQuery Anda.
  4. Untuk memverifikasi bahwa tabel BigQuery Anda sekarang berisi kumpulan data ini, lakukan langkah-langkah berikut:
    1. Di halaman Cloud Console, klik Navigation menu.
    2. Di bagian Analytics, klik BigQuery.
    3. Perluas project Anda dan klik set data bq_tutorial.
    4. Luaskan set data bq_tutorial, lalu klik tabel tutorial.
    5. Klik tab Table Explorer untuk melihat data yang disisipkan.

Langkah selanjutnya

Coba buat integrasi dengan konektor lain. Untuk daftar dari semua konektor yang didukung, lihat Referensi konektor.