# Source code for src.spectral_clustering

import numpy as np
import torch
from torch import Tensor
from typing import Union, Tuple

[docs]def initialize(X: Tensor, num_clusters: int, method: str='plus') -> np.array:
r"""
Initialize cluster centers.

:param X: matrix
:param num_clusters: number of clusters
:param method: denotes different initialization strategies: 'plus' (default) or 'random'
:return: initial state

.. note::
We support two initialization strategies: random initialization by setting method='random', or kmeans++
<https://en.wikipedia.org/wiki/K-means%2B%2B>_ by setting method='plus'.
"""
if method == 'plus':
init_func = _initialize_plus
elif method == 'random':
init_func = _initialize_random
else:
raise NotImplementedError
return init_func(X, num_clusters)

def _initialize_random(X, num_clusters):
"""
Initialize cluster centers randomly. See :func:src.spectral_clustering.initialize for details.
"""
num_samples = len(X)
indices = np.random.choice(num_samples, num_clusters, replace=False)
initial_state = X[indices]
return initial_state

def _initialize_plus(X, num_clusters):
"""
Initialize cluster centers by k-means++. See :func:src.spectral_clustering.initialize for details.
"""
num_samples = len(X)
centroid_index = np.zeros(num_clusters)
for i in range(num_clusters):
if i == 0:
choice_prob = np.full(num_samples, 1 / num_samples)
else:
centroid_X = X[centroid_index[:i]]
dis = _pairwise_distance(X, centroid_X)
dis_to_nearest_centroid = torch.min(dis, dim=1).values
choice_prob = dis_to_nearest_centroid / torch.sum(dis_to_nearest_centroid)
choice_prob = choice_prob.detach().cpu().numpy()

centroid_index[i] = np.random.choice(num_samples, 1, p=choice_prob, replace=False)

initial_state = X[centroid_index]
return initial_state

[docs]def kmeans(
X: Tensor,
num_clusters: int,
init_x: Union[Tensor, str]='plus',
distance: str='euclidean',
tol: float=1e-4,
device=torch.device('cpu'),
) -> Tuple[Tensor, Tensor]:
r"""
Perform kmeans on given data matrix :math:\mathbf X.

:param X: :math:(n\times d) input data matrix. :math:n: number of samples. :math:d: feature dimension
:param num_clusters: (int) number of clusters
:param init_x: how to initiate x (provide a initial state of x or define a init method) [default: 'plus']
:param distance: distance [options: 'euclidean', 'cosine'] [default: 'euclidean']
:param tol: convergence threshold [default: 0.0001]
:param device: computing device [default: cpu]
:return: cluster ids, cluster centers
"""
if distance == 'euclidean':
pairwise_distance_function = _pairwise_distance
elif distance == 'cosine':
pairwise_distance_function = _pairwise_cosine
else:
raise NotImplementedError

# convert to float
X = X.float()

# transfer to device
X = X.to(device)

# initialize
if type(init_x) is str:
initial_state = initialize(X, num_clusters, method=init_x)
else:
initial_state = init_x

iteration = 0
while True:
dis = pairwise_distance_function(X, initial_state)

choice_cluster = torch.argmin(dis, dim=1)

initial_state_pre = initial_state.clone()

for index in range(num_clusters):
selected = torch.nonzero(choice_cluster == index, as_tuple=False).squeeze().to(device)

selected = torch.index_select(X, 0, selected)
initial_state[index] = selected.mean(dim=0)

center_shift = torch.sum(
torch.sqrt(
torch.sum((initial_state - initial_state_pre) ** 2, dim=1)
))

# increment iteration
iteration = iteration + 1

if center_shift ** 2 < tol:
break

if torch.isnan(initial_state).any():
print('NAN encountered in clustering. Retrying...')
initial_state = initialize(X, num_clusters)

return choice_cluster.cpu(), initial_state.cpu()

[docs]def kmeans_predict(
X: Tensor,
cluster_centers: Tensor,
distance: str='euclidean',
device=torch.device('cpu')
) -> Tensor:
r"""
Kmeans prediction using existing cluster centers.

:param X: matrix
:param cluster_centers: cluster centers
:param distance: distance [options: 'euclidean', 'cosine'] [default: 'euclidean']
:param device: computing device [default: 'cpu']
:return: cluster ids
"""
if distance == 'euclidean':
pairwise_distance_function = _pairwise_distance
elif distance == 'cosine':
pairwise_distance_function = _pairwise_cosine
else:
raise NotImplementedError

# convert to float
X = X.float()

# transfer to device
X = X.to(device)

dis = pairwise_distance_function(X, cluster_centers)
choice_cluster = torch.argmin(dis, dim=1)

return choice_cluster.cpu()

def _pairwise_distance(data1, data2, device=torch.device('cpu')):
"""Compute pairwise Euclidean distance"""
# transfer to device
data1, data2 = data1.to(device), data2.to(device)

# N*1*M
A = data1.unsqueeze(dim=1)

# 1*N*M
B = data2.unsqueeze(dim=0)

dis = (A - B) ** 2.0
# return N*N matrix for pairwise distance
dis = dis.sum(dim=-1) #.squeeze(-1)
return dis

def _pairwise_cosine(data1, data2, device=torch.device('cpu')):
"""Compute pairwise cosine distance"""
# transfer to device
data1, data2 = data1.to(device), data2.to(device)

# N*1*M
A = data1.unsqueeze(dim=1)

# 1*N*M
B = data2.unsqueeze(dim=0)

# normalize the points  | [0.3, 0.4] -> [0.3/sqrt(0.09 + 0.16), 0.4/sqrt(0.09 + 0.16)] = [0.3/0.5, 0.4/0.5]
A_normalized = A / A.norm(dim=-1, keepdim=True)
B_normalized = B / B.norm(dim=-1, keepdim=True)

cosine = A_normalized * B_normalized

# return N*N matrix for pairwise distance
cosine_dis = 1 - cosine.sum(dim=-1).squeeze(-1)
return cosine_dis

[docs]def spectral_clustering(sim_matrix: Tensor, cluster_num: int, init: Tensor=None,
return_state: bool=False, normalized: bool=False):
r"""
Perform spectral clustering based on given similarity matrix.

This function firstly computes the leading eigenvectors of the given similarity matrix, and then utilizes the
eigenvectors as features and performs k-means clustering based on these features.

:param sim_matrix: :math:(n\times n) input similarity matrix. :math:n: number of instances
:param cluster_num: number of clusters
:param init: the initialization technique or initial features for k-means
:param return_state: whether return state features (can be further used for prediction)
:param normalized: whether to normalize the similarity matrix by its degree
:return: the belonging of each instance to clusters, state features (if return_state==True)
"""
degree = torch.diagflat(torch.sum(sim_matrix, dim=-1))
if normalized:
aff_matrix = (degree - sim_matrix) / torch.diag(degree).unsqueeze(1)
else:
aff_matrix = degree - sim_matrix
e, v = torch.symeig(aff_matrix, eigenvectors=True)
topargs = torch.argsort(torch.abs(e), descending=False)[1:cluster_num]
v = v[:, topargs]

if cluster_num == 2:
choice_cluster = (v > 0).to(torch.int).squeeze(1)
else:
choice_cluster, initial_state = kmeans(v, cluster_num, init if init is not None else 'plus',
distance='euclidean', tol=1e-6)

choice_cluster = choice_cluster.to(sim_matrix.device)

if return_state:
return choice_cluster, initial_state
else:
return choice_cluster