// blob: e981dea8c833cb72ee3d20731bf7d2610dfc6c52 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.tests;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.datastax.driver.core.policies.RoundRobinPolicy;
import com.datastax.driver.core.policies.TokenAwarePolicy;
import org.apache.ignite.cache.store.cassandra.datasource.Credentials;
import org.apache.ignite.cache.store.cassandra.datasource.DataSource;
import org.apache.ignite.cache.store.cassandra.serializer.JavaSerializer;
import org.apache.ignite.tests.utils.CassandraAdminCredentials;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
/**
* Test for datasource serialization.
*/
public class DatasourceSerializationTest {
/**
* Sample class for serialization test.
*/
private static class MyLoadBalancingPolicy implements LoadBalancingPolicy, Serializable {
/** */
private transient LoadBalancingPolicy plc = new TokenAwarePolicy(new RoundRobinPolicy());
/** {@inheritDoc} */
@Override public void init(Cluster cluster, Collection<Host> hosts) {
plc.init(cluster, hosts);
}
/** {@inheritDoc} */
@Override public HostDistance distance(Host host) {
return plc.distance(host);
}
/** {@inheritDoc} */
@Override public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement) {
return plc.newQueryPlan(loggedKeyspace, statement);
}
/** {@inheritDoc} */
@Override public void onAdd(Host host) {
plc.onAdd(host);
}
/** {@inheritDoc} */
@Override public void onUp(Host host) {
plc.onUp(host);
}
/** {@inheritDoc} */
@Override public void onDown(Host host) {
plc.onDown(host);
}
/** {@inheritDoc} */
@Override public void onRemove(Host host) {
plc.onRemove(host);
}
/** {@inheritDoc} */
@Override public void close() {
plc.close();
}
}
/**
* Serialization test.
*/
@Test
public void serializationTest() {
DataSource src = new DataSource();
Credentials cred = new CassandraAdminCredentials();
String[] points = new String[]{"127.0.0.1", "10.0.0.2", "10.0.0.3"};
LoadBalancingPolicy plc = new MyLoadBalancingPolicy();
src.setCredentials(cred);
src.setContactPoints(points);
src.setReadConsistency("ONE");
src.setWriteConsistency("QUORUM");
src.setLoadBalancingPolicy(plc);
JavaSerializer serializer = new JavaSerializer();
ByteBuffer buff = serializer.serialize(src);
DataSource _src = (DataSource)serializer.deserialize(buff);
Credentials _cred = (Credentials)getFieldValue(_src, "creds");
List<InetAddress> _points = (List<InetAddress>)getFieldValue(_src, "contactPoints");
ConsistencyLevel _readCons = (ConsistencyLevel)getFieldValue(_src, "readConsistency");
ConsistencyLevel _writeCons = (ConsistencyLevel)getFieldValue(_src, "writeConsistency");
LoadBalancingPolicy _plc = (LoadBalancingPolicy)getFieldValue(_src, "loadBalancingPlc");
assertTrue("Incorrectly serialized/deserialized credentials for Cassandra DataSource",
cred.getPassword().equals(_cred.getPassword()) && cred.getUser().equals(_cred.getUser()));
assertTrue("Incorrectly serialized/deserialized contact points for Cassandra DataSource",
"/127.0.0.1".equals(_points.get(0).toString()) &&
"/10.0.0.2".equals(_points.get(1).toString()) &&
"/10.0.0.3".equals(_points.get(2).toString()));
assertTrue("Incorrectly serialized/deserialized consistency levels for Cassandra DataSource",
ConsistencyLevel.ONE == _readCons && ConsistencyLevel.QUORUM == _writeCons);
assertTrue("Incorrectly serialized/deserialized load balancing policy for Cassandra DataSource",
_plc instanceof MyLoadBalancingPolicy);
}
/**
* @param obj Object.
* @param field Field name.
* @return Field value.
*/
private Object getFieldValue(Object obj, String field) {
try {
Field f = obj.getClass().getDeclaredField(field);
f.setAccessible(true);
return f.get(obj);
}
catch (Throwable e) {
throw new RuntimeException("Failed to get field '" + field + "' value", e);
}
}
}