blob: 86c430ac0a5d56c407807bc966652f0268efe344 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.protocol.mqtt;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.protocol.session.MqttClientSession;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import io.moquette.interception.AbstractInterceptHandler;
import io.moquette.interception.messages.InterceptConnectMessage;
import io.moquette.interception.messages.InterceptDisconnectMessage;
import io.moquette.interception.messages.InterceptPublishMessage;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.ZoneId;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
/** PublishHandler handle the messages from MQTT clients. */
public class MPPPublishHandler extends AbstractInterceptHandler {
private static final Logger LOG = LoggerFactory.getLogger(MPPPublishHandler.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private final SessionManager sessionManager = SessionManager.getInstance();
private final ConcurrentHashMap<String, MqttClientSession> clientIdToSessionMap =
new ConcurrentHashMap<>();
private final PayloadFormatter payloadFormat;
private final IPartitionFetcher partitionFetcher;
private final ISchemaFetcher schemaFetcher;
public MPPPublishHandler(IoTDBConfig config) {
this.payloadFormat = PayloadFormatManager.getPayloadFormat(config.getMqttPayloadFormatter());
partitionFetcher = ClusterPartitionFetcher.getInstance();
schemaFetcher = ClusterSchemaFetcher.getInstance();
}
@Override
public String getID() {
return "iotdb-mqtt-broker-listener";
}
@Override
public void onConnect(InterceptConnectMessage msg) {
if (!clientIdToSessionMap.containsKey(msg.getClientID())) {
MqttClientSession session = new MqttClientSession(msg.getClientID());
sessionManager.login(
session,
msg.getUsername(),
new String(msg.getPassword()),
ZoneId.systemDefault().toString(),
TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3,
ClientVersion.V_1_0);
clientIdToSessionMap.put(msg.getClientID(), session);
}
}
@Override
public void onDisconnect(InterceptDisconnectMessage msg) {
MqttClientSession session = clientIdToSessionMap.remove(msg.getClientID());
if (null != session) {
sessionManager.closeSession(session, Coordinator.getInstance()::cleanupQueryExecution);
}
}
@Override
public void onPublish(InterceptPublishMessage msg) {
try {
String clientId = msg.getClientID();
if (!clientIdToSessionMap.containsKey(clientId)) {
return;
}
MqttClientSession session = clientIdToSessionMap.get(msg.getClientID());
ByteBuf payload = msg.getPayload();
String topic = msg.getTopicName();
String username = msg.getUsername();
MqttQoS qos = msg.getQos();
LOG.debug(
"Receive publish message. clientId: {}, username: {}, qos: {}, topic: {}, payload: {}",
clientId,
username,
qos,
topic,
payload);
List<Message> events = payloadFormat.format(payload);
if (events == null) {
return;
}
for (Message event : events) {
if (event == null) {
continue;
}
TSStatus tsStatus = null;
try {
InsertRowStatement statement = new InsertRowStatement();
statement.setDevicePath(
DataNodeDevicePathCache.getInstance().getPartialPath(event.getDevice()));
TimestampPrecisionUtils.checkTimestampPrecision(event.getTimestamp());
statement.setTime(event.getTimestamp());
statement.setMeasurements(event.getMeasurements().toArray(new String[0]));
if (event.getDataTypes() == null) {
statement.setDataTypes(new TSDataType[event.getMeasurements().size()]);
statement.setValues(event.getValues().toArray(new Object[0]));
statement.setNeedInferType(true);
} else {
List<TSDataType> dataTypes = event.getDataTypes();
List<String> values = event.getValues();
Object[] inferredValues = new Object[values.size()];
for (int i = 0; i < values.size(); ++i) {
inferredValues[i] = CommonUtils.parseValue(dataTypes.get(i), values.get(i));
}
statement.setDataTypes(dataTypes.toArray(new TSDataType[0]));
statement.setValues(inferredValues);
}
statement.setAligned(false);
tsStatus = AuthorityChecker.checkAuthority(statement, session);
if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOG.warn(tsStatus.message);
} else {
long queryId = sessionManager.requestQueryId();
ExecutionResult result =
Coordinator.getInstance()
.executeForTreeModel(
statement,
queryId,
sessionManager.getSessionInfo(session),
"",
partitionFetcher,
schemaFetcher,
config.getQueryTimeoutThreshold());
tsStatus = result.status;
}
} catch (Exception e) {
LOG.warn(
"meet error when inserting device {}, measurements {}, at time {}, because ",
event.getDevice(),
event.getMeasurements(),
event.getTimestamp(),
e);
}
LOG.debug("event process result: {}", tsStatus);
}
} finally {
// release the payload of the message
super.onPublish(msg);
}
}
@Override
public void onSessionLoopError(Throwable throwable) {
// TODO: Implement something sensible here ...
}
}