blob: 0a3b100a4ee9e85fe649992d7e1c387d3c279da9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.mongodb.trident.state;
import com.google.common.collect.Maps;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.Validate;
import org.apache.storm.mongodb.common.MongoDbClient;
import org.apache.storm.mongodb.common.QueryFilterCreator;
import org.apache.storm.mongodb.common.mapper.MongoMapper;
import org.apache.storm.task.IMetricsContext;
import org.apache.storm.topology.FailedException;
import org.apache.storm.trident.state.JSONNonTransactionalSerializer;
import org.apache.storm.trident.state.JSONOpaqueSerializer;
import org.apache.storm.trident.state.JSONTransactionalSerializer;
import org.apache.storm.trident.state.OpaqueValue;
import org.apache.storm.trident.state.Serializer;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.state.StateFactory;
import org.apache.storm.trident.state.StateType;
import org.apache.storm.trident.state.TransactionalValue;
import org.apache.storm.trident.state.map.CachedMap;
import org.apache.storm.trident.state.map.IBackingMap;
import org.apache.storm.trident.state.map.MapState;
import org.apache.storm.trident.state.map.NonTransactionalMap;
import org.apache.storm.trident.state.map.OpaqueMap;
import org.apache.storm.trident.state.map.SnapshottableMap;
import org.apache.storm.trident.state.map.TransactionalMap;
import org.apache.storm.tuple.Values;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MongoMapState<T> implements IBackingMap<T> {
private static Logger LOG = LoggerFactory.getLogger(MongoMapState.class);
@SuppressWarnings("rawtypes")
private static final Map<StateType, Serializer> DEFAULT_SERIALZERS = Maps.newHashMap();
static {
DEFAULT_SERIALZERS.put(StateType.NON_TRANSACTIONAL, new JSONNonTransactionalSerializer());
DEFAULT_SERIALZERS.put(StateType.TRANSACTIONAL, new JSONTransactionalSerializer());
DEFAULT_SERIALZERS.put(StateType.OPAQUE, new JSONOpaqueSerializer());
}
private Options<T> options;
private Serializer<T> serializer;
private MongoDbClient mongoClient;
private Map<String, Object> map;
protected MongoMapState(Map<String, Object> map, Options options) {
this.options = options;
this.map = map;
this.serializer = options.serializer;
Validate.notEmpty(options.url, "url can not be blank or null");
Validate.notEmpty(options.collectionName, "collectionName can not be blank or null");
Validate.notNull(options.queryCreator, "queryCreator can not be null");
Validate.notNull(options.mapper, "mapper can not be null");
this.mongoClient = new MongoDbClient(options.url, options.collectionName);
}
public static class Options<T> implements Serializable {
public String url;
public String collectionName;
public MongoMapper mapper;
public QueryFilterCreator queryCreator;
public Serializer<T> serializer;
public int cacheSize = 5000;
public String globalKey = "$MONGO-MAP-STATE-GLOBAL";
public String serDocumentField = "tridentSerField";
}
@SuppressWarnings("rawtypes")
public static StateFactory opaque() {
Options<OpaqueValue> options = new Options<OpaqueValue>();
return opaque(options);
}
@SuppressWarnings("rawtypes")
public static StateFactory opaque(Options<OpaqueValue> opts) {
return new Factory(StateType.OPAQUE, opts);
}
@SuppressWarnings("rawtypes")
public static StateFactory transactional() {
Options<TransactionalValue> options = new Options<TransactionalValue>();
return transactional(options);
}
@SuppressWarnings("rawtypes")
public static StateFactory transactional(Options<TransactionalValue> opts) {
return new Factory(StateType.TRANSACTIONAL, opts);
}
public static StateFactory nonTransactional() {
Options<Object> options = new Options<Object>();
return nonTransactional(options);
}
public static StateFactory nonTransactional(Options<Object> opts) {
return new Factory(StateType.NON_TRANSACTIONAL, opts);
}
protected static class Factory implements StateFactory {
private StateType stateType;
private Options options;
@SuppressWarnings({"rawtypes", "unchecked"})
public Factory(StateType stateType, Options options) {
this.stateType = stateType;
this.options = options;
if (this.options.serializer == null) {
this.options.serializer = DEFAULT_SERIALZERS.get(stateType);
}
if (this.options.serializer == null) {
throw new RuntimeException("Serializer should be specified for type: " + stateType);
}
}
@SuppressWarnings({"rawtypes", "unchecked"})
@Override
public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
IBackingMap state = new MongoMapState(conf, options);
if (options.cacheSize > 0) {
state = new CachedMap(state, options.cacheSize);
}
MapState mapState;
switch (stateType) {
case NON_TRANSACTIONAL:
mapState = NonTransactionalMap.build(state);
break;
case OPAQUE:
mapState = OpaqueMap.build(state);
break;
case TRANSACTIONAL:
mapState = TransactionalMap.build(state);
break;
default:
throw new IllegalArgumentException("Unknown state type: " + stateType);
}
return new SnapshottableMap(mapState, new Values(options.globalKey));
}
}
@Override
public List<T> multiGet(List<List<Object>> keysList) {
List<T> retval = new ArrayList<>();
try {
for (List<Object> keys : keysList) {
Bson filter = options.queryCreator.createFilterByKeys(keys);
Document doc = mongoClient.find(filter);
if (doc != null) {
retval.add(this.serializer.deserialize((byte[])doc.get(options.serDocumentField)));
} else {
retval.add(null);
}
}
} catch (Exception e) {
LOG.warn("Batch get operation failed.", e);
throw new FailedException(e);
}
return retval;
}
@Override
public void multiPut(List<List<Object>> keysList, List<T> values) {
try {
for (int i = 0; i < keysList.size(); i++) {
List<Object> keys = keysList.get(i);
T value = values.get(i);
Bson filter = options.queryCreator.createFilterByKeys(keys);
Document document = options.mapper.toDocumentByKeys(keys);
document.append(options.serDocumentField, this.serializer.serialize(value));
this.mongoClient.update(filter, document, true, false);
}
} catch (Exception e) {
LOG.warn("Batch write operation failed.", e);
throw new FailedException(e);
}
}
}