// blob: ef9d4e76791e405424dd6a70650eca33778f837f
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.karaf.decanter.collector.jdbc;
import org.apache.karaf.decanter.collector.utils.PropertiesPreparator;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.ConfigurationPolicy;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.event.EventConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.sql.DataSource;
import java.math.BigDecimal;
import java.sql.*;
import java.util.ArrayList;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Decanter collector that executes a configured SQL query and posts one
 * {@link Event} per returned row to the {@link EventAdmin} dispatcher.
 *
 * <p>The component requires a configuration ({@link ConfigurationPolicy#REQUIRE}).
 * The following configuration keys are read at activation:</p>
 * <ul>
 *   <li>{@code query} (mandatory): the SQL statement to execute;</li>
 *   <li>{@link EventConstants#EVENT_TOPIC} (optional): the topic events are posted to,
 *       defaulting to {@code decanter/collect/jdbc}.</li>
 * </ul>
 *
 * <p>A single JDBC connection and prepared statement are opened at activation and
 * reused by every call to {@link #query()} until {@link #deactivate()} closes them.</p>
 *
 * <p>NOTE(review): the {@code scheduler.*} component properties suggest that
 * {@link #run()} is invoked periodically by an external scheduler; that scheduler is
 * not visible from this class, so confirm against the deployment.</p>
 */
@Component(
    name = "org.apache.karaf.decanter.collector.jdbc",
    immediate = true,
    configurationPolicy = ConfigurationPolicy.REQUIRE,
    property = { "decanter.collector.name=jdbc",
                 "scheduler.period:Long=60",
                 "scheduler.concurrent:Boolean=false",
                 "scheduler.name=decanter-collector-jdbc"}
)
public class JdbcCollector implements Runnable {

    /** Event dispatcher the collected rows are posted to (injected). */
    @Reference
    public EventAdmin dispatcher;

    /** Data source the connection is obtained from (injected). */
    @Reference
    public DataSource dataSource;

    private static final Logger LOGGER = LoggerFactory.getLogger(JdbcCollector.class);

    private String query;
    private String topic;
    private Dictionary<String, Object> properties;

    private Connection connection;
    private PreparedStatement preparedStatement;

    /**
     * Activates the component from its OSGi component context.
     *
     * <p>Stores the component properties (later handed to
     * {@link PropertiesPreparator#prepare}) and delegates to
     * {@link #activate(Dictionary)}.</p>
     *
     * @param context the component context providing the configuration properties
     * @throws Exception if the configuration is invalid or the database cannot be reached
     */
    @Activate
    public void activate(ComponentContext context) throws Exception {
        properties = context.getProperties();
        activate(properties);
    }

    /**
     * Reads the configuration, opens the JDBC connection and prepares the statement.
     *
     * <p>This overload does not assign the {@code properties} field; only
     * {@link #activate(ComponentContext)} does.</p>
     *
     * @param config the configuration containing at least the {@code query} key
     * @throws IllegalStateException if no {@code query} is configured
     * @throws Exception if the connection cannot be obtained or the statement cannot be prepared
     */
    public void activate(Dictionary<String, Object> config) throws Exception {
        query = getProperty(config, "query", null);
        if (query == null) {
            throw new IllegalStateException("Query is mandatory");
        }
        topic = getProperty(config, EventConstants.EVENT_TOPIC, "decanter/collect/jdbc");

        Connection openedConnection = dataSource.getConnection();
        try {
            preparedStatement = openedConnection.prepareStatement(query);
        } catch (Exception e) {
            // Activation is failing: release the connection that was just opened,
            // otherwise nothing would ever close it.
            try {
                openedConnection.close();
            } catch (Exception closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        connection = openedConnection;
    }

    /**
     * Closes the prepared statement and the connection.
     *
     * <p>The connection is closed even if closing the statement fails. The first
     * failure is rethrown after both resources have been handled, with any later
     * failure attached as suppressed.</p>
     *
     * @throws Exception if closing the statement or the connection fails
     */
    @Deactivate
    public void deactivate() throws Exception {
        Exception failure = null;
        if (preparedStatement != null) {
            try {
                preparedStatement.close();
            } catch (Exception e) {
                failure = e;
            }
            preparedStatement = null;
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
            connection = null;
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Looks up a String configuration value.
     *
     * @param properties   the configuration to read from
     * @param key          the configuration key
     * @param defaultValue the value returned when the key is absent
     * @return the configured value, or {@code defaultValue} if there is none
     */
    private String getProperty(Dictionary<String, Object> properties, String key, String defaultValue) {
        Object value = properties.get(key);
        return (value != null) ? (String) value : defaultValue;
    }

    /**
     * Executes the query and posts one event per returned row on the configured topic.
     */
    @Override
    public void run() {
        LOGGER.debug("Karaf Decanter JDBC collector exectutes query {}", query);

        List<Map<String, Object>> dataRows = query();
        for (Map<String, Object> data : dataRows) {
            Event event = new Event(topic, data);
            dispatcher.postEvent(event);
        }
    }

    /**
     * Executes the prepared statement and converts every row into a map.
     *
     * <p>Each map contains {@code type} (always {@code "jdbc"}), {@code rowId}
     * (1-based position of the row in this result) and one entry per column, keyed by
     * the column name. SQL {@code NULL} is reported as a {@code null} value for every
     * column type. Each map is then passed to {@link PropertiesPreparator#prepare}
     * together with the component properties; a failure there is logged and the row
     * is still returned.</p>
     *
     * <p>Any failure while executing the query or reading the result is logged as a
     * warning; the rows read up to that point are returned.</p>
     *
     * @return the rows read, possibly empty, never {@code null}
     */
    public List<Map<String, Object>> query() {
        List<Map<String, Object>> dataRows = new ArrayList<>();

        try (ResultSet resultSet = preparedStatement.executeQuery()) {
            ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
            int columnCount = resultSetMetaData.getColumnCount();

            // ResultSet.getRow() is not fully supported by all JDBC drivers (e.g. Derby),
            // so rows are numbered manually.
            int rowId = 1;
            while (resultSet.next()) {
                Map<String, Object> data = new HashMap<>();
                data.put("type", "jdbc");
                data.put("rowId", rowId);
                rowId++;

                for (int i = 1; i <= columnCount; i++) {
                    String columnName = resultSetMetaData.getColumnName(i);
                    int columnType = resultSetMetaData.getColumnType(i);
                    data.put(columnName, readColumn(resultSet, i, columnType));
                }

                try {
                    PropertiesPreparator.prepare(data, properties);
                } catch (Exception e) {
                    LOGGER.warn("Can't prepare data for the dispatcher", e);
                }

                dataRows.add(data);
            }
        } catch (Exception e) {
            LOGGER.warn("Can't get data from database", e);
        }

        return dataRows;
    }

    /**
     * Reads one column of the current row using the getter matching its SQL type.
     *
     * @param resultSet  the result set, positioned on the row to read
     * @param index      the 1-based column index
     * @param columnType the column type as a {@link Types} constant
     * @return the column value, or {@code null} for SQL {@code NULL}
     * @throws SQLException if the driver fails to read the column
     */
    private static Object readColumn(ResultSet resultSet, int index, int columnType) throws SQLException {
        Object value;
        switch (columnType) {
            case Types.CHAR:
            case Types.VARCHAR:
                return resultSet.getString(index);
            case Types.CLOB:
                return resultSet.getClob(index);
            case Types.ARRAY:
                return resultSet.getArray(index);
            case Types.BLOB:
                return resultSet.getBlob(index);
            case Types.DATE:
                return resultSet.getDate(index);
            case Types.TIME:
                return resultSet.getTime(index);
            case Types.TIMESTAMP:
                return resultSet.getTimestamp(index);
            case Types.NUMERIC:
                return resultSet.getBigDecimal(index);
            case Types.BOOLEAN:
                value = resultSet.getBoolean(index);
                break;
            case Types.INTEGER:
                value = resultSet.getInt(index);
                break;
            case Types.FLOAT:
                // NOTE(review): JDBC's recommended Java mapping for FLOAT is double;
                // getFloat is kept so the event value type stays a Float.
                value = resultSet.getFloat(index);
                break;
            case Types.DOUBLE:
                value = resultSet.getDouble(index);
                break;
            case Types.BIGINT:
                value = resultSet.getLong(index);
                break;
            default:
                // Any other type (SMALLINT, DECIMAL, REAL, LONGVARCHAR, ...) is read
                // with the driver's default mapping instead of being dropped.
                return resultSet.getObject(index);
        }
        // The primitive getters return 0/false for SQL NULL; wasNull() is the only
        // way to tell a real 0/false from a missing value.
        return resultSet.wasNull() ? null : value;
    }
}