Dataflow-Pipelines mit Cloud Composer starten

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Auf dieser Seite wird beschrieben, wie Sie mit DataflowTemplateOperator Dataflow-Pipelines aus Cloud Composer starten. Die Pipeline „Cloud Storage Text für BigQuery“ ist eine Batchpipeline, mit der Sie in Cloud Storage gespeicherte Textdateien hochladen, diese mit einer von Ihnen bereitgestellten benutzerdefinierten JavaScript-Funktion (User Defined Function, UDF) transformieren und das Ergebnis in BigQuery ausgeben können.

werden eine benutzerdefinierte Funktion, eine Eingabedatei und ein JSON-Schema hochgeladen.
  in einen Cloud Storage-Bucket. Ein DAG, der auf diese Dateien verweist, startet eine Dataflow-Batchpipeline, die die benutzerdefinierte Funktion und die JSON-Schemadatei auf die Eingabedatei anwendet. Anschließend werden diese Inhalte in eine BigQuery-Tabelle hochgeladen

Übersicht

  • Bevor Sie den Workflow starten, erstellen Sie die folgenden Entitäten:

    • Eine leere BigQuery-Tabelle aus einem leeren Dataset, die die folgenden Spalten mit Informationen enthält: location, average_temperature, month und optional inches_of_rain, is_current und latest_measurement.

    • Eine JSON-Datei, die die Daten aus der Datei .txt in das richtige Format für das Schema der BigQuery-Tabelle normalisiert, wird normalisiert. Das JSON-Objekt hat ein Array von BigQuery Schema, wobei enthält jedes Objekt einen Spaltennamen, den Eingabetyp und die Angabe, kein Pflichtfeld.

    • Eine .txt-Eingabedatei mit den Daten, die im Batch hochgeladen werden in die BigQuery-Tabelle.

    • Eine in JavaScript geschriebene benutzerdefinierte Funktion, die jede Zeile der .txt-Datei in die für unsere Tabelle relevanten Variablen umwandelt.

    • Eine Airflow-DAG-Datei, die auf den Speicherort dieser Dateien verweist.

  • Als Nächstes laden Sie die Datei .txt, die UDF-Datei .js und die Schemadatei .json in einen Cloud Storage-Bucket hoch. Außerdem laden Sie den DAG in Ihre Cloud Composer-Umgebung hoch.

  • Nachdem der DAG hochgeladen wurde, führt Airflow eine Aufgabe daraus aus. Diese Aufgabe startet eine Dataflow-Pipeline, die die benutzerdefinierte Funktion auf die .txt-Datei anwendet und sie gemäß dem JSON-Schema formatiert.

  • Schließlich werden die Daten in die BigQuery-Tabelle die Sie zuvor erstellt haben.

Hinweise

  • Damit Sie schreiben können, müssen Sie mit JavaScript vertraut sein. die benutzerdefinierte Funktion.
  • In diesem Leitfaden wird davon ausgegangen, dass Sie bereits eine Cloud Composer-Umgebung haben. Weitere Informationen finden Sie unter Umgebung erstellen. Sie können mit dieser Anleitung jede Version von Cloud Composer verwenden.
  • Enable the Cloud Composer, Dataflow, Cloud Storage, BigQuery APIs.

    Enable the APIs

Leere BigQuery-Tabelle mit einer Schemadefinition erstellen

Erstellen Sie eine BigQuery-Tabelle mit einer Schemadefinition. Ich wird diese Schemadefinition später in diesem Leitfaden verwenden. Dieses Die BigQuery-Tabelle enthält die Ergebnisse des Batch-Uploads.

So erstellen Sie eine leere Tabelle mit einer Schemadefinition:

