// blob: a3a2bcdb7114edf480db42e46119435f15f011a7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.tests;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PreparedId;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.exceptions.InvalidQueryException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
import org.apache.ignite.cache.store.cassandra.session.BatchExecutionAssistant;
import org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl;
import org.apache.ignite.cache.store.cassandra.session.WrappedPreparedStatement;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
 * Unit test that drives {@link CassandraSessionImpl} batch execution against mocked DataStax driver objects
 * ({@link Cluster}, {@link Session}, {@link PreparedStatement}) instead of a live Cassandra instance.
 */
public class CassandraSessionImplTest {
    /** Message carried by every {@link InvalidQueryException} the mocked sessions throw. */
    private static final String FOREIGN_STATEMENT_MSG =
        "You may have used a PreparedStatement that was created with another Cluster instance";

    /** Prepared statement handed out by the first mocked session. */
    private final PreparedStatement preparedStatement1 = mockPreparedStatement();

    /** Prepared statement handed out by the second mocked session. */
    private final PreparedStatement preparedStatement2 = mockPreparedStatement();

    /** Bound statement the assistant returns for {@link #preparedStatement1}. */
    private final MyBoundStatement1 boundStatement1 = new MyBoundStatement1(preparedStatement1);

    /** Bound statement the assistant returns for {@link #preparedStatement2}. */
    private final MyBoundStatement2 boundStatement2 = new MyBoundStatement2(preparedStatement2);

    /**
     * Runs a 10-element batch through a session whose first connection always fails.
     * <p>
     * Stubbing: the mocked cluster returns {@code session1} on the first {@code connect()} and
     * {@code session2} on the second. {@code session1.executeAsync} throws an
     * {@link InvalidQueryException} and then a {@link RuntimeException}. {@code session2.executeAsync}
     * throws an {@link InvalidQueryException} for {@link #boundStatement1} and returns a result for
     * {@link #boundStatement2}.
     * <p>
     * Expectations: {@code connect()} is called exactly twice, each session prepares a statement exactly
     * once, and the assistant ends up with all 10 elements processed.
     */
    @Test
    public void executeFailureTest() {
        Session session1 = mock(Session.class);
        Session session2 = mock(Session.class);

        when(session1.prepare(nullable(String.class))).thenReturn(preparedStatement1);
        when(session2.prepare(nullable(String.class))).thenReturn(preparedStatement2);

        ResultSetFuture rsFuture = mock(ResultSetFuture.class);
        ResultSet rs = mock(ResultSet.class);

        // mock(Iterator.class) can only yield a raw Iterator; the suppression is limited to this assignment.
        @SuppressWarnings("unchecked")
        Iterator<Row> rowIter = mock(Iterator.class);

        // The iterator never reports exhaustion and always yields a mocked row.
        when(rowIter.hasNext()).thenReturn(true);
        when(rowIter.next()).thenReturn(mock(Row.class));
        when(rs.iterator()).thenReturn(rowIter);
        when(rsFuture.getUninterruptibly()).thenReturn(rs);

        /* @formatter:off */
        // First session: fails on the first call, then fails with a different exception afterwards.
        when(session1.executeAsync(any(Statement.class)))
            .thenThrow(new InvalidQueryException(FOREIGN_STATEMENT_MSG))
            .thenThrow(new RuntimeException("this session should be refreshed / recreated"));

        // Second session: rejects the statement bound from the first session's prepared statement...
        when(session2.executeAsync(boundStatement1))
            .thenThrow(new InvalidQueryException(FOREIGN_STATEMENT_MSG));

        // ...and accepts the one bound from its own prepared statement.
        when(session2.executeAsync(boundStatement2)).thenReturn(rsFuture);
        /* @formatter:on */

        Cluster cluster = mock(Cluster.class);

        when(cluster.connect()).thenReturn(session1).thenReturn(session2);
        when(session1.getCluster()).thenReturn(cluster);
        when(session2.getCluster()).thenReturn(cluster);

        Cluster.Builder builder = mock(Cluster.Builder.class);

        when(builder.build()).thenReturn(cluster);

        CassandraSessionImpl cassandraSession = new CassandraSessionImpl(builder, null,
            ConsistencyLevel.ONE, ConsistencyLevel.ONE, 0, mock(IgniteLogger.class));

        BatchExecutionAssistant<String, String> batchExecutionAssistant = new MyBatchExecutionAssistant();

        ArrayList<String> data = new ArrayList<>();

        for (int i = 0; i < 10; i++) {
            data.add(String.valueOf(i));
        }

        cassandraSession.execute(batchExecutionAssistant, data);

        verify(cluster, times(2)).connect();
        verify(session1, times(1)).prepare(nullable(String.class));
        verify(session2, times(1)).prepare(nullable(String.class));

        assertEquals(10, batchExecutionAssistant.processedCount());
    }

