Notice: Over the next few months, we're reorganizing the App Engine documentation site to make it easier to find content and better align with the rest of Google Cloud products. The same content will be available, but the navigation will now match the rest of the Cloud products. If you have feedback or questions as you navigate the site, click Send Feedback.

Python 2 is no longer supported by the community. We recommend that you migrate Python 2 apps to Python 3.
Stay organized with collections. Save and categorize content based on your preferences.

Source code for google.appengine.ext.bulkload.csv_connector

#!/usr/bin/env python
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Bulkloader CSV reading and writing.

Handle the CSV format specified in a bulkloader.yaml file.
"""

import codecs
import cStringIO
import csv
import encodings
import encodings.ascii
import encodings.cp1252
import encodings.latin_1
import encodings.utf_8

from google.appengine.ext.bulkload import bulkloader_errors
from google.appengine.ext.bulkload import connector_interface

def utf8_recoder(stream, encoding):
    """Yield the lines of an encoded byte stream, re-encoded as UTF-8.

    Args:
      stream: Byte stream to read from.
      encoding: Name of the codec used to decode `stream`.

    Yields:
      One UTF-8 encoded byte string per line read from the decoded stream.
    """
    reader_factory = codecs.getreader(encoding)
    decoded_lines = reader_factory(stream)
    for decoded_line in decoded_lines:
        recoded_line = decoded_line.encode('utf-8')
        yield recoded_line
[docs]class UnicodeDictWriter(object): """Based on UnicodeWriter in""" def __init__(self, stream, fieldnames, encoding='utf-8', **kwds): """Initialzer. Args: stream: Stream to write to. fieldnames: Fieldnames to pass to the DictWriter. encoding: Desired encoding. kwds: Additional arguments to pass to the DictWriter. """ writer = codecs.getwriter(encoding) if (writer is encodings.utf_8.StreamWriter or writer is encodings.ascii.StreamWriter or writer is encodings.latin_1.StreamWriter or writer is encodings.cp1252.StreamWriter): self.no_recoding = True self.encoder = codecs.getencoder(encoding) self.writer = csv.DictWriter(stream, fieldnames, **kwds) else: self.no_recoding = False self.encoder = codecs.getencoder('utf-8') self.queue = cStringIO.StringIO() self.writer = csv.DictWriter(self.queue, fieldnames, **kwds) = writer(stream)
[docs] def writerow(self, row): """Wrap writerow method.""" row_encoded = dict([(k, self.encoder(v)[0]) for (k, v) in row.iteritems()]) self.writer.writerow(row_encoded) if self.no_recoding: return data = self.queue.getvalue() data = data.decode('utf-8') self.queue.truncate(0)
class CsvConnector(connector_interface.ConnectorInterface):
    """Read/write a (possibly encoded) CSV file."""

    @classmethod
    def create_from_options(cls, options, name):
        """Factory using an options dictionary.

        Args:
          options: Dictionary of options:
            columns: 'from_header' or blank.
            column_list: overrides columns specifically.
            encoding: encoding of the file. e.g. 'utf-8' (default),
              'windows-1252'.
            skip_import_header_row: True to ignore the header line on import.
              Defaults False, except must be True if columns=from_header.
            print_export_header_row: True to print a header line on export.
              Defaults to False except if columns=from_header.
            import_options: Other kwargs to pass in, like "dialect".
            export_options: Other kwargs to pass in, like "dialect".
          name: The name of this transformer, for use in error messages.

        Returns:
          CsvConnector object described by the specified options.

        Raises:
          InvalidConfiguration: If the config is invalid.
        """
        column_list = options.get('column_list', None)
        columns = None
        if not column_list:
            # Without an explicit column list the only supported source of
            # column names is the file's header row.
            columns = options.get('columns', 'from_header')
            if columns != 'from_header':
                raise bulkloader_errors.InvalidConfiguration(
                    'CSV columns must be "from_header", or a column_list '
                    'must be specified. (In transformer name %s.)' % name)

        csv_encoding = options.get('encoding', 'utf-8')

        skip_import_header_row = options.get('skip_import_header_row',
                                             columns == 'from_header')
        if columns == 'from_header' and not skip_import_header_row:
            raise bulkloader_errors.InvalidConfiguration(
                'When CSV columns are "from_header", the header row must always '
                'be skipped. (In transformer name %s.)' % name)
        print_export_header_row = options.get('print_export_header_row',
                                              columns == 'from_header')
        import_options = options.get('import_options', {})
        export_options = options.get('export_options', {})
        return cls(columns, column_list, skip_import_header_row,
                   print_export_header_row, csv_encoding, import_options,
                   export_options)

    def __init__(self, columns, column_list, skip_import_header_row,
                 print_export_header_row, csv_encoding=None,
                 import_options=None, export_options=None):
        """Initializer.

        Args:
          columns: 'from_header' or blank
          column_list: overrides columns specifically.
          skip_import_header_row: True to ignore the header line on import.
            Defaults False, except must be True if columns=from_header.
          print_export_header_row: True to print a header line on export.
            Defaults to False except if columns=from_header.
          csv_encoding: encoding of the file.
          import_options: Other kwargs to pass in, like "dialect".
          export_options: Other kwargs to pass in, like "dialect".
        """
        self.columns = columns
        self.from_header = (columns == 'from_header')
        self.column_list = column_list
        self.skip_import_header_row = skip_import_header_row
        self.print_export_header_row = print_export_header_row
        self.csv_encoding = csv_encoding
        # Import/export state, populated by generate_import_record,
        # initialize_export and the first write_dict call.
        self.dict_generator = None
        self.output_stream = None
        self.csv_writer = None
        self.bulkload_state = None
        self.import_options = import_options or {}
        self.export_options = export_options or {}

    def generate_import_record(self, filename, bulkload_state):
        """Generator, yields dicts for nodes found as described in the options.

        The input file is closed when the generator is exhausted, closed, or
        fails with an exception.

        Args:
          filename: Filename to read.
          bulkload_state: Passed bulkload_state.

        Yields:
          Neutral dict, one per row in the CSV file.

        Raises:
          InvalidImportData: If a row has more values than there are headers.
        """
        self.bulkload_state = bulkload_state

        input_file = open(filename)
        try:
            recoded_lines = utf8_recoder(input_file, self.csv_encoding)

            self.dict_generator = csv.DictReader(recoded_lines,
                                                 self.column_list,
                                                 **self.import_options)

            # With from_header, csv.DictReader is expected to take the field
            # names from the first row itself, so there is nothing to discard
            # here; otherwise the header row arrives as an ordinary record.
            discard_line = self.skip_import_header_row and not self.from_header

            line_number = 0
            for input_dict in self.dict_generator:
                line_number = line_number + 1
                if discard_line:
                    discard_line = False
                    continue

                decoded_dict = {}
                for key, value in input_dict.iteritems():
                    # csv.DictReader files surplus values under the key None.
                    if key is None:
                        raise bulkloader_errors.InvalidImportData(
                            'Got more values in row than headers on line %d.'
                            % (line_number))
                    if not self.column_list:
                        # Keys read from the file are UTF-8 byte strings.
                        key = unicode(key, 'utf-8')
                    if value:
                        value = unicode(value, 'utf-8')
                    decoded_dict[key] = value
                yield decoded_dict
        finally:
            input_file.close()

    def initialize_export(self, filename, bulkload_state):
        """Initialize the output file.

        Args:
          filename: Filename to write.
          bulkload_state: Passed bulkload_state.
        """
        self.bulkload_state = bulkload_state
        self.output_stream = open(filename, 'wb')

    def __initialize_csv_writer(self, dictionary):
        """Actual initialization, happens on the first entity being written.

        Args:
          dictionary: First record to export; with from_header its keys
            become the exported column list.
        """
        write_header = self.print_export_header_row
        if self.from_header:
            export_column_list = tuple(dictionary)
        else:
            export_column_list = self.column_list
        self.csv_writer = UnicodeDictWriter(self.output_stream,
                                            export_column_list,
                                            self.csv_encoding,
                                            **self.export_options)
        if write_header:
            # The header row maps every column name to itself.
            self.csv_writer.writerow(dict(zip(export_column_list,
                                              export_column_list)))

    def write_dict(self, dictionary):
        """Write one record for the specified entity.

        Args:
          dictionary: Dict of column name to value for the row to write.
        """
        if not self.csv_writer:
            self.__initialize_csv_writer(dictionary)
        self.csv_writer.writerow(dictionary)

    def finalize_export(self):
        """Close the output file opened by initialize_export."""
        self.output_stream.close()