blob: 82122f9986c884e3c426df92a26f5ee0de19552c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.cassandra.trident.state;
import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
import static org.apache.storm.cassandra.DynamicStatementBuilder.all;
import static org.apache.storm.cassandra.DynamicStatementBuilder.boundQuery;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.Select;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.storm.cassandra.CassandraContext;
import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
import org.apache.storm.trident.state.JSONNonTransactionalSerializer;
import org.apache.storm.trident.state.JSONOpaqueSerializer;
import org.apache.storm.trident.state.JSONTransactionalSerializer;
import org.apache.storm.trident.state.OpaqueValue;
import org.apache.storm.trident.state.Serializer;
import org.apache.storm.trident.state.StateFactory;
import org.apache.storm.trident.state.StateType;
import org.apache.storm.trident.state.TransactionalValue;
import org.apache.storm.tuple.Fields;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A helper for building a MapState backed by Cassandra. It internalizes some common
* implementation choices to simplify usage.
*
* <p>In the simplest use case, a map state can be constructed with:
*
* <p>StateFactory mapState = MapStateFactoryBuilder.opaque(cassandraConf)
* .withTable("mykeyspace", "year_month_state")
* .withKeys("year", "month")
* .withJSONBinaryState("state")
* .build();
*
* <p>for a cassandra table with:
* mykeyspace.year_month_state {
* year: int,
* month: int,
* state: blob
* }
*
* <p>This will use the storm JSON serializers to convert the state to and from binary format.
* Other binary serializers can be used with the {@link #withBinaryState(String, Serializer)} method.
*
* <p>Storing state in explicit fields (e.g. in a field "sum" of type int) is possible by instead calling
* {@link #withStateMapper(StateMapper)}. For instance, you can use {@link NonTransactionalTupleStateMapper},
* {@link TransactionalTupleStateMapper} or {@link OpaqueTupleStateMapper} if your state values are tuples.
*
*/
public class MapStateFactoryBuilder<T> {
    private static final Logger logger = LoggerFactory.getLogger(MapStateFactoryBuilder.class);

    /** Keyspace holding the state table; required by {@link #build()}. */
    private String keyspace;
    /** Name of the state table; required by {@link #build()}. */
    private String table;
    /** Key column names used in the WHERE clause of the get query and as leading columns of the put query. */
    private String[] keys;
    /** Forwarded unchanged to {@code CassandraBackingMap.Options#withMaxParallelism}; may be null. */
    private Integer maxParallelism;
    /** Selects which {@code CassandraMapStateFactory} factory method {@link #build()} uses. */
    private StateType stateType;
    /** Maps state values to and from the state columns; required by {@link #build()}. */
    private StateMapper<T> stateMapper;
    /** Cassandra configuration handed to the created state factory. */
    private Map<String, Object> cassandraConfig;
    /** Forwarded unchanged to {@code CassandraMapStateFactory#withCache}; 0 unless {@link #withCache(int)} is called. */
    private int cacheSize;

    /**
     * Creates a builder preconfigured with {@link StateType#OPAQUE}.
     *
     * @param cassandraConf the Cassandra configuration passed to the resulting state factory
     * @param <U> the type wrapped by {@link OpaqueValue}
     * @return a new builder
     */
    public static <U> MapStateFactoryBuilder<OpaqueValue<U>> opaque(Map<String, Object> cassandraConf) {
        MapStateFactoryBuilder<OpaqueValue<U>> builder = new MapStateFactoryBuilder<>();
        return builder
            .withStateType(StateType.OPAQUE)
            .withCassandraConfig(cassandraConf);
    }

    /**
     * Creates a builder preconfigured with {@link StateType#TRANSACTIONAL}.
     *
     * @param cassandraConf the Cassandra configuration passed to the resulting state factory
     * @param <U> the type wrapped by {@link TransactionalValue}
     * @return a new builder
     */
    public static <U> MapStateFactoryBuilder<TransactionalValue<U>> transactional(Map<String, Object> cassandraConf) {
        MapStateFactoryBuilder<TransactionalValue<U>> builder = new MapStateFactoryBuilder<>();
        return builder
            .withStateType(StateType.TRANSACTIONAL)
            .withCassandraConfig(cassandraConf);
    }

    /**
     * Creates a builder preconfigured with {@link StateType#NON_TRANSACTIONAL}.
     *
     * @param cassandraConf the Cassandra configuration passed to the resulting state factory
     * @param <U> the state value type
     * @return a new builder
     */
    public static <U> MapStateFactoryBuilder<U> nontransactional(Map<String, Object> cassandraConf) {
        MapStateFactoryBuilder<U> builder = new MapStateFactoryBuilder<>();
        return builder
            .withStateType(StateType.NON_TRANSACTIONAL)
            .withCassandraConfig(cassandraConf);
    }

    /**
     * Sets the keyspace and table the generated queries operate on.
     *
     * @param keyspace the keyspace name
     * @param table the table name
     * @return this builder
     */
    public MapStateFactoryBuilder<T> withTable(String keyspace, String table) {
        this.keyspace = keyspace;
        this.table = table;
        return this;
    }

    /**
     * Sets the key column names.
     *
     * <p>The array is copied, so changes the caller makes to it afterwards do not affect this builder.
     *
     * @param keys the key column names; {@link #build()} rejects a null or empty array
     * @return this builder
     */
    public MapStateFactoryBuilder<T> withKeys(String... keys) {
        // Defensive copy: keep the builder independent from the caller's array.
        this.keys = (keys == null) ? null : keys.clone();
        return this;
    }

    /**
     * Sets the max parallelism value that {@link #build()} forwards to the backing map options.
     *
     * @param maxParallelism the value to forward
     * @return this builder
     */
    public MapStateFactoryBuilder<T> withMaxParallelism(Integer maxParallelism) {
        this.maxParallelism = maxParallelism;
        return this;
    }

    /**
     * Stores the state in a single binary column using the Storm JSON serializer that matches
     * this builder's state type ({@link JSONOpaqueSerializer}, {@link JSONTransactionalSerializer}
     * or {@link JSONNonTransactionalSerializer}).
     *
     * @param stateField the column that holds the serialized state
     * @return this builder
     * @throws NullPointerException if no state type has been set on this builder
     * @throws IllegalArgumentException if the state type is not one of the handled values
     */
    @SuppressWarnings({"checkstyle:AbbreviationAsWordInName", "unchecked", "rawtypes"})
    public MapStateFactoryBuilder<T> withJSONBinaryState(String stateField) {
        // A builder created through the default constructor has no state type yet; fail with a
        // descriptive message instead of the bare NullPointerException a switch on null produces.
        Objects.requireNonNull(stateType, "A state type must be specified.");
        switch (stateType) {
            case OPAQUE:
                return withBinaryState(stateField, (Serializer) new JSONOpaqueSerializer());
            case TRANSACTIONAL:
                return withBinaryState(stateField, (Serializer) new JSONTransactionalSerializer());
            case NON_TRANSACTIONAL:
                return withBinaryState(stateField, new JSONNonTransactionalSerializer());
            default:
                throw new IllegalArgumentException("State type " + stateType + " is unknown.");
        }
    }

    /**
     * Sets the mapper that converts state values to and from the state columns.
     *
     * @param stateMapper the mapper to use
     * @return this builder
     */
    public MapStateFactoryBuilder<T> withStateMapper(StateMapper<T> stateMapper) {
        this.stateMapper = stateMapper;
        return this;
    }

    /**
     * Stores the state in a single column using the given serializer, by installing a
     * {@link SerializedStateMapper} as the state mapper.
     *
     * @param stateField the column that holds the serialized state
     * @param serializer the serializer for state values
     * @return this builder
     */
    public MapStateFactoryBuilder<T> withBinaryState(String stateField, Serializer<T> serializer) {
        return withStateMapper(new SerializedStateMapper<>(stateField, serializer));
    }

    /**
     * Sets the state type that decides which factory {@link #build()} creates.
     *
     * @param stateType the state type
     * @return this builder
     */
    protected MapStateFactoryBuilder<T> withStateType(StateType stateType) {
        this.stateType = stateType;
        return this;
    }

    /**
     * Sets the Cassandra configuration handed to the created state factory.
     *
     * @param cassandraConf the configuration map
     * @return this builder
     */
    protected MapStateFactoryBuilder<T> withCassandraConfig(Map<String, Object> cassandraConf) {
        this.cassandraConfig = cassandraConf;
        return this;
    }

    /**
     * Sets the cache size that {@link #build()} forwards to the created state factory.
     *
     * @param cacheSize the value to forward
     * @return this builder
     */
    public MapStateFactoryBuilder<T> withCache(int cacheSize) {
        this.cacheSize = cacheSize;
        return this;
    }

    /**
     * Validates the configuration, generates the get and put statements and creates the state factory.
     *
     * <p>The get statement selects the state mapper's fields from the configured table, with one
     * equality condition on a bind marker per key. The put statement inserts all key columns followed
     * by all state columns, each bound to a bind marker.
     *
     * @return the state factory matching the configured state type
     * @throws NullPointerException if the keyspace, table, keys, state mapper or state type is missing
     * @throws IllegalArgumentException if no key was given or the state type is not supported
     */
    public StateFactory build() {
        Objects.requireNonNull(keyspace, "A keyspace is required.");
        Objects.requireNonNull(table, "A table name is required.");
        Objects.requireNonNull(keys, "At least one key must be specified.");
        if (keys.length == 0) {
            throw new IllegalArgumentException("At least one key must be specified.");
        }
        Objects.requireNonNull(stateMapper, "A state mapper must be specified.");
        Objects.requireNonNull(stateType, "A state type must be specified.");

        List<String> stateFields = stateMapper.getStateFields().toList();
        String[] stateFieldsArray = stateFields.toArray(new String[0]);

        // Column order of the put statement: keys first, then state fields.
        List<String> allFields = new ArrayList<>(keys.length + stateFields.size());
        Collections.addAll(allFields, keys);
        allFields.addAll(stateFields);

        // Build get query
        Select.Where getQuery = select(stateFieldsArray)
            .from(keyspace, table)
            .where();
        for (String key : keys) {
            getQuery.and(eq(key, bindMarker()));
        }
        // Render each statement once; the text is used for the bound query and for logging.
        String getCql = getQuery.toString();
        CQLStatementTupleMapper get = boundQuery(getCql)
            .bind(all())
            .build();

        // Build put query
        Insert putStatement = insertInto(keyspace, table)
            .values(allFields, Collections.<Object>nCopies(allFields.size(), bindMarker()));
        String putCql = putStatement.toString();
        CQLStatementTupleMapper put = boundQuery(putCql)
            .bind(all())
            .build();

        // NOTE(review): declared as a raw type as in the original; the parameter types of the
        // CassandraMapStateFactory factory methods are not visible here - confirm before tightening.
        CassandraBackingMap.Options options = new CassandraBackingMap.Options<T>(new CassandraContext())
            .withGetMapper(get)
            .withPutMapper(put)
            .withStateMapper(stateMapper)
            .withKeys(new Fields(keys))
            .withMaxParallelism(maxParallelism);

        // The mapper is passed as an object so its toString() only runs when debug logging is enabled.
        logger.debug("Building factory with: \n get: {}\n put: {}\n mapper: {}",
                     getCql,
                     putCql,
                     stateMapper);

        switch (stateType) {
            case NON_TRANSACTIONAL:
                return CassandraMapStateFactory.nonTransactional(options, cassandraConfig)
                                               .withCache(cacheSize);
            case TRANSACTIONAL:
                return CassandraMapStateFactory.transactional(options, cassandraConfig)
                                               .withCache(cacheSize);
            case OPAQUE:
                return CassandraMapStateFactory.opaque(options, cassandraConfig)
                                               .withCache(cacheSize);
            default:
                throw new IllegalArgumentException(String.format("stateType %s not supported",
                                                                 stateType));
        }
    }
}