"""
This module can be used to read cycling data of the CX2, CS2 and PL type cells as
a dataframe. It converts cumulative values into individual values for
each cycle and determines net charge of the battery at every datapoint.
It can also be used to train and test a LSTM model and predict discharge capacity
using the LSTM model.
"""
import datetime
import os
from os import listdir
from os.path import isfile, join
import re
# import matplotlib.pyplot as plt
# import seaborn as sns
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import LSTM
from keras.models import load_model
# @profile
[docs]def date_time_converter(date_time_list):
"""
This function gets the numpy array with date_time in matlab format
and returns a numpy array with date_time in human readable format.
"""
if not isinstance(date_time_list, list):
raise TypeError("date_time_list should be a list")
# Empty array to hold the results
date_time_human = []
for i in date_time_list:
date_time_human.append(
datetime.datetime.fromordinal(
int(i)) +
datetime.timedelta(
days=i %
1) -
datetime.timedelta(
days=366))
return date_time_human
# @profile
[docs]def get_dict_files(data_dir, file_name_format, ignore_file_indices):
"""
This function finds all the files at the location of the file name
format as specified and then creates a dictionary after ignoring the
list of file specified
Args:
data_dir (string): This is the absolute path to the data directory.
file_name_format (string): Format of the filename, used to deduce other
files.
ignore_file_indices (list, int): This list of ints tells
which to ignore.
Returns:
The dictionary with all data from files dataframes.
"""
# get the list of files in the directory
onlyfiles = [f for f in listdir(data_dir) if isfile(join(data_dir, f))]
# Extract the experiment name from the file_name_format
exp_name = file_name_format[0:4]
# Empty dictionary to hold all the dataframe for various files
dict_files = {}
# Iterate over all the files of certain type and get the file number from
# them
for filename in onlyfiles:
if exp_name in filename:
# Extract the filenumber from the name
file_number = re.search(
exp_name + r'\((.+?)\).csv',
filename).group(1)
# Give a value of dataframe to each key
dict_files[int(file_number)] = pd.read_csv(
join(data_dir, filename))
# Empty dictionary to hold the ordered dictionaries
dict_ordered = {}
# Sort the dictionary based on keys
for key in sorted(dict_files.keys()):
dict_ordered[key] = dict_files[key]
# Keys with files to keep, remove the ignore indices from all keys
wanted_keys = np.array(
list(set(dict_ordered.keys()) - set(ignore_file_indices)))
# Remove the ignored dataframes for characterization
dict_ord_cycling_data = {k: dict_ordered[k] for k in wanted_keys}
return dict_ord_cycling_data
[docs]def concat_dict_dataframes(dict_ord_cycling_data):
"""
This function takes in a dictionary with ordered keys
and concatenates the dataframes in the values of the
dictionary to create a large dataframe with all the records.
Args:
dict_ord_cycling_data (dict):
The dictionary with ordered integer keys and dataframes as values
Returns:
The dataframe after concatenation
"""
# Raise an exception if the type of the inputs is not correct
if not isinstance(dict_ord_cycling_data, dict):
raise TypeError('dict_ord_cycling_data is not of type dict')
#print(dict_ord_cycling_data.keys())
for i in dict_ord_cycling_data.keys():
# Raise an exception if the type of the keys is not integers
# print(type(i))
if not isinstance(i, (int, np.int64)):
raise TypeError('a key in the dictionary is not an integer')
for i in dict_ord_cycling_data.values():
# Raise an exception if the type of the values is not a dataframe
if not isinstance(i, pd.DataFrame):
raise TypeError('a value in the dictionary is not a pandas ' +
'dataframe')
# print(i.columns)
# Raise am exception if the necessary columns are not found in the df
if not {
'Cycle',
'Charge_Ah',
'Discharge_Ah',
'Time_sec',
'Current_Amp',
'Voltage_Volt'}.issubset(i.columns):
raise Exception("the dataframe doesnt have the columns 'Cycle'" +
", 'Charge_Ah', 'Discharge_Ah', " +
"'Time_sec', 'Voltage_Volt', 'Current_Amp' ")
# Concatenate the dataframes to create the total dataframe
df_out = None
for k in dict_ord_cycling_data.keys():
if df_out is None:
df_next = dict_ord_cycling_data[k]
df_out = pd.DataFrame(data=None, columns=df_next.columns)
df_out = pd.concat([df_out, df_next])
else:
df_next = dict_ord_cycling_data[k]
df_next['Cycle'] = np.array(
df_next['Cycle']) + max(np.array(df_out['Cycle']))
df_next['Time_sec'] = np.array(
df_next['Time_sec']) + max(np.array(df_out['Time_sec']))
df_next['Charge_Ah'] = np.array(
df_next['Charge_Ah']) + max(np.array(df_out['Charge_Ah']))
df_next['Discharge_Ah'] = np.array(
df_next['Discharge_Ah']) + max(np.array(df_out['Discharge_Ah']))
df_out = pd.concat([df_out, df_next])
return df_out
[docs]def get_cycle_capacities(df_out):
"""
This function takes the dataframe, creates a new index and then calculates
capacities per cycle from cumulative charge and discharge capacities
Args:
df_out (pandas.DataFrame):
Concatenated dataframe
Returns:
the dataframe with capacities per cycle
"""
# Raise am exception if the necessary columns are not found in the df
if not {'Cycle', 'Charge_Ah', 'Discharge_Ah', 'Time_sec', 'Current_Amp',
'Voltage_Volt'}.issubset(df_out.columns):
raise Exception("the dataframe doesnt have the columns 'Cycle'" +
", 'Charge_Ah', 'Discharge_Ah', " +
"'Time_sec', 'Voltage_Volt', 'Current_Amp' ")
# Reset the index and drop the old index
df_out_indexed = df_out.reset_index(drop=True)
# Proceed further with correcting the capacity
df_grouped = df_out_indexed.groupby(['Cycle']).count()
# Get the indices when a cycle starts
cycle_start_indices = df_grouped['Time_sec'].cumsum()
# Get the charge_Ah per cycle
# Create numpy array to store the old charge_Ah row, and then
# perform transformation on it, rather than in the pandas series
# this is a lot faster in this case
charge_cycle_ah = np.array(df_out_indexed['Charge_Ah'])
charge_ah = np.array(df_out_indexed['Charge_Ah'])
for i in range(1, len(cycle_start_indices)):
begin_value = cycle_start_indices.iloc[i - 1]
end_value = cycle_start_indices.iloc[i]
charge_cycle_ah[begin_value:end_value] = charge_ah[begin_value:end_value] - \
charge_ah[begin_value - 1]
df_out_indexed['charge_cycle_ah'] = charge_cycle_ah
# Get the discharge_Ah per cycle
discharge_cycle_ah = np.array(df_out_indexed['Discharge_Ah'])
discharge_ah = np.array(df_out_indexed['Discharge_Ah'])
for i in range(1, len(cycle_start_indices)):
begin_value = cycle_start_indices.iloc[i - 1]
end_value = cycle_start_indices.iloc[i]
discharge_cycle_ah[begin_value:end_value] = discharge_ah[begin_value:end_value] - \
discharge_ah[begin_value - 1]
df_out_indexed['discharge_cycle_ah'] = discharge_cycle_ah
# This is the data column we can use for prediction.
# This is not totally accurate, as this still has some points that go negative,
# due to incorrect discharge_Ah values every few cycles.
# But the machine learning algorithm should consider these as outliers and
# hopefully get over it. We can come back and correct this.
df_out_indexed['capacity_ah'] = charge_cycle_ah - discharge_cycle_ah
df_out_indexed.rename(columns={'Current_Amp':'Current(A)','Voltage_Volt':'Voltage(V)'},
inplace=True)
return df_out_indexed
# @profile
[docs]def pl_samples_file_reader(data_dir, file_name_format, ignore_file_indices):
"""
This function reads in the data for PL Samples experiment and returns a
nice dataframe with cycles in ascending order.
Args:
data_dir (string): This is the absolute path to the data directory.
file_name_format (string): Format of the filename, used to deduce other files.
ignore_file_indices (list, int): This list of ints tells which to ignore.
Returns:
The complete test data in a dataframe with extra column for capacity in Ah.
"""
# Raise an exception if the type of the inputs is not correct
if not isinstance(data_dir, str):
raise TypeError('data_dir is not of type string')
if not isinstance(file_name_format, str):
raise TypeError('file_name_format is not of type string')
if not isinstance(ignore_file_indices, list):
raise TypeError("ignore_file_indices should be a list")
for ignore_file_indice in ignore_file_indices:
if not isinstance(ignore_file_indice, int):
raise TypeError("""ignore_file_indices elements should be
of type integer""")
if not os.path.exists(join(data_dir, file_name_format)):
raise FileNotFoundError("File {} not found in the location {}"
.format(file_name_format, data_dir))
dict_ord_cycling_data = get_dict_files(
data_dir, file_name_format, ignore_file_indices)
df_out = concat_dict_dataframes(dict_ord_cycling_data)
####
# This has been commented out for performance, as we do not need date_time
####
# Convert the Date_Time from matlab datenum to human readable Date_Time
# First convert the series into a numpy array
# date_time_matlab = df_out['Date_Time'].tolist()
# # Apply the conversion to the numpy array
# df_out['Date_Time_new'] = date_time_converter(date_time_matlab)
# Get the cycle capacities from cumulative capacities
df_out_indexed = get_cycle_capacities(df_out)
return df_out_indexed
# Wrapping function to train the LSTM model and calculate model_loss,
# and response to the testing data set.
[docs]def model_training(data_dir, file_name_format, sheet_name):
"""
This function converts cumulative battery cycling data into individual cycle data
and trains the LSTM model with the converted data set.
Args:
data_dir (string): This is the absolute path to the data directory.
file_name_format (string): Format of the filename, used to deduce other files.
sheet_name(string or int): Sheet name or sheet number in the excel file containing
the relevant data.
Returns:
model_loss(dictionary): Returns the history dictionary (more info to be added)
y_hat(array): Predicted response for the testing dataset.
# y_prediction(array): Predicted response for the completely new dataset
# (The input has to be the time series cycling data including values of
# Current, Voltage and Discharge Capacity)
"""
# The function 'cx2_file_reader' is used to read all the excel files
# in the given path and convert the given cumulative data into individual
# cycle data.
individual_cycle_data = cx2_file_reader(data_dir, file_name_format, sheet_name)
# The function 'data_formatting' is used to drop the unnecesary columns
# from the training data i.e. only the features considered in the model
# (Current, Voltage and Discharge capacity) are retained.
formatted_data = data_formatting(individual_cycle_data)
# The function 'series_to_supervised' is used to frame the time series training
# data as supervised learning dataset.
learning_df = series_to_supervised(
formatted_data, n_in=1, n_out=1, dropnan=True)
# The function 'long_short_term_memory' is used to train the model
# and predict response for the new input dataset.
model_loss, y_hat = long_short_term_memory(learning_df)
return model_loss, y_hat
# Function to predict the discharge capacity using the trained LSTM model.
[docs]def model_prediction(input_data):
"""
This function can be used to forecast the discharge capacity of a battery using
the trained LSTM model
Args:
input_data(dataframe): This is the dataframe containing the current, voltage and
discharge capacity values at a prior time which can be used to forecast discharge
capacity at a further time.
Returns:
y_predicted: The forecasted values of discharge capacity.
"""
# The function 'series_to_supervised' is used to frame the time series training
# data as supervised learning dataset.
learning_df = series_to_supervised(
input_data, n_in=1, n_out=1, dropnan=True)
learning_df = learning_df.iloc[:, 0:3].values
# Reshaping the input dataset.
learning_df = learning_df.reshape(
(learning_df.shape[0], 1, learning_df.shape[1]))
# Predicting the discharge values using the saved LSTM model.
module_dir = os.path.dirname(os.path.abspath(__file__))
model_path = join(module_dir,'models')
model = load_model(join(model_path,'lstm_trained_model.h5'))
y_predicted = model.predict(learning_df)
return y_predicted
# Wrapping function only to merge and convert cumulative data to
# individual cycle data.
[docs]def cx2_file_reader(data_dir, file_name_format, sheet_name):
"""
This function reads in the data for CX2 samples experiment and returns
a well formatted dataframe with cycles in ascending order.
Args:
data_dir (string): This is the absolute path to the data directory.
file_name_format (string): Format of the filename, used to deduce other files.
sheet_name (string): Sheet name containing the data in the excel file.
Returns:
The complete test data in a dataframe with extra column for capacity in Ah.
"""
# Raise an exception if the type of the inputs is not correct
if not isinstance(data_dir, str):
raise TypeError('data_dir is not of type string')
if not isinstance(file_name_format, str):
raise TypeError('file_name_format is not of type string')
if not isinstance(sheet_name, (str, int)):
raise TypeError('Sheet_Name format is not of type string or integer')
if not os.path.exists(join(data_dir, file_name_format)):
raise FileNotFoundError("File {} not found in the location {}"
.format(file_name_format, data_dir))
# Get the list of files in the directory
path = join(data_dir, file_name_format)
files = listdir(path)
# Extract the experiment name from the file_name_format
# exp_name = file_name_format[0:6]
# Filtering out and reading the excel files in the data directory
file_names = list(filter(lambda x: x[-5:] == '.xlsx', files))
# Sorting the file names using the
# 'file_name_sorting' function.
sorted_name_list = file_name_sorting(file_names)
# Reading dataframes according to the date of experimentation
# using 'reading_dataframes' function.
sorted_df = reading_dataframes(sorted_name_list, sheet_name, path)
# Merging all the dataframes and adjusting the cycle index
# using the 'concat_df' function.
cycle_data = concat_df(sorted_df)
# Calculating the net capacity of the battery at every datapoint
# using the function 'capacity'.
capacity_data = capacity(cycle_data)
# Returns the dataframe with new cycle indices and capacity data.
return capacity_data
[docs]def file_name_sorting(file_name_list):
"""
This function sorts all the file names according to the date
on the file name.
Args:
file_name_list(list): List containing all the file names to be read
Returns:
A list of file names sorted according to the date on the file name.
"""
filename = pd.DataFrame(data=file_name_list, columns=['file_name'])
# Splitting the file name into different columns
filename['cell_type'], filename['cell_num'], filename['month'], filename[
'day'], filename['year'] = filename['file_name'].str.split('_', 4).str
filename['year'], filename['ext'] = filename['year'].str.split('.', 1).str
filename['date'] = ''
# Merging the year, month and date column to create a string for DateTime
# object.
filename['date'] = filename['year'].map(
str) + filename['month'].map(str) + filename['day'].map(str)
# Creating a DateTime object.
filename['date_time'] = ''
filename['date_time'] = pd.to_datetime(filename['date'], format="%y%m%d")
# Sorting the file names according to the
# created DateTime object.
filename.sort_values(['date_time'], inplace=True)
# Created a list of sorted file names
sorted_file_names = filename['file_name'].values
return sorted_file_names
[docs]def reading_dataframes(file_names, sheet_name, path):
"""
This function reads all the files in the sorted
file names list as a dataframe
Args(list):
file_names: Sorted file names list
sheet_name: Sheet name in the excel file containing the data.
Returns:
Dictionary of dataframes in the order of the sorted file names.
"""
# Empty dictionary to store all the dataframes according
# to the order in the sorted files name list
df_raw = {}
# Reading the dataframes
for i, filename in enumerate(file_names):
df_raw[i] = pd.read_excel(
join(
path,
filename),
sheet_name=sheet_name)
return df_raw
[docs]def concat_df(df_dict):
"""
This function concatenates all the dataframes and edits
the cycle index for the concatenated dataframes.
Args:
df_dict(dictionary): Dictionary of dataframes to be concatenated.
Returns:
A concatenated dataframe with editted cycle index
"""
df_concat = None
for data in df_dict:
if df_concat is None:
df_next = df_dict[data]
df_concat = pd.DataFrame(data=None, columns=df_next.columns)
# df_next['Cycle'] = df_next['Cycle'] + max(df_pl12['Cycle'])
df_concat = pd.concat([df_concat, df_next])
else:
df_next = df_dict[data]
df_next['Cycle_Index'] = np.array(
df_next['Cycle_Index']) + max(np.array(df_concat['Cycle_Index']))
df_next['Test_Time(s)'] = np.array(
df_next['Test_Time(s)']) + max(np.array(df_concat['Test_Time(s)']))
df_next['Charge_Capacity(Ah)'] = np.array(
df_next['Charge_Capacity(Ah)']) + max(np.array(df_concat['Charge_Capacity(Ah)']))
df_next['Discharge_Capacity(Ah)'] = np.array(
df_next['Discharge_Capacity(Ah)']) + max(
np.array(df_concat['Discharge_Capacity(Ah)']))
df_concat = pd.concat([df_concat, df_next])
# Reset the index and drop the old index
df_reset = df_concat.reset_index(drop=True)
return df_reset
[docs]def capacity(df_data):
"""
This function calculates the net capacity of the battery
from the charge capacity and discharge capacity values.
Args:
df_data(dataframe): Concatenated dataframe which has the values of charge
capacity and discharge capacity for which net capacity has to be
calculated.
Returns:
Dataframe with net capacity of the battery for every point of the charge
and discharge cycle.
"""
# Grouping rows by the cycle index.
group = df_data.groupby(['Cycle_Index']).count()
# Get the indices when a cycle starts
cycle_start_indices = group['Data_Point'].cumsum()
# Get the charge_Ah per cycle
# Create numpy array to store the old charge_Ah row, and then
# perform transformation on it, rather than in the pandas series
# this is a lot faster in this case
charge_cycle_ah = np.array(df_data['Charge_Capacity(Ah)'])
charge_ah = np.array(df_data['Charge_Capacity(Ah)'])
for i in range(1, len(cycle_start_indices)):
begin_value = cycle_start_indices.iloc[i - 1]
end_value = cycle_start_indices.iloc[i]
charge_cycle_ah[begin_value:end_value] = charge_ah[begin_value:end_value] - \
charge_ah[begin_value - 1]
df_data['charge_cycle_ah'] = charge_cycle_ah
# Get the discharge_Ah per cycle
discharge_cycle_ah = np.array(df_data['Discharge_Capacity(Ah)'])
discharge_ah = np.array(df_data['Discharge_Capacity(Ah)'])
for i in range(1, len(cycle_start_indices)):
begin_value = cycle_start_indices.iloc[i - 1]
end_value = cycle_start_indices.iloc[i]
discharge_cycle_ah[begin_value:end_value] = discharge_ah[begin_value:end_value] - \
discharge_ah[begin_value - 1]
df_data['discharge_cycle_ah'] = discharge_cycle_ah
# This is the data column we can use for prediction.
# This is not totally accurate, as this still has some points that go negative,
# due to incorrect discharge_Ah values every few cycles.
# But the machine learning algorithm should consider these as outliers and
# hopefully get over it. We can come back and correct this.
df_data['capacity_ah'] = df_data['charge_cycle_ah'] - df_data['discharge_cycle_ah']
return df_data
[docs]def series_to_supervised(data, n_in=1, n_out=1, dropnan=True):
"""
Frame a time series as a supervised learning dataset.
Arguments:
data: Sequence of observations as a list or NumPy array.
n_in: Number of lag observations as input (X).
n_out: Number of observations as output (y).
dropnan: Boolean whether or not to drop rows with NaN values.
Returns:
Pandas DataFrame of series framed for supervised learning.
"""
n_vars = 1 if isinstance(data, list) else data.shape[1]
df_data = pd.DataFrame(data)
cols, names = list(), list()
# input sequence (t-n, ... t-1)
for i in range(n_in, 0, -1):
cols.append(df_data.shift(i))
names += [('var%d(t-%d)' % (j + 1, i)) for j in range(n_vars)]
# forecast sequence (t, t+1, ... t+n)
for i in range(0, n_out):
cols.append(df_data.shift(-i))
if i == 0:
names += [('var%d(t)' % (j + 1)) for j in range(n_vars)]
else:
names += [('var%d(t+%d)' % (j + 1, i)) for j in range(n_vars)]
# put it all together
sl_df = pd.concat(cols, axis=1)
sl_df.columns = names
# drop rows with NaN values
if dropnan:
sl_df.dropna(inplace=True)
sl_df.drop(sl_df.columns[[3, 4]], axis=1, inplace=True)
sl_df.rename(columns={'var1(t-1)':'Current(t-1)','var2(t-1)':'Voltage(t-1)',
'var3(t-1)':'discharge_capacity(t-1)','var3(t)':'discharge_capacity(t)'},
inplace = True)
return sl_df
[docs]def long_short_term_memory(model_data):
"""
This function splits the input dataset into training
and testing datasets. The keras LSTM model is then
trained and tested using the respective datasets.
Args:
model_data(dataframe): Values of input and output variables
of time series data framed as a supervised learning dataset.
Returns:
model_loss(dictionary): Returns the history dictionary (more info to be added)
y_hat(array): Predicted response for the testing dataset.
y_prediction(array): Predicted response for the completely new dataset.
"""
# Splitting the input dataset into training and testing data
train, test = train_test_split(model_data, test_size=0.2, random_state=944)
# split into input and outputs
train_x, train_y = train[train.columns[0:3]
].values, train[train.columns[3]].values
test_x, test_y = test[test.columns[0:3]
].values, test[test.columns[3]].values
# reshape input to be 3D [samples, timesteps, features]
train_x = train_x.reshape((train_x.shape[0], 1, train_x.shape[1]))
test_x = test_x.reshape((test_x.shape[0], 1, test_x.shape[1]))
# print(train_x.shape, train_y.shape, test_x.shape, test_y.shape)
# Designing the network
model = Sequential()
model.add(LSTM(50, input_shape=(train_x.shape[1], train_x.shape[2])))
model.add(Dense(1))
model.compile(loss='mae', optimizer='adam')
# Fitting the network with training and testing data
history = model.fit(
train_x,
train_y,
epochs=50,
batch_size=72,
validation_data=(
test_x,
test_y),
verbose=0,
shuffle=False)
model_loss = history.history
# Prediction for the test dataset.
yhat = model.predict(test_x)
# model.save('lstm_trained_model.h5')
return model_loss, yhat
[docs]def file_reader(data_dir, file_name_format, sheet_name, ignore_file_indices):
"""
This function reads PL sample, CX2 and CS2 files and returns a nice
dataframe with cyclic values of charge and discharge capacity with
cycles in ascending order
Args:
data_dir (string): This is the absolute path to the data directory.
file_name_format (string): Format of the filename, used to deduce other files.
sheet_name (string): Sheet name containing the data in the excel file.
ignore_file_indices (list, int): This list of ints tells which to ignore.
Returns:
The complete test data in a dataframe with extra column for capacity in Ah.
"""
# For excel files (CX2 and CS2 datafiles), the function 'cx2_file_reader'
# is used.
if file_name_format[:3] == 'CX2' or file_name_format[:3] == 'CS2':
df_output = cx2_file_reader(data_dir,file_name_format,sheet_name)
else:
df_output = pl_samples_file_reader(data_dir,file_name_format,ignore_file_indices)
# The function 'data_formatting' is used to drop the unnecesary columns
# from the training data i.e. only the features considered in the model
# (Current, Voltage and Discharge capacity) are retained.
formatted_data = data_formatting(df_output)
# The function 'series_to_supervised' is used to frame the time series training
# data as supervised learning dataset.
# df_out = series_to_supervised(
# formatted_data, n_in=1, n_out=1, dropnan=True)
return formatted_data