Importations en flux continu

Cloud Storage permet de diffuser des données en flux continu dans un bucket sans avoir à les enregistrer au préalable dans un fichier. Cela peut s'avérer utile lorsque vous souhaitez importer des données sans en connaître la taille finale au début de l'importation, par exemple lorsque vous générez des données d'importation à partir d'un processus, ou lorsque vous compressez un objet à la volée.

Utiliser la validation de somme de contrôle lors de l'insertion en flux continu

Étant donné qu'une somme de contrôle ne peut être fournie que dans la requête initiale d'une importation, il est souvent impossible d'utiliser la validation de somme de contrôle de Cloud Storage lors de l'insertion en flux continu. Il est recommandé de toujours effectuer la validation de la somme de contrôle, et vous pouvez le faire manuellement une fois l'importation en flux continu terminée. Toutefois, la validation une fois le transfert terminé signifie que toutes les données corrompues sont accessibles pendant le délai nécessaire à la confirmation de la corruption et à leur suppression.

Si vous souhaitez exiger une validation de la somme de contrôle avant de conclure l'importation des données et leur mise à disposition, vous ne devez pas utiliser d'importation en flux continu. Vous devez choisir une autre option d'importation, qui effectue la validation de la somme de contrôle avant de finaliser l'objet.

Rôles requis

Pour obtenir les autorisations nécessaires pour diffuser des importations, demandez à votre administrateur de vous attribuer l'un des rôles suivants:

  • Pour les importations qui incluent un verrou de conservation des objets, demandez à votre administrateur de vous attribuer le rôle IAM d'administrateur des objets de l'espace de stockage (roles/storage.objectAdmin) pour le bucket.

  • Dans tous les autres cas, demandez à votre administrateur de vous attribuer le rôle IAM "Utilisateur des objets de l'espace de stockage" (roles/storage.objectUser) pour le bucket.

Ces rôles prédéfinis contiennent les autorisations requises pour diffuser des importations dans Cloud Storage. Pour afficher les autorisations exactes requises, développez la section Autorisations requises :

Autorisations requises

  • storage.objects.create
  • storage.objects.delete
    • Cette autorisation n'est requise que pour les importations qui écrasent un objet existant.
  • storage.objects.list
    • Cette autorisation n'est requise que pour exécuter les instructions de cette page à l'aide de Google Cloud CLI.
  • storage.objects.setRetention
    • Cette autorisation n'est requise que pour les importations comprenant un verrou de conservation des objets.

Vous pouvez également obtenir ces autorisations en utilisant d'autres rôles prédéfinis ou des rôles personnalisés.

Pour en savoir plus sur l'attribution de rôles dans des buckets, consultez la page Utiliser IAM avec des buckets.

Diffuser une importation

Les exemples suivants montrent comment effectuer une importation en flux continu d'un processus vers un objet Cloud Storage :

Console

La console Google Cloud n'accepte pas les importations en flux continu. Utilisez plutôt gcloud CLI.

Ligne de commande

  1. Dirigez les données vers la commande gcloud storage cp et utilisez un tiret pour l'URL source :

    PROCESS_NAME | gcloud storage cp - gs://BUCKET_NAME/OBJECT_NAME

    Où :

    • PROCESS_NAME est le nom du processus à partir duquel vous collectez des données. Exemple : collect_measurements.
    • BUCKET_NAME est le nom du bucket contenant l'objet. Par exemple, my_app_bucket.
    • OBJECT_NAME est le nom de l'objet créé à partir des données. Par exemple, data_measurements.

Bibliothèques clientes

C++

Pour en savoir plus, consultez la documentation de référence de l'API Cloud Storage en langage C++.

Pour vous authentifier auprès de Cloud Storage, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez la page Configurer l'authentification pour les bibliothèques clientes.

namespace gcs = ::google::cloud::storage;
using ::google::cloud::StatusOr;
[](gcs::Client client, std::string const& bucket_name,
   std::string const& object_name, int desired_line_count) {
  std::string const text = "Lorem ipsum dolor sit amet";
  gcs::ObjectWriteStream stream =
      client.WriteObject(bucket_name, object_name);

  for (int lineno = 0; lineno != desired_line_count; ++lineno) {
    // Add 1 to the counter, because it is conventional to number lines
    // starting at 1.
    stream << (lineno + 1) << ": " << text << "\n";
  }

  stream.Close();

  StatusOr<gcs::ObjectMetadata> metadata = std::move(stream).metadata();
  if (!metadata) throw std::move(metadata).status();
  std::cout << "Successfully wrote to object " << metadata->name()
            << " its size is: " << metadata->size()
            << "\nFull metadata: " << *metadata << "\n";
}