Console

  1. Rufen Sie in der Google Cloud Console die Seite „BigQuery“ auf. Seite:

    BigQuery aufrufen

  2. Maximieren Sie im Navigationsbereich im Abschnitt Ressourcen Ihr Projekt.

  3. Klicken Sie im Detailbereich auf Dataset erstellen.

    Klicken Sie auf die Schaltfläche „Dataset erstellen“.

  4. Geben Sie auf der Seite „Dataset erstellen“ im Abschnitt Dataset-ID einen Namen für Ihren Dataset average_weather. Übernehmen Sie für alle anderen Felder die Standardwerte. Bundesstaat.

    Geben Sie als Dataset-ID den Namen "average_weather" ein.

  5. Klicken Sie auf Dataset erstellen.

  6. Kehren Sie zum Navigationsbereich zurück. Maximieren Sie im Abschnitt Ressourcen für Ihr Projekt. Klicken Sie dann auf das Dataset average_weather.

  7. Klicken Sie im Detailfeld auf Tabelle erstellen.

    Klicken Sie auf „Tabelle erstellen“.

  8. Wählen Sie auf der Seite Tabelle erstellen im Abschnitt Quelle die Option Leere Tabelle aus.

  9. Gehen Sie auf der Seite Create table (Tabelle erstellen) im Abschnitt Destination (Ziel) so vor:

    • Wählen Sie für Dataset-Name das Dataset average_weather aus.

      Wählen Sie die Dataset-Option für das Dataset "average_weather" aus.

    • Geben Sie im Feld Tabellenname den Namen average_weather ein.

    • Achten Sie darauf, dass der Tabellentyp auf Native Tabelle eingestellt ist.

  10. Geben Sie im Abschnitt Schema die Schemadefinition ein. Sie können einen der folgenden Ansätze verwenden:

    • Sie können Schemainformationen manuell eingeben, indem Sie Als Text bearbeiten aktivieren und das Tabellenschema als JSON-Array eingeben. Geben Sie Folgendes ein: Felder:

      [
          {
              "name": "location",
              "type": "GEOGRAPHY",
              "mode": "REQUIRED"
          },
          {
              "name": "average_temperature",
              "type": "INTEGER",
              "mode": "REQUIRED"
          },
          {
              "name": "month",
              "type": "STRING",
              "mode": "REQUIRED"
          },
          {
              "name": "inches_of_rain",
              "type": "NUMERIC"
          },
          {
              "name": "is_current",
              "type": "BOOLEAN"
          },
          {
              "name": "latest_measurement",
              "type": "DATE"
          }
      ]
      
    • Unter Feld hinzufügen können Sie das Schema manuell eingeben:

      Klicken Sie auf „Feld hinzufügen“, um die Felder einzugeben.

  11. Behalten Sie für Partitions- und Clustereinstellungen den Standardwert No partitioning bei.

  12. Lassen Sie im Abschnitt Erweiterte Optionen für Verschlüsselung das Feld Standardwert Google-managed key.

  13. Klicken Sie auf Tabelle erstellen.

bq

Verwenden Sie den Befehl bq mk, um ein leeres Dataset und eine Tabelle in diesem Dataset.

Führen Sie den folgenden Befehl aus, um ein Dataset mit dem durchschnittlichen globalen Wetter zu erstellen:

bq --location=LOCATION mk \
    --dataset PROJECT_ID:average_weather

Ersetzen Sie Folgendes:

  • LOCATION: die Region, in der sich die Umgebung befindet.
  • PROJECT_ID: die Projekt-ID.

Führen Sie den folgenden Befehl aus, um eine leere Tabelle in diesem Dataset mit Schemadefinition:

bq mk --table \
PROJECT_ID:average_weather.average_weather \
location:GEOGRAPHY,average_temperature:INTEGER,month:STRING,inches_of_rain:NUMERIC,is_current:BOOLEAN,latest_measurement:DATE

Nachdem die Tabelle erstellt wurde, können Sie die Ablaufzeit, die Beschreibung und die Labels der Tabelle aktualisieren. Ebenso können Sie die Schemadefinition ändern.

Python

Diesen Code speichern unter dataflowtemplateoperator_create_dataset_and_table_helper.py und aktualisieren Sie die darin enthaltenen Variablen, um Ihr Projekt und Ihren Standort widerzuspiegeln. führen Sie ihn mit dem folgenden Befehl aus:

python dataflowtemplateoperator_create_dataset_and_table_helper.py

Python

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Cloud Composer zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


# Make sure to follow the quickstart setup instructions beforehand.
# See instructions here:
# https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries

# Before running the sample, be sure to install the bigquery library
# in your local environment by running pip install google.cloud.bigquery

from google.cloud import bigquery

# TODO(developer): Replace with your values
project = "your-project"  # Your GCP Project
location = "US"  # the location where you want your BigQuery data to reside. For more info on possible locations see https://cloud.google.com/bigquery/docs/locations
dataset_name = "average_weather"


