# DEEP LEARNING FOR UNIVARIATE SERIES WORKOUT
# Reference run in PyCharm:
# From Reference https://machinelearningmastery.com/how-to-develop-deep-learning-models-for-univariate-time-series-forecasting/
import pandas as pd
import pandas_ta
import matplotlib.pyplot as plt
import statistics
#for DL
from math import sqrt
from numpy import mean
from numpy import std
from pandas import DataFrame
from pandas import concat
from pandas import read_csv
from sklearn.metrics import mean_squared_error
from matplotlib import pyplot
from statistics import median
def linReg():
    """Fit a linear regression from Tesla's adjusted close to its 10-period EMA.

    Reads 'TSLA.csv', plots the 'Adj Close' column (saved as 'TESLA.png'),
    appends a 10-period EMA column through the pandas_ta DataFrame accessor,
    plots both series (saved as 'TESLA_EMA_10.png'), then fits a
    LinearRegression of 'EMA_10' on 'Adj Close' using a train/test split
    with ``test_size=.2`` and prints summary statistics of both input sets.

    Returns:
        None. All results are printed, plotted, or written to PNG files.
    """
    from sklearn.linear_model import LinearRegression
    from sklearn.model_selection import train_test_split

    # Load the .csv file as a DataFrame (a single read is sufficient).
    df = pd.read_csv('TSLA.csv')
    # Print the data and some summary statistics.
    print(df)
    print(df.describe())

    # Index the data by date.
    df.set_index(pd.DatetimeIndex(df['Date']), inplace=True)
    # Keep only the 'Adj Close' value. The explicit copy makes the frame
    # independent of the original, so appending the EMA column below does not
    # write into a slice of another frame.
    df = df[['Adj Close']].copy()

    # Re-inspect the data. DataFrame.info() prints its report itself and
    # returns None, so wrapping it in print() would emit a stray "None".
    print(df)
    df.info()

    plt.plot(df[['Adj Close']])
    plt.title('TESLA Share Price')
    plt.xlabel('Year')
    plt.ylabel('Adj Close Volume')
    plt.savefig('TESLA.png')
    plt.show()

    # Append the 10-period exponential moving average as column 'EMA_10'.
    df.ta.ema(close='Adj Close', length=10, append=True)
    # The EMA is undefined (NaN) during its warm-up period; drop exactly
    # those rows instead of assuming a fixed number of them.
    df = df.dropna(subset=['EMA_10'])
    print(df.head(10))

    plt.plot(df['Adj Close'])
    plt.plot(df['EMA_10'])
    plt.xlabel('Year')
    plt.ylabel('Adj Close/EMA_10')
    plt.title('TESLA Share Price with EMA overlaid')
    # Label the lines by the series they show, in plotting order.
    plt.legend(['Adj Close', 'EMA_10'], loc=0)
    plt.savefig('TESLA_EMA_10.png')
    plt.show()

    X_train, X_test, y_train, y_test = train_test_split(
        df[['Adj Close']], df[['EMA_10']], test_size=.2)

    # Create and train the regression model, then predict on the test inputs.
    model = LinearRegression()
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)

    # Test set
    print(X_test.describe())
    # Training set
    print(X_train.describe())

    # NOTE: metric reporting was left disabled in the original; the values
    # recorded alongside each line come from an earlier run.
    # from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
    # print("Model Coefficients:", model.coef_)  # [[0.98176283]]
    # print("Mean Absolute Error:", mean_absolute_error(y_test, y_pred))  # 6.21531704292117
    # print("Coefficient of Determination:", r2_score(y_test, y_pred))  # 0.9942788743625711
#https://machinelearningmastery.com/how-to-develop-deep-learning-models-for-univariate-time-series-forecasting/
def drawSeries():
    """Load 'carsales.csv', print its shape, and display it as a line plot."""
    # The first CSV row is the header and the first column is the index.
    sales = pd.read_csv('carsales.csv', index_col=0, header=0)
    print(sales.shape)
    plt.plot(sales)
    plt.show()
