blob: b89d7580c697565700ed1abc822a70747651075b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.lucene.internal.distributed;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.Logger;
import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.cache.lucene.LuceneQuery;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.logging.LogService;
/**
* A {@link ResultCollector} implementation for collecting and ordering {@link TopEntries}. The
* {@link TopEntries} objects will be created by members when a {@link LuceneQuery} is executed on
* the local data hosted by the member. The member executing this logic must have sufficient space
* to hold all the {@link EntryScore} documents returned from the members.
*
* <p>
* This class will perform a lazy merge operation. Merge will take place if the merge
* {@link ResultCollector#getResult} is invoked or if the combined result size is more than the
* limit set. In the later case, merge will be performed whenever {@link ResultCollector#addResult}
* is invoked.
*/
public class TopEntriesFunctionCollector
implements ResultCollector<TopEntriesCollector, TopEntries> {
private static final Logger logger = LogService.getLogger();
// Use this instance to perform reduce operation
private final CollectorManager<TopEntriesCollector> manager;
private final String id;
private final Collection<TopEntriesCollector> subResults = new ArrayList<>();
private TopEntriesCollector mergedResults;
public TopEntriesFunctionCollector() {
this(null);
}
public TopEntriesFunctionCollector(LuceneFunctionContext<TopEntriesCollector> context) {
this(context, null);
}
public TopEntriesFunctionCollector(LuceneFunctionContext<TopEntriesCollector> context,
InternalCache cache) {
id = cache == null ? String.valueOf(this.hashCode()) : cache.getName();
int limit = context == null ? 0 : context.getLimit();
if (context != null && context.getCollectorManager() != null) {
this.manager = context.getCollectorManager();
} else {
this.manager = new TopEntriesCollectorManager(id, limit);
}
}
@Override
public TopEntries getResult() throws FunctionException {
return aggregateResults();
}
@Override
public TopEntries getResult(long timeout, TimeUnit unit) throws FunctionException {
return aggregateResults();
}
private TopEntries aggregateResults() {
synchronized (subResults) {
if (mergedResults != null) {
return mergedResults.getEntries();
}
mergedResults = manager.reduce(subResults);
return mergedResults.getEntries();
}
}
@Override
public void endResults() {}
@Override
public void clearResults() {
synchronized (subResults) {
subResults.clear();
}
}
@Override
public void addResult(DistributedMember memberID, TopEntriesCollector resultOfSingleExecution) {
synchronized (subResults) {
subResults.add(resultOfSingleExecution);
}
}
String id() {
return this.id;
}
}