blob: 7769c8de10a1f960f25407fc8ef9c895b2e346e6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.contrib.cassandra;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
import com.datatorrent.lib.helper.TestPortContext;
import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
import static org.junit.Assert.assertEquals;
/**
 * Verifies that the Cassandra upsert output operator can write a row into a table whose
 * primary key is made up of several columns.
 *
 * <p>The test pushes a single {@code CompositePrimaryKeyRow} through
 * {@code CompositePrimaryKeyUpdateOperator} inside one streaming window and then reads the row
 * back through the operator's own session from {@code unittests.userstatus}.
 */
public class AbstractUpsertOutputOperatorCompositePKTest
{
  /** Application id placed in the mocked operator context. */
  public static final String APP_ID = "TestCassandraUpsertOperator";

  /** Operator id handed to the mocked operator context. */
  public static final int OPERATOR_ID_FOR_COMPOSITE_PRIMARY_KEYS = 2;

  /** Operator under test; rebuilt before every test method. */
  CompositePrimaryKeyUpdateOperator compositePrimaryKeysOperator = null;

  /** Mocked operator context passed to {@code setup} and {@code activate}. */
  OperatorContext contextForCompositePrimaryKeysOperator;

  /** Port context advertising the tuple class to the operator's input port. */
  TestPortContext testPortContextForCompositePrimaryKeys;

  /**
   * Builds the mocked operator and port contexts, then drives the operator through
   * {@code setup}, {@code activate} and input-port setup so that it is ready to accept tuples.
   *
   * @throws Exception if any of the operator lifecycle calls fails
   */
  @Before
  public void setupApexContexts() throws Exception
  {
    Attribute.AttributeMap.DefaultAttributeMap attributeMapForCompositePrimaryKey =
        new Attribute.AttributeMap.DefaultAttributeMap();
    attributeMapForCompositePrimaryKey.put(DAG.APPLICATION_ID, APP_ID);
    contextForCompositePrimaryKeysOperator = mockOperatorContext(OPERATOR_ID_FOR_COMPOSITE_PRIMARY_KEYS,
        attributeMapForCompositePrimaryKey);

    // The input port is told which POJO class the incoming payloads carry.
    Attribute.AttributeMap.DefaultAttributeMap portAttributesForCompositePrimaryKeys =
        new Attribute.AttributeMap.DefaultAttributeMap();
    portAttributesForCompositePrimaryKeys.put(Context.PortContext.TUPLE_CLASS, CompositePrimaryKeyRow.class);
    testPortContextForCompositePrimaryKeys = new TestPortContext(portAttributesForCompositePrimaryKeys);

    compositePrimaryKeysOperator = new CompositePrimaryKeyUpdateOperator();
    compositePrimaryKeysOperator.setup(contextForCompositePrimaryKeysOperator);
    compositePrimaryKeysOperator.activate(contextForCompositePrimaryKeysOperator);
    compositePrimaryKeysOperator.input.setup(testPortContextForCompositePrimaryKeys);
  }

  /**
   * Upserts one row identified by the full composite key and asserts that exactly one row
   * comes back when the table is queried with that same key.
   *
   * @throws Exception if processing the tuple or querying the table fails
   */
  @Test
  public void testForCompositeRowKeyBasedTable() throws Exception
  {
    // Timestamps are appended to the key values, presumably so that rows written by earlier
    // runs never collide with the row written here.
    String userId = "user1" + System.currentTimeMillis();
    String employeeId = userId + System.currentTimeMillis();
    int day = 1;
    int month = 12;
    int year = 2017;

    CompositePrimaryKeyRow aCompositeRowKey = new CompositePrimaryKeyRow();
    aCompositeRowKey.setDay(day);
    aCompositeRowKey.setMonth(month);
    aCompositeRowKey.setYear(year);
    aCompositeRowKey.setCurrentstatus("status" + System.currentTimeMillis());
    aCompositeRowKey.setUserid(userId);
    aCompositeRowKey.setEmployeeid(employeeId);

    UpsertExecutionContext<CompositePrimaryKeyRow> anUpdate = new UpsertExecutionContext<>();
    anUpdate.setPayload(aCompositeRowKey);

    // The tuple is processed inside a single window.
    compositePrimaryKeysOperator.beginWindow(12);
    compositePrimaryKeysOperator.input.process(anUpdate);
    compositePrimaryKeysOperator.endWindow();

    // Read the row back using every column of the composite key.
    ResultSet results = compositePrimaryKeysOperator.session.execute(
        "SELECT * FROM unittests.userstatus WHERE userid = '" + userId + "' and day=" + day + " and month=" +
        month + " and year=" + year + " and employeeid='" + employeeId + "'");
    List<Row> rows = results.all();

    // JUnit expects (expected, actual); the original call had the two swapped.
    assertEquals("Exactly one row should match the full composite key", 1, rows.size());
  }
}