################################################DL
def deepLearn1():
    """Evaluate a persistence baseline on 'carsales.csv' with walk-forward validation.

    The last 12 observations are held out. Each of them is forecast as the
    median of the observations found 12, 24 and 36 steps back in the history.
    The evaluation is repeated 30 times (the default of ``repeat_evaluate``);
    each repeat's RMSE is printed, followed by a mean/std summary and a
    box-and-whisker plot of the scores.

    Returns:
        None. Results are printed and plotted.
    """

    def train_test_split(data, n_test):
        """Split a univariate dataset into train/test sets (last ``n_test`` rows are test)."""
        return data[:-n_test], data[-n_test:]

    def measure_rmse(actual, predicted):
        """Return the root mean squared error between actual and predicted values."""
        return sqrt(mean_squared_error(actual, predicted))

    def model_fit(train, config):
        """Persistence needs no fitting; returns None as the 'model'."""
        return None

    def model_predict(model, history, config):
        """Forecast the next value as the median of ``history[-offset]`` for each offset in config."""
        return median([history[-offset] for offset in config])

    def walk_forward_validation(data, n_test, cfg):
        """Run one walk-forward pass over the test set and return its RMSE."""
        train, test = train_test_split(data, n_test)
        model = model_fit(train, cfg)
        # Seed the history with the training data; it grows by one real
        # observation after every forecast.
        history = list(train)
        predictions = []
        for observed in test:
            predictions.append(model_predict(model, history, cfg))
            history.append(observed)
        error = measure_rmse(test, predictions)
        print(' > %.3f' % error)
        return error

    def repeat_evaluate(data, config, n_test, n_repeats=30):
        """Evaluate the same config ``n_repeats`` times and return the list of RMSE scores."""
        return [walk_forward_validation(data, n_test, config) for _ in range(n_repeats)]

    def summarize_scores(name, scores):
        """Print the mean and standard deviation of the scores and show a box plot."""
        scores_m, score_std = mean(scores), std(scores)
        print('%s: %.3f RMSE (+/- %.3f)' % (name, scores_m, score_std))
        pyplot.boxplot(scores)
        pyplot.show()

    series = read_csv('carsales.csv', header=0, index_col=0)
    data = series.values
    # Data split: number of trailing observations held out for testing.
    n_test = 12
    # Config: offsets (in time steps) of the past observations to take the median of.
    config = [12, 24, 36]
    scores = repeat_evaluate(data, config, n_test)
    summarize_scores('persistence', scores)
