// blob: 50c35c56a78387ffb7331c6cd30610d066b2ddd5 [file] [log] [blame]
package io.pivotal.gemfire.spark.connector.internal.gemfirefunctions;
import java.util.Iterator;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.execute.FunctionException;
import com.gemstone.gemfire.cache.query.Query;
import com.gemstone.gemfire.cache.query.QueryService;
import com.gemstone.gemfire.cache.query.SelectResults;
import com.gemstone.gemfire.cache.query.Struct;
import com.gemstone.gemfire.internal.cache.*;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.execute.Function;
import com.gemstone.gemfire.cache.execute.FunctionContext;
import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
import com.gemstone.gemfire.internal.cache.execute.InternalRegionFunctionContext;
import com.gemstone.gemfire.internal.cache.execute.InternalResultSender;
import com.gemstone.gemfire.internal.cache.partitioned.PREntriesIterator;
import com.gemstone.gemfire.internal.logging.LogService;
/**
 * GemFire function used by {@code SparkContext.gemfireRegion(regionPath, whereClause)}
 * to retrieve the region data for the given set of buckets as one RDD partition.
 */
public class RetrieveRegionFunction implements Function {

  /** Id returned by {@link #getId()}. */
  public final static String ID = "spark-gemfire-retrieve-region";

  private static final Logger logger = LogService.getLogger();

  /** Shared instance handed out by {@link #getInstance()}. */
  private static final RetrieveRegionFunction instance = new RetrieveRegionFunction();

  /** Public no-arg constructor; a shared instance is also available via {@link #getInstance()}. */
  public RetrieveRegionFunction() {
  }

  /**
   * Returns the shared instance of this function.
   *
   * @return the single pre-built {@code RetrieveRegionFunction}
   */
  public static RetrieveRegionFunction getInstance() {
    return instance;
  }

  // ------------------------------------------
  // interface Function implementation
  // ------------------------------------------

  /** @return {@link #ID} */
  @Override
  public String getId() {
    return ID;
  }

  /** @return always {@code true}; results are streamed back through the context's result sender */
  @Override
  public boolean hasResult() {
    return true;
  }

  /** @return always {@code true} (see the GemFire {@code Function#optimizeForWrite} contract) */
  @Override
  public boolean optimizeForWrite() {
    return true;
  }

  /** @return always {@code true} (see the GemFire {@code Function#isHA} contract) */
  @Override
  public boolean isHA() {
    return true;
  }

  /**
   * Entry point invoked by GemFire. Streams either the whole local data set or the
   * entries matching a where clause back to the caller.
   *
   * <p>The function arguments must be a {@code String[]} with at least two elements:
   * {@code args[0]} is the where clause (null or blank means "no filter") and
   * {@code args[1]} is a task description passed on to the result sender and, for
   * filtered retrieval, logged.
   *
   * @param context the execution context; must be an {@code InternalRegionFunctionContext}
   * @throws FunctionException if the context is not a region context or the arguments
   *         do not have the expected shape
   */
  @Override
  public void execute(FunctionContext context) {
    if (!(context instanceof InternalRegionFunctionContext)) {
      // Fail with a descriptive error instead of a bare ClassCastException.
      throw new FunctionException(ID + " must be executed with a region function context");
    }
    Object rawArgs = context.getArguments();
    if (!(rawArgs instanceof String[]) || ((String[]) rawArgs).length < 2) {
      // Fail with a descriptive error instead of ClassCastException / NPE / AIOOBE.
      throw new FunctionException(
          ID + " expects a String[] argument of the form {whereClause, taskDescription}");
    }
    String[] args = (String[]) rawArgs;
    String where = args[0];
    String taskDesc = args[1];

    InternalRegionFunctionContext irfc = (InternalRegionFunctionContext) context;
    LocalRegion localRegion = (LocalRegion) irfc.getDataSet();
    boolean partitioned = localRegion.getDataPolicy().withPartitioning();

    // A null where clause is treated the same as a blank one: no filtering.
    if (where == null || where.trim().isEmpty()) {
      retrieveFullRegion(irfc, partitioned, taskDesc);
    } else {
      retrieveRegionWithWhereClause(irfc, localRegion, partitioned, where, taskDesc);
    }
  }

  // ------------------------------------------
  // Retrieve region data with where clause
  // ------------------------------------------

  /**
   * Runs {@code select key, value from <regionPath>.entries where <where>} and streams
   * the resulting (key, value) structs to the caller.
   *
   * <p>NOTE(review): {@code where} is concatenated into the query text verbatim, so it
   * must come from a trusted caller.
   *
   * @param context     the region function context; also passed to the query when the
   *                    region is partitioned
   * @param localRegion the region whose full path is used in the query
   * @param partitioned whether the region's data policy uses partitioning
   * @param where       the non-blank where clause appended to the query
   * @param desc        task description, logged and passed to the result sender
   * @throws FunctionException wrapping any exception raised while querying or sending
   */
  private void retrieveRegionWithWhereClause(
      InternalRegionFunctionContext context, LocalRegion localRegion, boolean partitioned,
      String where, String desc) {
    String regionPath = localRegion.getFullPath();
    String qstr = "select key, value from " + regionPath + ".entries where " + where;
    logger.info(desc + ": " + qstr);
    try {
      Cache cache = CacheFactory.getAnyInstance();
      QueryService queryService = cache.getQueryService();
      Query query = queryService.newQuery(qstr);
      // The query projects two fields (key, value), so the result is expected to be a
      // collection of Struct; the query API returns Object, hence the unchecked cast.
      @SuppressWarnings("unchecked")
      SelectResults<Struct> results =
          (SelectResults<Struct>) (partitioned ? query.execute(context) : query.execute());
      Iterator<Object[]> entries = getStructIteratorWrapper(results.asList().iterator());
      InternalResultSender irs = (InternalResultSender) context.getResultSender();
      // Second constructor argument is passed as null, exactly as for full-region retrieval.
      StructStreamingResultSender sender = new StructStreamingResultSender(irs, null, entries, desc);
      sender.send();
    } catch (Exception e) {
      throw new FunctionException(e);
    }
  }

