| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.resource; |
| |
| import java.lang.annotation.Annotation; |
| import java.lang.reflect.InvocationTargetException; |
| import java.lang.reflect.Method; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteFileSystem; |
| import org.apache.ignite.cache.store.CacheStoreSession; |
| import org.apache.ignite.compute.ComputeJob; |
| import org.apache.ignite.compute.ComputeJobContext; |
| import org.apache.ignite.compute.ComputeLoadBalancer; |
| import org.apache.ignite.compute.ComputeTask; |
| import org.apache.ignite.compute.ComputeTaskContinuousMapper; |
| import org.apache.ignite.compute.ComputeTaskSession; |
| import org.apache.ignite.internal.GridInternalWrapper; |
| import org.apache.ignite.internal.GridJobContextImpl; |
| import org.apache.ignite.internal.GridKernalContext; |
| import org.apache.ignite.internal.GridTaskSessionImpl; |
| import org.apache.ignite.internal.managers.deployment.GridDeployment; |
| import org.apache.ignite.internal.processors.GridProcessorAdapter; |
| import org.apache.ignite.internal.util.typedef.X; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.lifecycle.LifecycleBean; |
| import org.apache.ignite.services.Service; |
| import org.apache.ignite.spi.IgniteSpi; |
| import org.jetbrains.annotations.Nullable; |
| |
| /** |
| * Processor for all Ignite and task/job resources. |
| */ |
| public class GridResourceProcessor extends GridProcessorAdapter { |
| /** Cleaning injector. */ |
| private final GridResourceInjector nullInjector = new GridResourceBasicInjector<>(null); |
| |
| /** */ |
| private GridSpringResourceContext rsrcCtx; |
| |
| /** */ |
| private final GridResourceIoc ioc = new GridResourceIoc(); |
| |
| /** */ |
| private final GridResourceInjector[] injectorByAnnotation; |
| |
| /** |
| * Creates resources processor. |
| * |
| * @param ctx Kernal context. |
| */ |
| public GridResourceProcessor(GridKernalContext ctx) { |
| super(ctx); |
| |
| injectorByAnnotation = new GridResourceInjector[GridResourceIoc.ResourceAnnotation.values().length]; |
| |
| injectorByAnnotation[GridResourceIoc.ResourceAnnotation.SERVICE.ordinal()] = |
| new GridResourceServiceInjector(ctx.grid()); |
| injectorByAnnotation[GridResourceIoc.ResourceAnnotation.LOGGER.ordinal()] = |
| new GridResourceLoggerInjector(ctx.config().getGridLogger()); |
| injectorByAnnotation[GridResourceIoc.ResourceAnnotation.IGNITE_INSTANCE.ordinal()] = |
| new GridResourceBasicInjector<>(ctx.grid()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void start() throws IgniteCheckedException { |
| if (log.isDebugEnabled()) |
| log.debug("Started resource processor."); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void stop(boolean cancel) { |
| ioc.undeployAll(); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Stopped resource processor."); |
| } |
| |
| /** |
| * Sets Spring resource context. |
| * |
| * @param rsrcCtx Spring resource context. |
| */ |
| public void setSpringContext(@Nullable GridSpringResourceContext rsrcCtx) { |
| this.rsrcCtx = rsrcCtx; |
| |
| GridResourceInjector springCtxInjector = rsrcCtx != null ? rsrcCtx.springContextInjector() : nullInjector; |
| GridResourceInjector springBeanInjector = rsrcCtx != null ? rsrcCtx.springBeanInjector() : nullInjector; |
| |
| injectorByAnnotation[GridResourceIoc.ResourceAnnotation.SPRING.ordinal()] = springBeanInjector; |
| injectorByAnnotation[GridResourceIoc.ResourceAnnotation.SPRING_APPLICATION_CONTEXT.ordinal()] = |
| springCtxInjector; |
| } |
| |
| /** |
| * Callback to be called when class loader is undeployed. |
| * |
| * @param dep Deployment to release resources for. |
| */ |
| public void onUndeployed(GridDeployment dep) { |
| ioc.onUndeployed(dep.classLoader()); |
| } |
| |
| /** |
| * @param dep Deployment. |
| * @param target Target object. |
| * @param annCls Annotation class. |
| * @throws IgniteCheckedException If failed to execute annotated methods. |
| */ |
| public void invokeAnnotated(GridDeployment dep, Object target, Class<? extends Annotation> annCls) |
| throws IgniteCheckedException { |
| if (target != null) { |
| GridResourceMethod[] rsrcMtds = ioc.getMethodsWithAnnotation(dep, target.getClass(), annCls); |
| |
| for (GridResourceMethod rsrcMtd : rsrcMtds) { |
| Method mtd = rsrcMtd.getMethod(); |
| |
| try { |
| // No need to call mtd.setAccessible(true); |
| // It has been called in GridResourceMethod constructor. |
| mtd.invoke(target); |
| } |
| catch (IllegalArgumentException | InvocationTargetException | IllegalAccessException e) { |
| throw new IgniteCheckedException("Failed to invoke annotated method [job=" + target + ", mtd=" + mtd + |
| ", ann=" + annCls + ']', e); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Injects resources into generic class. |
| * |
| * @param dep Deployment. |
| * @param depCls Deployed class. |
| * @param target Target instance to inject into. |
| * @throws IgniteCheckedException Thrown in case of any errors. |
| */ |
| public void inject(GridDeployment dep, Class<?> depCls, Object target) throws IgniteCheckedException { |
| assert target != null; |
| |
| if (log.isDebugEnabled()) |
| log.debug(S.toString("Injecting resources", "target", target, true)); |
| |
| // Unwrap Proxy object. |
| target = unwrapTarget(target); |
| |
| inject(target, GridResourceIoc.AnnotationSet.GENERIC, dep, depCls); |
| } |
| |
| /** |
| * Injects cache name into given object. |
| * |
| * @param obj Object. |
| * @param cacheName Cache name to inject. |
| * @throws IgniteCheckedException If failed to inject. |
| */ |
| public void injectCacheName(Object obj, String cacheName) throws IgniteCheckedException { |
| assert obj != null; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Injecting cache name: " + obj); |
| |
| // Unwrap Proxy object. |
| obj = unwrapTarget(obj); |
| |
| inject(obj, GridResourceIoc.ResourceAnnotation.CACHE_NAME, null, null, cacheName); |
| } |
| |
| /** |
| * Injects cache store session into given object. |
| * |
| * @param obj Object. |
| * @param ses Session to inject. |
| * @return {@code True} if session was injected. |
| * @throws IgniteCheckedException If failed to inject. |
| */ |
| public boolean injectStoreSession(Object obj, CacheStoreSession ses) throws IgniteCheckedException { |
| assert obj != null; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Injecting cache store session: " + obj); |
| |
| // Unwrap Proxy object. |
| obj = unwrapTarget(obj); |
| |
| return inject(obj, GridResourceIoc.ResourceAnnotation.CACHE_STORE_SESSION, null, null, ses); |
| } |
| |
| /** |
| * Injects filesystem instance into given object. |
| * |
| * @param obj Object. |
| * @param igfs Ignite filesystem to inject. |
| * @return {@code True} if filesystem was injected. |
| * @throws IgniteCheckedException If failed to inject. |
| */ |
| public boolean injectFileSystem(Object obj, IgniteFileSystem igfs) throws IgniteCheckedException { |
| assert obj != null; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Injecting cache store session: " + obj); |
| |
| // Unwrap Proxy object. |
| obj = unwrapTarget(obj); |
| |
| return inject(obj, GridResourceIoc.ResourceAnnotation.FILESYSTEM_RESOURCE, null, null, igfs); |
| } |
| |
| /** |
| * @param obj Object to inject. |
| * @throws IgniteCheckedException If failed to inject. |
| */ |
| public void injectGeneric(Object obj) throws IgniteCheckedException { |
| inject(obj, GridResourceIoc.AnnotationSet.GENERIC); |
| } |
| |
| /** |
| * @param obj Object to inject. |
| * @param annSet Supported annotations. |
| * @param params Parameters. |
| * @throws IgniteCheckedException If failed to inject. |
| */ |
| public void inject(Object obj, GridResourceIoc.AnnotationSet annSet, Object... params) |
| throws IgniteCheckedException { |
| assert obj != null; |
| |
| if (log.isDebugEnabled()) |
| log.debug(S.toString("Injecting resources", "obj", obj, true)); |
| |
| // Unwrap Proxy object. |
| obj = unwrapTarget(obj); |
| |
| inject(obj, annSet, null, null, params); |
| } |
| |
| /** |
| * @param obj Object to inject. |
| * @param annSet Supported annotations. |
| * @param dep Deployment. |
| * @param depCls Deployment class. |
| * @param params Parameters. |
| * @throws IgniteCheckedException If failed to inject. |
| */ |
| private void inject(Object obj, |
| GridResourceIoc.AnnotationSet annSet, |
| @Nullable GridDeployment dep, |
| @Nullable Class<?> depCls, |
| Object... params) |
| throws IgniteCheckedException { |
| GridResourceIoc.ClassDescriptor clsDesc = ioc.descriptor(null, obj.getClass()); |
| |
| assert clsDesc != null; |
| |
| if (clsDesc.isAnnotated(annSet) == 0) |
| return; |
| |
| int i = 0; |
| for (GridResourceIoc.ResourceAnnotation ann : annSet.annotations) { |
| if (clsDesc.isAnnotated(ann)) { |
| final GridResourceInjector injector = injectorByAnnotation(ann, i < params.length ? params[i] : null); |
| |
| if (injector != null) |
| clsDesc.inject(obj, ann, injector, dep, depCls); |
| } |
| |
| i++; |
| } |
| } |
| |
| /** |
| * @param obj Object. |
| * @param annSet Supported annotations. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void cleanup(Object obj, GridResourceIoc.AnnotationSet annSet) |
| throws IgniteCheckedException { |
| assert obj != null; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Cleaning up resources: " + obj); |
| |
| // Unwrap Proxy object. |
| obj = unwrapTarget(obj); |
| |
| GridResourceIoc.ClassDescriptor clsDesc = ioc.descriptor(null, obj.getClass()); |
| |
| assert clsDesc != null; |
| |
| if (clsDesc.isAnnotated(annSet) == 0) |
| return; |
| |
| for (GridResourceIoc.ResourceAnnotation ann : annSet.annotations) |
| clsDesc.inject(obj, ann, nullInjector, null, null); |
| } |
| |
| /** |
| * @param ann Annotation. |
| * @param param Injector parameter. |
| * @return Injector. |
| */ |
| private GridResourceInjector injectorByAnnotation(GridResourceIoc.ResourceAnnotation ann, Object param) { |
| final GridResourceInjector res; |
| |
| switch (ann) { |
| case CACHE_NAME: |
| case TASK_SESSION: |
| case LOAD_BALANCER: |
| case TASK_CONTINUOUS_MAPPER: |
| case CACHE_STORE_SESSION: |
| res = new GridResourceBasicInjector<>(param); |
| break; |
| |
| case JOB_CONTEXT: |
| res = new GridResourceJobContextInjector((ComputeJobContext)param); |
| break; |
| |
| case FILESYSTEM_RESOURCE: |
| res = new GridResourceBasicInjector<>(param); |
| break; |
| |
| default: |
| res = injectorByAnnotation[ann.ordinal()]; |
| break; |
| } |
| |
| return res; |
| } |
| |
| /** |
| * @param obj Object to inject. |
| * @param ann Annotation enum. |
| * @param dep Grid deployment object. |
| * @param depCls Grid deployment class. |
| * @param param Resource to inject. |
| * @return {@code True} if resource was injected. |
| * @throws IgniteCheckedException If failed to inject. |
| */ |
| private boolean inject(Object obj, GridResourceIoc.ResourceAnnotation ann, @Nullable GridDeployment dep, |
| @Nullable Class<?> depCls, Object param) |
| throws IgniteCheckedException { |
| GridResourceIoc.ClassDescriptor clsDesc = ioc.descriptor(null, obj.getClass()); |
| |
| assert clsDesc != null; |
| |
| if (clsDesc.isAnnotated(ann)) { |
| GridResourceInjector injector = injectorByAnnotation(ann, param); |
| |
| if (injector != null) |
| return clsDesc.inject(obj, ann, injector, dep, depCls); |
| } |
| |
| return false; |
| } |
| |
| /** |
| * @param obj Object. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public void cleanupGeneric(Object obj) throws IgniteCheckedException { |
| if (obj != null) |
| cleanup(obj, GridResourceIoc.AnnotationSet.GENERIC); |
| } |
| |
| /** |
| * Injects held resources into given {@code job}. |
| * |
| * @param dep Deployment. |
| * @param taskCls Task class. |
| * @param job Grid job to inject resources to. |
| * @param ses Current task session. |
| * @param jobCtx Job context. |
| * @throws IgniteCheckedException Thrown in case of any errors. |
| */ |
| public void inject(GridDeployment dep, Class<?> taskCls, ComputeJob job, ComputeTaskSession ses, |
| GridJobContextImpl jobCtx) throws IgniteCheckedException { |
| if (log.isDebugEnabled()) |
| log.debug(S.toString("Injecting resources", "job", job, true)); |
| |
| // Unwrap Proxy object. |
| Object obj = unwrapTarget(job); |
| |
| injectToJob(dep, taskCls, obj, ses, jobCtx); |
| |
| if (obj instanceof GridInternalWrapper) { |
| Object usrObj = ((GridInternalWrapper)obj).userObject(); |
| |
| if (usrObj != null) |
| injectToJob(dep, taskCls, usrObj, ses, jobCtx); |
| } |
| } |
| |
| /** |
| * Internal routine for resource injection into job. |
| * |
| * @param dep Deployment. |
| * @param taskCls Task class. |
| * @param job Job. |
| * @param ses Session. |
| * @param jobCtx Job context. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void injectToJob(GridDeployment dep, Class<?> taskCls, Object job, ComputeTaskSession ses, |
| GridJobContextImpl jobCtx) throws IgniteCheckedException { |
| |
| inject(job, GridResourceIoc.AnnotationSet.JOB, dep, taskCls, ses, jobCtx); |
| } |
| |
| /** |
| * Injects held resources into given grid task. |
| * |
| * @param dep Deployed class. |
| * @param task Grid task. |
| * @param ses Grid task session. |
| * @param balancer Load balancer. |
| * @param mapper Continuous task mapper. |
| * @throws IgniteCheckedException Thrown in case of any errors. |
| */ |
| public void inject(GridDeployment dep, ComputeTask<?, ?> task, GridTaskSessionImpl ses, |
| ComputeLoadBalancer balancer, ComputeTaskContinuousMapper mapper) throws IgniteCheckedException { |
| if (log.isDebugEnabled()) |
| log.debug(S.toString("Injecting resources", "task", task, true)); |
| |
| // Unwrap Proxy object. |
| Object obj = unwrapTarget(task); |
| |
| inject(obj, GridResourceIoc.AnnotationSet.TASK, dep, null, ses, balancer, mapper); |
| } |
| |
| /** |
| * Checks if annotation presents in specified object. |
| * |
| * @param dep Class deployment. |
| * @param target Object to check. |
| * @param annCls Annotation to find. |
| * @return {@code true} if annotation is presented, {@code false} otherwise. |
| */ |
| public boolean isAnnotationPresent(GridDeployment dep, Object target, Class<? extends Annotation> annCls) { |
| return ioc.isAnnotationPresent(target, annCls, dep); |
| } |
| |
| /** |
| * Checks if annotations presents in specified object. |
| * |
| * @param dep Class deployment. |
| * @param target Object to check. |
| * @param annSet Annotations to find. |
| * @return {@code true} if any annotation is presented, {@code false} if it's not. |
| */ |
| public boolean isAnnotationsPresent(GridDeployment dep, Object target, GridResourceIoc.AnnotationSet annSet) { |
| return ioc.isAnnotationsPresent(dep, target, annSet); |
| } |
| |
| /** |
| * Injects held resources into given SPI implementation. |
| * |
| * @param spi SPI implementation. |
| * @throws IgniteCheckedException Throw in case of any errors. |
| */ |
| public void inject(IgniteSpi spi) throws IgniteCheckedException { |
| injectGeneric(spi); |
| } |
| |
| /** |
| * Cleans up resources from given SPI implementation. Essentially, this |
| * method injects {@code null}s into SPI implementation. |
| * |
| * @param spi SPI implementation. |
| * @throws IgniteCheckedException Thrown in case of any errors. |
| */ |
| public void cleanup(IgniteSpi spi) throws IgniteCheckedException { |
| cleanupGeneric(spi); |
| } |
| |
| /** |
| * Injects held resources into given lifecycle bean. |
| * |
| * @param lifecycleBean Lifecycle bean. |
| * @throws IgniteCheckedException Thrown in case of any errors. |
| */ |
| public void inject(LifecycleBean lifecycleBean) throws IgniteCheckedException { |
| injectGeneric(lifecycleBean); |
| } |
| |
| /** |
| * Cleans up resources from given lifecycle beans. Essentially, this |
| * method injects {@code null}s into lifecycle bean. |
| * |
| * @param lifecycleBean Lifecycle bean. |
| * @throws IgniteCheckedException Thrown in case of any errors. |
| */ |
| public void cleanup(LifecycleBean lifecycleBean) throws IgniteCheckedException { |
| cleanupGeneric(lifecycleBean); |
| } |
| |
| /** |
| * Injects resources into service. |
| * |
| * @param svc Service to inject. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public void inject(Service svc) throws IgniteCheckedException { |
| injectGeneric(svc); |
| } |
| |
| /** |
| * Cleans up resources from given service. Essentially, this |
| * method injects {@code null}s into service bean. |
| * |
| * @param svc Service. |
| * @throws IgniteCheckedException Thrown in case of any errors. |
| */ |
| public void cleanup(Service svc) throws IgniteCheckedException { |
| cleanupGeneric(svc); |
| } |
| |
| /** |
| * This method is declared public as it is used from tests as well. |
| * Note, that this method can be used only with unwrapped objects |
| * (see {@link #unwrapTarget(Object)}). |
| * |
| * @param target Target object. |
| * @param annCls Setter annotation. |
| * @param rsrc Resource to inject. |
| * @throws IgniteCheckedException If injection failed. |
| */ |
| public void injectBasicResource(Object target, Class<? extends Annotation> annCls, Object rsrc) |
| throws IgniteCheckedException { |
| // Safety. |
| assert !(rsrc instanceof GridResourceInjector) : "Invalid injection."; |
| |
| // Basic injection don't cache anything. Use null as a key. |
| ioc.inject(target, annCls, new GridResourceBasicInjector<>(rsrc), null, null); |
| } |
| |
| /** |
| * Returns GridResourceIoc object. For tests only!!! |
| * |
| * @return GridResourceIoc object. |
| */ |
| GridResourceIoc getResourceIoc() { |
| return ioc; |
| } |
| |
| /** |
| * Return original object if Spring AOP used with proxy objects. |
| * |
| * @param target Target object. |
| * @return Original object wrapped by proxy. |
| * @throws IgniteCheckedException If unwrap failed. |
| */ |
| private Object unwrapTarget(Object target) throws IgniteCheckedException { |
| return rsrcCtx != null ? rsrcCtx.unwrapTarget(target) : target; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void printMemoryStats() { |
| X.println(">>>"); |
| X.println(">>> Resource processor memory stats [igniteInstanceName=" + ctx.igniteInstanceName() + ']'); |
| |
| ioc.printMemoryStats(); |
| } |
| } |