##############################################MLP
#for MLP
from math import sqrt
from numpy import array
from numpy import mean
from numpy import std
from pandas import DataFrame
from pandas import concat
from pandas import read_csv
from sklearn.metrics import mean_squared_error
from keras.models import Sequential
from keras.layers import Dense
from matplotlib import pyplot
def MLP():
    """Evaluate a multilayer perceptron on 'carsales.csv' with walk-forward validation.

    The last 12 observations are held out. The config list is unpacked as
    (n_input, n_nodes, n_epochs, n_batch). A fresh model is trained and
    scored 30 times (the default of ``repeat_evaluate``); each RMSE is
    printed, followed by a mean/std summary and a box plot of the scores.
    """

    def train_test_split(data, n_test):
        """Return (all but the last ``n_test`` rows, the last ``n_test`` rows)."""
        head = data[:-n_test]
        tail = data[-n_test:]
        return head, tail

    def series_to_supervised(data, n_in=1, n_out=1):
        """Build a lag matrix: columns t-n_in .. t-1 followed by t .. t+n_out-1."""
        frame = DataFrame(data)
        # Input sequence (t-n, ... t-1), oldest lag first.
        lagged = [frame.shift(lag) for lag in range(n_in, 0, -1)]
        # Forecast sequence (t, t+1, ... t+n).
        leads = [frame.shift(-step) for step in range(0, n_out)]
        table = concat(lagged + leads, axis=1)
        # Shifting introduces NaN rows at the edges; discard them.
        table.dropna(inplace=True)
        return table.values

    def measure_rmse(actual, predicted):
        """Root mean squared error."""
        mse = mean_squared_error(actual, predicted)
        return sqrt(mse)

    def model_fit(train, config):
        """Train a Dense(n_nodes, relu) -> Dense(1) network on the lagged training data."""
        n_input, n_nodes, n_epochs, n_batch = config
        supervised = series_to_supervised(train, n_in=n_input)
        # Every column except the last is an input lag; the last is the target.
        features = supervised[:, :-1]
        target = supervised[:, -1]
        network = Sequential()
        network.add(Dense(n_nodes, activation='relu', input_dim=n_input))
        network.add(Dense(1))
        network.compile(loss='mse', optimizer='adam')
        network.fit(features, target, epochs=n_epochs, batch_size=n_batch, verbose=0)
        return network

    def model_predict(model, history, config):
        """Forecast one step ahead from the last ``n_input`` observations in history."""
        n_input, _n_nodes, _n_epochs, _n_batch = config
        window = array(history[-n_input:]).reshape(1, n_input)
        forecast = model.predict(window, verbose=0)
        return forecast[0]

    def walk_forward_validation(data, n_test, cfg):
        """Fit once on the training split, forecast each test step, return the RMSE."""
        train, test = train_test_split(data, n_test)
        fitted = model_fit(train, cfg)
        # History starts as the training data and gains each true
        # observation after it has been forecast.
        history = list(train)
        predictions = []
        for observed in test:
            predictions.append(model_predict(fitted, history, cfg))
            history.append(observed)
        error = measure_rmse(test, predictions)
        print(' > %.3f' % error)
        return error

    def repeat_evaluate(data, config, n_test, n_repeats=30):
        """Fit and evaluate the model ``n_repeats`` times; return all RMSE scores."""
        scores = []
        for _ in range(n_repeats):
            scores.append(walk_forward_validation(data, n_test, config))
        return scores

    def summarize_scores(name, scores):
        """Print mean and standard deviation of the scores, then show a box plot."""
        print('%s: %.3f RMSE (+/- %.3f)' % (name, mean(scores), std(scores)))
        pyplot.boxplot(scores)
        pyplot.show()

    observations = read_csv('carsales.csv', header=0, index_col=0).values
    # Number of trailing observations held out for testing.
    holdout = 12
    # (n_input, n_nodes, n_epochs, n_batch)
    settings = [24, 500, 100, 100]
    results = repeat_evaluate(observations, settings, holdout)
    summarize_scores('mlp', results)