def create_dataset_and_table(project, location, dataset_name):
    # Construct a BigQuery client object.
    client = bigquery.Client(project)

    dataset_id = f"{project}.{dataset_name}"

    # Construct a full Dataset object to send to the API.
    dataset = bigquery.Dataset(dataset_id)

    # Set the location to your desired location for the dataset.
    # For more information, see this link:
    # https://cloud.google.com/bigquery/docs/locations
    dataset.location = location

    # Send the dataset to the API for creation.
    # Raises google.api_core.exceptions.Conflict if the Dataset already
    # exists within the project.
    dataset = client.create_dataset(dataset)  # Make an API request.

    print(f"Created dataset {client.project}.{dataset.dataset_id}")

    # Create a table from this dataset.

    table_id = f"{client.project}.{dataset_name}.average_weather"

    schema = [
        bigquery.SchemaField("location", "GEOGRAPHY", mode="REQUIRED"),
        bigquery.SchemaField("average_temperature", "INTEGER", mode="REQUIRED"),
        bigquery.SchemaField("month", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("inches_of_rain", "NUMERIC", mode="NULLABLE"),
        bigquery.SchemaField("is_current", "BOOLEAN", mode="NULLABLE"),
        bigquery.SchemaField("latest_measurement", "DATE", mode="NULLABLE"),
    ]

    table = bigquery.Table(table_id, schema=schema)
    table = client.create_table(table)  # Make an API request.
    print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")

Cloud Storage-Bucket erstellen

Erstellen Sie einen Bucket, der alle für den Workflow erforderlichen Dateien enthält. Der DAG, den Sie erstellt, wird auf die Dateien verwiesen, die Sie in dieses Storage-Bucket. So erstellen Sie einen neuen Storage-Bucket:

Console

  1. Öffnen Sie Cloud Storage in der Google Cloud Console.

    Cloud Storage aufrufen

  2. Klicken Sie auf Bucket erstellen, um das Formular zum Erstellen eines Buckets zu öffnen.

    1. Geben Sie die Bucket-Informationen ein und klicken Sie zum Ausführen der einzelnen Schritte jeweils auf Weiter:

      • Geben Sie einen global eindeutigen Namen für den Bucket an. In diesem Leitfaden werden bucketName als Beispiel.

      • Wählen Sie Region als Standorttyp aus. Wählen Sie als Nächstes einen Speicherort für die Bucket-Daten aus.

      • Wählen Sie Standard als Standard-Speicherklasse für Ihre Daten aus.

      • Wählen Sie Einheitliche Zugriffssteuerung für den Zugriff auf Ihre Objekte aus.

    2. Klicken Sie auf Fertig.

gcloud

Führen Sie den Befehl gcloud storage buckets create aus:

gcloud storage buckets create gs://bucketName/

Ersetzen Sie Folgendes:

  • bucketName: der Name des Buckets, den Sie zuvor in diesem Abschnitt erstellt haben. .

Codebeispiele

C#

Richten Sie Standardanmeldedaten für Anwendungen ein, um sich bei Cloud Composer zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


using Google.Apis.Storage.v1.Data;
using Google.Cloud.Storage.V1;
using System;

public class CreateBucketSample
{
    public Bucket CreateBucket(
        string projectId = "your-project-id",
        string bucketName = "your-unique-bucket-name")
    {
        var storage = StorageClient.Create();
        var bucket = storage.CreateBucket(projectId, bucketName);
        Console.WriteLine($"Created {bucketName}.");
        return bucket;
    }
}

Go

Richten Sie Standardanmeldedaten für Anwendungen ein, um sich bei Cloud Composer zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
)

// createBucket creates a new bucket in the project.
func createBucket(w io.Writer, projectID, bucketName string) error {
	// projectID := "my-project-id"
	// bucketName := "bucket-name"
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("storage.NewClient: %w", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, time.Second*30)
	defer cancel()

	bucket := client.Bucket(bucketName)
	if err := bucket.Create(ctx, projectID, nil); err != nil {
		return fmt.Errorf("Bucket(%q).Create: %w", bucketName, err)
	}
	fmt.Fprintf(w, "Bucket %v created\n", bucketName)
	return nil
}

