| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.pdx.internal; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.InternalGemFireError; |
| import org.apache.geode.cache.client.Pool; |
| import org.apache.geode.cache.client.ServerConnectivityException; |
| import org.apache.geode.cache.client.internal.AddPDXEnumOp; |
| import org.apache.geode.cache.client.internal.AddPDXTypeOp; |
| import org.apache.geode.cache.client.internal.ExecutablePool; |
| import org.apache.geode.cache.client.internal.GetPDXEnumByIdOp; |
| import org.apache.geode.cache.client.internal.GetPDXEnumsOp; |
| import org.apache.geode.cache.client.internal.GetPDXIdForEnumOp; |
| import org.apache.geode.cache.client.internal.GetPDXIdForTypeOp; |
| import org.apache.geode.cache.client.internal.GetPDXTypeByIdOp; |
| import org.apache.geode.cache.client.internal.GetPDXTypesOp; |
| import org.apache.geode.cache.client.internal.PoolImpl; |
| import org.apache.geode.cache.wan.GatewaySender; |
| import org.apache.geode.internal.cache.InternalCache; |
| import org.apache.geode.internal.cache.PoolManagerImpl; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| |
| public class ClientTypeRegistration implements TypeRegistration { |
| |
| private static final Logger logger = LogService.getLogger(); |
| |
| private final InternalCache cache; |
| |
| public ClientTypeRegistration(InternalCache cache) { |
| this.cache = cache; |
| |
| // See GEODE-5771: Even when set, PDX persistence is internally ignored. |
| if (cache.getPdxPersistent() || StringUtils.isNotBlank(cache.getPdxDiskStore())) { |
| logger.warn("PDX persistence is not supported on client side."); |
| } |
| } |
| |
| @Override |
| public int defineType(PdxType newType) { |
| Collection<Pool> pools = getAllPools(); |
| |
| ServerConnectivityException lastException = null; |
| int newTypeId = -1; |
| for (Pool pool : pools) { |
| try { |
| newTypeId = GetPDXIdForTypeOp.execute((ExecutablePool) pool, newType); |
| newType.setTypeId(newTypeId); |
| copyTypeToOtherPools(newType, newTypeId, pool); |
| return newTypeId; |
| } catch (ServerConnectivityException e) { |
| // ignore, try the next pool. |
| lastException = e; |
| } |
| } |
| throw returnCorrectExceptionForFailure(pools, newTypeId, lastException); |
| } |
| |
| /** |
| * Send a type to all pools. This used to make sure that any types |
| * used by this client make it to all clusters this client is connected to. |
| */ |
| private void copyTypeToOtherPools(PdxType newType, int newTypeId, Pool exception) { |
| Collection<Pool> pools = getAllPoolsExcept(exception); |
| for (Pool pool : pools) { |
| try { |
| sendTypeToPool(newType, newTypeId, pool); |
| } catch (ServerConnectivityException e) { |
| logger.debug("Received an exception sending pdx type to pool {}, {}", pool, e.getMessage(), |
| e); |
| } |
| } |
| } |
| |
| private Collection<Pool> getAllPoolsExcept(Pool pool) { |
| Collection<Pool> targetPools = new ArrayList<>(getAllPools()); |
| targetPools.remove(pool); |
| return targetPools; |
| } |
| |
| private void sendTypeToPool(PdxType type, int id, Pool pool) { |
| try { |
| AddPDXTypeOp.execute((ExecutablePool) pool, id, type); |
| } catch (ServerConnectivityException serverConnectivityException) { |
| logger.debug("Received an exception sending pdx type to pool {}, {}", pool, |
| serverConnectivityException.getMessage(), serverConnectivityException); |
| throw serverConnectivityException; |
| } |
| } |
| |
| @Override |
| public PdxType getType(int typeId) { |
| Collection<Pool> pools = getAllPools(); |
| |
| ServerConnectivityException lastException = null; |
| for (Pool pool : pools) { |
| try { |
| PdxType type = GetPDXTypeByIdOp.execute((ExecutablePool) pool, typeId); |
| if (type != null) { |
| return type; |
| } |
| } catch (ServerConnectivityException e) { |
| logger.debug("Received an exception getting pdx type from pool {}, {}", pool, |
| e.getMessage(), e); |
| // ignore, try the next pool. |
| lastException = e; |
| } |
| } |
| |
| if (lastException != null) { |
| throw lastException; |
| } else { |
| throw returnCorrectExceptionForFailure(pools, typeId, lastException); |
| } |
| } |
| |
| private Collection<Pool> getAllPools() { |
| Collection<Pool> pools = PoolManagerImpl.getPMI().getMap().values(); |
| |
| for (Iterator<Pool> itr = pools.iterator(); itr.hasNext();) { |
| PoolImpl pool = (PoolImpl) itr.next(); |
| if (pool.isUsedByGateway()) { |
| itr.remove(); |
| } |
| } |
| |
| if (pools.isEmpty()) { |
| if (this.cache.isClosed()) { |
| throw cache.getCacheClosedException("PDX detected cache was closed"); |
| } |
| throw cache.getCacheClosedException( |
| "Client pools have been closed so the PDX type registry is not available."); |
| } |
| return pools; |
| } |
| |
| @Override |
| public void addRemoteType(int typeId, PdxType type) { |
| throw new UnsupportedOperationException("Clients will not be asked to add remote types"); |
| } |
| |
| public int getLastAllocatedTypeId() { |
| throw new UnsupportedOperationException("Clients does not keep track of last allocated id"); |
| } |
| |
| @Override |
| public void initialize() { |
| // do nothing |
| } |
| |
| @Override |
| public void gatewaySenderStarted(GatewaySender gatewaySender) { |
| // do nothing |
| } |
| |
| @Override |
| public void creatingPersistentRegion() { |
| // do nothing |
| } |
| |
| @Override |
| public void creatingPool() { |
| // do nothing |
| } |
| |
| @Override |
| public int getEnumId(Enum<?> v) { |
| EnumInfo enumInfo = new EnumInfo(v); |
| return processEnumInfoForEnumId(enumInfo); |
| } |
| |
| private int processEnumInfoForEnumId(EnumInfo enumInfo) { |
| Collection<Pool> pools = getAllPools(); |
| ServerConnectivityException lastException = null; |
| for (Pool pool : pools) { |
| try { |
| int result = GetPDXIdForEnumOp.execute((ExecutablePool) pool, enumInfo); |
| copyEnumToOtherPools(enumInfo, result, pool); |
| return result; |
| } catch (ServerConnectivityException e) { |
| // ignore, try the next pool. |
| lastException = e; |
| } |
| } |
| throw returnCorrectExceptionForFailure(pools, -1, lastException); |
| } |
| |
| /** |
| * Send an enum to all pools. This used to make sure that any enums |
| * used by this client make it to all clusters this client is connected to. |
| */ |
| private void copyEnumToOtherPools(EnumInfo enumInfo, int newTypeId, Pool exception) { |
| Collection<Pool> pools = getAllPoolsExcept(exception); |
| for (Pool pool : pools) { |
| try { |
| sendEnumIdToPool(enumInfo, newTypeId, pool); |
| } catch (ServerConnectivityException e) { |
| logger.debug("Received an exception sending pdx enum to pool {}, {}", pool, e.getMessage()); |
| } |
| } |
| } |
| |
| private void sendEnumIdToPool(EnumInfo enumInfo, int id, Pool pool) { |
| try { |
| AddPDXEnumOp.execute((ExecutablePool) pool, id, enumInfo); |
| } catch (ServerConnectivityException serverConnectivityException) { |
| logger.debug("Received an exception sending pdx type to pool {}, {}", pool, |
| serverConnectivityException.getMessage(), serverConnectivityException); |
| throw serverConnectivityException; |
| } |
| } |
| |
| @Override |
| public void addRemoteEnum(int enumId, EnumInfo newInfo) { |
| throw new UnsupportedOperationException("Clients will not be asked to add remote enums"); |
| } |
| |
| @Override |
| public int defineEnum(EnumInfo newInfo) { |
| return processEnumInfoForEnumId(newInfo); |
| } |
| |
| @Override |
| public EnumInfo getEnumById(int enumId) { |
| Collection<Pool> pools = getAllPools(); |
| |
| ServerConnectivityException lastException = null; |
| for (Pool pool : pools) { |
| try { |
| EnumInfo result = GetPDXEnumByIdOp.execute((ExecutablePool) pool, enumId); |
| if (result != null) { |
| return result; |
| } |
| } catch (ServerConnectivityException e) { |
| logger.debug("Received an exception getting pdx type from pool {}, {}", pool, |
| e.getMessage(), e); |
| // ignore, try the next pool. |
| lastException = e; |
| } |
| } |
| |
| throw returnCorrectExceptionForFailure(pools, enumId, lastException); |
| } |
| |
| @SuppressWarnings({"unchecked", "serial"}) |
| @Override |
| public Map<Integer, PdxType> types() { |
| Collection<Pool> pools = getAllPools(); |
| |
| Map<Integer, PdxType> types = new HashMap<>(); |
| for (Pool p : pools) { |
| try { |
| types.putAll(GetPDXTypesOp.execute((ExecutablePool) p)); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } |
| } |
| return types; |
| } |
| |
| @SuppressWarnings({"unchecked", "serial"}) |
| @Override |
| public Map<Integer, EnumInfo> enums() { |
| Collection<Pool> pools = getAllPools(); |
| |
| Map<Integer, EnumInfo> enums = new HashMap<>(); |
| for (Pool p : pools) { |
| enums.putAll(GetPDXEnumsOp.execute((ExecutablePool) p)); |
| } |
| return enums; |
| } |
| |
| |
| @Override |
| public PdxType getPdxTypeForField(String fieldName, String className) { |
| for (Object value : types().values()) { |
| if (value instanceof PdxType) { |
| PdxType pdxType = (PdxType) value; |
| if (pdxType.getClassName().equals(className) && pdxType.getPdxField(fieldName) != null) { |
| return pdxType; |
| } |
| } |
| } |
| return null; |
| } |
| |
| @Override |
| public Set<PdxType> getPdxTypesForClassName(String className) { |
| Set<PdxType> result = new HashSet<>(); |
| for (Object value : types().values()) { |
| if (value instanceof PdxType) { |
| PdxType pdxType = (PdxType) value; |
| if (pdxType.getClassName().equals(className)) { |
| result.add(pdxType); |
| } |
| } |
| } |
| return result; |
| } |
| |
| @Override |
| public boolean isClient() { |
| return true; |
| } |
| |
| /** |
| * Add an type as part of an import. The type is sent to all pools in case |
| * the pools are connected to different clusters, but if one pool fails |
| * the import will fail. |
| */ |
| @Override |
| public void addImportedType(int typeId, PdxType importedType) { |
| Collection<Pool> pools = getAllPools(); |
| for (Pool pool : pools) { |
| try { |
| sendTypeToPool(importedType, typeId, pool); |
| } catch (ServerConnectivityException e) { |
| throw returnCorrectExceptionForFailure(pools, typeId, e); |
| } |
| } |
| } |
| |
| /** |
| * Add an enum as part of an import. The enum is sent to all pools in case |
| * the pools are connected to different clusters, but if one pool fails |
| * the import will fail. |
| */ |
| @Override |
| public void addImportedEnum(int enumId, EnumInfo importedInfo) { |
| Collection<Pool> pools = getAllPools(); |
| |
| for (Pool pool : pools) { |
| try { |
| sendEnumIdToPool(importedInfo, enumId, pool); |
| } catch (ServerConnectivityException e) { |
| throw returnCorrectExceptionForFailure(pools, enumId, e); |
| } |
| } |
| } |
| |
| private RuntimeException returnCorrectExceptionForFailure(final Collection<Pool> pools, |
| final int typeId, final ServerConnectivityException lastException) { |
| if (lastException != null) { |
| throw lastException; |
| } else { |
| throw new InternalGemFireError("Unable to determine PDXType for id " + typeId); |
| } |
| } |
| |
| @Override |
| public int getLocalSize() { |
| return 0; |
| } |
| } |