Source code for google.appengine.api.files.shuffler

#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#




"""Files API.

.. deprecated:: 1.8.1
   Use the Google Cloud Storage Client Library instead.

Files API Shuffler interface"""



import logging

from google.appengine.api.files import file as files
from google.appengine.api.files import blobstore
from google.appengine.api.files import file_service_pb
from google.appengine.runtime import apiproxy_errors

# Public names exported by a star-import of this module.
__all__ = ['shuffle', 'available']


class ShufflerUnavailableError(files.Error):
  """Raised by shuffle() when the shuffler service is not available."""


def shuffle(job_name, input_file_list, output_file_list, callback):
  """Shuffle mapreduce files using the shuffler service.

  Builds a ShuffleRequest describing the job, its callback, its inputs and
  its outputs, and submits it through the Files API "Shuffle" call.

  Args:
    job_name: unique shuffle job name as string.
    input_file_list: list of files api file names to shuffle. Files should be
      in records format with serialized KeyValue protocol buffer as record.
    output_file_list: list of files api file names to store shuffle result.
      Files should not be finalized. They will be of records format with
      serialized KeyValues protocol buffer as record.
    callback: shuffle service call back specification. Can be either a
      url - the task in default queue with default parameters will be
      enqueued. It can also be a dict with following keys:
        url: url to call back (required)
        version: app version to call (optional)
        method: HTTP method to use (POST or GET) (optional)
        queue: queue name to enqueue a task in (optional)

  Raises:
    ShufflerUnavailableError: if shuffler service is not available.
    KeyError: if callback is a dict that has no "url" key.
  """
  # Fail fast before building any request state.
  if not available():
    raise ShufflerUnavailableError()

  request = file_service_pb.ShuffleRequest()
  response = file_service_pb.ShuffleResponse()

  request.set_shuffle_name(job_name)

  # The callback is either a full specification (dict) or just a url.
  if isinstance(callback, dict):
    request.mutable_callback().set_url(callback["url"])
    if "version" in callback:
      request.mutable_callback().set_app_version_id(callback["version"])
    if "method" in callback:
      request.mutable_callback().set_method(callback["method"])
    if "queue" in callback:
      request.mutable_callback().set_queue(callback["queue"])
  else:
    request.mutable_callback().set_url(callback)

  # NOTE(review): the size is always sent as 0; presumably this means
  # "unknown" to the service - confirm against the ShuffleRequest definition.
  request.set_shuffle_size_bytes(0)

  # One input entry per file, all in key/value records format.
  for file_name in input_file_list:
    shuffle_input = request.add_input()
    shuffle_input.set_format(
        file_service_pb.ShuffleEnums.RECORDS_KEY_VALUE_PROTO_INPUT)
    shuffle_input.set_path(file_name)

  # A single output entry that carries every output path.
  shuffle_output = request.mutable_output()
  shuffle_output.set_format(
      file_service_pb.ShuffleEnums.RECORDS_KEY_MULTI_VALUE_PROTO_OUTPUT)
  for file_name in output_file_list:
    shuffle_output.add_path(file_name)

  files._make_call("Shuffle", request, response)


def available():
  """Determine if shuffler service is available for the app.

  This implementation unconditionally reports the service as unavailable,
  so shuffle() always raises ShufflerUnavailableError.

  Returns:
    True if shuffler service is available, False otherwise. Currently
    always False.
  """
  return False

Monitor your resources on the go

Get the Google Cloud Console app to help you manage your projects.

Send feedback about...

App Engine standard environment for Python