C#

Pour en savoir plus, consultez la documentation de référence de l'API Cloud Storage en langage C#.

Pour vous authentifier auprès de Cloud Storage, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez la page Configurer l'authentification pour les bibliothèques clientes.


using Google.Cloud.Storage.V1;
using System;
using System.IO;

public class UploadFileSample
{
    public void UploadFile(
        string bucketName = "your-unique-bucket-name",
        string localPath = "my-local-path/my-file-name",
        string objectName = "my-file-name")
    {
        var storage = StorageClient.Create();
        using var fileStream = File.OpenRead(localPath);
        storage.UploadObject(bucketName, objectName, null, fileStream);
        Console.WriteLine($"Uploaded {objectName}.");
    }
}

Go

Pour en savoir plus, consultez la documentation de référence de l'API Cloud Storage en langage Go.

Pour vous authentifier auprès de Cloud Storage, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez la page Configurer l'authentification pour les bibliothèques clientes.

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
)

// streamFileUpload uploads an object via a stream.
func streamFileUpload(w io.Writer, bucket, object string) error {
	// bucket := "bucket-name"
	// object := "object-name"
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("storage.NewClient: %w", err)
	}
	defer client.Close()

	b := []byte("Hello world.")
	buf := bytes.NewBuffer(b)

	ctx, cancel := context.WithTimeout(ctx, time.Second*50)
	defer cancel()

	// Upload an object with storage.Writer.
	wc := client.Bucket(bucket).Object(object).NewWriter(ctx)
	wc.ChunkSize = 0 // note retries are not supported for chunk size 0.

	if _, err = io.Copy(wc, buf); err != nil {
		return fmt.Errorf("io.Copy: %w", err)
	}
	// Data can continue to be added to the file until the writer is closed.
	if err := wc.Close(); err != nil {
		return fmt.Errorf("Writer.Close: %w", err)
	}
	fmt.Fprintf(w, "%v uploaded to %v.\n", object, bucket)

	return nil
}

Java

Pour en savoir plus, consultez la documentation de référence de l'API Cloud Storage en langage Java.

Pour vous authentifier auprès de Cloud Storage, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez la page Configurer l'authentification pour les bibliothèques clientes.


import com.google.cloud.WriteChannel;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class StreamObjectUpload {

  public static void streamObjectUpload(
      String projectId, String bucketName, String objectName, String contents) throws IOException {
    // The ID of your GCP project
    // String projectId = "your-project-id";

    // The ID of your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    // The ID of your GCS object
    // String objectName = "your-object-name";

    // The string of contents you wish to upload
    // String contents = "Hello world!";

    Storage storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService();
    BlobId blobId = BlobId.of(bucketName, objectName);
    BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build();
    byte[] content = contents.getBytes(StandardCharsets.UTF_8);
    try (WriteChannel writer = storage.writer(blobInfo)) {
      writer.write(ByteBuffer.wrap(content));
      System.out.println(
          "Wrote to " + objectName + " in bucket " + bucketName + " using a WriteChannel.");
    }
  }
}

Node.js

Pour en savoir plus, consultez la documentation de référence de l'API Cloud Storage en langage Node.js.

Pour vous authentifier auprès de Cloud Storage, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez la page Configurer l'authentification pour les bibliothèques clientes.

/**
 * TODO(developer): Uncomment the following lines before running the sample
 */
// The ID of your GCS bucket
// const bucketName = 'your-unique-bucket-name';

// The new ID for your GCS file
// const destFileName = 'your-new-file-name';

// The content to be uploaded in the GCS file
// const contents = 'your file content';

// Imports the Google Cloud client library
const {Storage} = require('@google-cloud/storage');

// Import Node.js stream
const stream = require('stream');

// Creates a client
const storage = new Storage();

// Get a reference to the bucket
const myBucket = storage.bucket(bucketName);

// Create a reference to a file object
const file = myBucket.file(destFileName);

// Create a pass through stream from a string
const passthroughStream = new stream.PassThrough();
passthroughStream.write(contents);
passthroughStream.end();

async function streamFileUpload() {
  passthroughStream.pipe(file.createWriteStream()).on('finish', () => {
    // The file upload is complete
  });

  console.log(`${destFileName} uploaded to ${bucketName}`);
}

streamFileUpload().catch(console.error);

PHP

Pour en savoir plus, consultez la documentation de référence de l'API Cloud Storage en langage PHP.

Pour vous authentifier auprès de Cloud Storage, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez la page Configurer l'authentification pour les bibliothèques clientes.

use Google\Cloud\Storage\StorageClient;
use Google\Cloud\Storage\WriteStream;

