Importations en flux continu

Cloud Storage permet de diffuser des données en flux continu dans un bucket sans avoir à les enregistrer au préalable dans un fichier. Cela peut s'avérer utile lorsque vous souhaitez importer des données sans en connaître la taille finale au début de l'importation, par exemple lorsque vous générez des données d'importation à partir d'un processus, ou lorsque vous compressez un objet à la volée.

Utiliser la validation de somme de contrôle lors de l'insertion en flux continu

Étant donné qu'une somme de contrôle ne peut être fournie que dans la requête initiale d'une importation, il est souvent impossible d'utiliser la validation de somme de contrôle de Cloud Storage lors de l'insertion en flux continu. Il est recommandé de toujours effectuer la validation de la somme de contrôle, et vous pouvez le faire manuellement une fois l'importation en flux continu terminée. Toutefois, la validation une fois le transfert terminé signifie que toutes les données corrompues sont accessibles pendant le délai nécessaire à la confirmation de la corruption et à leur suppression.

Si vous souhaitez exiger une validation de la somme de contrôle avant de conclure l'importation des données et leur mise à disposition, vous ne devez pas utiliser d'importation en flux continu. Vous devez choisir une autre option d'importation, qui effectue la validation de la somme de contrôle avant de finaliser l'objet.

Prérequis

Les conditions préalables peuvent varier en fonction de l'outil utilisé :

Console

Pour suivre ce guide à l'aide de Google Cloud Console, vous devez disposer des autorisations IAM appropriées. Si le bucket auquel vous souhaitez accéder en streaming existe dans un projet que vous n'avez pas créé, vous devrez peut-être demander au propriétaire du projet qu'il vous attribue un rôle disposant des autorisations nécessaires.

Pour obtenir la liste des autorisations requises pour des actions spécifiques, consultez la page Autorisations IAM pour Google Cloud Console.

Pour obtenir la liste des rôles pertinents, consultez la page Rôles Cloud Storage. Vous pouvez également créer un rôle personnalisé disposant d'autorisations limitées spécifiques.

Ligne de commande

Pour suivre ce guide à l'aide d'un utilitaire de ligne de commande, vous devez disposer des autorisations IAM appropriées. Si le bucket cible pour les transferts en flux continu existe dans un projet que vous n'avez pas créé, vous devrez peut-être demander au propriétaire du projet qu'il vous attribue un rôle disposant des autorisations nécessaires.

Pour obtenir la liste des autorisations requises pour des actions spécifiques, consultez la page Autorisations IAM pour les commandes gcloud storage.

Pour obtenir la liste des rôles pertinents, consultez la page Rôles Cloud Storage. Vous pouvez également créer un rôle personnalisé disposant d'autorisations limitées spécifiques.

Bibliothèques clientes

Pour suivre ce guide à l'aide des bibliothèques clientes Cloud Storage, vous devez disposer des autorisations IAM appropriées. Si le bucket auquel vous souhaitez accéder en streaming existe dans un projet que vous n'avez pas créé, vous devrez peut-être demander au propriétaire du projet qu'il vous attribue un rôle disposant des autorisations nécessaires.

Sauf indication contraire, les requêtes de bibliothèque cliente sont effectuées via l'API JSON et nécessitent des autorisations, comme indiqué dans la section Autorisations IAM pour les méthodes JSON. Pour savoir quelles méthodes d'API JSON sont appelées lorsque vous envoyez des requêtes à l'aide d'une bibliothèque cliente, consignez les requêtes brutes.

Pour obtenir la liste des rôles IAM pertinents, consultez la page Rôles Cloud Storage. Vous pouvez également créer un rôle personnalisé disposant d'autorisations limitées spécifiques.

API REST

API JSON

Pour suivre ce guide à l'aide de l'API JSON, vous devez disposer des autorisations IAM appropriées. Si le bucket auquel vous souhaitez accéder en streaming existe dans un projet que vous n'avez pas créé, vous devrez peut-être demander au propriétaire du projet qu'il vous attribue un rôle disposant des autorisations nécessaires.

Pour obtenir la liste des autorisations requises pour des actions spécifiques, consultez la page Autorisations IAM pour les méthodes JSON.

Pour obtenir la liste des rôles pertinents, consultez la page Rôles Cloud Storage. Vous pouvez également créer un rôle personnalisé disposant d'autorisations limitées spécifiques.

Diffuser une importation

Les exemples suivants montrent comment effectuer une importation en flux continu d'un processus vers un objet Cloud Storage :

Console

La console Google Cloud n'accepte pas les importations en flux continu. Utilisez plutôt gcloud CLI.

Ligne de commande

  1. Dirigez les données vers la commande gcloud storage cp et utilisez un tiret pour l'URL source :

    PROCESS_NAME | gcloud storage cp - gs://BUCKET_NAME/OBJECT_NAME

    Où :

    • PROCESS_NAME est le nom du processus à partir duquel vous collectez des données. Exemple : collect_measurements.
    • BUCKET_NAME est le nom du bucket contenant l'objet. Par exemple, my_app_bucket.
    • OBJECT_NAME est le nom de l'objet créé à partir des données. Par exemple, data_measurements.

