| /*========================================================================= |
| * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * one or more patents listed at http://www.pivotal.io/patents. |
| *========================================================================= |
| */ |
| package com.gemstone.gemfire.cache30; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.util.Collection; |
| import java.util.Properties; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import com.gemstone.gemfire.CopyHelper; |
| import com.gemstone.gemfire.DataSerializable; |
| import com.gemstone.gemfire.DataSerializer; |
| import com.gemstone.gemfire.cache.AttributesFactory; |
| import com.gemstone.gemfire.cache.CacheException; |
| import com.gemstone.gemfire.cache.InterestResultPolicy; |
| import com.gemstone.gemfire.cache.PartitionAttributesFactory; |
| import com.gemstone.gemfire.cache.Region; |
| import com.gemstone.gemfire.cache.RegionAttributes; |
| import com.gemstone.gemfire.cache.Scope; |
| import com.gemstone.gemfire.distributed.internal.DistributionConfig; |
| import com.gemstone.gemfire.internal.NanoTimer; |
| import com.gemstone.gemfire.internal.cache.BridgeServerImpl; |
| import com.gemstone.gemfire.internal.cache.BucketRegion; |
| import com.gemstone.gemfire.internal.cache.BucketRegion.RawValue; |
| import com.gemstone.gemfire.internal.cache.CachedDeserializable; |
| import com.gemstone.gemfire.internal.cache.EnumListenerEvent; |
| import com.gemstone.gemfire.internal.cache.EventID; |
| import com.gemstone.gemfire.internal.cache.KeyInfo; |
| import com.gemstone.gemfire.internal.cache.LocalRegion; |
| import com.gemstone.gemfire.internal.cache.PartitionedRegion; |
| import com.gemstone.gemfire.internal.cache.PartitionedRegionDataStore.BucketVisitor; |
| import com.gemstone.gemfire.internal.cache.ha.HARegionQueue; |
| import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy; |
| import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID; |
| import com.gemstone.gemfire.internal.cache.tier.sockets.ClientUpdateMessageImpl; |
| |
| import dunit.DistributedTestCase; |
| import dunit.Host; |
| import dunit.VM; |
| |
| /** |
| * |
| * @author Mitch Thomas |
| * @since bugfix5.7 |
| */ |
| public class Bug38741DUnitTest extends BridgeTestCase { |
| private static final long serialVersionUID = 1L; |
| |
| protected RegionAttributes getRegionAttributes() { |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| return factory.create(); |
| } |
| |
/**
 * Creates the test case, passing {@code name} on to the superclass
 * constructor.
 *
 * @param name forwarded unchanged to the superclass constructor
 */
public Bug38741DUnitTest(String name) {
  super(name);
}
| |
| /** |
| * Test that CopyOnRead doesn't cause {@link HARegionQueue#peek()} to create a copy, |
| * assuming that creating copies performs a serialize and de-serialize operation. |
| * @throws Exception when there is a failure |
| * @since bugfix5.7 |
| */ |
| public void testCopyOnReadWithBridgeServer() throws Exception { |
| final Host h = Host.getHost(0); |
| final VM client = h.getVM(2); |
| final VM server = h.getVM(3); |
| final String rName = getUniqueName(); |
| final int ports[] = createUniquePorts(1); |
| final String k1 = "k1"; |
| final String k2 = "k2"; |
| final String k3 = "k3"; |
| |
| createBridgeServer(server, rName, ports[0]); |
| // Put an instance of SerializationCounter to assert copy-on-read behavior |
| // when notifyBySubscription is true |
| server.invoke(new CacheSerializableRunnable("Enable copy on read and assert server copy behavior") { |
| public void run2() throws CacheException { |
| final LocalRegion r = (LocalRegion) getRootRegion(rName); |
| |
| // Using a key that counts serialization, the test captures |
| // any serialization of the key when it is a member of another object, |
| // specifically in this case ClientUpdateMessageImpl which is assume to be |
| // the value of a HARegion |
| SerializationCountingKey key = new SerializationCountingKey(k1); |
| byte[] val = new byte[1]; |
| byte valIsObj = 0x01; |
| Integer cb = new Integer(0); |
| ClientProxyMembershipID cpmi = null; |
| EventID eid = null; |
| ClientUpdateMessageImpl cui = |
| new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_CREATE, r, key, |
| val, valIsObj, cb, cpmi, eid); |
| ClientUpdateMessageImpl cuiCopy = (ClientUpdateMessageImpl) CopyHelper.copy(cui); |
| assertSame(key, cui.getKeyOfInterest()); |
| assertEquals(1, key.count.get()); |
| key = (SerializationCountingKey) cuiCopy.getKeyOfInterest(); |
| assertEquals(cui.getKeyOfInterest(), cuiCopy.getKeyOfInterest()); |
| assertEquals(1, key.count.get()); |
| |
| |
| SerializationCountingKey ks1 = new SerializationCountingKey(k1); |
| { // Make sure nothing (HARegion) has serialized/de-serialized this instance |
| SerializationCountingValue sc = new SerializationCountingValue(); |
| r.put(ks1, sc); |
| assertEquals(0, sc.count.get()); |
| assertEquals(0, ks1.count.get()); |
| } |
| |
| { // No copy should be made upon get (assert standard no copy behavior) |
| SerializationCountingValue sc = (SerializationCountingValue) r.get(ks1); |
| assertEquals(0, sc.count.get()); |
| assertEquals(0, ks1.count.get()); |
| } |
| |
| // enable copy on read |
| getCache().setCopyOnRead(true); |
| |
| { // Assert standard copy on read behavior |
| SerializationCountingValue sc = (SerializationCountingValue) r.get(ks1); |
| assertEquals(1, sc.count.get()); |
| assertEquals(0, ks1.count.get()); |
| } |
| |
| { // Put another counter with copy-on-read true |
| // Again check that nothing (HARegion) has performed serialization |
| SerializationCountingValue sc = new SerializationCountingValue(); |
| SerializationCountingKey ks3 = new SerializationCountingKey(k3); |
| r.put(ks3, sc); |
| assertEquals(0, sc.count.get()); |
| assertEquals(0, ks3.count.get()); |
| } |
| } |
| }); |
| |
| // Setup a client which subscribes to the server region, registers (aka pulls) |
| // interest in keys which creates an assumed HARegionQueue on the server |
| // (in the event that the above code didn't already create a HARegion) |
| final String serverHostName = getServerHostName(server.getHost()); |
| client.invoke(new CacheSerializableRunnable("Assert server copy behavior from client") { |
| public void run2() throws CacheException { |
| getCache(); |
| |
| AttributesFactory factory = new AttributesFactory(); |
| BridgeTestCase.configureConnectionPool(factory, serverHostName, ports, true,-1,1,null); |
| factory.setScope(Scope.LOCAL); |
| Region r = createRootRegion(rName, factory.create()); |
| SerializationCountingKey ks1 = new SerializationCountingKey(k1); |
| SerializationCountingKey ks3 = new SerializationCountingKey(k3); |
| r.registerInterest(ks1, InterestResultPolicy.KEYS_VALUES); |
| r.registerInterest(new SerializationCountingKey(k2), InterestResultPolicy.KEYS_VALUES); // entry shouldn't exist yet |
| r.registerInterest(ks3, InterestResultPolicy.KEYS_VALUES); |
| |
| { // Once for the get on the server, once to send the value to this client |
| SerializationCountingValue sc = (SerializationCountingValue) r.get(ks1); |
| assertEquals(2, sc.count.get()); |
| } |
| |
| { // Once to send the value to this client |
| SerializationCountingValue sc = (SerializationCountingValue) r.get(ks3); |
| assertEquals(1, sc.count.get()); |
| } |
| } |
| }); |
| |
| // Put an instance of SerializationCounter to assert copy-on-read behavior |
| // once a client has registered interest |
| server.invoke(new CacheSerializableRunnable("Assert copy behavior after client is setup") { |
| public void run2() throws CacheException { |
| Region r = getRootRegion(rName); |
| BridgeServerImpl bsi = (BridgeServerImpl) |
| getCache().getBridgeServers().iterator().next(); |
| Collection cp = bsi.getAcceptor().getCacheClientNotifier().getClientProxies(); |
| // Should only be one because only one client is connected |
| assertEquals(1, cp.size()); |
| final CacheClientProxy ccp = (CacheClientProxy) cp.iterator().next(); |
| // Wait for messages to drain to capture a stable "processed message count" |
| WaitCriterion ev = new WaitCriterion() { |
| public boolean done() { |
| return ccp.getHARegionQueue().size() == 0; |
| } |
| public String description() { |
| return "region queue never became empty"; |
| } |
| }; |
| DistributedTestCase.waitForCriterion(ev, 60 * 1000, 200, true); |
| |
| // Capture the current processed message count to know |
| // when the next message has been serialized |
| final int currMesgCount = ccp.getStatistics().getMessagesProcessed(); |
| |
| SerializationCountingKey ks2 = new SerializationCountingKey(k2); |
| SerializationCountingValue sc = new SerializationCountingValue(); |
| // Update a key upon which the client has expressed interest, |
| // expect it to send an update message to the client |
| r.put(ks2, sc); |
| |
| // Wait to know that the data has been at least serialized (possibly sent) |
| ev = new WaitCriterion() { |
| public boolean done() { |
| return ccp.getStatistics().getMessagesProcessed() != currMesgCount; |
| } |
| public String description() { |
| return null; |
| } |
| }; |
| DistributedTestCase.waitForCriterion(ev, 60 * 1000, 200, true); |
| |
| // assert one serialization to send value to interested client |
| // more than one implies copy-on-read behavior (bad) |
| assertEquals(1, sc.count.get()); |
| assertEquals(1, ks2.count.get()); |
| } |
| }); |
| |
| // Double-check the serialization count in the event that the previous check |
| // missed the copy due to race conditions |
| client.invoke(new CacheSerializableRunnable("Assert copy behavior from client after update") { |
| public void run2() throws CacheException { |
| Region r = getRootRegion(rName); |
| { // Once to send the value to this client via the updater thread |
| |
| SerializationCountingKey ks2 = new SerializationCountingKey(k2); |
| // Wait for the update to arrive on to the Cache Client Updater |
| long start = NanoTimer.getTime(); |
| final int maxSecs = 30; |
| while(!r.containsKey(ks2)) { |
| pause(100); |
| if ((NanoTimer.getTime() - start) > TimeUnit.SECONDS.toNanos(maxSecs)) { |
| fail("Waited over " + maxSecs + "s"); |
| } |
| } |
| |
| SerializationCountingValue sc = (SerializationCountingValue) r.getEntry(ks2).getValue(); |
| assertEquals(1, sc.count.get()); |
| } |
| } |
| }); |
| } |
| |
| /** |
| * Test to ensure that a PartitionedRegion doesn't make more than the |
| * expected number of copies when copy-on-read is set to true |
| * @throws Exception |
| */ |
| public void testPartitionedRegionAndCopyOnRead() throws Exception { |
| final Host h = Host.getHost(0); |
| final VM accessor = h.getVM(2); |
| final VM datastore = h.getVM(3); |
| final String rName = getUniqueName(); |
| final String k1 = "k1"; |
| |
| datastore.invoke(new CacheSerializableRunnable("Create PR DataStore") { |
| public void run2() throws CacheException { |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setPartitionAttributes(new PartitionAttributesFactory().setRedundantCopies(0).create()); |
| createRootRegion(rName, factory.create()); |
| } |
| }); |
| |
| accessor.invoke(new CacheSerializableRunnable("Create PR Accessor and put new value") { |
| public void run2() throws CacheException { |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setPartitionAttributes(new PartitionAttributesFactory().setLocalMaxMemory(0).setRedundantCopies(0).create()); |
| Region r = createRootRegion(rName, factory.create()); |
| SerializationCountingValue val = new SerializationCountingValue(); |
| r.put(k1, val); |
| // First put to a bucket will serialize once to determine the size of the value |
| // to know how much extra space the new bucket with the new entry will consume |
| // and serialize again to send the bytes |
| assertEquals(2, val.count.get()); |
| // A put to an already created bucket should only be serialized once |
| val = new SerializationCountingValue(); |
| r.put(k1, val); |
| assertEquals(1, val.count.get()); |
| } |
| }); |
| |
| datastore.invoke(new CacheSerializableRunnable("assert datastore entry serialization count") { |
| public void run2() throws CacheException { |
| PartitionedRegion pr = (PartitionedRegion) getRootRegion(rName); |
| // Visit the one bucket (since there is only one value in the entire PR) |
| // to directly copy the entry bytes and assert the serialization count. |
| // All this extra work is to assure the serialization count does not increase |
| // (by de-serializing the value stored in the map, which would then have to be |
| // re-serialized). |
| pr.getDataStore().visitBuckets(new BucketVisitor() { |
| public void visit(Integer bucketId, Region r) { |
| BucketRegion br = (BucketRegion) r; |
| try { |
| KeyInfo keyInfo = new KeyInfo(k1, null, bucketId); |
| RawValue rv = br.getSerialized(keyInfo, false, false, null, false, false); |
| Object val = rv.getRawValue(); |
| assertTrue(val instanceof CachedDeserializable); |
| CachedDeserializable cd = (CachedDeserializable)val; |
| SerializationCountingValue scv = (SerializationCountingValue)cd.getDeserializedForReading(); |
| assertEquals(1, scv.count.get()); |
| } catch (IOException fail) { |
| fail("Unexpected IOException", fail); |
| } |
| } |
| }); |
| } |
| }); |
| |
| accessor.invoke(new CacheSerializableRunnable("assert accessor entry serialization count") { |
| public void run2() throws CacheException { |
| Region r = getRootRegion(rName); |
| SerializationCountingValue v1 = (SerializationCountingValue)r.get(k1); |
| // The counter was incremented once to send the data to the datastore |
| assertEquals(1, v1.count.get()); |
| getCache().setCopyOnRead(true); |
| // Once to send the data to the datastore, no need to do a serialization |
| // when we make copy since it is serialized from datastore to us. |
| SerializationCountingValue v2 = (SerializationCountingValue)r.get(k1); |
| assertEquals(1, v2.count.get()); |
| assertTrue(v1 != v2); |
| } |
| }); |
| |
| datastore.invoke(new CacheSerializableRunnable("assert value serialization") { |
| public void run2() throws CacheException { |
| Region r = getRootRegion(rName); |
| SerializationCountingValue v1 = (SerializationCountingValue)r.get(k1); |
| // Once to send the value from the accessor to the data store |
| assertEquals(1, v1.count.get()); |
| getCache().setCopyOnRead(true); |
| // Once to send the value from the accessor to the data store |
| // once to make a local copy |
| SerializationCountingValue v2 = (SerializationCountingValue)r.get(k1); |
| assertEquals(2, v2.count.get()); |
| assertTrue(v1 != v2); |
| } |
| }); |
| } |
| |
| public Properties getDistributedSystemProperties() { |
| Properties props = new Properties(); |
| props.setProperty(DistributionConfig.DELTA_PROPAGATION_PROP_NAME, "false"); |
| return props; |
| } |
| |
| public static class SerializationCountingValue implements DataSerializable { |
| private static final long serialVersionUID = 1L; |
| public final AtomicInteger count = new AtomicInteger(); |
| public SerializationCountingValue() { |
| } |
| |
| public void fromData(DataInput in) throws IOException, |
| ClassNotFoundException { |
| count.set(in.readInt()); |
| } |
| public void toData(DataOutput out) throws IOException { |
| out.writeInt(count.addAndGet(1)); |
| //GemFireCacheImpl.getInstance().getLogger().info("DEBUG "+this, new RuntimeException("STACK")); |
| } |
| public String toString() { |
| return getClass().getName() + "@" + System.identityHashCode(this) + "; count=" + count; |
| } |
| } |
| public static class SerializationCountingKey extends SerializationCountingValue { |
| private static final long serialVersionUID = 1L; |
| private String k; |
| public SerializationCountingKey(String k) { |
| this.k = k; |
| } |
| public SerializationCountingKey() {super();} |
| public void fromData(DataInput in) throws IOException, |
| ClassNotFoundException { |
| super.fromData(in); |
| k = DataSerializer.readString(in); |
| } |
| public void toData(DataOutput out) throws IOException { |
| super.toData(out); |
| DataSerializer.writeString(k, out); |
| } |
| public boolean equals(Object obj) { |
| if (obj == this) { |
| return true; |
| } |
| if (obj instanceof SerializationCountingKey) { |
| SerializationCountingKey other = (SerializationCountingKey) obj; |
| return k.equals(other.k); |
| } |
| return false; |
| } |
| public int hashCode() { |
| return k.hashCode(); |
| } |
| public String toString() { |
| return super.toString() + "; k=" + k; |
| } |
| } |
| } |