blob: d5996d28696f16ba83e60f940a771bdd6efe3dec [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.utils;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.storm.generated.LSApprovedWorkers;
import org.apache.storm.generated.LSSupervisorAssignments;
import org.apache.storm.generated.LSSupervisorId;
import org.apache.storm.generated.LSTopoHistory;
import org.apache.storm.generated.LSTopoHistoryList;
import org.apache.storm.generated.LSWorkerHeartbeat;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.LocalStateData;
import org.apache.storm.generated.ThriftSerializedObject;
import org.apache.storm.shade.org.apache.commons.io.FileUtils;
import org.apache.storm.thrift.TBase;
import org.apache.storm.thrift.TDeserializer;
import org.apache.storm.thrift.TSerializer;
import org.apache.storm.thrift.protocol.TProtocolException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A simple, durable, atomic K/V database. *Very inefficient*, should only be used for occasional reads/writes. Every read/write hits disk.
 *
 * <p>Values are Thrift objects stored under string keys. Every write serializes the complete key/value map into a new version file
 * obtained from the backing {@link VersionedStore}; every read deserializes the most recent version file.
 *
 * <p>Reads and writes synchronize on this instance.
 */
public class LocalState {
    public static final Logger LOG = LoggerFactory.getLogger(LocalState.class);
    /** Key of the {@link LSWorkerHeartbeat} entry. */
    public static final String LS_WORKER_HEARTBEAT = "worker-heartbeat";
    /** Key of the {@link LSSupervisorId} entry. */
    public static final String LS_ID = "supervisor-id";
    /** Key of the {@link LSSupervisorAssignments} entry. */
    public static final String LS_LOCAL_ASSIGNMENTS = "local-assignments";
    /** Key of the {@link LSApprovedWorkers} entry. */
    public static final String LS_APPROVED_WORKERS = "approved-workers";
    /** Key of the {@link LSTopoHistoryList} entry. */
    public static final String LS_TOPO_HISTORY = "topo-hist";

    /** Number of times reading the latest version is attempted before the failure is reported. */
    private static final int MAX_READ_ATTEMPTS = 10;
    /** Argument handed to {@link VersionedStore#cleanup(int)} after a write that requested cleanup. */
    private static final int VERSIONS_TO_KEEP = 4;

    private final VersionedStore versionedStore;

    /**
     * Creates a local state on top of a {@link VersionedStore}.
     *
     * @param backingDir directory handed to the {@link VersionedStore}
     * @param createBackingDir flag handed to the {@link VersionedStore} together with {@code backingDir}
     * @throws IOException propagated from the {@link VersionedStore} constructor
     */
    public LocalState(String backingDir, boolean createBackingDir) throws IOException {
        LOG.debug("New Local State for {}", backingDir);
        versionedStore = new VersionedStore(backingDir, createBackingDir);
    }

    /**
     * Reads and fully deserializes every entry of the most recent version.
     *
     * @return map from key to deserialized Thrift object; empty if there is no version or the latest file is empty
     * @throws RuntimeException wrapping the last failure once {@value #MAX_READ_ATTEMPTS} attempts have failed
     */
    public synchronized Map<String, TBase> snapshot() {
        int attempts = 0;
        while (true) {
            try {
                return deserializeLatestVersion();
            } catch (Exception e) {
                attempts++;
                if (attempts >= MAX_READ_ATTEMPTS) {
                    throw new RuntimeException(e);
                }
                // Keep retrying, but leave a trace instead of dropping the failure silently.
                LOG.debug("Reading local state failed (attempt {} of {}), retrying", attempts, MAX_READ_ATTEMPTS, e);
            }
        }
    }

    /**
     * Deserializes every entry of the most recent version with a single shared deserializer.
     */
    private Map<String, TBase> deserializeLatestVersion() throws IOException {
        Map<String, TBase> result = new HashMap<>();
        TDeserializer td = new TDeserializer();
        for (Map.Entry<String, ThriftSerializedObject> ent : partialDeserializeLatestVersion(td).entrySet()) {
            result.put(ent.getKey(), deserialize(ent.getValue(), td));
        }
        return result;
    }

    /**
     * Instantiates the class named by {@code obj} through its no-argument constructor and fills it from the serialized bytes.
     *
     * @param obj serialized entry carrying the class name and the payload
     * @param td deserializer used to populate the new instance
     * @return the populated instance
     * @throws RuntimeException wrapping any failure to load, instantiate or deserialize the class
     */
    private TBase deserialize(ThriftSerializedObject obj, TDeserializer td) {
        try {
            Class<?> clazz;
            try {
                clazz = Class.forName(obj.get_name());
            } catch (ClassNotFoundException ex) {
                //Try to maintain rolling upgrade compatible with 0.10 releases
                clazz = Class.forName(obj.get_name().replaceAll("^backtype\\.storm\\.", "org.apache.storm."));
            }
            // Class.newInstance() is deprecated; going through the constructor is the documented replacement.
            TBase instance = (TBase) clazz.getDeclaredConstructor().newInstance();
            td.deserialize(instance, obj.get_bits());
            return instance;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Reads the most recent version file and deserializes only the outer {@link LocalStateData} envelope, leaving the individual
     * values in serialized form.
     *
     * @param td deserializer to use, or {@code null} to create one on demand
     * @return the serialized entries; an empty map if there is no version or the latest file has no content
     * @throws RuntimeException wrapping any failure to read or deserialize the file
     */
    private Map<String, ThriftSerializedObject> partialDeserializeLatestVersion(TDeserializer td) {
        try {
            String latestPath = versionedStore.mostRecentVersionPath();
            Map<String, ThriftSerializedObject> result = new HashMap<>();
            if (latestPath != null) {
                byte[] serialized = FileUtils.readFileToByteArray(new File(latestPath));
                if (serialized.length == 0) {
                    LOG.warn("LocalState file '{}' contained no data, resetting state", latestPath);
                } else {
                    TDeserializer deserializer = (td != null) ? td : new TDeserializer();
                    LocalStateData data = new LocalStateData();
                    deserializer.deserialize(data, serialized);
                    // The generated getter is not visible here; treat a null parts map as empty so that
                    // callers (put/remove/get) always receive a usable map.
                    Map<String, ThriftSerializedObject> parts = data.get_serialized_parts();
                    if (parts != null) {
                        result = parts;
                    }
                }
            }
            return result;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Reads the serialized entries of the most recent version, retrying on failure.
     *
     * <p>If the final attempt fails because of a {@link TProtocolException} the file is considered corrupted and an empty map is
     * returned; any other failure is rethrown.
     *
     * @param td deserializer to use, or {@code null} to create one on demand
     * @return the serialized entries, or a new empty map when the file is corrupted
     * @throws RuntimeException wrapping the last failure when it is not a {@link TProtocolException}
     */
    private synchronized Map<String, ThriftSerializedObject> partialSnapshot(TDeserializer td) {
        int attempts = 0;
        while (true) {
            try {
                return partialDeserializeLatestVersion(td);
            } catch (Exception e) {
                attempts++;
                if (attempts >= MAX_READ_ATTEMPTS) {
                    if (e.getCause() instanceof TProtocolException) {
                        LOG.warn("LocalState file is corrupted, resetting state.", e);
                        return new HashMap<>();
                    }
                    throw new RuntimeException(e);
                }
                // Keep retrying, but leave a trace instead of dropping the failure silently.
                LOG.debug("Reading local state failed (attempt {} of {}), retrying", attempts, MAX_READ_ATTEMPTS, e);
            }
        }
    }

    /**
     * Looks up a single entry in the most recent version.
     *
     * @param key key of the entry
     * @return the deserialized value, or {@code null} if the key is not present
     */
    public TBase get(String key) {
        TDeserializer td = new TDeserializer();
        ThriftSerializedObject tso = partialSnapshot(td).get(key);
        return (tso == null) ? null : deserialize(tso, td);
    }

    /**
     * Stores {@code val} under {@code key} and requests a version cleanup afterwards.
     *
     * @param key key of the entry
     * @param val value to store
     */
    public void put(String key, TBase val) {
        put(key, val, true);
    }

    /**
     * Stores {@code val} under {@code key} by writing a new version that contains all current entries plus this one.
     *
     * @param key key of the entry
     * @param val value to store
     * @param cleanup whether the backing store's cleanup is run after the write
     */
    public synchronized void put(String key, TBase val, boolean cleanup) {
        Map<String, ThriftSerializedObject> curr = partialSnapshot(null);
        TSerializer ser = new TSerializer();
        curr.put(key, serialize(val, ser));
        persistInternal(curr, ser, cleanup);
    }

    /**
     * Removes the entry stored under {@code key} and requests a version cleanup afterwards.
     *
     * @param key key of the entry
     */
    public void remove(String key) {
        remove(key, true);
    }

    /**
     * Removes the entry stored under {@code key} by writing a new version without it. A new version is written even when the
     * key was not present.
     *
     * @param key key of the entry
     * @param cleanup whether the backing store's cleanup is run after the write
     */
    public synchronized void remove(String key, boolean cleanup) {
        Map<String, ThriftSerializedObject> curr = partialSnapshot(null);
        curr.remove(key);
        persistInternal(curr, null, cleanup);
    }

    /**
     * Delegates to {@link VersionedStore#cleanup(int)}.
     *
     * @param keepVersions value handed to the backing store
     * @throws IOException propagated from the backing store
     */
    public synchronized void cleanup(int keepVersions) throws IOException {
        versionedStore.cleanup(keepVersions);
    }

    /**
     * Returns the stored topology history.
     *
     * @return the list held by the {@link #LS_TOPO_HISTORY} entry, or {@code null} if there is no such entry
     */
    public List<LSTopoHistory> getTopoHistoryList() {
        LSTopoHistoryList lsTopoHistoryListWrapper = (LSTopoHistoryList) get(LS_TOPO_HISTORY);
        if (null != lsTopoHistoryListWrapper) {
            return lsTopoHistoryListWrapper.get_topo_history();
        }
        return null;
    }

    /**
     * Remove topologies from local state which are older than cutOffAge.
     *
     * <p>Only history entries whose time stamp is strictly greater than {@code cutOffAge} are kept. The filtered list is always
     * written back, even when no history entry existed. The read and the write happen under this instance's lock so that a
     * concurrent update through this object cannot be lost in between.
     *
     * @param cutOffAge entries with a time stamp less than or equal to this value are dropped
     */
    public synchronized void filterOldTopologies(long cutOffAge) {
        LSTopoHistoryList lsTopoHistoryListWrapper = (LSTopoHistoryList) get(LS_TOPO_HISTORY);
        List<LSTopoHistory> filteredTopoHistoryList = new ArrayList<>();
        if (null != lsTopoHistoryListWrapper) {
            for (LSTopoHistory topoHistory : lsTopoHistoryListWrapper.get_topo_history()) {
                if (topoHistory.get_time_stamp() > cutOffAge) {
                    filteredTopoHistoryList.add(topoHistory);
                }
            }
        }
        put(LS_TOPO_HISTORY, new LSTopoHistoryList(filteredTopoHistoryList));
    }

    /**
     * Appends one entry to the stored topology history.
     *
     * <p>The read and the write happen under this instance's lock so that a concurrent update through this object cannot be
     * lost in between.
     *
     * @param lsTopoHistory entry to append
     */
    public synchronized void addTopologyHistory(LSTopoHistory lsTopoHistory) {
        LSTopoHistoryList lsTopoHistoryListWrapper = (LSTopoHistoryList) get(LS_TOPO_HISTORY);
        List<LSTopoHistory> currentTopoHistoryList = new ArrayList<>();
        if (null != lsTopoHistoryListWrapper) {
            currentTopoHistoryList.addAll(lsTopoHistoryListWrapper.get_topo_history());
        }
        currentTopoHistoryList.add(lsTopoHistory);
        put(LS_TOPO_HISTORY, new LSTopoHistoryList(currentTopoHistoryList));
    }

    /**
     * Returns the stored supervisor id.
     *
     * @return the id held by the {@link #LS_ID} entry, or {@code null} if there is no such entry
     */
    public String getSupervisorId() {
        LSSupervisorId lsSupervisorId = (LSSupervisorId) get(LS_ID);
        if (null != lsSupervisorId) {
            return lsSupervisorId.get_supervisor_id();
        }
        return null;
    }

    /**
     * Stores the supervisor id under {@link #LS_ID}.
     *
     * @param supervisorId id to store
     */
    public void setSupervisorId(String supervisorId) {
        put(LS_ID, new LSSupervisorId(supervisorId));
    }

    /**
     * Returns the stored approved workers.
     *
     * @return the map held by the {@link #LS_APPROVED_WORKERS} entry, or {@code null} if there is no such entry
     */
    public Map<String, Integer> getApprovedWorkers() {
        LSApprovedWorkers lsApprovedWorkers = (LSApprovedWorkers) get(LS_APPROVED_WORKERS);
        if (null != lsApprovedWorkers) {
            return lsApprovedWorkers.get_approved_workers();
        }
        return null;
    }

    /**
     * Stores the approved workers under {@link #LS_APPROVED_WORKERS}.
     *
     * @param approvedWorkers map to store
     */
    public void setApprovedWorkers(Map<String, Integer> approvedWorkers) {
        put(LS_APPROVED_WORKERS, new LSApprovedWorkers(approvedWorkers));
    }

    /**
     * Returns the stored worker heartbeat.
     *
     * @return the {@link #LS_WORKER_HEARTBEAT} entry, or {@code null} if there is no such entry
     */
    public LSWorkerHeartbeat getWorkerHeartBeat() {
        return (LSWorkerHeartbeat) get(LS_WORKER_HEARTBEAT);
    }

    /**
     * Stores the worker heartbeat under {@link #LS_WORKER_HEARTBEAT}. Unlike the other setters this write does not request a
     * version cleanup.
     *
     * @param workerHeartBeat heartbeat to store
     */
    public void setWorkerHeartBeat(LSWorkerHeartbeat workerHeartBeat) {
        put(LS_WORKER_HEARTBEAT, workerHeartBeat, false);
    }

    /**
     * Returns the stored local assignments.
     *
     * @return the map held by the {@link #LS_LOCAL_ASSIGNMENTS} entry, or {@code null} if there is no such entry
     */
    public Map<Integer, LocalAssignment> getLocalAssignmentsMap() {
        LSSupervisorAssignments assignments = (LSSupervisorAssignments) get(LS_LOCAL_ASSIGNMENTS);
        if (null != assignments) {
            return assignments.get_assignments();
        }
        return null;
    }

    /**
     * Stores the local assignments under {@link #LS_LOCAL_ASSIGNMENTS}.
     *
     * @param localAssignmentMap map to store
     */
    public void setLocalAssignmentsMap(Map<Integer, LocalAssignment> localAssignmentMap) {
        put(LS_LOCAL_ASSIGNMENTS, new LSSupervisorAssignments(localAssignmentMap));
    }

    /**
     * Writes the given entries as a new version.
     *
     * <p>The map is serialized as one {@link LocalStateData}, written to the path returned by
     * {@link VersionedStore#createVersion()}, checked against the expected length, and then reported to the store through
     * {@code succeedVersion}.
     *
     * @param serialized entries to persist
     * @param ser serializer to use, or {@code null} to create one
     * @param cleanup whether the backing store's cleanup is run after a successful write
     * @throws RuntimeException wrapping any failure, including a length mismatch after writing
     */
    private void persistInternal(Map<String, ThriftSerializedObject> serialized, TSerializer ser, boolean cleanup) {
        try {
            TSerializer serializer = (ser != null) ? ser : new TSerializer();
            byte[] toWrite = serializer.serialize(new LocalStateData(serialized));
            String newPath = versionedStore.createVersion();
            File file = new File(newPath);
            FileUtils.writeByteArrayToFile(file, toWrite);
            // Verify the file on disk before the version is marked as succeeded.
            if (toWrite.length != file.length()) {
                throw new IOException("Tried to serialize " + toWrite.length
                                      + " bytes to " + file.getCanonicalPath() + ", but "
                                      + file.length() + " bytes were written.");
            }
            versionedStore.succeedVersion(newPath);
            if (cleanup) {
                versionedStore.cleanup(VERSIONS_TO_KEEP);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Wraps a Thrift object into a {@link ThriftSerializedObject} made of its class name and its serialized bytes.
     *
     * @param o object to serialize
     * @param ser serializer to use
     * @return the serialized wrapper
     * @throws RuntimeException wrapping any failure, including a {@code null} object
     */
    private ThriftSerializedObject serialize(TBase o, TSerializer ser) {
        try {
            return new ThriftSerializedObject(o.getClass().getName(), ByteBuffer.wrap(ser.serialize(o)));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}