blob: ee6a368f3b4564ad185cad2010cd1d1c3d6ea53a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.statistics;
import static org.apache.geode.distributed.ConfigurationProperties.STATISTIC_ARCHIVE_FILE;
import static org.apache.geode.distributed.ConfigurationProperties.STATISTIC_SAMPLE_RATE;
import static org.apache.geode.distributed.ConfigurationProperties.STATISTIC_SAMPLING_ENABLED;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.dunit.Assert.assertEquals;
import static org.apache.geode.test.dunit.Assert.assertFalse;
import static org.apache.geode.test.dunit.Assert.assertNotNull;
import static org.apache.geode.test.dunit.Assert.assertTrue;
import static org.apache.geode.test.dunit.Assert.fail;
import static org.apache.geode.test.dunit.Host.getHost;
import static org.apache.geode.test.dunit.Invoke.invokeInEveryVM;

import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.geode.StatisticDescriptor;
import org.apache.geode.Statistics;
import org.apache.geode.StatisticsFactory;
import org.apache.geode.StatisticsType;
import org.apache.geode.StatisticsTypeFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheListener;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionEvent;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.cache.util.RegionMembershipListenerAdapter;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.NanoTimer;
import org.apache.geode.internal.statistics.StatArchiveReader.ResourceInst;
import org.apache.geode.internal.statistics.StatArchiveReader.StatSpec;
import org.apache.geode.internal.statistics.StatArchiveReader.StatValue;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.test.junit.categories.StatisticsTest;
import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
/**
* Distributed tests for {@link Statistics}.
*
* <p>
* VM0 performs puts and VM1 receives updates. Both use custom statistics for start/end with
* increment to add up puts and updates. Then validation tests values in stat resource instances and
* uses StatArchiveReader. Both are tested against static counters in both VMs.
*
* <p>
* This test mimics hydratest/locators/cacheDS.conf in an attempt to reproduce bug #45478. So far
* this test passes consistently.
*
* @since GemFire 7.0
*/
@Category({StatisticsTest.class})
@SuppressWarnings({"rawtypes", "serial", "unused"})
public class StatisticsDistributedTest extends JUnit4CacheTestCase {
private static final int MAX_PUTS = 1000;
private static final int NUM_KEYS = 100;
private static final int NUM_PUB_THREADS = 2;
private static final int NUM_PUBS = 2;
private static final boolean RANDOMIZE_PUTS = true;
private static AtomicInteger updateEvents = new AtomicInteger();
private static AtomicInteger puts = new AtomicInteger();
private static AtomicReference<PubSubStats> subStatsRef = new AtomicReference<>();
private static AtomicReferenceArray<PubSubStats> pubStatsRef =
new AtomicReferenceArray<>(NUM_PUB_THREADS);
private static AtomicReference<RegionMembershipListener> rmlRef = new AtomicReference<>();
private File directory;
@Rule
public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
@Override
public final void postSetUp() throws Exception {
  // All stat archives written by this test go under the JUnit-managed temp folder,
  // so they are deleted automatically when the test finishes.
  directory = temporaryFolder.getRoot();
}
@Override
public final void preTearDownCacheTestCase() throws Exception {
  // Reset the static cross-VM counters in every VM, then fully disconnect:
  // this test enables stat sampling, so the distributed systems must be torn down.
  invokeInEveryVM(StatisticsDistributedTest::cleanup);
  disconnectAllFromDS();
}
/**
 * End-to-end validation of custom {@code PubSubStats}: NUM_PUBS publisher VMs each run
 * NUM_PUB_THREADS threads doing MAX_PUTS puts; one subscriber VM counts afterUpdate events.
 * Totals are then cross-checked three ways: against the live Statistics instances, against
 * the per-VM .gfs archives read back with StatArchiveReader, and against all archives
 * combined via MultipleArchiveReader.
 */
@Test
public void testPubAndSubCustomStats() throws Exception {
String regionName = "region_" + getName();
// VMs 0..NUM_PUBS-1 are publishers; VM NUM_PUBS is the subscriber
VM[] pubs = new VM[NUM_PUBS];
for (int pubVM = 0; pubVM < NUM_PUBS; pubVM++) {
pubs[pubVM] = getHost(0).getVM(pubVM);
}
VM sub = getHost(0).getVM(NUM_PUBS);
for (VM pub : pubs) {
pub.invoke(() -> puts.set(0));
}
// each member archives its stats to its own .gfs file in the shared temp directory
String subArchive =
this.directory.getAbsolutePath() + File.separator + getName() + "_sub" + ".gfs";
String[] pubArchives = new String[NUM_PUBS];
for (int pubVM = 0; pubVM < NUM_PUBS; pubVM++) {
pubArchives[pubVM] =
this.directory.getAbsolutePath() + File.separator + getName() + "_pub-" + pubVM + ".gfs";
}
// phase 1: each pub VM connects with 1-second stat sampling + archiving enabled,
// verifies the sampler is live, and creates the distributed region and its keys
for (int i = 0; i < NUM_PUBS; i++) {
final int pubVM = i;
pubs[pubVM].invoke("pub-connect-and-create-data-" + pubVM, () -> {
Properties props = new Properties();
props.setProperty(STATISTIC_SAMPLING_ENABLED, "true");
props.setProperty(STATISTIC_SAMPLE_RATE, "1000");
props.setProperty(STATISTIC_ARCHIVE_FILE, pubArchives[pubVM]);
InternalDistributedSystem system = getSystem(props);
// assert that sampler is working as expected
GemFireStatSampler sampler = system.getStatSampler();
assertTrue(sampler.isSamplingEnabled());
assertTrue(sampler.isAlive());
assertEquals(new File(pubArchives[pubVM]), sampler.getArchiveFileName());
await("awaiting SampleCollector to exist")
.until(() -> sampler.getSampleCollector() != null);
SampleCollector sampleCollector = sampler.getSampleCollector();
assertNotNull(sampleCollector);
StatArchiveHandler archiveHandler = sampleCollector.getStatArchiveHandler();
assertNotNull(archiveHandler);
assertTrue(archiveHandler.isArchiving());
// create cache and region; the membership listener lets put threads wait for the sub
Cache cache = getCache();
RegionFactory<String, Number> factory = cache.createRegionFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
RegionMembershipListener rml = new RegionMembershipListener();
rmlRef.set(rml);
factory.addCacheListener(rml);
Region<String, Number> region = factory.create(regionName);
// create the keys
if (region.getAttributes().getScope() == Scope.DISTRIBUTED_ACK) {
for (int key = 0; key < NUM_KEYS; key++) {
region.create("KEY-" + key, null);
}
}
});
}
// phase 2: the sub VM connects the same way and attaches an UpdateListener that
// counts afterUpdate events into its own PubSubStats instance
DistributedMember subMember = sub.invoke("sub-connect-and-create-keys", () -> {
Properties props = new Properties();
props.setProperty(STATISTIC_SAMPLING_ENABLED, "true");
props.setProperty(STATISTIC_SAMPLE_RATE, "1000");
props.setProperty(STATISTIC_ARCHIVE_FILE, subArchive);
InternalDistributedSystem system = getSystem(props);
PubSubStats statistics = new PubSubStats(system, "sub-1", 1);
subStatsRef.set(statistics);
// assert that sampler is working as expected
GemFireStatSampler sampler = system.getStatSampler();
assertTrue(sampler.isSamplingEnabled());
assertTrue(sampler.isAlive());
assertEquals(new File(subArchive), sampler.getArchiveFileName());
await("awaiting SampleCollector to exist")
.until(() -> sampler.getSampleCollector() != null);
SampleCollector sampleCollector = sampler.getSampleCollector();
assertNotNull(sampleCollector);
StatArchiveHandler archiveHandler = sampleCollector.getStatArchiveHandler();
assertNotNull(archiveHandler);
assertTrue(archiveHandler.isArchiving());
// create cache and region with UpdateListener
Cache cache = getCache();
RegionFactory<String, Number> factory = cache.createRegionFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
CacheListener<String, Number> cl = new UpdateListener(statistics);
factory.addCacheListener(cl);
Region<String, Number> region = factory.create(regionName);
// create the keys
if (region.getAttributes().getScope() == Scope.DISTRIBUTED_ACK) {
for (int key = 0; key < NUM_KEYS; key++) {
region.create("KEY-" + key, null);
}
}
assertEquals(0, statistics.getUpdateEvents());
return system.getDistributedMember();
});
// phase 3: each pub VM runs NUM_PUB_THREADS async put threads, then waits for two
// extra stat samples so the final stat values are flushed into the archive
for (int i = 0; i < NUM_PUBS; i++) {
final int pubVM = i;
AsyncInvocation[] publishers = new AsyncInvocation[NUM_PUB_THREADS];
for (int j = 0; j < NUM_PUB_THREADS; j++) {
final int pubThread = j;
publishers[pubThread] = pubs[pubVM]
.invokeAsync("pub-connect-and-put-data-" + pubVM + "-thread-" + pubThread, () -> {
PubSubStats statistics = new PubSubStats(basicGetSystem(), "pub-" + pubThread, pubVM);
pubStatsRef.set(pubThread, statistics);
RegionMembershipListener rml = rmlRef.get();
Region<String, Number> region = getCache().getRegion(regionName);
// assert that sub is in rml membership; from one pub VM the remote members are
// the other pub VM(s) plus the sub VM, i.e. NUM_PUBS entries in total
assertNotNull(rml);
await("awaiting Membership to contain subMember")
.until(() -> rml.contains(subMember) && rml.size() == NUM_PUBS);
// publish lots of puts cycling through the NUM_KEYS
assertEquals(0, statistics.getPuts());
// cycle through the keys randomly
if (RANDOMIZE_PUTS) {
Random randomGenerator = new Random();
int key = 0;
for (int idx = 0; idx < MAX_PUTS; idx++) {
long start = statistics.startPut();
key = randomGenerator.nextInt(NUM_KEYS);
region.put("KEY-" + key, idx);
statistics.endPut(start);
}
// cycle through the keys in order and wrapping back around
} else {
int key = 0;
for (int idx = 0; idx < MAX_PUTS; idx++) {
long start = statistics.startPut();
region.put("KEY-" + key, idx);
key++; // cycle through the keys...
if (key >= NUM_KEYS) {
key = 0;
}
statistics.endPut(start);
}
}
assertEquals(MAX_PUTS, statistics.getPuts());
// wait for 2 samples to ensure all stats have been archived
StatisticsType statSamplerType = getSystem().findType("StatSampler");
Statistics[] statsArray = getSystem().findStatisticsByType(statSamplerType);
assertEquals(1, statsArray.length);
Statistics statSamplerStats = statsArray[0];
int initialSampleCount = statSamplerStats.getInt(StatSamplerStats.SAMPLE_COUNT);
await("awaiting sampleCount >= 2").until(() -> statSamplerStats
.getInt(StatSamplerStats.SAMPLE_COUNT) >= initialSampleCount + 2);
});
}
// NOTE(review): join() has no timeout here; a hung publisher thread would hang the
// whole test rather than failing it — consider join(timeoutMillis).
for (int pubThread = 0; pubThread < publishers.length; pubThread++) {
publishers[pubThread].join();
if (publishers[pubThread].exceptionOccurred()) {
fail("Test failed", publishers[pubThread].getException());
}
}
}
// phase 4: sub VM also waits for two more samples, then posts its updateEvents
// total into the static counter so the controller can read it back
sub.invoke("sub-wait-for-samples", () -> {
// wait for 2 samples to ensure all stats have been archived
StatisticsType statSamplerType = getSystem().findType("StatSampler");
Statistics[] statsArray = getSystem().findStatisticsByType(statSamplerType);
assertEquals(1, statsArray.length);
Statistics statSamplerStats = statsArray[0];
int initialSampleCount = statSamplerStats.getInt(StatSamplerStats.SAMPLE_COUNT);
await("awaiting sampleCount >= 2").until(
() -> statSamplerStats.getInt(StatSamplerStats.SAMPLE_COUNT) >= initialSampleCount + 2);
// now post total updateEvents to static
PubSubStats statistics = subStatsRef.get();
assertNotNull(statistics);
updateEvents.set(statistics.getUpdateEvents());
});
// validate pub values against sub values
int totalUpdateEvents = sub.invoke(() -> getUpdateEvents());
// validate pub values against pub statistics against pub archive
for (int i = 0; i < NUM_PUBS; i++) {
final int pubIdx = i;
pubs[pubIdx].invoke("pub-validation", () -> {
// add up all the puts
assertEquals(NUM_PUB_THREADS, pubStatsRef.length());
int totalPuts = 0;
for (int pubThreadIdx = 0; pubThreadIdx < NUM_PUB_THREADS; pubThreadIdx++) {
PubSubStats statistics = pubStatsRef.get(pubThreadIdx);
assertNotNull(statistics);
totalPuts += statistics.getPuts();
}
// assert that total puts adds up to max puts times num threads
assertEquals(MAX_PUTS * NUM_PUB_THREADS, totalPuts);
// assert that archive file contains same values as statistics
File archive = new File(pubArchives[pubIdx]);
assertTrue(archive.exists());
StatArchiveReader reader = new StatArchiveReader(new File[] {archive}, null, false);
double combinedPuts = 0;
List resources = reader.getResourceInstList();
assertNotNull(resources);
assertFalse(resources.isEmpty());
for (Iterator<ResourceInst> iter = resources.iterator(); iter.hasNext();) {
ResourceInst ri = iter.next();
if (!ri.getType().getName().equals(PubSubStats.TYPE_NAME)) {
continue;
}
StatValue[] statValues = ri.getStatValues();
for (int idx = 0; idx < statValues.length; idx++) {
String statName = ri.getType().getStats()[idx].getName();
assertNotNull(statName);
if (statName.equals(PubSubStats.PUTS)) {
StatValue sv = statValues[idx];
sv.setFilter(StatValue.FILTER_NONE);
// sanity-check the snapshot aggregates: a counter's most recent value is its max,
// and the mean must equal the sum of raw snapshots over the snapshot count
double mostRecent = sv.getSnapshotsMostRecent();
double min = sv.getSnapshotsMinimum();
double max = sv.getSnapshotsMaximum();
double maxMinusMin = sv.getSnapshotsMaximum() - sv.getSnapshotsMinimum();
double mean = sv.getSnapshotsAverage();
double stdDev = sv.getSnapshotsStandardDeviation();
assertEquals(mostRecent, max, 0f);
double summation = 0;
double[] rawSnapshots = sv.getRawSnapshots();
for (int j = 0; j < rawSnapshots.length; j++) {
summation += rawSnapshots[j];
}
assertEquals(mean, summation / sv.getSnapshotsSize(), 0);
combinedPuts += mostRecent;
}
}
}
// assert that sum of mostRecent values for all puts equals totalPuts
assertEquals((double) totalPuts, combinedPuts, 0);
puts.getAndAdd(totalPuts);
});
}
// validate pub values against sub values
int totalCombinedPuts = 0;
for (int i = 0; i < NUM_PUBS; i++) {
int pubIdx = i;
int totalPuts = pubs[pubIdx].invoke(() -> getPuts());
assertEquals(MAX_PUTS * NUM_PUB_THREADS, totalPuts);
totalCombinedPuts += totalPuts;
}
assertEquals(totalCombinedPuts, totalUpdateEvents);
assertEquals(MAX_PUTS * NUM_PUB_THREADS * NUM_PUBS, totalCombinedPuts);
// validate sub values against sub statistics against sub archive
final int totalPuts = totalCombinedPuts;
sub.invoke("sub-validation", () -> {
PubSubStats statistics = subStatsRef.get();
assertNotNull(statistics);
int updateEvents = statistics.getUpdateEvents();
assertEquals(totalPuts, updateEvents);
assertEquals(totalUpdateEvents, updateEvents);
assertEquals(MAX_PUTS * NUM_PUB_THREADS * NUM_PUBS, updateEvents);
// assert that archive file contains same values as statistics
File archive = new File(subArchive);
assertTrue(archive.exists());
StatArchiveReader reader = new StatArchiveReader(new File[] {archive}, null, false);
double combinedUpdateEvents = 0;
List resources = reader.getResourceInstList();
for (Iterator<ResourceInst> iter = resources.iterator(); iter.hasNext();) {
ResourceInst ri = iter.next();
if (!ri.getType().getName().equals(PubSubStats.TYPE_NAME)) {
continue;
}
StatValue[] statValues = ri.getStatValues();
for (int i = 0; i < statValues.length; i++) {
String statName = ri.getType().getStats()[i].getName();
assertNotNull(statName);
if (statName.equals(PubSubStats.UPDATE_EVENTS)) {
StatValue sv = statValues[i];
sv.setFilter(StatValue.FILTER_NONE);
double mostRecent = sv.getSnapshotsMostRecent();
double min = sv.getSnapshotsMinimum();
double max = sv.getSnapshotsMaximum();
double maxMinusMin = sv.getSnapshotsMaximum() - sv.getSnapshotsMinimum();
double mean = sv.getSnapshotsAverage();
double stdDev = sv.getSnapshotsStandardDeviation();
assertEquals(mostRecent, max, 0);
double summation = 0;
double[] rawSnapshots = sv.getRawSnapshots();
for (int j = 0; j < rawSnapshots.length; j++) {
summation += rawSnapshots[j];
}
assertEquals(mean, summation / sv.getSnapshotsSize(), 0);
combinedUpdateEvents += mostRecent;
}
}
}
assertEquals((double) totalUpdateEvents, combinedUpdateEvents, 0);
});
// phase 5: re-read the totals straight from the archive files inside each member VM
int updateEvents =
sub.invoke(() -> readIntStat(new File(subArchive), "PubSubStats", "updateEvents"));
assertTrue(updateEvents > 0);
assertEquals(MAX_PUTS * NUM_PUB_THREADS * NUM_PUBS, updateEvents);
int puts = 0;
for (int pubVM = 0; pubVM < NUM_PUBS; pubVM++) {
int currentPubVM = pubVM;
int vmPuts = pubs[pubVM]
.invoke(() -> readIntStat(new File(pubArchives[currentPubVM]), "PubSubStats", "puts"));
assertTrue(vmPuts > 0);
assertEquals(MAX_PUTS * NUM_PUB_THREADS, vmPuts);
puts += vmPuts;
}
assertTrue(puts > 0);
assertEquals(MAX_PUTS * NUM_PUB_THREADS * NUM_PUBS, puts);
// use regex "testPubAndSubCustomStats"
MultipleArchiveReader reader =
new MultipleArchiveReader(this.directory, ".*" + getTestMethodName() + ".*\\.gfs");
int combinedUpdateEvents = reader.readIntStat(PubSubStats.TYPE_NAME, PubSubStats.UPDATE_EVENTS);
assertTrue("Failed to read updateEvents stat values", combinedUpdateEvents > 0);
int combinedPuts = reader.readIntStat(PubSubStats.TYPE_NAME, PubSubStats.PUTS);
assertTrue("Failed to read puts stat values", combinedPuts > 0);
assertTrue("updateEvents is " + combinedUpdateEvents + " but puts is " + combinedPuts,
combinedUpdateEvents == combinedPuts);
}
/**
 * Reads the named int stat (summed over matching instances) from a single archive file.
 */
static int readIntStat(final File archive, final String typeName, final String statName)
    throws IOException {
  return new MultipleArchiveReader(archive).readIntStat(typeName, statName);
}
/**
 * Resets all static cross-VM test state; invoked in every VM during tear down (via
 * {@code invokeInEveryVM} in {@link #preTearDownCacheTestCase}).
 *
 * <p>
 * Previously only {@code updateEvents} and {@code rmlRef} were reset, leaking {@code puts},
 * {@code subStatsRef} and {@code pubStatsRef} entries into subsequent tests in the same VMs.
 */
private static void cleanup() {
  updateEvents.set(0);
  puts.set(0);
  subStatsRef.set(null);
  for (int i = 0; i < pubStatsRef.length(); i++) {
    pubStatsRef.set(i, null);
  }
  rmlRef.set(null);
}
/**
 * Returns the updateEvents total previously posted by the sub VM; called remotely via
 * {@code sub.invoke(() -> getUpdateEvents())} from the controller.
 */
private static int getUpdateEvents() {
return updateEvents.get();
}
/**
 * Returns the puts total accumulated during pub-validation; called remotely via
 * {@code pubs[pubIdx].invoke(() -> getPuts())} from the controller.
 */
private static int getPuts() {
return puts.get();
}
/**
 * Command-line utility for reading int stats back out of .gfs archives:
 * 2 args = statType statName (searches the working directory recursively);
 * 3 args = archive statType statName (reads one file);
 * 4 args = statType1 statName1 statType2 statName2 (asserts both totals are equal).
 */
public static void main(final String[] args) throws Exception {
  switch (args.length) {
    case 2: {
      String statType = args[0];
      String statName = args[1];
      int value = new MultipleArchiveReader(new File(".")).readIntStat(statType, statName);
      System.out.println(statType + "#" + statName + "=" + value);
      break;
    }
    case 3: {
      String statType = args[1];
      String statName = args[2];
      File archive = new File(args[0]).getAbsoluteFile();
      assertTrue("File " + archive + " does not exist!", archive.exists());
      assertTrue(archive + " exists but is not a file!", archive.isFile());
      int value = new MultipleArchiveReader(archive).readIntStat(statType, statName);
      System.out.println(archive + ": " + statType + "#" + statName + "=" + value);
      break;
    }
    case 4: {
      String statType1 = args[0];
      String statName1 = args[1];
      String statType2 = args[2];
      String statName2 = args[3];
      MultipleArchiveReader reader = new MultipleArchiveReader(new File("."));
      int value1 = reader.readIntStat(statType1, statName1);
      int value2 = reader.readIntStat(statType2, statName2);
      assertTrue(statType1 + "#" + statName1 + "=" + value1 + " does not equal " + statType2 + "#"
          + statName2 + "=" + value2, value1 == value2);
      break;
    }
    default:
      // fails with the usage message for any other arg count
      assertEquals("Minimum two args are required: statType statName", 2, args.length);
  }
}
/**
 * Custom statistics tracking put count, cumulative put time, and update-event count. Instances
 * are created as atomic statistics so the stat sampler records them into the archive.
 *
 * @since GemFire 7.0
 */
static class PubSubStats {

