blob: cf0ef2fa77e75fd937a03a9af39975e967d66c0a [file] [log] [blame]
/*
* Copyright 2015 Webindex authors (see AUTHORS)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package webindex.data.fluo;
import java.util.HashMap;
import java.util.Map;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.metrics.Meter;
import org.apache.fluo.api.metrics.MetricsReporter;
import org.apache.fluo.recipes.core.combine.ChangeObserver;
import org.apache.fluo.recipes.core.combine.CombineQueue;
import org.apache.fluo.recipes.core.export.ExportQueue;
import webindex.core.models.URL;
import webindex.core.models.UriInfo;
import webindex.core.models.export.IndexUpdate;
import webindex.core.models.export.UriUpdate;
/**
* This class contains code related to a CombineQueue that keeps track of the count of information
* about URIs.
*/
public class UriCombineQ {
public static final String URI_COMBINE_Q_ID = "um";
/**
* Observes uri map updates and adds those updates to an export queue.
*/
public static class UriUpdateObserver implements ChangeObserver<String, UriInfo> {
private ExportQueue<String, IndexUpdate> exportQ;
private CombineQueue<String, Long> domainQ;
private Meter linksNew;
private Meter linksChanged;
public UriUpdateObserver(ExportQueue<String, IndexUpdate> exportQ,
CombineQueue<String, Long> domainMap, MetricsReporter metricsReporter) {
this.exportQ = exportQ;
this.domainQ = domainMap;
linksNew = metricsReporter.meter("webindex_links_new");
linksChanged = metricsReporter.meter("webindex_links_changed");
}
@Override
public void process(TransactionBase tx, Iterable<Change<String, UriInfo>> updates) {
Map<String, Long> domainUpdates = new HashMap<>();
for (Change<String, UriInfo> update : updates) {
String uri = update.getKey();
UriInfo oldVal = update.getOldValue().orElse(UriInfo.ZERO);
UriInfo newVal = update.getNewValue().orElse(UriInfo.ZERO);
exportQ.add(tx, uri, new UriUpdate(uri, oldVal, newVal));
linksChanged.mark();
String pageDomain = URL.fromUri(uri).getReverseDomain();
if (oldVal.equals(UriInfo.ZERO) && !newVal.equals(UriInfo.ZERO)) {
domainUpdates.merge(pageDomain, 1L, (o, n) -> o + n);
linksNew.mark();
} else if (newVal.equals(UriInfo.ZERO) && !oldVal.equals(UriInfo.ZERO)) {
domainUpdates.merge(pageDomain, -1L, (o, n) -> o + n);
}
}
domainQ.addAll(tx, domainUpdates);
}
}
/**
* A helper method for configuring the uri map before initializing Fluo.
*/
public static void configure(FluoConfiguration config, int numBuckets, int numTablets) {
CombineQueue.configure(URI_COMBINE_Q_ID).keyType(String.class).valueType(UriInfo.class)
.buckets(numBuckets).bucketsPerTablet(numBuckets / numTablets).save(config);
}
}