blob: 58fd43df09e5960f88e2560d5159d783736b5d9b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geode.management.internal.functions;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.healthmarketscience.rmiio.RemoteInputStream;
import com.healthmarketscience.rmiio.RemoteInputStreamClient;
import org.apache.commons.io.IOUtils;
import org.apache.logging.log4j.Logger;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.execute.InternalFunction;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.management.api.RealizationResult;
import org.apache.geode.management.configuration.AbstractConfiguration;
import org.apache.geode.management.configuration.Deployment;
import org.apache.geode.management.configuration.DiskStore;
import org.apache.geode.management.configuration.GatewayReceiver;
import org.apache.geode.management.configuration.HasFile;
import org.apache.geode.management.configuration.Index;
import org.apache.geode.management.configuration.Member;
import org.apache.geode.management.configuration.Pdx;
import org.apache.geode.management.configuration.Region;
import org.apache.geode.management.internal.CacheElementOperation;
import org.apache.geode.management.internal.beans.FileUploader;
import org.apache.geode.management.internal.configuration.realizers.ConfigurationRealizer;
import org.apache.geode.management.internal.configuration.realizers.DeploymentRealizer;
import org.apache.geode.management.internal.configuration.realizers.DiskStoreRealizer;
import org.apache.geode.management.internal.configuration.realizers.GatewayReceiverRealizer;
import org.apache.geode.management.internal.configuration.realizers.IndexRealizer;
import org.apache.geode.management.internal.configuration.realizers.MemberRealizer;
import org.apache.geode.management.internal.configuration.realizers.PdxRealizer;
import org.apache.geode.management.internal.configuration.realizers.RegionConfigRealizer;
import org.apache.geode.management.runtime.RuntimeInfo;
/**
 * Geode function that realizes a cluster-management configuration change on a member's cache.
 *
 * <p>The function arguments (a {@code List}) are, in order:
 * <ol>
 * <li>the {@link AbstractConfiguration} element to realize,</li>
 * <li>the {@link CacheElementOperation} (GET/CREATE/UPDATE/DELETE),</li>
 * <li>optionally, a {@link RemoteInputStream} carrying a file (e.g. a jar for deployment).</li>
 * </ol>
 *
 * <p>For GET the function returns a {@link RuntimeInfo} (or {@code null}); for all other
 * operations it returns a {@link RealizationResult}.
 */
public class CacheRealizationFunction implements InternalFunction<List> {
  private static final Logger logger = LogService.getLogger();
  private static final long serialVersionUID = -2695517414081975343L;

  // Maps each configuration type to the realizer that applies it to the cache.
  // Built once in the static initializer and wrapped unmodifiable so the map
  // actually honors the @Immutable contract.
  @Immutable
  private static final Map<Class<?>, ConfigurationRealizer> realizers;

  static {
    Map<Class<?>, ConfigurationRealizer> map = new HashMap<>();
    map.put(Region.class, new RegionConfigRealizer());
    map.put(GatewayReceiver.class, new GatewayReceiverRealizer());
    map.put(Member.class, new MemberRealizer());
    map.put(Pdx.class, new PdxRealizer());
    map.put(Deployment.class, new DeploymentRealizer());
    map.put(Index.class, new IndexRealizer());
    map.put(DiskStore.class, new DiskStoreRealizer());
    realizers = Collections.unmodifiableMap(map);
  }

  /**
   * Entry point invoked by the function service. Dispatches to {@link #executeGet} or
   * {@link #executeUpdate} based on the requested operation, and always sends exactly one
   * result (never propagates an exception to the caller).
   */
  @Override
  public void execute(FunctionContext<List> context) {
    AbstractConfiguration cacheElement = (AbstractConfiguration) context.getArguments().get(0);
    CacheElementOperation operation = (CacheElementOperation) context.getArguments().get(1);

    // for get operation, caller is expecting RuntimeInfo
    if (operation == CacheElementOperation.GET) {
      try {
        InternalCache cache = (InternalCache) context.getCache();
        context.getResultSender().lastResult(executeGet(context, cache, cacheElement));
      } catch (CacheClosedException e) {
        // cache not ready or closed already, no need to log and do not return any runtime info
        context.getResultSender().lastResult(null);
      } catch (Exception e) {
        logError("Unable to gather runtime information on this member. ", e);
        context.getResultSender().lastResult(null);
      }
    }
    // for other operations, caller is expecting RealizationResult
    else {
      try {
        RemoteInputStream jarStream = (RemoteInputStream) context.getArguments().get(2);
        InternalCache cache = (InternalCache) context.getCache();
        context.getResultSender()
            .lastResult(
                executeUpdate(context.getMemberName(), cache, cacheElement, operation, jarStream));
      } catch (CacheClosedException e) {
        // cache not ready or closed already, no need to log it
        context.getResultSender().lastResult(new RealizationResult()
            .setSuccess(false)
            .setMessage(e.getMessage()));
      } catch (Exception e) {
        logError("unable to update cache with the configuration.", e);
        context.getResultSender().lastResult(new RealizationResult()
            .setSuccess(false)
            .setMemberName(context.getMemberName())
            .setMessage(e.getMessage()));
      }
    }
  }