  private static final String TYPE_NAME = "PubSubStats";
  private static final String TYPE_DESCRIPTION =
      "Statistics for StatisticsDistributedTest with Pub/Sub.";
  private static final String INSTANCE_PREFIX = "pubSubStats_";

  private static final String PUTS = "puts";
  private static final String PUT_TIME = "putTime";
  private static final String UPDATE_EVENTS = "updateEvents";

  /** Creates (or returns the already-registered) statistics type from the singleton factory. */
  private static StatisticsType createType(final StatisticsFactory f) {
    StatisticsTypeFactory stf = StatisticsTypeFactoryImpl.singleton();
    StatisticsType type = stf.createType(TYPE_NAME, TYPE_DESCRIPTION, createDescriptors(f));
    return type;
  }

  /** Descriptors: two "bigger is better" int counters and a "smaller is better" time counter. */
  private static StatisticDescriptor[] createDescriptors(final StatisticsFactory f) {
    boolean largerIsBetter = true;
    return new StatisticDescriptor[] {
        f.createIntCounter(PUTS, "Number of puts completed.", "operations", largerIsBetter),
        f.createLongCounter(PUT_TIME, "Total time spent doing puts.", "nanoseconds",
            !largerIsBetter),
        f.createIntCounter(UPDATE_EVENTS, "Number of update events.", "events", largerIsBetter)};
  }

