发送批量处理文档请求

向处理器发送批量(异步)处理请求。

深入探索

如需查看包含此代码示例的详细文档,请参阅以下内容:

代码示例

Java

如需了解详情,请参阅 Document AI Java API 参考文档

如需向 Document AI 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证


import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.paging.Page;
import com.google.cloud.documentai.v1.BatchDocumentsInputConfig;
import com.google.cloud.documentai.v1.BatchProcessMetadata;
import com.google.cloud.documentai.v1.BatchProcessRequest;
import com.google.cloud.documentai.v1.BatchProcessResponse;
import com.google.cloud.documentai.v1.Document;
import com.google.cloud.documentai.v1.DocumentOutputConfig;
import com.google.cloud.documentai.v1.DocumentOutputConfig.GcsOutputConfig;
import com.google.cloud.documentai.v1.DocumentProcessorServiceClient;
import com.google.cloud.documentai.v1.DocumentProcessorServiceSettings;
import com.google.cloud.documentai.v1.GcsDocument;
import com.google.cloud.documentai.v1.GcsDocuments;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.protobuf.util.JsonFormat;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BatchProcessDocument {
  public static void batchProcessDocument()
      throws IOException, InterruptedException, TimeoutException, ExecutionException {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String location = "your-project-location"; // Format is "us" or "eu".
    String processerId = "your-processor-id";
    String outputGcsBucketName = "your-gcs-bucket-name";
    String outputGcsPrefix = "PREFIX";
    String inputGcsUri = "gs://your-gcs-bucket/path/to/input/file.pdf";
    batchProcessDocument(
        projectId, location, processerId, inputGcsUri, outputGcsBucketName, outputGcsPrefix);
  }

  public static void batchProcessDocument(
      String projectId,
      String location,
      String processorId,
      String gcsInputUri,
      String gcsOutputBucketName,
      String gcsOutputUriPrefix)
      throws IOException, InterruptedException, TimeoutException, ExecutionException {
    // Initialize client that will be used to send requests. This client only needs
    // to be created
    // once, and can be reused for multiple requests. After completing all of your
    // requests, call
    // the "close" method on the client to safely clean up any remaining background
    // resources.
    String endpoint = String.format("%s-documentai.googleapis.com:443", location);
    DocumentProcessorServiceSettings settings =
        DocumentProcessorServiceSettings.newBuilder().setEndpoint(endpoint).build();
    try (DocumentProcessorServiceClient client = DocumentProcessorServiceClient.create(settings)) {
      // The full resource name of the processor, e.g.:
      // projects/project-id/locations/location/processor/processor-id
      // You must create new processors in the Cloud Console first
      String name =
          String.format("projects/%s/locations/%s/processors/%s", projectId, location, processorId);

      GcsDocument gcsDocument =
          GcsDocument.newBuilder().setGcsUri(gcsInputUri).setMimeType("application/pdf").build();

      GcsDocuments gcsDocuments = GcsDocuments.newBuilder().addDocuments(gcsDocument).build();

      BatchDocumentsInputConfig inputConfig =
          BatchDocumentsInputConfig.newBuilder().setGcsDocuments(gcsDocuments).build();

      String fullGcsPath = String.format("gs://%s/%s/", gcsOutputBucketName, gcsOutputUriPrefix);
      GcsOutputConfig gcsOutputConfig = GcsOutputConfig.newBuilder().setGcsUri(fullGcsPath).build();

      DocumentOutputConfig documentOutputConfig =
          DocumentOutputConfig.newBuilder().setGcsOutputConfig(gcsOutputConfig).build();

      // Configure the batch process request.
      BatchProcessRequest request =
          BatchProcessRequest.newBuilder()
              .setName(name)
              .setInputDocuments(inputConfig)
              .setDocumentOutputConfig(documentOutputConfig)
              .build();

      OperationFuture<BatchProcessResponse, BatchProcessMetadata> future =
          client.batchProcessDocumentsAsync(request);

      // Batch process document using a long-running operation.
      // You can wait for now, or get results later.
      // Note: first request to the service takes longer than subsequent
      // requests.
      System.out.println("Waiting for operation to complete...");
      future.get();

      System.out.println("Document processing complete.");

      Storage storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService();
      Bucket bucket = storage.get(gcsOutputBucketName);

      // List all of the files in the Storage bucket.
      Page<Blob> blobs = bucket.list(Storage.BlobListOption.prefix(gcsOutputUriPrefix + "/"));
      int idx = 0;
      for (Blob blob : blobs.iterateAll()) {
        if (!blob.isDirectory()) {
          System.out.printf("Fetched file #%d\n", ++idx);
          // Read the results

          // Download and store json data in a temp file.
          File tempFile = File.createTempFile("file", ".json");
          Blob fileInfo = storage.get(BlobId.of(gcsOutputBucketName, blob.getName()));
          fileInfo.downloadTo(tempFile.toPath());

          // Parse json file into Document.
          FileReader reader = new FileReader(tempFile);
          Document.Builder builder = Document.newBuilder();
          JsonFormat.parser().merge(reader, builder);

          Document document = builder.build();

          // Get all of the document text as one big string.
          String text = document.getText();

          // Read the text recognition output from the processor
          System.out.println("The document contains the following paragraphs:");
          Document.Page page1 = document.getPages(0);
          List<Document.Page.Paragraph> paragraphList = page1.getParagraphsList();
          for (Document.Page.Paragraph paragraph : paragraphList) {
            String paragraphText = getText(paragraph.getLayout().getTextAnchor(), text);
            System.out.printf("Paragraph text:%s\n", paragraphText);
          }

          // Form parsing provides additional output about
          // form-formatted PDFs. You must create a form
          // processor in the Cloud Console to see full field details.
          System.out.println("The following form key/value pairs were detected:");

          for (Document.Page.FormField field : page1.getFormFieldsList()) {
            String fieldName = getText(field.getFieldName().getTextAnchor(), text);
            String fieldValue = getText(field.getFieldValue().getTextAnchor(), text);

            System.out.println("Extracted form fields pair:");
            System.out.printf("\t(%s, %s))", fieldName, fieldValue);
          }

          // Clean up temp file.
          tempFile.deleteOnExit();
        }
      }
    }
  }

  // Extract shards from the text field
  private static String getText(Document.TextAnchor textAnchor, String text) {
    if (textAnchor.getTextSegmentsList().size() > 0) {
      int startIdx = (int) textAnchor.getTextSegments(0).getStartIndex();
      int endIdx = (int) textAnchor.getTextSegments(0).getEndIndex();
      return text.substring(startIdx, endIdx);
    }
    return "[NO TEXT]";
  }
}

