blob: 0944a610c063f8a808faf0ec53f263022b386db3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.cdcr;
import java.util.Arrays;
import com.google.common.collect.ImmutableMap;
import org.apache.lucene.util.LuceneTestCase.Nightly;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.handler.CdcrParams;
import org.junit.Test;
@Nightly
public class CdcrRequestHandlerTest extends BaseCdcrDistributedZkTest {
@Override
public void distribSetUp() throws Exception {
schemaString = "schema15.xml"; // we need a string id
createTargetCollection = false; // we do not need the target cluster
super.distribSetUp();
}
// check that the life-cycle state is properly synchronised across nodes
@Test
@ShardsFixed(num = 2)
public void testLifeCycleActions() throws Exception {
// check initial status
this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STOPPED, CdcrParams.BufferState.ENABLED);
// send start action to first shard
@SuppressWarnings({"rawtypes"})
NamedList rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
@SuppressWarnings({"rawtypes"})
NamedList status = (NamedList) rsp.get(CdcrParams.CdcrAction.STATUS.toLower());
assertEquals(CdcrParams.ProcessState.STARTED.toLower(), status.get(CdcrParams.ProcessState.getParam()));
// check status
this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STARTED, CdcrParams.BufferState.ENABLED);
// Restart the leader of shard 1
this.restartServer(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1));
// check status - the node that died should have picked up the original state
this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STARTED, CdcrParams.BufferState.ENABLED);
// send stop action to second shard
rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.STOP);
status = (NamedList) rsp.get(CdcrParams.CdcrAction.STATUS.toLower());
assertEquals(CdcrParams.ProcessState.STOPPED.toLower(), status.get(CdcrParams.ProcessState.getParam()));
// check status
this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STOPPED, CdcrParams.BufferState.ENABLED);
}
// check the checkpoint API
@Test
@ShardsFixed(num = 2)
public void testCheckpointActions() throws Exception {
// initial request on an empty index, must return -1
@SuppressWarnings({"rawtypes"})
NamedList rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.COLLECTIONCHECKPOINT);
assertEquals(-1l, rsp.get(CdcrParams.CHECKPOINT));
index(SOURCE_COLLECTION, getDoc(id, "a","test_i_dvo",10)); // shard 2
// only one document indexed in shard 2, the checkpoint must be still -1
rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.COLLECTIONCHECKPOINT);
assertEquals(-1l, rsp.get(CdcrParams.CHECKPOINT));
index(SOURCE_COLLECTION, getDoc(id, "b")); // shard 1
// a second document indexed in shard 1, the checkpoint must come from shard 2
rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.COLLECTIONCHECKPOINT);
long checkpoint1 = (Long) rsp.get(CdcrParams.CHECKPOINT);
long expected = (Long) invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.SHARDCHECKPOINT).get(CdcrParams.CHECKPOINT);
assertEquals(expected, checkpoint1);
index(SOURCE_COLLECTION, getDoc(id, "c")); // shard 1
// a third document indexed in shard 1, the checkpoint must still come from shard 2
rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.COLLECTIONCHECKPOINT);
assertEquals(checkpoint1, rsp.get(CdcrParams.CHECKPOINT));
index(SOURCE_COLLECTION, getDoc(id, "d")); // shard 2
// a fourth document indexed in shard 2, the checkpoint must come from shard 1
rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.COLLECTIONCHECKPOINT);
long checkpoint2 = (Long) rsp.get(CdcrParams.CHECKPOINT);
expected = (Long) invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.SHARDCHECKPOINT).get(CdcrParams.CHECKPOINT);
assertEquals(expected, checkpoint2);
// send a delete by id
long pre_op = (Long) invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.SHARDCHECKPOINT).get(CdcrParams.CHECKPOINT);
deleteById(SOURCE_COLLECTION, Arrays.asList(new String[]{"c"})); //shard1
// document deleted in shard1, checkpoint should come from shard2
rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.COLLECTIONCHECKPOINT);
long checkpoint3 = (Long) rsp.get(CdcrParams.CHECKPOINT);
expected = (Long) invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.SHARDCHECKPOINT).get(CdcrParams.CHECKPOINT);
assertEquals(pre_op, expected);
assertEquals(expected, checkpoint3);
// send a in-place update
SolrInputDocument in_place_doc = new SolrInputDocument();
in_place_doc.setField(id, "a");
in_place_doc.setField("test_i_dvo", ImmutableMap.of("inc", 10)); //shard2
index(SOURCE_COLLECTION, in_place_doc);
// document updated in shard2, checkpoint should come from shard1
rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.COLLECTIONCHECKPOINT);
long checkpoint4 = (Long) rsp.get(CdcrParams.CHECKPOINT);
expected = (Long) invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.SHARDCHECKPOINT).get(CdcrParams.CHECKPOINT);
assertEquals(expected, checkpoint4);
// send a delete by query
deleteByQuery(SOURCE_COLLECTION, "*:*");
// all the checkpoints must come from the DBQ
rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.COLLECTIONCHECKPOINT);
long checkpoint5= (Long) rsp.get(CdcrParams.CHECKPOINT);
assertTrue(checkpoint5 > 0); // ensure that checkpoints from deletes are in absolute form
checkpoint5 = (Long) invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.SHARDCHECKPOINT).get(CdcrParams.CHECKPOINT);
assertTrue(checkpoint5 > 0); // ensure that checkpoints from deletes are in absolute form
checkpoint5 = (Long) invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.SHARDCHECKPOINT).get(CdcrParams.CHECKPOINT);
assertTrue(checkpoint5 > 0); // ensure that checkpoints from deletes are in absolute form
// replication never started, lastProcessedVersion should be -1 for both shards
rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.LASTPROCESSEDVERSION);
long lastVersion = (Long) rsp.get(CdcrParams.LAST_PROCESSED_VERSION);
assertEquals(-1l, lastVersion);
rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.LASTPROCESSEDVERSION);
lastVersion = (Long) rsp.get(CdcrParams.LAST_PROCESSED_VERSION);
assertEquals(-1l, lastVersion);
}
// check that the buffer state is properly synchronised across nodes
@Test
@ShardsFixed(num = 2)
public void testBufferActions() throws Exception {
// check initial status
this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STOPPED, CdcrParams.BufferState.ENABLED);
// send disable buffer action to first shard
@SuppressWarnings({"rawtypes"})
NamedList rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.DISABLEBUFFER);
@SuppressWarnings({"rawtypes"})
NamedList status = (NamedList) rsp.get(CdcrParams.CdcrAction.STATUS.toLower());
assertEquals(CdcrParams.BufferState.DISABLED.toLower(), status.get(CdcrParams.BufferState.getParam()));
// check status
this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STOPPED, CdcrParams.BufferState.DISABLED);
// Restart the leader of shard 1
this.restartServer(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1));
// check status
this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STOPPED, CdcrParams.BufferState.DISABLED);
// send enable buffer action to second shard
rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.ENABLEBUFFER);
status = (NamedList) rsp.get(CdcrParams.CdcrAction.STATUS.toLower());
assertEquals(CdcrParams.BufferState.ENABLED.toLower(), status.get(CdcrParams.BufferState.getParam()));
// check status
this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STOPPED, CdcrParams.BufferState.ENABLED);
}
}