blob: 2e9b48e2abc1f044d457e1c13b0ca0ff79a53f8c [file] [log] [blame]
/*
* Copyright 2012 International Business Machines Corp.
*
* See the NOTICE file distributed with this work for additional information
* regarding copyright ownership. Licensed under the Apache License,
* Version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.batchee.container.proxy;
import java.lang.reflect.Proxy;
import java.util.HashSet;
import java.util.Set;
import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
import org.apache.batchee.spi.BatchArtifactFactory;
import javax.batch.api.Batchlet;
import javax.batch.api.Decider;
import javax.batch.api.chunk.CheckpointAlgorithm;
import javax.batch.api.chunk.ItemProcessor;
import javax.batch.api.chunk.ItemReader;
import javax.batch.api.chunk.ItemWriter;
import javax.batch.api.partition.PartitionAnalyzer;
import javax.batch.api.partition.PartitionCollector;
import javax.batch.api.partition.PartitionMapper;
import javax.batch.api.partition.PartitionReducer;
/*
* Introduce a level of indirection so proxies are not instantiated directly by newing them up.
*/
public class ProxyFactory {
private static final ThreadLocal<InjectionReferences> INJECTION_CONTEXT = new ThreadLocal<InjectionReferences>();
private ProxyFactory() {
// private utility class ct
}
protected static Object loadArtifact(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionReferences, final RuntimeJobExecution execution) {
INJECTION_CONTEXT.set(injectionReferences);
try {
final BatchArtifactFactory.Instance instance = factory.load(id);
if (instance == null) {
return null;
}
if (instance.getReleasable() != null && execution != null) {
execution.addReleasable(instance.getReleasable());
}
return instance.getValue();
} catch (final Exception e) {
throw new RuntimeException(e);
} finally {
INJECTION_CONTEXT.remove();
}
}
public static InjectionReferences getInjectionReferences() {
return INJECTION_CONTEXT.get();
}
/**
* set the InjectionReferences into the ThreadLocal and return the previously stored value
*/
public static InjectionReferences setInjectionReferences(InjectionReferences injectionReferences) {
InjectionReferences oldRef = INJECTION_CONTEXT.get();
INJECTION_CONTEXT.set(injectionReferences);
if (injectionReferences == null) {
INJECTION_CONTEXT.remove();
}
return oldRef;
}
public static <T> T createProxy(T delegate, InjectionReferences injectionRefs, String... nonExceptionHandlingMethods) {
if (delegate == null) {
// this is allowed per the spec! But don't ask why...
return null;
}
return (T) Proxy.newProxyInstance(delegate.getClass().getClassLoader(), getInterfaces(delegate.getClass()),
new BatchProxyInvocationHandler(delegate, injectionRefs, nonExceptionHandlingMethods));
}
/*
* Decider
*/
public static Decider createDeciderProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs,
final RuntimeJobExecution execution) {
return createProxy((Decider) loadArtifact(factory, id, injectionRefs, execution), injectionRefs);
}
/*
* Batchlet artifact
*/
public static Batchlet createBatchletProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs,
final RuntimeJobExecution execution) {
final Batchlet loadedArtifact = (Batchlet) loadArtifact(factory, id, injectionRefs, execution);
return createProxy(loadedArtifact, injectionRefs);
}
/*
* The four main chunk-related artifacts
*/
public static CheckpointAlgorithmProxy createCheckpointAlgorithmProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs,
final RuntimeJobExecution execution) {
final CheckpointAlgorithm loadedArtifact = (CheckpointAlgorithm) loadArtifact(factory, id, injectionRefs, execution);
final CheckpointAlgorithmProxy proxy = new CheckpointAlgorithmProxy(loadedArtifact);
proxy.setStepContext(injectionRefs.getStepContext());
return proxy;
}
public static ItemReader createItemReaderProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs,
final RuntimeJobExecution execution) {
final ItemReader loadedArtifact = (ItemReader) loadArtifact(factory, id, injectionRefs, execution);
return createProxy(loadedArtifact, injectionRefs, "readItem");
}
public static ItemProcessor createItemProcessorProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs,
final RuntimeJobExecution execution) {
final ItemProcessor loadedArtifact = (ItemProcessor) loadArtifact(factory, id, injectionRefs, execution);
return createProxy(loadedArtifact, injectionRefs, "processItem");
}
public static ItemWriter createItemWriterProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs,
final RuntimeJobExecution execution) {
final ItemWriter loadedArtifact = (ItemWriter) loadArtifact(factory, id, injectionRefs, execution);
return createProxy(loadedArtifact, injectionRefs, "writeItems");
}
/*
* The four partition-related artifacts
*/
public static PartitionReducer createPartitionReducerProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs,
final RuntimeJobExecution execution) {
final PartitionReducer loadedArtifact = (PartitionReducer) loadArtifact(factory, id, injectionRefs, execution);
return createProxy(loadedArtifact, injectionRefs);
}
public static PartitionMapper createPartitionMapperProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs,
final RuntimeJobExecution execution) {
final PartitionMapper loadedArtifact = (PartitionMapper) loadArtifact(factory, id, injectionRefs, execution);
return createProxy(loadedArtifact, injectionRefs);
}
public static PartitionAnalyzer createPartitionAnalyzerProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs,
final RuntimeJobExecution execution) {
final PartitionAnalyzer loadedArtifact = (PartitionAnalyzer) loadArtifact(factory, id, injectionRefs, execution);
return createProxy(loadedArtifact, injectionRefs);
}
public static PartitionCollector createPartitionCollectorProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs,
final RuntimeJobExecution execution) {
final PartitionCollector loadedArtifact = (PartitionCollector) loadArtifact(factory, id, injectionRefs, execution);
return createProxy(loadedArtifact, injectionRefs);
}
/**
* @return all the interfaces fo the given class and it's superclasses
*/
private static Class<?>[] getInterfaces(Class<?> clazz) {
if (clazz.getSuperclass() == Object.class) {
return clazz.getInterfaces();
} else {
Set<Class<?>> clazzes = new HashSet<Class<?>>();
while (clazz != Object.class) {
for (Class<?> interfaceClass : clazz.getInterfaces()) {
clazzes.add(interfaceClass);
}
clazz = clazz.getSuperclass();
}
return clazzes.toArray(new Class<?>[clazzes.size()]);
}
}
}