Java

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Cloud Composer zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class CreateBucket {
  public static void createBucket(String projectId, String bucketName) {
    // The ID of your GCP project
    // String projectId = "your-project-id";

    // The ID to give your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    Storage storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService();

    Bucket bucket = storage.create(BucketInfo.newBuilder(bucketName).build());

    System.out.println("Created bucket " + bucket.getName());
  }
}

Python

Richten Sie Standardanmeldedaten für Anwendungen ein, um sich bei Cloud Composer zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

from google.cloud import storage


def create_bucket(bucket_name):
    """Creates a new bucket."""
    # bucket_name = "your-new-bucket-name"

    storage_client = storage.Client()

    bucket = storage_client.create_bucket(bucket_name)

    print(f"Bucket {bucket.name} created")

Ruby

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Cloud Composer zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

def create_bucket bucket_name:
  # The ID to give your GCS bucket
  # bucket_name = "your-unique-bucket-name"

  require "google/cloud/storage"

  storage = Google::Cloud::Storage.new
  bucket  = storage.create_bucket bucket_name

  puts "Created bucket: #{bucket.name}"
end

BigQuery-Schema im JSON-Format für Ihre Ausgabetabelle erstellen

Erstellen Sie eine JSON-formatierte BigQuery-Schemadatei, die der zuvor erstellten Ausgabetabelle entspricht. Die Feldnamen, -typen und -modi müssen mit denen übereinstimmen, die zuvor in der BigQuery-Tabelle definiert wurden Schema. Durch diese Datei werden die Daten aus der Datei .txt in ein Format umgewandelt. sind mit Ihrem BigQuery-Schema kompatibel. Benennen Sie diese Datei jsonSchema.json.

{
    "BigQuery Schema": [
    {
        "name": "location",
        "type": "GEOGRAPHY",
        "mode": "REQUIRED"
    },
    {
        "name": "average_temperature",
        "type": "INTEGER",
        "mode": "REQUIRED"
    },
    {
        "name": "month",
        "type": "STRING",
        "mode": "REQUIRED"
    },
    {
        "name": "inches_of_rain",
        "type": "NUMERIC"
    },
    {
        "name": "is_current",
        "type": "BOOLEAN"
    },
    {
        "name": "latest_measurement",
        "type": "DATE"
    }]
}

JavaScript-Datei erstellen, um Ihre Daten zu formatieren

In dieser Datei definieren Sie Ihre benutzerdefinierte Funktion, die die Logik zur Transformation der Textzeilen in Ihrer Eingabedatei. Beachten Sie, dass dies verwendet jede Textzeile in Ihrer Eingabedatei als eigenes Argument, wird die Funktion für jede Zeile Ihrer Eingabedatei einmal ausgeführt. Datei benennen transformCSVtoJSON.js


function transformCSVtoJSON(line) {
  var values = line.split(',');
  var properties = [
    'location',
    'average_temperature',
    'month',
    'inches_of_rain',
    'is_current',
    'latest_measurement',
  ];
  var weatherInCity = {};

  for (var count = 0; count < values.length; count++) {
    if (values[count] !== 'null') {
      weatherInCity[properties[count]] = values[count];
    }
  }

  return JSON.stringify(weatherInCity);
}

Eingabedatei erstellen

Diese Datei enthält die Informationen, die Sie in Ihre BigQuery-Tabelle hochladen möchten. Kopieren Sie diese Datei lokal und benennen Sie sie inputFile.txt.

POINT(40.7128 74.006),45,'July',null,true,2020-02-16
POINT(41.8781 87.6298),23,'October',13,false,2015-02-13
POINT(48.8566 2.3522),80,'December',null,true,null
POINT(6.5244 3.3792),15,'March',14,true,null

Dateien in den Bucket hochladen

Laden Sie die folgenden Dateien in den von Ihnen erstellten Cloud Storage-Bucket hoch früher:

  • JSON-formatiertes BigQuery-Schema (.json)
  • Benutzerdefinierte JavaScript-Funktion (transformCSVtoJSON.js)
  • Die Eingabedatei für den zu verarbeitenden Text (.txt)

