blob: 5c0b8b1ccea4287f9bcaf34d178aba2065152605 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.examples.ml.clustering;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.examples.ml.util.MLSandboxDatasets;
import org.apache.ignite.examples.ml.util.SandboxMLCache;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.ml.clustering.kmeans.KMeansModel;
import org.apache.ignite.ml.clustering.kmeans.KMeansTrainer;
import org.apache.ignite.ml.dataset.feature.extractor.Vectorizer;
import org.apache.ignite.ml.dataset.feature.extractor.impl.DummyVectorizer;
import org.apache.ignite.ml.environment.LearningEnvironmentBuilder;
import org.apache.ignite.ml.math.distances.EuclideanDistance;
import org.apache.ignite.ml.math.primitives.vector.Vector;
import org.apache.ignite.ml.selection.split.TrainTestDatasetSplitter;
import org.apache.ignite.ml.selection.split.TrainTestSplit;
import org.apache.ignite.ml.structures.LabeledVector;
/**
* Example of using KMeans clusterization to determine the optimal count of clusters in data.
* <p>
* Description of model can be found in: https://en.wikipedia.org/wiki/Kmeans . Original dataset can be downloaded from:
* https://archive.ics.uci.edu/ml/datasets/Wholesale+customers . Copy of dataset are stored in:
* modules/ml/src/main/resources/datasets/wholesale_customers.csv . Score for clusterizer estimation: mean of entropy in
* clusters. Description of entropy can be found in: https://en.wikipedia.org/wiki/Entropy_(information_theory) .
*/
public class CustomersClusterizationExample {
/**
* Runs example.
*/
public static void main(String[] args) throws IOException {
try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
System.out.println(">>> Ignite grid started.");
IgniteCache<Integer, Vector> dataCache = null;
try {
System.out.println(">>> Fill dataset cache.");
dataCache = new SandboxMLCache(ignite).fillCacheWith(MLSandboxDatasets.WHOLESALE_CUSTOMERS);
System.out.println(">>> Start training and scoring.");
for (int amountOfClusters = 1; amountOfClusters < 10; amountOfClusters++) {
KMeansTrainer trainer = new KMeansTrainer()
.withAmountOfClusters(amountOfClusters)
.withDistance(new EuclideanDistance())
.withEnvironmentBuilder(LearningEnvironmentBuilder.defaultBuilder().withRNGSeed(0))
.withMaxIterations(50);
// This vectorizer works with values in cache of Vector class.
Vectorizer<Integer, Vector, Integer, Double> vectorizer = new DummyVectorizer<Integer>()
.labeled(Vectorizer.LabelCoordinate.FIRST); // FIRST means "label are stored at first coordinate of vector"
// Splits dataset to train and test samples with 80/20 proportion.
TrainTestSplit<Integer, Vector> split = new TrainTestDatasetSplitter<Integer, Vector>().split(0.8);
KMeansModel mdl = trainer.fit(
ignite, dataCache,
split.getTrainFilter(),
vectorizer
);
double entropy = computeMeanEntropy(dataCache, split.getTestFilter(), vectorizer, mdl);
System.out.println(String.format(">> Clusters mean entropy [%d clusters]: %.2f", amountOfClusters, entropy));
}
}
finally {
if (dataCache != null)
dataCache.destroy();
}
}
finally {
System.out.flush();
}
}
/**
* Computes mean entropy in clusters.
*
* @param cache Dataset cache.
* @param filter Test dataset filter.
* @param vectorizer Upstream vectorizer.
* @param mdl KMeans model.
* @return Score.
*/
private static double computeMeanEntropy(IgniteCache<Integer, Vector> cache,
IgniteBiPredicate<Integer, Vector> filter,
Vectorizer<Integer, Vector, Integer, Double> vectorizer,
KMeansModel mdl) {
Map<Integer, Map<Integer, AtomicInteger>> clusterUniqueLbCounts = new HashMap<>();
try (QueryCursor<Cache.Entry<Integer, Vector>> cursor = cache.query(new ScanQuery<>(filter))) {
for (Cache.Entry<Integer, Vector> ent : cursor) {
LabeledVector<Double> vec = vectorizer.apply(ent.getKey(), ent.getValue());
int cluster = mdl.predict(vec.features());
int ch = vec.label().intValue();
if (!clusterUniqueLbCounts.containsKey(cluster))
clusterUniqueLbCounts.put(cluster, new HashMap<>());
if (!clusterUniqueLbCounts.get(cluster).containsKey(ch))
clusterUniqueLbCounts.get(cluster).put(ch, new AtomicInteger());
clusterUniqueLbCounts.get(cluster).get(ch).incrementAndGet();
}
}
double sumOfClusterEntropies = 0.0;
for (Integer cluster : clusterUniqueLbCounts.keySet()) {
Map<Integer, AtomicInteger> lbCounters = clusterUniqueLbCounts.get(cluster);
int sizeOfCluster = lbCounters.values().stream().mapToInt(AtomicInteger::get).sum();
double entropyInCluster = lbCounters.values().stream()
.mapToDouble(AtomicInteger::get)
.map(lblsCount -> lblsCount / sizeOfCluster)
.map(lblProb -> -lblProb * Math.log(lblProb))
.sum();
sumOfClusterEntropies += entropyInCluster;
}
return sumOfClusterEntropies / clusterUniqueLbCounts.size();
}
}