1. 作业要求
(1)实现ID3算法构建决策树模型。属性分类能力指标请使用信息增益比率(Information Gain Ratio);可以不实现剪枝操作。
(2)使用泰坦尼克号生存预测数据集进行模型训练和预测,并学习pandas模块
2. 关于决策树
决策树(decision tree)是一种基本的分类和回归方法。
决策树学习通常包含三个步骤:特征选择、决策树生成和决策树修剪。
(1)决策树的生成,往往通过计算信息增益或其它指标,从根节点开始,递归地生成树。
这相当于用信息增益或其他准则不断地选取局部最优的特征,或将训练集分割为能够基本正确分类的子集。
(2)决策树的剪枝。由于生成的决策树存在过拟合问题,需要对它进行剪枝,以简化学到的决策树。
3. 关于信息增益比
(1)样本集合$D$对特征$A$的信息增益
$$g(D, A)=H(D)-H(D|A)$$
$$H(D)=-\sum_{k=1}^{K}\frac{|C_{k}|}{|D|}log_{2}\frac{|C_{k}|}{|D|}$$
$$H(D | A)=\sum_{i=1}^{n} \frac{|D_{i}|}{|D|} H(D_{i})$$
其中,$H(D)$是数据集$D$的熵,$H(D_i)$是数据集$D_i$的熵,$H(D|A)$是数据集$D$对特征$A$的条件熵。 $D_i$是$D$中特征$A$取第$i$个值的样本子集,$C_k$是$D$中属于第$k$类的样本子集。$n$是特征$A$取 值的个数,$K$是类的个数。
(2)特征 $A$ 对训练数据集 $D$ 的信息增益比:
$$g_R(D, A)=\frac{g(D, A)}{H_A(D)}$$
其中,$g(D,A)$是信息增益,$H_A(D)$是数据集 $D$ 关于特征 $A$ 的值的熵。
$$H_A(D)=-\sum_{i=1}^n\frac{|D_i|}{|D|}\log_2\frac{|D_i|}{|D|}$$
(3)一个计算信息增益的实例
# 最后一列是数据集D的标签
datasets = [['青年', '否', '否', '一般', '否'],
['青年', '否', '否', '好', '否'],
['青年', '是', '否', '好', '是'],
['青年', '是', '是', '一般', '是'],
['青年', '否', '否', '一般', '否'],
['中年', '否', '否', '一般', '否'],
['中年', '否', '否', '好', '否'],
['中年', '是', '是', '好', '是'],
['中年', '否', '是', '非常好', '是'],
['中年', '否', '是', '非常好', '是'],
['老年', '否', '是', '非常好', '是'],
['老年', '否', '是', '好', '是'],
['老年', '是', '否', '好', '是'],
['老年', '是', '否', '非常好', '是'],
['老年', '否', '否', '一般', '否'],
]
首先计算经验熵 $H(D)$
$$H(D)=-\frac{9}{15}log_2\frac{9}{15}-\frac{6}{15}log_2\frac{6}{15}=0.971$$然后计算各特征对数据集D的信息增益
$$g(D,A_1)=H(D)-[\frac{5}{15}H(D_1)+\frac{5}{15}H(D_2)+\frac{5}{15}H(D_3)]=0.083$$
$D_1$, $D_2$, $D_3$ 分别表示$A_1$(年龄)取定值时,数据集 D 的子集。$H(D_1)$计算方法同第一步。
$$g(D,A_2)=0.324, g(D,A_3)=0.420, g(D,A_4)=0.363$$
根据信息增益,将$A_3$作为最优特征
4. 代码实现
4.1 数据处理
import pandas as pd
def process_data(train, test):
    """Prepare the Titanic train/test DataFrames for the ID3 tree.

    Drops columns that are not used as categorical features, fills missing
    values (Age with the TRAIN median, Embarked with 'S'), and discretizes
    Age/SibSp/Parch into small sets of string categories.

    Returns the transformed (train, test) pair.
    """
    dropped = ['PassengerId', 'Name', 'Ticket', 'Fare', 'Cabin']

    def age_fun(x):
        # Bucket a numeric age into a categorical label.
        if x < 12:
            return "children"
        if x < 18:
            return "teenager"
        if x < 65:
            return "audlt"  # (sic) spelling kept to match the rest of the notes
        return "old"

    def sib_fun(x):
        # Bucket a sibling/parent count into none / few / many.
        if x == 0:
            return "no"
        if x <= 3:
            return "few"
        return "many"

    train = train.drop(dropped, axis=1)
    print(train.columns)
    # Median age of the TRAINING set; reused to fill the test set too.
    mean_age = train['Age'].median()
    train['Age'] = train['Age'].fillna(mean_age)
    train["Embarked"] = train["Embarked"].fillna('S')
    train["Age"] = train["Age"].apply(age_fun)
    train["SibSp"] = train["SibSp"].apply(sib_fun)
    train["Parch"] = train["Parch"].apply(sib_fun)
    print("{:*^50}".format(" train "))
    for feature in train.columns:
        print(feature, ": ", train[feature].unique())

    test = test.drop(dropped, axis=1)
    test["Age"] = test["Age"].fillna(mean_age)
    test["Embarked"] = test["Embarked"].fillna('S')
    test["Age"] = test["Age"].apply(age_fun)
    test["SibSp"] = test["SibSp"].apply(sib_fun)
    test["Parch"] = test["Parch"].apply(sib_fun)
    print("{:*^50}".format(" test "))
    for feature in test.columns:
        print(feature, ": ", test[feature].unique())
    return train, test
