Source code for core.data.dataset

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Core dataset format. The standard data structure for occupancy detection
#
# Accessible attributes:
#     self.time_column_index
#     self.binary
#     self.labelled
#     self.data
#     self.occupancy
#     self.feature_mapping
#     self.feature_list
#     self.room_mapping
#     self.room_list
# Methods:
#     change_values
#     change_occupancy
#     change_room_mapping
#     change_feature_mapping
#     set_feature_name
#     change_feature_name
#     remove_feature
#     select_feature
#     add_room
#     pop_room
#     split
#     copy
# Built-in Methods:
#     __iter__
#     __next__
#     __getitem__
#     __len__
#     __add__
#     __sub__
#     __str__


class Dataset:
    """
    Core data set format. The standard data structure for occupancy and sensor data.

    Sensor data of every room is stacked row-wise in one matrix. Two bidirectional
    dictionaries describe that matrix: the feature mapping (column index <-> feature
    name) and the room mapping (room index -> room name, room name -> (start_row, end_row)).

    .. note::

        The properties ``data``, ``occupancy``, ``feature_mapping`` and ``room_mapping``
        return copies, therefore changes are only seen by ``self`` if the user calls the
        methods of this class to update the values.

    :var time_column_index: the timestamp column in ``self.data``
    :vartype time_column_index: int or None
    :var binary: indicate the occupancy data in ``self`` has binary encoding or not
    :vartype binary: bool
    :var labelled: indicate whether the occupancy data in ``self`` is available or not
    :vartype labelled: bool

    :parameter: None
    :rtype: core.data.dataset.Dataset
    """

    def __init__(self):
        from numpy import asarray

        self.__data = asarray([])
        self.__occupancy = asarray([])
        # column index -> feature name, feature name -> column index
        self.__feature_column_mapping = {}
        # room index -> room name, room name -> (start_row, end_row)
        self.__room_mapping = {}
        self.iter_helper = 0
        self.time_column_index = None
        self.binary = True
        self.labelled = False

    @property
    def data(self):
        """
        :rtype: numpy.ndarray
        :return: a copy of the sensor data in numpy.ndarray
        """
        return self.__data.copy()

    @property
    def occupancy(self):
        """
        :rtype: numpy.ndarray
        :return: a copy of the occupancy data in numpy.ndarray
        """
        return self.__occupancy.copy()

    @property
    def feature_mapping(self):
        """
        :rtype: dict
        :return: a bidirectional dictionary map feature names with corresponding column index
        """
        return self.__feature_column_mapping.copy()

    @property
    def feature_list(self):
        """
        :rtype: list(str)
        :return: a list contains all feature names, in column order
        """
        # Every feature owns two entries (index -> name and name -> index).
        total = len(self.__feature_column_mapping) // 2
        return [self.__feature_column_mapping[i] for i in range(total)]

    @property
    def room_mapping(self):
        """
        :rtype: dict
        :return: a bidirectional dictionary map room names with corresponding row index tuple (start, end)
        """
        return self.__room_mapping.copy()

    @property
    def room_list(self):
        """
        :rtype: list(str)
        :return: a list contains all room names, in insertion order
        """
        return [self.__room_mapping[i] for i in range(len(self))]

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _known_labels(occupancy):
        """Return the distinct non-NaN values found in ``occupancy``."""
        from numpy import asarray, isnan, unique

        values = asarray(occupancy, dtype=float).ravel()
        return unique(values[~isnan(values)])

    def _free_room_name(self, name):
        """Return ``name`` as a string, altered if needed so it is not used by any room of ``self``."""
        name = str(name)
        taken = self.__room_mapping
        if name not in taken:
            return name
        try:
            number = int(name) + 1
        except ValueError:
            # Non-numeric names get the first free "<name>_<k>" suffix.
            suffix = 1
            while "{}_{}".format(name, suffix) in taken:
                suffix += 1
            return "{}_{}".format(name, suffix)
        # Numeric names count upwards until a free one is found.
        while str(number) in taken:
            number += 1
        return str(number)

    def _keep_columns(self, columns):
        """Restrict ``self.data`` to ``columns`` (in that order) and rebuild the feature mapping."""
        mapping = self.__feature_column_mapping
        # ``time_column_index`` may be None, hence ``get`` rather than indexing.
        time_name = mapping.get(self.time_column_index)
        new_header = [mapping[column] for column in columns]
        if self.__data.ndim == 2:
            self.__data = self.__data[:, columns]
        self.set_feature_name(new_header)
        if time_name in new_header:
            self.time_column_index = new_header.index(time_name)
        else:
            self.time_column_index = None

    # ------------------------------------------------------------------ setters

    def change_values(self, data):
        """
        Replace the sensor data of ``self.data``. No validation is performed.

        :parameter data: new sensor data have same shape with original sensor data
        :type data: numpy.ndarray
        :return: None
        """
        self.__data = data

    def change_occupancy(self, occupancy):
        """
        Replace the data of ``self.occupancy``. No validation is performed.

        :parameter occupancy: new occupancy data have same number of rows with original occupancy data
        :type occupancy: numpy.ndarray
        :return: None
        """
        self.__occupancy = occupancy

    def change_room_mapping(self, room):
        """
        Replace the *room_mapping* within ``self``. No validation is performed.

        :parameter room: new room mapping rule with bidirectional dict
        :type room: dict
        :return: None
        """
        self.__room_mapping = room

    def change_feature_mapping(self, feature_mapping):
        """
        Replace the *feature_mapping* within ``self``. No validation is performed.

        :parameter feature_mapping: new feature mapping rule with bidirectional dict
        :type feature_mapping: dict
        :return: None
        """
        self.__feature_column_mapping = feature_mapping

    def set_feature_name(self, feature_list):
        """
        Replace all features' name in given order. Names are stored as strings.

        :parameter feature_list: new feature name list have length same as number of columns of ``self.data``
        :type feature_list: list
        :raise TypeError: if ``feature_list`` is not iterable
        :raise ValueError: if the number of names differs from the number of columns, or names repeat
        :return: None
        """
        # ``collections.Iterable`` was removed in Python 3.10; the ABC lives in ``collections.abc``.
        from collections.abc import Iterable

        if not isinstance(feature_list, Iterable):
            raise TypeError("Headers must iterable")
        names = [str(name) for name in feature_list]
        # A dataset without any room holds a 1-D empty array, i.e. zero columns.
        columns = self.__data.shape[1] if self.__data.ndim == 2 else 0
        if len(names) != columns:
            raise ValueError("Number of headers does not equal to the number of features")
        if len(set(names)) != len(names):
            # Duplicates would collapse in the name -> index half of the mapping.
            raise ValueError("Feature names must be unique")

        mapping = {}
        for index, name in enumerate(names):
            mapping[index] = name
            mapping[name] = index
        self.__feature_column_mapping = mapping

    def change_feature_name(self, old, new):
        """
        Replace one feature's name.

        :parameter old: original name for the feature in ``self``
        :type old: str
        :parameter new: new name for the feature in ``self``
        :type new: str
        :raise KeyError: if ``old`` is unknown or ``new`` is already used
        :return: None
        """
        mapping = self.__feature_column_mapping
        # Names are stored as strings; normalising also keeps integer column keys
        # from being mistaken for feature names.
        old_name = str(old)
        new_name = str(new)
        if old_name not in mapping:
            raise KeyError("The feature {} does not exist in the dataset!".format(old))
        if new_name in mapping:
            raise KeyError("The feature {} already exist in the dataset!".format(new))
        column = mapping.pop(old_name)
        mapping[column] = new_name
        mapping[new_name] = column

    # Can remove one or more features
    def remove_feature(self, features, error=True):
        """
        Remove one or multiple features from the ``self.data``.

        ``self.time_column_index`` follows the timestamp column to its new position and
        becomes ``None`` when that column is removed.

        :parameter features: one or multiple features that need to be removed
        :type features: str or list(str)
        :parameter error: whether throw an error if a name of feature is not available in ``self``
        :type error: bool
        :raise KeyError: if ``error`` is true and a feature does not exist
        :return: None
        """
        from collections.abc import Iterable

        if isinstance(features, str) or not isinstance(features, Iterable):
            features = [features]

        mapping = self.__feature_column_mapping
        dropped = set()  # a set tolerates the same feature being listed twice
        for feature in features:
            name = str(feature)
            if name not in mapping:
                if error:
                    raise KeyError("The feature {} does not exist in the dataset!".format(feature))
                continue
            dropped.add(mapping[name])

        total = len(mapping) // 2
        self._keep_columns([column for column in range(total) if column not in dropped])

    # Can select one or more features
    def select_feature(self, features, error=True):
        """
        Select one or multiple features from the ``self.data``, remove rest features.

        The remaining columns follow the order given in ``features``.
        ``self.time_column_index`` follows the timestamp column to its new position and
        becomes ``None`` when that column is not selected.

        :parameter features: one or multiple features that need to be selected
        :type features: str or list(str)
        :parameter error: whether throw an error if any one of the name in parameter is not available in ``self``
        :type error: bool
        :raise KeyError: if ``error`` is true and a feature does not exist
        :return: None
        """
        from collections.abc import Iterable

        if isinstance(features, str) or not isinstance(features, Iterable):
            features = [features]

        mapping = self.__feature_column_mapping
        chosen = []
        for feature in features:
            name = str(feature)
            if name not in mapping:
                if error:
                    raise KeyError("The feature {} does not exist in the dataset!".format(feature))
                continue
            column = mapping[name]
            if column not in chosen:  # ignore repeated requests for the same feature
                chosen.append(column)

        self._keep_columns(chosen)

    # data is a float matrix of data. All time values need to be changed to their
    # timestamp (datetime.timestamp()). If there is no header line, assume all data
    # have the same column order as before.
    def add_room(self, data, occupancy=None, room_name=None, header=True):
        """
        Add a new room to ``self``. ``self.data`` can automatically expand.

        Columns are matched with the existing features by name; features unknown to
        ``self`` are appended as new columns and filled with ``numpy.nan`` for the rooms
        already stored. The inputs are copied, never modified.

        :parameter data: sensor data from the new room
        :type data: numpy.ndarray
        :parameter occupancy: occupancy data from the new room. If ``None`` then fill with ``numpy.nan``
        :type occupancy: None or numpy.ndarray
        :parameter room_name: the name of the new room. If ``None`` then assign a unique index
        :type room_name: None or str
        :parameter header: ``True`` if the first row of ``data`` holds the feature names, a list of
                           feature names, or ``False`` to match the columns by position
        :type header: bool or list
        :raise ValueError: if the data is not a float matrix, or the header / occupancy size does not fit
        :raise KeyError: if ``room_name`` is already used by another room
        :return: None
        """
        from numpy import array, concatenate, full, nan

        shape_error = "Data cannot convert to float or the shape of data is not a matrix"

        # --- work out the feature names -------------------------------------
        positional = False
        features = None
        if isinstance(header, bool):
            if header:
                if len(data) == 0:
                    raise ValueError(shape_error)
                features = list(data[0])
                data = data[1:]
            else:
                positional = True
        elif header is None or len(header) == 0:
            positional = True
        else:
            features = list(header)

        # --- convert the input; ``array`` copies so the caller's arrays stay untouched
        try:
            data = array(data, dtype=float)
            if occupancy is not None:
                occupancy = array(occupancy, dtype=float)
        except (TypeError, ValueError) as conversion_error:
            raise ValueError(shape_error) from conversion_error
        if data.ndim != 2:
            raise ValueError(shape_error)

        rows = data.shape[0]
        if occupancy is None:
            occupancy = full((rows, 1), nan)
        elif occupancy.ndim == 1:
            occupancy = occupancy.reshape(-1, 1)

        if positional:
            features = list(range(data.shape[1]))
        features = [str(name) for name in features]

        # --- validate everything before any state of ``self`` is changed -----
        if len(features) != data.shape[1]:
            raise ValueError("Number of headers does not equal to the number of features")
        if len(set(features)) != len(features):
            raise ValueError("Feature names must be unique")
        if occupancy.shape[0] != rows:
            raise ValueError("Number of ground truth does not equal to the number of entries")

        if room_name is None:
            room_name = self._free_room_name(len(self.__room_mapping))
        else:
            room_name = str(room_name)
            if room_name in self.__room_mapping:
                # Re-using a name would overwrite its row range and break ``len(self)``.
                raise KeyError("The room {} already exist in the dataset!".format(room_name))

        start = self.__data.shape[0]
        if not start:
            # First rows of the dataset: the new room defines the layout.
            new_data = data
            new_header = features
            new_occupancy = occupancy
        else:
            existing = self.feature_list
            width = len(existing)
            if positional:
                shared = min(width, data.shape[1])
                target = list(range(shared))
                source = list(target)
                extra = list(range(shared, data.shape[1]))
            else:
                target, source, extra = [], [], []
                for index, name in enumerate(features):
                    column = self.__feature_column_mapping.get(name)
                    if column is None:
                        extra.append(index)
                    else:
                        target.append(column)
                        source.append(index)

            # New columns carry the *name* of the feature, not its source index.
            new_header = existing + [features[index] for index in extra]
            if len(set(new_header)) != len(new_header):
                raise ValueError("Feature names must be unique")

            block = full((rows, width + len(extra)), nan)
            block[:, target] = data[:, source]
            block[:, width:] = data[:, extra]
            padding = full((start, len(extra)), nan)
            widened = concatenate((self.__data, padding), axis=1)
            new_data = concatenate((widened, block), axis=0)
            new_occupancy = concatenate((self.__occupancy, occupancy), axis=0)

        # --- commit ---------------------------------------------------------
        self.__data = new_data
        self.set_feature_name(new_header)
        self.__occupancy = new_occupancy

        # Only real (non-NaN) labels make the dataset labelled / non-binary.
        known = self._known_labels(occupancy)
        if known.size:
            self.labelled = True
        if known.size > 2:
            self.binary = False

        room_index = len(self)
        self.__room_mapping[room_index] = room_name
        self.__room_mapping[room_name] = (start, start + rows)

    def pop_room(self, room_name):
        """
        Remove a room from ``self``.

        The rows of the room are deleted, the rooms stored behind it are moved up and
        the room indices are renumbered without gaps.

        :parameter room_name: name of the room need to be removed
        :type room_name: str
        :raise KeyError: if no room with this name exists
        :rtype: core.data.dataset.Dataset
        :return: removed Dataset
        """
        from numpy import delete

        rooms = self.room_list
        room_key = str(room_name)
        if room_key not in rooms:
            raise KeyError("This dataset do not contain room_list {}".format(room_name))

        start, stop = self.__room_mapping[room_key]
        removed_rows = stop - start

        room_data, room_occupancy = self[room_key]
        new_dataset = Dataset()
        new_dataset.add_room(room_data, room_occupancy, room_name=room_key, header=self.feature_list)
        new_dataset.time_column_index = self.time_column_index

        self.__data = delete(self.__data, slice(start, stop), axis=0)
        self.__occupancy = delete(self.__occupancy, slice(start, stop), axis=0)

        # Rebuild the mapping from scratch instead of patching it in place.
        rebuilt = {}
        for name in rooms:
            if name == room_key:
                continue
            first, last = self.__room_mapping[name]
            if first >= stop:
                first -= removed_rows
                last -= removed_rows
            rebuilt[len(rebuilt) // 2] = name
            rebuilt[name] = (first, last)
        self.__room_mapping = rebuilt

        known = self._known_labels(self.__occupancy)
        if known.size <= 2:
            self.binary = True
        if not known.size:
            self.labelled = False

        return new_dataset

    def split(self, percentage):
        """
        Separate ``self`` into two smaller ``core.data.dataset.Dataset`` objects by given split point.

        Rooms entirely before the split row go to the first part, rooms starting at or
        after it go to the second part. A room crossing the split row is cut and stored
        in both parts under the name ``"Partially <room>"``. ``self`` is not modified.

        :parameter percentage: percentage of the row in the first part
        :type percentage: float
        :rtype: tuple(core.data.dataset.Dataset, core.data.dataset.Dataset)
        :return: the front part and the back part
        """
        front_dataset = Dataset()
        back_dataset = Dataset()
        for part in (front_dataset, back_dataset):
            part.time_column_index = self.time_column_index
            part.binary = self.binary
            part.labelled = self.labelled

        header = self.feature_list
        split_point = round(percentage * self.__data.shape[0])

        for room in self.room_list:
            start, stop = self.__room_mapping[room]
            room_data, room_occupancy = self[room]
            if stop <= split_point:
                front_dataset.add_room(room_data, room_occupancy, room_name=room, header=header)
            elif start >= split_point:
                # ``>=``: a room starting exactly on the split row belongs fully to the back.
                back_dataset.add_room(room_data, room_occupancy, room_name=room, header=header)
            else:
                cut = split_point - start
                partial_name = "Partially " + str(room)
                front_dataset.add_room(room_data[:cut, :], room_occupancy[:cut, :],
                                       room_name=partial_name, header=header)
                back_dataset.add_room(room_data[cut:, :], room_occupancy[cut:, :],
                                      room_name=partial_name, header=header)

        return front_dataset, back_dataset

    def copy(self):
        """
        Make a copy of ``self``.

        :parameter: None
        :rtype: core.data.dataset.Dataset
        :return: A same copy of ``self``, with different addresses for all values
        """
        duplicate = Dataset()
        duplicate.change_values(self.__data.copy())
        duplicate.change_occupancy(self.__occupancy.copy())
        duplicate.change_feature_mapping(self.__feature_column_mapping.copy())
        duplicate.change_room_mapping(self.__room_mapping.copy())
        duplicate.time_column_index = self.time_column_index
        duplicate.binary = self.binary
        duplicate.labelled = self.labelled
        return duplicate

    # ---------------------------------------------------------- built-in methods

    def __iter__(self):
        # The cursor lives on the instance, so nested loops over the same
        # dataset share (and disturb) one position.
        self.iter_helper = 0
        return self

    def __next__(self):
        """Return ``(data, occupancy)`` slices of the next room."""
        room_name = self.__room_mapping.get(self.iter_helper, None)
        if room_name is None:
            raise StopIteration
        self.iter_helper += 1
        start, stop = self.__room_mapping[room_name]
        return self.__data[start:stop, :], self.__occupancy[start:stop, :]

    def __getitem__(self, room_name):
        """Return ``(data, occupancy)`` slices of the internal arrays (not copies) for ``room_name``."""
        start, stop = self.__room_mapping[room_name]
        return self.__data[start:stop, :], self.__occupancy[start:stop, :]

    def __len__(self):
        """Return the number of rooms; every room owns two entries of the mapping."""
        return len(self.__room_mapping) // 2

    def __add__(self, other):
        """
        Append every room of ``other`` to ``self`` and return ``self`` (in-place).

        A room whose name is already used is renamed: numeric names are counted
        upwards, other names receive a ``_<k>`` suffix.
        """
        if not isinstance(other, Dataset):
            raise TypeError("Dataset need to add with Dataset")
        header = other.feature_list
        for room in other.room_list:
            data, occupancy = other[room]
            self.add_room(data, occupancy=occupancy,
                          room_name=self._free_room_name(room), header=header)
        return self

    def __sub__(self, other):
        """Remove from ``self`` every room whose name occurs in ``other`` and return ``self`` (in-place)."""
        if not isinstance(other, Dataset):
            raise TypeError("Dataset need to sub with Dataset")
        for room in other.room_list:
            if room in self.room_list:
                self.pop_room(room)
        return self

    def __str__(self):
        return str(self.__dict__)