# Shattering Distribution for Active Learning
# IEEE Transactions on Neural Networks and Learning Systems
# Source: https://www.toymoban.com/news/detail-631963.html
"""
Code of SDAL for paper: Shattering Distribution for Active Learning
This Code is exactly the same as the original codes.
"""
import xlwt
import xlrd
import math
import numpy as np
import pandas as pd
from pathlib import Path
from sklearn.model_selection import StratifiedKFold
from collections import OrderedDict
from sklearn.svm import SVC
from scipy.special import expit
from copy import deepcopy
from scipy.spatial.distance import pdist, squareform
from sklearn.metrics import accuracy_score, mean_absolute_error, f1_score
from sklearn.preprocessing import StandardScaler
from sklearn.metrics.pairwise import rbf_kernel
from KBS_NEW.PointwiseQuery.ALOR import ALOR
from sklearn.metrics import accuracy_score, mean_squared_error
from time import time
from sklearn import preprocessing
from sklearn.metrics.pairwise import pairwise_kernels
from sklearn.base import ClassifierMixin, BaseEstimator
from sklearn.utils.validation import check_X_y
from scipy.linalg import pinv, pinv2, pinvh
from sklearn.linear_model import LogisticRegression
from sklearn.cluster import KMeans
class sdal():
    """Batch selector for SDAL (Shattering Distribution for Active Learning).

    Construction computes an RBF kernel over ``X`` and runs a greedy
    kernel-deflation pass (``get_halving``) that orders/filters the unlabeled
    pool.  ``select`` then spends the labelling budget on that candidate set,
    either directly or through KMeans centres refined with fixed-radius balls.

    Attributes written by ``__init__``:
        X, y, X_test, y_test: stored as given.
        nSample, nDim: ``X.shape``.
        labels, nClass: sorted unique values of ``y`` and their count.
        budget, budgetLeft: total and remaining number of queries.
        labeled, unlabeled: lists of row positions in ``X``.
        K: RBF kernel of ``X`` (gamma=0.1); it is deflated by ``get_halving``.
        lamb: regulariser added to the kernel diagonal.
        halving_ratio: fraction of the unlabeled pool kept by ``get_halving``.
        halving_ids: candidate row positions returned by ``get_halving``.
    """

    def __init__(self, X, y, labeled, budget, X_test, y_test, halving_ratio=1.0):
        """Store the data, build the kernel and pre-compute the candidate set.

        Args:
            X: 2-D array of training samples (``X.shape`` is unpacked into two
                values).
            y: labels of ``X``.
            labeled: iterable of row positions of ``X`` that are already
                labelled.
            budget: number of samples ``select`` may label.
            X_test: test samples; only stored.
            y_test: test labels; only stored.
            halving_ratio: fraction in ``(0, 1]`` of the unlabeled pool that
                ``get_halving`` keeps.  The default ``1.0`` reproduces the
                count used by the original code.
                NOTE(review): the method is called "halving", so ``0.5`` may
                have been the intent -- confirm against the reference code.

        Raises:
            ValueError: if ``halving_ratio`` is not in ``(0, 1]``, or if
                ``labeled`` contains a position that is out of range or
                repeated (raised by ``initialization``).
        """
        if not 0 < halving_ratio <= 1:
            raise ValueError("halving_ratio must be in (0, 1]")
        self.X = X
        self.y = y
        self.nSample, self.nDim = X.shape
        self.labels = sorted(np.unique(self.y))
        self.nClass = len(self.labels)
        self.X_test = X_test
        self.y_test = y_test
        self.budget = deepcopy(budget)
        self.budgetLeft = deepcopy(budget)
        self.labeled = list(deepcopy(labeled))
        self.halving_ratio = halving_ratio
        self.unlabeled = self.initialization()
        self.K = rbf_kernel(X=self.X, gamma=0.1)
        # 10e-4 == 1e-3.  NOTE(review): confirm 1e-4 was not intended.
        self.lamb = 10e-4
        self.halving_ids = self.get_halving()

    def initialization(self):
        """Return the row positions ``0..nSample-1`` that are not in ``labeled``.

        Raises:
            ValueError: if an entry of ``self.labeled`` is not a valid row
                position or appears twice (``list.remove`` fails).
        """
        unlabeled = list(range(self.nSample))
        for idx in self.labeled:
            unlabeled.remove(idx)
        return unlabeled

    def get_halving(self):
        """Corresponding to the Halving function in the original code.

        Greedily picks unlabeled rows of the kernel.  At every step the row
        with the largest ``||K[i, :]|| / (K[i, i] + lamb)`` is taken and the
        kernel is deflated by the rank-one term built from that row.

        Differences from the previous implementation:
          * a picked row is removed from the candidates, so the result never
            contains duplicates;
          * the number of picks is capped by the size of the unlabeled pool;
          * scores are computed for all rows at once instead of in a Python
            loop.

        Side effects:
            ``self.K`` is rebound to the deflated kernel after every pick.

        Returns:
            list[int]: distinct row positions, in the order they were picked.
        """
        num_unlabeled = len(self.unlabeled)
        num_half = int(np.floor(num_unlabeled * self.halving_ratio))
        if num_half < self.budget:
            num_half = self.budget
        # Cannot pick more distinct samples than the pool contains.
        num_half = min(num_half, num_unlabeled)

        candidate = np.zeros(self.K.shape[0], dtype=bool)
        candidate[np.asarray(self.unlabeled, dtype=int)] = True

        halving_ids = []
        for _ in range(num_half):
            # Row-wise Euclidean norm without materialising K * K.
            row_norm = np.sqrt(np.einsum("ij,ij->i", self.K, self.K))
            score = row_norm / (np.diagonal(self.K) + self.lamb)
            score[~candidate] = -np.inf
            # argmax keeps the lowest position on ties, which matches the
            # first-inserted-key behaviour of max() over the ordered scores.
            tar_idx = int(np.argmax(score))
            halving_ids.append(tar_idx)
            candidate[tar_idx] = False
            pivot = self.K[tar_idx]
            self.K = self.K - np.outer(pivot, pivot) / (pivot[tar_idx] + self.lamb)
        return halving_ids

    def NumberDensity(self, data, Center, Radius):
        """Return the density score of ``data`` with respect to ``Center``.

        For every sample, the distances to the centres that are strictly
        smaller than ``Radius`` are collected; the sample contributes
        ``sum(exp(d / 1.8) ** 2) / (number_of_such_centres + 1)``.

        Args:
            data: 2-D array, one sample per row.
            Center: 2-D array, one centre per row.
            Radius: ball radius.

        Returns:
            float: sum of the per-sample contributions (``0.0`` for no data).
        """
        data = np.asarray(data)
        Center = np.asarray(Center)
        f = 0.
        for point in data:
            dist = np.linalg.norm(Center - point, axis=1)
            in_ball = dist[dist < Radius]
            f += np.sum(np.exp(in_ball / 1.8) ** 2) / (len(in_ball) + 1)
        return float(f)

    def _refine_centers(self, data, Center):
        """Move each centre to the mean of the samples inside its ball.

        Runs at most 50 rounds starting from radius 0.25.  A round stops the
        loop when the density score did not change or when two different
        centres are closer than twice the radius; otherwise the radius grows
        by 10 %.  ``Center`` is updated in place and returned.
        """
        Radi = 0.25
        f = self.NumberDensity(data, Center, Radi)
        for _ in range(50):
            for j in range(len(Center)):
                dist = np.linalg.norm(data - Center[j], axis=1)
                ball = data[dist < Radi]
                if len(ball) > 0:
                    Center[j] = ball.mean(axis=0)
            F = self.NumberDensity(data, Center, Radi)
            gaps = np.linalg.norm(Center[:, None, :] - Center[None, :, :], axis=2)
            np.fill_diagonal(gaps, np.inf)  # ignore the distance of a centre to itself
            overlap = bool((gaps < 2 * Radi).any())
            if F - f == 0 or overlap:
                break
            f = F
            Radi = (1 + 0.1) * Radi
        return Center

    def _match_centers(self, Center):
        """Return, for each centre, the nearest not-yet-chosen candidate id."""
        candidates = list(dict.fromkeys(self.halving_ids))
        cand_X = self.X[candidates]
        taken = np.zeros(len(candidates), dtype=bool)
        chosen = []
        for center in Center:
            if taken.all():
                break
            dist = np.linalg.norm(cand_X - center, axis=1)
            dist[taken] = np.inf
            # The original scan used `<=`, i.e. the LAST candidate wins a tie.
            pos = len(dist) - 1 - int(np.argmin(dist[::-1]))
            taken[pos] = True
            chosen.append(int(candidates[pos]))
        return chosen

    def select(self):
        """Corresponding to the SDAL function in the original codes.

        If the candidate set is not larger than the budget, every candidate is
        labelled.  Otherwise KMeans with ``budget`` clusters is fitted on the
        candidates, the centres are refined (``_refine_centers``) and the
        nearest distinct candidate of each centre is labelled.

        In both cases the chosen ids are appended to ``self.labeled`` as
        Python ints, removed from ``self.unlabeled`` and ``self.budgetLeft``
        is decremented once per id.

        Returns:
            sdal: ``self``.
        """
        if len(self.halving_ids) <= self.budget:
            chosen = [int(idx) for idx in self.halving_ids]
        else:
            data = self.X[self.halving_ids]
            clf = KMeans(n_clusters=self.budget)
            clf.fit(data)
            Center = self._refine_centers(data, clf.cluster_centers_)
            chosen = self._match_centers(Center)

        for idx in chosen:
            self.labeled.append(idx)
            self.unlabeled.remove(idx)
            self.budgetLeft -= 1
        return self
def read_index_column(sheet, col):
    """Return the float cells of column ``col`` of ``sheet`` as a list of ints.

    ``sheet`` only needs a ``col_values(col)`` method (as xlrd sheets have).
    Cells that are not ``float`` instances (for example the empty strings that
    pad shorter columns) are skipped.
    """
    return [int(cell) for cell in sheet.col_values(col) if isinstance(cell, float)]


def main():
    """Run SDAL on every stored partition of each data set and save the result.

    For each name in ``names_list`` the CSV ``<name>.csv`` is read (features in
    all but the last column, label in the last), features are standardised,
    and labels are shifted so the smallest is 0.  Each sheet of
    ``<name>.xls`` in the partition folder supplies train indices (column 0),
    test indices (column 1) and initially labelled indices (column 2).  The
    output workbook repeats those three columns and writes ``model.labeled``
    to column 3, one sheet per partition.
    """
    names_list = ["PowerPlant-5bin"]
    data_path = Path(r"D:\OCdata")
    partition_path = Path(r"E:\CCCCC_Result\DataPartitions")
    result_path = Path(r"E:\CCCCC_Result\SelectedResult\SDAL")

    for name in names_list:
        print("########################{}".format(name))

        # -------------- read the whole data --------------
        read_data_path = data_path.joinpath(name + ".csv")
        data = np.array(pd.read_csv(read_data_path, header=None))
        X = np.asarray(data[:, :-1], np.float64)
        scaler = StandardScaler()
        X = scaler.fit_transform(X)
        y = data[:, -1]
        y -= y.min()
        nClass = len(np.unique(y))
        Budget = 10 * nClass

        # -------------- read the partitions --------------
        read_partition_path = str(partition_path.joinpath(name + ".xls"))
        book_partition = xlrd.open_workbook(read_partition_path)

        workbook = xlwt.Workbook()
        for SN in book_partition.sheet_names():
            S_Time = time()
            table_partition = book_partition.sheet_by_name(SN)
            train_idx = read_index_column(table_partition, 0)
            test_idx = read_index_column(table_partition, 1)
            labeled = read_index_column(table_partition, 2)

            X_train = X[train_idx]
            y_train = y[train_idx].astype(np.int32)
            X_test = X[test_idx]
            y_test = y[test_idx]

            model = sdal(X=X_train, y=y_train, labeled=labeled, budget=Budget, X_test=X_test, y_test=y_test)
            model.select()

            sheet = workbook.add_sheet(SN)
            columns = (train_idx, test_idx, labeled, model.labeled)
            for col, indices in enumerate(columns):
                for row, idx in enumerate(indices):
                    sheet.write(row, col, int(idx))
            print("SN:",SN," Time:",time()-S_Time)

        save_path = str(result_path.joinpath(name + ".xls"))
        workbook.save(save_path)


if __name__ == '__main__':
    main()
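# This concludes the article "Shattering Distribution for Active Learning: SDAL". For more content, search the earlier articles on TOY模板网 (toymoban.com) with the search box in the top-right corner, or browse the related articles listed below. Thank you for supporting TOY模板网.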