#CNN
#for CNN
from math import sqrt
from numpy import array
from numpy import mean
from numpy import std
from pandas import DataFrame
from pandas import concat
from pandas import read_csv
from sklearn.metrics import mean_squared_error
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Flatten
from keras.layers.convolutional import Conv1D
from keras.layers.convolutional import MaxPooling1D
from matplotlib import pyplot
def ConvNet():
    """Evaluate a 1D convolutional network on 'carsales.csv' with walk-forward validation.

    The last 12 observations are held out. The config list is unpacked as
    (n_input, n_filters, n_kernel, n_epochs, n_batch). A fresh model is
    trained and scored 30 times (the default of ``repeat_evaluate``); each
    RMSE is printed, followed by a mean/std summary and a box plot.
    """

    def train_test_split(data, n_test):
        """Return (all but the last ``n_test`` rows, the last ``n_test`` rows)."""
        head = data[:-n_test]
        tail = data[-n_test:]
        return head, tail

    def series_to_supervised(data, n_in=1, n_out=1):
        """Build a lag matrix: columns t-n_in .. t-1 followed by t .. t+n_out-1."""
        frame = DataFrame(data)
        # Input sequence (t-n, ... t-1), oldest lag first.
        lagged = [frame.shift(lag) for lag in range(n_in, 0, -1)]
        # Forecast sequence (t, t+1, ... t+n).
        leads = [frame.shift(-step) for step in range(0, n_out)]
        table = concat(lagged + leads, axis=1)
        # Shifting introduces NaN rows at the edges; discard them.
        table.dropna(inplace=True)
        return table.values

    def measure_rmse(actual, predicted):
        """Root mean squared error."""
        mse = mean_squared_error(actual, predicted)
        return sqrt(mse)

    def model_fit(train, config):
        """Train Conv1D -> Conv1D -> MaxPooling1D -> Flatten -> Dense(1) on the lagged data."""
        n_input, n_filters, n_kernel, n_epochs, n_batch = config
        supervised = series_to_supervised(train, n_in=n_input)
        # Every column except the last is an input lag; the last is the target.
        features = supervised[:, :-1]
        target = supervised[:, -1]
        # Add a trailing axis of size 1: (samples, lags) -> (samples, lags, 1).
        n_samples, n_lags = features.shape[0], features.shape[1]
        features = features.reshape((n_samples, n_lags, 1))
        network = Sequential()
        network.add(Conv1D(filters=n_filters, kernel_size=n_kernel, activation='relu',
                           input_shape=(n_input, 1)))
        network.add(Conv1D(filters=n_filters, kernel_size=n_kernel, activation='relu'))
        network.add(MaxPooling1D(pool_size=2))
        network.add(Flatten())
        network.add(Dense(1))
        network.compile(loss='mse', optimizer='adam')
        network.fit(features, target, epochs=n_epochs, batch_size=n_batch, verbose=0)
        return network

    def model_predict(model, history, config):
        """Forecast one step ahead from the last ``n_input`` observations in history."""
        n_input, _n_filters, _n_kernel, _n_epochs, _n_batch = config
        window = array(history[-n_input:]).reshape((1, n_input, 1))
        forecast = model.predict(window, verbose=0)
        return forecast[0]

    def walk_forward_validation(data, n_test, cfg):
        """Fit once on the training split, forecast each test step, return the RMSE."""
        train, test = train_test_split(data, n_test)
        fitted = model_fit(train, cfg)
        # History starts as the training data and gains each true
        # observation after it has been forecast.
        history = list(train)
        predictions = []
        for observed in test:
            predictions.append(model_predict(fitted, history, cfg))
            history.append(observed)
        error = measure_rmse(test, predictions)
        print(' > %.3f' % error)
        return error

    def repeat_evaluate(data, config, n_test, n_repeats=30):
        """Fit and evaluate the model ``n_repeats`` times; return all RMSE scores."""
        scores = []
        for _ in range(n_repeats):
            scores.append(walk_forward_validation(data, n_test, config))
        return scores

    def summarize_scores(name, scores):
        """Print mean and standard deviation of the scores, then show a box plot."""
        print('%s: %.3f RMSE (+/- %.3f)' % (name, mean(scores), std(scores)))
        pyplot.boxplot(scores)
        pyplot.show()

    observations = read_csv('carsales.csv', header=0, index_col=0).values
    # Number of trailing observations held out for testing.
    holdout = 12
    # (n_input, n_filters, n_kernel, n_epochs, n_batch)
    settings = [36, 256, 3, 100, 100]
    results = repeat_evaluate(observations, settings, holdout)
    summarize_scores('cnn', results)