Node.js

如需了解详情,请参阅 Document AI Node.js API 参考文档

如需向 Document AI 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const projectId = 'YOUR_PROJECT_ID';
// const location = 'YOUR_PROJECT_LOCATION'; // Format is 'us' or 'eu'
// const processorId = 'YOUR_PROCESSOR_ID';
// const gcsInputUri = 'YOUR_SOURCE_PDF';
// const gcsOutputUri = 'YOUR_STORAGE_BUCKET';
// const gcsOutputUriPrefix = 'YOUR_STORAGE_PREFIX';

// Imports the Google Cloud client library
const {DocumentProcessorServiceClient} =
  require('@google-cloud/documentai').v1;
const {Storage} = require('@google-cloud/storage');

// Instantiates Document AI, Storage clients
const client = new DocumentProcessorServiceClient();
const storage = new Storage();

const {default: PQueue} = require('p-queue');

async function batchProcessDocument() {
  const name = `projects/${projectId}/locations/${location}/processors/${processorId}`;

  // Configure the batch process request.
  const request = {
    name,
    inputDocuments: {
      gcsDocuments: {
        documents: [
          {
            gcsUri: gcsInputUri,
            mimeType: 'application/pdf',
          },
        ],
      },
    },
    documentOutputConfig: {
      gcsOutputConfig: {
        gcsUri: `${gcsOutputUri}/${gcsOutputUriPrefix}/`,
      },
    },
  };

  // Batch process document using a long-running operation.
  // You can wait for now, or get results later.
  // Note: first request to the service takes longer than subsequent
  // requests.
  const [operation] = await client.batchProcessDocuments(request);

  // Wait for operation to complete.
  await operation.promise();
  console.log('Document processing complete.');

  // Query Storage bucket for the results file(s).
  const query = {
    prefix: gcsOutputUriPrefix,
  };

  console.log('Fetching results ...');

  // List all of the files in the Storage bucket
  const [files] = await storage.bucket(gcsOutputUri).getFiles(query);

  // Add all asynchronous downloads to queue for execution.
  const queue = new PQueue({concurrency: 15});
  const tasks = files.map((fileInfo, index) => async () => {
    // Get the file as a buffer
    const [file] = await fileInfo.download();

    console.log(`Fetched file #${index + 1}:`);

    // The results stored in the output Storage location
    // are formatted as a document object.
    const document = JSON.parse(file.toString());
    const {text} = document;

    // Extract shards from the text field
    const getText = textAnchor => {
      if (!textAnchor.textSegments || textAnchor.textSegments.length === 0) {
        return '';
      }

      // First shard in document doesn't have startIndex property
      const startIndex = textAnchor.textSegments[0].startIndex || 0;
      const endIndex = textAnchor.textSegments[0].endIndex;

      return text.substring(startIndex, endIndex);
    };

    // Read the text recognition output from the processor
    console.log('The document contains the following paragraphs:');

    const [page1] = document.pages;
    const {paragraphs} = page1;
    for (const paragraph of paragraphs) {
      const paragraphText = getText(paragraph.layout.textAnchor);
      console.log(`Paragraph text:\n${paragraphText}`);
    }

    // Form parsing provides additional output about
    // form-formatted PDFs. You  must create a form
    // processor in the Cloud Console to see full field details.
    console.log('\nThe following form key/value pairs were detected:');

    const {formFields} = page1;
    for (const field of formFields) {
      const fieldName = getText(field.fieldName.textAnchor);
      const fieldValue = getText(field.fieldValue.textAnchor);

      console.log('Extracted key value pair:');
      console.log(`\t(${fieldName}, ${fieldValue})`);
    }
  });
  await queue.addAll(tasks);
}

