blob: 17b031c6c646342e08b15d8223416febd7c527b9 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.store.derby;
import java.io.InputStream;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.SizeMonitoringSettings;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.jdbc.AbstractJDBCMessageStore;
import org.apache.qpid.server.store.jdbc.JdbcUtils;
public abstract class AbstractDerbyMessageStore extends AbstractJDBCMessageStore
{
private final AtomicBoolean _messageStoreOpen = new AtomicBoolean(false);
private long _persistentSizeLowThreshold;
private long _persistentSizeHighThreshold;
private long _totalStoreSize;
private boolean _limitBusted;
private ConfiguredObject<?> _parent;
@Override
public final void openMessageStore(final ConfiguredObject<?> parent)
{
if (_messageStoreOpen.compareAndSet(false, true))
{
_parent = parent;
initMessageStore(parent);
DerbyUtils.loadDerbyDriver();
doOpen(parent);
final SizeMonitoringSettings sizeMonitorSettings = (SizeMonitoringSettings) parent;
_persistentSizeHighThreshold = sizeMonitorSettings.getStoreOverfullSize();
_persistentSizeLowThreshold = sizeMonitorSettings.getStoreUnderfullSize();
if (_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
{
_persistentSizeLowThreshold = _persistentSizeHighThreshold;
}
createOrOpenMessageStoreDatabase();
setInitialSize();
setMaximumMessageId();
}
}
protected abstract void doOpen(final ConfiguredObject<?> parent);
@Override
public final void upgradeStoreStructure() throws StoreException
{
checkMessageStoreOpen();
upgrade(_parent);
}
@Override
public final void closeMessageStore()
{
if (_messageStoreOpen.compareAndSet(true, false))
{
try
{
doClose();
}
finally
{
super.closeMessageStore();
}
}
}
protected abstract void doClose();
@Override
protected boolean isMessageStoreOpen()
{
return _messageStoreOpen.get();
}
@Override
protected void checkMessageStoreOpen()
{
if (!_messageStoreOpen.get())
{
throw new IllegalStateException("Message store is not open");
}
}
@Override
protected String getSqlBlobType()
{
return "blob";
}
@Override
protected String getSqlBlobStorage(String columnName)
{
return "";
}
@Override
protected String getSqlVarBinaryType(int size)
{
return "varchar("+size+") for bit data";
}
@Override
protected String getSqlBigIntType()
{
return "bigint";
}
@Override
protected InputStream getBlobAsInputStream(ResultSet rs, int col) throws SQLException
{
return DerbyUtils.getBlobAsInputStream(rs, col);
}
@Override
protected boolean tableExists(final String tableName, final Connection conn) throws SQLException
{
return DerbyUtils.tableExists(tableName, conn);
}
@Override
protected void storedSizeChange(final int delta)
{
if(getPersistentSizeHighThreshold() > 0)
{
synchronized(this)
{
// the delta supplied is an approximation of a store size change. we don;t want to check the statistic every
// time, so we do so only when there's been enough change that it is worth looking again. We do this by
// assuming the total size will change by less than twice the amount of the message data change.
long newSize = _totalStoreSize += 3*delta;
Connection conn = null;
try
{
if(!_limitBusted && newSize > getPersistentSizeHighThreshold())
{
conn = newAutoCommitConnection();
_totalStoreSize = getSizeOnDisk(conn);
if(_totalStoreSize > getPersistentSizeHighThreshold())
{
_limitBusted = true;
_eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
}
}
else if(_limitBusted && newSize < getPersistentSizeLowThreshold())
{
long oldSize = _totalStoreSize;
conn = newAutoCommitConnection();
_totalStoreSize = getSizeOnDisk(conn);
if(oldSize <= _totalStoreSize)
{
reduceSizeOnDisk(conn);
_totalStoreSize = getSizeOnDisk(conn);
}
if(_totalStoreSize < getPersistentSizeLowThreshold())
{
_limitBusted = false;
_eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
}
}
}
catch (SQLException e)
{
JdbcUtils.closeConnection(conn, getLogger());
throw new StoreException("Exception while processing store size change", e);
}
}
}
}
private void setInitialSize()
{
Connection conn = null;
try
{
conn = newAutoCommitConnection();
_totalStoreSize = getSizeOnDisk(conn);
}
catch (SQLException e)
{
getLogger().error("Unable to set initial store size", e);
}
finally
{
JdbcUtils.closeConnection(conn, getLogger());
}
}
private long getSizeOnDisk(Connection conn)
{
PreparedStatement stmt = null;
try
{
String sizeQuery = "SELECT SUM(T2.NUMALLOCATEDPAGES * T2.PAGESIZE) TOTALSIZE" +
" FROM " +
" SYS.SYSTABLES systabs," +
" TABLE (SYSCS_DIAG.SPACE_TABLE(systabs.tablename)) AS T2" +
" WHERE systabs.tabletype = 'T'";
stmt = conn.prepareStatement(sizeQuery);
ResultSet rs = null;
long size = 0l;
try
{
rs = stmt.executeQuery();
while(rs.next())
{
size = rs.getLong(1);
}
}
finally
{
if(rs != null)
{
rs.close();
}
}
return size;
}
catch (SQLException e)
{
throw new StoreException("Error establishing on disk size", e);
}
finally
{
JdbcUtils.closePreparedStatement(stmt, getLogger());
}
}
private void reduceSizeOnDisk(Connection conn)
{
CallableStatement cs = null;
PreparedStatement stmt = null;
try
{
String tableQuery =
"SELECT S.SCHEMANAME, T.TABLENAME FROM SYS.SYSSCHEMAS S, SYS.SYSTABLES T WHERE S.SCHEMAID = T.SCHEMAID AND T.TABLETYPE='T'";
stmt = conn.prepareStatement(tableQuery);
ResultSet rs = null;
List<String> schemas = new ArrayList<String>();
List<String> tables = new ArrayList<String>();
try
{
rs = stmt.executeQuery();
while(rs.next())
{
schemas.add(rs.getString(1));
tables.add(rs.getString(2));
}
}
finally
{
if(rs != null)
{
rs.close();
}
}
cs = conn.prepareCall
("CALL SYSCS_UTIL.SYSCS_COMPRESS_TABLE(?, ?, ?)");
for(int i = 0; i < schemas.size(); i++)
{
cs.setString(1, schemas.get(i));
cs.setString(2, tables.get(i));
cs.setShort(3, (short) 0);
cs.execute();
}
}
catch (SQLException e)
{
throw new StoreException("Error reducing on disk size", e);
}
finally
{
JdbcUtils.closePreparedStatement(stmt, getLogger());
JdbcUtils.closePreparedStatement(cs, getLogger());
}
}
private long getPersistentSizeLowThreshold()
{
return _persistentSizeLowThreshold;
}
private long getPersistentSizeHighThreshold()
{
return _persistentSizeHighThreshold;
}
}