# #LSTM
from math import sqrt
from numpy import array
from numpy import mean
from numpy import std
from pandas import DataFrame
from pandas import concat
from pandas import read_csv
from sklearn.metrics import mean_squared_error
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import LSTM
from matplotlib import pyplot
#
#
def LsTM():
    """Evaluate an LSTM network on 'carsales.csv' with walk-forward validation.

    The last 12 observations are held out. The config list is unpacked as
    (n_input, n_nodes, n_epochs, n_batch, n_diff); when ``n_diff`` is
    positive the series is differenced at that interval before training and
    the forecast is corrected back by adding ``history[-n_diff]``. A fresh
    model is trained and scored 30 times (the default of
    ``repeat_evaluate``); each RMSE is printed, followed by a mean/std
    summary and a box plot.
    """

    def train_test_split(data, n_test):
        """Return (all but the last ``n_test`` rows, the last ``n_test`` rows)."""
        head = data[:-n_test]
        tail = data[-n_test:]
        return head, tail

    def series_to_supervised(data, n_in=1, n_out=1):
        """Build a lag matrix: columns t-n_in .. t-1 followed by t .. t+n_out-1."""
        frame = DataFrame(data)
        # Input sequence (t-n, ... t-1), oldest lag first.
        lagged = [frame.shift(lag) for lag in range(n_in, 0, -1)]
        # Forecast sequence (t, t+1, ... t+n).
        leads = [frame.shift(-step) for step in range(0, n_out)]
        table = concat(lagged + leads, axis=1)
        # Shifting introduces NaN rows at the edges; discard them.
        table.dropna(inplace=True)
        return table.values

    def measure_rmse(actual, predicted):
        """Root mean squared error."""
        mse = mean_squared_error(actual, predicted)
        return sqrt(mse)

    def difference(data, interval):
        """Return the list of ``data[i] - data[i - interval]`` for every valid i."""
        deltas = []
        for pos in range(interval, len(data)):
            deltas.append(data[pos] - data[pos - interval])
        return deltas

    def model_fit(train, config):
        """Train LSTM(n_nodes) -> Dense(n_nodes) -> Dense(1) on the (optionally differenced) data."""
        n_input, n_nodes, n_epochs, n_batch, n_diff = config
        if n_diff > 0:
            train = difference(train, n_diff)
        supervised = series_to_supervised(train, n_in=n_input)
        # Every column except the last is an input lag; the last is the target.
        features = supervised[:, :-1]
        target = supervised[:, -1]
        # Add a trailing axis of size 1: (samples, lags) -> (samples, lags, 1).
        n_samples, n_lags = features.shape[0], features.shape[1]
        features = features.reshape((n_samples, n_lags, 1))
        network = Sequential()
        network.add(LSTM(n_nodes, activation='relu', input_shape=(n_input, 1)))
        network.add(Dense(n_nodes, activation='relu'))
        network.add(Dense(1))
        network.compile(loss='mse', optimizer='adam')
        network.fit(features, target, epochs=n_epochs, batch_size=n_batch, verbose=0)
        return network

    def model_predict(model, history, config):
        """Forecast one step ahead, undoing the differencing when ``n_diff`` is positive."""
        n_input, _n_nodes, _n_epochs, _n_batch, n_diff = config
        if n_diff > 0:
            # The model predicts a difference; the value n_diff steps back
            # is added to the prediction before it is returned.
            correction = history[-n_diff]
            inputs = difference(history, n_diff)
        else:
            correction = 0.0
            inputs = history
        window = array(inputs[-n_input:]).reshape((1, n_input, 1))
        forecast = model.predict(window, verbose=0)
        return correction + forecast[0]

    def walk_forward_validation(data, n_test, cfg):
        """Fit once on the training split, forecast each test step, return the RMSE."""
        train, test = train_test_split(data, n_test)
        fitted = model_fit(train, cfg)
        # History starts as the training data and gains each true
        # observation after it has been forecast.
        history = list(train)
        predictions = []
        for observed in test:
            predictions.append(model_predict(fitted, history, cfg))
            history.append(observed)
        error = measure_rmse(test, predictions)
        print(' > %.3f' % error)
        return error

    def repeat_evaluate(data, config, n_test, n_repeats=30):
        """Fit and evaluate the model ``n_repeats`` times; return all RMSE scores."""
        scores = []
        for _ in range(n_repeats):
            scores.append(walk_forward_validation(data, n_test, config))
        return scores

    def summarize_scores(name, scores):
        """Print mean and standard deviation of the scores, then show a box plot."""
        print('%s: %.3f RMSE (+/- %.3f)' % (name, mean(scores), std(scores)))
        pyplot.boxplot(scores)
        pyplot.show()

    observations = read_csv('carsales.csv', header=0, index_col=0).values
    # Number of trailing observations held out for testing.
    holdout = 12
    # (n_input, n_nodes, n_epochs, n_batch, n_diff)
    settings = [36, 50, 100, 100, 12]
    results = repeat_evaluate(observations, settings, holdout)
    summarize_scores('lstm', results)
