blob: e6799a41ccddaf47a89c2e33f93473e458ced9f7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.db.redshift;
import java.io.File;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.apache.apex.malhar.lib.db.jdbc.JdbcTransactionalStore;
import org.apache.apex.malhar.lib.fs.FSRecordCompactionOperator;
import org.apache.commons.io.FileUtils;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import static org.apache.apex.malhar.lib.helper.OperatorContextTestHelper.mockOperatorContext;
import static org.mockito.Mockito.when;
public class RedshiftJdbcTransactionalOperatorTest
{
private String inputDir;
private static final String FILE_1 = "file1.txt";
private static final String FILE_2 = "file2.txt";
private static final String FILE_1_DATA = "460|xkalk|665\n950|xkalk|152\n850|xsblk|252";
private static final String FILE_2_DATA = "640|xkalk|655\n50|bcklk|52";
private static FSRecordCompactionOperator.OutputMetaData file1Meta;
private static FSRecordCompactionOperator.OutputMetaData file2Meta;
private static List<FSRecordCompactionOperator.OutputMetaData> listOfFiles = new ArrayList<>();
private static List<String> data = new ArrayList<>();
public static class TestMeta extends TestWatcher
{
public String baseDirectory;
Context.OperatorContext context;
@Mock
public Statement statement;
@Mock
public JdbcTransactionalStore store;
@Mock
public Connection conn;
@Override
protected void starting(org.junit.runner.Description description)
{
this.baseDirectory = "target/" + description.getClassName() + "/" + description.getMethodName();
MockitoAnnotations.initMocks(this);
try {
when(store.getConnection()).thenReturn(conn);
when(conn.createStatement()).thenReturn(statement);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
@Override
protected void finished(Description description)
{
try {
FileUtils.deleteDirectory(new File(baseDirectory));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
@Rule
public TestMeta testMeta = new TestMeta();
@Before
public void setup() throws Exception
{
inputDir = testMeta.baseDirectory + File.separator + "input";
File file1 = new File(inputDir + File.separator + FILE_1);
file1Meta = new FSRecordCompactionOperator.OutputMetaData(file1.getPath(), file1.getName(), file1.length());
FileUtils.writeStringToFile(file1, FILE_1_DATA);
File file2 = new File(inputDir + File.separator + FILE_2);
file2Meta = new FSRecordCompactionOperator.OutputMetaData(file2.getPath(), file2.getName(), file2.length());
FileUtils.writeStringToFile(file2, FILE_2_DATA);
}
@Test
public void TestBatchData() throws SQLException, IOException
{
RedshiftJdbcTransactionableTestOutputOperator operator = new RedshiftJdbcTransactionableTestOutputOperator();
operator.setReaderMode("READ_FROM_S3");
operator.setStore(testMeta.store);
operator.setAccessKey("accessKey");
operator.setSecretKey("secretKey");
operator.setBucketName("bucketName");
Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
attributeMap.put(Context.OperatorContext.SPIN_MILLIS, 500);
attributeMap.put(Context.DAGContext.APPLICATION_PATH, testMeta.baseDirectory);
testMeta.context = mockOperatorContext(1, attributeMap);;
operator.setup(testMeta.context);
operator.beginWindow(1);
operator.input.process(file1Meta);
operator.input.process(file2Meta);
when(testMeta.statement.executeBatch()).thenReturn(executeBatch());
operator.endWindow();
Assert.assertEquals("Number of tuples in database", 5, data.size());
}
public int[] executeBatch() throws IOException
{
for (FSRecordCompactionOperator.OutputMetaData metaData: listOfFiles) {
data.addAll(FileUtils.readLines(new File(metaData.getPath())));
}
return null;
}
@Test
public void VerifyS3Properties()
{
RedshiftJdbcTransactionableTestOutputOperator operator = new RedshiftJdbcTransactionableTestOutputOperator();
operator.setReaderMode("READ_FROM_S3");
operator.setAccessKey("accessKey");
operator.setSecretKey("secretKey");
operator.setBucketName("bucketName");
Assert.assertNotNull(operator.getBucketName());
}
@Test
public void VerifyEMRProperties()
{
RedshiftJdbcTransactionableTestOutputOperator operator = new RedshiftJdbcTransactionableTestOutputOperator();
operator.setReaderMode("READ_FROM_EMR");
operator.setAccessKey("accessKey");
operator.setSecretKey("secretKey");
operator.setEmrClusterId("emrClusterId");
Assert.assertNotNull(operator.getEmrClusterId());
}
public static class RedshiftJdbcTransactionableTestOutputOperator extends RedshiftJdbcTransactionableOutputOperator
{
@Override
public void processTuple(FSRecordCompactionOperator.OutputMetaData tuple)
{
super.processTuple(tuple);
listOfFiles.add(tuple);
}
}
}