blob: 73ff31de5cb4984a4a7849c515e93698303daf03 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.mongo.config;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import com.google.common.base.Preconditions;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.store.mongo.DrillMongoConstants;
import org.apache.drill.exec.store.sys.BasePersistentStore;
import org.apache.drill.exec.store.sys.PersistentStoreConfig;
import org.apache.drill.exec.store.sys.PersistentStoreMode;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.Updates;
import com.mongodb.client.result.UpdateResult;
import static org.apache.drill.exec.store.mongo.config.MongoPersistentStoreProvider.pKey;
public class MongoPersistentStore<V> extends BasePersistentStore<V> {
private static final Logger logger = LoggerFactory.getLogger(MongoPersistentStore.class);
private final PersistentStoreConfig<V> config;
private final MongoCollection<Document> collection;
public MongoPersistentStore(PersistentStoreConfig<V> config, MongoCollection<Document> collection) {
// this.config = config;
// this.collection = collection;
throw new UnsupportedOperationException("Mongo DB PStore not currently supported");
}
@Override
public PersistentStoreMode getMode() {
return PersistentStoreMode.PERSISTENT;
}
@Override
public boolean contains(String key) {
try {
Bson query = Filters.eq(DrillMongoConstants.ID, key);
Document document = collection.find(query).first();
return document != null && document.containsKey(pKey);
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new DrillRuntimeException(e.getMessage(), e);
}
}
@Override
public V get(String key) {
try {
Bson query = Filters.eq(DrillMongoConstants.ID, key);
Document document = collection.find(query).first();
if (document != null) {
return value((byte[]) document.get(pKey));
} else {
return null;
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new DrillRuntimeException(e.getMessage(), e);
}
}
@Override
public void put(String key, V value) {
try {
Document putObj = new Document(DrillMongoConstants.ID, key).append(pKey, bytes(value));
collection.insertOne(putObj);
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new DrillRuntimeException(e.getMessage(), e);
}
}
@Override
public boolean putIfAbsent(String key, V value) {
try {
Bson query = Filters.eq(DrillMongoConstants.ID, key);
Bson update = Updates.set(pKey, bytes(value));
UpdateResult updateResult = collection.updateOne(query, update, new UpdateOptions().upsert(true));
return updateResult.getModifiedCount() == 1;
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new DrillRuntimeException(e.getMessage(), e);
}
}
@Override
public void delete(String key) {
try {
Bson query = Filters.eq(DrillMongoConstants.ID, key);
collection.deleteOne(query);
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new DrillRuntimeException(e.getMessage(), e);
}
}
private byte[] bytes(V value) {
try {
return config.getSerializer().serialize(value);
} catch (IOException e) {
throw new DrillRuntimeException(e.getMessage(), e);
}
}
private V value(byte[] serialize) {
try {
return config.getSerializer().deserialize(serialize);
} catch (IOException e) {
throw new DrillRuntimeException(e.getMessage(), e);
}
}
@Override
public Iterator<Entry<String, V>> getRange(int skip, int take) {
final MongoCursor<Document> cursor = collection.find().skip(skip).limit(take).iterator();
return new MongoIterator(cursor);
}
private class MongoIterator implements Iterator<Entry<String, V>> {
private MongoCursor<Document> cursor;
public MongoIterator(final MongoCursor<Document> cursor) {
this.cursor = Preconditions.checkNotNull(cursor);
}
@Override
public boolean hasNext() {
return cursor.hasNext();
}
@Override
public Entry<String, V> next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return new DeferredEntry(cursor.next());
}
@Override
public void remove() {
cursor.remove();
}
}
private class DeferredEntry implements Entry<String, V> {
private Document result;
public DeferredEntry(Document result) {
this.result = result;
}
@Override
public String getKey() {
return result.get(DrillMongoConstants.ID).toString();
}
@Override
public V getValue() {
return get(result.get(DrillMongoConstants.ID).toString());
}
@Override
public V setValue(V value) {
throw new UnsupportedOperationException();
}
}
@Override
public void close() {
}
}