DTLearner
import pandas as pd
import numpy as np
class DTLearner(object):
def __init__(self, leaf_size = 1, verbose = False):
self.leaf_size = leaf_size
self.verbose = verbose
self.dataframe = None
self.tree = None
def get_dataframe(self):
return(self.dataframe)
def get_tree(self):
return(self.tree)
def addEvidence(self, Xtrain, Ytrain):
“””Accepts inputs (Xtrain) and outputs (Ytrain) and calls the build_tree function on the data, updates the tree attribute”””
dataframe = pd.DataFrame(Xtrain)
dataframe[‘Y’] = Ytrain
self.data = dataframe
self.tree = self.build_tree(dataframe)
self.query_tree = self.tree.copy()
def highest_correlation(self, df):
“””Returns the highest correlated feature by its index value”””
correlations = np.tril(np.array(df.corr()), k=-1) # takes the lower half of the correlation table without the diagonal.
return(abs(np.nan_to_num(correlations[-1])).argmax())
def split_val(self, df):
“””Acceptes a df and returns (best_feature, value to split on)”””
best_feature = self.highest_correlation(df)
column = df.iloc[:, best_feature]
return best_feature, column.median()
def build_tree(self, data):
“””Recursively build’s a tree by returning arrays in the form [feature, split value, less than index, greater than index]
leaf values are denoted as feature == -1″””
if data.shape[0] <= self.leaf_size or len(pd.unique(data.iloc[:,-1])) == 1:
return(np.array([-1, data.iloc[np.random.choice(range(data.shape[0])), -1], np.nan, np.nan]).reshape(1,4))
else:
best_feature, split_val = self.split_val(data)
# when split_val does not separate a feature, it will iterate forever, bandaid fix with min
if data[best_feature].shape[0] == data[data[best_feature] <= split_val].shape[0]:
split_val = data[best_feature].min()
left_tree = self.build_tree(data[data.iloc[:, best_feature] <= split_val])
right_tree = self.build_tree(data[data.iloc[:, best_feature] > split_val])
root = [best_feature, split_val, 1, left_tree.shape[0] + 1]
temp_tree = np.vstack([root, left_tree, right_tree])
return(temp_tree)
#
def query_value(self, values):
“””Queries a single list of values, returns the output of the tree”””
current_pos = 0
while True:
tree_pos = self.tree[current_pos]
if current_pos > self.tree.shape[0]:
return(‘Error querying value’)
elif int(tree_pos[0]) == -1:
return(tree_pos[1])
elif values[int(tree_pos[0])] <= tree_pos[1]:
current_pos += 1
else:
current_pos += int(tree_pos[3])
def query(self,Xtest):
“””Given an input (Xtest), returns the associated query output(s), can accept arrays”””
try: # assumes multiple test values
return([self.query_value(i) for i in Xtest])
except:
return([self.query_value(Xtest)])
RTLearner
import pandas as pd
import numpy as np
class RTLearner(object):
    """Random regression tree: splits on a randomly chosen feature at the
    midpoint of two randomly chosen values of that feature.

    The fitted tree is a numpy array with one row per node in the form
    [feature, split value, left offset, right offset]; leaf rows use
    feature == -1 and carry the prediction in column 1.

    :param leaf_size: maximum number of samples aggregated into one leaf
    :param verbose: if True, debugging output may be printed
    """

    def __init__(self, leaf_size=1, verbose=False):
        self.leaf_size = leaf_size
        self.verbose = verbose
        self.dataframe = None  # training data (features + 'Y'), set by addEvidence
        self.tree = None       # fitted tree array, set by addEvidence

    def get_dataframe(self):
        """Return the training DataFrame (None before addEvidence)."""
        return self.dataframe

    def get_tree(self):
        """Return the fitted tree array (None before addEvidence)."""
        return self.tree

    def addEvidence(self, Xtrain, Ytrain):
        """Accepts inputs (Xtrain) and outputs (Ytrain), builds the tree."""
        dataframe = pd.DataFrame(Xtrain)
        dataframe['Y'] = Ytrain
        # BUG FIX: this was stored as self.data, leaving get_dataframe() broken.
        self.dataframe = dataframe
        self.tree = self.build_tree(dataframe)
        self.query_tree = self.tree.copy()  # kept for backward compatibility

    def _leaf(self, data):
        """Build a leaf row; value is picked randomly among remaining rows."""
        leaf_val = data.iloc[np.random.choice(range(data.shape[0])), -1]
        return np.array([-1, leaf_val, np.nan, np.nan]).reshape(1, 4)

    def _random_split(self, data):
        """Pick a random feature and the midpoint of two of its values."""
        feature = np.random.choice(data.columns[:-1])
        split1, split2 = np.random.choice(data.iloc[:, feature], size=2)
        return feature, (split1 + split2) / 2.0

    def build_tree(self, data):
        """Recursively build the tree array.

        Rows are [feature, split value, left offset, right offset];
        leaves are denoted by feature == -1.
        """
        # Leaf: few enough rows, or all targets identical.
        if data.shape[0] <= self.leaf_size or len(pd.unique(data.iloc[:, -1])) == 1:
            return self._leaf(data)
        feature, split_val = self._random_split(data)
        # Rerandomize while the split sends every row left, but give up after
        # a fixed number of attempts. BUG FIX: the original loop could spin
        # forever when no feature separates the remaining rows (e.g. all
        # feature rows identical but targets differ).
        attempts = 0
        while data[data.iloc[:, feature] <= split_val].shape[0] == data.shape[0]:
            attempts += 1
            if attempts > 20:
                return self._leaf(data)
            feature, split_val = self._random_split(data)
        left_tree = self.build_tree(data[data.iloc[:, feature] <= split_val])
        right_tree = self.build_tree(data[data.iloc[:, feature] > split_val])
        root = [feature, split_val, 1, left_tree.shape[0] + 1]
        return np.vstack([root, left_tree, right_tree])

    def query_value(self, values):
        """Queries a single list of values, returns the output of the tree."""
        current_pos = 0
        while True:
            # BUG FIX: bounds check was off by one (> instead of >=).
            if current_pos >= self.tree.shape[0]:
                return 'Error querying value'
            node = self.tree[current_pos]
            if int(node[0]) == -1:  # leaf
                return node[1]
            if values[int(node[0])] <= node[1]:
                current_pos += 1             # left child is the next row
            else:
                current_pos += int(node[3])  # jump over the left subtree

    def query(self, Xtest):
        """Given Xtest, returns the associated prediction(s); accepts arrays."""
        try:  # assumes an iterable of observation rows
            return [self.query_value(i) for i in Xtest]
        except (TypeError, IndexError):  # a single row of values
            return [self.query_value(Xtest)]