Bibliothèques clientes

C++

Pour en savoir plus, consultez la documentation de référence de l'API Cloud Storage en langage C++.

Pour vous authentifier auprès de Cloud Storage, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

namespace gcs = ::google::cloud::storage;
using ::google::cloud::StatusOr;
[](gcs::Client client, std::string const& bucket_name,
   std::string const& object_name, int desired_line_count) {
  std::string const text = "Lorem ipsum dolor sit amet";
  gcs::ObjectWriteStream stream =
      client.WriteObject(bucket_name, object_name);

  for (int lineno = 0; lineno != desired_line_count; ++lineno) {
    // Add 1 to the counter, because it is conventional to number lines
    // starting at 1.
    stream << (lineno + 1) << ": " << text << "\n";
  }

  stream.Close();

  StatusOr<gcs::ObjectMetadata> metadata = std::move(stream).metadata();
  if (!metadata) throw std::move(metadata).status();
  std::cout << "Successfully wrote to object " << metadata->name()
            << " its size is: " << metadata->size()
            << "\nFull metadata: " << *metadata << "\n";
}

C#

Pour en savoir plus, consultez la documentation de référence de l'API Cloud Storage en langage C#.

Pour vous authentifier auprès de Cloud Storage, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


using Google.Cloud.Storage.V1;
using System;
using System.IO;

public class UploadFileSample
{
    public void UploadFile(
        string bucketName = "your-unique-bucket-name",
        string localPath = "my-local-path/my-file-name",
        string objectName = "my-file-name")
    {
        var storage = StorageClient.Create();
        using var fileStream = File.OpenRead(localPath);
        storage.UploadObject(bucketName, objectName, null, fileStream);
        Console.WriteLine($"Uploaded {objectName}.");
    }
}

Go

Pour en savoir plus, consultez la documentation de référence de l'API Cloud Storage en langage Go.

Pour vous authentifier auprès de Cloud Storage, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
)

// streamFileUpload uploads an object via a stream.
func streamFileUpload(w io.Writer, bucket, object string) error {
	// bucket := "bucket-name"
	// object := "object-name"
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("storage.NewClient: %w", err)
	}
	defer client.Close()

	b := []byte("Hello world.")
	buf := bytes.NewBuffer(b)

	ctx, cancel := context.WithTimeout(ctx, time.Second*50)
	defer cancel()

	// Upload an object with storage.Writer.
	wc := client.Bucket(bucket).Object(object).NewWriter(ctx)
	wc.ChunkSize = 0 // note retries are not supported for chunk size 0.

	if _, err = io.Copy(wc, buf); err != nil {
		return fmt.Errorf("io.Copy: %w", err)
	}
	// Data can continue to be added to the file until the writer is closed.
	if err := wc.Close(); err != nil {
		return fmt.Errorf("Writer.Close: %w", err)
	}
	fmt.Fprintf(w, "%v uploaded to %v.\n", object, bucket)

	return nil
}

Java

Pour en savoir plus, consultez la documentation de référence de l'API Cloud Storage en langage Java.

Pour vous authentifier auprès de Cloud Storage, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


import com.google.cloud.WriteChannel;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class StreamObjectUpload {

  public static void streamObjectUpload(
      String projectId, String bucketName, String objectName, String contents) throws IOException {
    // The ID of your GCP project
    // String projectId = "your-project-id";

    // The ID of your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    // The ID of your GCS object
    // String objectName = "your-object-name";

    // The string of contents you wish to upload
    // String contents = "Hello world!";

    Storage storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService();
    BlobId blobId = BlobId.of(bucketName, objectName);
    BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build();
    byte[] content = contents.getBytes(StandardCharsets.UTF_8);
    try (WriteChannel writer = storage.writer(blobInfo)) {
      writer.write(ByteBuffer.wrap(content));
      System.out.println(
          "Wrote to " + objectName + " in bucket " + bucketName + " using a WriteChannel.");
    }
  }
}

Node.js

Pour en savoir plus, consultez la documentation de référence de l'API Cloud Storage en langage Node.js.

Pour vous authentifier auprès de Cloud Storage, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

/**
 * TODO(developer): Uncomment the following lines before running the sample
 */
// The ID of your GCS bucket
// const bucketName = 'your-unique-bucket-name';

// The new ID for your GCS file
// const destFileName = 'your-new-file-name';

// The content to be uploaded in the GCS file
// const contents = 'your file content';

// Imports the Google Cloud client library
const {Storage} = require('@google-cloud/storage');

// Import Node.js stream
const stream = require('stream');

// Creates a client
const storage = new Storage();

// Get a reference to the bucket
const myBucket = storage.bucket(bucketName);

// Create a reference to a file object
const file = myBucket.file(destFileName);

