| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.brooklyn.entity.webapp; |
| |
| import static com.google.common.base.Preconditions.checkNotNull; |
| |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| |
| import org.apache.brooklyn.api.entity.Entity; |
| import org.apache.brooklyn.api.mgmt.Task; |
| import org.apache.brooklyn.api.mgmt.TaskAdaptable; |
| import org.apache.brooklyn.api.sensor.AttributeSensor; |
| import org.apache.brooklyn.core.effector.Effectors; |
| import org.apache.brooklyn.core.entity.Attributes; |
| import org.apache.brooklyn.core.entity.Entities; |
| import org.apache.brooklyn.enricher.stock.Enrichers; |
| import org.apache.brooklyn.entity.group.DynamicCluster; |
| import org.apache.brooklyn.entity.group.DynamicClusterImpl; |
| import org.apache.brooklyn.util.collections.MutableMap; |
| import org.apache.brooklyn.util.collections.MutableSet; |
| import org.apache.brooklyn.util.core.task.DynamicTasks; |
| import org.apache.brooklyn.util.core.task.TaskBuilder; |
| import org.apache.brooklyn.util.core.task.TaskTags; |
| import org.apache.brooklyn.util.core.task.Tasks; |
| import org.apache.brooklyn.util.exceptions.Exceptions; |
| import org.apache.brooklyn.util.time.Duration; |
| import org.apache.brooklyn.util.time.Time; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.Iterables; |
| |
| /** |
| * DynamicWebAppClusters provide cluster-wide aggregates of entity attributes. Currently totals and averages: |
| * <ul> |
| * <li>Entity request counts</li> |
| * <li>Entity error counts</li> |
| * <li>Requests per second</li> |
| * <li>Entity processing time</li> |
| * </ul> |
| */ |
| public class DynamicWebAppClusterImpl extends DynamicClusterImpl implements DynamicWebAppCluster { |
| |
| private static final Logger log = LoggerFactory.getLogger(DynamicWebAppClusterImpl.class); |
| private static final FilenameToWebContextMapper FILENAME_TO_WEB_CONTEXT_MAPPER = new FilenameToWebContextMapper(); |
| |
| /** |
| * Instantiate a new DynamicWebAppCluster. Parameters as per {@link DynamicCluster#DynamicCluster()} |
| */ |
| public DynamicWebAppClusterImpl() { |
| super(); |
| } |
| |
| @Override |
| public void init() { |
| super.init(); |
| // Enricher attribute setup. A way of automatically discovering these (but avoiding |
| // averaging things like HTTP port and response codes) would be neat. |
| List<? extends List<? extends AttributeSensor<? extends Number>>> summingEnricherSetup = ImmutableList.of( |
| ImmutableList.of(REQUEST_COUNT, REQUEST_COUNT), |
| ImmutableList.of(ERROR_COUNT, ERROR_COUNT), |
| ImmutableList.of(REQUESTS_PER_SECOND_LAST, REQUESTS_PER_SECOND_LAST), |
| ImmutableList.of(REQUESTS_PER_SECOND_IN_WINDOW, REQUESTS_PER_SECOND_IN_WINDOW), |
| ImmutableList.of(TOTAL_PROCESSING_TIME, TOTAL_PROCESSING_TIME), |
| ImmutableList.of(PROCESSING_TIME_FRACTION_IN_WINDOW, PROCESSING_TIME_FRACTION_IN_WINDOW) |
| ); |
| |
| List<? extends List<? extends AttributeSensor<? extends Number>>> averagingEnricherSetup = ImmutableList.of( |
| ImmutableList.of(REQUEST_COUNT, REQUEST_COUNT_PER_NODE), |
| ImmutableList.of(ERROR_COUNT, ERROR_COUNT_PER_NODE), |
| ImmutableList.of(REQUESTS_PER_SECOND_LAST, REQUESTS_PER_SECOND_LAST_PER_NODE), |
| ImmutableList.of(REQUESTS_PER_SECOND_IN_WINDOW, REQUESTS_PER_SECOND_IN_WINDOW_PER_NODE), |
| ImmutableList.of(TOTAL_PROCESSING_TIME, TOTAL_PROCESSING_TIME_PER_NODE), |
| ImmutableList.of(PROCESSING_TIME_FRACTION_IN_WINDOW, PROCESSING_TIME_FRACTION_IN_WINDOW_PER_NODE) |
| ); |
| |
| for (List<? extends AttributeSensor<? extends Number>> es : summingEnricherSetup) { |
| AttributeSensor<? extends Number> t = es.get(0); |
| AttributeSensor<? extends Number> total = es.get(1); |
| enrichers().add(Enrichers.builder() |
| .aggregating(t) |
| .publishing(total) |
| .fromMembers() |
| .computingSum() |
| .build()); |
| } |
| |
| for (List<? extends AttributeSensor<? extends Number>> es : averagingEnricherSetup) { |
| @SuppressWarnings("unchecked") |
| AttributeSensor<Number> t = (AttributeSensor<Number>) es.get(0); |
| @SuppressWarnings("unchecked") |
| AttributeSensor<Double> average = (AttributeSensor<Double>) es.get(1); |
| enrichers().add(Enrichers.builder() |
| .aggregating(t) |
| .publishing(average) |
| .fromMembers() |
| .computingAverage() |
| .defaultValueForUnreportedSensors(0) |
| .build()); |
| } |
| } |
| |
| // TODO this will probably be useful elsewhere ... but where to put it? |
| // TODO add support for this in DependentConfiguration (see TODO there) |
| /** Waits for the given target to report service up, then runs the given task |
| * (often an invocation on that entity), with the given name. |
| * If the target goes away, this task marks itself inessential |
| * before failing so as not to cause a parent task to fail. */ |
| static <T> Task<T> whenServiceUp(final Entity target, final TaskAdaptable<T> task, String name) { |
| return Tasks.<T>builder().displayName(name).dynamic(true).body(new Callable<T>() { |
| @Override |
| public T call() { |
| try { |
| while (true) { |
| if (!Entities.isManaged(target)) { |
| Tasks.markInessential(); |
| throw new IllegalStateException("Target "+target+" is no longer managed"); |
| } |
| if (Boolean.TRUE.equals(target.getAttribute(Attributes.SERVICE_UP))) { |
| Tasks.resetBlockingDetails(); |
| TaskTags.markInessential(task); |
| DynamicTasks.queue(task); |
| try { |
| return task.asTask().getUnchecked(); |
| } catch (Exception e) { |
| if (Entities.isManaged(target)) { |
| throw Exceptions.propagate(e); |
| } else { |
| Tasks.markInessential(); |
| throw new IllegalStateException("Target "+target+" is no longer managed", e); |
| } |
| } |
| } else { |
| Tasks.setBlockingDetails("Waiting on "+target+" to be ready"); |
| } |
| // TODO replace with subscription? |
| Time.sleep(Duration.ONE_SECOND); |
| } |
| } finally { |
| Tasks.resetBlockingDetails(); |
| } |
| } |
| }).build(); |
| } |
| |
| @Override |
| public void deploy(String url, String targetName) { |
| checkNotNull(url, "url"); |
| checkNotNull(targetName, "targetName"); |
| targetName = FILENAME_TO_WEB_CONTEXT_MAPPER.convertDeploymentTargetNameToContext(targetName); |
| |
| // set it up so future nodes get the right wars |
| addToWarsByContext(this, url, targetName); |
| |
| log.debug("Deploying "+targetName+"->"+url+" across cluster "+this+"; WARs now "+getConfig(WARS_BY_CONTEXT)); |
| |
| Iterable<CanDeployAndUndeploy> targets = Iterables.filter(getChildren(), CanDeployAndUndeploy.class); |
| TaskBuilder<Void> tb = Tasks.<Void>builder().parallel(true).displayName("Deploy "+targetName+" to cluster (size "+Iterables.size(targets)+")"); |
| for (Entity target: targets) { |
| tb.add(whenServiceUp(target, Effectors.invocation(target, DEPLOY, MutableMap.of("url", url, "targetName", targetName)), |
| "Deploy "+targetName+" to "+target+" when ready")); |
| } |
| DynamicTasks.queueIfPossible(tb.build()).orSubmitAsync(this).asTask().getUnchecked(); |
| |
| // Update attribute |
| // TODO support for atomic sensor update (should be part of standard tooling; NB there is some work towards this, according to @aledsage) |
| Set<String> deployedWars = MutableSet.copyOf(getAttribute(DEPLOYED_WARS)); |
| deployedWars.add(targetName); |
| sensors().set(DEPLOYED_WARS, deployedWars); |
| } |
| |
| @Override |
| public void undeploy(String targetName) { |
| checkNotNull(targetName, "targetName"); |
| targetName = FILENAME_TO_WEB_CONTEXT_MAPPER.convertDeploymentTargetNameToContext(targetName); |
| |
| // set it up so future nodes get the right wars |
| if (!removeFromWarsByContext(this, targetName)) { |
| DynamicTasks.submit(Tasks.warning("Context "+targetName+" not known at "+this+"; attempting to undeploy regardless", null), this); |
| } |
| |
| log.debug("Undeploying "+targetName+" across cluster "+this+"; WARs now "+getConfig(WARS_BY_CONTEXT)); |
| |
| Iterable<CanDeployAndUndeploy> targets = Iterables.filter(getChildren(), CanDeployAndUndeploy.class); |
| TaskBuilder<Void> tb = Tasks.<Void>builder().parallel(true).displayName("Undeploy "+targetName+" across cluster (size "+Iterables.size(targets)+")"); |
| for (Entity target: targets) { |
| tb.add(whenServiceUp(target, Effectors.invocation(target, UNDEPLOY, MutableMap.of("targetName", targetName)), |
| "Undeploy "+targetName+" at "+target+" when ready")); |
| } |
| DynamicTasks.queueIfPossible(tb.build()).orSubmitAsync(this).asTask().getUnchecked(); |
| |
| // Update attribute |
| Set<String> deployedWars = MutableSet.copyOf(getAttribute(DEPLOYED_WARS)); |
| deployedWars.remove( FILENAME_TO_WEB_CONTEXT_MAPPER.convertDeploymentTargetNameToContext(targetName) ); |
| sensors().set(DEPLOYED_WARS, deployedWars); |
| } |
| |
| static void addToWarsByContext(Entity entity, String url, String targetName) { |
| targetName = FILENAME_TO_WEB_CONTEXT_MAPPER.convertDeploymentTargetNameToContext(targetName); |
| // TODO a better way to do atomic updates, see comment above |
| synchronized (entity) { |
| Map<String,String> newWarsMap = MutableMap.copyOf(entity.getConfig(WARS_BY_CONTEXT)); |
| newWarsMap.put(targetName, url); |
| entity.config().set(WARS_BY_CONTEXT, newWarsMap); |
| } |
| } |
| |
| static boolean removeFromWarsByContext(Entity entity, String targetName) { |
| targetName = FILENAME_TO_WEB_CONTEXT_MAPPER.convertDeploymentTargetNameToContext(targetName); |
| // TODO a better way to do atomic updates, see comment above |
| synchronized (entity) { |
| Map<String,String> newWarsMap = MutableMap.copyOf(entity.getConfig(WARS_BY_CONTEXT)); |
| String url = newWarsMap.remove(targetName); |
| if (url==null) { |
| return false; |
| } |
| entity.config().set(WARS_BY_CONTEXT, newWarsMap); |
| return true; |
| } |
| } |
| |
| @Override |
| public void redeployAll() { |
| Map<String, String> wars = MutableMap.copyOf(getConfig(WARS_BY_CONTEXT)); |
| String redeployPrefix = "Redeploy all WARs (count "+wars.size()+")"; |
| |
| log.debug("Redeplying all WARs across cluster "+this+": "+getConfig(WARS_BY_CONTEXT)); |
| |
| Iterable<CanDeployAndUndeploy> targetEntities = Iterables.filter(getChildren(), CanDeployAndUndeploy.class); |
| TaskBuilder<Void> tb = Tasks.<Void>builder().parallel(true).displayName(redeployPrefix+" across cluster (size "+Iterables.size(targetEntities)+")"); |
| for (Entity targetEntity: targetEntities) { |
| TaskBuilder<Void> redeployAllToTarget = Tasks.<Void>builder().displayName(redeployPrefix+" at "+targetEntity+" (after ready check)"); |
| for (String warContextPath: wars.keySet()) { |
| redeployAllToTarget.add(Effectors.invocation(targetEntity, DEPLOY, MutableMap.of("url", wars.get(warContextPath), "targetName", warContextPath))); |
| } |
| tb.add(whenServiceUp(targetEntity, redeployAllToTarget.build(), redeployPrefix+" at "+targetEntity+" when ready")); |
| } |
| DynamicTasks.queueIfPossible(tb.build()).orSubmitAsync(this).asTask().getUnchecked(); |
| } |
| |
| } |