Insane Learner
import BagLearner as bl
import LinRegLearner as lrl
class InsaneLearner(object):
    """Ensemble of 20 BagLearners, each bagging 20 LinRegLearners; its
    prediction is the average of the 20 bag predictions.

    :param verbose: if True, debugging output may be printed
    """

    def __init__(self, verbose=False):
        self.learners = [bl.BagLearner(learner=lrl.LinRegLearner, bags=20)
                         for _ in range(20)]
        self.verbose = verbose

    def add_evidence(self, Xtrain, Ytrain):
        """Train every inner BagLearner on the full training set."""
        for learner in self.learners:
            # BUG FIX: this project's BagLearner exposes addEvidence
            # (camelCase); calling add_evidence raised AttributeError.
            learner.addEvidence(Xtrain, Ytrain)

    addEvidence = add_evidence  # camelCase alias, consistent with DT/RT/Bag

    def query(self, Xtest):
        """Return the mean of the inner learners' predictions for Xtest."""
        predictions = [learner.query(Xtest) for learner in self.learners]
        return sum(predictions) / len(predictions)
BagLearner
import numpy as np
import pandas as pd
import DTLearner as dt
import RTLearner as rt
class BagLearner(object):
    """Bootstrap-aggregating ensemble wrapper.

    Trains `bags` independent instances of `learner` on bootstrap resamples
    of the training data and averages their predictions.

    :param learner: learner class to instantiate (e.g. DTLearner, RTLearner)
    :param kwargs: keyword arguments forwarded to each learner constructor
    :param bags: number of ensemble members
    :param boost: accepted for interface compatibility; boosting unimplemented
    :param verbose: if True, debugging output may be printed
    """

    def __init__(self, learner, kwargs=None, bags=20, boost=False, verbose=False):
        # BUG FIX: the original default kwargs={} was a shared mutable default.
        kwargs = {} if kwargs is None else kwargs
        self.learners = [learner(**kwargs) for _ in range(bags)]
        self.kwargs = kwargs
        self.bags = bags
        # self.boost = boost  # boosting not implemented
        self.verbose = verbose
        self.trees = []  # one fitted tree array per bag, filled by addEvidence

    def addEvidence(self, Xtrain, Ytrain):
        """Train each ensemble member on its own bootstrap sample."""
        df = pd.DataFrame(Xtrain)
        df['output'] = Ytrain
        # BUG FIX: reset so retraining does not accumulate stale trees.
        self.trees = []
        for method in self.learners:
            # Draw n rows with replacement in one vectorized call (the
            # original sampled one row at a time inside a comprehension).
            learning_df = df.sample(n=df.shape[0], replace=True).reset_index(drop=True)
            X = learning_df.iloc[:, :-1]
            Y = learning_df.iloc[:, -1]
            method.addEvidence(X, Y)
            self.trees.append(method.tree)

    def query_value(self, values, tree):
        """Queries a single list of values for a given tree, returns the output."""
        current_pos = 0
        while True:
            # BUG FIX: bounds check was off by one (> instead of >=).
            if current_pos >= tree.shape[0]:
                return 'Error querying value'
            node = tree[current_pos]
            if int(node[0]) == -1:  # leaf
                return node[1]
            if values[int(node[0])] <= node[1]:
                current_pos += 1             # left child is the next row
            else:
                current_pos += int(node[3])  # jump over the left subtree

    def query_trees(self, Xtest, tree):
        """Query one tree with Xtest; accepts a single row or an array of rows."""
        try:  # assumes an iterable of observation rows
            return [self.query_value(i, tree) for i in Xtest]
        except (TypeError, IndexError):  # a single row of values
            return [self.query_value(Xtest, tree)]

    def query(self, Xtest):
        """Return the mean prediction across all bagged trees."""
        queries = [self.query_trees(Xtest, tree) for tree in self.trees]
        return np.average(queries, axis=0)
