blob: dcfe369d80b032161b5d589663f4bb857f61b7a4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.epam.dlab.mongo;
import com.epam.dlab.billing.BillingCalculationUtils;
import com.epam.dlab.billing.DlabResourceType;
import com.epam.dlab.dto.base.DataEngineType;
import com.epam.dlab.exceptions.InitializationException;
import com.epam.dlab.exceptions.ParseException;
import com.epam.dlab.model.aws.ReportLine;
import com.mongodb.client.AggregateIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Updates;
import org.apache.commons.lang3.StringUtils;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.SimpleDateFormat;
import java.util.*;
import static com.mongodb.client.model.Accumulators.*;
import static com.mongodb.client.model.Aggregates.*;
import static com.mongodb.client.model.Filters.*;
import static com.mongodb.client.model.Projections.*;
import static org.apache.commons.lang3.StringUtils.EMPTY;
/**
* Provides Mongo DAO for billing resources in DLab.
*/
public class DlabResourceTypeDAO implements MongoConstants {
private static final Logger LOGGER = LoggerFactory.getLogger(DlabResourceTypeDAO.class);
private static final String VOLUME_PRIMARY_SUFFIX = "-volume-primary";
private static final String VOLUME_SECONDARY_SUFFIX = "-volume-secondary";
/**
* Mongo database connection.
*/
private final MongoDbConnection connection;
/**
* Service base name.
*/
private String serviceBaseName;
private String serviceBaseNameId;
/**
* Describe all DLab resources: SSN, EDGE, exploratory, computational and buckets.
*/
private ResourceItemList resourceList;
/**
* Instantiate DAO for billing resources.
*
* @param connection the connection to Mongo DB.
* @throws InitializationException
*/
public DlabResourceTypeDAO(MongoDbConnection connection) throws InitializationException {
this.connection = connection;
setServiceBaseName();
setResourceList();
}
/**
* Returns the base name of service.
*/
public String getServiceBaseName() {
return serviceBaseName;
}
/**
* Set the base name of service.
*
* @throws InitializationException
*/
private void setServiceBaseName() throws InitializationException {
Document d = connection.getCollection(COLLECTION_SETTINGS)
.find(eq(FIELD_ID, FIELD_SERIVICE_BASE_NAME))
.first();
if (d == null) {
throw new InitializationException("Service base name property " + COLLECTION_SETTINGS +
"." + FIELD_SERIVICE_BASE_NAME + " in Mongo DB not found");
}
String value = d.getOrDefault("value", EMPTY).toString();
if (d.isEmpty()) {
throw new InitializationException("Service base name property " + COLLECTION_SETTINGS +
"." + FIELD_SERIVICE_BASE_NAME + " in Mongo DB is empty");
}
serviceBaseName = value;
serviceBaseNameId = value + ":";
LOGGER.debug("serviceBaseName is {}", serviceBaseName);
}
/**
* Return DLab resources from Mongo DB.
*
* @throws InitializationException
*/
public ResourceItemList getResourceList() {
return resourceList;
}
/**
* Load and return DLab resources from Mongo DB.
*
* @throws InitializationException
*/
private void setResourceList() {
resourceList = new ResourceItemList();
// Add SSN
String sbName = getServiceBaseName();
resourceList.append(sbName + "-ssn", "SSN", DlabResourceType.SSN);
resourceList.append(sbName + "-ssn-volume-primary", "SSN volume", DlabResourceType.VOLUME);
resourceList.append(sbName + "-ssn-bucket", "SSN bucket", DlabResourceType.SSN_BUCKET);
// collaboration bucket
resourceList.append(sbName + "-shared-bucket", "Collaboration bucket", DlabResourceType
.COLLABORATION_BUCKET);
// Add PROJECTS
Bson projection = fields(include("name", "endpoints"));
Iterable<Document> docs = connection.getCollection("Projects").find().projection(projection);
for (Document d : docs) {
String projectName = d.getString("name");
((List<Document>) d.get("endpoints"))
.stream()
.map(endpoint -> endpoint.getString("name"))
.forEach(endpoint -> {
resourceList.append(sbName + "-" + endpoint + "-shared-bucket", "Shared endpoint bucket",
DlabResourceType.COLLABORATION_BUCKET);
resourceList.append(sbName + "-" + projectName + "-" + endpoint + "-bucket", "Project bucket",
DlabResourceType.COLLABORATION_BUCKET, null, null, projectName);
resourceList.append(sbName + "-" + projectName + "-" + endpoint + "-edge", "EDGE Node",
DlabResourceType.EDGE, null, null, projectName);
resourceList.append(sbName + "-" + projectName+ "-" + endpoint + "-edge-volume-primary",
"EDGE Volume", DlabResourceType.VOLUME, null, null, projectName);
});
}
// Add exploratory
projection = fields(include(FIELD_USER,
FIELD_EXPLORATORY_NAME,
FIELD_EXPLORATORY_ID,
FIELD_PROJECT,
FIELD_COMPUTATIONAL_RESOURCES + "." + FIELD_COMPUTATIONAL_ID,
FIELD_COMPUTATIONAL_RESOURCES + "." + FIELD_COMPUTATIONAL_NAME,
FIELD_COMPUTATIONAL_RESOURCES + "." + FIELD_IMAGE,
FIELD_COMPUTATIONAL_RESOURCES + "." + FIELD_DATAENGINE_INSTANCE_COUNT),
excludeId());
docs = connection.getCollection(COLLECTION_USER_INSTANCES).find().projection(projection);
for (Document exp : docs) {
String username = exp.getString(FIELD_USER);
String exploratoryName = exp.getString(FIELD_EXPLORATORY_NAME);
String exploratoryId = exp.getString(FIELD_EXPLORATORY_ID);
String project = exp.getString(FIELD_PROJECT);
resourceList.append(exploratoryId, exploratoryName, DlabResourceType.EXPLORATORY, username,
exploratoryName, project);
appendExploratoryVolumes(username, exploratoryName, exploratoryId, project);
// Add computational
@SuppressWarnings("unchecked")
List<Document> compList = (List<Document>) exp.get(FIELD_COMPUTATIONAL_RESOURCES);
if (compList == null) {
continue;
}
for (Document comp : compList) {
String computationalId = comp.getString(FIELD_COMPUTATIONAL_ID);
String computationalName = comp.getString(FIELD_COMPUTATIONAL_NAME);
final DataEngineType dataEngineType = DataEngineType.fromDockerImageName(comp.getString(FIELD_IMAGE));
resourceList.append(computationalId, computationalName, DlabResourceType.COMPUTATIONAL, username,
exploratoryName, project);
if (DataEngineType.CLOUD_SERVICE == dataEngineType) {
appendDataengineServiceVolumes(username, exploratoryName, computationalId, computationalName,
project);
} else {
appendDataengineVolumes(username, exploratoryName, comp, computationalId, computationalName,
project);
}
}
}
LOGGER.debug("resourceList is {}", resourceList);
}
private void appendExploratoryVolumes(String username, String exploratoryName, String exploratoryId,
String project) {
resourceList.append(exploratoryId + VOLUME_PRIMARY_SUFFIX, "Volume primary", DlabResourceType.VOLUME,
username, exploratoryName, project);
resourceList.append(exploratoryId + VOLUME_SECONDARY_SUFFIX, "Volume secondary", DlabResourceType.VOLUME,
username, exploratoryName, project);
}
private void appendDataengineServiceVolumes(String username, String exploratoryName, String computationalId,
String computationalName, String project) {
resourceList.append(computationalId + VOLUME_PRIMARY_SUFFIX, computationalName + " volume primary",
DlabResourceType.VOLUME, username, exploratoryName, project);
resourceList.append(computationalId + VOLUME_SECONDARY_SUFFIX, computationalName + " volume secondary",
DlabResourceType.VOLUME, username, exploratoryName, project);
}
private void appendDataengineVolumes(String username, String exploratoryName, Document comp, String
computationalId, String computationalName, String project) {
resourceList.append(computationalId + "-m-volume-primary", computationalName + " master volume primary",
DlabResourceType.VOLUME, username, exploratoryName, project);
resourceList.append(computationalId + "-m-volume-secondary", computationalName + " master volume secondary",
DlabResourceType.VOLUME, username, exploratoryName, project);
final Integer instanceCount = Integer.valueOf(comp.getString(FIELD_DATAENGINE_INSTANCE_COUNT));
for (int i = instanceCount - 1; i > 0; i--) {
final String slaveId = computationalId + "-s" + i;
final String slaveName = computationalName + "-s" + i;
resourceList.append(slaveId + VOLUME_PRIMARY_SUFFIX, slaveName + " volume primary", DlabResourceType
.VOLUME, username, exploratoryName, project);
resourceList.append(slaveId + VOLUME_SECONDARY_SUFFIX, slaveName + " volume secondary", DlabResourceType
.VOLUME, username, exploratoryName, project);
}
}
/**
* Convert and return the report line of billing to Mongo document.
*
* @param row report line.
* @return Mongo document.
* @throws ParseException
*/
public Document transform(ReportLine row) throws ParseException {
String resourceId = row.getDlabId();
if (resourceId == null || !resourceId.startsWith(serviceBaseNameId)) {
throw new ParseException("DlabId is not match: expected start with " + serviceBaseNameId + ", actual " +
resourceId);
}
resourceId = resourceId.substring(serviceBaseNameId.length());
ResourceItem resource = resourceList.getById(resourceId);
Document d = new Document(ReportLine.FIELD_DLAB_ID, resourceId);
if (resource == null) {
d.put(FIELD_DLAB_RESOURCE_ID, null);
d.put(FIELD_DLAB_RESOURCE_TYPE, null);
d.put(ReportLine.FIELD_USER_ID, null);
d.put(FIELD_EXPLORATORY_NAME, null);
} else {
d.put(FIELD_DLAB_RESOURCE_ID, resource.getResourceId());
d.put(FIELD_DLAB_RESOURCE_TYPE, resource.getType().toString());
d.put(ReportLine.FIELD_USER_ID, resource.getUser());
d.put(FIELD_EXPLORATORY_NAME, resource.getExploratoryName());
d.put(FIELD_PROJECT, resource.getProject());
}
return d.append(ReportLine.FIELD_USAGE_DATE, row.getUsageDate())
.append(ReportLine.FIELD_PRODUCT, row.getProduct())
.append(ReportLine.FIELD_USAGE_TYPE, row.getUsageType())
.append(ReportLine.FIELD_USAGE, row.getUsage())
.append(ReportLine.FIELD_COST, row.getCost())
.append(ReportLine.FIELD_CURRENCY_CODE, row.getCurrencyCode())
.append(ReportLine.FIELD_RESOURCE_TYPE, row.getResourceType().category())
.append(ReportLine.FIELD_RESOURCE_ID, row.getResourceId())
.append(ReportLine.FIELD_TAGS, row.getTags());
}
/**
* Return field condition for groupping.
*
* @param fieldNames the list of field names.
*/
private Document getGrouppingFields(String... fieldNames) {
Document d = new Document();
for (String name : fieldNames) {
d.put(name, "$" + name);
}
return d;
}
/**
* Update monthly total in Mongo DB.
*
* @param month the month in format YYYY-MM.
* @throws InitializationException
*/
public void updateMonthTotalCost(String month) throws InitializationException {
LOGGER.debug("Update total cost for month {}", month);
try {
//Check month
SimpleDateFormat fd = new SimpleDateFormat("yyyy-MM");
fd.parse(month);
} catch (java.text.ParseException e) {
throw new InitializationException("Invalid month value. " + e.getLocalizedMessage(), e);
}
List<? extends Bson> pipeline = Arrays.asList(
match(and(gte(ReportLine.FIELD_USAGE_DATE, month + "-01"),
lte(ReportLine.FIELD_USAGE_DATE, month + "-31"))),
group(getGrouppingFields(FIELD_DLAB_RESOURCE_ID,
FIELD_DLAB_RESOURCE_TYPE,
FIELD_USER,
FIELD_EXPLORATORY_NAME,
ReportLine.FIELD_CURRENCY_CODE,
ReportLine.FIELD_RESOURCE_TYPE),
sum(ReportLine.FIELD_COST, "$" + ReportLine.FIELD_COST))
);
AggregateIterable<Document> docs = connection.getCollection(COLLECTION_BILLING).aggregate(pipeline);
MongoCollection<Document> collection = connection.getCollection(COLLECTION_BILLING_TOTAL);
long deletedCount = collection.deleteMany(eq(ReportLine.FIELD_USAGE_DATE, month)).getDeletedCount();
LOGGER.debug("{} documents has been deleted from collection {}", deletedCount, COLLECTION_BILLING_TOTAL);
List<Document> totals = new ArrayList<>();
for (Document d : docs) {
Document total = (Document) d.get(FIELD_ID);
total
.append(ReportLine.FIELD_USAGE_DATE, month)
.append(ReportLine.FIELD_COST, d.getDouble(ReportLine.FIELD_COST));
totals.add(total);
}
if (!totals.isEmpty()) {
LOGGER.debug("{} documents will be inserted into collection {}", totals.size(), COLLECTION_BILLING_TOTAL);
collection.insertMany(totals);
}
}
/**
* Comparator to sort billing exploratory details.
*/
class BillingComparator implements Comparator<Document> {
@Override
public int compare(Document d1, Document d2) {
int result = StringUtils.compare(d1.getString(FIELD_RESOURCE_NAME), d2.getString(FIELD_RESOURCE_NAME));
if (result == 0) {
result = StringUtils.compare(d1.getString(ReportLine.FIELD_PRODUCT), d2.getString(ReportLine
.FIELD_PRODUCT));
if (result == 0) {
return StringUtils.compare(d1.getString(ReportLine.FIELD_RESOURCE_TYPE), d2.getString(ReportLine
.FIELD_RESOURCE_TYPE));
}
}
return result;
}
}
/**
* Update exploratory cost in Mongo DB.
*
* @param user the name of user.
* @param exploratoryName id of exploratory.
*/
private void updateExploratoryCost(String user, String exploratoryName) {
LOGGER.debug("Update explorartory {} cost for user {}", exploratoryName, user);
List<? extends Bson> pipeline = Arrays.asList(
match(and(eq(FIELD_USER, user),
eq(FIELD_EXPLORATORY_NAME, exploratoryName))),
group(getGrouppingFields(FIELD_DLAB_RESOURCE_ID,
ReportLine.FIELD_PRODUCT,
ReportLine.FIELD_RESOURCE_TYPE,
ReportLine.FIELD_CURRENCY_CODE),
sum(ReportLine.FIELD_COST, "$" + ReportLine.FIELD_COST),
min(FIELD_USAGE_DATE_START, "$" + ReportLine.FIELD_USAGE_DATE),
max(FIELD_USAGE_DATE_END, "$" + ReportLine.FIELD_USAGE_DATE)
),
sort(new Document(FIELD_ID + "." + FIELD_DLAB_RESOURCE_ID, 1).append(FIELD_ID + "." + ReportLine
.FIELD_PRODUCT, 1))
);
AggregateIterable<Document> docs = connection.getCollection(COLLECTION_BILLING)
.aggregate(pipeline);
LinkedList<Document> billing = new LinkedList<>();
ResourceItemList resources = getResourceList();
Double costTotal = null;
String currencyCode = null;
for (Document d : docs) {
Document id = (Document) d.get(FIELD_ID);
double cost = BillingCalculationUtils.round(d.getDouble(ReportLine.FIELD_COST), 2);
costTotal = (costTotal == null ? cost : costTotal + cost);
if (currencyCode == null) {
currencyCode = id.getString(ReportLine.FIELD_CURRENCY_CODE);
}
Document total = new Document()
.append(FIELD_RESOURCE_NAME, resources.getById(id.getString(FIELD_DLAB_RESOURCE_ID))
.getResourceName())
.append(ReportLine.FIELD_PRODUCT, id.getString(ReportLine.FIELD_PRODUCT))
.append(ReportLine.FIELD_RESOURCE_TYPE, id.getString(ReportLine.FIELD_RESOURCE_TYPE))
.append(ReportLine.FIELD_COST, BillingCalculationUtils.formatDouble(cost))
.append(ReportLine.FIELD_CURRENCY_CODE, id.getString(ReportLine.FIELD_CURRENCY_CODE))
.append(FIELD_USAGE_DATE_START, d.getString(FIELD_USAGE_DATE_START))
.append(FIELD_USAGE_DATE_END, d.getString(FIELD_USAGE_DATE_END));
billing.add(total);
}
LOGGER.debug("Total explorartory {} cost for user {} is {} {}, detail count is {}",
exploratoryName, user, costTotal, currencyCode, billing.size());
billing.sort(new BillingComparator());
MongoCollection<Document> cExploratory = connection.getCollection(COLLECTION_USER_INSTANCES);
Bson values = Updates.combine(
Updates.set(ReportLine.FIELD_COST, BillingCalculationUtils.formatDouble(costTotal)),
Updates.set(FIELD_CURRENCY_CODE, currencyCode),
Updates.set(COLLECTION_BILLING, (!billing.isEmpty() ? billing : null)));
cExploratory.updateOne(
and(and(eq(FIELD_USER, user),
eq(FIELD_EXPLORATORY_NAME, exploratoryName))),
values);
}
/**
* Update EDGE cost in Mongo DB.
*
* @param user the name of user.
*/
private void updateEdgeCost(String user) {
List<? extends Bson> pipeline = Arrays.asList(
match(and(eq(FIELD_USER, user),
eq(FIELD_EXPLORATORY_NAME, null))),
group(getGrouppingFields(ReportLine.FIELD_CURRENCY_CODE),
sum(ReportLine.FIELD_COST, "$" + ReportLine.FIELD_COST))
);
AggregateIterable<Document> docs = connection.getCollection(COLLECTION_BILLING_TOTAL)
.aggregate(pipeline);
MongoCollection<Document> cEdge = connection.getCollection(COLLECTION_USER_EDGE);
for (Document d : docs) {
Document id = (Document) d.get(FIELD_ID);
Bson values = Updates.combine(
Updates.set(ReportLine.FIELD_COST, BillingCalculationUtils.round(d.getDouble(ReportLine
.FIELD_COST), 2)),
Updates.set(FIELD_CURRENCY_CODE, id.get(ReportLine.FIELD_CURRENCY_CODE)));
cEdge.updateOne(
eq(FIELD_ID, user),
values);
}
}
/**
* Update the cost of exploratory environment for all users in Mongo DB.
*/
public void updateExploratoryCost() {
for (int i = 0; i < resourceList.size(); i++) {
ResourceItem item = resourceList.get(i);
if (item.getType() == DlabResourceType.EXPLORATORY) {
updateExploratoryCost(item.getUser(), item.getExploratoryName());
} else if (item.getType() == DlabResourceType.EDGE) {
updateEdgeCost(item.getUser());
}
}
}
}