Console

  1. Wechseln Sie in der Cloud Console zur Seite Cloud Storage-Buckets.

    Buckets aufrufen

  2. Klicken Sie in der Liste der Buckets auf den gewünschten Bucket.

  3. Führen Sie auf dem Tab „Objekte“ für den Bucket einen der folgenden Schritte aus:

    • Fügen Sie die gewünschten Dateien per Drag-and-drop von Ihrem Desktop oder Dateimanager in den Hauptbereich der Google Cloud Console.

    • Klicken Sie auf die Schaltfläche Dateien hochladen und wählen Sie die Dateien aus, die Sie hochladen möchten. und klicken Sie auf Öffnen.

gcloud

Führen Sie den Befehl gcloud storage cp aus:

gcloud storage cp OBJECT_LOCATION gs://bucketName

Ersetzen Sie Folgendes:

  • bucketName: der Name des Buckets, den Sie zuvor in diesem Leitfaden erstellt haben.
  • OBJECT_LOCATION: der lokale Pfad zu Ihrem Objekt. Beispiel: Desktop/transformCSVtoJSON.js

Codebeispiele

Python

Richten Sie Standardanmeldedaten für Anwendungen ein, um sich bei Cloud Composer zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

from google.cloud import storage


def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"
    # The path to your file to upload
    # source_file_name = "local/path/to/file"
    # The ID of your GCS object
    # destination_blob_name = "storage-object-name"

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    # Optional: set a generation-match precondition to avoid potential race conditions
    # and data corruptions. The request to upload is aborted if the object's
    # generation number does not match your precondition. For a destination
    # object that does not yet exist, set the if_generation_match precondition to 0.
    # If the destination object already exists in your bucket, set instead a
    # generation-match precondition using its generation number.
    generation_match_precondition = 0

    blob.upload_from_filename(source_file_name, if_generation_match=generation_match_precondition)

    print(
        f"File {source_file_name} uploaded to {destination_blob_name}."
    )

Ruby

Richten Sie Standardanmeldedaten für Anwendungen ein, um sich bei Cloud Composer zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

def upload_file bucket_name:, local_file_path:, file_name: nil
  # The ID of your GCS bucket
  # bucket_name = "your-unique-bucket-name"

  # The path to your file to upload
  # local_file_path = "/local/path/to/file.txt"

  # The ID of your GCS object
  # file_name = "your-file-name"

  require "google/cloud/storage"

  storage = Google::Cloud::Storage.new
  bucket  = storage.bucket bucket_name, skip_lookup: true

  file = bucket.create_file local_file_path, file_name

  puts "Uploaded #{local_file_path} as #{file.name} in bucket #{bucket_name}"
end

DataflowTemplateOperator konfigurieren

Bevor Sie den DAG ausführen, legen Sie die folgenden Airflow-Variablen fest.

Airflow-Variable Wert
project_id Projekt-ID
gce_zone Compute Engine-Zone, in der sich der Dataflow-Cluster befindet müssen erstellt werden
bucket_path Der Speicherort des von Ihnen erstellten Cloud Storage-Bucket früher

Nun verweisen Sie auf die Dateien, die Sie zuvor erstellt haben, um einen DAG zu erstellen, mit dem der Dataflow-Workflow gestartet wird. Kopieren Sie diesen DAG und speichern Sie ihn lokal als composer-dataflow-dag.py.

Airflow 2



"""Example Airflow DAG that creates a Cloud Dataflow workflow which takes a
text file and adds the rows to a BigQuery table.

This DAG relies on four Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id - Google Cloud Project ID to use for the Cloud Dataflow cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataflow cluster should be
  created.
For more info on zones where Dataflow is available see:
https://cloud.google.com/dataflow/docs/resources/locations
* bucket_path - Google Cloud Storage bucket where you've stored the User Defined
Function (.js), the input file (.txt), and the JSON schema (.json).
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowTemplatedJobStartOperator,
)
from airflow.utils.dates import days_ago

bucket_path = "{{var.value.bucket_path}}"
project_id = "{{var.value.project_id}}"
gce_zone = "{{var.value.gce_zone}}"


default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "tempLocation": bucket_path + "/tmp/",
    },
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "composer_dataflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    start_template_job = DataflowTemplatedJobStartOperator(
        # The task id of your job
        task_id="dataflow_operator_transform_csv_to_bq",
        # The name of the template that you're using.
        # Below is a list of all the templates you can use.
        # For versions in non-production environments, use the subfolder 'latest'
        # https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#gcstexttobigquery
        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
        # Use the link above to specify the correct parameters for your template.
        parameters={
            "javascriptTextTransformFunctionName": "transformCSVtoJSON",
            "JSONPath": bucket_path + "/jsonSchema.json",
            "javascriptTextTransformGcsPath": bucket_path + "/transformCSVtoJSON.js",
            "inputFilePattern": bucket_path + "/inputFile.txt",
            "outputTable": project_id + ":average_weather.average_weather",
            "bigQueryLoadingTemporaryDirectory": bucket_path + "/tmp/",
        },
    )

Airflow 1



"""Example Airflow DAG that creates a Cloud Dataflow workflow which takes a
text file and adds the rows to a BigQuery table.

