blob: 79b960a388434115085537477a42415336f28b0c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.util;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
/**
* Static methods for creating and working with {@link MutationDetector}.
*/
public class MutationDetectors {

  /** Not instantiable: this class only exposes static factory methods. */
  private MutationDetectors() {}

  /**
   * Creates a new {@code MutationDetector} for the provided {@code value} that uses the provided
   * {@link Coder} to perform deep copies and comparisons by serializing and deserializing values.
   *
   * <p>It is permissible for {@code value} to be {@code null}. Since {@code null} is immutable,
   * the mutation check will always succeed.
   *
   * @param <T> the type of the value checked for mutation
   * @param value the value to watch for mutation; may be {@code null}
   * @param coder the coder used to snapshot and compare {@code value}; it is not consulted when
   *     {@code value} is {@code null}
   * @return a detector bound to {@code value}
   * @throws CoderException if the coder fails while taking the initial snapshot of {@code value}
   */
  public static <T> MutationDetector forValueWithCoder(T value, Coder<T> coder)
      throws CoderException {
    if (value == null) {
      return noopMutationDetector();
    } else {
      return new CodedValueMutationDetector<>(value, coder);
    }
  }

  /**
   * Creates a new {@code MutationDetector} that always succeeds.
   *
   * <p>This is useful, for example, for providing a very efficient mutation detector for a value
   * which is already immutable by design.
   *
   * @return a detector whose checks never fail
   */
  public static MutationDetector noopMutationDetector() {
    return new NoopMutationDetector();
  }

  /**
   * A {@link MutationDetector} whose checks do nothing and therefore always succeed. It is used
   * for {@code null}, which is immutable, and is also what {@link #noopMutationDetector()} hands
   * out for values that are immutable by design.
   */
  private static class NoopMutationDetector implements MutationDetector {
    @Override
    public void verifyUnmodified() {}

    @Override
    public void close() {}
  }

  /**
   * Given a value of type {@code T} and a {@link Coder} for that type, provides facilities to
   * save a snapshot of the value and later check that the value has not changed.
   *
   * @param <T> the type of values checked for mutation
   */
  private static class CodedValueMutationDetector<T> implements MutationDetector {

    private final Coder<T> coder;

    /**
     * A saved pointer to an in-memory value provided upon construction, which we will check for
     * forbidden mutations.
     */
    private final T possiblyModifiedObject;

    /**
     * A copy of {@link #possiblyModifiedObject} obtained at construction time by encoding it with
     * {@link #coder} and decoding the result. Naturally, it will not change if
     * {@link #possiblyModifiedObject} is mutated. It is the source of
     * {@link #originalStructuralValue} and is reported as the previous value when a mutation is
     * detected.
     */
    private final T clonedOriginalObject;

    /**
     * The structural value of {@link #clonedOriginalObject}. It will be used during every call to
     * {@link #verifyUnmodified}, which could be called many times throughout the lifetime of this
     * {@link CodedValueMutationDetector}.
     */
    private final Object originalStructuralValue;

    /**
     * Create a mutation detector for the provided {@code value}, using the provided {@link Coder}
     * for cloning and checking serialized forms for equality.
     *
     * @param value the value to snapshot and watch
     * @param coder the coder used to copy {@code value} and to derive structural values
     * @throws CoderException if encoding or decoding {@code value} fails
     */
    public CodedValueMutationDetector(T value, Coder<T> coder) throws CoderException {
      this.coder = coder;
      this.possiblyModifiedObject = value;

      // Take exactly one snapshot: encode once and decode once. Previously the value went through
      // two full encode/decode round trips that produced two equivalent copies, and the encoded
      // bytes were kept in a field that was never read again.
      byte[] encodedOriginal = CoderUtils.encodeToByteArray(coder, value);
      this.clonedOriginalObject = CoderUtils.decodeFromByteArray(coder, encodedOriginal);

      // The structural value must be derived from the copy rather than from the live value. If
      // the object is consistent with equals, the structural value will be the exact same object
      // reference, which would make it impossible to detect changes.
      this.originalStructuralValue = coder.structuralValue(clonedOriginalObject);
    }

    /**
     * Checks that the watched value still has the structural value recorded at construction.
     *
     * @throws IllegalMutationException if the structural value has changed
     * @throws RuntimeException wrapping any {@link CoderException} raised while re-encoding
     */
    @Override
    public void verifyUnmodified() {
      try {
        verifyUnmodifiedThrowingCheckedExceptions();
      } catch (CoderException exn) {
        throw new RuntimeException(exn);
      }
    }

    /** Performs the actual comparison, letting {@link CoderException} propagate to the caller. */
    private void verifyUnmodifiedThrowingCheckedExceptions() throws CoderException {
      // Since there is no guarantee that cloning an object via the coder will return the exact
      // same type as the value, we clone possiblyModifiedObject before getting its structural
      // value. This way we are guaranteed to compare the same types.
      T possiblyModifiedClonedValue = CoderUtils.clone(coder, possiblyModifiedObject);
      Object newStructuralValue = coder.structuralValue(possiblyModifiedClonedValue);
      if (originalStructuralValue.equals(newStructuralValue)) {
        return;
      }
      illegalMutation(clonedOriginalObject, possiblyModifiedClonedValue);
    }

    /**
     * Always throws an {@link IllegalMutationException} describing both values and their Base64
     * encodings.
     *
     * @param previousValue the snapshot taken at construction
     * @param newValue a fresh copy of the value as it is now
     * @throws CoderException if either value cannot be encoded for the message
     */
    private void illegalMutation(T previousValue, T newValue) throws CoderException {
      throw new IllegalMutationException(
          String.format("Value %s mutated illegally, new value was %s."
              + " Encoding was %s, now %s.",
              previousValue, newValue,
              CoderUtils.encodeToBase64(coder, previousValue),
              CoderUtils.encodeToBase64(coder, newValue)),
          previousValue, newValue);
    }

    /** Runs one final mutation check; see {@link #verifyUnmodified()}. */
    @Override
    public void close() {
      verifyUnmodified();
    }
  }
}