| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package com.epam.dlab.billing.azure; |
| |
| import com.epam.dlab.MongoKeyWords; |
| import com.epam.dlab.billing.BillingCalculationUtils; |
| import com.epam.dlab.billing.DlabResourceType; |
| import com.google.common.collect.Lists; |
| import com.mongodb.client.AggregateIterable; |
| import com.mongodb.client.model.*; |
| import com.mongodb.client.result.UpdateResult; |
| import lombok.extern.slf4j.Slf4j; |
| import org.bson.Document; |
| import org.bson.conversions.Bson; |
| |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.function.Consumer; |
| import java.util.stream.StreamSupport; |
| |
| @Slf4j |
| public class AzureBillingDetailsService { |
| private MongoDbBillingClient mongoDbBillingClient; |
| private String currencyCode; |
| |
| public AzureBillingDetailsService(MongoDbBillingClient mongoDbBillingClient, String currencyCode) { |
| this.mongoDbBillingClient = mongoDbBillingClient; |
| this.currencyCode = currencyCode; |
| } |
| |
| public void updateBillingDetails() { |
| StreamSupport.stream(mongoDbBillingClient.getDatabase() |
| .getCollection(MongoKeyWords.NOTEBOOK_COLLECTION).find().spliterator(), false) |
| .map(a -> a.getString(MongoKeyWords.DLAB_USER)) |
| .forEach(this::updateBillingDetails); |
| } |
| |
| public void updateBillingDetails(String user) { |
| log.debug("Updating billing details for user {}", user); |
| |
| try { |
| AggregateIterable<Document> aggregateIterable = mongoDbBillingClient.getDatabase() |
| .getCollection(MongoKeyWords.BILLING_DETAILS) |
| .aggregate(Lists.newArrayList( |
| Aggregates.match( |
| Filters.and( |
| Filters.eq(MongoKeyWords.DLAB_USER, user), |
| Filters.in(MongoKeyWords.RESOURCE_TYPE, |
| DlabResourceType.EXPLORATORY.toString(), |
| DlabResourceType.COMPUTATIONAL.toString(), |
| DlabResourceType.VOLUME.toString()) |
| ) |
| ), |
| |
| Aggregates.group(getGroupingFields( |
| MongoKeyWords.DLAB_ID, |
| MongoKeyWords.DLAB_USER, |
| MongoKeyWords.EXPLORATORY_ID, |
| MongoKeyWords.RESOURCE_TYPE, |
| MongoKeyWords.RESOURCE_NAME, |
| MongoKeyWords.COMPUTATIONAL_ID, |
| MongoKeyWords.METER_CATEGORY), |
| Accumulators.sum(MongoKeyWords.COST, MongoKeyWords.prepend$(MongoKeyWords.COST)), |
| Accumulators.min(MongoKeyWords.USAGE_FROM, MongoKeyWords.prepend$(MongoKeyWords |
| .USAGE_DAY)), |
| Accumulators.max(MongoKeyWords.USAGE_TO, MongoKeyWords.prepend$(MongoKeyWords |
| .USAGE_DAY)) |
| ), |
| |
| Aggregates.sort(Sorts.ascending( |
| MongoKeyWords.prependId(MongoKeyWords.RESOURCE_NAME), |
| MongoKeyWords.prependId(MongoKeyWords.METER_CATEGORY))) |
| ) |
| ); |
| |
| updateBillingDetails(user, mapToDetails(aggregateIterable)); |
| } catch (RuntimeException e) { |
| log.error("Updating billing details for user {} is failed", user, e); |
| } |
| } |
| |
| private List<Document> mapToDetails(AggregateIterable<Document> aggregateIterable) { |
| List<Document> billingDetails = new ArrayList<>(); |
| for (Document document : aggregateIterable) { |
| Document oldRef = (Document) document.get(MongoKeyWords.MONGO_ID); |
| Document newDocument = new Document(); |
| |
| newDocument.append(MongoKeyWords.USAGE_FROM, document.getString(MongoKeyWords.USAGE_FROM)); |
| newDocument.append(MongoKeyWords.USAGE_TO, document.getString(MongoKeyWords.USAGE_TO)); |
| newDocument.append(MongoKeyWords.COST, document.getDouble(MongoKeyWords.COST)); |
| |
| newDocument.append(MongoKeyWords.METER_CATEGORY, oldRef.getString(MongoKeyWords.METER_CATEGORY)); |
| newDocument.append(MongoKeyWords.RESOURCE_NAME, oldRef.getString(MongoKeyWords.RESOURCE_NAME)); |
| newDocument.append(MongoKeyWords.EXPLORATORY_ID, oldRef.getString(MongoKeyWords.EXPLORATORY_ID)); |
| newDocument.append(MongoKeyWords.RESOURCE_TYPE, oldRef.getString(MongoKeyWords.RESOURCE_TYPE)); |
| newDocument.append(MongoKeyWords.CURRENCY_CODE, currencyCode); |
| |
| billingDetails.add(newDocument); |
| } |
| |
| return billingDetails; |
| } |
| |
| |
| private void updateBillingDetails(String user, List<Document> billingDetails) { |
| if (!billingDetails.isEmpty()) { |
| Map<String, List<Document>> info = new HashMap<>(); |
| |
| Consumer<Document> aggregator = e -> { |
| |
| String notebookId = e.getString(MongoKeyWords.EXPLORATORY_ID); |
| List<Document> documents = info.get(notebookId); |
| if (documents == null) { |
| documents = new ArrayList<>(); |
| } |
| |
| documents.add(e); |
| info.put(notebookId, documents); |
| }; |
| |
| billingDetails.stream() |
| .filter(e -> DlabResourceType.EXPLORATORY.toString().equals(e.getString(MongoKeyWords |
| .RESOURCE_TYPE))) |
| .forEach(aggregator); |
| |
| billingDetails.stream() |
| .filter(e -> DlabResourceType.COMPUTATIONAL.toString().equals(e.getString(MongoKeyWords |
| .RESOURCE_TYPE))) |
| .forEach(aggregator); |
| |
| billingDetails.stream() |
| .filter(e -> DlabResourceType.VOLUME.toString().equals(e.getString(MongoKeyWords.RESOURCE_TYPE))) |
| .forEach(aggregator); |
| |
| |
| for (Map.Entry<String, List<Document>> entry : info.entrySet()) { |
| double sum = entry.getValue().stream().mapToDouble(e -> e.getDouble(MongoKeyWords.COST)).sum(); |
| |
| entry.getValue().forEach(e -> e.put(MongoKeyWords.COST_STRING, |
| BillingCalculationUtils.formatDouble(e.getDouble(MongoKeyWords.COST)))); |
| |
| log.debug("Update billing for notebook {}, cost is {} {}", entry.getKey(), sum, currencyCode); |
| |
| Bson updates = Updates.combine( |
| Updates.set(MongoKeyWords.COST_STRING, BillingCalculationUtils.formatDouble(sum)), |
| Updates.set(MongoKeyWords.COST, sum), |
| Updates.set(MongoKeyWords.CURRENCY_CODE, currencyCode), |
| Updates.set(MongoKeyWords.BILLING_DETAILS, entry.getValue())); |
| |
| UpdateResult updateResult = mongoDbBillingClient.getDatabase() |
| .getCollection(MongoKeyWords.NOTEBOOK_COLLECTION) |
| .updateOne( |
| Filters.and( |
| Filters.eq(MongoKeyWords.DLAB_USER, user), |
| Filters.eq(MongoKeyWords.EXPLORATORY_ID_OLD, entry.getKey()) |
| ), |
| updates |
| ); |
| |
| log.debug("Update result for {}/{} is {}", user, entry.getKey(), updateResult); |
| } |
| } else { |
| log.warn("No billing details found for notebooks for user {}", user); |
| } |
| } |
| |
| |
| private Document getGroupingFields(String... fieldNames) { |
| Document d = new Document(); |
| for (String name : fieldNames) { |
| d.put(name, "$" + name); |
| } |
| return d; |
| } |
| } |