  private final Statistics statistics;

  /**
   * Creates an atomic statistics instance named {@code pubSubStats_<name>}.
   *
   * <p>
   * Fix: INSTANCE_PREFIX already ends with "_"; the previous code appended a second "_",
   * producing text ids like "pubSubStats__sub-1". Nothing matches on the text id (the
   * StatSpec used by the readers accepts every instance), so this is purely cosmetic-safe.
   */
  PubSubStats(final StatisticsFactory f, final String name, final int id) {
    this.statistics = f.createAtomicStatistics(createType(f), INSTANCE_PREFIX + name, id);
  }

  Statistics statistics() {
    return this.statistics;
  }

  void close() {
    this.statistics.close();
  }

  int getUpdateEvents() {
    return statistics().getInt(UPDATE_EVENTS);
  }

  void incUpdateEvents() {
    incUpdateEvents(1);
  }

  void incUpdateEvents(final int amount) {
    incStat(UPDATE_EVENTS, amount);
  }

  int getPuts() {
    return statistics().getInt(PUTS);
  }

  void incPuts() {
    incPuts(1);
  }

  void incPuts(final int amount) {
    incStat(PUTS, amount);
  }

  void incPutTime(final long amount) {
    incStat(PUT_TIME, amount);
  }

  /** Returns a nanosecond timestamp to be passed to {@link #endPut(long)}. */
  long startPut() {
    return NanoTimer.getTime();
  }

