blob: 3dbfa2788cce0fec77a78fe01f9063e837a4504f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.kudu.examples;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.OperationResponse;
import org.apache.kudu.client.RowError;
import org.apache.kudu.client.SessionConfiguration.FlushMode;
/**
 * Example server that accepts the Graphite line protocol (as emitted by
 * collectl's graphite export) on port 2003 and writes the metrics into Kudu.
 *
 * <p>Two tables are used: {@code metrics} stores the time-series points
 * (host, metric, timestamp, value), and {@code metric_ids} registers each
 * distinct host/metric pair seen, so consumers can enumerate known series.
 *
 * <p>One {@link HandlerThread} (with its own {@link KuduSession}) is spawned
 * per client connection.
 */
public class KuduCollectlExample {
  private static final int GRAPHITE_PORT = 2003;
  private static final String TABLE_NAME = "metrics";
  private static final String ID_TABLE_NAME = "metric_ids";
  private static final String KUDU_MASTER =
      System.getProperty("kuduMasters", "localhost:7051");

  private KuduClient client;
  private KuduTable table;
  private KuduTable idTable;

  // Host/metric ids this process has already registered in the id table.
  // Shared by all handler threads, so it must be a concurrent set.
  private Set<String> existingMetrics = Collections.newSetFromMap(
      new ConcurrentHashMap<String, Boolean>());

  public static void main(String[] args) throws Exception {
    new KuduCollectlExample().run();
  }

  KuduCollectlExample() {
    this.client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
  }

  /**
   * Creates the tables if they do not yet exist, then accepts client
   * connections forever, one handler thread per connection.
   *
   * @throws Exception on any Kudu or socket error during setup
   */
  public void run() throws Exception {
    createTableIfNecessary();
    createIdTableIfNecessary();
    this.table = client.openTable(TABLE_NAME);
    this.idTable = client.openTable(ID_TABLE_NAME);
    try (ServerSocket listener = new ServerSocket(GRAPHITE_PORT)) {
      while (true) {
        Socket s = listener.accept();
        new HandlerThread(s).start();
      }
    }
  }

  /** Creates the time-series table keyed on (host, metric, timestamp). */
  private void createTableIfNecessary() throws Exception {
    if (client.tableExists(TABLE_NAME)) {
      return;
    }
    List<ColumnSchema> cols = new ArrayList<>();
    cols.add(new ColumnSchemaBuilder("host", Type.STRING).key(true).encoding(
        ColumnSchema.Encoding.DICT_ENCODING).build());
    cols.add(new ColumnSchemaBuilder("metric", Type.STRING).key(true).encoding(
        ColumnSchema.Encoding.DICT_ENCODING).build());
    cols.add(new ColumnSchemaBuilder("timestamp", Type.INT32).key(true).encoding(
        ColumnSchema.Encoding.BIT_SHUFFLE).build());
    cols.add(new ColumnSchemaBuilder("value", Type.DOUBLE)
        .encoding(ColumnSchema.Encoding.BIT_SHUFFLE).build());

    // Need to set this up since we're not pre-partitioning.
    List<String> rangeKeys = new ArrayList<>();
    rangeKeys.add("host");
    rangeKeys.add("metric");
    rangeKeys.add("timestamp");

    client.createTable(TABLE_NAME, new Schema(cols),
        new CreateTableOptions().setRangePartitionColumns(rangeKeys));
  }

  /** Creates the id-registry table keyed on (host, metric). */
  private void createIdTableIfNecessary() throws Exception {
    if (client.tableExists(ID_TABLE_NAME)) {
      return;
    }
    ArrayList<ColumnSchema> cols = new ArrayList<>();
    cols.add(new ColumnSchemaBuilder("host", Type.STRING).key(true).build());
    cols.add(new ColumnSchemaBuilder("metric", Type.STRING).key(true).build());

    // Need to set this up since we're not pre-partitioning.
    List<String> rangeKeys = new ArrayList<>();
    rangeKeys.add("host");
    rangeKeys.add("metric");

    client.createTable(ID_TABLE_NAME, new Schema(cols),
        new CreateTableOptions().setRangePartitionColumns(rangeKeys));
  }

  /** Handles one client connection, batching its inserts on one session. */
  class HandlerThread extends Thread {
    private Socket socket;
    private KuduSession session;