  /**
   * Adapts an iterator of query {@code Struct}s to an iterator of their field-value arrays.
   *
   * @param entries the struct iterator to wrap
   * @return an iterator yielding {@code struct.getFieldValues()} for each struct
   */
  private Iterator<Object[]> getStructIteratorWrapper(Iterator<Struct> entries) {
    return new WrapperIterator<Struct, Iterator<Struct>>(entries) {
      @Override public Object[] next() {
        return delegate.next().getFieldValues();
      }
    };
  }

  // ------------------------------------------
  // Retrieve full region data
  // ------------------------------------------

  /**
   * Streams every entry of the data set to the caller as {@code {key, value}} pairs.
   * For a partitioned region the entries come from the local data for this context;
   * otherwise they come from the context's data set directly.
   *
   * @param context     the region function context
   * @param partitioned whether the region's data policy uses partitioning
   * @param desc        task description passed to the result sender
   */
  @SuppressWarnings("unchecked") // entrySet().iterator() is not typed as Iterator<Region.Entry>
  private void retrieveFullRegion(InternalRegionFunctionContext context, boolean partitioned, String desc) {
    Iterator<Object[]> entries;
    if (partitioned) {
      PREntriesIterator<Region.Entry> iter = (PREntriesIterator<Region.Entry>)
          ((LocalDataSet) PartitionRegionHelper.getLocalDataForContext(context)).entrySet().iterator();
      // entries = getPREntryIterator(iter);
      entries = getSimpleEntryIterator(iter);
    } else {
      LocalRegion owner = (LocalRegion) context.getDataSet();
      Iterator<Region.Entry> iter = (Iterator<Region.Entry>) owner.entrySet().iterator();
      // entries = getRREntryIterator(iter, owner);
      entries = getSimpleEntryIterator(iter);
    }
    InternalResultSender irs = (InternalResultSender) context.getResultSender();
    StructStreamingResultSender sender = new StructStreamingResultSender(irs, null, entries, desc);
    sender.send();
  }

  // Disabled alternatives kept for the performance comparison mentioned in the todo below.
  //
  // /** An iterator for partitioned region that uses internal API to get serialized value */
  // private Iterator<Object[]> getPREntryIterator(PREntriesIterator<Region.Entry> iterator) {
  //   return new WrapperIterator<Region.Entry, PREntriesIterator<Region.Entry>>(iterator) {
  //     @Override public Object[] next() {
  //       Region.Entry entry = delegate.next();
  //       int bucketId = delegate.getBucketId();
  //       KeyInfo keyInfo = new KeyInfo(entry.getKey(), null, bucketId);
  //       // owner needs to be the bucket region not the enclosing partition region
  //       LocalRegion owner = ((PartitionedRegion) entry.getRegion()).getDataStore().getLocalBucketById(bucketId);
  //       Object value = owner.getDeserializedValue(keyInfo, false, true, true, null, false);
  //       return new Object[] {keyInfo.getKey(), value};
  //     }
  //   };
  // }
  //
  // /** An iterator for replicated region that uses internal API to get serialized value */
  // private Iterator<Object[]> getRREntryIterator(Iterator<Region.Entry> iterator, LocalRegion region) {
  //   final LocalRegion owner = region;
  //   return new WrapperIterator<Region.Entry, Iterator<Region.Entry>>(iterator) {
  //     @Override public Object[] next() {
  //       Region.Entry entry = delegate.next();
  //       KeyInfo keyInfo = new KeyInfo(entry.getKey(), null, null);
  //       Object value = owner.getDeserializedValue(keyInfo, false, true, true, null, false);
  //       return new Object[] {keyInfo.getKey(), value};
  //     }
  //   };
  // }

  // todo. compare performance of regular and simple iterator
  /**
   * A general iterator for both partitioned and replicated regions that yields
   * {@code {entry.getKey(), entry.getValue()}} for each region entry.
   *
   * @param iterator the region-entry iterator to wrap
   * @return an iterator of two-element {@code Object[]} pairs
   */
  private Iterator<Object[]> getSimpleEntryIterator(Iterator<Region.Entry> iterator) {
    return new WrapperIterator<Region.Entry, Iterator<Region.Entry>>(iterator) {
      @Override public Object[] next() {
        Region.Entry entry = delegate.next();
        return new Object[] {entry.getKey(), entry.getValue()};
      }
    };
  }

  // ------------------------------------------
  // abstract wrapper iterator
  // ------------------------------------------

  /**
   * An abstract wrapper iterator to reduce duplicated code in the anonymous iterators
   * above: {@code hasNext()} is forwarded to the delegate and subclasses supply
   * {@code next()} to convert each delegate element into an {@code Object[]}.
   *
   * @param <T> element type of the wrapped iterator
   * @param <S> concrete type of the wrapped iterator
   */
  abstract class WrapperIterator<T, S extends Iterator<T>> implements Iterator<Object[]> {
    /** The wrapped iterator; read by subclasses in {@code next()}. */
    final S delegate;

    protected WrapperIterator(S delegate) {
      this.delegate = delegate;
    }

    @Override public boolean hasNext() {
      return delegate.hasNext();
    }

    /**
     * Removal is not supported by this wrapper.
     *
     * @throws UnsupportedOperationException always, rather than silently doing nothing
     */
    @Override public void remove() {
      throw new UnsupportedOperationException("remove");
    }
  }
}