#--------------------------------------------------------------------------------------
from math import sqrt
from numpy import array
from numpy import mean
from numpy import std
from pandas import DataFrame
from pandas import concat
from pandas import read_csv
from sklearn.metrics import mean_squared_error
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import LSTM
from keras.layers import TimeDistributed
from keras.layers import Flatten
from keras.layers.convolutional import Conv1D
from keras.layers.convolutional import MaxPooling1D
from matplotlib import pyplot
def CNNLSTM():
    """Evaluate a CNN-LSTM network on 'carsales.csv' with walk-forward validation.

    The last 12 observations are held out. The config list is unpacked as
    (n_seq, n_steps, n_filters, n_kernel, n_nodes, n_epochs, n_batch); each
    input window holds ``n_seq * n_steps`` observations, reshaped into
    ``n_seq`` subsequences of ``n_steps`` steps. A fresh model is trained and
    scored 30 times (the default of ``repeat_evaluate``); each RMSE is
    printed, followed by a mean/std summary and a box plot.
    """

    def train_test_split(data, n_test):
        """Return (all but the last ``n_test`` rows, the last ``n_test`` rows)."""
        head = data[:-n_test]
        tail = data[-n_test:]
        return head, tail

    def series_to_supervised(data, n_in=1, n_out=1):
        """Build a lag matrix: columns t-n_in .. t-1 followed by t .. t+n_out-1."""
        frame = DataFrame(data)
        # Input sequence (t-n, ... t-1), oldest lag first.
        lagged = [frame.shift(lag) for lag in range(n_in, 0, -1)]
        # Forecast sequence (t, t+1, ... t+n).
        leads = [frame.shift(-step) for step in range(0, n_out)]
        table = concat(lagged + leads, axis=1)
        # Shifting introduces NaN rows at the edges; discard them.
        table.dropna(inplace=True)
        return table.values

    def measure_rmse(actual, predicted):
        """Root mean squared error."""
        mse = mean_squared_error(actual, predicted)
        return sqrt(mse)

    def model_fit(train, config):
        """Train TimeDistributed Conv1D x2 + pooling + flatten, then LSTM -> Dense -> Dense(1)."""
        n_seq, n_steps, n_filters, n_kernel, n_nodes, n_epochs, n_batch = config
        window_len = n_seq * n_steps
        supervised = series_to_supervised(train, n_in=window_len)
        # Every column except the last is an input lag; the last is the target.
        features = supervised[:, :-1]
        target = supervised[:, -1]
        # (samples, n_seq * n_steps) -> (samples, n_seq, n_steps, 1)
        n_samples = features.shape[0]
        features = features.reshape((n_samples, n_seq, n_steps, 1))
        network = Sequential()
        first_conv = Conv1D(filters=n_filters, kernel_size=n_kernel, activation='relu',
                            input_shape=(None, n_steps, 1))
        network.add(TimeDistributed(first_conv))
        second_conv = Conv1D(filters=n_filters, kernel_size=n_kernel, activation='relu')
        network.add(TimeDistributed(second_conv))
        network.add(TimeDistributed(MaxPooling1D(pool_size=2)))
        network.add(TimeDistributed(Flatten()))
        network.add(LSTM(n_nodes, activation='relu'))
        network.add(Dense(n_nodes, activation='relu'))
        network.add(Dense(1))
        network.compile(loss='mse', optimizer='adam')
        network.fit(features, target, epochs=n_epochs, batch_size=n_batch, verbose=0)
        return network

    def model_predict(model, history, config):
        """Forecast one step ahead from the last ``n_seq * n_steps`` observations."""
        n_seq, n_steps, _n_filters, _n_kernel, _n_nodes, _n_epochs, _n_batch = config
        window_len = n_seq * n_steps
        window = array(history[-window_len:]).reshape((1, n_seq, n_steps, 1))
        forecast = model.predict(window, verbose=0)
        return forecast[0]

    def walk_forward_validation(data, n_test, cfg):
        """Fit once on the training split, forecast each test step, return the RMSE."""
        train, test = train_test_split(data, n_test)
        fitted = model_fit(train, cfg)
        # History starts as the training data and gains each true
        # observation after it has been forecast.
        history = list(train)
        predictions = []
        for observed in test:
            predictions.append(model_predict(fitted, history, cfg))
            history.append(observed)
        error = measure_rmse(test, predictions)
        print(' > %.3f' % error)
        return error

    def repeat_evaluate(data, config, n_test, n_repeats=30):
        """Fit and evaluate the model ``n_repeats`` times; return all RMSE scores."""
        scores = []
        for _ in range(n_repeats):
            scores.append(walk_forward_validation(data, n_test, config))
        return scores

    def summarize_scores(name, scores):
        """Print mean and standard deviation of the scores, then show a box plot."""
        print('%s: %.3f RMSE (+/- %.3f)' % (name, mean(scores), std(scores)))
        pyplot.boxplot(scores)
        pyplot.show()

    observations = read_csv('carsales.csv', header=0, index_col=0).values
    # Number of trailing observations held out for testing.
    holdout = 12
    # (n_seq, n_steps, n_filters, n_kernel, n_nodes, n_epochs, n_batch)
    settings = [3, 12, 64, 3, 100, 200, 100]
    results = repeat_evaluate(observations, settings, holdout)
    summarize_scores('cnn-lstm', results)