LinReg
import numpy as np
class LinRegLearner(object):
    """
    This is a Linear Regression Learner (ordinary least squares).

    :param verbose: If "verbose" is True, your code can print out information
        for debugging. If verbose = False your code should not generate ANY
        output. When we test your code, verbose will be False.
    :type verbose: bool
    """

    def __init__(self, verbose=False):
        """
        Constructor method
        """
        pass  # move along, these aren't the drones you're looking for

    def author(self):
        """
        :return: The GT username of the student
        :rtype: str
        """
        return "tb34"  # replace tb34 with your Georgia Tech username

    def add_evidence(self, data_x, data_y):
        """
        Add training data to learner.

        :param data_x: A set of feature values used to train the learner
        :type data_x: numpy.ndarray
        :param data_y: The value we are attempting to predict given the X data
        :type data_y: numpy.ndarray
        """
        # Append a column of ones so least squares fits a constant term.
        new_data_x = np.ones([data_x.shape[0], data_x.shape[1] + 1])
        new_data_x[:, 0:data_x.shape[1]] = data_x
        # Build and save the model coefficients (last entry is the intercept).
        self.model_coefs, residuals, rank, s = np.linalg.lstsq(
            new_data_x, data_y, rcond=None
        )

    def query(self, points):
        """
        Estimate a set of test points given the model we built.

        :param points: A numpy array with each row corresponding to a query.
        :type points: numpy.ndarray
        :return: The predicted result according to the trained model
        :rtype: numpy.ndarray
        """
        # Dot each row with the feature coefficients, then add the intercept.
        return (self.model_coefs[:-1] * points).sum(axis=1) + self.model_coefs[-1]