  void endPut(final long start) {
    endPut(start, 1);
  }

  /** Records {@code amount} puts plus the elapsed nanoseconds since {@code start}. */
  void endPut(final long start, final int amount) {
    long elapsed = NanoTimer.getTime() - start;
    incPuts(amount);
    incPutTime(elapsed);
  }

  private void incStat(final String statName, final int intValue) {
    statistics().incInt(statName, intValue);
  }

  private void incStat(final String statName, final long longValue) {
    statistics().incLong(statName, longValue);
  }
}
/**
 * Cache listener used by the sub VM: counts each afterUpdate event into {@link PubSubStats}.
 *
 * @since GemFire 7.0
 */
static class UpdateListener extends CacheListenerAdapter<String, Number> {

  private final PubSubStats statistics;

  UpdateListener(final PubSubStats statistics) {
    this.statistics = statistics;
  }

  @Override
  public void afterUpdate(final EntryEvent<String, Number> event) {
    // one update event per callback
    statistics.incUpdateEvents();
  }
}
/**
 * Tracks the remote members hosting the test region. Membership callbacks arrive on cache
 * threads while publisher test threads concurrently poll {@link #contains} and {@link #size}
 * inside an await loop, so the backing list must be thread-safe: a {@code CopyOnWriteArrayList}
 * replaces the original unsynchronized {@code ArrayList} (which was a data race).
 *
 * @since GemFire 7.0
 */
static class RegionMembershipListener extends RegionMembershipListenerAdapter<String, Number> {