This DAG relies on four Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id - Google Cloud Project ID to use for the Cloud Dataflow cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataflow cluster should be
  created.
  created.
Learn more about the difference between the two here:
https://cloud.google.com/compute/docs/regions-zones
* bucket_path - Google Cloud Storage bucket where you've stored the User Defined
Function (.js), the input file (.txt), and the JSON schema (.json).
"""

import datetime

from airflow import models
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow.utils.dates import days_ago

bucket_path = "{{var.value.bucket_path}}"
project_id = "{{var.value.project_id}}"
gce_zone = "{{var.value.gce_zone}}"


default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "tempLocation": bucket_path + "/tmp/",
    },
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "composer_dataflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    start_template_job = DataflowTemplateOperator(
        # The task id of your job
        task_id="dataflow_operator_transform_csv_to_bq",
        # The name of the template that you're using.
        # Below is a list of all the templates you can use.
        # For versions in non-production environments, use the subfolder 'latest'
        # https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#gcstexttobigquery
        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
        # Use the link above to specify the correct parameters for your template.
        parameters={
            "javascriptTextTransformFunctionName": "transformCSVtoJSON",
            "JSONPath": bucket_path + "/jsonSchema.json",
            "javascriptTextTransformGcsPath": bucket_path + "/transformCSVtoJSON.js",
            "inputFilePattern": bucket_path + "/inputFile.txt",
            "outputTable": project_id + ":average_weather.average_weather",
            "bigQueryLoadingTemporaryDirectory": bucket_path + "/tmp/",
        },
    )

DAG in Cloud Storage hochladen

Laden Sie den DAG in den Ordner /dags der Umgebung hoch. Bucket. Sobald der Upload erfolgreich abgeschlossen wurde, können Sie ihn in der Cloud Composer-Umgebung auf den Link DAGs-Ordner sehen.

Der DAGs-Ordner in Ihrer Umgebung enthält Ihren DAG.

Aufgabenstatus ansehen

  1. Rufen Sie die Airflow-Weboberfläche auf.
  2. Klicken Sie auf der Seite „DAGs“ auf den DAG-Namen, z. B. composerDataflowDAG.
  3. Klicken Sie auf der DAGs-Detailseite auf Grafikansicht.
  4. Prüfen Sie den Status:

    • Failed: Die Aufgabe wird in einem roten Kästchen angezeigt. Sie können auch den Mauszeiger auf eine Aufgabe bewegen und nach Status: Fehlgeschlagen suchen.

    • Success: Die Aufgabe wird von einem grünen Feld umgeben. Sie können auch den Mauszeiger über die Aufgabe halten und nach State: Success suchen.

Nach einigen Minuten können Sie die Ergebnisse in Dataflow und BigQuery prüfen.

Job in Dataflow ansehen

  1. Rufen Sie in der Google Cloud Console die Seite Dataflow auf.

    Zu Dataflow

  2. Der Job hat den Namen „dataflow_operator_transform_csv_to_bq“ mit einer eindeutigen ID am Ende des Namens durch einen Bindestrich hinzugefügt. Beispiel:

    der Dataflow-Job hat eine eindeutige ID

  3. Klicken Sie auf den Namen, um die Informationen zum Stellenangebot.

    alle Jobdetails ansehen

Ergebnisse in BigQuery anzeigen

  1. Öffnen Sie in der Google Cloud Console die Seite BigQuery.

    BigQuery aufrufen

  2. Sie können Abfragen mit Standard-SQL senden. Verwenden Sie die folgende Abfrage, um die Zeilen anzuzeigen, die Ihrer Tabelle hinzugefügt wurden:

    SELECT * FROM projectId.average_weather.average_weather