if __name__ == "__main__":
    # Load the Kaggle Titanic CSVs and run the preprocessing once.
    train_data = pd.read_csv("train.csv")
    test_data = pd.read_csv("test.csv")
    train_data, test_data = process_data(train_data, test_data)
    # Jupyter-style preview of the processed frame; no effect as a plain script.
    train_data.head(10)
4.2 计算熵
from collections import Counter
from math import log
import numpy as np
def calc_ent(datasets):
    """Empirical entropy H(D), in bits, of the label column (column 0)."""
    total = len(datasets)
    freq = Counter(row[0] for row in datasets)  # label frequencies
    return -sum((c / total) * log(c / total, 2) for c in freq.values())
calc_ent(np.array(train_data)) # if train_data is not found, expose step 4.1's train_data as a global
4.3 计算信息增益比率
from collections import Counter, defaultdict
from math import log
import numpy as np
# Entropy.
def calc_ent(datasets):
    """Empirical entropy H(D) of the labels in column 0, in bits."""
    n = len(datasets)
    counts = Counter(sample[0] for sample in datasets)
    return sum(-(c / n) * log(c / n, 2) for c in counts.values())
# Empirical conditional entropy.
def cond_ent(datasets, feature_col):
    """H(D|A): weighted entropy of the labels after partitioning the rows
    by their value in feature_col."""
    n = len(datasets)
    partitions = defaultdict(list)
    for row in datasets:
        partitions[row[feature_col]].append(row)
    return sum((len(part) / n) * calc_ent(part) for part in partitions.values())
# Entropy of dataset D with respect to the values of feature A.
def calc_had(datasets, feature_col):
    """H_A(D): entropy (bits) of the value distribution of the feature in
    column feature_col.

    This is the denominator of the information-gain ratio; it is 0 when the
    feature takes a single value on D.
    """
    # Fix: removed the stray debug print(feature_col, d.items()) that
    # polluted stdout on every call (the final script keeps it commented out).
    n = len(datasets)
    d = Counter([datasets[i][feature_col] for i in range(n)])
    return sum([-(p/n)*log(p/n,2) for p in d.values()])
"""
def info_gain(datasets, feature_col):
return calc_ent(datasets) - cond_ent(datasets, feature_col)
#按书上例子测试,这里没问题
"""
# Information-gain ratio.
def info_gain_ratio(datasets, feature_col):
    """g_R(D, A) = (H(D) - H(D|A)) / H_A(D).

    Returns 0 when H_A(D) == 0 (the feature is constant on D): such a feature
    has no splitting power, and dividing would raise ZeroDivisionError.
    This matches the guarded version used in the final script (section 4.5).
    """
    ent = calc_ent(datasets)
    ce = cond_ent(datasets, feature_col)
    had = calc_had(datasets, feature_col)
    if had == 0: return 0
    return (ent-ce)/had
# Expects train_data (processed in section 4.1) to be available at module scope.
for i in range(1, 7):
    print(info_gain_ratio(np.array(train_data), i))
# Column 0 is assumed to be the label; columns 1-6 are the attributes.
4.4 建树
造轮子好麻烦…
# (Deviates from the instructor's template.)
# After training, can_use == [] marks a finished leaf whose 0/1 prediction is
# stored in self.datasets.  When writing recursion, mind the exit conditions.
class Node:
    """One decision-tree node.

    datasets : rows reaching this node, or the 0/1 prediction at a leaf.
    can_use  : feature column indices still available; [] marks a leaf.
    feature  : name of the column split on here (None before training).
    children : maps a feature value to the corresponding child Node.
    """
    def __init__(self, datasets, can_use, feature=None):
        self.children = {}
        self.feature = feature
        self.datasets = datasets
        self.can_use = can_use
