blob: 827cbdb2ea596fc47ecfd5b549a446a7d73e6d02 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.sql.mongodb;
import com.google.common.base.Preconditions;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Properties;
import org.apache.storm.mongodb.bolt.MongoInsertBolt;
import org.apache.storm.mongodb.common.mapper.MongoMapper;
import org.apache.storm.sql.runtime.DataSourcesProvider;
import org.apache.storm.sql.runtime.FieldInfo;
import org.apache.storm.sql.runtime.IOutputSerializer;
import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
import org.apache.storm.sql.runtime.utils.FieldInfoUtils;
import org.apache.storm.sql.runtime.utils.SerdeUtils;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.tuple.ITuple;
import org.apache.storm.tuple.Values;
import org.bson.Document;
/**
 * Creates a MongoDB sink based on the URI and properties. The URI has the format
 * {@code mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]}.
 * The properties are in JSON format and specify the name of the MongoDB collection, among other settings.
 */
public class MongoDataSourcesProvider implements DataSourcesProvider {
    public static final String SCHEME_NAME = "mongodb";
    public static final String VALUE_SERIALIZED_FIELD = "ser.field";
    public static final String TRIDENT_VALUE_SERIALIZED_FIELD = "trident.ser.field";
    public static final String DEFAULT_VALUE_SERIALIZED_FIELD = "tridentSerField";
    public static final String COLLECTION_NAME = "collection.name";

    /**
     * Streams data source backed by MongoDB. Only the consumer (sink) side is supported.
     */
    private static class MongoStreamsDataSource implements ISqlStreamsDataSource {
        private final String url;
        private final Properties props;
        private final IOutputSerializer serializer;

        /**
         * @param url        MongoDB connection URL passed through to {@link MongoInsertBolt}
         * @param props      table properties; must contain {@link #COLLECTION_NAME} when a consumer is built
         * @param serializer serializer used to encode each row before it is stored
         */
        private MongoStreamsDataSource(String url, Properties props, IOutputSerializer serializer) {
            this.url = url;
            this.props = props;
            this.serializer = serializer;
        }

        /**
         * Not supported: this data source can only be written to.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public IRichSpout getProducer() {
            throw new UnsupportedOperationException(this.getClass().getName() + " doesn't provide Producer");
        }

        /**
         * Builds a {@link MongoInsertBolt} that stores each row, serialized, into the configured collection.
         *
         * @return the insert bolt
         * @throws IllegalArgumentException if the properties do not name the target collection
         */
        @Override
        public IRichBolt getConsumer() {
            String collectionName = props == null ? null : props.getProperty(COLLECTION_NAME);
            // Fail fast: an empty check alone would let a missing collection name reach the bolt as null.
            Preconditions.checkArgument(collectionName != null,
                    "Writable MongoDB must contain collection config (property '%s')", COLLECTION_NAME);
            MongoMapper mapper = new SqlMongoMapper(resolveSerializedField(props), serializer);
            return new MongoInsertBolt(url, collectionName, mapper);
        }

        /**
         * Resolves the document field name that receives the serialized row.
         * {@link #VALUE_SERIALIZED_FIELD} wins, then the legacy {@link #TRIDENT_VALUE_SERIALIZED_FIELD}
         * (kept for backward compatibility), then {@link #DEFAULT_VALUE_SERIALIZED_FIELD}.
         *
         * <p>Note: lookups go through {@link Properties#getProperty(String)}, i.e. by KEY.
         * {@code Properties.contains(Object)} (inherited from {@code Hashtable}) searches values
         * and must not be used here.
         */
        private static String resolveSerializedField(Properties props) {
            String serField = props.getProperty(VALUE_SERIALIZED_FIELD);
            if (serField == null) {
                // backward compatibility
                serField = props.getProperty(TRIDENT_VALUE_SERIALIZED_FIELD);
            }
            return serField != null ? serField : DEFAULT_VALUE_SERIALIZED_FIELD;
        }
    }

    /**
     * Mapper that turns a tuple into a single-field {@link Document} holding the serialized row bytes.
     */
    private static class SqlMongoMapper implements MongoMapper {
        private final String serField;
        private final IOutputSerializer serializer;

        private SqlMongoMapper(String serField, IOutputSerializer serializer) {
            this.serField = serField;
            this.serializer = serializer;
        }

        /**
         * Serializes the {@link Values} found at tuple index 1 and stores the bytes under {@code serField}.
         * NOTE(review): assumes upstream emits (key, values) pairs with the row at index 1 — confirm.
         */
        @Override
        public Document toDocument(ITuple tuple) {
            Document document = new Document();
            Values values = (Values) tuple.getValue(1);
            byte[] array = toByteArray(serializer.write(values, null));
            document.append(serField, array);
            return document;
        }

        /**
         * Not supported by this insert-only mapper.
         *
         * @return always {@code null}
         */
        @Override
        public Document toDocumentByKeys(List<Object> keys) {
            return null;
        }

        /**
         * Returns exactly the readable bytes of {@code buffer}, without changing its position.
         * The backing array is reused only when it covers precisely the readable content.
         * Otherwise the bytes are copied, because {@link ByteBuffer#array()} ignores position,
         * limit and array offset, and it is unavailable for read-only or direct buffers.
         */
        private static byte[] toByteArray(ByteBuffer buffer) {
            if (buffer.hasArray()
                    && buffer.arrayOffset() == 0
                    && buffer.position() == 0
                    && buffer.remaining() == buffer.array().length) {
                return buffer.array();
            }
            byte[] bytes = new byte[buffer.remaining()];
            buffer.duplicate().get(bytes);
            return bytes;
        }
    }

    @Override
    public String scheme() {
        return SCHEME_NAME;
    }

    /**
     * Creates a MongoDB streams data source whose rows are encoded with the serializer chosen by
     * {@code outputFormatClass}.
     *
     * @param uri               MongoDB connection URI
     * @param inputFormatClass  unused: MongoDB is a sink only
     * @param outputFormatClass serializer class name resolved through {@link SerdeUtils#getSerializer}
     * @param properties        table properties (collection name, serialized field name, ...)
     * @param fields            table columns, used to configure the serializer
     * @return the data source
     */
    @Override
    public ISqlStreamsDataSource constructStreams(URI uri, String inputFormatClass, String outputFormatClass,
                                                  Properties properties, List<FieldInfo> fields) {
        List<String> fieldNames = FieldInfoUtils.getFieldNames(fields);
        IOutputSerializer serializer = SerdeUtils.getSerializer(outputFormatClass, properties, fieldNames);
        return new MongoStreamsDataSource(uri.toString(), properties, serializer);
    }
}