  // Mutated by cache callback threads, read by test threads polling in await() loops.
  private final List<DistributedMember> members = new CopyOnWriteArrayList<>();

  int size() {
    return this.members.size();
  }

  /** Returns an unmodifiable snapshot of the current members. */
  List<DistributedMember> getMembers() {
    return Collections.unmodifiableList(new ArrayList<>(this.members));
  }

  /** True if any current member has the same member id (weaker than equals). */
  boolean containsId(final DistributedMember member) {
    for (DistributedMember peer : getMembers()) {
      if (peer.getId().equals(member.getId())) {
        return true;
      }
    }
    return false;
  }

  boolean contains(final DistributedMember member) {
    return this.members.contains(member);
  }

  /** Builds a port-by-port comparison string for debugging a failed contains() check. */
  String debugContains(final DistributedMember member) {
    StringBuilder sb = new StringBuilder();
    for (DistributedMember peer : getMembers()) {
      if (!peer.equals(member)) {
        InternalDistributedMember peerIDM = (InternalDistributedMember) peer;
        InternalDistributedMember memberIDM = (InternalDistributedMember) member;
        sb.append("peer port=").append(peerIDM.getPort()).append(" ");
        sb.append("member port=").append(memberIDM.getPort()).append(" ");
      }
    }
    return sb.toString();
  }