  // Extracted so tests can verify/suppress logging without a real Logger.
  @VisibleForTesting
  void logError(String s, Exception e) {
    logger.error(s, e);
  }

  /**
   * Gathers this member's runtime information for the given configuration element.
   *
   * @return the realizer's {@link RuntimeInfo}, or {@code null} when no realizer is registered
   *         for the element's type
   */
  public RuntimeInfo executeGet(FunctionContext<List> context,
      InternalCache cache, AbstractConfiguration cacheElement) {
    ConfigurationRealizer realizer = realizers.get(cacheElement.getClass());

    if (realizer == null) {
      return null;
    }

    RuntimeInfo runtimeInfo = realizer.get(cacheElement, cache);

    // set the member name if this is not a global runtime
    if (!cacheElement.isGlobalRuntime()) {
      runtimeInfo.setMemberName(context.getMemberName());
    }

    return runtimeInfo;
  }

  /**
   * Applies a CREATE/UPDATE/DELETE operation for the given element on this member's cache.
   *
   * @param jarStream optional streamed file content for {@link HasFile} elements; staged to a
   *        temp directory before the realizer runs
   * @return a {@link RealizationResult} describing the outcome; read-only or unknown element
   *         types report that a restart is required
   * @throws Exception if staging the file or the realizer itself fails
   */
  public RealizationResult executeUpdate(String memberName,
      InternalCache cache, AbstractConfiguration cacheElement,
      CacheElementOperation operation, RemoteInputStream jarStream) throws Exception {
    ConfigurationRealizer realizer = realizers.get(cacheElement.getClass());

    RealizationResult result = new RealizationResult();
    result.setMemberName(memberName);

    if (realizer == null || realizer.isReadyOnly()) {
      return result.setMessage("Server '" + memberName
          + "' needs to be restarted for this configuration change to be realized.");
    }

    // the function parameter contains a streamed file; stage the file first
    if ((cacheElement instanceof HasFile) && jarStream != null) {
      HasFile configuration = (HasFile) cacheElement;
      Set<File> files = stageFileContent(Collections.singletonList(configuration.getFileName()),
          Collections.singletonList(jarStream));
      configuration.setFile(files.iterator().next());
    }

    switch (operation) {
      case CREATE:
        if (realizer.exists(cacheElement, cache)) {
          // existing element is not an error for CREATE: skip and report
          return result.setMessage(cacheElement.getClass().getSimpleName() + " '"
              + cacheElement.getId() + "' already exists. Skipped creation.");
        }
        result = realizer.create(cacheElement, cache);
        break;
      case DELETE:
        if (!realizer.exists(cacheElement, cache)) {
          return result.setMessage(
              cacheElement.getClass().getSimpleName() + " '" + cacheElement.getId()
                  + "' does not exist.");
        }
        result = realizer.delete(cacheElement, cache);
        break;
      case UPDATE:
        if (!realizer.exists(cacheElement, cache)) {
          return result.setSuccess(false).setMessage(
              cacheElement.getClass().getSimpleName() + " '" + cacheElement.getId()
                  + "' does not exist.");
        }
        result = realizer.update(cacheElement, cache);
        break;
    }

    // realizers build their own result; re-stamp the member name on it
    result.setMemberName(memberName);
    return result;
  }

  /**
   * Copies the remote streams into files under a freshly created secured temp directory.
   *
   * <p>On any {@link IOException} the remote streams are closed (best effort) before the
   * exception is rethrown, so the sender side is not left hanging.
   *
   * @param jarNames file names, positionally matching {@code jarStreams}
   * @return the staged files, one per input stream
   * @throws IOException if the temp directory cannot be created or a copy fails
   */
  public static Set<File> stageFileContent(List<String> jarNames,
      List<RemoteInputStream> jarStreams) throws IOException {
    Set<File> stagedJars = new HashSet<>();

    try {
      Path tempDir = FileUploader.createSecuredTempDirectory("deploy-");

      for (int i = 0; i < jarNames.size(); i++) {
        Path tempJar = Paths.get(tempDir.toString(), jarNames.get(i));
        // try-with-resources guarantees both streams are closed even when the copy
        // throws (the original code leaked them on failure)
        try (FileOutputStream fos = new FileOutputStream(tempJar.toString());
            InputStream input = RemoteInputStreamClient.wrap(jarStreams.get(i))) {
          IOUtils.copyLarge(input, fos);
        }
        stagedJars.add(tempJar.toFile());
      }
    } catch (IOException iox) {
      // best-effort cleanup of the remote ends before propagating the failure
      for (RemoteInputStream jarStream : jarStreams) {
        try {
          jarStream.close(true);
        } catch (IOException ignored) {
          // already failing with iox; a secondary close failure adds nothing
        }
      }
      throw iox;
    }

    return stagedJars;
  }

  // Re-execution on another member would not realize the change on THIS member,
  // so the function is not HA.
  @Override
  public boolean isHA() {
    return false;
  }
}