blob: 918483faaa7a48de8151e458dc714ecacd197ab1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.engine.util;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.apex.common.util.AsyncStorageAgent;
import com.google.common.collect.Maps;
import com.datatorrent.api.StorageAgent;
/**
 * A StorageAgent which chains two StorageAgents. It uses the current storage agent to store
 * new checkpoints, and uses the parent agent to read old checkpoints. For applications with a
 * large number of physical operators, the size and number of files to be copied could be large,
 * impacting application restart time. This storage agent is used during application restart to
 * avoid copying checkpoints from the old application directory, which improves restart time.
 *
 * <p>Routing rule: for a given operator, a window id less than or equal to the largest window id
 * reported by the parent agent is treated as a parent checkpoint; every other window id is handled
 * by the current agent. This class never saves to, flushes, or deletes from the parent agent.
 *
 * @since 3.6.0
 */
public class CascadeStorageAgent implements StorageAgent, AsyncStorageAgent, Serializable
{
  private static final long serialVersionUID = 985557590735264920L;
  private static final Logger logger = LoggerFactory.getLogger(CascadeStorageAgent.class);
  private static final long[] EMPTY_LONG_ARRAY = new long[0];

  /** Agent holding the checkpoints of the previous application run; only read from. */
  private final StorageAgent parent;
  /** Agent that receives every new checkpoint. */
  private final StorageAgent current;
  /**
   * Per-operator cache of the sorted window ids reported by the parent agent. Transient, so it
   * is re-created empty after deserialization and repopulated lazily.
   */
  private transient Map<Integer, long[]> oldOperatorToWindowIdsMap;

  /**
   * Creates a storage agent that reads old checkpoints from {@code parent} and stores new ones in
   * {@code current}.
   *
   * @param parent agent holding checkpoints of the previous application run
   * @param current agent that receives all new checkpoints
   */
  public CascadeStorageAgent(StorageAgent parent, StorageAgent current)
  {
    this.parent = parent;
    this.current = current;
    oldOperatorToWindowIdsMap = Maps.newConcurrentMap();
  }

  /**
   * Returns whether the checkpoint belongs to the parent agent, i.e. whether {@code wid} is not
   * greater than the largest window id the parent reports for the operator.
   *
   * @param operatorId operator whose checkpoint is being routed
   * @param wid window id of the checkpoint
   * @return true if the checkpoint should be served by the parent agent
   * @throws IOException if the parent agent fails to list its window ids
   */
  private boolean isCheckpointFromParent(int operatorId, long wid) throws IOException
  {
    long[] wids = getParentWindowIds(operatorId);
    return wids.length != 0 && wid <= wids[wids.length - 1];
  }

  /**
   * Returns the window ids of checkpoints available in the parent (old) storage agent, sorted
   * ascending. The parent is queried on the first request for an operator and the result is
   * cached for later calls. If concurrent first requests each query the parent, the last result
   * stored wins.
   *
   * @param operatorId operator whose parent window ids are requested
   * @return sorted parent window ids; never null (empty if the parent reports none)
   * @throws IOException if the parent agent fails to list its window ids
   */
  private long[] getParentWindowIds(int operatorId) throws IOException
  {
    long[] oldWindowIds = oldOperatorToWindowIdsMap.get(operatorId);
    if (oldWindowIds == null) {
      long[] reported = parent.getWindowIds(operatorId);
      if (reported == null) {
        oldWindowIds = EMPTY_LONG_ARRAY;
      } else {
        // Sort a private copy: the parent's array is not ours to modify, and we keep it cached.
        oldWindowIds = Arrays.copyOf(reported, reported.length);
        Arrays.sort(oldWindowIds);
      }
      oldOperatorToWindowIdsMap.put(operatorId, oldWindowIds);
      if (logger.isDebugEnabled()) {
        logger.debug("CascadeStorageAgent window ids from old storage agent op {} wids {}", operatorId, Arrays.toString(oldWindowIds));
      }
    }
    return oldWindowIds;
  }

  /**
   * Saves the object in the current storage agent. The parent storage agent is not modified.
   *
   * @param object - The operator whose state needs to be saved.
   * @param operatorId - Identifier of the operator.
   * @param windowId - Identifier for the specific state of the operator.
   * @throws IOException if the current agent fails to save
   */
  @Override
  public void save(Object object, int operatorId, long windowId) throws IOException
  {
    current.save(object, operatorId, windowId);
  }

