// Source blob: f31a6e5cf64038160480ff89c2cda8dfb6cf73b9
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.fluo.recipes.core.export.it;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import com.google.common.collect.Iterators;
import org.apache.commons.io.FileUtils;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.client.LoaderExecutor;
import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.client.scanner.CellScanner;
import org.apache.fluo.api.client.scanner.ColumnScanner;
import org.apache.fluo.api.client.scanner.RowScanner;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.ColumnValue;
import org.apache.fluo.api.data.Span;
import org.apache.fluo.api.mini.MiniFluo;
import org.apache.fluo.api.observer.ObserverProvider;
import org.apache.fluo.recipes.core.export.ExportQueue;
import org.apache.fluo.recipes.core.export.SequencedExport;
import org.apache.fluo.recipes.core.export.function.Exporter;
import org.apache.fluo.recipes.core.serialization.SimpleSerializer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
public class ExportTestBase {
/**
 * Exports recorded by {@link RefExporter}. The outer key is the referenced node, the inner key is
 * the key of the export that mentioned it, and the value is the most recent sequence number and
 * deleted flag seen for that pair.
 *
 * <p>
 * This map is also the monitor used by the accessors and by the exporter, so it is declared
 * {@code final}: the lock identity must never change while threads synchronize on it.
 */
private static final Map<String, Map<String, RefInfo>> globalExports = new HashMap<>();
/**
 * Number of times {@link RefExporter#export} has been invoked; read and incremented while holding
 * the {@code globalExports} monitor.
 */
private static int exportCalls = 0;
/**
 * Returns the export keys recorded for {@code node} whose latest entry is not marked deleted.
 *
 * @param node the referenced node to look up
 * @return a new mutable set of export keys; empty when nothing was recorded for {@code node}
 */
protected static Set<String> getExportedReferees(String node) {
  synchronized (globalExports) {
    Set<String> live = new HashSet<>();
    Map<String, RefInfo> recorded = globalExports.get(node);
    if (recorded != null) {
      for (Map.Entry<String, RefInfo> entry : recorded.entrySet()) {
        if (!entry.getValue().deleted) {
          live.add(entry.getKey());
        }
      }
    }
    return live;
  }
}
/**
 * Builds a view of every node that currently has at least one non-deleted export entry.
 *
 * @return a new map from node to its non-deleted export keys; nodes whose set would be empty are
 *         omitted
 */
protected static Map<String, Set<String>> getExportedReferees() {
  synchronized (globalExports) {
    Map<String, Set<String>> liveByNode = new HashMap<>();
    for (String node : globalExports.keySet()) {
      Set<String> live = getExportedReferees(node);
      if (live.isEmpty()) {
        continue;
      }
      liveByNode.put(node, live);
    }
    return liveByNode;
  }
}
/**
 * Reads the export-call counter while holding the {@code globalExports} monitor.
 *
 * @return the current value of {@code exportCalls}
 */
protected static int getNumExportCalls() {
  final int calls;
  synchronized (globalExports) {
    calls = exportCalls;
  }
  return calls;
}
/**
 * Exporter used by the tests: instead of writing to an external system it records every added or
 * deleted reference in the shared {@code globalExports} map.
 */
public static class RefExporter implements Exporter<String, RefUpdates> {
  public static final String QUEUE_ID = "req";

  /**
   * Records one reference change, keeping whichever entry has the higher sequence number.
   * Callers must hold the {@code globalExports} monitor.
   *
   * @param key key of the export that mentioned the reference
   * @param seq sequence number of that export
   * @param addedRef the referenced node (used for both added and deleted references)
   * @param deleted whether the reference was removed rather than added
   */
  private void updateExports(String key, long seq, String addedRef, boolean deleted) {
    Map<String, RefInfo> byKey = globalExports.get(addedRef);
    if (byKey == null) {
      byKey = new HashMap<>();
      globalExports.put(addedRef, byKey);
    }
    RefInfo current = byKey.get(key);
    // Only a strictly newer sequence replaces what is already recorded.
    if (current == null || current.seq < seq) {
      byKey.put(key, new RefInfo(seq, deleted));
    }
  }

