blob: e7be06cc34986f6810f6ad9c507dc0d1a6cba718 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.datastructures;
import java.io.Externalizable;
import java.io.IOException;
import java.io.InvalidObjectException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.ObjectStreamException;
import java.util.Collection;
import java.util.Iterator;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteSet;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheGateway;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteRunnable;
import org.jetbrains.annotations.NotNull;
/**
* Cache set proxy.
*/
public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
/** */
private static final long serialVersionUID = 0L;
/** Deserialization stash. */
private static final ThreadLocal<T3<GridKernalContext, String, String>> stash =
new ThreadLocal<T3<GridKernalContext, String, String>>() {
@Override protected T3<GridKernalContext, String, String> initialValue() {
return new T3<>();
}
};
/** Delegate set. */
private GridCacheSetImpl<T> delegate;
/** Cache context. */
private GridCacheContext cctx;
/** Cache gateway. */
private GridCacheGateway gate;
/** Busy lock. */
private GridSpinBusyLock busyLock;
/** Check removed flag. */
private boolean rmvCheck;
/**
* Required by {@link Externalizable}.
*/
public GridCacheSetProxy() {
// No-op.
}
/**
* @param cctx Cache context.
* @param delegate Delegate set.
*/
public GridCacheSetProxy(GridCacheContext cctx, GridCacheSetImpl<T> delegate) {
this.cctx = cctx;
this.delegate = delegate;
gate = cctx.gate();
busyLock = new GridSpinBusyLock();
}
/**
* @return Set delegate.
*/
public GridCacheSetImpl delegate() {
return delegate;
}
/**
* Remove callback.
*/
public void blockOnRemove() {
delegate.removed(true);
busyLock.block();
}
/** {@inheritDoc} */
@Override public int size() {
enterBusy();
try {
gate.enter();
try {
return delegate.size();
}
finally {
gate.leave();
}
}
finally {
leaveBusy();
}
}
/** {@inheritDoc} */
@Override public boolean isEmpty() {
enterBusy();
try {
gate.enter();
try {
return delegate.isEmpty();
}
finally {
gate.leave();
}
}
finally {
leaveBusy();
}
}
/** {@inheritDoc} */
@Override public boolean contains(final Object o) {
enterBusy();
try {
gate.enter();
try {
return delegate.contains(o);
}
finally {
gate.leave();
}
}
finally {
leaveBusy();
}
}
/** {@inheritDoc} */
@NotNull @Override public Object[] toArray() {
enterBusy();
try {
gate.enter();
try {
return delegate.toArray();
}
finally {
gate.leave();
}
}
finally {
leaveBusy();
}
}
/** {@inheritDoc} */
@NotNull @Override public <T1> T1[] toArray(final T1[] a) {
enterBusy();
try {
gate.enter();
try {
return delegate.toArray(a);
}
finally {
gate.leave();
}
}
finally {
leaveBusy();
}
}
/** {@inheritDoc} */
@Override public boolean add(final T t) {
enterBusy();
try {
gate.enter();
try {
return delegate.add(t);
}
finally {
gate.leave();
}
}
finally {
leaveBusy();
}
}
/** {@inheritDoc} */
@Override public boolean remove(final Object o) {
enterBusy();
try {
gate.enter();
try {
return delegate.remove(o);
}
finally {
gate.leave();
}
}
finally {
leaveBusy();
}
}
/** {@inheritDoc} */
@Override public boolean containsAll(final Collection<?> c) {
enterBusy();
try {
gate.enter();
try {
return delegate.containsAll(c);
}
finally {
gate.leave();
}
}
finally {
leaveBusy();
}
}
/** {@inheritDoc} */
@Override public boolean addAll(final Collection<? extends T> c) {
enterBusy();
try {
gate.enter();
try {
return delegate.addAll(c);
}
finally {
gate.leave();
}
}
finally {
leaveBusy();
}
}
/** {@inheritDoc} */
@Override public boolean retainAll(final Collection<?> c) {
enterBusy();
try {
gate.enter();
try {
return delegate.retainAll(c);
}
finally {
gate.leave();
}
}
finally {
leaveBusy();
}
}
/** {@inheritDoc} */
@Override public boolean removeAll(final Collection<?> c) {
enterBusy();
try {
gate.enter();
try {
return delegate.removeAll(c);
}
finally {
gate.leave();
}
}
finally {
leaveBusy();
}
}
/** {@inheritDoc} */
@Override public void clear() {
enterBusy();
try {
gate.enter();
try {
delegate.clear();
}
finally {
gate.leave();
}
}
finally {
leaveBusy();
}
}
/** {@inheritDoc} */
@Override public Iterator<T> iterator() {
enterBusy();
try {
gate.enter();
try {
return delegate.iterator();
}
finally {
gate.leave();
}
}
finally {
leaveBusy();
}
}
/** {@inheritDoc} */
@Override public void close() {
IgniteFuture<Boolean> destroyFut = null;
gate.enter();
try {
delegate.close();
if (delegate.separated()) {
IgniteInternalFuture<Boolean> fut = cctx.kernalContext().cache().dynamicDestroyCache(
cctx.cache().name(), false, true, false, null);
((GridFutureAdapter)fut).ignoreInterrupts();
destroyFut = new IgniteFutureImpl<>(fut);
}
}
finally {
gate.leave();
}
if (destroyFut != null)
destroyFut.get();
}
/** {@inheritDoc} */
@Override public String name() {
return delegate.name();
}
/** @return Group name for queue. */
public String groupName() {
return cctx.group().name();
}
/** @return Group id for queue. */
public int groupId() {
return cctx.group().groupId();
}
/** {@inheritDoc} */
@Override public boolean collocated() {
return delegate.collocated();
}
/** {@inheritDoc} */
@Override public boolean removed() {
return delegate.removed();
}
/** {@inheritDoc} */
@Override public void affinityRun(final IgniteRunnable job) {
delegate.affinityRun(job);
}
/** {@inheritDoc} */
@Override public <R> R affinityCall(final IgniteCallable<R> job) {
return delegate.affinityCall(job);
}
/** {@inheritDoc} */
@Override public <T1> IgniteSet<T1> withKeepBinary() {
return new GridCacheSetProxy<>(cctx, (GridCacheSetImpl<T1>)delegate.<T1>withKeepBinary());
}
/**
* Enters busy state.
*/
private void enterBusy() {
boolean rmvd;
if (rmvCheck) {
try {
rmvd = !delegate().checkHeader();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
rmvCheck = false;
if (rmvd) {
delegate.removed(true);
cctx.dataStructures().onRemoved(this);
throw removedError();
}
}
if (!busyLock.enterBusy())
throw removedError();
}
/**
*
*/
public void needCheckNotRemoved() {
rmvCheck = true;
}
/**
* @return Error.
*/
private IllegalStateException removedError() {
return new IllegalStateException("Set has been removed from cache: " + delegate);
}
/**
* Leaves busy state.
*/
private void leaveBusy() {
busyLock.leaveBusy();
}
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(cctx.kernalContext());
U.writeString(out, name());
U.writeString(out, cctx.group().name());
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
T3<GridKernalContext, String, String> t = stash.get();
t.set1((GridKernalContext)in.readObject());
t.set2(U.readString(in));
t.set3(U.readString(in));
}
/**
* Reconstructs object on unmarshalling.
*
* @return Reconstructed object.
* @throws ObjectStreamException Thrown in case of unmarshalling error.
*/
protected Object readResolve() throws ObjectStreamException {
try {
T3<GridKernalContext, String, String> t = stash.get();
return t.get1().dataStructures().set(t.get2(), t.get3(), null);
}
catch (IgniteCheckedException e) {
throw U.withCause(new InvalidObjectException(e.getMessage()), e);
}
finally {
stash.remove();
}
}
/** {@inheritDoc} */
@Override public String toString() {
return delegate.toString();
}
}