blob: f60c76c6abbbc9113f1716d99d2478c6dd17d100 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.cassandra.query.impl;
import com.datastax.driver.core.CodecRegistry;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.TypeCodec;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.Mapper.Option;
import com.datastax.driver.mapping.MappingManager;
import com.google.common.base.Preconditions;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;
import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
import org.apache.storm.cassandra.query.ObjectMapperOperation;
import org.apache.storm.tuple.ITuple;
/**
* Tuple mapper that is able to map objects annotated with {@link com.datastax.driver.mapping.annotations.Table} to CQL statements.
*/
@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
public class ObjectMapperCqlStatementMapper implements CQLStatementTupleMapper {
private static final Map<Session, MappingManager> mappingManagers = new WeakHashMap<>();
private final String operationField;
private final String valueField;
private final String timestampField;
private final String ttlField;
private final String consistencyLevelField;
private final Collection<TypeCodec<?>> codecs;
private final Collection<Class<?>> udtClasses;
public ObjectMapperCqlStatementMapper(String operationField, String valueField, String timestampField, String ttlField,
String consistencyLevelField, Collection<TypeCodec<?>> codecs, Collection<Class<?>> udtClasses) {
Preconditions.checkNotNull(operationField, "Operation field must not be null");
Preconditions.checkNotNull(valueField, "Value field should not be null");
this.operationField = operationField;
this.valueField = valueField;
this.timestampField = timestampField;
this.ttlField = ttlField;
this.consistencyLevelField = consistencyLevelField;
this.codecs = codecs;
this.udtClasses = udtClasses;
}
@Override
public List<Statement> map(Map<String, Object> map, Session session, ITuple tuple) {
final ObjectMapperOperation operation = (ObjectMapperOperation) tuple.getValueByField(operationField);
Preconditions.checkNotNull(operation, "Operation must not be null");
final Object value = tuple.getValueByField(valueField);
final Object timestampObject = timestampField != null ? tuple.getValueByField(timestampField) : null;
final Object ttlObject = ttlField != null ? tuple.getValueByField(ttlField) : null;
final ConsistencyLevel consistencyLevel =
consistencyLevelField != null ? (ConsistencyLevel) tuple.getValueByField(consistencyLevelField) : null;
final Class<?> valueClass = value.getClass();
final Mapper mapper = getMappingManager(session).mapper(valueClass);
Collection<Option> options = new ArrayList<>();
if (timestampObject != null) {
if (timestampObject instanceof Number) {
options.add(Option.timestamp(((Number) timestampObject).longValue()));
} else if (timestampObject instanceof Instant) {
Instant timestamp = (Instant) timestampObject;
options.add(Option.timestamp(timestamp.getEpochSecond() * 1000_0000L + timestamp.getNano() / 1000L));
}
}
if (ttlObject != null) {
if (ttlObject instanceof Number) {
options.add(Option.ttl(((Number) ttlObject).intValue()));
} else if (ttlObject instanceof Duration) {
Duration ttl = (Duration) ttlObject;
options.add(Option.ttl((int) ttl.getSeconds()));
}
}
if (consistencyLevel != null) {
options.add(Option.consistencyLevel(consistencyLevel));
}
if (operation == ObjectMapperOperation.SAVE) {
options.add(Option.saveNullFields(true));
return Arrays.asList(mapper.saveQuery(value, options.toArray(new Option[options.size()])));
} else if (operation == ObjectMapperOperation.SAVE_IGNORE_NULLS) {
options.add(Option.saveNullFields(false));
return Arrays.asList(mapper.saveQuery(value, options.toArray(new Option[options.size()])));
} else if (operation == ObjectMapperOperation.DELETE) {
return Arrays.asList(mapper.deleteQuery(value, options.toArray(new Option[options.size()])));
} else {
throw new UnsupportedOperationException("Unknown operation: " + operation);
}
}
private MappingManager getMappingManager(Session session) {
synchronized (mappingManagers) {
MappingManager mappingManager = mappingManagers.get(session);
if (mappingManager == null) {
mappingManager = new MappingManager(session);
mappingManagers.put(session, mappingManager);
CodecRegistry codecRegistry = session.getCluster().getConfiguration().getCodecRegistry();
for (TypeCodec<?> codec : codecs) {
codecRegistry.register(codec);
}
for (Class<?> udtClass : udtClasses) {
mappingManager.udtCodec(udtClass);
}
}
return mappingManager;
}
}
}