/**
 * Upload a chunked file stream.
 *
 * @param string $bucketName The name of your Cloud Storage bucket.
 *        (e.g. 'my-bucket')
 * @param string $objectName The name of your Cloud Storage object.
 *        (e.g. 'my-object')
 * @param string $contents The contents to upload via stream chunks.
 *        (e.g. 'these are my contents')
 */
function upload_object_stream(string $bucketName, string $objectName, string $contents): void
{
    $storage = new StorageClient();
    $bucket = $storage->bucket($bucketName);
    $writeStream = new WriteStream(null, [
        'chunkSize' => 1024 * 256, // 256KB
    ]);
    $uploader = $bucket->getStreamableUploader($writeStream, [
        'name' => $objectName,
    ]);
    $writeStream->setUploader($uploader);
    $stream = fopen('data://text/plain,' . $contents, 'r');
    while (($line = stream_get_line($stream, 1024 * 256)) !== false) {
        $writeStream->write($line);
    }
    $writeStream->close();

    printf('Uploaded %s to gs://%s/%s' . PHP_EOL, $contents, $bucketName, $objectName);
}

Python

Pour en savoir plus, consultez la documentation de référence de l'API Cloud Storage en langage Python.

Pour vous authentifier auprès de Cloud Storage, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez la page Configurer l'authentification pour les bibliothèques clientes.

from google.cloud import storage


def upload_blob_from_stream(bucket_name, file_obj, destination_blob_name):
    """Uploads bytes from a stream or other file-like object to a blob."""
    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"

    # The stream or file (file-like object) from which to read
    # import io
    # file_obj = io.BytesIO()
    # file_obj.write(b"This is test data.")

    # The desired name of the uploaded GCS object (blob)
    # destination_blob_name = "storage-object-name"

    # Construct a client-side representation of the blob.
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    # Rewind the stream to the beginning. This step can be omitted if the input
    # stream will always be at a correct position.
    file_obj.seek(0)

    # Upload data from the stream to your bucket.
    blob.upload_from_file(file_obj)

    print(
        f"Stream data uploaded to {destination_blob_name} in bucket {bucket_name}."
    )

Ruby

Pour en savoir plus, consultez la documentation de référence de l'API Cloud Storage en langage Ruby.

Pour vous authentifier auprès de Cloud Storage, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez la page Configurer l'authentification pour les bibliothèques clientes.


# The ID of your GCS bucket
# bucket_name = "your-unique-bucket-name"

# The stream or file (file-like object) from which to read
# local_file_obj = StringIO.new "This is test data."

# Name of a file in the Storage bucket
# file_name   = "some_file.txt"

require "google/cloud/storage"

storage = Google::Cloud::Storage.new
bucket  = storage.bucket bucket_name

local_file_obj.rewind
bucket.create_file local_file_obj, file_name

puts "Stream data uploaded to #{file_name} in bucket #{bucket_name}"

API REST

API JSON

Pour effectuer une importation en flux continu, utilisez l'une des méthodes suivantes :

  • Importation avec reprise, avec les ajustements suivants :

    • Lorsque vous importez les données du fichier, utilisez une importation par fragments.

    • Comme vous ne connaissez pas la taille totale du fichier tant que vous n'avez pas atteint le fragment final, utilisez la valeur * pour la taille totale du fichier dans l'en-tête Content-Range des fragments intermédiaires.

      Par exemple, si le premier fragment importé a une taille de 512 Kio, l'en-tête Content-Range du fragment est bytes 0-524287/*. S'il reste 64 000 octets après le premier fragment, vous envoyez un fragment final contenant les octets restants et ayant un en-tête Content-Range avec la valeur bytes 524288-588287/588288.

  • Importation par requête unique, avec les ajustements suivants :

API XML

Pour effectuer une importation en flux continu, utilisez l'une des méthodes suivantes :

  • Importation en plusieurs parties de l'API XML

  • Importation avec reprise, avec les ajustements suivants :

    • Lorsque vous importez les données du fichier, utilisez une importation par fragments.

    • Comme vous ne connaissez pas la taille totale du fichier tant que vous n'avez pas atteint le fragment final, utilisez la valeur * pour la taille totale du fichier dans l'en-tête Content-Range des fragments intermédiaires.

      Par exemple, si le premier fragment importé a une taille de 512 Kio, l'en-tête Content-Range du fragment est bytes 0-524287/*. S'il reste 64 000 octets après le premier fragment, vous envoyez un fragment final contenant les octets restants et ayant un en-tête Content-Range avec la valeur bytes 524288-588287/588288.

  • Importation par requête unique, avec les ajustements suivants :

Étape suivante