class DTree:
    """ID3-style decision tree (no pruning) for the Titanic data.

    Relies on module globals: `cols` (column names of the training array,
    label first), `dic` (feature name -> possible values) and, in test(),
    `re_cols` (feature name -> column index of a label-free test row).
    """
    def check(self, datasets):
        # True when all rows share one label (column 0): the node is pure.
        n = len(datasets)
        d = Counter([datasets[i][0] for i in range(n)])
        if len(d)==1: return True
        else: return False
    def compu(self, datasets, col):
        # Last usable feature: build majority-vote leaves for every value of
        # cols[col]; ties (and empty subsets) predict 1.
        ans = {}
        feature = cols[col]
        for val in dic[feature]:
            new_data = [row for row in datasets if row[col]==val]
            n = len(new_data)
            d = Counter([new_data[i][0] for i in range(n)])
            if d[1]>=d[0]:
                ans[val] = Node(1,[]) # predicted: survived
            else:
                ans[val] = Node(0,[])
        return ans
    def train(self, root):
        # Recursively grow the subtree rooted at `root`.
        if len(root.can_use)==1:
            # Only one feature left: split on it, children become leaves.
            col = root.can_use[0]
            root.feature = cols[col]
            root.children = self.compu(root.datasets, col)
        else:
            #print("--------------",root.can_use)
            if self.check(root.datasets): # all positive or all negative
                # Leaf: store the 0/1 label in datasets, mark with can_use=[].
                ans = root.datasets[0][0]
                root.datasets = ans
                root.can_use = []
                return
            # Split on the remaining feature with the largest gain ratio.
            idx, max_ratio = 0, 0
            for i in root.can_use:
                ratio = info_gain_ratio(root.datasets, i)
                if ratio >= max_ratio:
                    idx = i
                    max_ratio =ratio
            root.feature = cols[idx]
            #print("!!",root.feature)
            #print(root.can_use,root.feature, idx)
            for feature_val in dic[root.feature]:
                #print(feature_val)
                new_data = np.array([row for row in root.datasets if row[idx]==feature_val])
                root.children[feature_val] = Node(new_data, [i for i in root.can_use if i!=idx])
                self.train(root.children[feature_val])
    def test(self, root, tmp):
        # Walk sample `tmp` from `root` down to a leaf; returns the stored 0/1.
        p = root
        while p.can_use != []:
            #print(p.feature)
            #print(re_cols[p.feature])
            #print(tmp[re_cols[p.feature]])
            x = tmp[re_cols[p.feature]]
            if x not in p.children:
                # NOTE(review): unseen feature value — this returns the whole
                # children dict rather than a label; looks unintended, confirm.
                return p.children
            p = p.children[x]
        return p.datasets
4.5 完整代码
from collections import Counter, defaultdict
from math import log
import numpy as np
import pandas as pd
# Shared globals used by process_data and DTree:
#   dic     : feature name -> array of its observed values (filled by process_data)
#   cols    : column order of the processed training array (label first)
#   re_cols : feature name -> column index in a processed TEST row (no label column)
dic = {}
cols = ['Survived', 'Pclass', 'Sex', 'Age', 'SibSp', 'Parch', 'Embarked']
re_cols = {'Sex':1, 'Pclass':0, 'Age':2, 'SibSp':3, 'Parch':4, 'Embarked':5}
def process_data(train, test):
    """Prepare the Titanic train/test DataFrames for the ID3 tree.

    Drops unused columns, fills missing Age (train median) and Embarked
    ('S'), discretizes Age/SibSp/Parch into string categories, and records
    every training feature's value set in the module-level `dic`.

    Returns the transformed (train, test) pair.
    """
    unused = ['PassengerId', 'Name', 'Ticket', 'Fare', 'Cabin']

    def age_fun(x):
        # Numeric age -> categorical bucket.
        if x < 12:
            return "children"
        if x < 18:
            return "teenager"
        if x < 65:
            return "audlt"  # (sic) spelling kept as in the rest of the notes
        return "old"

    def sib_fun(x):
        # Relative count -> none / few / many.
        if x == 0:
            return "no"
        if x <= 3:
            return "few"
        return "many"

    train = train.drop(unused, axis=1)
    print(train.columns)
    mean_age = train['Age'].median()  # TRAIN median, reused for the test set
    train['Age'] = train['Age'].fillna(mean_age)
    train["Embarked"] = train["Embarked"].fillna('S')
    train["Age"] = train["Age"].apply(age_fun)
    train["SibSp"] = train["SibSp"].apply(sib_fun)
    train["Parch"] = train["Parch"].apply(sib_fun)
    print("{:*^50}".format(" train "))
    for feature in train.columns:
        dic[feature] = train[feature].unique()  # value sets consumed by DTree
        print(feature, ": ", train[feature].unique())

    test = test.drop(unused, axis=1)
    test["Age"] = test["Age"].fillna(mean_age)
    test["Embarked"] = test["Embarked"].fillna('S')
    test["Age"] = test["Age"].apply(age_fun)
    test["SibSp"] = test["SibSp"].apply(sib_fun)
    test["Parch"] = test["Parch"].apply(sib_fun)
    print("{:*^50}".format(" test "))
    for feature in test.columns:
        print(feature, ": ", test[feature].unique())
    return train, test
