| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.federation.metrics; |
| |
| import static org.apache.hadoop.util.Time.now; |
| |
| import java.io.IOException; |
| import java.lang.reflect.Method; |
| import java.net.InetAddress; |
| import java.net.InetSocketAddress; |
| import java.net.UnknownHostException; |
| import java.text.SimpleDateFormat; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.LinkedHashMap; |
| import java.util.LinkedHashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.Function; |
| import java.util.function.ToIntFunction; |
| import java.util.function.ToLongFunction; |
| import java.util.stream.Collectors; |
| |
| import javax.management.NotCompliantMBeanException; |
| import javax.management.ObjectName; |
| import javax.management.StandardMBean; |
| |
| import org.apache.hadoop.hdfs.protocol.DatanodeInfo; |
| import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; |
| import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; |
| import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; |
| import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; |
| import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; |
| import org.apache.hadoop.hdfs.server.federation.router.Router; |
| import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; |
| import org.apache.hadoop.hdfs.server.federation.store.MembershipStore; |
| import org.apache.hadoop.hdfs.server.federation.store.MountTableStore; |
| import org.apache.hadoop.hdfs.server.federation.store.RouterStore; |
| import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; |
| import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; |
| import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; |
| import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest; |
| import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse; |
| import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest; |
| import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse; |
| import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsRequest; |
| import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsResponse; |
| import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; |
| import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; |
| import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats; |
| import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; |
| import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; |
| import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion; |
| import org.apache.hadoop.metrics2.util.MBeans; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.hadoop.util.VersionInfo; |
| import org.codehaus.jettison.json.JSONObject; |
| import org.eclipse.jetty.util.ajax.JSON; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| |
| /** |
| * Implementation of the Router metrics collector. |
| */ |
| public class FederationMetrics implements FederationMBean { |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(FederationMetrics.class); |
| |
| /** Format for a date. */ |
| private static final String DATE_FORMAT = "yyyy/MM/dd HH:mm:ss"; |
| |
| /** Prevent holding the page from load too long. */ |
| private static final long TIME_OUT = TimeUnit.SECONDS.toMillis(1); |
| |
| |
| /** Router interface. */ |
| private final Router router; |
| |
| /** FederationState JMX bean. */ |
| private ObjectName beanName; |
| |
| /** Resolve the namenode for each namespace. */ |
| private final ActiveNamenodeResolver namenodeResolver; |
| |
| /** State store. */ |
| private final StateStoreService stateStore; |
| /** Membership state store. */ |
| private MembershipStore membershipStore; |
| /** Mount table store. */ |
| private MountTableStore mountTableStore; |
| /** Router state store. */ |
| private RouterStore routerStore; |
| |
| |
| public FederationMetrics(Router router) throws IOException { |
| this.router = router; |
| |
| try { |
| StandardMBean bean = new StandardMBean(this, FederationMBean.class); |
| this.beanName = MBeans.register("Router", "FederationState", bean); |
| LOG.info("Registered Router MBean: {}", this.beanName); |
| } catch (NotCompliantMBeanException e) { |
| throw new RuntimeException("Bad Router MBean setup", e); |
| } |
| |
| // Resolve namenode for each nameservice |
| this.namenodeResolver = this.router.getNamenodeResolver(); |
| |
| // State store interfaces |
| this.stateStore = this.router.getStateStore(); |
| if (this.stateStore == null) { |
| LOG.error("State store not available"); |
| } else { |
| this.membershipStore = stateStore.getRegisteredRecordStore( |
| MembershipStore.class); |
| this.mountTableStore = stateStore.getRegisteredRecordStore( |
| MountTableStore.class); |
| this.routerStore = stateStore.getRegisteredRecordStore( |
| RouterStore.class); |
| } |
| } |
| |
| /** |
| * Unregister the JMX beans. |
| */ |
| public void close() { |
| if (this.beanName != null) { |
| MBeans.unregister(beanName); |
| } |
| } |
| |
| @Override |
| public String getNamenodes() { |
| final Map<String, Map<String, Object>> info = new LinkedHashMap<>(); |
| try { |
| // Get the values from the store |
| GetNamenodeRegistrationsRequest request = |
| GetNamenodeRegistrationsRequest.newInstance(); |
| GetNamenodeRegistrationsResponse response = |
| membershipStore.getNamenodeRegistrations(request); |
| |
| // Order the namenodes |
| final List<MembershipState> namenodes = response.getNamenodeMemberships(); |
| if (namenodes == null || namenodes.size() == 0) { |
| return JSON.toString(info); |
| } |
| List<MembershipState> namenodesOrder = new ArrayList<>(namenodes); |
| Collections.sort(namenodesOrder, MembershipState.NAME_COMPARATOR); |
| |
| // Dump namenodes information into JSON |
| for (MembershipState namenode : namenodesOrder) { |
| Map<String, Object> innerInfo = new HashMap<>(); |
| Map<String, Object> map = getJson(namenode); |
| innerInfo.putAll(map); |
| long dateModified = namenode.getDateModified(); |
| long lastHeartbeat = getSecondsSince(dateModified); |
| innerInfo.put("lastHeartbeat", lastHeartbeat); |
| MembershipStats stats = namenode.getStats(); |
| long used = stats.getTotalSpace() - stats.getAvailableSpace(); |
| innerInfo.put("used", used); |
| info.put(namenode.getNamenodeKey(), |
| Collections.unmodifiableMap(innerInfo)); |
| } |
| } catch (IOException e) { |
| LOG.error("Enable to fetch json representation of namenodes {}", |
| e.getMessage()); |
| return "{}"; |
| } |
| return JSON.toString(info); |
| } |
| |
| @Override |
| public String getNameservices() { |
| final Map<String, Map<String, Object>> info = new LinkedHashMap<>(); |
| try { |
| final List<MembershipState> namenodes = getActiveNamenodeRegistrations(); |
| List<MembershipState> namenodesOrder = new ArrayList<>(namenodes); |
| Collections.sort(namenodesOrder, MembershipState.NAME_COMPARATOR); |
| |
| // Dump namenodes information into JSON |
| for (MembershipState namenode : namenodesOrder) { |
| Map<String, Object> innerInfo = new HashMap<>(); |
| Map<String, Object> map = getJson(namenode); |
| innerInfo.putAll(map); |
| long dateModified = namenode.getDateModified(); |
| long lastHeartbeat = getSecondsSince(dateModified); |
| innerInfo.put("lastHeartbeat", lastHeartbeat); |
| MembershipStats stats = namenode.getStats(); |
| long used = stats.getTotalSpace() - stats.getAvailableSpace(); |
| innerInfo.put("used", used); |
| info.put(namenode.getNamenodeKey(), |
| Collections.unmodifiableMap(innerInfo)); |
| } |
| } catch (IOException e) { |
| LOG.error("Cannot retrieve nameservices for JMX: {}", e.getMessage()); |
| return "{}"; |
| } |
| return JSON.toString(info); |
| } |
| |
| @Override |
| public String getMountTable() { |
| final List<Map<String, Object>> info = new LinkedList<>(); |
| |
| try { |
| // Get all the mount points in order |
| GetMountTableEntriesRequest request = |
| GetMountTableEntriesRequest.newInstance("/"); |
| GetMountTableEntriesResponse response = |
| mountTableStore.getMountTableEntries(request); |
| final List<MountTable> mounts = response.getEntries(); |
| List<MountTable> orderedMounts = new ArrayList<>(mounts); |
| Collections.sort(orderedMounts, MountTable.SOURCE_COMPARATOR); |
| |
| // Dump mount table entries information into JSON |
| for (MountTable entry : orderedMounts) { |
| // Sumarize destinations |
| Set<String> nameservices = new LinkedHashSet<>(); |
| Set<String> paths = new LinkedHashSet<>(); |
| for (RemoteLocation location : entry.getDestinations()) { |
| nameservices.add(location.getNameserviceId()); |
| paths.add(location.getDest()); |
| } |
| |
| Map<String, Object> map = getJson(entry); |
| // We add some values with a cleaner format |
| map.put("dateCreated", getDateString(entry.getDateCreated())); |
| map.put("dateModified", getDateString(entry.getDateModified())); |
| |
| Map<String, Object> innerInfo = new HashMap<>(); |
| innerInfo.putAll(map); |
| innerInfo.put("nameserviceId", StringUtils.join(",", nameservices)); |
| innerInfo.put("path", StringUtils.join(",", paths)); |
| if (nameservices.size() > 1) { |
| innerInfo.put("order", entry.getDestOrder().toString()); |
| } else { |
| innerInfo.put("order", ""); |
| } |
| innerInfo.put("readonly", entry.isReadOnly()); |
| info.add(Collections.unmodifiableMap(innerInfo)); |
| } |
| } catch (IOException e) { |
| LOG.error( |
| "Cannot generate JSON of mount table from store: {}", e.getMessage()); |
| return "[]"; |
| } |
| return JSON.toString(info); |
| } |
| |
| @Override |
| public String getRouters() { |
| final Map<String, Map<String, Object>> info = new LinkedHashMap<>(); |
| try { |
| // Get all the routers in order |
| GetRouterRegistrationsRequest request = |
| GetRouterRegistrationsRequest.newInstance(); |
| GetRouterRegistrationsResponse response = |
| routerStore.getRouterRegistrations(request); |
| final List<RouterState> routers = response.getRouters(); |
| List<RouterState> routersOrder = new ArrayList<>(routers); |
| Collections.sort(routersOrder); |
| |
| // Dump router information into JSON |
| for (RouterState record : routersOrder) { |
| Map<String, Object> innerInfo = new HashMap<>(); |
| Map<String, Object> map = getJson(record); |
| innerInfo.putAll(map); |
| long dateModified = record.getDateModified(); |
| long lastHeartbeat = getSecondsSince(dateModified); |
| innerInfo.put("lastHeartbeat", lastHeartbeat); |
| |
| StateStoreVersion stateStoreVersion = record.getStateStoreVersion(); |
| if (stateStoreVersion == null) { |
| LOG.error("Cannot get State Store versions"); |
| } else { |
| setStateStoreVersions(innerInfo, stateStoreVersion); |
| } |
| |
| info.put(record.getPrimaryKey(), |
| Collections.unmodifiableMap(innerInfo)); |
| } |
| } catch (IOException e) { |
| LOG.error("Cannot get Routers JSON from the State Store", e); |
| return "{}"; |
| } |
| return JSON.toString(info); |
| } |
| |
| /** |
| * Populate the map with the State Store versions. |
| * |
| * @param innerInfo Map with the information. |
| * @param version State Store versions. |
| */ |
| private static void setStateStoreVersions( |
| Map<String, Object> map, StateStoreVersion version) { |
| |
| long membershipVersion = version.getMembershipVersion(); |
| String lastMembershipUpdate = getDateString(membershipVersion); |
| map.put("lastMembershipUpdate", lastMembershipUpdate); |
| |
| long mountTableVersion = version.getMountTableVersion(); |
| String lastMountTableDate = getDateString(mountTableVersion); |
| map.put("lastMountTableUpdate", lastMountTableDate); |
| } |
| |
| @Override |
| public long getTotalCapacity() { |
| return getNameserviceAggregatedLong(MembershipStats::getTotalSpace); |
| } |
| |
| @Override |
| public long getRemainingCapacity() { |
| return getNameserviceAggregatedLong(MembershipStats::getAvailableSpace); |
| } |
| |
| @Override |
| public long getProvidedSpace() { |
| return getNameserviceAggregatedLong(MembershipStats::getProvidedSpace); |
| } |
| |
| @Override |
| public long getUsedCapacity() { |
| return getTotalCapacity() - getRemainingCapacity(); |
| } |
| |
| @Override |
| public int getNumNameservices() { |
| try { |
| Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); |
| return nss.size(); |
| } catch (IOException e) { |
| LOG.error( |
| "Cannot fetch number of expired registrations from the store: {}", |
| e.getMessage()); |
| return 0; |
| } |
| } |
| |
| @Override |
| public int getNumNamenodes() { |
| try { |
| GetNamenodeRegistrationsRequest request = |
| GetNamenodeRegistrationsRequest.newInstance(); |
| GetNamenodeRegistrationsResponse response = |
| membershipStore.getNamenodeRegistrations(request); |
| List<MembershipState> memberships = response.getNamenodeMemberships(); |
| return memberships.size(); |
| } catch (IOException e) { |
| LOG.error("Cannot retrieve numNamenodes for JMX: {}", e.getMessage()); |
| return 0; |
| } |
| } |
| |
| @Override |
| public int getNumExpiredNamenodes() { |
| try { |
| GetNamenodeRegistrationsRequest request = |
| GetNamenodeRegistrationsRequest.newInstance(); |
| GetNamenodeRegistrationsResponse response = |
| membershipStore.getExpiredNamenodeRegistrations(request); |
| List<MembershipState> expiredMemberships = |
| response.getNamenodeMemberships(); |
| return expiredMemberships.size(); |
| } catch (IOException e) { |
| LOG.error( |
| "Cannot retrieve numExpiredNamenodes for JMX: {}", e.getMessage()); |
| return 0; |
| } |
| } |
| |
| @Override |
| public int getNumLiveNodes() { |
| return getNameserviceAggregatedInt( |
| MembershipStats::getNumOfActiveDatanodes); |
| } |
| |
| @Override |
| public int getNumDeadNodes() { |
| return getNameserviceAggregatedInt(MembershipStats::getNumOfDeadDatanodes); |
| } |
| |
| @Override |
| public int getNumDecommissioningNodes() { |
| return getNameserviceAggregatedInt( |
| MembershipStats::getNumOfDecommissioningDatanodes); |
| } |
| |
| @Override |
| public int getNumDecomLiveNodes() { |
| return getNameserviceAggregatedInt( |
| MembershipStats::getNumOfDecomActiveDatanodes); |
| } |
| |
| @Override |
| public int getNumDecomDeadNodes() { |
| return getNameserviceAggregatedInt( |
| MembershipStats::getNumOfDecomDeadDatanodes); |
| } |
| |
| @Override // NameNodeMXBean |
| public String getNodeUsage() { |
| float median = 0; |
| float max = 0; |
| float min = 0; |
| float dev = 0; |
| |
| final Map<String, Map<String, Object>> info = new HashMap<>(); |
| try { |
| RouterRpcServer rpcServer = this.router.getRpcServer(); |
| DatanodeInfo[] live = rpcServer.getDatanodeReport( |
| DatanodeReportType.LIVE, false, TIME_OUT); |
| |
| if (live.length > 0) { |
| float totalDfsUsed = 0; |
| float[] usages = new float[live.length]; |
| int i = 0; |
| for (DatanodeInfo dn : live) { |
| usages[i++] = dn.getDfsUsedPercent(); |
| totalDfsUsed += dn.getDfsUsedPercent(); |
| } |
| totalDfsUsed /= live.length; |
| Arrays.sort(usages); |
| median = usages[usages.length / 2]; |
| max = usages[usages.length - 1]; |
| min = usages[0]; |
| |
| for (i = 0; i < usages.length; i++) { |
| dev += (usages[i] - totalDfsUsed) * (usages[i] - totalDfsUsed); |
| } |
| dev = (float) Math.sqrt(dev / usages.length); |
| } |
| } catch (IOException e) { |
| LOG.error("Cannot get the live nodes: {}", e.getMessage()); |
| } |
| |
| final Map<String, Object> innerInfo = new HashMap<>(); |
| innerInfo.put("min", StringUtils.format("%.2f%%", min)); |
| innerInfo.put("median", StringUtils.format("%.2f%%", median)); |
| innerInfo.put("max", StringUtils.format("%.2f%%", max)); |
| innerInfo.put("stdDev", StringUtils.format("%.2f%%", dev)); |
| info.put("nodeUsage", innerInfo); |
| |
| return JSON.toString(info); |
| } |
| |
| @Override |
| public long getNumBlocks() { |
| return getNameserviceAggregatedLong(MembershipStats::getNumOfBlocks); |
| } |
| |
| @Override |
| public long getNumOfMissingBlocks() { |
| return getNameserviceAggregatedLong(MembershipStats::getNumOfBlocksMissing); |
| } |
| |
| @Override |
| public long getNumOfBlocksPendingReplication() { |
| return getNameserviceAggregatedLong( |
| MembershipStats::getNumOfBlocksPendingReplication); |
| } |
| |
| @Override |
| public long getNumOfBlocksUnderReplicated() { |
| return getNameserviceAggregatedLong( |
| MembershipStats::getNumOfBlocksUnderReplicated); |
| } |
| |
| @Override |
| public long getNumOfBlocksPendingDeletion() { |
| return getNameserviceAggregatedLong( |
| MembershipStats::getNumOfBlocksPendingDeletion); |
| } |
| |
| @Override |
| public long getNumFiles() { |
| return getNameserviceAggregatedLong(MembershipStats::getNumOfFiles); |
| } |
| |
| @Override |
| public String getRouterStarted() { |
| long startTime = this.router.getStartTime(); |
| return new Date(startTime).toString(); |
| } |
| |
| @Override |
| public String getVersion() { |
| return VersionInfo.getVersion() + ", r" + VersionInfo.getRevision(); |
| } |
| |
| @Override |
| public String getCompiledDate() { |
| return VersionInfo.getDate(); |
| } |
| |
| @Override |
| public String getCompileInfo() { |
| return VersionInfo.getDate() + " by " + VersionInfo.getUser() + " from " |
| + VersionInfo.getBranch(); |
| } |
| |
| @Override |
| public String getHostAndPort() { |
| InetSocketAddress address = this.router.getHttpServerAddress(); |
| if (address != null) { |
| try { |
| String hostname = InetAddress.getLocalHost().getHostName(); |
| int port = address.getPort(); |
| return hostname + ":" + port; |
| } catch (UnknownHostException ignored) { } |
| } |
| return "Unknown"; |
| } |
| |
| @Override |
| public String getRouterId() { |
| return this.router.getRouterId(); |
| } |
| |
| @Override |
| public String getClusterId() { |
| try { |
| Collection<String> clusterIds = |
| getNamespaceInfo(FederationNamespaceInfo::getClusterId); |
| return clusterIds.toString(); |
| } catch (IOException e) { |
| LOG.error("Cannot fetch cluster ID metrics: {}", e.getMessage()); |
| return ""; |
| } |
| } |
| |
| @Override |
| public String getBlockPoolId() { |
| try { |
| Collection<String> blockpoolIds = |
| getNamespaceInfo(FederationNamespaceInfo::getBlockPoolId); |
| return blockpoolIds.toString(); |
| } catch (IOException e) { |
| LOG.error("Cannot fetch block pool ID metrics: {}", e.getMessage()); |
| return ""; |
| } |
| } |
| |
| @Override |
| public String getRouterStatus() { |
| return "RUNNING"; |
| } |
| |
| /** |
| * Build a set of unique values found in all namespaces. |
| * |
| * @param f Method reference of the appropriate FederationNamespaceInfo |
| * getter function |
| * @return Set of unique string values found in all discovered namespaces. |
| * @throws IOException if the query could not be executed. |
| */ |
| private Collection<String> getNamespaceInfo( |
| Function<FederationNamespaceInfo, String> f) throws IOException { |
| GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance(); |
| GetNamespaceInfoResponse response = |
| membershipStore.getNamespaceInfo(request); |
| return response.getNamespaceInfo().stream() |
| .map(f) |
| .collect(Collectors.toSet()); |
| } |
| |
| /** |
| * Get the aggregated value for a method for all nameservices. |
| * @param f Method reference |
| * @return Aggregated integer. |
| */ |
| private int getNameserviceAggregatedInt(ToIntFunction<MembershipStats> f) { |
| try { |
| return getActiveNamenodeRegistrations().stream() |
| .map(MembershipState::getStats) |
| .collect(Collectors.summingInt(f)); |
| } catch (IOException e) { |
| LOG.error("Unable to extract metrics: {}", e.getMessage()); |
| return 0; |
| } |
| } |
| |
| /** |
| * Get the aggregated value for a method for all nameservices. |
| * @param f Method reference |
| * @return Aggregated long. |
| */ |
| private long getNameserviceAggregatedLong(ToLongFunction<MembershipStats> f) { |
| try { |
| return getActiveNamenodeRegistrations().stream() |
| .map(MembershipState::getStats) |
| .collect(Collectors.summingLong(f)); |
| } catch (IOException e) { |
| LOG.error("Unable to extract metrics: {}", e.getMessage()); |
| return 0; |
| } |
| } |
| |
| /** |
| * Fetches the most active namenode memberships for all known nameservices. |
| * The fetched membership may not or may not be active. Excludes expired |
| * memberships. |
| * @throws IOException if the query could not be performed. |
| * @return List of the most active NNs from each known nameservice. |
| */ |
| private List<MembershipState> getActiveNamenodeRegistrations() |
| throws IOException { |
| |
| List<MembershipState> resultList = new ArrayList<>(); |
| GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance(); |
| GetNamespaceInfoResponse response = |
| membershipStore.getNamespaceInfo(request); |
| for (FederationNamespaceInfo nsInfo : response.getNamespaceInfo()) { |
| // Fetch the most recent namenode registration |
| String nsId = nsInfo.getNameserviceId(); |
| List<? extends FederationNamenodeContext> nns = |
| namenodeResolver.getNamenodesForNameserviceId(nsId); |
| if (nns != null) { |
| FederationNamenodeContext nn = nns.get(0); |
| if (nn != null && nn instanceof MembershipState) { |
| resultList.add((MembershipState) nn); |
| } |
| } |
| } |
| return resultList; |
| } |
| |
| /** |
| * Get time as a date string. |
| * @param time Seconds since 1970. |
| * @return String representing the date. |
| */ |
| @VisibleForTesting |
| static String getDateString(long time) { |
| if (time <= 0) { |
| return "-"; |
| } |
| Date date = new Date(time); |
| |
| SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT); |
| return sdf.format(date); |
| } |
| |
| /** |
| * Get the number of seconds passed since a date. |
| * |
| * @param timeMs to use as a reference. |
| * @return Seconds since the date. |
| */ |
| private static long getSecondsSince(long timeMs) { |
| if (timeMs < 0) { |
| return -1; |
| } |
| return (now() - timeMs) / 1000; |
| } |
| |
| /** |
| * Get JSON for this record. |
| * |
| * @return Map representing the data for the JSON representation. |
| */ |
| private static Map<String, Object> getJson(BaseRecord record) { |
| Map<String, Object> json = new HashMap<>(); |
| Map<String, Class<?>> fields = getFields(record); |
| |
| for (String fieldName : fields.keySet()) { |
| if (!fieldName.equalsIgnoreCase("proto")) { |
| try { |
| Object value = getField(record, fieldName); |
| if (value instanceof BaseRecord) { |
| BaseRecord recordField = (BaseRecord) value; |
| json.putAll(getJson(recordField)); |
| } else { |
| json.put(fieldName, value == null ? JSONObject.NULL : value); |
| } |
| } catch (Exception e) { |
| throw new IllegalArgumentException( |
| "Cannot serialize field " + fieldName + " into JSON"); |
| } |
| } |
| } |
| return json; |
| } |
| |
| /** |
| * Returns all serializable fields in the object. |
| * |
| * @return Map with the fields. |
| */ |
| private static Map<String, Class<?>> getFields(BaseRecord record) { |
| Map<String, Class<?>> getters = new HashMap<>(); |
| for (Method m : record.getClass().getDeclaredMethods()) { |
| if (m.getName().startsWith("get")) { |
| try { |
| Class<?> type = m.getReturnType(); |
| char[] c = m.getName().substring(3).toCharArray(); |
| c[0] = Character.toLowerCase(c[0]); |
| String key = new String(c); |
| getters.put(key, type); |
| } catch (Exception e) { |
| LOG.error("Cannot execute getter {} on {}", m.getName(), record); |
| } |
| } |
| } |
| return getters; |
| } |
| |
| /** |
| * Fetches the value for a field name. |
| * |
| * @param fieldName the legacy name of the field. |
| * @return The field data or null if not found. |
| */ |
| private static Object getField(BaseRecord record, String fieldName) { |
| Object result = null; |
| Method m = locateGetter(record, fieldName); |
| if (m != null) { |
| try { |
| result = m.invoke(record); |
| } catch (Exception e) { |
| LOG.error("Cannot get field {} on {}", fieldName, record); |
| } |
| } |
| return result; |
| } |
| |
| /** |
| * Finds the appropriate getter for a field name. |
| * |
| * @param fieldName The legacy name of the field. |
| * @return The matching getter or null if not found. |
| */ |
| private static Method locateGetter(BaseRecord record, String fieldName) { |
| for (Method m : record.getClass().getMethods()) { |
| if (m.getName().equalsIgnoreCase("get" + fieldName)) { |
| return m; |
| } |
| } |
| return null; |
| } |
| } |