    /**
     * Creates a {@link PreparedStatement} mock with its variables, prepared id and query string stubbed.
     *
     * @return Mocked prepared statement whose query string is {@code "insert into xxx"}.
     */
    private static PreparedStatement mockPreparedStatement() {
        PreparedStatement ps = mock(PreparedStatement.class);

        when(ps.getVariables()).thenReturn(mock(ColumnDefinitions.class));
        when(ps.getPreparedId()).thenReturn(mock(PreparedId.class));
        when(ps.getQueryString()).thenReturn("insert into xxx");

        return ps;
    }

    /**
     * Assistant that records which sequence numbers were processed and maps each of the two known
     * prepared statements to its pre-built bound statement. It is a non-static inner class because it
     * reads the enclosing test's statement fields.
     */
    private class MyBatchExecutionAssistant implements BatchExecutionAssistant<String, String> {
        /** Sequence numbers already passed to {@link #process(Row, int)}. */
        private final Set<Integer> processed = new HashSet<>();

        /** {@inheritDoc} */
        @Override public void process(Row row, int seqNum) {
            // Set.add is a no-op for a sequence number that is already recorded.
            processed.add(seqNum);
        }

        /** {@inheritDoc} */
        @Override public boolean alreadyProcessed(int seqNum) {
            return processed.contains(seqNum);
        }

        /** {@inheritDoc} */
        @Override public int processedCount() {
            return processed.size();
        }

        /** {@inheritDoc} */
        @Override public boolean tableExistenceRequired() {
            return false;
        }

        /** {@inheritDoc} */
        @Override public String getTable() {
            return null;
        }

        /** {@inheritDoc} */
        @Override public String getStatement() {
            return null;
        }

        /**
         * Returns the bound statement paired with the given prepared statement, unwrapping a
         * {@link WrappedPreparedStatement} first.
         *
         * @param statement Prepared statement, possibly wrapped.
         * @param obj Batch element; not used by this assistant.
         * @return {@link #boundStatement1} or {@link #boundStatement2}.
         * @throws RuntimeException If the statement is neither of the two known prepared statements.
         */
        @Override public BoundStatement bindStatement(PreparedStatement statement, String obj) {
            PreparedStatement target = statement instanceof WrappedPreparedStatement
                ? ((WrappedPreparedStatement)statement).getWrappedStatement()
                : statement;

            // Identity comparison is intended: the mocks are matched by instance.
            if (target == preparedStatement1)
                return boundStatement1;

            if (target == preparedStatement2)
                return boundStatement2;

            throw new RuntimeException("unexpected");
        }

        /** {@inheritDoc} */
        @Override public KeyValuePersistenceSettings getPersistenceSettings() {
            return null;
        }

        /** {@inheritDoc} */
        @Override public String operationName() {
            return null;
        }

        /** {@inheritDoc} */
        @Override public String processedData() {
            return null;
        }
    }

    /** Distinct {@link BoundStatement} subtype used for the first prepared statement. */
    private static class MyBoundStatement1 extends BoundStatement {
        /**
         * @param ps Prepared statement to bind.
         */
        MyBoundStatement1(PreparedStatement ps) {
            super(ps);
        }
    }

    /** Distinct {@link BoundStatement} subtype used for the second prepared statement. */
    private static class MyBoundStatement2 extends BoundStatement {
        /**
         * @param ps Prepared statement to bind.
         */
        MyBoundStatement2(PreparedStatement ps) {
            super(ps);
        }
    }
}