    HandlerThread(Socket s) {
      this.socket = s;
      this.session = client.newSession();
      // TODO: AUTO_FLUSH_BACKGROUND would be better for this kind of usecase, but
      // it seems like it's buffering data too long, and only flushing based on size.
      // Perhaps we should support a time-based buffering as well?
      session.setFlushMode(FlushMode.MANUAL_FLUSH);
      // Increase the number of mutations that we can buffer
      session.setMutationBufferSpace(10000);
    }

    @Override
    public void run() {
      try {
        doRun();
      } catch (Exception e) {
        System.err.println("exception handling connection from " + socket);
        e.printStackTrace();
      } finally {
        // Always release the per-connection resources, even on error.
        try {
          session.close();
        } catch (Exception ignored) {
          // Best-effort cleanup; nothing useful to do if close fails.
        }
        try {
          socket.close();
        } catch (Exception ignored) {
          // Best-effort cleanup of the client connection.
        }
      }
    }

    /**
     * Registers {@code host}/{@code metric} in the id table if this process
     * has not done so yet. Racing with another handler thread is harmless:
     * the loser's insert fails with an already-present row error, which we
     * tolerate; any other row error is reported.
     */
    private void insertIdIfNecessary(String host, String metric) throws Exception {
      String id = host + "/" + metric;
      if (existingMetrics.contains(id)) {
        return;
      }
      Insert ins = idTable.newInsert();
      ins.getRow().addString("host", host);
      ins.getRow().addString("metric", metric);
      session.apply(ins);
      for (OperationResponse r : session.flush()) {
        if (r.hasRowError()) {
          RowError e = r.getRowError();
          // TODO: the client should offer an enum for different row errors,
          // instead of string comparison!
          if (!"ALREADY_PRESENT".equals(e.getStatus())) {
            System.err.println("Error registering metric id " + id + ": " + e);
          }
        }
      }
      existingMetrics.add(id);
    }

    /**
     * Reads Graphite-protocol lines from the connection and inserts them
     * into the metrics table, batching flushes while more input is pending.
     */
    private void doRun() throws Exception {
      // Read lines from collectl. Each line should look like:
      // hostname.example.com/.cpuload.avg1 2.27 1435788059
      try (BufferedReader br = new BufferedReader(new InputStreamReader(
          socket.getInputStream(), StandardCharsets.UTF_8))) {
        String input;
        while ((input = br.readLine()) != null) {
          String[] fields = input.split(" ");
          if (fields.length != 3) {
            throw new Exception("Invalid input: " + input);
          }
          // The host/metric delimiter is the literal two characters "/.".
          // split() takes a regex, so the dot must be escaped -- an unescaped
          // "/." would also match "/" followed by ANY character and silently
          // mis-split lines like "host/metricx".
          String[] hostAndMetric = fields[0].split("/\\.");
          if (hostAndMetric.length != 2) {
            System.err.println("bad line: " + input);
            throw new Exception("expected /. delimiter between host and metric name. " +
                "Did you run collectl with --export=collectl,<hostname>,p=/ ?");
          }
          String host = hostAndMetric[0];
          String metric = hostAndMetric[1];
          insertIdIfNecessary(host, metric);

          double val = Double.parseDouble(fields[1]);
          int ts = Integer.parseInt(fields[2]);

          Insert insert = table.newInsert();
          insert.getRow().addString("host", host);
          insert.getRow().addString("metric", metric);
          insert.getRow().addInt("timestamp", ts);
          insert.getRow().addDouble("value", val);
          session.apply(insert);

          // If there's more data to read, don't flush yet -- better to accumulate
          // a larger batch.
          if (!br.ready()) {
            flushAndReportErrors();
          }
        }
      }
    }

    /** Flushes buffered inserts, reporting every row error except duplicates. */
    private void flushAndReportErrors() throws Exception {
      for (OperationResponse r : session.flush()) {
        if (r.hasRowError()) {
          RowError e = r.getRowError();
          // TODO: the client should offer an enum for different row errors, instead
          // of string comparison!
          if ("ALREADY_PRESENT".equals(e.getStatus())) {
            continue;
          }
          System.err.println("Error inserting " + e.getOperation().toString()
              + ": " + e.toString());
        }
      }
    }
  }
}