blob: b038e026c2bb87101289a377aa45b91745573149 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.messaging;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.storm.Config;
import org.apache.storm.daemon.worker.WorkerState;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.serialization.KryoTupleDeserializer;
import org.apache.storm.task.GeneralTopologyContext;
import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.ObjectReader;
/**
* A class that is called when a TaskMessage arrives.
*/
public class DeserializingConnectionCallback implements IConnectionCallback, IMetric {
private final WorkerState.ILocalTransferCallback cb;
private final Map<String, Object> conf;
private final GeneralTopologyContext context;
private final ThreadLocal<KryoTupleDeserializer> des =
new ThreadLocal<KryoTupleDeserializer>() {
@Override
protected KryoTupleDeserializer initialValue() {
return new KryoTupleDeserializer(conf, context);
}
};
// Track serialized size of messages.
private final boolean sizeMetricsEnabled;
private final ConcurrentHashMap<String, AtomicLong> byteCounts = new ConcurrentHashMap<>();
public DeserializingConnectionCallback(final Map<String, Object> conf, final GeneralTopologyContext context,
WorkerState.ILocalTransferCallback callback) {
this.conf = conf;
this.context = context;
cb = callback;
sizeMetricsEnabled = ObjectReader.getBoolean(conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS), false);
}
@Override
public void recv(List<TaskMessage> batch) {
KryoTupleDeserializer des = this.des.get();
ArrayList<AddressedTuple> ret = new ArrayList<>(batch.size());
for (TaskMessage message : batch) {
Tuple tuple = des.deserialize(message.message());
AddressedTuple addrTuple = new AddressedTuple(message.task(), tuple);
updateMetrics(tuple.getSourceTask(), message);
ret.add(addrTuple);
}
cb.transfer(ret);
}
/**
* Returns serialized byte count traffic metrics.
*
* @return Map of metric counts, or null if disabled
*/
@Override
public Object getValueAndReset() {
if (!sizeMetricsEnabled) {
return null;
}
HashMap<String, Long> outMap = new HashMap<>();
for (Map.Entry<String, AtomicLong> ent : byteCounts.entrySet()) {
AtomicLong count = ent.getValue();
if (count.get() > 0) {
outMap.put(ent.getKey(), count.getAndSet(0L));
}
}
return outMap;
}
/**
* Update serialized byte counts for each message.
*
* @param sourceTaskId source task
* @param message serialized message
*/
protected void updateMetrics(int sourceTaskId, TaskMessage message) {
if (sizeMetricsEnabled) {
int dest = message.task();
int len = message.message().length;
String key = Integer.toString(sourceTaskId) + "-" + Integer.toString(dest);
byteCounts.computeIfAbsent(key, k -> new AtomicLong(0L)).addAndGet(len);
}
}
}