blob: 5e53d58d658b3a3e3fd783a3046804cdabf78016 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.direct;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.theInstance;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaceForTest;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.GroupingState;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests for {@link CopyOnAccessInMemoryStateInternals}. */
@RunWith(JUnit4.class)
public class CopyOnAccessInMemoryStateInternalsTest {
@Rule public final TestPipeline pipeline = TestPipeline.create();
@Rule public ExpectedException thrown = ExpectedException.none();
private String key = "foo";
@Test
public void testGetWithEmpty() {
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
StateNamespace namespace = new StateNamespaceForTest("foo");
StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
BagState<String> stringBag = internals.state(namespace, bagTag);
assertThat(stringBag.read(), emptyIterable());
stringBag.add("bar");
stringBag.add("baz");
assertThat(stringBag.read(), containsInAnyOrder("baz", "bar"));
BagState<String> reReadStringBag = internals.state(namespace, bagTag);
assertThat(reReadStringBag.read(), containsInAnyOrder("baz", "bar"));
}
@Test
public void testGetWithAbsentInUnderlying() {
CopyOnAccessInMemoryStateInternals<String> underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
StateNamespace namespace = new StateNamespaceForTest("foo");
StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
BagState<String> stringBag = internals.state(namespace, bagTag);
assertThat(stringBag.read(), emptyIterable());
stringBag.add("bar");
stringBag.add("baz");
assertThat(stringBag.read(), containsInAnyOrder("baz", "bar"));
BagState<String> reReadVoidBag = internals.state(namespace, bagTag);
assertThat(reReadVoidBag.read(), containsInAnyOrder("baz", "bar"));
BagState<String> underlyingState = underlying.state(namespace, bagTag);
assertThat(underlyingState.read(), emptyIterable());
}
/**
* Tests that retrieving state with an underlying StateInternals with an existing value returns a
* value that initially has equal value to the provided state but can be modified without
* modifying the existing state.
*/
@Test
public void testGetWithPresentInUnderlying() {
CopyOnAccessInMemoryStateInternals<String> underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
StateNamespace namespace = new StateNamespaceForTest("foo");
StateTag<ValueState<String>> valueTag = StateTags.value("foo", StringUtf8Coder.of());
ValueState<String> underlyingValue = underlying.state(namespace, valueTag);
assertThat(underlyingValue.read(), nullValue(String.class));
underlyingValue.write("bar");
assertThat(underlyingValue.read(), equalTo("bar"));
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
ValueState<String> copyOnAccessState = internals.state(namespace, valueTag);
assertThat(copyOnAccessState.read(), equalTo("bar"));
copyOnAccessState.write("baz");
assertThat(copyOnAccessState.read(), equalTo("baz"));
assertThat(underlyingValue.read(), equalTo("bar"));
ValueState<String> reReadUnderlyingValue = underlying.state(namespace, valueTag);
assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read()));
}
@Test
public void testBagStateWithUnderlying() {
CopyOnAccessInMemoryStateInternals<String> underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
StateNamespace namespace = new StateNamespaceForTest("foo");
StateTag<BagState<Integer>> valueTag = StateTags.bag("foo", VarIntCoder.of());
BagState<Integer> underlyingValue = underlying.state(namespace, valueTag);
assertThat(underlyingValue.read(), emptyIterable());
underlyingValue.add(1);
assertThat(underlyingValue.read(), containsInAnyOrder(1));
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
BagState<Integer> copyOnAccessState = internals.state(namespace, valueTag);
assertThat(copyOnAccessState.read(), containsInAnyOrder(1));
copyOnAccessState.add(4);
assertThat(copyOnAccessState.read(), containsInAnyOrder(4, 1));
assertThat(underlyingValue.read(), containsInAnyOrder(1));
BagState<Integer> reReadUnderlyingValue = underlying.state(namespace, valueTag);
assertThat(
Lists.newArrayList(underlyingValue.read()),
equalTo(Lists.newArrayList(reReadUnderlyingValue.read())));
}
@Test
public void testSetStateWithUnderlying() {
CopyOnAccessInMemoryStateInternals<String> underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
StateNamespace namespace = new StateNamespaceForTest("foo");
StateTag<SetState<Integer>> valueTag = StateTags.set("foo", VarIntCoder.of());
SetState<Integer> underlyingValue = underlying.state(namespace, valueTag);
assertThat(underlyingValue.read(), emptyIterable());
underlyingValue.add(1);
assertThat(underlyingValue.read(), containsInAnyOrder(1));
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
SetState<Integer> copyOnAccessState = internals.state(namespace, valueTag);
assertThat(copyOnAccessState.read(), containsInAnyOrder(1));
copyOnAccessState.add(4);
assertThat(copyOnAccessState.read(), containsInAnyOrder(4, 1));
assertThat(underlyingValue.read(), containsInAnyOrder(1));
SetState<Integer> reReadUnderlyingValue = underlying.state(namespace, valueTag);
assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read()));
}
@Test
public void testMapStateWithUnderlying() {
CopyOnAccessInMemoryStateInternals<String> underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
StateNamespace namespace = new StateNamespaceForTest("foo");
StateTag<MapState<String, Integer>> valueTag =
StateTags.map("foo", StringUtf8Coder.of(), VarIntCoder.of());
MapState<String, Integer> underlyingValue = underlying.state(namespace, valueTag);
assertThat(underlyingValue.entries().read(), emptyIterable());
underlyingValue.put("hello", 1);
assertThat(underlyingValue.get("hello").read(), equalTo(1));
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
MapState<String, Integer> copyOnAccessState = internals.state(namespace, valueTag);
assertThat(copyOnAccessState.get("hello").read(), equalTo(1));
copyOnAccessState.put("world", 4);
assertThat(copyOnAccessState.get("hello").read(), equalTo(1));
assertThat(copyOnAccessState.get("world").read(), equalTo(4));
assertThat(underlyingValue.get("hello").read(), equalTo(1));
assertNull(underlyingValue.get("world").read());
MapState<String, Integer> reReadUnderlyingValue = underlying.state(namespace, valueTag);
assertThat(underlyingValue.entries().read(), equalTo(reReadUnderlyingValue.entries().read()));
}
@Test
public void testAccumulatorCombiningStateWithUnderlying() throws CannotProvideCoderException {
CopyOnAccessInMemoryStateInternals<String> underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
CombineFn<Long, long[], Long> sumLongFn = Sum.ofLongs();
StateNamespace namespace = new StateNamespaceForTest("foo");
CoderRegistry reg = pipeline.getCoderRegistry();
StateTag<CombiningState<Long, long[], Long>> stateTag =
StateTags.combiningValue(
"summer", sumLongFn.getAccumulatorCoder(reg, reg.getCoder(Long.class)), sumLongFn);
GroupingState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
assertThat(underlyingValue.read(), equalTo(0L));
underlyingValue.add(1L);
assertThat(underlyingValue.read(), equalTo(1L));
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
GroupingState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag);
assertThat(copyOnAccessState.read(), equalTo(1L));
copyOnAccessState.add(4L);
assertThat(copyOnAccessState.read(), equalTo(5L));
assertThat(underlyingValue.read(), equalTo(1L));
GroupingState<Long, Long> reReadUnderlyingValue = underlying.state(namespace, stateTag);
assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read()));
}
@Test
public void testWatermarkHoldStateWithUnderlying() {
CopyOnAccessInMemoryStateInternals<String> underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
TimestampCombiner timestampCombiner = TimestampCombiner.EARLIEST;
StateNamespace namespace = new StateNamespaceForTest("foo");
StateTag<WatermarkHoldState> stateTag =
StateTags.watermarkStateInternal("wmstate", timestampCombiner);
WatermarkHoldState underlyingValue = underlying.state(namespace, stateTag);
assertThat(underlyingValue.read(), nullValue());
underlyingValue.add(new Instant(250L));
assertThat(underlyingValue.read(), equalTo(new Instant(250L)));
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
WatermarkHoldState copyOnAccessState = internals.state(namespace, stateTag);
assertThat(copyOnAccessState.read(), equalTo(new Instant(250L)));
copyOnAccessState.add(new Instant(100L));
assertThat(copyOnAccessState.read(), equalTo(new Instant(100L)));
assertThat(underlyingValue.read(), equalTo(new Instant(250L)));
copyOnAccessState.add(new Instant(500L));
assertThat(copyOnAccessState.read(), equalTo(new Instant(100L)));
WatermarkHoldState reReadUnderlyingValue = underlying.state(namespace, stateTag);
assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read()));
}
@Test
public void testCommitWithoutUnderlying() {
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
StateNamespace namespace = new StateNamespaceForTest("foo");
StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
BagState<String> stringBag = internals.state(namespace, bagTag);
assertThat(stringBag.read(), emptyIterable());
stringBag.add("bar");
stringBag.add("baz");
assertThat(stringBag.read(), containsInAnyOrder("baz", "bar"));
internals.commit();
BagState<String> reReadStringBag = internals.state(namespace, bagTag);
assertThat(reReadStringBag.read(), containsInAnyOrder("baz", "bar"));
assertThat(internals.isEmpty(), is(false));
}
@Test
public void testCommitWithUnderlying() {
CopyOnAccessInMemoryStateInternals<String> underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
StateNamespace namespace = new StateNamespaceForTest("foo");
StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
BagState<String> stringBag = underlying.state(namespace, bagTag);
assertThat(stringBag.read(), emptyIterable());
stringBag.add("bar");
stringBag.add("baz");
internals.commit();
BagState<String> reReadStringBag = internals.state(namespace, bagTag);
assertThat(reReadStringBag.read(), containsInAnyOrder("baz", "bar"));
reReadStringBag.add("spam");
BagState<String> underlyingState = underlying.state(namespace, bagTag);
assertThat(underlyingState.read(), containsInAnyOrder("spam", "bar", "baz"));
assertThat(underlyingState, is(theInstance(stringBag)));
assertThat(internals.isEmpty(), is(false));
}
@Test
public void testCommitWithClearedInUnderlying() {
CopyOnAccessInMemoryStateInternals<String> underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
CopyOnAccessInMemoryStateInternals<String> secondUnderlying =
spy(CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying));
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, secondUnderlying);
StateNamespace namespace = new StateNamespaceForTest("foo");
StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
BagState<String> stringBag = underlying.state(namespace, bagTag);
assertThat(stringBag.read(), emptyIterable());
stringBag.add("bar");
stringBag.add("baz");
stringBag.clear();
// We should not read through the cleared bag
secondUnderlying.commit();
// Should not be visible
stringBag.add("foo");
internals.commit();
BagState<String> internalsStringBag = internals.state(namespace, bagTag);
assertThat(internalsStringBag.read(), emptyIterable());
verify(secondUnderlying, never()).state(namespace, bagTag);
assertThat(internals.isEmpty(), is(false));
}
@Test
public void testCommitWithOverwrittenUnderlying() {
CopyOnAccessInMemoryStateInternals<String> underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
StateNamespace namespace = new StateNamespaceForTest("foo");
StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
BagState<String> stringBag = underlying.state(namespace, bagTag);
assertThat(stringBag.read(), emptyIterable());
stringBag.add("bar");
stringBag.add("baz");
BagState<String> internalsState = internals.state(namespace, bagTag);
internalsState.add("eggs");
internalsState.add("ham");
internalsState.add("0x00ff00");
internalsState.add("&");
internals.commit();
BagState<String> reReadInternalState = internals.state(namespace, bagTag);
assertThat(
reReadInternalState.read(),
containsInAnyOrder("bar", "baz", "0x00ff00", "eggs", "&", "ham"));
BagState<String> reReadUnderlyingState = underlying.state(namespace, bagTag);
assertThat(reReadUnderlyingState.read(), containsInAnyOrder("bar", "baz"));
}
@Test
public void testCommitWithAddedUnderlying() {
CopyOnAccessInMemoryStateInternals<String> underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
internals.commit();
StateNamespace namespace = new StateNamespaceForTest("foo");
StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
BagState<String> stringBag = underlying.state(namespace, bagTag);
assertThat(stringBag.read(), emptyIterable());
stringBag.add("bar");
stringBag.add("baz");
BagState<String> internalState = internals.state(namespace, bagTag);
assertThat(internalState.read(), emptyIterable());
BagState<String> reReadUnderlyingState = underlying.state(namespace, bagTag);
assertThat(reReadUnderlyingState.read(), containsInAnyOrder("bar", "baz"));
}
@Test
public void testCommitWithEmptyTableIsEmpty() {
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
internals.commit();
assertThat(internals.isEmpty(), is(true));
}
@Test
public void testCommitWithOnlyClearedValuesIsEmpty() {
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
StateNamespace namespace = new StateNamespaceForTest("foo");
StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
BagState<String> stringBag = internals.state(namespace, bagTag);
assertThat(stringBag.read(), emptyIterable());
stringBag.add("foo");
stringBag.clear();
internals.commit();
assertThat(internals.isEmpty(), is(true));
}
@Test
public void testCommitWithEmptyNewAndFullUnderlyingIsNotEmpty() {
CopyOnAccessInMemoryStateInternals<String> underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
StateNamespace namespace = new StateNamespaceForTest("foo");
StateTag<BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
BagState<String> stringBag = underlying.state(namespace, bagTag);
assertThat(stringBag.read(), emptyIterable());
stringBag.add("bar");
stringBag.add("baz");
internals.commit();
assertThat(internals.isEmpty(), is(false));
}
@Test
public void testGetEarliestWatermarkHoldAfterCommit() {
BoundedWindow first =
new BoundedWindow() {
@Override
public Instant maxTimestamp() {
return new Instant(2048L);
}
};
BoundedWindow second =
new BoundedWindow() {
@Override
public Instant maxTimestamp() {
return new Instant(689743L);
}
};
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
StateTag<WatermarkHoldState> firstHoldAddress =
StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
WatermarkHoldState firstHold =
internals.state(StateNamespaces.window(null, first), firstHoldAddress);
firstHold.add(new Instant(22L));
StateTag<WatermarkHoldState> secondHoldAddress =
StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
WatermarkHoldState secondHold =
internals.state(StateNamespaces.window(null, second), secondHoldAddress);
secondHold.add(new Instant(2L));
internals.commit();
assertThat(internals.getEarliestWatermarkHold(), equalTo(new Instant(2L)));
}
@Test
public void testGetEarliestWatermarkHoldWithEarliestInUnderlyingTable() {
BoundedWindow first =
new BoundedWindow() {
@Override
public Instant maxTimestamp() {
return new Instant(2048L);
}
};
BoundedWindow second =
new BoundedWindow() {
@Override
public Instant maxTimestamp() {
return new Instant(689743L);
}
};
CopyOnAccessInMemoryStateInternals<String> underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
StateTag<WatermarkHoldState> firstHoldAddress =
StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
WatermarkHoldState firstHold =
underlying.state(StateNamespaces.window(null, first), firstHoldAddress);
firstHold.add(new Instant(22L));
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit());
StateTag<WatermarkHoldState> secondHoldAddress =
StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
WatermarkHoldState secondHold =
internals.state(StateNamespaces.window(null, second), secondHoldAddress);
secondHold.add(new Instant(244L));
internals.commit();
assertThat(internals.getEarliestWatermarkHold(), equalTo(new Instant(22L)));
}
@Test
public void testGetEarliestWatermarkHoldWithEarliestInNewTable() {
BoundedWindow first =
new BoundedWindow() {
@Override
public Instant maxTimestamp() {
return new Instant(2048L);
}
};
BoundedWindow second =
new BoundedWindow() {
@Override
public Instant maxTimestamp() {
return new Instant(689743L);
}
};
CopyOnAccessInMemoryStateInternals<String> underlying =
CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
StateTag<WatermarkHoldState> firstHoldAddress =
StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
WatermarkHoldState firstHold =
underlying.state(StateNamespaces.window(null, first), firstHoldAddress);
firstHold.add(new Instant(224L));
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit());
StateTag<WatermarkHoldState> secondHoldAddress =
StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST);
WatermarkHoldState secondHold =
internals.state(StateNamespaces.window(null, second), secondHoldAddress);
secondHold.add(new Instant(24L));
internals.commit();
assertThat(internals.getEarliestWatermarkHold(), equalTo(new Instant(24L)));
}
@Test
public void testGetEarliestHoldBeforeCommit() {
CopyOnAccessInMemoryStateInternals<String> internals =
CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
internals
.state(
StateNamespaces.global(),
StateTags.watermarkStateInternal("foo", TimestampCombiner.EARLIEST))
.add(new Instant(1234L));
thrown.expect(IllegalStateException.class);
thrown.expectMessage(CopyOnAccessInMemoryStateInternals.class.getSimpleName());
thrown.expectMessage("Can't get the earliest watermark hold");
thrown.expectMessage("before it is committed");
internals.getEarliestWatermarkHold();
}
}