# Source code for sagemaker.core.serializers.utils

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Placeholder docstring"""
from __future__ import absolute_import

import struct
import sys
import numpy as np


def _write_feature_tensor(resolved_type, record, vector):
    """Append *vector* to the feature-values tensor of *record*.

    Args:
        resolved_type (str): One of ``"Int32"``, ``"Float64"`` or
            ``"Float32"``, as produced by ``_resolve_type``.
        record: A Record-like object whose ``features["values"]`` exposes
            ``int32_tensor`` / ``float64_tensor`` / ``float32_tensor``
            fields, each with an extendable ``values`` sequence.
        vector: Iterable of scalar feature values to append.

    Raises:
        ValueError: If ``resolved_type`` is not a supported type name.
    """
    if resolved_type == "Int32":
        record.features["values"].int32_tensor.values.extend(vector)
    elif resolved_type == "Float64":
        record.features["values"].float64_tensor.values.extend(vector)
    elif resolved_type == "Float32":
        record.features["values"].float32_tensor.values.extend(vector)
    else:
        raise ValueError("Unsupported resolved_type {}".format(resolved_type))


def _write_label_tensor(resolved_type, record, scalar):
    """Append the single label value *scalar* to the label tensor of *record*.

    Args:
        resolved_type (str): One of ``"Int32"``, ``"Float64"`` or
            ``"Float32"``, as produced by ``_resolve_type``.
        record: A Record-like object whose ``label["values"]`` exposes
            ``int32_tensor`` / ``float64_tensor`` / ``float32_tensor``
            fields, each with an extendable ``values`` sequence.
        scalar: The label value for this record.

    Raises:
        ValueError: If ``resolved_type`` is not a supported type name.
    """
    if resolved_type == "Int32":
        record.label["values"].int32_tensor.values.extend([scalar])
    elif resolved_type == "Float64":
        record.label["values"].float64_tensor.values.extend([scalar])
    elif resolved_type == "Float32":
        record.label["values"].float32_tensor.values.extend([scalar])
    else:
        raise ValueError("Unsupported resolved_type {}".format(resolved_type))


def _write_keys_tensor(resolved_type, record, vector):
    """Append sparse-tensor key indices *vector* to the features of *record*.

    Args:
        resolved_type (str): One of ``"Int32"``, ``"Float64"`` or
            ``"Float32"``, as produced by ``_resolve_type``. Note this is
            the type of the *values* tensor the keys belong to.
        record: A Record-like object whose ``features["values"]`` exposes
            ``int32_tensor`` / ``float64_tensor`` / ``float32_tensor``
            fields, each with an extendable ``keys`` sequence.
        vector: Iterable of integer key indices to append.

    Raises:
        ValueError: If ``resolved_type`` is not a supported type name.
    """
    if resolved_type == "Int32":
        record.features["values"].int32_tensor.keys.extend(vector)
    elif resolved_type == "Float64":
        record.features["values"].float64_tensor.keys.extend(vector)
    elif resolved_type == "Float32":
        record.features["values"].float32_tensor.keys.extend(vector)
    else:
        raise ValueError("Unsupported resolved_type {}".format(resolved_type))


def _write_shape(resolved_type, record, scalar):
    """Append the sparse-row width *scalar* to the shape tensor of *record*.

    Args:
        resolved_type (str): One of ``"Int32"``, ``"Float64"`` or
            ``"Float32"``, as produced by ``_resolve_type``.
        record: A Record-like object whose ``features["values"]`` exposes
            ``int32_tensor`` / ``float64_tensor`` / ``float32_tensor``
            fields, each with an extendable ``shape`` sequence.
        scalar: The dimension size (e.g. number of columns) for this record.

    Raises:
        ValueError: If ``resolved_type`` is not a supported type name.
    """
    if resolved_type == "Int32":
        record.features["values"].int32_tensor.shape.extend([scalar])
    elif resolved_type == "Float64":
        record.features["values"].float64_tensor.shape.extend([scalar])
    elif resolved_type == "Float32":
        record.features["values"].float32_tensor.shape.extend([scalar])
    else:
        raise ValueError("Unsupported resolved_type {}".format(resolved_type))


def write_numpy_to_dense_tensor(file, array, labels=None):
    """Write a 2-D numpy array (and optional labels) as dense-tensor records.

    Args:
        file: Binary file-like object open for writing.
        array: 2-D numpy array; each row is written as one Record.
        labels: Optional 1-D numpy array with one label per row of ``array``.

    Raises:
        NotImplementedError: Always. The implementation requires the amazon
            ``Record`` protobuf class, which is not available in this module;
            the reference implementation is kept below for when it returns.
    """
    raise NotImplementedError()
    # Reference implementation (requires the Record protobuf class):
    #
    # # Validate shape of array and labels, resolve array and label types
    # if not len(array.shape) == 2:
    #     raise ValueError("Array must be a Matrix")
    # if labels is not None:
    #     if not len(labels.shape) == 1:
    #         raise ValueError("Labels must be a Vector")
    #     if labels.shape[0] not in array.shape:
    #         raise ValueError(
    #             "Label shape {} not compatible with array shape {}".format(
    #                 labels.shape, array.shape
    #             )
    #         )
    #     resolved_label_type = _resolve_type(labels.dtype)
    # resolved_type = _resolve_type(array.dtype)
    #
    # # Write each vector in array into a Record in the file object
    # record = Record()
    # for index, vector in enumerate(array):
    #     record.Clear()
    #     _write_feature_tensor(resolved_type, record, vector)
    #     if labels is not None:
    #         _write_label_tensor(resolved_label_type, record, labels[index])
    #     _write_recordio(file, record.SerializeToString())
def write_spmatrix_to_sparse_tensor(file, array, labels=None):
    """Write a scipy sparse matrix (and optional labels) as sparse-tensor records.

    Args:
        file: Binary file-like object open for writing.
        array: 2-D scipy sparse matrix; each row is written as one Record.
        labels: Optional 1-D numpy array with one label per row of ``array``.

    Raises:
        NotImplementedError: Always. The implementation requires the amazon
            ``Record`` protobuf class, which is not available in this module;
            the reference implementation is kept below for when it returns.
    """
    raise NotImplementedError()
    # Reference implementation (requires the Record protobuf class):
    #
    # try:
    #     import scipy
    # except ImportError as e:
    #     logging.warning(
    #         "scipy failed to import. Sparse matrix functions will be impaired or broken."
    #     )
    #     # Any subsequent attempt to use scipy will raise the ImportError
    #     scipy = DeferredError(e)
    #
    # if not scipy.sparse.issparse(array):
    #     raise TypeError("Array must be sparse")
    #
    # # Validate shape of array and labels, resolve array and label types
    # if not len(array.shape) == 2:
    #     raise ValueError("Array must be a Matrix")
    # if labels is not None:
    #     if not len(labels.shape) == 1:
    #         raise ValueError("Labels must be a Vector")
    #     if labels.shape[0] not in array.shape:
    #         raise ValueError(
    #             "Label shape {} not compatible with array shape {}".format(
    #                 labels.shape, array.shape
    #             )
    #         )
    #     resolved_label_type = _resolve_type(labels.dtype)
    # resolved_type = _resolve_type(array.dtype)
    #
    # csr_array = array.tocsr()
    # n_rows, n_cols = csr_array.shape
    #
    # record = Record()
    # for row_idx in range(n_rows):
    #     record.Clear()
    #     row = csr_array.getrow(row_idx)
    #     # Write values
    #     _write_feature_tensor(resolved_type, record, row.data)
    #     # Write keys
    #     _write_keys_tensor(resolved_type, record, row.indices.astype(np.uint64))
    #
    #     # Write labels
    #     if labels is not None:
    #         _write_label_tensor(resolved_label_type, record, labels[row_idx])
    #
    #     # Write shape
    #     _write_shape(resolved_type, record, n_cols)
    #
    #     _write_recordio(file, record.SerializeToString())
def read_records(file):
    """Eagerly read a collection of amazon Record protobuf objects from *file*.

    Args:
        file: Binary file-like object open for reading.

    Raises:
        NotImplementedError: Always. The implementation requires the amazon
            ``Record`` protobuf class, which is not available in this module.
    """
    raise NotImplementedError()
# Reference implementation of read_records (requires the Record protobuf class):
#
# records = []
# for record_data in read_recordio(file):
#     record = Record()
#     record.ParseFromString(record_data)
#     records.append(record)
# return records


# MXNet requires recordio records to have a length in bytes that is a
# multiple of 4. This maps each possible shortfall (0-3 bytes) to the
# zero-byte padding appended to the record; bytes(n) is n NUL bytes.
padding = {amount: bytes(amount) for amount in range(4)}

# Magic number that prefixes every RecordIO record.
_kmagic = 0xCED7230A


def _write_recordio(f, data):
    """Write a single data point as a RecordIO record to the given file.

    Record layout: 4-byte magic, 4-byte little-endian payload length,
    payload, then zero padding up to the next multiple of 4 bytes.

    Args:
        f: Binary file-like object open for writing.
        data (bytes): Serialized record payload.
    """
    length = len(data)
    f.write(struct.pack("I", _kmagic))
    f.write(struct.pack("I", length))
    # Round length up to the next multiple of 4; the difference is the pad.
    pad = (((length + 3) >> 2) << 2) - length
    f.write(data)
    f.write(padding[pad])
def read_recordio(f):
    """Lazily yield the payload of each RecordIO record in *f*.

    Args:
        f: Binary file-like object open for reading.

    Yields:
        bytes: Each record's payload, with alignment padding stripped.
    """
    while True:
        try:
            (read_kmagic,) = struct.unpack("I", f.read(4))
        except struct.error:
            # Fewer than 4 bytes left: clean end of stream.
            return
        assert read_kmagic == _kmagic
        (len_record,) = struct.unpack("I", f.read(4))
        # Records are padded to a multiple of 4 bytes; skip the pad bytes.
        pad = (((len_record + 3) >> 2) << 2) - len_record
        yield f.read(len_record)
        if pad:
            f.read(pad)
def _resolve_type(dtype):
    """Map a numpy dtype to the Amazon Record tensor type name used here.

    Args:
        dtype: A numpy dtype (or anything comparable to one).

    Returns:
        str: ``"Int32"``, ``"Float64"`` or ``"Float32"``.

    Raises:
        ValueError: If the dtype is not one of the supported types.
    """
    if dtype == np.dtype(int):
        return "Int32"
    if dtype == np.dtype(float):
        return "Float64"
    if dtype == np.dtype("float32"):
        return "Float32"
    raise ValueError("Unsupported dtype {} on array".format(dtype))