| |
| package com.yahoo.labs.samoa.moa.clusterers.clustream; |
| |
| /* |
| * #%L |
| * SAMOA |
| * %% |
| * Copyright (C) 2010 RWTH Aachen University, Germany |
| * %% |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * #L% |
| */ |
| |
| import java.util.ArrayList; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Random; |
| import com.yahoo.labs.samoa.moa.cluster.Cluster; |
| import com.yahoo.labs.samoa.moa.cluster.Clustering; |
| import com.yahoo.labs.samoa.moa.cluster.SphereCluster; |
| import com.yahoo.labs.samoa.moa.clusterers.AbstractClusterer; |
| import com.yahoo.labs.samoa.moa.core.Measurement; |
| import com.github.javacliparser.IntOption; |
| import com.yahoo.labs.samoa.instances.DenseInstance; |
| import com.yahoo.labs.samoa.instances.Instance; |
| |
| /** Citation: CluStream: Charu C. Aggarwal, Jiawei Han, Jianyong Wang, Philip S. Yu: |
| * A Framework for Clustering Evolving Data Streams. VLDB 2003: 81-92 |
| */ |
| public class Clustream extends AbstractClusterer{ |
| |
| private static final long serialVersionUID = 1L; |
| |
| public IntOption timeWindowOption = new IntOption("horizon", |
| 'h', "Rang of the window.", 1000); |
| |
| public IntOption maxNumKernelsOption = new IntOption( |
| "maxNumKernels", 'k', |
| "Maximum number of micro kernels to use.", 100); |
| |
| public IntOption kernelRadiFactorOption = new IntOption( |
| "kernelRadiFactor", 't', |
| "Multiplier for the kernel radius", 2); |
| |
| private int timeWindow; |
| private long timestamp = -1; |
| private ClustreamKernel[] kernels; |
| private boolean initialized; |
| private List<ClustreamKernel> buffer; // Buffer for initialization with kNN |
| private int bufferSize; |
| private double t; |
| private int m; |
| |
| public Clustream() { |
| } |
| |
| |
| @Override |
| public void resetLearningImpl() { |
| this.kernels = new ClustreamKernel[maxNumKernelsOption.getValue()]; |
| this.timeWindow = timeWindowOption.getValue(); |
| this.initialized = false; |
| this.buffer = new LinkedList<>(); |
| this.bufferSize = maxNumKernelsOption.getValue(); |
| t = kernelRadiFactorOption.getValue(); |
| m = maxNumKernelsOption.getValue(); |
| } |
| |
| @Override |
| public void trainOnInstanceImpl(Instance instance) { |
| int dim = instance.numValues(); |
| timestamp++; |
| // 0. Initialize |
| if ( !initialized ) { |
| if ( buffer.size() < bufferSize ) { |
| buffer.add( new ClustreamKernel(instance,dim, timestamp, t, m) ); |
| return; |
| } |
| |
| int k = kernels.length; |
| //System.err.println("k="+k+" bufferSize="+bufferSize); |
| assert (k <= bufferSize); |
| |
| ClustreamKernel[] centers = new ClustreamKernel[k]; |
| for ( int i = 0; i < k; i++ ) { |
| centers[i] = buffer.get( i ); // TODO: make random! |
| } |
| Clustering kmeans_clustering = kMeans(k, centers, buffer); |
| // Clustering kmeans_clustering = kMeans(k, buffer); |
| |
| for ( int i = 0; i < kmeans_clustering.size(); i++ ) { |
| kernels[i] = new ClustreamKernel( new DenseInstance(1.0,centers[i].getCenter()), dim, timestamp, t, m ); |
| } |
| |
| buffer.clear(); |
| initialized = true; |
| return; |
| } |
| |
| |
| // 1. Determine closest kernel |
| ClustreamKernel closestKernel = null; |
| double minDistance = Double.MAX_VALUE; |
| for (ClustreamKernel kernel : kernels) { |
| //System.out.println(i+" "+kernels[i].getWeight()+" "+kernels[i].getDeviation()); |
| double distance = distance(instance.toDoubleArray(), kernel.getCenter()); |
| if (distance < minDistance) { |
| closestKernel = kernel; |
| minDistance = distance; |
| } |
| } |
| |
| // 2. Check whether instance fits into closestKernel |
| double radius; |
| if (closestKernel != null && closestKernel.getWeight() == 1) { |
| // Special case: estimate radius by determining the distance to the |
| // next closest cluster |
| radius = Double.MAX_VALUE; |
| double[] center = closestKernel.getCenter(); |
| for (ClustreamKernel kernel : kernels) { |
| if (kernel == closestKernel) { |
| continue; |
| } |
| |
| double distance = distance(kernel.getCenter(), center); |
| radius = Math.min(distance, radius); |
| } |
| } else { |
| radius = closestKernel.getRadius(); |
| } |
| |
| if ( minDistance < radius ) { |
| // Date fits, put into kernel and be happy |
| closestKernel.insert( instance, timestamp ); |
| return; |
| } |
| |
| // 3. Date does not fit, we need to free |
| // some space to insert a new kernel |
| long threshold = timestamp - timeWindow; // Kernels before this can be forgotten |
| |
| // 3.1 Try to forget old kernels |
| for ( int i = 0; i < kernels.length; i++ ) { |
| if ( kernels[i].getRelevanceStamp() < threshold ) { |
| kernels[i] = new ClustreamKernel( instance, dim, timestamp, t, m ); |
| return; |
| } |
| } |
| |
| // 3.2 Merge closest two kernels |
| int closestA = 0; |
| int closestB = 0; |
| minDistance = Double.MAX_VALUE; |
| for ( int i = 0; i < kernels.length; i++ ) { |
| double[] centerA = kernels[i].getCenter(); |
| for ( int j = i + 1; j < kernels.length; j++ ) { |
| double dist = distance( centerA, kernels[j].getCenter() ); |
| if ( dist < minDistance ) { |
| minDistance = dist; |
| closestA = i; |
| closestB = j; |
| } |
| } |
| } |
| assert (closestA != closestB); |
| |
| kernels[closestA].add( kernels[closestB] ); |
| kernels[closestB] = new ClustreamKernel( instance, dim, timestamp, t, m ); |
| } |
| |
| @Override |
| public Clustering getMicroClusteringResult() { |
| if ( !initialized ) { |
| return new Clustering( new Cluster[0] ); |
| } |
| |
| ClustreamKernel[] res = new ClustreamKernel[kernels.length]; |
| for ( int i = 0; i < res.length; i++ ) { |
| res[i] = new ClustreamKernel( kernels[i], t, m ); |
| } |
| |
| return new Clustering( res ); |
| } |
| |
| @Override |
| public boolean implementsMicroClusterer() { |
| return true; |
| } |
| |
| @Override |
| public Clustering getClusteringResult() { |
| return null; |
| } |
| |
| public String getName() { |
| return "Clustream " + timeWindow; |
| } |
| |
| private static double distance(double[] pointA, double [] pointB){ |
| double distance = 0.0; |
| for (int i = 0; i < pointA.length; i++) { |
| double d = pointA[i] - pointB[i]; |
| distance += d * d; |
| } |
| return Math.sqrt(distance); |
| } |
| |
| //wrapper... we need to rewrite kmeans to points, not clusters, doesnt make sense anymore |
| // public static Clustering kMeans( int k, ArrayList<Instance> points, int dim ) { |
| // ArrayList<ClustreamKernel> cl = new ArrayList<ClustreamKernel>(); |
| // for(Instance inst : points){ |
| // cl.add(new ClustreamKernel(inst, dim , 0, 0, 0)); |
| // } |
| // Clustering clustering = kMeans(k, cl); |
| // return clustering; |
| // } |
| |
| public static Clustering kMeans( int k, List<? extends Cluster> data ) { |
| Random random = new Random(0); |
| Cluster[] centers = new Cluster[k]; |
| for (int i = 0; i < centers.length; i++) { |
| int rid = random.nextInt(k); |
| centers[i] = new SphereCluster(data.get(rid).getCenter(),0); |
| } |
| return kMeans(k, centers, data); |
| } |
| |
| |
| |
| |
| |
| public static Clustering kMeans( int k, Cluster[] centers, List<? extends Cluster> data ) { |
| assert (centers.length == k); |
| assert (k > 0); |
| |
| int dimensions = centers[0].getCenter().length; |
| |
| ArrayList<ArrayList<Cluster>> clustering = new ArrayList<>(); |
| for ( int i = 0; i < k; i++ ) { |
| clustering.add( new ArrayList<Cluster>() ); |
| } |
| |
| int repetitions = 100; |
| while ( repetitions-- >= 0 ) { |
| // Assign points to clusters |
| for ( Cluster point : data ) { |
| double minDistance = distance( point.getCenter(), centers[0].getCenter() ); |
| int closestCluster = 0; |
| for ( int i = 1; i < k; i++ ) { |
| double distance = distance( point.getCenter(), centers[i].getCenter() ); |
| if ( distance < minDistance ) { |
| closestCluster = i; |
| minDistance = distance; |
| } |
| } |
| |
| clustering.get( closestCluster ).add( point ); |
| } |
| |
| // Calculate new centers and clear clustering lists |
| SphereCluster[] newCenters = new SphereCluster[centers.length]; |
| for ( int i = 0; i < k; i++ ) { |
| newCenters[i] = calculateCenter( clustering.get( i ), dimensions ); |
| clustering.get( i ).clear(); |
| } |
| centers = newCenters; |
| } |
| |
| return new Clustering( centers ); |
| } |
| |
| private static SphereCluster calculateCenter( ArrayList<Cluster> cluster, int dimensions ) { |
| double[] res = new double[dimensions]; |
| for ( int i = 0; i < res.length; i++ ) { |
| res[i] = 0.0; |
| } |
| |
| if ( cluster.size() == 0 ) { |
| return new SphereCluster( res, 0.0 ); |
| } |
| |
| for ( Cluster point : cluster ) { |
| double [] center = point.getCenter(); |
| for (int i = 0; i < res.length; i++) { |
| res[i] += center[i]; |
| } |
| } |
| |
| // Normalize |
| for ( int i = 0; i < res.length; i++ ) { |
| res[i] /= cluster.size(); |
| } |
| |
| // Calculate radius |
| double radius = 0.0; |
| for ( Cluster point : cluster ) { |
| double dist = distance( res, point.getCenter() ); |
| if ( dist > radius ) { |
| radius = dist; |
| } |
| } |
| SphereCluster sc = new SphereCluster( res, radius ); |
| sc.setWeight(cluster.size()); |
| return sc; |
| } |
| |
| @Override |
| protected Measurement[] getModelMeasurementsImpl() { |
| throw new UnsupportedOperationException("Not supported yet."); |
| } |
| |
| @Override |
| public void getModelDescription(StringBuilder out, int indent) { |
| throw new UnsupportedOperationException("Not supported yet."); |
| } |
| |
| public boolean isRandomizable() { |
| return false; |
| } |
| |
| public double[] getVotesForInstance(Instance inst) { |
| throw new UnsupportedOperationException("Not supported yet."); |
| } |
| |
| |
| } |
| |