# Entropy.
def calc_ent(datasets):
    """Empirical entropy H(D) of the label column (column 0), in bits."""
    size = len(datasets)
    label_counts = Counter(row[0] for row in datasets)
    probs = [cnt / size for cnt in label_counts.values()]
    return -sum(p * log(p, 2) for p in probs)
# Empirical conditional entropy.
def cond_ent(datasets, feature_col):
    """H(D|A) = sum_i |D_i|/|D| * H(D_i), grouping rows by feature_col."""
    total = len(datasets)
    groups = defaultdict(list)
    for idx in range(total):
        groups[datasets[idx][feature_col]].append(datasets[idx])
    weighted = [(len(subset) / total) * calc_ent(subset) for subset in groups.values()]
    return sum(weighted)
# Entropy of dataset D with respect to the values of feature A.
def calc_had(datasets, feature_col):
    """H_A(D): entropy (bits) of feature A's value distribution over D;
    the denominator of the information-gain ratio."""
    total = len(datasets)
    value_counts = Counter(row[feature_col] for row in datasets)
    return -sum((c / total) * log(c / total, 2) for c in value_counts.values())
"""
def info_gain(datasets, feature_col):
return calc_ent(datasets) - cond_ent(datasets, feature_col)
"""
# Information-gain ratio.
def info_gain_ratio(datasets, feature_col):
    """g_R(D, A) = g(D, A) / H_A(D); returns 0 when the feature is constant
    on D (H_A(D) == 0), since such a feature has no splitting power."""
    had = calc_had(datasets, feature_col)
    if had == 0:
        return 0
    gain = calc_ent(datasets) - cond_ent(datasets, feature_col)
    return gain / had
class Node:
    """A decision-tree node.

    After training, can_use == [] marks a leaf and the 0/1 prediction is
    stored in `datasets`; otherwise `datasets` holds the rows reaching this
    node, `feature` names the split column and `children` maps each feature
    value to a child Node.
    """
    def __init__(self, datasets, can_use, feature=None):
        self.feature = feature
        self.datasets = datasets
        self.can_use = can_use
        self.children = {}
class DTree:
    """ID3-style decision tree (no pruning) for the Titanic data.

    Relies on module globals: `cols` (column names of the training array,
    label first), `dic` (feature name -> possible values) and, in test(),
    `re_cols` (feature name -> column index of a label-free test row).
    """
    def check(self, datasets):
        # True when all rows share one label (column 0): the node is pure.
        n = len(datasets)
        d = Counter([datasets[i][0] for i in range(n)])
        if len(d)==1: return True
        else: return False
    def compu(self, datasets, col):
        # Last usable feature: build majority-vote leaves for every value of
        # cols[col]; ties (and empty subsets) predict 1.
        ans = {}
        feature = cols[col]
        for val in dic[feature]:
            new_data = [row for row in datasets if row[col]==val]
            n = len(new_data)
            d = Counter([new_data[i][0] for i in range(n)])
            if d[1]>=d[0]:
                ans[val] = Node(1,[]) # predicted: survived
            else:
                ans[val] = Node(0,[])
        return ans
    def train(self, root):
        # Recursively grow the subtree rooted at `root`.
        if len(root.can_use)==1:
            # Only one feature left: split on it, children become leaves.
            col = root.can_use[0]
            root.feature = cols[col]
            root.children = self.compu(root.datasets, col)
        else:
            #print("--------------",root.can_use)
            if self.check(root.datasets): # all positive or all negative
                # Leaf: store the 0/1 label in datasets, mark with can_use=[].
                ans = root.datasets[0][0]
                root.datasets = ans
                root.can_use = []
                return
            # Split on the remaining feature with the largest gain ratio.
            idx, max_ratio = 0, 0
            for i in root.can_use:
                ratio = info_gain_ratio(root.datasets, i)
                if ratio >= max_ratio:
                    idx = i
                    max_ratio =ratio
            root.feature = cols[idx]
            #print("!!",root.feature)
            #print(root.can_use,root.feature, idx)
            for feature_val in dic[root.feature]:
                #print(feature_val)
                new_data = np.array([row for row in root.datasets if row[idx]==feature_val])
                root.children[feature_val] = Node(new_data, [i for i in root.can_use if i!=idx])
                self.train(root.children[feature_val])
    def test(self, root, tmp):
        # Walk sample `tmp` from `root` down to a leaf; returns the stored 0/1.
        p = root
        while p.can_use != []:
            #print(p.feature)
            #print(re_cols[p.feature])
            #print(tmp[re_cols[p.feature]])
            x = tmp[re_cols[p.feature]]
            if x not in p.children:
                # NOTE(review): unseen feature value — this returns the whole
                # children dict rather than a label; looks unintended, confirm.
                return p.children
            p = p.children[x]
        return p.datasets