  @Override
  public void initialMembers(final Region<String, Number> region,
      final DistributedMember[] initialMembers) {
    // addAll performs one copy-on-write instead of one per element
    this.members.addAll(Arrays.asList(initialMembers));
  }

  @Override
  public void afterRemoteRegionCreate(final RegionEvent<String, Number> event) {
    this.members.add(event.getDistributedMember());
  }

  @Override
  public void afterRemoteRegionDeparture(final RegionEvent<String, Number> event) {
    this.members.remove(event.getDistributedMember());
  }

  @Override
  public void afterRemoteRegionCrash(final RegionEvent<String, Number> event) {
    this.members.remove(event.getDistributedMember());
  }
}
/**
 * Reads int stat values out of one .gfs archive file or all archives found (recursively,
 * optionally filtered by a file-name regex) under a directory.
 */
static class MultipleArchiveReader {

  private final File dir;
  private final String regex;

  MultipleArchiveReader(final File dir, final String regex) {
    this.dir = dir;
    this.regex = regex;
  }

  MultipleArchiveReader(final File dir) {
    this(dir, null);
  }

  /**
   * Sums the maximum snapshot value of the named int stat across every matching archive.
   *
   * @throws IllegalStateException if the configured path does not exist
   */
  int readIntStat(final String typeName, final String statName) throws IOException {
    List<File> archives;
    if (this.dir.exists() && this.dir.isDirectory()) {
      // a directory (possibly nested) holding one or more archives
      archives = findFilesWithSuffix(this.dir, this.regex, ".gfs");
    } else if (this.dir.exists() && this.dir.isFile()) {
      // a single archive file
      archives = Collections.singletonList(this.dir);
    } else {
      throw new IllegalStateException(this.dir + " does not exist!");
    }
    return readIntStatFromArchives(archives, typeName, statName);
  }

