blob: b0cbf2aef696efa0cb2160ac70cf8d36b27aef5e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.iotdb.commons.sync.pipesink;
import org.apache.iotdb.commons.exception.sync.PipeSinkException;
import org.apache.iotdb.commons.sync.utils.SyncConstant;
import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class IoTDBPipeSink implements PipeSink {
private final PipeSinkType pipeSinkType = PipeSinkType.IoTDB;
private String name;
private String ip;
private int port;
private static final String ATTRIBUTE_IP_KEY = "ip";
private static final String ATTRIBUTE_PORT_KEY = "port";
public IoTDBPipeSink() {}
public IoTDBPipeSink(String name) {
this();
this.ip = SyncConstant.DEFAULT_PIPE_SINK_IP;
this.port = SyncConstant.DEFAULT_PIPE_SINK_PORT;
this.name = name;
}
@Override
public void setAttribute(List<Pair<String, String>> params) throws PipeSinkException {
for (Pair<String, String> pair : params) {
String attr = pair.left;
String value = pair.right;
attr = attr.toLowerCase();
if (attr.equals(ATTRIBUTE_IP_KEY)) {
ip = value;
} else if (attr.equals(ATTRIBUTE_PORT_KEY)) {
try {
port = Integer.parseInt(value);
} catch (NumberFormatException e) {
throw new PipeSinkException(attr, value, TSDataType.INT32.name());
}
} else {
throw new PipeSinkException(
"There is No attribute " + attr + " in " + PipeSinkType.IoTDB + " pipeSink.");
}
}
}
@Override
public void setAttribute(Map<String, String> params) throws PipeSinkException {
for (Map.Entry<String, String> entry : params.entrySet()) {
String attr = entry.getKey();
String value = entry.getValue();
attr = attr.toLowerCase();
if (attr.equals(ATTRIBUTE_IP_KEY)) {
ip = value;
} else if (attr.equals(ATTRIBUTE_PORT_KEY)) {
try {
port = Integer.parseInt(value);
} catch (NumberFormatException e) {
throw new PipeSinkException(attr, value, TSDataType.INT32.name());
}
} else {
throw new PipeSinkException(
"There is No attribute " + attr + " in " + PipeSinkType.IoTDB + " pipeSink.");
}
}
}
public String getIp() {
return ip;
}
public int getPort() {
return port;
}
@Override
public String getPipeSinkName() {
return name;
}
@Override
public PipeSinkType getType() {
return pipeSinkType;
}
@Override
public String showAllAttributes() {
return String.format("%s='%s',%s=%d", ATTRIBUTE_IP_KEY, ip, ATTRIBUTE_PORT_KEY, port);
}
@Override
public TPipeSinkInfo getTPipeSinkInfo() {
Map<String, String> attributes = new HashMap<>();
attributes.put(ATTRIBUTE_IP_KEY, ip);
attributes.put(ATTRIBUTE_PORT_KEY, String.valueOf(port));
return new TPipeSinkInfo(this.name, this.pipeSinkType.name()).setAttributes(attributes);
}
@Override
public void serialize(OutputStream outputStream) throws IOException {
ReadWriteIOUtils.write((byte) pipeSinkType.ordinal(), outputStream);
ReadWriteIOUtils.write(name, outputStream);
ReadWriteIOUtils.write(ip, outputStream);
ReadWriteIOUtils.write(port, outputStream);
}
@Override
public void deserialize(InputStream inputStream) throws IOException {
name = ReadWriteIOUtils.readString(inputStream);
ip = ReadWriteIOUtils.readString(inputStream);
port = ReadWriteIOUtils.readInt(inputStream);
}
@Override
public String toString() {
return "IoTDBPipeSink{"
+ "pipeSinkType="
+ pipeSinkType
+ ", name='"
+ name
+ '\''
+ ", ip='"
+ ip
+ '\''
+ ", port="
+ port
+ '}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
IoTDBPipeSink pipeSink = (IoTDBPipeSink) o;
return port == pipeSink.port
&& pipeSinkType == pipeSink.pipeSinkType
&& Objects.equals(name, pipeSink.name)
&& Objects.equals(ip, pipeSink.ip);
}
@Override
public int hashCode() {
return Objects.hash(pipeSinkType, name, ip, port);
}
}