#if __name__ == "__main__":
# Script entry: load the Kaggle Titanic CSVs, preprocess (also fills `dic`),
# build the tree, then print a prediction for every test row.
train_data = pd.read_csv("train.csv")
test_data = pd.read_csv("test.csv")
train_data, test_data = process_data(train_data, test_data)
train_data = np.array(train_data)
# Column 0 is the Survived label; columns 1-6 are the usable features.
root = Node(train_data, [1,2,3,4,5,6])
print("{:*^50}".format(" predict the test "))
dt = DTree()
dt.train(root)
test_data = np.array(test_data)
for data in test_data:
    print(data, dt.test(root, data))
4.6 优化
为了防止过拟合(决策树的分支过多,导致实际准确率下降),可设置一个阈值使建树过程提早结束,更改check
函数即可。
5. 决策树的可视化
#测试代码
#不适用于以上的树的结构
from graphviz import Digraph
def plot_model(tree, name):
    """Render a nested-dict decision tree to <name>.png with graphviz.

    `tree` has the shape {feature: {value: subtree-or-leaf-label}}.
    """
    graph = Digraph("G", filename=name, format='png', strict=False)
    top = list(tree.keys())[0]
    graph.node("0", top)
    _sub_plot(graph, tree, "0")
    graph.view()
root = "0"
def _sub_plot(g, tree, inc):
global root
first_label = list(tree.keys())[0]
ts = tree[first_label]
for i in ts.keys():
if isinstance(tree[first_label][i], dict):
root = str(int(root) + 1)
g.node(root, list(tree[first_label][i].keys())[0])
g.edge(inc, root, str(i))
_sub_plot(g, tree[first_label][i], root)
else:
root = str(int(root) + 1)
g.node(root, tree[first_label][i])
g.edge(inc, root, str(i))
d1 = {"no surfacing": {0: "no", 1: {"flippers": {0: "no", 1: "yes"}}}}
d2 = {'tearRate': {'reduced': 'no lenses', 'normal': {'astigmatic': {'yes': {
'prescript': {'myope': 'hard', 'hyper': {'age': {'young': 'hard', 'presbyopic': 'no lenses', 'pre': 'no lenses'}}}},
'no': {'age': {'young': 'soft', 'presbyopic': {
'prescript': {'myope': 'no lenses',
'hyper': 'soft'}},
'pre': 'soft'}}}}}}
plot_model(d1, "hello")
plot_model(d2, "hello2")
测试代码会报错:GraphViz's executables not found
然后去安装软件,http://www.graphviz.org/download/
安装时勾选:加入到所有用户的系统路径。
重新打开 jupyter 并运行测试代码
成功后,会用图片管理器打开决策树的可视化图。如图
# 2021-04-24更新,可视化代码,适用,将其运行即可
from graphviz import Digraph
def plot_model(root, name):
    """Render a trained DTree (rooted at Node `root`) to <name>.png."""
    graph = Digraph("G", filename=name, format='png', strict=False)
    graph.node("0", root.feature)
    _sub_plot(graph, root, "0")
    graph.view()
_root = "0"
def _sub_plot(g, root, inc):
global _root
ts = root.children
for i in ts.keys():
if ts[i].can_use != []:
_root = str(int(_root) + 1)
g.node(_root, ts[i].feature)
g.edge(inc, _root, str(i))
_sub_plot(g, ts[i], _root)
else:
_root = str(int(_root) + 1)
g.node(_root, str(ts[i].datasets))
g.edge(inc, _root, str(i))
plot_model(root, "hello")