  private int readIntStatFromArchives(final List<File> archives, final String typeName,
      final String statName) throws IOException {
    StatValue[] statValues = readStatValues(archives, typeName, statName);
    assertNotNull("statValues is null!", statValues);
    assertTrue("statValues is empty!", statValues.length > 0);
    int total = 0;
    for (StatValue statValue : statValues) {
      statValue.setFilter(StatValue.FILTER_NONE);
      // counters only grow, so the maximum snapshot is the final value
      total += (int) statValue.getSnapshotsMaximum();
    }
    return total;
  }

  private static List<File> findFilesWithSuffix(final File dir, final String regex,
      final String suffix) {
    final Pattern pattern = regex == null ? null : Pattern.compile(regex);
    return findFiles(dir, (final File file) -> {
      if (pattern != null && !pattern.matcher(file.getName()).matches()) {
        return false;
      }
      return suffix == null || file.getName().endsWith(suffix);
    }, true);
  }

  /** Collects files accepted by {@code filter}, descending into subdirectories if recursive. */
  private static List<File> findFiles(final File dir, final FileFilter filter,
      final boolean recursive) {
    List<File> matches = new ArrayList<>();
    File[] accepted = dir.listFiles(filter);
    if (accepted != null) {
      Collections.addAll(matches, accepted);
    }
    if (recursive) {
      File[] children = dir.listFiles();
      if (children != null) {
        for (File child : children) {
          if (child.isDirectory()) {
            matches.addAll(findFiles(child, filter, recursive));
          }
        }
      }
    }
    return matches;
  }

  /** Loads all snapshot series matching type/stat name from the given archives. */
  private static StatValue[] readStatValues(final List<File> archives, final String typeName,
      final String statName) throws IOException {
    final StatSpec spec = new StatSpec() {
      @Override
      public boolean archiveMatches(File value) {
        return true;
      }

      @Override
      public boolean typeMatches(String value) {
        return typeName.equals(value);
      }

      @Override
      public boolean statMatches(String value) {
        return statName.equals(value);
      }

      @Override
      public boolean instanceMatches(String textId, long numericId) {
        return true; // every instance of the type counts
      }

      @Override
      public int getCombineType() {
        return StatSpec.FILE;
      }
    };
    StatArchiveReader reader =
        new StatArchiveReader(archives.toArray(new File[0]), new StatSpec[] {spec}, true);
    return reader.matchSpec(spec);
  }
}
}