Source code for google.appengine.ext.bulkload.csv_connector

#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#





"""Bulkloader CSV reading and writing.

Handle the CSV format specified in a bulkloader.yaml file.
"""












import codecs
import cStringIO
import csv
import encodings
import encodings.ascii
import encodings.cp1252
import encodings.latin_1
import encodings.utf_8

from google.appengine.ext.bulkload import bulkloader_errors
from google.appengine.ext.bulkload import connector_interface


def utf8_recoder(stream, encoding):
  """Decode a stream with `encoding` and yield its lines re-encoded as UTF-8.

  Args:
    stream: File-like object holding bytes in `encoding`.
    encoding: Name of the codec the stream was written with.

  Yields:
    One UTF-8 encoded string per line produced by the codecs StreamReader.
  """
  reader_class = codecs.getreader(encoding)
  decoded_lines = reader_class(stream)
  for text_line in decoded_lines:
    utf8_line = text_line.encode('utf-8')
    yield utf8_line
class UnicodeDictWriter(object):
  """csv.DictWriter wrapper that writes unicode rows in a chosen encoding.

  Based on UnicodeWriter in http://docs.python.org/library/csv.html.

  For the utf-8, ascii, latin-1 and cp1252 codecs, each value is encoded with
  the target codec and handed straight to a csv.DictWriter on `stream`.
  For any other codec, each row is first written as UTF-8 into an in-memory
  buffer. The buffer is then decoded and written through a codecs
  StreamWriter, which encodes it to the target encoding.
  """

  def __init__(self, stream, fieldnames, encoding='utf-8', **kwds):
    """Initializer.

    Args:
      stream: Stream to write to.
      fieldnames: Fieldnames to pass to the DictWriter.
      encoding: Desired encoding.
      kwds: Additional arguments to pass to the DictWriter.
    """
    writer = codecs.getwriter(encoding)
    if (writer is encodings.utf_8.StreamWriter or
        writer is encodings.ascii.StreamWriter or
        writer is encodings.latin_1.StreamWriter or
        writer is encodings.cp1252.StreamWriter):
      # NOTE(review): presumably whitelisted because these codecs are
      # ASCII-compatible, so the byte-oriented csv writer can emit their
      # output directly.
      self.no_recoding = True
      self.encoder = codecs.getencoder(encoding)
      self.writer = csv.DictWriter(stream, fieldnames, **kwds)
    else:
      self.no_recoding = False
      self.encoder = codecs.getencoder('utf-8')
      # Per-row UTF-8 staging buffer, re-encoded via the StreamWriter below.
      self.queue = cStringIO.StringIO()
      self.writer = csv.DictWriter(self.queue, fieldnames, **kwds)
      self.stream = writer(stream)

  def _encode_value(self, value):
    """Encode one field value with self.encoder, leaving None untouched."""
    if value is None:
      # csv writers emit None as an empty field, the same output as a
      # missing key; running it through the codec would raise TypeError.
      return None
    return self.encoder(value)[0]

  def writerow(self, row):
    """Encode and write one row.

    Args:
      row: Dict mapping field names to unicode values. None values are
        written as empty fields.
    """
    row_encoded = dict((key, self._encode_value(value))
                       for (key, value) in row.iteritems())
    self.writer.writerow(row_encoded)
    if self.no_recoding:
      return

    data = self.queue.getvalue()
    data = data.decode('utf-8')
    self.stream.write(data)
    # Empty the staging buffer and rewind it so the next row starts at 0.
    self.queue.seek(0)
    self.queue.truncate(0)
