| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache; |
| |
| import static org.apache.geode.distributed.ConfigurationProperties.DELTA_PROPAGATION; |
| import static org.apache.geode.internal.lang.SystemPropertyHelper.EARLY_ENTRY_EVENT_SERIALIZATION; |
| import static org.apache.geode.test.awaitility.GeodeAwaitility.await; |
| import static org.assertj.core.api.Assertions.assertThat; |
| import static org.assertj.core.api.Assertions.fail; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.util.Collection; |
| import java.util.Properties; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| import org.junit.runner.RunWith; |
| |
| import org.apache.geode.CopyHelper; |
| import org.apache.geode.DataSerializable; |
| import org.apache.geode.DataSerializer; |
| import org.apache.geode.cache.AttributesFactory; |
| import org.apache.geode.cache.InterestResultPolicy; |
| import org.apache.geode.cache.PartitionAttributesFactory; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.RegionAttributes; |
| import org.apache.geode.cache.RegionFactory; |
| import org.apache.geode.cache.Scope; |
| import org.apache.geode.cache30.ClientServerTestCase; |
| import org.apache.geode.internal.cache.BucketRegion.RawValue; |
| import org.apache.geode.internal.cache.ha.HARegionQueue; |
| import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy; |
| import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; |
| import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessageImpl; |
| import org.apache.geode.internal.lang.SystemProperty; |
| import org.apache.geode.test.dunit.Invoke; |
| import org.apache.geode.test.dunit.NetworkUtils; |
| import org.apache.geode.test.dunit.VM; |
| import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties; |
| import org.apache.geode.test.junit.categories.ClientServerTest; |
| import org.apache.geode.test.junit.runners.GeodeParamsRunner; |
| |
| /** |
| * Client side deserialization should not throw EOFException when copy-on-read is true. |
| * |
| * <p> |
| * TRAC #38741: EOFException during deserialize on client update with copy-on-read=true |
| * |
| * @since GemFire bugfix5.7 |
| */ |
| @Category({ClientServerTest.class}) |
| @RunWith(GeodeParamsRunner.class) |
| @SuppressWarnings("serial") |
| public class ClientDeserializationCopyOnReadRegressionTest extends ClientServerTestCase { |
| |
| private final String k1 = "k1"; |
| private final String k2 = "k2"; |
| private final String k3 = "k3"; |
| |
| private VM client; |
| private VM server; |
| private String rName; |
| private int[] ports; |
| |
| @Rule |
| public DistributedRestoreSystemProperties restoreSystemProperties = |
| new DistributedRestoreSystemProperties(); |
| |
| @Before |
| public void setUp() { |
| client = VM.getVM(2); |
| server = VM.getVM(3); |
| rName = getUniqueName(); |
| ports = createUniquePorts(); |
| } |
| |
| @Override |
| public Properties getDistributedSystemProperties() { |
| Properties props = new Properties(); |
| props.setProperty(DELTA_PROPAGATION, "false"); |
| return props; |
| } |
| |
| @Override |
| protected <K, V> RegionAttributes<K, V> getRegionAttributes() { |
| AttributesFactory<K, V> factory = new AttributesFactory<>(); |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| return factory.create(); |
| } |
| |
| /** |
| * Test that CopyOnRead doesn't cause {@link HARegionQueue#peek()} to create a copy, assuming that |
| * creating copies performs a serialize and de-serialize operation. |
| */ |
| @Test |
| |
| public void testCopyOnReadWithBridgeServer() { |
| System.setProperty(SystemProperty.DEFAULT_PREFIX + EARLY_ENTRY_EVENT_SERIALIZATION, "true"); |
| Invoke.invokeInEveryVM( |
| () -> System.setProperty(SystemProperty.DEFAULT_PREFIX + EARLY_ENTRY_EVENT_SERIALIZATION, |
| "true")); |
| |
| createBridgeServer(server, rName, ports[0]); |
| // Put an instance of SerializationCounter to assert copy-on-read behavior |
| // when notifyBySubscription is true |
| server.invoke("Enable copy on read and assert server copy behavior", () -> { |
| final Region<Object, Object> rootRegion = getRootRegion(rName); |
| |
| // Using a key that counts serialization, the test captures |
| // any serialization of the key when it is a member of another object, |
| // specifically in this case ClientUpdateMessageImpl which is assume to be |
| // the value of a HARegion |
| SerializationCountingKey key = new SerializationCountingKey(k1); |
| byte[] val = new byte[1]; |
| byte valIsObj = 0x01; |
| Integer cb = 0; |
| ClientProxyMembershipID cpmi = null; |
| EventID eid = null; |
| ClientUpdateMessageImpl cui = new ClientUpdateMessageImpl( |
| EnumListenerEvent.AFTER_CREATE, (LocalRegion) rootRegion, key, val, valIsObj, cb, cpmi, |
| eid); |
| ClientUpdateMessageImpl cuiCopy = CopyHelper.copy(cui); |
| assertThat(cui.getKeyOfInterest()).isSameAs(key); |
| assertThat(key.count.get()).isEqualTo(1); |
| key = (SerializationCountingKey) cuiCopy.getKeyOfInterest(); |
| assertThat(cuiCopy.getKeyOfInterest()).isEqualTo(cui.getKeyOfInterest()); |
| assertThat(key.count.get()).isEqualTo(1); |
| |
| SerializationCountingKey ks1 = new SerializationCountingKey(k1); |
| // AbstractRegionMap basicPut now serializes newValue in EntryEventImpl |
| // which can be used for delivering client update message later |
| SerializationCountingValue sc = new SerializationCountingValue(); |
| rootRegion.put(ks1, sc); |
| assertThat(sc.count.get()).isEqualTo(1); |
| assertThat(ks1.count.get()).isEqualTo(0); |
| |
| |
| // No copy should be made upon get (assert standard no copy behavior) |
| sc = (SerializationCountingValue) rootRegion.get(ks1); |
| assertThat(sc.count.get()).isEqualTo(1); |
| assertThat(ks1.count.get()).isEqualTo(0); |
| |
| |
| // enable copy on read |
| getCache().setCopyOnRead(true); |
| |
| // Assert standard copy on read behavior and basicPut in AbstractRegionMap |
| sc = (SerializationCountingValue) rootRegion.get(ks1); |
| assertThat(sc.count.get()).isEqualTo(2); |
| assertThat(ks1.count.get()).isEqualTo(0); |
| // Put another counter with copy-on-read true |
| // AbstractRegionMap basicPut now serializes newValue |
| sc = new SerializationCountingValue(); |
| SerializationCountingKey ks3 = new SerializationCountingKey(k3); |
| rootRegion.put(ks3, sc); |
| assertThat(sc.count.get()).isEqualTo(1); |
| assertThat(ks3.count.get()).isEqualTo(0); |
| }); |
| |
| // Setup a client which subscribes to the server region, registers (aka pulls) |
| // interest in keys which creates an assumed HARegionQueue on the server |
| // (in the event that the above code didn't already create a HARegion) |
| final String serverHostName = NetworkUtils.getServerHostName(); |
| client.invoke("Assert server copy behavior from client", () -> { |
| getCache(); |
| |
| RegionFactory<Object, Object> factory = getCache().createRegionFactory(); |
| ClientServerTestCase.configureConnectionPool(factory, serverHostName, ports, true, -1, 1, |
| null); |
| factory.setScope(Scope.LOCAL); |
| Region<Object, Object> rootRegion = createRootRegion(rName, factory); |
| SerializationCountingKey ks1 = new SerializationCountingKey(k1); |
| SerializationCountingKey ks3 = new SerializationCountingKey(k3); |
| // original two serializations on server and one serialization for register interest |
| rootRegion.registerInterest(ks1, InterestResultPolicy.KEYS_VALUES); |
| // entry shouldn't exist yet |
| rootRegion.registerInterest(new SerializationCountingKey(k2), |
| InterestResultPolicy.KEYS_VALUES); |
| // original one serializations on server and one serialization for register interest |
| rootRegion.registerInterest(ks3, InterestResultPolicy.KEYS_VALUES); |
| |
| // get from local cache. |
| // original two serializations on server and one for previous register interest |
| SerializationCountingValue sc = (SerializationCountingValue) rootRegion.get(ks1); |
| assertThat(sc.count.get()).isEqualTo(3); |
| |
| sc = (SerializationCountingValue) rootRegion.get(ks3); |
| assertThat(sc.count.get()).isEqualTo(2); |
| }); |
| |
| // Put an instance of SerializationCounter to assert copy-on-read behavior |
| // once a client has registered interest |
| server.invoke("Assert copy behavior after client is setup", () -> { |
| Region<Object, Object> rootRegion = getRootRegion(rName); |
| CacheServerImpl bsi = (CacheServerImpl) getCache().getCacheServers().iterator().next(); |
| Collection cp = bsi.getAcceptor().getCacheClientNotifier().getClientProxies(); |
| // Should only be one because only one client is connected |
| assertThat(cp.size()).isEqualTo(1); |
| final CacheClientProxy ccp = (CacheClientProxy) cp.iterator().next(); |
| // Wait for messages to drain to capture a stable "processed message count" |
| await("region queue never became empty") |
| .until(() -> ccp.getHARegionQueue().size() == 0); |
| |
| // Capture the current processed message count to know |
| // when the next message has been serialized |
| final int currMesgCount = ccp.getStatistics().getMessagesProcessed(); |
| |
| SerializationCountingKey ks2 = new SerializationCountingKey(k2); |
| SerializationCountingValue sc = new SerializationCountingValue(); |
| // Update a key upon which the client has expressed interest, |
| // expect it to send an update message to the client |
| rootRegion.put(ks2, sc); |
| |
| // Wait to know that the data has been at least serialized (possibly sent) |
| await() |
| .until(() -> ccp.getStatistics().getMessagesProcessed() != currMesgCount); |
| |
| // assert one serialization to send value to interested client |
| // more than one implies copy-on-read behavior (bad) |
| assertThat(sc.count.get()).isEqualTo(1); |
| assertThat(ks2.count.get()).isEqualTo(1); |
| }); |
| |
| // Double-check the serialization count in the event that the previous check |
| // missed the copy due to race conditions |
| client.invoke("Assert copy behavior from client after update", () -> { |
| Region rootRegion = getRootRegion(rName); |
| { // Once to send the value to this client via the updater thread |
| |
| SerializationCountingKey ks2 = new SerializationCountingKey(k2); |
| // Wait for the update to arrive on to the Cache Client Updater |
| final int maxSecs = 30; |
| await("Waited over " + maxSecs + "s").timeout(maxSecs, TimeUnit.SECONDS) |
| .until(() -> rootRegion.containsKey(ks2)); |
| |
| SerializationCountingValue sc = |
| (SerializationCountingValue) rootRegion.getEntry(ks2).getValue(); |
| assertThat(sc.count.get()).isEqualTo(1); |
| } |
| }); |
| } |
| |
| /** |
| * Test to ensure that a PartitionedRegion doesn't make more than the expected number of copies |
| * when copy-on-read is set to true |
| */ |
| @Test |
| public void testPartitionedRegionAndCopyOnRead() { |
| final VM accessor = VM.getVM(2); |
| final VM datastore = VM.getVM(3); |
| final String rName = getUniqueName(); |
| final String k1 = "k1"; |
| |
| datastore.invoke("Create PR DataStore", () -> { |
| RegionFactory<Object, Object> factory = getCache().createRegionFactory(); |
| factory.setPartitionAttributes( |
| new PartitionAttributesFactory().setRedundantCopies(0).create()); |
| createRootRegion(rName, factory); |
| }); |
| |
| accessor.invoke("Create PR Accessor and put new value", () -> { |
| RegionFactory<Object, Object> factory = getCache().createRegionFactory(); |
| factory.setPartitionAttributes( |
| new PartitionAttributesFactory().setLocalMaxMemory(0).setRedundantCopies(0).create()); |
| Region<Object, Object> rootRegion = createRootRegion(rName, factory); |
| SerializationCountingValue val = new SerializationCountingValue(); |
| rootRegion.put(k1, val); |
| // First put to a bucket will serialize once to determine the size of the value |
| // to know how much extra space the new bucket with the new entry will consume |
| // and serialize again to send the bytes |
| assertThat(val.count.get()).isEqualTo(2); |
| // A put to an already created bucket should only be serialized once |
| val = new SerializationCountingValue(); |
| rootRegion.put(k1, val); |
| assertThat(val.count.get()).isEqualTo(1); |
| }); |
| |
| datastore.invoke("assert datastore entry serialization count", () -> { |
| PartitionedRegion pr = (PartitionedRegion) getRootRegion(rName); |
| // Visit the one bucket (since there is only one value in the entire PR) |
| // to directly copy the entry bytes and assert the serialization count. |
| // All this extra work is to assure the serialization count does not increase |
| // (by de-serializing the value stored in the map, which would then have to be |
| // re-serialized). |
| pr.getDataStore().visitBuckets((bucketId, r) -> { |
| BucketRegion br = (BucketRegion) r; |
| KeyInfo keyInfo = new KeyInfo(k1, null, bucketId); |
| RawValue rv = null; |
| try { |
| rv = br.getSerialized(keyInfo, false, false, null, null, false); |
| } catch (IOException e) { |
| fail("Unexpected IOException", e); |
| } |
| Object val = rv.getRawValue(); |
| assertThat(val).isInstanceOf(CachedDeserializable.class); |
| CachedDeserializable cd = (CachedDeserializable) val; |
| SerializationCountingValue scv = |
| (SerializationCountingValue) cd.getDeserializedForReading(); |
| assertThat(scv.count.get()).isEqualTo(1); |
| }); |
| }); |
| |
| accessor.invoke("assert accessor entry serialization count", () -> { |
| Region<Object, Object> rootRegion = getRootRegion(rName); |
| SerializationCountingValue value1 = (SerializationCountingValue) rootRegion.get(k1); |
| // The counter was incremented once to send the data to the datastore |
| assertThat(value1.count.get()).isEqualTo(1); |
| getCache().setCopyOnRead(true); |
| // Once to send the data to the datastore, no need to do a serialization |
| // when we make copy since it is serialized from datastore to us. |
| SerializationCountingValue value2 = (SerializationCountingValue) rootRegion.get(k1); |
| assertThat(value2.count.get()).isEqualTo(1); |
| assertThat(value2).isNotEqualTo(value1); |
| }); |
| |
| datastore.invoke("assert value serialization", () -> { |
| Region rootRegion = getRootRegion(rName); |
| SerializationCountingValue value1 = (SerializationCountingValue) rootRegion.get(k1); |
| // Once to send the value from the accessor to the data store |
| assertThat(value1.count.get()).isEqualTo(1); |
| getCache().setCopyOnRead(true); |
| // Once to send the value from the accessor to the data store |
| // once to make a local copy |
| SerializationCountingValue value2 = (SerializationCountingValue) rootRegion.get(k1); |
| assertThat(value2.count.get()).isEqualTo(2); |
| assertThat(value2).isNotEqualTo(value1); |
| }); |
| } |
| |
| private static class SerializationCountingValue implements DataSerializable { |
| public final AtomicInteger count = new AtomicInteger(); |
| |
| @SuppressWarnings("WeakerAccess") |
| public SerializationCountingValue() { |
| // nothing |
| } |
| |
| @Override |
| public void fromData(DataInput in) throws IOException, ClassNotFoundException { |
| count.set(in.readInt()); |
| } |
| |
| @Override |
| public void toData(DataOutput out) throws IOException { |
| out.writeInt(count.addAndGet(1)); |
| } |
| |
| @Override |
| public String toString() { |
| return getClass().getName() + "@" + System.identityHashCode(this) + "; count=" + count; |
| } |
| } |
| |
| private static class SerializationCountingKey extends SerializationCountingValue { |
| private String k; |
| |
| @SuppressWarnings("WeakerAccess") |
| public SerializationCountingKey(String k) { |
| this.k = k; |
| } |
| |
| @SuppressWarnings("unused") |
| public SerializationCountingKey() { |
| super(); |
| } |
| |
| @Override |
| public void fromData(DataInput in) throws IOException, ClassNotFoundException { |
| super.fromData(in); |
| k = DataSerializer.readString(in); |
| } |
| |
| @Override |
| public void toData(DataOutput out) throws IOException { |
| super.toData(out); |
| DataSerializer.writeString(k, out); |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (obj == this) { |
| return true; |
| } |
| if (obj instanceof SerializationCountingKey) { |
| SerializationCountingKey other = (SerializationCountingKey) obj; |
| return k.equals(other.k); |
| } |
| return false; |
| } |
| |
| @Override |
| public int hashCode() { |
| return k.hashCode(); |
| } |
| |
| @Override |
| public String toString() { |
| return super.toString() + "; k=" + k; |
| } |
| } |
| } |