blob: 830835cea66d964d2c1efe798f8c82b29b50fa67 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.state.managed;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import javax.annotation.Nullable;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.apex.malhar.lib.fileaccess.FileAccess;
import org.apache.apex.malhar.lib.util.comparator.SliceComparator;
import org.apache.apex.malhar.lib.utils.serde.BufferSlice;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import com.google.common.base.Function;
import com.google.common.collect.Maps;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
import com.datatorrent.netlet.util.Slice;
import static org.apache.apex.malhar.lib.helper.OperatorContextTestHelper.mockOperatorContext;
public class ManagedStateTestUtils
{
/**
* Validates the bucket data on the File System.
* @param fileAccess file access
* @param bucketId bucket id
* @param unsavedBucket bucket data to compare with.
* @param keysPerTimeBucket num keys per time bucket
* @throws IOException
*/
public static void validateBucketOnFileSystem(FileAccess fileAccess, long bucketId,
Map<Slice, Bucket.BucketedValue> unsavedBucket, int keysPerTimeBucket) throws IOException
{
RemoteIterator<LocatedFileStatus> iterator = fileAccess.listFiles(bucketId);
TreeMap<Slice, Slice> fromDisk = Maps.newTreeMap(new SliceComparator());
int size = 0;
while (iterator.hasNext()) {
LocatedFileStatus fileStatus = iterator.next();
String timeBucketStr = fileStatus.getPath().getName();
if (timeBucketStr.equals(BucketsFileSystem.META_FILE_NAME) || timeBucketStr.endsWith(".tmp")) {
//ignoring meta file
continue;
}
LOG.debug("bucket {} time-bucket {}", bucketId, timeBucketStr);
FileAccess.FileReader reader = fileAccess.getReader(bucketId, timeBucketStr);
reader.readFully(fromDisk);
size += keysPerTimeBucket;
Assert.assertEquals("size of bucket " + bucketId, size, fromDisk.size());
}
Assert.assertEquals("size of bucket " + bucketId, unsavedBucket.size(), fromDisk.size());
Map<Slice, Slice> testBucket = Maps.transformValues(unsavedBucket, new Function<Bucket.BucketedValue, Slice>()
{
@Override
public Slice apply(@Nullable Bucket.BucketedValue input)
{
assert input != null;
return input.getValue();
}
});
Assert.assertEquals("data of bucket" + bucketId, testBucket, fromDisk);
}
public static Map<Long, Map<Slice, Bucket.BucketedValue>> getTestData(int startBucket, int endBucket, int keyStart)
{
Map<Long, Map<Slice, Bucket.BucketedValue>> data = Maps.newHashMap();
for (int i = startBucket; i < endBucket; i++) {
Map<Slice, Bucket.BucketedValue> bucketData = getTestBucketData(keyStart, 100);
data.put((long)i, bucketData);
}
return data;
}
public static Map<Slice, Bucket.BucketedValue> getTestBucketData(int keyStart, long timeBucketStart)
{
Map<Slice, Bucket.BucketedValue> bucketData = Maps.newHashMap();
for (int j = 0; j < 5; j++) {
Slice keyVal = new Slice(Integer.toString(keyStart).getBytes());
bucketData.put(keyVal, new Bucket.BucketedValue(timeBucketStart + j, keyVal));
keyStart++;
}
return bucketData;
}
public static OperatorContext getOperatorContext(int operatorId, String applicationPath)
{
Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
attributes.put(DAG.APPLICATION_PATH, applicationPath);
return mockOperatorContext(operatorId, attributes);
}
public static OperatorContext getOperatorContext(int operatorId)
{
Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
return mockOperatorContext(operatorId, attributes);
}
private static final transient Logger LOG = LoggerFactory.getLogger(ManagedStateTestUtils.class);
public static Slice getSliceFor(String x)
{
return new BufferSlice(x.getBytes());
}
}