class CsvConnector(connector_interface.ConnectorInterface):
  """Read/write a (possibly encoded) CSV file."""

  @classmethod
  def create_from_options(cls, options, name):
    """Factory using an options dictionary.

    Args:
      options: Dictionary of options:
        columns: 'from_header' or blank.
        column_list: overrides columns specifically.
        encoding: encoding of the file. e.g. 'utf-8' (default), 'windows-1252'.
        skip_import_header_row: True to ignore the header line on import.
          Defaults False, except must be True if columns=from_header.
        print_export_header_row: True to print a header line on export.
          Defaults to False except if columns=from_header.
        import_options: Other kwargs to pass in, like "dialect".
        export_options: Other kwargs to pass in, like "dialect".
      name: The name of this transformer, for use in error messages.

    Returns:
      CsvConnector object described by the specified options.

    Raises:
      InvalidConfiguration: If the config is invalid.
    """
    column_list = options.get('column_list', None)
    columns = None
    if not column_list:
      # Without an explicit column_list, the only supported mode is taking
      # the column names from the file's header row.
      columns = options.get('columns', 'from_header')
      if columns != 'from_header':
        raise bulkloader_errors.InvalidConfiguration(
            'CSV columns must be "from_header", or a column_list '
            'must be specified. (In transformer name %s.)' % name)
    csv_encoding = options.get('encoding', 'utf-8')

    skip_import_header_row = options.get('skip_import_header_row',
                                         columns == 'from_header')
    if columns == 'from_header' and not skip_import_header_row:
      raise bulkloader_errors.InvalidConfiguration(
          'When CSV columns are "from_header", the header row must always '
          'be skipped. (In transformer name %s.)' % name)
    print_export_header_row = options.get('print_export_header_row',
                                          columns == 'from_header')
    import_options = options.get('import_options', {})
    export_options = options.get('export_options', {})

    return cls(columns, column_list, skip_import_header_row,
               print_export_header_row, csv_encoding, import_options,
               export_options)

  def __init__(self, columns, column_list, skip_import_header_row,
               print_export_header_row, csv_encoding=None,
               import_options=None, export_options=None):
    """Initializer.

    Args:
      columns: 'from_header' or blank
      column_list: overrides columns specifically.
      skip_import_header_row: True to ignore the header line on import.
        Defaults False, except must be True if columns=from_header.
      print_export_header_row: True to print a header line on export.
        Defaults to False except if columns=from_header.
      csv_encoding: encoding of the file. None or empty means 'utf-8', the
        same default create_from_options uses.
      import_options: Other kwargs to pass in, like "dialect".
      export_options: Other kwargs to pass in, like "dialect".
    """
    self.columns = columns
    self.from_header = (columns == 'from_header')
    self.column_list = column_list
    self.skip_import_header_row = skip_import_header_row
    self.print_export_header_row = print_export_header_row
    # codecs.getreader/getwriter cannot take None, so fall back to UTF-8.
    self.csv_encoding = csv_encoding or 'utf-8'
    self.dict_generator = None
    self.output_stream = None
    self.csv_writer = None
    self.bulkload_state = None
    self.import_options = import_options or {}
    self.export_options = export_options or {}

  def generate_import_record(self, filename, bulkload_state):
    """Generator, yields dicts for nodes found as described in the options.

    The input file is closed when the generator is exhausted, raises, or is
    closed.

    Args:
      filename: Filename to read.
      bulkload_state: Passed bulkload_state.

    Yields:
      Neutral dict, one per row in the CSV file.

    Raises:
      InvalidImportData: If a row has more values than there are columns.
    """
    self.bulkload_state = bulkload_state
    # Binary mode: the Python 2 csv module requires it, and decoding is done
    # explicitly by utf8_recoder.
    input_file = open(filename, 'rb')
    try:
      input_stream = utf8_recoder(input_file, self.csv_encoding)
      self.dict_generator = csv.DictReader(input_stream, self.column_list,
                                           **self.import_options)
      # With from_header, DictReader consumes the header row itself.
      discard_line = self.skip_import_header_row and not self.from_header
      for input_dict in self.dict_generator:
        if discard_line:
          discard_line = False
          continue

        decoded_dict = {}
        for key, value in input_dict.iteritems():
          if key is None:
            # DictReader stores surplus values under restkey (None by
            # default). Report the physical line from the underlying csv
            # reader, which accounts for the header, blank lines and
            # multi-line fields.
            line_number = self.dict_generator.reader.line_num
            raise bulkloader_errors.InvalidImportData(
                'Got more values in row than headers on line %d.' %
                (line_number))
          if not self.column_list:
            # Header-derived keys are UTF-8 byte strings from utf8_recoder.
            key = unicode(key, 'utf-8')
          if value:
            value = unicode(value, 'utf-8')
          decoded_dict[key] = value
        yield decoded_dict
    finally:
      input_file.close()

  def initialize_export(self, filename, bulkload_state):
    """Initialize the output file.

    Args:
      filename: Filename to write.
      bulkload_state: Passed bulkload_state.
    """
    self.bulkload_state = bulkload_state
    self.output_stream = open(filename, 'wb')
    # A writer left over from a previous export is bound to that export's
    # (closed) stream; force it to be rebuilt on the first write_dict.
    self.csv_writer = None

  def __initialize_csv_writer(self, dictionary):
    """Actual initialization, happens on the first entity being written.

    With columns == 'from_header', the column order is the key iteration
    order of the first record written.
    """
    if self.from_header:
      export_column_list = tuple(dictionary)
    else:
      export_column_list = self.column_list
    self.csv_writer = UnicodeDictWriter(self.output_stream,
                                        export_column_list,
                                        self.csv_encoding,
                                        **self.export_options)
    if self.print_export_header_row:
      # The header row maps each column name to itself.
      self.csv_writer.writerow(dict(zip(export_column_list,
                                        export_column_list)))

  def write_dict(self, dictionary):
    """Write one record for the specified entity."""
    if self.csv_writer is None:
      self.__initialize_csv_writer(dictionary)
    self.csv_writer.writerow(dictionary)

  def finalize_export(self):
    """Close the output file opened by initialize_export, if any."""
    if self.output_stream is not None:
      self.output_stream.close()
      self.output_stream = None
    self.csv_writer = None