blob: 7ae839347c750509e2360dd1d271a4f154f4d64a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache license, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the license for the specific language governing permissions and
* limitations under the license.
*/
package org.apache.logging.log4j.cassandra;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.ManagerFactory;
import org.apache.logging.log4j.core.appender.db.AbstractDatabaseManager;
import org.apache.logging.log4j.core.appender.db.ColumnMapping;
import org.apache.logging.log4j.plugins.convert.TypeConverters;
import org.apache.logging.log4j.core.net.SocketAddress;
import org.apache.logging.log4j.jdbc.convert.DateTypeConverter;
import org.apache.logging.log4j.spi.ThreadContextMap;
import org.apache.logging.log4j.spi.ThreadContextStack;
import org.apache.logging.log4j.util.ReadOnlyStringMap;
import org.apache.logging.log4j.util.Strings;
/**
* Manager for a Cassandra appender instance.
*/
public class CassandraManager extends AbstractDatabaseManager {
private static final int DEFAULT_PORT = 9042;
private final Cluster cluster;
private final String keyspace;
private final String insertQueryTemplate;
private final List<ColumnMapping> columnMappings;
private final BatchStatement batchStatement;
// re-usable argument binding array
private final Object[] values;
private Session session;
private PreparedStatement preparedStatement;
private CassandraManager(final String name, final int bufferSize, final Cluster cluster,
final String keyspace, final String insertQueryTemplate,
final List<ColumnMapping> columnMappings, final BatchStatement batchStatement) {
super(name, bufferSize);
this.cluster = cluster;
this.keyspace = keyspace;
this.insertQueryTemplate = insertQueryTemplate;
this.columnMappings = columnMappings;
this.batchStatement = batchStatement;
this.values = new Object[columnMappings.size()];
}
@Override
protected void startupInternal() throws Exception {
session = cluster.connect(keyspace);
preparedStatement = session.prepare(insertQueryTemplate);
}
@Override
protected boolean shutdownInternal() throws Exception {
session.close();
cluster.close();
return true;
}
@Override
protected void connectAndStart() {
// a Session automatically manages connections for us
}
@Override
protected void writeInternal(final LogEvent event, final Serializable serializable) {
for (int i = 0; i < columnMappings.size(); i++) {
final ColumnMapping columnMapping = columnMappings.get(i);
if (ThreadContextMap.class.isAssignableFrom(columnMapping.getType())
|| ReadOnlyStringMap.class.isAssignableFrom(columnMapping.getType())) {
values[i] = event.getContextData().toMap();
} else if (ThreadContextStack.class.isAssignableFrom(columnMapping.getType())) {
values[i] = event.getContextStack().asList();
} else if (Date.class.isAssignableFrom(columnMapping.getType())) {
values[i] = DateTypeConverter.fromMillis(event.getTimeMillis(), columnMapping.getType().asSubclass(Date.class));
} else {
values[i] = TypeConverters.convert(columnMapping.getLayout().toSerializable(event),
columnMapping.getType(), null);
}
}
final BoundStatement boundStatement = preparedStatement.bind(values);
if (batchStatement == null) {
session.execute(boundStatement);
} else {
batchStatement.add(boundStatement);
}
}
@Override
protected boolean commitAndClose() {
if (batchStatement != null) {
session.execute(batchStatement);
}
return true;
}
public static CassandraManager getManager(final String name, final SocketAddress[] contactPoints,
final ColumnMapping[] columns, final boolean useTls,
final String clusterName, final String keyspace, final String table,
final String username, final String password,
final boolean useClockForTimestampGenerator, final int bufferSize,
final boolean batched, final BatchStatement.Type batchType) {
return getManager(name,
new FactoryData(contactPoints, columns, useTls, clusterName, keyspace, table, username, password,
useClockForTimestampGenerator, bufferSize, batched, batchType), CassandraManagerFactory.INSTANCE);
}
private static class CassandraManagerFactory implements ManagerFactory<CassandraManager, FactoryData> {
private static final CassandraManagerFactory INSTANCE = new CassandraManagerFactory();
@Override
public CassandraManager createManager(final String name, final FactoryData data) {
final Cluster.Builder builder = Cluster.builder()
.addContactPointsWithPorts(data.contactPoints)
.withClusterName(data.clusterName);
if (data.useTls) {
builder.withSSL();
}
if (Strings.isNotBlank(data.username)) {
builder.withCredentials(data.username, data.password);
}
if (data.useClockForTimestampGenerator) {
builder.withTimestampGenerator(new ClockTimestampGenerator());
}
final Cluster cluster = builder.build();
final StringBuilder sb = new StringBuilder("INSERT INTO ").append(data.table).append(" (");
for (final ColumnMapping column : data.columns) {
sb.append(column.getName()).append(',');
}
sb.setCharAt(sb.length() - 1, ')');
sb.append(" VALUES (");
final List<ColumnMapping> columnMappings = new ArrayList<>(data.columns.length);
for (final ColumnMapping column : data.columns) {
if (Strings.isNotEmpty(column.getLiteralValue())) {
sb.append(column.getLiteralValue());
} else {
sb.append('?');
columnMappings.add(column);
}
sb.append(',');
}
sb.setCharAt(sb.length() - 1, ')');
final String insertQueryTemplate = sb.toString();
LOGGER.debug("Using CQL for appender {}: {}", name, insertQueryTemplate);
return new CassandraManager(name, data.getBufferSize(), cluster, data.keyspace, insertQueryTemplate,
columnMappings, data.batched ? new BatchStatement(data.batchType) : null);
}
}
private static class FactoryData extends AbstractFactoryData {
private final InetSocketAddress[] contactPoints;
private final ColumnMapping[] columns;
private final boolean useTls;
private final String clusterName;
private final String keyspace;
private final String table;
private final String username;
private final String password;
private final boolean useClockForTimestampGenerator;
private final boolean batched;
private final BatchStatement.Type batchType;
private FactoryData(final SocketAddress[] contactPoints, final ColumnMapping[] columns, final boolean useTls,
final String clusterName, final String keyspace, final String table, final String username,
final String password, final boolean useClockForTimestampGenerator, final int bufferSize,
final boolean batched, final BatchStatement.Type batchType) {
super(bufferSize, null);
this.contactPoints = convertAndAddDefaultPorts(contactPoints);
this.columns = columns;
this.useTls = useTls;
this.clusterName = clusterName;
this.keyspace = keyspace;
this.table = table;
this.username = username;
this.password = password;
this.useClockForTimestampGenerator = useClockForTimestampGenerator;
this.batched = batched;
this.batchType = batchType;
}
private static InetSocketAddress[] convertAndAddDefaultPorts(final SocketAddress... socketAddresses) {
final InetSocketAddress[] inetSocketAddresses = new InetSocketAddress[socketAddresses.length];
for (int i = 0; i < inetSocketAddresses.length; i++) {
final SocketAddress socketAddress = socketAddresses[i];
inetSocketAddresses[i] = socketAddress.getPort() == 0
? new InetSocketAddress(socketAddress.getAddress(), DEFAULT_PORT)
: socketAddress.getSocketAddress();
}
return inetSocketAddresses;
}
}
}