blob: a9993b5e80a280ab2e601003b36eadb87d8905ca [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.epam.datalab.mongo;
import com.epam.datalab.core.aggregate.UsageDataList;
import com.epam.datalab.exceptions.AdapterException;
import com.epam.datalab.model.aws.ReportLine;
import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.result.DeleteResult;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import static com.mongodb.client.model.Filters.eq;
/**
* Provides operation with Mongo database and billing report.
*/
public class MongoDbConnection implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbConnection.class);
/**
* Mongo client.
*/
private MongoClient client;
/**
* Mongo database.
*/
private MongoDatabase database;
/**
* Instantiate the helper for Mongo database adapter.
*
* @param host the host name.
* @param port the port.
* @param databaseName the name of database.
* @param username the name of user.
* @param password the password.
* @throws AdapterException
*/
public MongoDbConnection(String host, int port, String databaseName, String username, String password) throws
AdapterException {
try {
client = new MongoClient(
new ServerAddress(host, port),
Collections.singletonList(
MongoCredential.createCredential(username, databaseName, password.toCharArray())));
database = client.getDatabase(databaseName).withWriteConcern(WriteConcern.ACKNOWLEDGED);
} catch (Exception e) {
throw new AdapterException("Cannot create connection to database " +
databaseName + ". " + e.getLocalizedMessage(), e);
}
}
/**
* Close connection to Mongo database.
*/
@Override
public void close() throws IOException {
if (client != null) {
try {
client.close();
} catch (Exception e) {
throw new IOException(e.getLocalizedMessage(), e);
} finally {
client = null;
database = null;
}
}
}
/**
* Create index on billing collection.
*
* @param indexName the name of index.
* @param index the index options.
*/
private void createBillingIndexes(String indexName, Bson index) {
MongoCollection<Document> collection = database.getCollection(MongoConstants.COLLECTION_BILLING);
IndexOptions options = new IndexOptions().name(MongoConstants.COLLECTION_BILLING + indexName);
try {
collection
.createIndex(index, options);
} catch (Exception e) {
LOGGER.warn("Cannot create index {} on collection {}. {}", options.getName(),
MongoConstants.COLLECTION_BILLING, e.getLocalizedMessage(), e);
}
}
/**
* Create index on Mongo collection for fast upsert operations.
*/
public void createBillingIndexes() {
createBillingIndexes("_IntervalIdx",
new BasicDBObject()
.append(ReportLine.FIELD_USER_ID, 1)
.append(ReportLine.FIELD_USAGE_DATE, 2));
createBillingIndexes("_ExploratoryIdx",
new BasicDBObject()
.append(ReportLine.FIELD_USER_ID, 1)
.append(MongoConstants.FIELD_EXPLORATORY_NAME, 2));
}
/**
* Return the collection of Mongo database.
*
* @param collectionName the name of collection.
*/
public MongoCollection<Document> getCollection(String collectionName) {
return database.getCollection(collectionName);
}
/**
* Insert document to Mongo.
*
* @param collection the name of collection.
* @param document the document.
* @throws AdapterException
*/
public void insertOne(MongoCollection<Document> collection, Document document) throws AdapterException {
try {
collection.insertOne(document);
} catch (Exception e) {
throw new AdapterException("Cannot insert document into collection " +
collection.getNamespace() + ": " + e.getLocalizedMessage(), e);
}
}
/**
* Insert documents from list to Mongo collection and clear list.
*
* @param collection Mongo collection.
* @param documents the list of documents.
* @throws AdapterException
*/
public void insertRows(MongoCollection<Document> collection, List<Document> documents) throws AdapterException {
try {
if (!documents.isEmpty()) {
collection.insertMany(documents);
LOGGER.debug("{} documents has been inserted into collection {}",
documents.size(), collection.getNamespace());
documents.clear();
}
} catch (Exception e) {
throw new AdapterException("Cannot insert new documents into collection " +
collection.getNamespace() + ": " + e.getLocalizedMessage(), e);
}
}
/**
* Insert documents from list to Mongo collection and clear list.
*
* @param collection Mongo collection.
* @param documents the list of documents.
* @param usageDateList list of the data interval to deletion old data from Mongo.
* @throws AdapterException
*/
public void upsertRows(MongoCollection<Document> collection, List<Document> documents, UsageDataList usageDateList)
throws AdapterException {
deleteRows(collection, usageDateList);
insertRows(collection, documents);
}
/**
* Delete the documents from Mongo collection.
*
* @param collection Mongo collection.
* @param usageDateList list of the data interval to deletion data from Mongo.
* @throws AdapterException
*/
public void deleteRows(MongoCollection<Document> collection, UsageDataList usageDateList)
throws AdapterException {
try {
long rowCount = 0;
for (String date : usageDateList) {
if (!usageDateList.get(date)) {
DeleteResult result = collection.deleteMany(eq(ReportLine.FIELD_USAGE_DATE, date));
rowCount += result.getDeletedCount();
usageDateList.set(date, true);
}
}
if (rowCount > 0) {
LOGGER.debug("{} documents has been deleted from collection {}",
rowCount, collection.getNamespace());
}
} catch (Exception e) {
throw new AdapterException("Cannot delete old rows from collection " +
collection.getNamespace() + ": " + e.getLocalizedMessage(), e);
}
}
}