  /**
   * Deletes a checkpoint.
   *
   * <p>Checkpoints owned by the parent agent are left untouched, so the old application
   * directory is never modified. Every other checkpoint is deleted from the current storage
   * agent. The cached parent window ids are not updated by this call.
   *
   * @param operatorId operator whose checkpoint is deleted
   * @param windowId window id of the checkpoint to delete
   * @throws IOException if listing parent window ids or deleting from the current agent fails
   */
  @Override
  public void delete(int operatorId, long windowId) throws IOException
  {
    if (!isCheckpointFromParent(operatorId, windowId)) {
      current.delete(operatorId, windowId);
    }
  }

  /**
   * Loads a checkpoint from whichever storage agent owns it. A window id no greater than the
   * parent's largest window id for the operator is loaded from the parent; otherwise it is
   * loaded from the current agent.
   *
   * @param operatorId Id for which the object was previously saved
   * @param windowId WindowId for which the object was previously saved
   * @return the object loaded by the owning storage agent
   * @throws IOException if the owning agent fails to load the checkpoint
   */
  @Override
  public Object load(int operatorId, long windowId) throws IOException
  {
    if (isCheckpointFromParent(operatorId, windowId)) {
      return parent.load(operatorId, windowId);
    }
    return current.load(operatorId, windowId);
  }

  /**
   * Returns the window ids available from both agents combined.
   *
   * @param operatorId operator whose window ids are requested
   * @return a newly allocated array of all window ids, sorted ascending; never null
   * @throws IOException if either agent fails to list its window ids
   */
  @Override
  public long[] getWindowIds(int operatorId) throws IOException
  {
    long[] currentIds = current.getWindowIds(operatorId);
    long[] oldWindowIds = getParentWindowIds(operatorId);
    return merge(currentIds, oldWindowIds);
  }

  /**
   * Concatenates two window-id arrays into a new array and sorts it. A null argument is treated
   * as empty. The result is always freshly allocated, so callers can never mutate the cached
   * parent ids through it.
   *
   * @param currentIds window ids from the current agent; may be null
   * @param oldWindowIds window ids from the parent agent; may be null
   * @return a new sorted array containing every element of both inputs
   */
  private static long[] merge(long[] currentIds, long[] oldWindowIds)
  {
    long[] first = currentIds == null ? EMPTY_LONG_ARRAY : currentIds;
    long[] second = oldWindowIds == null ? EMPTY_LONG_ARRAY : oldWindowIds;
    long[] mergedArray = Arrays.copyOf(first, first.length + second.length);
    System.arraycopy(second, 0, mergedArray, first.length, second.length);
    Arrays.sort(mergedArray);
    return mergedArray;
  }

  /**
   * Flushes a saved checkpoint in the current agent if that agent is an AsyncStorageAgent.
   * Otherwise this does nothing. The parent is never flushed because it never receives saves.
   */
  @Override
  public void flush(int operatorId, long windowId) throws IOException
  {
    if (current instanceof AsyncStorageAgent) {
      ((AsyncStorageAgent)current).flush(operatorId, windowId);
    }
  }

  /**
   * Reports the checkpoint mode of the current agent. That agent receives all {@code save} and
   * {@code flush} calls, so its mode is the one that matters. The parent is consulted only for
   * reads. An agent that is not an AsyncStorageAgent is treated as synchronous.
   *
   * @return true if checkpoints should be taken synchronously
   */
  @Override
  public boolean isSyncCheckpoint()
  {
    if (current instanceof AsyncStorageAgent) {
      return ((AsyncStorageAgent)current).isSyncCheckpoint();
    }
    return true;
  }

  /**
   * Restores the serialized fields and re-creates the transient window-id cache. The cache is
   * then repopulated lazily from the parent agent.
   */
  private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException
  {
    input.defaultReadObject();
    oldOperatorToWindowIdsMap = Maps.newConcurrentMap();
  }

  /**
   * @return the agent that receives new checkpoints
   */
  public StorageAgent getCurrentStorageAgent()
  {
    return current;
  }

  /**
   * @return the agent holding checkpoints of the previous application run
   */
  public StorageAgent getParentStorageAgent()
  {
    return parent;
  }
}