LogisticRegression.py
from multiprocessing import Pool
from numpy import dot, exp, sign
class LogisticRegressionModel:
    def __init__(self, probability_vector, constant_bias=0):
        self.probability_vector = probability_vector
        self.constant_bias = constant_bias

    def get_output_probability(self, input_vector):
        # Note the positive exponent: the model is P(y = 1 | x) = 1 / (1 + e^(w.x + b)),
        # i.e. sigmoid(-(w.x + b)). The cost function below is written against the
        # same sign convention, so the two flips cancel out.
        sigmoid = lambda z: 1 / (1 + exp(z))
        activation = dot(input_vector, self.probability_vector) + self.constant_bias
        return sigmoid(activation)
class LogisticRegressionCostFunction:
    def __init__(self, logistic_regression_model):
        self.logistic_regression_model = logistic_regression_model

    def get_sum_of_squared_errors_without_multiprocessing(self, training_examples, weight_index):
        # Despite the name, each term is the per-example gradient contribution
        # (model_response - training_response) * challenge[weight_index],
        # not a squared error.
        return sum([self.get_squared_error(training_example.response,
                                           self.logistic_regression_model.get_output_probability(
                                               training_example.challenge),
                                           training_example.challenge[weight_index])
                    for training_example in training_examples])

    def get_derivative_of_cost_function_without_multiprocessing(self, training_examples, weight_index):
        return self.minus_one_over_length_of_training_examples(training_examples) * \
               self.get_sum_of_squared_errors_without_multiprocessing(training_examples, weight_index)

    def get_squared_error(self, training_response, model_response, input_value):
        return (model_response - training_response) * input_value

    def get_derivative_of_cost_function_with_multiprocessing(self, training_examples, weight_index, pool):
        return self.minus_one_over_length_of_training_examples(training_examples) * \
               self.get_sum_of_squared_errors_with_multiprocessing(training_examples, weight_index, pool)

    def get_sum_of_squared_errors_with_multiprocessing(self, training_examples, weight_index, pool):
        # Fans the per-example gradient terms out across the worker pool,
        # then reduces them with a single sum.
        sum_of_squared_errors = pool.starmap(self.get_squared_error_multiprocessing,
                                             [(training_example, weight_index)
                                              for training_example in training_examples])
        return sum(sum_of_squared_errors)

    def get_squared_error_multiprocessing(self, training_example, weight_index):
        return (self.logistic_regression_model.get_output_probability(training_example.challenge)
                - training_example.response) * training_example.challenge[weight_index]

    def minus_one_over_length_of_training_examples(self, training_examples):
        return -(1 / len(training_examples))
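
# Why minus_one_over_length_of_training_examples yields the true gradient (a
# sketch, assuming binary cross-entropy loss J and the sign convention of
# get_output_probability above): with h = sigmoid(-(w.x)), the derivative works
# out to
#     dJ/dw_j = (1/N) * sum_i (y_i - h_i) * x_ij
#             = -(1/N) * sum_i (h_i - y_i) * x_ij,
# which is exactly minus_one_over_length_of_training_examples(...) times the
# "sum of squared errors" accumulated above, so the sign-based RPROP update
# below performs genuine gradient descent.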
class RPROP:
    def __init__(self, epoch=300, default_step_size=0.1, error_tolerance_threshold=5.0):
        # Standard iRPROP- hyperparameters: step sizes are clamped to
        # [1e-6, 50] and grown/shrunk by the usual 1.2 / 0.5 factors.
        self.min_step_size = 1e-6
        self.max_step_size = 50
        self.default_step_size = default_step_size
        self.step_size_increase_factor = 1.2
        self.step_size_decrease_factor = 0.5
        self.epoch = epoch
        self.error_tolerance_threshold = error_tolerance_threshold
    def train_model_irprop_minus_without_multiprocessing(self, model_to_train, cost_function, network_weights,
                                                         training_set):
        # iRPROP-: each weight carries its own step size, adapted from the sign
        # of the product of successive gradients. network_weights should alias
        # model_to_train.probability_vector so that every in-place update is
        # seen by the cost function on the next gradient evaluation.
        step_size, weight_gradients_on_previous_iteration, weight_indexes = self.get_initial_variables(network_weights)
        for iteration in range(self.epoch):
            print("Starting epoch", iteration)
            for weight_index in weight_indexes:
                gradient_on_current_iteration = \
                    cost_function.get_derivative_of_cost_function_without_multiprocessing(training_set, weight_index)
                gradient_product = self.get_gradient_product(gradient_on_current_iteration,
                                                             weight_gradients_on_previous_iteration[weight_index])
                step_size[weight_index] = self.get_new_step_size(gradient_product, step_size[weight_index])
                gradient_on_current_iteration = self.get_new_gradient_with_gradient_product(
                    gradient_on_current_iteration,
                    gradient_product)
                network_weights[weight_index] = self.update_weight_with_step_size(network_weights[weight_index],
                                                                                  gradient_on_current_iteration,
                                                                                  step_size[weight_index])
                weight_gradients_on_previous_iteration[weight_index] = gradient_on_current_iteration
            print(network_weights, "\n")
        return network_weights
    def get_initial_variables(self, network_weights):
        step_size = [self.default_step_size for _ in range(len(network_weights))]
        weight_gradients_on_previous_iteration = [0.0 for _ in range(len(network_weights))]
        weight_indexes = list(range(len(network_weights)))
        return step_size, weight_gradients_on_previous_iteration, weight_indexes
    def train_model_irprop_minus_with_multiprocessing(self, model_to_train, cost_function, network_weights,
                                                      training_set):
        # Same iRPROP- loop as above, but the per-example gradient terms for
        # each weight are evaluated across a worker pool.
        step_size, weight_gradients_on_previous_iteration, weight_indexes = self.get_initial_variables(network_weights)
        pool = Pool()
        for epoch in range(self.epoch):
            print("Starting epoch", epoch)
            for weight_index in weight_indexes:
                weight_gradient_on_current_iteration = \
                    cost_function.get_derivative_of_cost_function_with_multiprocessing(training_set, weight_index, pool)
                gradient_product = self.get_gradient_product(weight_gradient_on_current_iteration,
                                                             weight_gradients_on_previous_iteration[weight_index])
                step_size[weight_index] = self.get_new_step_size(gradient_product, step_size[weight_index])
                gradient_on_current_iteration = self.get_new_gradient_with_gradient_product(
                    weight_gradient_on_current_iteration,
                    gradient_product)
                network_weights[weight_index] = self.update_weight_with_step_size(network_weights[weight_index],
                                                                                  gradient_on_current_iteration,
                                                                                  step_size[weight_index])
                weight_gradients_on_previous_iteration[weight_index] = gradient_on_current_iteration
            print(network_weights, "\n")
        pool.close()
        pool.join()
        return network_weights
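
    # Note: pool.starmap pickles the bound method together with its
    # LogisticRegressionCostFunction instance (and therefore the model) for
    # every task, which works on Python 3 as long as those objects stay
    # picklable; for small training sets the serialization overhead can
    # outweigh the parallel speedup.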
    def get_new_gradient_with_gradient_product(self, current_weight_gradient, gradient_product):
        # iRPROP-: if the gradient changed sign, zero it so that no update is
        # applied to this weight on the current iteration.
        return 0 if gradient_product < 0 else current_weight_gradient

    def get_new_step_size(self, gradient_product, current_step_size):
        # Grow the step while successive gradients agree in sign, shrink it
        # after a sign change, and leave it unchanged when either gradient is
        # zero.
        if gradient_product > 0:
            return self.get_increased_step_size(current_step_size)
        elif gradient_product < 0:
            return self.get_decreased_step_size(current_step_size)
        else:
            return current_step_size

    def get_increased_step_size(self, current_step_size):
        return min(current_step_size * self.step_size_increase_factor, self.max_step_size)

    def get_decreased_step_size(self, current_step_size):
        return max(current_step_size * self.step_size_decrease_factor, self.min_step_size)

    def get_gradient_product(self, weight_gradient_on_current_iteration, weight_gradients_on_previous_iteration):
        return weight_gradient_on_current_iteration * weight_gradients_on_previous_iteration

    def update_weight_with_step_size(self, weight, weight_gradient, update_step_size):
        # RPROP update: only the sign of the gradient matters; the magnitude of
        # the move is the per-weight step size.
        return weight - sign(weight_gradient) * update_step_size
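
# A minimal smoke test, assuming challenges are small real-valued feature
# vectors and responses are 0/1 labels. TrainingExample is a hypothetical
# stand-in for whatever challenge/response pairs the rest of the project
# supplies (e.g. the CRP module); only the .challenge and .response attributes
# are relied upon here.
if __name__ == "__main__":
    from collections import namedtuple
    from random import seed, uniform

    TrainingExample = namedtuple("TrainingExample", ["challenge", "response"])

    seed(0)
    true_weights = [1.5, -2.0, 0.5]
    # Label with the same convention as the model, P(y = 1) = sigmoid(-(w.x)),
    # so y = 1 exactly when w.x < 0.
    training_set = []
    for _ in range(200):
        challenge = [uniform(-1, 1) for _ in range(3)]
        response = 1 if dot(challenge, true_weights) < 0 else 0
        training_set.append(TrainingExample(challenge, response))

    network_weights = [0.0, 0.0, 0.0]
    model = LogisticRegressionModel(network_weights)  # aliases network_weights
    cost_function = LogisticRegressionCostFunction(model)
    trainer = RPROP(epoch=20)
    trained_weights = trainer.train_model_irprop_minus_without_multiprocessing(
        model, cost_function, network_weights, training_set)
    # The learned weights should align in sign with true_weights.
    print("Recovered weight signs:", [sign(weight) for weight in trained_weights])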