if __name__ == "__main__":
    # BUG FIX: the pasted version used curly quotes, a syntax error.
    print("the secret clue is 'zzyzx'")
TestLearner
import math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from DTLearner import DTLearner
from RTLearner import RTLearner
from BagLearner import BagLearner
# Specify the file path to your CSV
file_path = "C:/Users/Anthony/OneDrive/Desktop/CS-7646 Machine Learning for Trading/Project 3-Assess Learners/assess_learners/Data/Istanbul.csv"
# Load the dataset without headers; skip the header row and the date column.
data = pd.read_csv(file_path, header=None, skiprows=1).iloc[:, 1:].reset_index(drop=True)

# Compute how much of the data is training and testing (60/40 split).
# BUG FIX: the pasted version used en-dashes (–) instead of minus signs,
# which is a syntax error; all arithmetic below uses ASCII '-'.
train_rows = int(0.6 * data.shape[0])
test_rows = data.shape[0] - train_rows

# Separate out training and testing data.
train_x = data.iloc[:train_rows, :-1].values
train_y = data.iloc[:train_rows, -1].values
test_x = data.iloc[train_rows:, :-1].values
test_y = data.iloc[train_rows:, -1].values

# Experiment 1: overfitting with DTLearner — RMSE vs. leaf size.
leaf_sizes = [1, 5, 10, 20, 50]
rmse_values = []
for leaf_size in leaf_sizes:
    learner = DTLearner(leaf_size=leaf_size)
    learner.addEvidence(train_x, train_y)
    pred_y = learner.query(test_x)
    rmse = math.sqrt(((test_y - pred_y) ** 2).sum() / test_y.shape[0])
    rmse_values.append(rmse)

# Plot RMSE vs. leaf size.
plt.figure(figsize=(8, 6))
plt.plot(leaf_sizes, rmse_values, marker='o')
plt.title("RMSE vs. Leaf Size for DTLearner")
plt.xlabel("Leaf Size")
plt.ylabel("RMSE")
plt.grid()
plt.show()

# Experiment 2: bagging to reduce overfitting with DTLearner.
num_bags = 20
leaf_size = 10
# BUG FIX: leaf_size was defined but never forwarded to the bagged learners.
learner = BagLearner(learner=DTLearner, kwargs={"leaf_size": leaf_size}, bags=num_bags)
learner.addEvidence(train_x, train_y)
# BUG FIX: BagLearner.query already averages across its bags; the original
# queried in a loop and re-averaged the identical result num_bags times.
pred_y = learner.query(test_x)
rmse = math.sqrt(((test_y - pred_y) ** 2).sum() / test_y.shape[0])
print(f"RMSE with BagLearner: {rmse}")

# Experiment 3: comparison between DTLearner and RTLearner by MAE.
dt_learner = DTLearner()
rt_learner = RTLearner()
dt_learner.addEvidence(train_x, train_y)
rt_learner.addEvidence(train_x, train_y)
dt_pred_y = dt_learner.query(test_x)
rt_pred_y = rt_learner.query(test_x)
# Calculate the Mean Absolute Error (MAE) for each learner.
dt_mae = np.mean(np.abs(test_y - dt_pred_y))
rt_mae = np.mean(np.abs(test_y - rt_pred_y))
# Print the results.
print(f"DTLearner MAE: {dt_mae}")
print(f"RTLearner MAE: {rt_mae}")
I am currently getting a graph that looks like the one shown above.
However, I need the code to produce graphs that look like the ones below. Please help me fix the code so it does, and help with the correct answers to the experiment questions. Thank you in advance.
2
3
4
5