Python

如需了解详情,请参阅 Document AI Python API 参考文档

如需向 Document AI 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import re
from typing import Optional

from google.api_core.client_options import ClientOptions
from google.api_core.exceptions import InternalServerError
from google.api_core.exceptions import RetryError
from google.cloud import documentai  # type: ignore
from google.cloud import storage

# TODO(developer): Uncomment these variables before running the sample.
# project_id = "YOUR_PROJECT_ID"
# location = "YOUR_PROCESSOR_LOCATION" # Format is "us" or "eu"
# processor_id = "YOUR_PROCESSOR_ID" # Create processor before running sample
# gcs_output_uri = "YOUR_OUTPUT_URI" # Must end with a trailing slash `/`. Format: gs://bucket/directory/subdirectory/
# processor_version_id = "YOUR_PROCESSOR_VERSION_ID" # Optional. Example: pretrained-ocr-v1.0-2020-09-23

# TODO(developer): You must specify either `gcs_input_uri` and `mime_type` or `gcs_input_prefix`
# gcs_input_uri = "YOUR_INPUT_URI" # Format: gs://bucket/directory/file.pdf
# input_mime_type = "application/pdf"
# gcs_input_prefix = "YOUR_INPUT_URI_PREFIX" # Format: gs://bucket/directory/
# field_mask = "text,entities,pages.pageNumber"  # Optional. The fields to return in the Document object.


