Source code for core.model.rnn

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from .superclass import *


[docs]class LSTM(NormalModel): """ Using `Long Short Term Memory <https://keras.io/layers/recurrent/>`_ model to predict the occupancy level This is a normal supervised learning model. :parameter train: the labelled ground truth Dataset for training the model :type train: core.data.dataset.Dataset :parameter test: the Dataset for testing by using sensor data only :type test: core.data.dataset.Dataset :parameter hm_epochs: maximum number of epoches :type hm_epochs: int :parameter batch_size: size of minibatches for stochastic optimizers :type batch_size: int :parameter cell: number of LSTM cells in the hidden layer :type cell: int :parameter learn_rate: the initial learning rate used. It controls the step-size in updating the weights :type learn_rate: float :rtype: numpy.ndarray :return: Predicted occupancy level corresponding to the test Dataset """ def __init__(self, train, test): self.train = train # if self.train.occupancy.shape[1] == 1: # change_to_one_hot(self.train) self.test = test # if self.test.occupancy.shape[1] == 1: # change_to_one_hot(self.test) self.hm_epochs = 10 self.batch_size = 60 self.cell = 75 self.learn_rate = 0.0001
[docs] def run(self): from keras.models import Sequential from keras.layers import LSTM, TimeDistributed, Dense from numpy import zeros, reshape model = Sequential() model.add(LSTM(batch_input_shape=(self.batch_size, 1, self.train.data.shape[1]), units=self.cell, return_sequences=True, # True: output at all steps. False: output as last step. stateful=True)) # add output layer model.add(TimeDistributed(Dense(1))) model.compile(optimizer='adam', loss='mse') for _ in range(self.hm_epochs): epoch_loss = 0 for i in range(int(self.train.data.shape[0] / self.batch_size)): epoch_x = self.train.data[i * self.batch_size: (i + 1) * self.batch_size].reshape(self.batch_size, 1, self.train.data.shape[1]) epoch_y = self.train.occupancy[i * self.batch_size: (i + 1) * self.batch_size].reshape(self.batch_size, 1, self.train.occupancy.shape[1]) cost = model.train_on_batch(epoch_x, epoch_y) epoch_loss += cost final = zeros(self.test.occupancy.shape, dtype=float) for i in range(int(self.test.data.shape[0] / self.batch_size)): predict = model.predict(self.test.data[i * self.batch_size: (i + 1) * self.batch_size].reshape(self.batch_size, 1, self.test.data.shape[1]), self.test.data.shape[0]) predict = predict.reshape((-1, self.train.occupancy.shape[1])) final[i * self.batch_size:(i + 1) * self.batch_size, :] = predict if self.test.data.shape[0] % self.batch_size: predict = model.predict(self.test.data[-self.batch_size:].reshape(self.batch_size, 1, self.test.data.shape[1]), self.test.data.shape[0]) predict = predict.reshape((-1, self.train.occupancy.shape[1])) final[-self.batch_size:, :] = predict return reshape(final.argmax(axis=1), (-1, 1))
[docs]class DALSTM(DomainAdaptiveModel): """ Using `Long Short Term Memory <https://keras.io/layers/recurrent/>`_ model to predict the occupancy level This is a domain-adaptive semi-supervised learning model. :parameter source: the source domain with full knowledge for training the model :type source: core.data.dataset.Dataset :parameter target_retrain: the labelled ground truth Dataset in the target domain for re-training the model :type target_retrain: ``None`` or core.data.dataset.Dataset :parameter target_test: the Dataset in the rest of the target domain for testing by using sensor data only :type target_test: core.data.dataset.Dataset :parameter hm_epochs: maximum number of epoches :type hm_epochs: int :parameter batch_size: size of minibatches for stochastic optimizers :type batch_size: int :parameter cell: number of LSTM cells in the hidden layer :type cell: int :parameter learn_rate: the initial learning rate used. It controls the step-size in updating the weights :type learn_rate: float :rtype: numpy.ndarray :return: Predicted occupancy level corresponding to the test Dataset """ def __init__(self, source, target_retrain, target_test): self.source = source self.target_retrain = target_retrain self.target_test = target_test self.hm_epochs = 10 self.batch_size = 60 self.cell = 75 self.learn_rate = 0.00001
[docs] def run(self): from keras.models import Sequential from keras.layers import LSTM, TimeDistributed, Dense from keras.optimizers import Adam from numpy import zeros model = Sequential() model.add(LSTM(batch_input_shape=(self.batch_size, 1, self.source.data.shape[1]), units=self.cell, return_sequences=True, # True: output at all steps. False: output as last step. stateful=True)) # add output layer model.add(TimeDistributed(Dense(self.source.occupancy.shape[1]))) model.compile(optimizer=Adam(self.learn_rate), loss='mse') for _ in range(self.hm_epochs): epoch_loss = 0 for i in range(int(self.source.data.shape[0] / self.batch_size)): epoch_x = self.source.data[i * self.batch_size: (i + 1) * self.batch_size].reshape(self.batch_size, 1, self.source.data.shape[1]) epoch_y = self.source.occupancy[i * self.batch_size: (i + 1) * self.batch_size].reshape(self.batch_size, 1, self.source.occupancy.shape[1]) cost = model.train_on_batch(epoch_x, epoch_y) epoch_loss += cost if self.target_retrain is None: self.hm_epochs = 0 for _ in range(self.hm_epochs): epoch_loss = 0 for i in range(int(self.target_retrain.data.shape[0] / self.batch_size)): epoch_x = self.target_retrain.data[i * self.batch_size: (i + 1) * self.batch_size].reshape(self.batch_size, 1, self.target_retrain.data.shape[1]) epoch_y = self.target_retrain.occupancy[i * self.batch_size: (i + 1) * self.batch_size].reshape(self.batch_size, 1, self.target_retrain.occupancy.shape[1]) cost = model.train_on_batch(epoch_x, epoch_y) epoch_loss += cost final = zeros(self.target_test.occupancy.shape, dtype=float) for i in range(int(self.target_test.data.shape[0] / self.batch_size)): predict = model.predict(self.target_test.data[i * self.batch_size: (i + 1) * self.batch_size].reshape(self.batch_size, 1, self.target_test.data.shape[1]), self.target_test.data.shape[0]) predict = predict.reshape((predict.shape[0], self.source.occupancy.shape[1])) final[i * self.batch_size:(i + 1) * self.batch_size, :] = predict if self.target_test.data.shape[0] % self.batch_size: predict = model.predict(self.target_test.data[-self.batch_size:].reshape(self.batch_size, 1, self.target_test.data.shape[1]), self.target_test.data.shape[0]) predict = predict.reshape((predict.shape[0], self.source.occupancy.shape[1])) final[-self.batch_size:, :] = predict return final