#-----------------------------------------------------------------------------------------------------------------------
# # evaluate convlstm
from math import sqrt
from numpy import array
from numpy import mean
from numpy import std
from pandas import DataFrame
from pandas import concat
from pandas import read_csv
from sklearn.metrics import mean_squared_error
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Flatten
from keras.layers import ConvLSTM2D
from matplotlib import pyplot
def CONVLSTM():
    """Evaluate a ConvLSTM2D network on 'carsales.csv' with walk-forward validation.

    The last 12 observations are held out. The config list is unpacked as
    (n_seq, n_steps, n_filters, n_kernel, n_nodes, n_epochs, n_batch); each
    input window holds ``n_seq * n_steps`` observations, reshaped to
    ``(n_seq, 1, n_steps, 1)``. A fresh model is trained and scored 30 times
    (the default of ``repeat_evaluate``); each RMSE is printed, followed by
    a mean/std summary and a box-and-whisker plot.

    Returns:
        None. Results are printed and plotted.
    """

    def train_test_split(data, n_test):
        """Split a univariate dataset into train/test sets (last ``n_test`` rows are test)."""
        return data[:-n_test], data[-n_test:]

    def series_to_supervised(data, n_in=1, n_out=1):
        """Transform a series into a lag matrix: columns t-n_in .. t-1, then t .. t+n_out-1."""
        df = DataFrame(data)
        # Input sequence (t-n, ... t-1), oldest lag first.
        cols = [df.shift(i) for i in range(n_in, 0, -1)]
        # Forecast sequence (t, t+1, ... t+n).
        cols.extend(df.shift(-i) for i in range(n_out))
        # Shifting introduces NaN rows at the edges; drop them.
        return concat(cols, axis=1).dropna().values

    def measure_rmse(actual, predicted):
        """Return the root mean squared error between actual and predicted values."""
        return sqrt(mean_squared_error(actual, predicted))

    def model_fit(train, config):
        """Train ConvLSTM2D -> Flatten -> Dense(n_nodes) -> Dense(1) on the lagged data."""
        n_seq, n_steps, n_filters, n_kernel, n_nodes, n_epochs, n_batch = config
        n_input = n_seq * n_steps
        data = series_to_supervised(train, n_in=n_input)
        # Every column except the last is an input lag; the last is the target.
        train_x, train_y = data[:, :-1], data[:, -1]
        # (samples, n_seq * n_steps) -> (samples, n_seq, 1, n_steps, 1)
        train_x = train_x.reshape((train_x.shape[0], n_seq, 1, n_steps, 1))
        model = Sequential()
        model.add(ConvLSTM2D(filters=n_filters, kernel_size=(1, n_kernel), activation='relu',
                             input_shape=(n_seq, 1, n_steps, 1)))
        model.add(Flatten())
        model.add(Dense(n_nodes, activation='relu'))
        model.add(Dense(1))
        model.compile(loss='mse', optimizer='adam')
        model.fit(train_x, train_y, epochs=n_epochs, batch_size=n_batch, verbose=0)
        return model

    def model_predict(model, history, config):
        """Forecast one step ahead from the last ``n_seq * n_steps`` observations."""
        n_seq, n_steps, _, _, _, _, _ = config
        n_input = n_seq * n_steps
        x_input = array(history[-n_input:]).reshape((1, n_seq, 1, n_steps, 1))
        yhat = model.predict(x_input, verbose=0)
        return yhat[0]

    def walk_forward_validation(data, n_test, cfg):
        """Fit once on the training split, forecast each test step, return the RMSE."""
        train, test = train_test_split(data, n_test)
        model = model_fit(train, cfg)
        # Seed the history with the training data; it grows by one real
        # observation after every forecast.
        history = list(train)
        predictions = []
        for observed in test:
            predictions.append(model_predict(model, history, cfg))
            history.append(observed)
        error = measure_rmse(test, predictions)
        print(' > %.3f' % error)
        return error

    def repeat_evaluate(data, config, n_test, n_repeats=30):
        """Fit and evaluate the model ``n_repeats`` times; return the list of RMSE scores."""
        return [walk_forward_validation(data, n_test, config) for _ in range(n_repeats)]

    def summarize_scores(name, scores):
        """Print the mean and standard deviation of the scores and show a box plot."""
        scores_m, score_std = mean(scores), std(scores)
        print('%s: %.3f RMSE (+/- %.3f)' % (name, scores_m, score_std))
        pyplot.boxplot(scores)
        pyplot.show()

    series = read_csv('carsales.csv', header=0, index_col=0)
    data = series.values
    # Data split: number of trailing observations held out for testing.
    n_test = 12
    # Config: (n_seq, n_steps, n_filters, n_kernel, n_nodes, n_epochs, n_batch)
    config = [3, 12, 256, 3, 200, 200, 100]
    scores = repeat_evaluate(data, config, n_test)
    summarize_scores('convlstm', scores)
#################################################### Results below may vary from run to run because of the stochastic nature of training
# Run every experiment in order. The trailing comments record the result
# of one reference run of each function.
for experiment in (
    drawSeries,  # Univariate Series
    deepLearn1,  # persistence: 1841.156 RMSE (+/- 0.000)
    MLP,         # mlp: 1573.869 RMSE (+/- 121.720)
    ConvNet,     # cnn: 1557.113 RMSE (+/- 59.820)
    LsTM,        # lstm: 2091.674 RMSE (+/- 72.054)
    CNNLSTM,     # cnn-lstm: 1630.113 RMSE (+/- 180.719)
    CONVLSTM,    # convlstm: 1768.205 RMSE (+/- 230.159)
):
    experiment()
# Just for your information. Happy deep learning with AMET ODL.