def batch_process_documents(
    project_id: str,
    location: str,
    processor_id: str,
    gcs_output_uri: str,
    processor_version_id: Optional[str] = None,
    gcs_input_uri: Optional[str] = None,
    input_mime_type: Optional[str] = None,
    gcs_input_prefix: Optional[str] = None,
    field_mask: Optional[str] = None,
    timeout: int = 400,
) -> None:
    # You must set the `api_endpoint` if you use a location other than "us".
    opts = ClientOptions(api_endpoint=f"{location}-documentai.googleapis.com")

    client = documentai.DocumentProcessorServiceClient(client_options=opts)

    if gcs_input_uri:
        # Specify specific GCS URIs to process individual documents
        gcs_document = documentai.GcsDocument(
            gcs_uri=gcs_input_uri, mime_type=input_mime_type
        )
        # Load GCS Input URI into a List of document files
        gcs_documents = documentai.GcsDocuments(documents=[gcs_document])
        input_config = documentai.BatchDocumentsInputConfig(gcs_documents=gcs_documents)
    else:
        # Specify a GCS URI Prefix to process an entire directory
        gcs_prefix = documentai.GcsPrefix(gcs_uri_prefix=gcs_input_prefix)
        input_config = documentai.BatchDocumentsInputConfig(gcs_prefix=gcs_prefix)

    # Cloud Storage URI for the Output Directory
    gcs_output_config = documentai.DocumentOutputConfig.GcsOutputConfig(
        gcs_uri=gcs_output_uri, field_mask=field_mask
    )

    # Where to write results
    output_config = documentai.DocumentOutputConfig(gcs_output_config=gcs_output_config)

    if processor_version_id:
        # The full resource name of the processor version, e.g.:
        # projects/{project_id}/locations/{location}/processors/{processor_id}/processorVersions/{processor_version_id}
        name = client.processor_version_path(
            project_id, location, processor_id, processor_version_id
        )
    else:
        # The full resource name of the processor, e.g.:
        # projects/{project_id}/locations/{location}/processors/{processor_id}
        name = client.processor_path(project_id, location, processor_id)

    request = documentai.BatchProcessRequest(
        name=name,
        input_documents=input_config,
        document_output_config=output_config,
    )

    # BatchProcess returns a Long Running Operation (LRO)
    operation = client.batch_process_documents(request)

    # Continually polls the operation until it is complete.
    # This could take some time for larger files
    # Format: projects/{project_id}/locations/{location}/operations/{operation_id}
    try:
        print(f"Waiting for operation {operation.operation.name} to complete...")
        operation.result(timeout=timeout)
    # Catch exception when operation doesn't finish before timeout
    except (RetryError, InternalServerError) as e:
        print(e.message)

    # NOTE: Can also use callbacks for asynchronous processing
    #
    # def my_callback(future):
    #   result = future.result()
    #
    # operation.add_done_callback(my_callback)

    # After the operation is complete,
    # get output document information from operation metadata
    metadata = documentai.BatchProcessMetadata(operation.metadata)

    if metadata.state != documentai.BatchProcessMetadata.State.SUCCEEDED:
        raise ValueError(f"Batch Process Failed: {metadata.state_message}")

    storage_client = storage.Client()

    print("Output files:")
    # One process per Input Document
    for process in list(metadata.individual_process_statuses):
        # output_gcs_destination format: gs://BUCKET/PREFIX/OPERATION_NUMBER/INPUT_FILE_NUMBER/
        # The Cloud Storage API requires the bucket name and URI prefix separately
        matches = re.match(r"gs://(.*?)/(.*)", process.output_gcs_destination)
        if not matches:
            print(
                "Could not parse output GCS destination:",
                process.output_gcs_destination,
            )
            continue

        output_bucket, output_prefix = matches.groups()

        # Get List of Document Objects from the Output Bucket
        output_blobs = storage_client.list_blobs(output_bucket, prefix=output_prefix)

        # Document AI may output multiple JSON files per source file
        for blob in output_blobs:
            # Document AI should only output JSON files to GCS
            if blob.content_type != "application/json":
                print(
                    f"Skipping non-supported file: {blob.name} - Mimetype: {blob.content_type}"
                )
                continue

            # Download JSON File as bytes object and convert to Document Object
            print(f"Fetching {blob.name}")
            document = documentai.Document.from_json(
                blob.download_as_bytes(), ignore_unknown_fields=True
            )

            # For a full list of Document object attributes, please reference this page:
            # https://cloud.google.com/python/docs/reference/documentai/latest/google.cloud.documentai_v1.types.Document

            # Read the text recognition output from the processor
            print("The document contains the following text:")
            print(document.text)

后续步骤

如需搜索和过滤其他 Google Cloud 产品的代码示例,请参阅 Google Cloud 示例浏览器