// Create a pass through stream from a string
const passthroughStream = new stream.PassThrough();
passthroughStream.write(contents);
passthroughStream.end();

async function streamFileUpload() {
  passthroughStream.pipe(file.createWriteStream()).on('finish', () => {
    // The file upload is complete
  });

  console.log(`${destFileName} uploaded to ${bucketName}`);
}

streamFileUpload().catch(console.error);

PHP

Pour en savoir plus, consultez la documentation de référence de l'API Cloud Storage en langage PHP.

Pour vous authentifier auprès de Cloud Storage, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

use Google\Cloud\Storage\StorageClient;
use Google\Cloud\Storage\WriteStream;

/**
 * Upload a chunked file stream.
 *
 * @param string $bucketName The name of your Cloud Storage bucket.
 *        (e.g. 'my-bucket')
 * @param string $objectName The name of your Cloud Storage object.
 *        (e.g. 'my-object')
 * @param string $contents The contents to upload via stream chunks.
 *        (e.g. 'these are my contents')
 */
function upload_object_stream(string $bucketName, string $objectName, string $contents): void
{
    $storage = new StorageClient();
    $bucket = $storage->bucket($bucketName);
    $writeStream = new WriteStream(null, [
        'chunkSize' => 1024 * 256, // 256KB
    ]);
    $uploader = $bucket->getStreamableUploader($writeStream, [
        'name' => $objectName,
    ]);
    $writeStream->setUploader($uploader);
    $stream = fopen('data://text/plain,' . $contents, 'r');
    while (($line = stream_get_line($stream, 1024 * 256)) !== false) {
        $writeStream->write($line);
    }
    $writeStream->close();

    printf('Uploaded %s to gs://%s/%s' . PHP_EOL, $contents, $bucketName, $objectName);
}

Python

Pour en savoir plus, consultez la documentation de référence de l'API Cloud Storage en langage Python.

Pour vous authentifier auprès de Cloud Storage, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

from google.cloud import storage


def upload_blob_from_stream(bucket_name, file_obj, destination_blob_name):
    """Uploads bytes from a stream or other file-like object to a blob."""
    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"

    # The stream or file (file-like object) from which to read
    # import io
    # file_obj = io.BytesIO()
    # file_obj.write(b"This is test data.")

    # The desired name of the uploaded GCS object (blob)
    # destination_blob_name = "storage-object-name"

    # Construct a client-side representation of the blob.
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    # Rewind the stream to the beginning. This step can be omitted if the input
    # stream will always be at a correct position.
    file_obj.seek(0)

    # Upload data from the stream to your bucket.
    blob.upload_from_file(file_obj)

    print(
        f"Stream data uploaded to {destination_blob_name} in bucket {bucket_name}."
    )

Ruby

Pour en savoir plus, consultez la documentation de référence de l'API Cloud Storage en langage Ruby.

Pour vous authentifier auprès de Cloud Storage, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


# The ID of your GCS bucket
# bucket_name = "your-unique-bucket-name"

# The stream or file (file-like object) from which to read
# local_file_obj = StringIO.new "This is test data."

# Name of a file in the Storage bucket
# file_name   = "some_file.txt"

require "google/cloud/storage"

storage = Google::Cloud::Storage.new
bucket  = storage.bucket bucket_name

local_file_obj.rewind
bucket.create_file local_file_obj, file_name

puts "Stream data uploaded to #{file_name} in bucket #{bucket_name}"

API REST

API JSON

Pour effectuer une importation en flux continu, suivez les instructions permettant d'effectuer une importation avec reprise, en prenant en compte les éléments suivants :

  • Lorsque vous importez les données du fichier, utilisez une importation par fragments.

  • Comme vous ne connaissez pas la taille totale du fichier tant que vous n'avez pas atteint le fragment final, utilisez la valeur * pour la taille totale du fichier dans l'en-tête Content-Range des fragments intermédiaires.

    Par exemple, si le premier fragment importé a une taille de 512 Ko, l'en-tête Content-Range du fragment est bytes 0-524287/*. S'il reste 64 000 octets après le premier fragment, vous envoyez un fragment final contenant les octets restants et ayant un en-tête Content-Range avec la valeur bytes 524288-588287/588288.

API XML

Pour effectuer une importation en flux continu, utilisez l'une des méthodes suivantes :

  • Importation en plusieurs parties de l'API XML

  • Importation avec reprise, avec les ajustements suivants :

    • Lorsque vous importez les données du fichier, utilisez une importation par fragments.

    • Comme vous ne connaissez pas la taille totale du fichier tant que vous n'avez pas atteint le fragment final, utilisez la valeur * pour la taille totale du fichier dans l'en-tête Content-Range des fragments intermédiaires.

      Par exemple, si le premier fragment importé a une taille de 512 Kio, l'en-tête Content-Range du fragment est bytes 0-524287/*. S'il reste 64 000 octets après le premier fragment, vous envoyez un fragment final contenant les octets restants et ayant un en-tête Content-Range avec la valeur bytes 524288-588287/588288.

Étapes suivantes