  /**
   * Drains the iterator, then applies all exports to {@code globalExports} under its monitor and
   * counts this invocation.
   */
  @Override
  public void export(Iterator<SequencedExport<String, RefUpdates>> exportIterator) {
    // Drain before taking the lock so the iterator is never consumed while the monitor is held.
    ArrayList<SequencedExport<String, RefUpdates>> pending = new ArrayList<>();
    exportIterator.forEachRemaining(pending::add);

    synchronized (globalExports) {
      exportCalls++;
      for (SequencedExport<String, RefUpdates> export : pending) {
        String key = export.getKey();
        long seq = export.getSequence();
        for (String added : export.getValue().getAddedRefs()) {
          updateExports(key, seq, added, false);
        }
        for (String removed : export.getValue().getDeletedRefs()) {
          updateExports(key, seq, removed, true);
        }
      }
    }
  }
}
/** MiniFluo instance created in {@code setUpFluo()} and closed in {@code tearDownFluo()}. */
protected MiniFluo miniFluo;
/**
 * Number of buckets passed to the export queue configuration in {@code setUpFluo()}. Subclasses
 * may override to test other bucket counts.
 *
 * @return 13 by default
 */
protected int getNumBuckets() {
  final int defaultBuckets = 13;
  return defaultBuckets;
}
/**
 * Optional export queue buffer size. When this returns {@code null}, {@code setUpFluo()} does not
 * set a buffer size on the export queue configuration.
 *
 * @return {@code null} by default
 */
protected Integer getBufferSize() {
  final Integer noExplicitSize = null;
  return noExplicitSize;
}
/**
 * Observer provider for the tests: wires a {@code DocumentObserver} to the {@code content:new}
 * column and registers a {@link RefExporter} on the export queue.
 */
public static class ExportTestObserverProvider implements ObserverProvider {
  @Override
  public void provide(Registry or, Context ctx) {
    ExportQueue<String, RefUpdates> queue =
        ExportQueue.getInstance(RefExporter.QUEUE_ID, ctx.getAppConfiguration());

    // Strong notifications on content:new drive the document observer.
    Column newContent = new Column("content", "new");
    or.forColumn(newContent, STRONG).useObserver(new DocumentObserver(queue));

    // The queue's own observer hands queued entries to the recording exporter.
    queue.registerObserver(or, new RefExporter());
  }
}
/**
 * Wipes {@code target/mini}, configures the application and export queue, starts a MiniFluo
 * instance, and resets the shared export bookkeeping.
 *
 * <p>
 * The export queue uses {@link #getNumBuckets()} buckets and, when {@link #getBufferSize()}
 * returns a non-null value, that buffer size.
 *
 * @throws Exception if configuration or MiniFluo startup fails
 */
@Before
public void setUpFluo() throws Exception {
  FileUtils.deleteQuietly(new File("target/mini"));
  FluoConfiguration props = new FluoConfiguration();
  props.setApplicationName("eqt");
  props.setWorkerThreads(20);
  props.setMiniDataDir("target/mini");
  props.setObserverProvider(ExportTestObserverProvider.class);
  SimpleSerializer.setSerializer(props, GsonSerializer.class);
  // Query the overridable hook once so the null check and the configured value cannot disagree.
  Integer bufferSize = getBufferSize();
  if (bufferSize == null) {
    ExportQueue.configure(RefExporter.QUEUE_ID).keyType(String.class).valueType(RefUpdates.class)
        .buckets(getNumBuckets()).save(props);
  } else {
    ExportQueue.configure(RefExporter.QUEUE_ID).keyType(String.class).valueType(RefUpdates.class)
        .buckets(getNumBuckets()).bufferSize(bufferSize).save(props);
  }
  miniFluo = FluoFactory.newMiniFluo(props);
  // Worker threads exist by now; reset under the same monitor every other access uses so the
  // cleared state is safely published to them.
  synchronized (globalExports) {
    globalExports.clear();
    exportCalls = 0;
  }
}
/**
 * Closes the MiniFluo instance if one was created by {@code setUpFluo()}.
 *
 * @throws Exception if closing MiniFluo fails
 */
@After
public void tearDownFluo() throws Exception {
  if (miniFluo == null) {
    return;
  }
  miniFluo.close();
}
/**
 * Convenience factory that collects the given strings into a new mutable set.
 *
 * @param sa the elements; duplicates collapse
 * @return a new {@link HashSet} containing the elements
 */
protected static Set<String> ns(String... sa) {
  Set<String> result = new HashSet<>();
  Collections.addAll(result, sa);
  return result;
}
/**
 * Formats an integer as a node key, zero-padded to a minimum width of six characters.
 *
 * @param i the number to format
 * @return the formatted key, e.g. {@code 7} becomes {@code "000007"}
 */
protected static String nk(int i) {
  final String zeroPaddedWidthSix = "%06d";
  return String.format(zeroPaddedWidthSix, i);
}
/**
 * Convenience factory that converts each integer to a node key via {@code nk} and collects the
 * keys into a new mutable set.
 *
 * @param ia the numbers to convert
 * @return a new set of node keys
 */
protected static Set<String> ns(int... ia) {
  Set<String> keys = new HashSet<>();
  for (int idx = 0; idx < ia.length; idx++) {
    keys.add(nk(ia[idx]));
  }
  return keys;
}
/**
 * Fails the test when the two referee maps differ, after printing diagnostics to stdout: the
 * per-key differences, a dump of everything visible in Fluo, and both maps in full.
 *
 * @param expected the expected node-to-referees map
 * @param actual the observed node-to-referees map
 * @param fc client used to dump Fluo's contents on mismatch
 */
public void assertEquals(Map<String, Set<String>> expected, Map<String, Set<String>> actual,
    FluoClient fc) {
  if (!expected.equals(actual)) {
    System.out.println("*** diff ***");
    diff(expected, actual);
    System.out.println("*** fluo dump ***");
    dump(fc);
    System.out.println("*** map dump ***");
    // Previously this header was printed with nothing after it; dump both maps so a failure can
    // be diagnosed from the log alone.
    System.out.println("expected: " + expected);
    System.out.println("actual:   " + actual);
    Assert.fail("expected and actual referee maps differ; see diff and dumps on stdout");
  }
}
/**
 * Loads {@code num} randomly generated documents through a loader executor. Each document gets a
 * random id below {@code maxDocId} and between 1 and 20 random references below
 * {@code maxDocId}, all formatted as zero-padded five-digit strings.
 *
 * @param fc client used to create the loader executor
 * @param num number of documents to load
 * @param maxDocId exclusive upper bound for generated ids
 */
protected void loadRandom(FluoClient fc, int num, int maxDocId) {
  final String idFormat = "%05d";
  try (LoaderExecutor executor = fc.newLoaderExecutor()) {
    Random random = new Random();
    int loaded = 0;
    while (loaded < num) {
      String docid = String.format(idFormat, random.nextInt(maxDocId));
      int refCount = random.nextInt(20) + 1;
      String[] refs = new String[refCount];
      for (int r = 0; r < refCount; r++) {
        refs[r] = String.format(idFormat, random.nextInt(maxDocId));
      }
      executor.execute(new DocumentLoader(docid, refs));
      loaded++;
    }
  }
}
/**
 * Prints, for every key whose sets differ between the two maps, one line of the form
 * {@code key [only in fr] [only in er]}. A key missing from a map is treated as an empty set.
 *
 * @param fr first map
 * @param er second map
 */
protected void diff(Map<String, Set<String>> fr, Map<String, Set<String>> er) {
  Set<String> keys = new HashSet<>(fr.keySet());
  keys.addAll(er.keySet());

  for (String key : keys) {
    Set<String> first = fr.getOrDefault(key, Collections.emptySet());
    Set<String> second = er.getOrDefault(key, Collections.emptySet());

    Set<String> onlyInFirst = new HashSet<>(first);
    onlyInFirst.removeAll(second);
    Set<String> onlyInSecond = new HashSet<>(second);
    onlyInSecond.removeAll(first);

    if (onlyInFirst.isEmpty() && onlyInSecond.isEmpty()) {
      continue;
    }
    System.out.println(key + " " + onlyInFirst + " " + onlyInSecond);
  }
}
/**
 * Reads the current document contents from Fluo and inverts them into a map from referenced node
 * to the set of document ids that reference it.
 *
 * <p>
 * Scans rows with the prefix {@code d:} in column {@code content:current}. The document id is the
 * row with its first two characters removed, and the value is split on single spaces into
 * references; empty tokens are ignored.
 *
 * @param fc client used to open the snapshot
 * @return a new map from reference to referring document ids
 */
protected Map<String, Set<String>> getFluoReferees(FluoClient fc) {
  Map<String, Set<String>> refereesByNode = new HashMap<>();
  try (Snapshot snapshot = fc.newSnapshot()) {
    Column currentContent = new Column("content", "current");
    RowScanner docRows =
        snapshot.scanner().over(Span.prefix("d:")).fetch(currentContent).byRow().build();
    for (ColumnScanner docRow : docRows) {
      String docid = docRow.getsRow().substring(2);
      for (ColumnValue cell : docRow) {
        for (String ref : cell.getsValue().split(" ")) {
          if (!ref.isEmpty()) {
            refereesByNode.computeIfAbsent(ref, unused -> new HashSet<>()).add(docid);
          }
        }
      }
    }
  }
  return refereesByNode;
}
/**
 * Prints every cell returned by an unrestricted scan of a new snapshot to stdout, one line per
 * cell in the form {@code row:[..] col:[..] val:[..]}.
 *
 * @param fc client used to open the snapshot
 */
public static void dump(FluoClient fc) {
  try (Snapshot snapshot = fc.newSnapshot()) {
    CellScanner allCells = snapshot.scanner().build();
    allCells.forEach(cell -> {
      String line = "row:[" + cell.getRow() + "] col:[" + cell.getColumn() + "] val:["
          + cell.getValue() + "]";
      System.out.println(line);
    });
  }
}
}