| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.spi.checkpoint.s3; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.Map; |
| import com.amazonaws.AmazonClientException; |
| import com.amazonaws.AmazonServiceException; |
| import com.amazonaws.ClientConfiguration; |
| import com.amazonaws.auth.AWSCredentials; |
| import com.amazonaws.services.s3.AmazonS3; |
| import com.amazonaws.services.s3.AmazonS3Client; |
| import com.amazonaws.services.s3.model.ObjectListing; |
| import com.amazonaws.services.s3.model.ObjectMetadata; |
| import com.amazonaws.services.s3.model.S3Object; |
| import com.amazonaws.services.s3.model.S3ObjectSummary; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.internal.IgniteInterruptedCheckedException; |
| import org.apache.ignite.internal.util.IgniteUtils; |
| import org.apache.ignite.internal.util.tostring.GridToStringExclude; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.resources.IgniteInstanceResource; |
| import org.apache.ignite.resources.LoggerResource; |
| import org.apache.ignite.spi.IgniteSpiAdapter; |
| import org.apache.ignite.spi.IgniteSpiConfiguration; |
| import org.apache.ignite.spi.IgniteSpiException; |
| import org.apache.ignite.spi.IgniteSpiMBeanAdapter; |
| import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport; |
| import org.apache.ignite.spi.IgniteSpiThread; |
| import org.apache.ignite.spi.checkpoint.CheckpointListener; |
| import org.apache.ignite.spi.checkpoint.CheckpointSpi; |
| import org.jetbrains.annotations.Nullable; |
| |
| /** |
| * This class defines Amazon S3-based implementation for checkpoint SPI. |
| * <p> |
| * For information about Amazon S3 visit <a href="http://aws.amazon.com">aws.amazon.com</a>. |
| * <p> |
| * <h1 class="header">Configuration</h1> |
| * <h2 class="header">Mandatory</h2> |
| * This SPI has one mandatory configuration parameter: |
| * <ul> |
| * <li>AWS credentials (see {@link #setAwsCredentials(AWSCredentials)} |
| * </ul> |
| * <h2 class="header">Optional</h2> |
| * This SPI has following optional configuration parameters: |
| * <ul> |
| * <li>Bucket name suffix (see {@link #setBucketNameSuffix(String)})</li> |
| * <li>Client configuration (see {@link #setClientConfiguration(ClientConfiguration)})</li> |
| * <li>Bucket endpoint (see {@link #setBucketEndpoint(String)})</li> |
| * <li>Server side encryption algorithm (see {@link #setSSEAlgorithm(String)})</li> |
| * </ul> |
| * <h2 class="header">Java Example</h2> |
| * {@link S3CheckpointSpi} can be configured as follows: |
| * <pre name="code" class="java"> |
| * IgniteConfiguration cfg = new IgniteConfiguration(); |
| * |
| * S3CheckpointSpi spi = new S3CheckpointSpi(); |
| * |
| * AWSCredentials cred = new BasicAWSCredentials(YOUR_ACCESS_KEY_ID, YOUR_SECRET_ACCESS_KEY); |
| * |
| * spi.setAwsCredentials(cred); |
| * |
| * spi.setBucketNameSuffix("checkpoints"); |
| * |
| * // Override default checkpoint SPI. |
 * cfg.setCheckpointSpi(spi);
| * |
| * // Start grid. |
| * G.start(cfg); |
| * </pre> |
| * <h2 class="header">Spring Example</h2> |
| * {@link S3CheckpointSpi} can be configured from Spring XML configuration file: |
| * <pre name="code" class="xml"> |
| * <bean id="grid.custom.cfg" class="org.apache.ignite.configuration.IgniteConfiguration" singleton="true"> |
| * ... |
| * <property name="checkpointSpi"> |
| * <bean class="org.apache.ignite.spi.checkpoint.s3.S3CheckpointSpi"> |
| * <property name="awsCredentials"> |
| * <bean class="com.amazonaws.auth.BasicAWSCredentials"> |
| * <constructor-arg value="YOUR_ACCESS_KEY_ID" /> |
| * <constructor-arg value="YOUR_SECRET_ACCESS_KEY" /> |
| * </bean> |
| * </property> |
| * </bean> |
| * </property> |
| * ... |
| * </bean> |
| * </pre> |
| * <p> |
| * Note that storing data in AWS S3 service will result in charges to your AWS account. |
| * Choose another implementation of {@link org.apache.ignite.spi.checkpoint.CheckpointSpi} for local or |
| * home network tests. |
| * <p> |
| * <img src="http://ignite.apache.org/images/spring-small.png"> |
| * <br> |
| * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a> |
| * @see org.apache.ignite.spi.checkpoint.CheckpointSpi |
| */ |
| @IgniteSpiMultipleInstancesSupport(true) |
| public class S3CheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi { |
    /** Logger (injected). Accessed from the SPI thread and the timeout worker. */
    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
    @LoggerResource
    private IgniteLogger log;

    /** Ignite instance (injected); used to name the timeout worker thread. */
    @IgniteInstanceResource
    private Ignite ignite;

    /** Background worker that removes checkpoints whose expiration time has passed. */
    private S3TimeoutWorker timeoutWrk;

    /** Checkpoint listener notified when a checkpoint is removed. */
    private CheckpointListener lsnr;

    /** Prefix to use in bucket name generation. */
    public static final String BUCKET_NAME_PREFIX = "ignite-checkpoint-";

    /** Suffix to use in bucket name generation when the user did not configure one. */
    public static final String DFLT_BUCKET_NAME_SUFFIX = "default-bucket";

    /** Client to interact with S3 storage. Created in {@link #spiStart(String)}. */
    @GridToStringExclude
    private AmazonS3 s3;

    /** Bucket name suffix (set by user). */
    private String bucketNameSuffix;

    /** Bucket name (generated as {@link #BUCKET_NAME_PREFIX} + suffix in {@code spiStart}). */
    private String bucketName;

    /** Bucket endpoint (set by user); {@code null} means region discovery is left to the SDK. */
    @Nullable private String bucketEndpoint;

    /** Server-side encryption algorithm; {@code null} disables SSE on written objects. */
    @Nullable private String sseAlg;

    /** Amazon client configuration; {@code null} means SDK defaults. */
    private ClientConfiguration cfg;

    /** AWS Credentials (mandatory, see {@link #setAwsCredentials(AWSCredentials)}). */
    @GridToStringExclude
    private AWSCredentials cred;

    /** Mutex guarding {@code S3TimeoutWorker.map} and used for worker wakeups. */
    private final Object mux = new Object();
| |
    /**
     * Gets S3 bucket name to use.
     *
     * @return S3 bucket name to use (generated during SPI start; {@code null} before that).
     */
    public String getBucketName() {
        return bucketName;
    }

    /**
     * Gets S3 bucket endpoint to use.
     *
     * @return S3 bucket endpoint to use, or {@code null} if not configured.
     */
    @Nullable public String getBucketEndpoint() {
        return bucketEndpoint;
    }

    /**
     * Gets S3 server-side encryption algorithm.
     *
     * @return S3 server-side encryption algorithm to use, or {@code null} if SSE is disabled.
     */
    @Nullable public String getSSEAlgorithm() {
        return sseAlg;
    }

    /**
     * Gets S3 access key.
     * <p>
     * NOTE(review): assumes credentials were set via {@link #setAwsCredentials(AWSCredentials)};
     * throws NPE if called before configuration — confirm callers only invoke this after start.
     *
     * @return S3 access key.
     */
    public String getAccessKey() {
        return cred.getAWSAccessKeyId();
    }

    /**
     * Gets S3 secret key.
     * <p>
     * NOTE(review): same null-credentials caveat as {@link #getAccessKey()}.
     *
     * @return S3 secret key.
     */
    public String getSecretAccessKey() {
        return cred.getAWSSecretKey();
    }

    /**
     * Gets HTTP proxy host.
     * <p>
     * NOTE(review): throws NPE if no client configuration was set — confirm this is only
     * exposed via the MBean after a configuration is present.
     *
     * @return HTTP proxy host.
     */
    public String getProxyHost() {
        return cfg.getProxyHost();
    }

    /**
     * Gets HTTP proxy port.
     *
     * @return HTTP proxy port.
     */
    public int getProxyPort() {
        return cfg.getProxyPort();
    }

    /**
     * Gets HTTP proxy user name.
     *
     * @return HTTP proxy user name.
     */
    public String getProxyUsername() {
        return cfg.getProxyUsername();
    }

    /**
     * Gets HTTP proxy password.
     *
     * @return HTTP proxy password.
     */
    public String getProxyPassword() {
        return cfg.getProxyPassword();
    }
| |
    /**
     * Sets bucket name suffix. The actual bucket name becomes
     * {@link #BUCKET_NAME_PREFIX} + suffix; if empty, {@link #DFLT_BUCKET_NAME_SUFFIX} is used.
     *
     * @param bucketNameSuffix Bucket name suffix.
     * @return {@code this} for chaining.
     */
    @IgniteSpiConfiguration(optional = true)
    public S3CheckpointSpi setBucketNameSuffix(String bucketNameSuffix) {
        this.bucketNameSuffix = bucketNameSuffix;

        return this;
    }

    /**
     * Sets bucket endpoint.
     * If the endpoint is not set then S3CheckpointSpi will go to each region to find a corresponding bucket.
     * For information about possible endpoint names visit
     * <a href="http://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region">docs.aws.amazon.com</a>
     *
     * @param bucketEndpoint Bucket endpoint, for example, {@code s3.us-east-2.amazonaws.com}.
     * @return {@code this} for chaining.
     */
    @IgniteSpiConfiguration(optional = true)
    public S3CheckpointSpi setBucketEndpoint(String bucketEndpoint) {
        this.bucketEndpoint = bucketEndpoint;

        return this;
    }

    /**
     * Sets server-side encryption algorithm for Amazon S3-managed encryption keys.
     * For information about possible S3-managed encryption keys visit
     * <a href="http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingServerSideEncryption.html">docs.aws.amazon.com</a>.
     *
     * @param sseAlg Server-side encryption algorithm, for example, {@code AES256}
     *      (the value of the {@code x-amz-server-side-encryption} header for SSE-S3).
     * @return {@code this} for chaining.
     */
    @IgniteSpiConfiguration(optional = true)
    public S3CheckpointSpi setSSEAlgorithm(String sseAlg) {
        this.sseAlg = sseAlg;

        return this;
    }

    /**
     * Sets Amazon client configuration (proxy, timeouts, etc.).
     * <p>
     * For details refer to Amazon S3 API reference.
     *
     * @param cfg Amazon client configuration.
     * @return {@code this} for chaining.
     */
    @IgniteSpiConfiguration(optional = true)
    public S3CheckpointSpi setClientConfiguration(ClientConfiguration cfg) {
        this.cfg = cfg;

        return this;
    }

    /**
     * Sets AWS credentials. This is the only mandatory configuration parameter.
     * <p>
     * For details refer to Amazon S3 API reference.
     *
     * @param cred AWS credentials.
     * @return {@code this} for chaining.
     */
    @IgniteSpiConfiguration(optional = false)
    public S3CheckpointSpi setAwsCredentials(AWSCredentials cred) {
        this.cred = cred;

        return this;
    }
| |
    /** {@inheritDoc} */
    @Override public void spiStart(String igniteInstanceName) throws IgniteSpiException {
        // Start SPI start stopwatch.
        startStopwatch();

        // Credentials are the only mandatory parameter.
        assertParameter(cred != null, "awsCredentials != null");

        // NOTE(review): this logs the credentials object at debug level; presumably
        // its toString() does not expose the secret key — confirm before enabling debug in production.
        if (log.isDebugEnabled()) {
            log.debug(configInfo("awsCredentials", cred));
            log.debug(configInfo("clientConfiguration", cfg));
            log.debug(configInfo("bucketNameSuffix", bucketNameSuffix));
            log.debug(configInfo("bucketEndpoint", bucketEndpoint));
            log.debug(configInfo("SSEAlgorithm", sseAlg));
        }

        if (cfg == null)
            U.warn(log, "Amazon client configuration is not set (will use default).");

        // Compose bucket name: prefix + user suffix (or the default suffix).
        if (F.isEmpty(bucketNameSuffix)) {
            U.warn(log, "Bucket name suffix is null or empty (will use default bucket name).");

            bucketName = BUCKET_NAME_PREFIX + DFLT_BUCKET_NAME_SUFFIX;
        }
        else
            bucketName = BUCKET_NAME_PREFIX + bucketNameSuffix;

        s3 = cfg != null ? new AmazonS3Client(cred, cfg) : new AmazonS3Client(cred);

        if (!F.isEmpty(bucketEndpoint))
            s3.setEndpoint(bucketEndpoint);

        if (!s3.doesBucketExist(bucketName)) {
            try {
                s3.createBucket(bucketName);

                if (log.isDebugEnabled())
                    log.debug("Created S3 bucket: " + bucketName);

                // Bucket creation is eventually consistent - poll until it becomes visible.
                while (!s3.doesBucketExist(bucketName))
                    try {
                        U.sleep(200);
                    }
                    catch (IgniteInterruptedCheckedException e) {
                        throw new IgniteSpiException("Thread has been interrupted.", e);
                    }
            }
            catch (AmazonClientException e) {
                // Creation may have raced with another node; only fail if the bucket
                // still does not exist (the re-check itself may also fail).
                try {
                    if (!s3.doesBucketExist(bucketName))
                        throw new IgniteSpiException("Failed to create bucket: " + bucketName, e);
                }
                catch (AmazonClientException ignored) {
                    throw new IgniteSpiException("Failed to create bucket: " + bucketName, e);
                }
            }
        }

        // Collect expiration info of checkpoints already present in the bucket
        // so the timeout worker can clean them up.
        Collection<S3TimeData> s3TimeDataLst = new LinkedList<>();

        try {
            ObjectListing list = s3.listObjects(bucketName);

            // Page through the listing (S3 returns results in batches).
            while (true) {
                for (S3ObjectSummary sum : list.getObjectSummaries()) {
                    S3CheckpointData data = read(sum.getKey());

                    if (data != null) {
                        s3TimeDataLst.add(new S3TimeData(data.getExpireTime(), data.getKey()));

                        if (log.isDebugEnabled())
                            log.debug("Registered existing checkpoint from key: " + data.getKey());
                    }
                }

                if (list.isTruncated())
                    list = s3.listNextBatchOfObjects(list);
                else
                    break;
            }
        }
        catch (AmazonClientException e) {
            throw new IgniteSpiException("Failed to read checkpoint bucket: " + bucketName, e);
        }
        catch (IgniteCheckedException e) {
            throw new IgniteSpiException("Failed to marshal/unmarshal objects in bucket: " + bucketName, e);
        }

        // Track expiration for only those data that are made by this node
        timeoutWrk = new S3TimeoutWorker();

        timeoutWrk.add(s3TimeDataLst);

        timeoutWrk.start();

        registerMBean(igniteInstanceName, new S3CheckpointSpiMBeanImpl(this), S3CheckpointSpiMBean.class);

        // Ack ok start.
        if (log.isDebugEnabled())
            log.debug(startInfo());
    }
| |
| /** {@inheritDoc} */ |
| @Override public void spiStop() throws IgniteSpiException { |
| if (timeoutWrk != null) { |
| IgniteUtils.interrupt(timeoutWrk); |
| IgniteUtils.join(timeoutWrk, log); |
| } |
| |
| unregisterMBean(); |
| |
| // Ack ok stop. |
| if (log.isDebugEnabled()) |
| log.debug(stopInfo()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public byte[] loadCheckpoint(String key) throws IgniteSpiException { |
| assert !F.isEmpty(key); |
| |
| try { |
| S3CheckpointData data = read(key); |
| |
| return data != null ? |
| data.getExpireTime() == 0 || data.getExpireTime() > U.currentTimeMillis() ? |
| data.getState() : |
| null : |
| null; |
| } |
| catch (AmazonClientException e) { |
| throw new IgniteSpiException("Failed to read checkpoint key: " + key, e); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteSpiException("Failed to marshal/unmarshal objects in checkpoint key: " + key, e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean saveCheckpoint(String key, byte[] state, long timeout, boolean overwrite) |
| throws IgniteSpiException { |
| assert !F.isEmpty(key); |
| |
| long expireTime = 0; |
| |
| if (timeout > 0) { |
| expireTime = U.currentTimeMillis() + timeout; |
| |
| if (expireTime < 0) |
| expireTime = Long.MAX_VALUE; |
| } |
| |
| try { |
| if (hasKey(key)) { |
| if (!overwrite) |
| return false; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Overriding existing key: " + key); |
| } |
| |
| S3CheckpointData data = new S3CheckpointData(state, expireTime, key); |
| |
| write(data); |
| } |
| catch (AmazonClientException e) { |
| throw new IgniteSpiException("Failed to write checkpoint data [key=" + key + ", state=" + |
| Arrays.toString(state) + ']', e); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteSpiException("Failed to marshal checkpoint data [key=" + key + ", state=" + |
| Arrays.toString(state) + ']', e); |
| } |
| |
| if (timeout > 0) |
| timeoutWrk.add(new S3TimeData(expireTime, key)); |
| |
| return true; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean removeCheckpoint(String key) { |
| assert !F.isEmpty(key); |
| |
| timeoutWrk.remove(key); |
| |
| boolean rmv = false; |
| |
| try { |
| rmv = delete(key); |
| } |
| catch (AmazonClientException e) { |
| U.error(log, "Failed to delete data by key: " + key, e); |
| } |
| |
| if (rmv) { |
| CheckpointListener tmpLsnr = lsnr; |
| |
| if (tmpLsnr != null) |
| tmpLsnr.onCheckpointRemoved(key); |
| } |
| |
| return rmv; |
| } |
| |
    /**
     * Reads checkpoint data.
     *
     * @param key Key name to read data from.
     * @return Checkpoint data object, or {@code null} if the key does not exist (HTTP 404).
     * @throws IgniteCheckedException Thrown if an error occurs while unmarshalling.
     * @throws AmazonClientException If an error occurs while querying Amazon S3.
     */
    @Nullable private S3CheckpointData read(String key) throws IgniteCheckedException, AmazonClientException {
        assert !F.isEmpty(key);

        if (log.isDebugEnabled())
            log.debug("Reading data from S3 [bucket=" + bucketName + ", key=" + key + ']');

        try {
            S3Object obj = s3.getObject(bucketName, key);

            InputStream in = obj.getObjectContent();

            try {
                return S3CheckpointData.fromStream(in);
            }
            catch (IOException e) {
                throw new IgniteCheckedException("Failed to unmarshal S3CheckpointData [bucketName=" +
                    bucketName + ", key=" + key + ']', e);
            }
            finally {
                // Close quietly: closing the content stream releases the underlying
                // HTTP connection; close failures are deliberately ignored here.
                U.closeQuiet(in);
            }
        }
        catch (AmazonServiceException e) {
            // 404 means the key is simply absent - any other service error is propagated.
            if (e.getStatusCode() != 404)
                throw e;
        }

        return null;
    }
| |
| /** |
| * Writes given checkpoint data to a given S3 bucket. Data is serialized to |
| * the binary stream and saved to the S3. |
| * |
| * @param data Checkpoint data. |
| * @throws IgniteCheckedException Thrown if an error occurs while marshalling. |
| * @throws AmazonClientException If an error occurs while querying Amazon S3. |
| */ |
| private void write(S3CheckpointData data) throws IgniteCheckedException, AmazonClientException { |
| assert data != null; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Writing data to S3 [bucket=" + bucketName + ", key=" + data.getKey() + ']'); |
| |
| byte[] buf = data.toBytes(); |
| |
| ObjectMetadata meta = new ObjectMetadata(); |
| |
| meta.setContentLength(buf.length); |
| |
| if (!F.isEmpty(sseAlg)) |
| meta.setSSEAlgorithm(sseAlg); |
| |
| s3.putObject(bucketName, data.getKey(), new ByteArrayInputStream(buf), meta); |
| } |
| |
| /** |
| * Deletes checkpoint data. |
| * |
| * @param key Key of the data in storage. |
| * @return {@code True} if operations succeeds and data is actually removed. |
| * @throws AmazonClientException If an error occurs while querying Amazon S3. |
| */ |
| private boolean delete(String key) throws AmazonClientException { |
| assert !F.isEmpty(key); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Removing data from S3 [bucket=" + bucketName + ", key=" + key + ']'); |
| |
| if (!hasKey(key)) |
| return false; |
| |
| s3.deleteObject(bucketName, key); |
| |
| return true; |
| } |
| |
    /**
     * Returns {@code true} if mapping presents for the provided key.
     * <p>
     * NOTE(review): a zero-length object is reported as absent. This is harmless for
     * serialized checkpoint data (never empty), but verify if this method is ever
     * reused for arbitrary objects.
     *
     * @param key Key to check mapping for.
     * @return {@code true} if mapping presents for key.
     * @throws AmazonClientException If an error occurs while querying Amazon S3.
     */
    boolean hasKey(String key) throws AmazonClientException {
        assert !F.isEmpty(key);

        try {
            return s3.getObjectMetadata(bucketName, key).getContentLength() != 0;
        }
        catch (AmazonServiceException e) {
            // 404 means the key does not exist; any other error is propagated.
            if (e.getStatusCode() != 404)
                throw e;
        }

        return false;
    }
| |
    /** {@inheritDoc} */
    @Override public void setCheckpointListener(CheckpointListener lsnr) {
        // Unsynchronized write; readers snapshot the field before use.
        this.lsnr = lsnr;
    }

    /** {@inheritDoc} */
    @Override public S3CheckpointSpi setName(String name) {
        super.setName(name);

        return this;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(S3CheckpointSpi.class, this);
    }
| |
| /** |
| * Implementation of {@link org.apache.ignite.spi.IgniteSpiThread} that takes care about outdated S3 data. |
| * Every checkpoint has expiration date after which it makes no sense to |
| * keep it. This worker periodically cleans S3 bucket according to checkpoints |
| * expiration time. |
| */ |
| private class S3TimeoutWorker extends IgniteSpiThread { |
| /** List of data with access and expiration date. */ |
| private Map<String, S3TimeData> map = new HashMap<>(); |
| |
| /** |
| * Constructor. |
| */ |
| S3TimeoutWorker() { |
| super(ignite.name(), "grid-s3-checkpoint-worker", log); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void body() throws InterruptedException { |
| long nextTime = 0; |
| |
| Collection<String> rmvKeys = new HashSet<>(); |
| |
| while (!isInterrupted()) { |
| rmvKeys.clear(); |
| |
| synchronized (mux) { |
| long delay = U.currentTimeMillis() - nextTime; |
| |
| if (nextTime != 0 && delay > 0) |
| mux.wait(delay); |
| |
| long now = U.currentTimeMillis(); |
| |
| nextTime = -1; |
| |
| // check map one by one and physically remove |
| // if (now - last modification date) > expiration time |
| for (Iterator<Map.Entry<String, S3TimeData>> iter = map.entrySet().iterator(); iter.hasNext();) { |
| Map.Entry<String, S3TimeData> entry = iter.next(); |
| |
| String key = entry.getKey(); |
| |
| S3TimeData timeData = entry.getValue(); |
| |
| if (timeData.getExpireTime() > 0) |
| if (timeData.getExpireTime() <= now) { |
| try { |
| delete(key); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Data was deleted by timeout: " + key); |
| } |
| catch (AmazonClientException e) { |
| U.error(log, "Failed to delete data by key: " + key, e); |
| } |
| |
| iter.remove(); |
| |
| rmvKeys.add(timeData.getKey()); |
| } |
| else |
| if (timeData.getExpireTime() < nextTime || nextTime == -1) |
| nextTime = timeData.getExpireTime(); |
| } |
| } |
| |
| CheckpointListener tmpLsnr = lsnr; |
| |
| if (tmpLsnr != null) |
| for (String key : rmvKeys) |
| tmpLsnr.onCheckpointRemoved(key); |
| } |
| |
| synchronized (mux) { |
| map.clear(); |
| } |
| } |
| |
| /** |
| * Adds data to a list of files this task should look after. |
| * |
| * @param timeData File expiration and access information. |
| */ |
| void add(S3TimeData timeData) { |
| assert timeData != null; |
| |
| synchronized (mux) { |
| map.put(timeData.getKey(), timeData); |
| |
| mux.notifyAll(); |
| } |
| } |
| |
| /** |
| * Adds list of data this task should look after. |
| * |
| * @param newData List of data. |
| */ |
| void add(Iterable<S3TimeData> newData) { |
| assert newData != null; |
| |
| synchronized (mux) { |
| for (S3TimeData data : newData) |
| map.put(data.getKey(), data); |
| |
| mux.notifyAll(); |
| } |
| } |
| |
| /** |
| * Removes data. |
| * |
| * @param key Checkpoint key. |
| */ |
| public void remove(String key) { |
| assert key != null; |
| |
| synchronized (mux) { |
| map.remove(key); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(S3TimeoutWorker.class, this); |
| } |
| } |
| |
| /** |
| * MBean implementation for S3CheckpointSpi. |
| */ |
| private class S3CheckpointSpiMBeanImpl extends IgniteSpiMBeanAdapter implements S3CheckpointSpiMBean { |
| /** {@inheritDoc} */ |
| S3CheckpointSpiMBeanImpl(IgniteSpiAdapter spiAdapter) { |
| super(spiAdapter); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String getBucketName() { |
| return S3CheckpointSpi.this.getBucketName(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String getBucketEndpoint() { |
| return S3CheckpointSpi.this.getBucketName(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String getSSEAlgorithm() { |
| return S3CheckpointSpi.this.getSSEAlgorithm(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String getAccessKey() { |
| return S3CheckpointSpi.this.getAccessKey(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String getProxyHost() { |
| return S3CheckpointSpi.this.getProxyHost(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int getProxyPort() { |
| return S3CheckpointSpi.this.getProxyPort(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String getProxyUsername() { |
| return S3CheckpointSpi.this.getProxyUsername(); |
| } |
| } |
| } |