| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.camel.component.aws.lambda; |
| |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.nio.ByteBuffer; |
| import java.nio.charset.StandardCharsets; |
| import java.util.List; |
| import java.util.Map; |
| |
| import com.amazonaws.AmazonServiceException; |
| import com.amazonaws.services.lambda.AWSLambda; |
| import com.amazonaws.services.lambda.model.CreateEventSourceMappingRequest; |
| import com.amazonaws.services.lambda.model.CreateEventSourceMappingResult; |
| import com.amazonaws.services.lambda.model.CreateFunctionRequest; |
| import com.amazonaws.services.lambda.model.CreateFunctionResult; |
| import com.amazonaws.services.lambda.model.DeadLetterConfig; |
| import com.amazonaws.services.lambda.model.DeleteEventSourceMappingRequest; |
| import com.amazonaws.services.lambda.model.DeleteEventSourceMappingResult; |
| import com.amazonaws.services.lambda.model.DeleteFunctionRequest; |
| import com.amazonaws.services.lambda.model.DeleteFunctionResult; |
| import com.amazonaws.services.lambda.model.Environment; |
| import com.amazonaws.services.lambda.model.FunctionCode; |
| import com.amazonaws.services.lambda.model.GetFunctionRequest; |
| import com.amazonaws.services.lambda.model.GetFunctionResult; |
| import com.amazonaws.services.lambda.model.InvokeRequest; |
| import com.amazonaws.services.lambda.model.InvokeResult; |
| import com.amazonaws.services.lambda.model.ListEventSourceMappingsRequest; |
| import com.amazonaws.services.lambda.model.ListEventSourceMappingsResult; |
| import com.amazonaws.services.lambda.model.ListFunctionsResult; |
| import com.amazonaws.services.lambda.model.ListTagsRequest; |
| import com.amazonaws.services.lambda.model.ListTagsResult; |
| import com.amazonaws.services.lambda.model.ListVersionsByFunctionRequest; |
| import com.amazonaws.services.lambda.model.ListVersionsByFunctionResult; |
| import com.amazonaws.services.lambda.model.PublishVersionRequest; |
| import com.amazonaws.services.lambda.model.PublishVersionResult; |
| import com.amazonaws.services.lambda.model.TagResourceRequest; |
| import com.amazonaws.services.lambda.model.TagResourceResult; |
| import com.amazonaws.services.lambda.model.TracingConfig; |
| import com.amazonaws.services.lambda.model.UntagResourceRequest; |
| import com.amazonaws.services.lambda.model.UntagResourceResult; |
| import com.amazonaws.services.lambda.model.UpdateFunctionCodeRequest; |
| import com.amazonaws.services.lambda.model.UpdateFunctionCodeResult; |
| import com.amazonaws.services.lambda.model.VpcConfig; |
| import com.amazonaws.util.IOUtils; |
| |
| import org.apache.camel.Endpoint; |
| import org.apache.camel.Exchange; |
| import org.apache.camel.Message; |
| import org.apache.camel.support.DefaultProducer; |
| import org.apache.camel.util.CastUtils; |
| import org.apache.camel.util.ObjectHelper; |
| |
| /** |
| * A Producer which sends messages to the Amazon Web Service Lambda <a |
| * href="https://aws.amazon.com/lambda/">AWS Lambda</a> |
| */ |
| public class LambdaProducer extends DefaultProducer { |
| |
| public LambdaProducer(final Endpoint endpoint) { |
| super(endpoint); |
| } |
| |
| @Override |
| public void process(final Exchange exchange) throws Exception { |
| switch (determineOperation(exchange)) { |
| case getFunction: |
| getFunction(getEndpoint().getAwsLambdaClient(), exchange); |
| break; |
| case createFunction: |
| createFunction(getEndpoint().getAwsLambdaClient(), exchange); |
| break; |
| case deleteFunction: |
| deleteFunction(getEndpoint().getAwsLambdaClient(), exchange); |
| break; |
| case invokeFunction: |
| invokeFunction(getEndpoint().getAwsLambdaClient(), exchange); |
| break; |
| case listFunctions: |
| listFunctions(getEndpoint().getAwsLambdaClient(), exchange); |
| break; |
| case updateFunction: |
| updateFunction(getEndpoint().getAwsLambdaClient(), exchange); |
| break; |
| case createEventSourceMapping: |
| createEventSourceMapping(getEndpoint().getAwsLambdaClient(), exchange); |
| break; |
| case deleteEventSourceMapping: |
| deleteEventSourceMapping(getEndpoint().getAwsLambdaClient(), exchange); |
| break; |
| case listEventSourceMapping: |
| listEventSourceMapping(getEndpoint().getAwsLambdaClient(), exchange); |
| break; |
| case listTags: |
| listTags(getEndpoint().getAwsLambdaClient(), exchange); |
| break; |
| case tagResource: |
| tagResource(getEndpoint().getAwsLambdaClient(), exchange); |
| break; |
| case untagResource: |
| untagResource(getEndpoint().getAwsLambdaClient(), exchange); |
| break; |
| case publishVersion: |
| publishVersion(getEndpoint().getAwsLambdaClient(), exchange); |
| break; |
| case listVersions: |
| listVersions(getEndpoint().getAwsLambdaClient(), exchange); |
| break; |
| default: |
| throw new IllegalArgumentException("Unsupported operation"); |
| } |
| } |
| |
| private void getFunction(AWSLambda lambdaClient, Exchange exchange) { |
| GetFunctionResult result; |
| try { |
| result = lambdaClient.getFunction(new GetFunctionRequest().withFunctionName(getConfiguration().getFunction())); |
| } catch (AmazonServiceException ase) { |
| log.trace("getFunction command returned the error code {}", ase.getErrorCode()); |
| throw ase; |
| } |
| Message message = getMessageForResponse(exchange); |
| message.setBody(result); |
| } |
| |
| private void deleteFunction(AWSLambda lambdaClient, Exchange exchange) { |
| DeleteFunctionResult result; |
| try { |
| result = lambdaClient.deleteFunction(new DeleteFunctionRequest().withFunctionName(getConfiguration().getFunction())); |
| } catch (AmazonServiceException ase) { |
| log.trace("deleteFunction command returned the error code {}", ase.getErrorCode()); |
| throw ase; |
| } |
| Message message = getMessageForResponse(exchange); |
| message.setBody(result); |
| } |
| |
| private void listFunctions(AWSLambda lambdaClient, Exchange exchange) { |
| ListFunctionsResult result; |
| try { |
| result = lambdaClient.listFunctions(); |
| } catch (AmazonServiceException ase) { |
| log.trace("listFunctions command returned the error code {}", ase.getErrorCode()); |
| throw ase; |
| } |
| Message message = getMessageForResponse(exchange); |
| message.setBody(result); |
| } |
| |
| private void invokeFunction(AWSLambda lambdaClient, Exchange exchange) { |
| InvokeResult result; |
| try { |
| InvokeRequest request = new InvokeRequest() |
| .withFunctionName(getConfiguration().getFunction()) |
| .withPayload(exchange.getIn().getBody(String.class)); |
| result = lambdaClient.invoke(request); |
| } catch (AmazonServiceException ase) { |
| log.trace("invokeFunction command returned the error code {}", ase.getErrorCode()); |
| throw ase; |
| } |
| Message message = getMessageForResponse(exchange); |
| message.setBody(StandardCharsets.UTF_8.decode(result.getPayload()).toString()); |
| } |
| |
| private void createFunction(AWSLambda lambdaClient, Exchange exchange) throws Exception { |
| CreateFunctionResult result; |
| |
| try { |
| CreateFunctionRequest request = new CreateFunctionRequest() |
| .withFunctionName(getConfiguration().getFunction()); |
| |
| FunctionCode functionCode = new FunctionCode(); |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.S3_BUCKET))) { |
| String s3Bucket = exchange.getIn().getHeader(LambdaConstants.S3_BUCKET, String.class); |
| functionCode.withS3Bucket(s3Bucket); |
| } |
| |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.S3_KEY))) { |
| String s3Key = exchange.getIn().getHeader(LambdaConstants.S3_KEY, String.class); |
| functionCode.withS3Key(s3Key); |
| } |
| |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.S3_OBJECT_VERSION))) { |
| String s3ObjectVersion = exchange.getIn().getHeader(LambdaConstants.S3_OBJECT_VERSION, String.class); |
| functionCode.withS3ObjectVersion(s3ObjectVersion); |
| } |
| |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.ZIP_FILE))) { |
| String zipFile = exchange.getIn().getHeader(LambdaConstants.ZIP_FILE, String.class); |
| File fileLocalPath = new File(zipFile); |
| FileInputStream inputStream = new FileInputStream(fileLocalPath); |
| functionCode.withZipFile(ByteBuffer.wrap(IOUtils.toByteArray(inputStream))); |
| } |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getBody())) { |
| functionCode.withZipFile(exchange.getIn().getBody(ByteBuffer.class)); |
| } |
| |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getBody()) |
| || (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.S3_BUCKET)) && ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.S3_KEY)))) { |
| request.withCode(functionCode); |
| } else { |
| throw new IllegalArgumentException("At least S3 bucket/S3 key or zip file must be specified"); |
| } |
| |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.ROLE))) { |
| request.withRole(exchange.getIn().getHeader(LambdaConstants.ROLE, String.class)); |
| } else { |
| throw new IllegalArgumentException("Role must be specified"); |
| } |
| |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.RUNTIME))) { |
| request.withRuntime(exchange.getIn().getHeader(LambdaConstants.RUNTIME, String.class)); |
| } else { |
| throw new IllegalArgumentException("Runtime must be specified"); |
| } |
| |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.HANDLER))) { |
| request.withHandler(exchange.getIn().getHeader(LambdaConstants.HANDLER, String.class)); |
| } else { |
| throw new IllegalArgumentException("Handler must be specified"); |
| } |
| |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.DESCRIPTION))) { |
| String description = exchange.getIn().getHeader(LambdaConstants.DESCRIPTION, String.class); |
| request.withDescription(description); |
| } |
| |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.TARGET_ARN))) { |
| String targetArn = exchange.getIn().getHeader(LambdaConstants.TARGET_ARN, String.class); |
| request.withDeadLetterConfig(new DeadLetterConfig().withTargetArn(targetArn)); |
| } |
| |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.MEMORY_SIZE))) { |
| Integer memorySize = exchange.getIn().getHeader(LambdaConstants.MEMORY_SIZE, Integer.class); |
| request.withMemorySize(memorySize); |
| } |
| |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.KMS_KEY_ARN))) { |
| String kmsKeyARN = exchange.getIn().getHeader(LambdaConstants.KMS_KEY_ARN, String.class); |
| request.withKMSKeyArn(kmsKeyARN); |
| } |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.PUBLISH))) { |
| Boolean publish = exchange.getIn().getHeader(LambdaConstants.PUBLISH, Boolean.class); |
| request.withPublish(publish); |
| } |
| |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.TIMEOUT))) { |
| Integer timeout = exchange.getIn().getHeader(LambdaConstants.TIMEOUT, Integer.class); |
| request.withTimeout(timeout); |
| } |
| |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.TRACING_CONFIG))) { |
| String tracingConfigMode = exchange.getIn().getHeader(LambdaConstants.TRACING_CONFIG, String.class); |
| request.withTracingConfig(new TracingConfig().withMode(tracingConfigMode)); |
| } |
| |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.SDK_CLIENT_EXECUTION_TIMEOUT))) { |
| Integer timeout = exchange.getIn().getHeader(LambdaConstants.SDK_CLIENT_EXECUTION_TIMEOUT, Integer.class); |
| request.withSdkClientExecutionTimeout(timeout); |
| } |
| |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.SDK_REQUEST_TIMEOUT))) { |
| Integer timeout = exchange.getIn().getHeader(LambdaConstants.SDK_REQUEST_TIMEOUT, Integer.class); |
| request.withSdkRequestTimeout(timeout); |
| } |
| |
| Map<String, String> environmentVariables = CastUtils.cast(exchange.getIn().getHeader(LambdaConstants.ENVIRONMENT_VARIABLES, Map.class)); |
| if (environmentVariables != null) { |
| request.withEnvironment(new Environment().withVariables(environmentVariables)); |
| } |
| |
| Map<String, String> tags = CastUtils.cast(exchange.getIn().getHeader(LambdaConstants.TAGS, Map.class)); |
| if (tags != null) { |
| request.withTags(tags); |
| } |
| |
| List<String> securityGroupIds = CastUtils.cast(exchange.getIn().getHeader(LambdaConstants.SECURITY_GROUP_IDS, (Class<List<String>>) (Object) List.class)); |
| List<String> subnetIds = CastUtils.cast(exchange.getIn().getHeader(LambdaConstants.SUBNET_IDS, (Class<List<String>>) (Object) List.class)); |
| if (securityGroupIds != null || subnetIds != null) { |
| VpcConfig vpcConfig = new VpcConfig(); |
| if (securityGroupIds != null) { |
| vpcConfig.withSecurityGroupIds(securityGroupIds); |
| } |
| if (subnetIds != null) { |
| vpcConfig.withSubnetIds(subnetIds); |
| } |
| request.withVpcConfig(vpcConfig); |
| } |
| result = lambdaClient.createFunction(request); |
| |
| } catch (AmazonServiceException ase) { |
| log.trace("createFunction command returned the error code {}", ase.getErrorCode()); |
| throw ase; |
| } |
| |
| Message message = getMessageForResponse(exchange); |
| message.setBody(result); |
| } |
| |
| private void updateFunction(AWSLambda lambdaClient, Exchange exchange) throws Exception { |
| UpdateFunctionCodeResult result; |
| |
| try { |
| UpdateFunctionCodeRequest request = new UpdateFunctionCodeRequest() |
| .withFunctionName(getConfiguration().getFunction()); |
| |
| FunctionCode functionCode = new FunctionCode(); |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.S3_BUCKET))) { |
| String s3Bucket = exchange.getIn().getHeader(LambdaConstants.S3_BUCKET, String.class); |
| functionCode.withS3Bucket(s3Bucket); |
| } |
| |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.S3_KEY))) { |
| String s3Key = exchange.getIn().getHeader(LambdaConstants.S3_KEY, String.class); |
| functionCode.withS3Key(s3Key); |
| } |
| |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.S3_OBJECT_VERSION))) { |
| String s3ObjectVersion = exchange.getIn().getHeader(LambdaConstants.S3_OBJECT_VERSION, String.class); |
| functionCode.withS3ObjectVersion(s3ObjectVersion); |
| } |
| |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.ZIP_FILE))) { |
| String zipFile = exchange.getIn().getHeader(LambdaConstants.ZIP_FILE, String.class); |
| File fileLocalPath = new File(zipFile); |
| FileInputStream inputStream = new FileInputStream(fileLocalPath); |
| functionCode.withZipFile(ByteBuffer.wrap(IOUtils.toByteArray(inputStream))); |
| } |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getBody())) { |
| functionCode.withZipFile(exchange.getIn().getBody(ByteBuffer.class)); |
| } |
| |
| if (ObjectHelper.isEmpty(exchange.getIn().getBody()) |
| && (ObjectHelper.isEmpty(exchange.getIn().getHeader(LambdaConstants.S3_BUCKET)) && ObjectHelper.isEmpty(exchange.getIn().getHeader(LambdaConstants.S3_KEY)))) { |
| throw new IllegalArgumentException("At least S3 bucket/S3 key or zip file must be specified"); |
| } |
| |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.PUBLISH))) { |
| Boolean publish = exchange.getIn().getHeader(LambdaConstants.PUBLISH, Boolean.class); |
| request.withPublish(publish); |
| } |
| |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.SDK_CLIENT_EXECUTION_TIMEOUT))) { |
| Integer timeout = exchange.getIn().getHeader(LambdaConstants.SDK_CLIENT_EXECUTION_TIMEOUT, Integer.class); |
| request.withSdkClientExecutionTimeout(timeout); |
| } |
| |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.SDK_REQUEST_TIMEOUT))) { |
| Integer timeout = exchange.getIn().getHeader(LambdaConstants.SDK_REQUEST_TIMEOUT, Integer.class); |
| request.withSdkRequestTimeout(timeout); |
| } |
| |
| result = lambdaClient.updateFunctionCode(request); |
| |
| } catch (AmazonServiceException ase) { |
| log.trace("updateFunction command returned the error code {}", ase.getErrorCode()); |
| throw ase; |
| } |
| |
| Message message = getMessageForResponse(exchange); |
| message.setBody(result); |
| } |
| |
| private void createEventSourceMapping(AWSLambda lambdaClient, Exchange exchange) { |
| CreateEventSourceMappingResult result; |
| try { |
| CreateEventSourceMappingRequest request = new CreateEventSourceMappingRequest().withFunctionName(getConfiguration().getFunction()); |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.EVENT_SOURCE_ARN))) { |
| request.withEventSourceArn(exchange.getIn().getHeader(LambdaConstants.EVENT_SOURCE_ARN, String.class)); |
| } else { |
| throw new IllegalArgumentException("Event Source Arn must be specified"); |
| } |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.EVENT_SOURCE_BATCH_SIZE))) { |
| Integer batchSize = exchange.getIn().getHeader(LambdaConstants.EVENT_SOURCE_BATCH_SIZE, Integer.class); |
| request.withBatchSize(batchSize); |
| } |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.SDK_CLIENT_EXECUTION_TIMEOUT))) { |
| Integer timeout = exchange.getIn().getHeader(LambdaConstants.SDK_CLIENT_EXECUTION_TIMEOUT, Integer.class); |
| request.withSdkClientExecutionTimeout(timeout); |
| } |
| |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.SDK_REQUEST_TIMEOUT))) { |
| Integer timeout = exchange.getIn().getHeader(LambdaConstants.SDK_REQUEST_TIMEOUT, Integer.class); |
| request.withSdkRequestTimeout(timeout); |
| } |
| result = lambdaClient.createEventSourceMapping(request); |
| } catch (AmazonServiceException ase) { |
| log.trace("createEventSourceMapping command returned the error code {}", ase.getErrorCode()); |
| throw ase; |
| } |
| Message message = getMessageForResponse(exchange); |
| message.setBody(result); |
| } |
| |
| private void deleteEventSourceMapping(AWSLambda lambdaClient, Exchange exchange) { |
| DeleteEventSourceMappingResult result; |
| try { |
| DeleteEventSourceMappingRequest request = new DeleteEventSourceMappingRequest(); |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.EVENT_SOURCE_UUID))) { |
| request.withUUID(exchange.getIn().getHeader(LambdaConstants.EVENT_SOURCE_UUID, String.class)); |
| } else { |
| throw new IllegalArgumentException("Event Source Arn must be specified"); |
| } |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.SDK_CLIENT_EXECUTION_TIMEOUT))) { |
| Integer timeout = exchange.getIn().getHeader(LambdaConstants.SDK_CLIENT_EXECUTION_TIMEOUT, Integer.class); |
| request.withSdkClientExecutionTimeout(timeout); |
| } |
| |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.SDK_REQUEST_TIMEOUT))) { |
| Integer timeout = exchange.getIn().getHeader(LambdaConstants.SDK_REQUEST_TIMEOUT, Integer.class); |
| request.withSdkRequestTimeout(timeout); |
| } |
| result = lambdaClient.deleteEventSourceMapping(request); |
| } catch (AmazonServiceException ase) { |
| log.trace("deleteEventSourceMapping command returned the error code {}", ase.getErrorCode()); |
| throw ase; |
| } |
| Message message = getMessageForResponse(exchange); |
| message.setBody(result); |
| } |
| |
| private void listEventSourceMapping(AWSLambda lambdaClient, Exchange exchange) { |
| ListEventSourceMappingsResult result; |
| try { |
| ListEventSourceMappingsRequest request = new ListEventSourceMappingsRequest().withFunctionName(getConfiguration().getFunction()); |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.SDK_CLIENT_EXECUTION_TIMEOUT))) { |
| Integer timeout = exchange.getIn().getHeader(LambdaConstants.SDK_CLIENT_EXECUTION_TIMEOUT, Integer.class); |
| request.withSdkClientExecutionTimeout(timeout); |
| } |
| |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.SDK_REQUEST_TIMEOUT))) { |
| Integer timeout = exchange.getIn().getHeader(LambdaConstants.SDK_REQUEST_TIMEOUT, Integer.class); |
| request.withSdkRequestTimeout(timeout); |
| } |
| result = lambdaClient.listEventSourceMappings(request); |
| } catch (AmazonServiceException ase) { |
| log.trace("listEventSourceMapping command returned the error code {}", ase.getErrorCode()); |
| throw ase; |
| } |
| Message message = getMessageForResponse(exchange); |
| message.setBody(result); |
| } |
| |
| private void listTags(AWSLambda lambdaClient, Exchange exchange) { |
| ListTagsResult result; |
| try { |
| ListTagsRequest request = new ListTagsRequest(); |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.RESOURCE_ARN))) { |
| String resource = exchange.getIn().getHeader(LambdaConstants.RESOURCE_ARN, String.class); |
| request.withResource(resource); |
| } else { |
| throw new IllegalArgumentException("The resource ARN must be specified"); |
| } |
| result = lambdaClient.listTags(request); |
| } catch (AmazonServiceException ase) { |
| log.trace("listTags command returned the error code {}", ase.getErrorCode()); |
| throw ase; |
| } |
| Message message = getMessageForResponse(exchange); |
| message.setBody(result); |
| } |
| |
| private void tagResource(AWSLambda lambdaClient, Exchange exchange) { |
| TagResourceResult result; |
| try { |
| TagResourceRequest request = new TagResourceRequest(); |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.RESOURCE_ARN))) { |
| String resource = exchange.getIn().getHeader(LambdaConstants.RESOURCE_ARN, String.class); |
| request.withResource(resource); |
| } else { |
| throw new IllegalArgumentException("The resource ARN must be specified"); |
| } |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.RESOURCE_TAGS))) { |
| Map<String, String> tags = exchange.getIn().getHeader(LambdaConstants.RESOURCE_TAGS, Map.class); |
| request.withTags(tags); |
| } else { |
| throw new IllegalArgumentException("The tags must be specified"); |
| } |
| result = lambdaClient.tagResource(request); |
| } catch (AmazonServiceException ase) { |
| log.trace("listTags command returned the error code {}", ase.getErrorCode()); |
| throw ase; |
| } |
| Message message = getMessageForResponse(exchange); |
| message.setBody(result); |
| } |
| |
| private void untagResource(AWSLambda lambdaClient, Exchange exchange) { |
| UntagResourceResult result; |
| try { |
| UntagResourceRequest request = new UntagResourceRequest(); |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.RESOURCE_ARN))) { |
| String resource = exchange.getIn().getHeader(LambdaConstants.RESOURCE_ARN, String.class); |
| request.withResource(resource); |
| } else { |
| throw new IllegalArgumentException("The resource ARN must be specified"); |
| } |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.RESOURCE_TAG_KEYS))) { |
| List<String> tagKeys = exchange.getIn().getHeader(LambdaConstants.RESOURCE_TAG_KEYS, List.class); |
| request.withTagKeys(tagKeys); |
| } else { |
| throw new IllegalArgumentException("The tag keys must be specified"); |
| } |
| result = lambdaClient.untagResource(request); |
| } catch (AmazonServiceException ase) { |
| log.trace("untagResource command returned the error code {}", ase.getErrorCode()); |
| throw ase; |
| } |
| Message message = getMessageForResponse(exchange); |
| message.setBody(result); |
| } |
| |
| private void publishVersion(AWSLambda lambdaClient, Exchange exchange) { |
| PublishVersionResult result; |
| try { |
| PublishVersionRequest request = new PublishVersionRequest().withFunctionName(getConfiguration().getFunction()); |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.VERSION_DESCRIPTION))) { |
| String description = exchange.getIn().getHeader(LambdaConstants.VERSION_DESCRIPTION, String.class); |
| request.withDescription(description); |
| } |
| if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(LambdaConstants.VERSION_REVISION_ID))) { |
| String revisionId = exchange.getIn().getHeader(LambdaConstants.VERSION_REVISION_ID, String.class); |
| request.withRevisionId(revisionId); |
| } |
| result = lambdaClient.publishVersion(request); |
| } catch (AmazonServiceException ase) { |
| log.trace("publishVersion command returned the error code {}", ase.getErrorCode()); |
| throw ase; |
| } |
| Message message = getMessageForResponse(exchange); |
| message.setBody(result); |
| } |
| |
| private void listVersions(AWSLambda lambdaClient, Exchange exchange) { |
| ListVersionsByFunctionResult result; |
| try { |
| ListVersionsByFunctionRequest request = new ListVersionsByFunctionRequest().withFunctionName(getConfiguration().getFunction()); |
| result = lambdaClient.listVersionsByFunction(request); |
| } catch (AmazonServiceException ase) { |
| log.trace("publishVersion command returned the error code {}", ase.getErrorCode()); |
| throw ase; |
| } |
| Message message = getMessageForResponse(exchange); |
| message.setBody(result); |
| } |
| |
| private LambdaOperations determineOperation(Exchange exchange) { |
| LambdaOperations operation = exchange.getIn().getHeader(LambdaConstants.OPERATION, LambdaOperations.class); |
| if (operation == null) { |
| operation = getConfiguration().getOperation(); |
| } |
| return operation; |
| } |
| |
| protected LambdaConfiguration getConfiguration() { |
| return getEndpoint().getConfiguration(); |
| } |
| |
| @Override |
| public LambdaEndpoint getEndpoint() { |
| return (LambdaEndpoint) super.getEndpoint(); |
| } |
| |
| public static Message getMessageForResponse(final Exchange exchange) { |
| if (exchange.getPattern().isOutCapable()) { |
| Message out = exchange.getOut(); |
| out.copyFrom(exchange.getIn()); |
| return out; |
| } |
| return exchange.getIn(); |
| } |
| |
| } |