blob: 2f77735f7ba97192ceaf4530d4d8be60b0912cf1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oodt.cas.catalog.mapping;
//JDK imports
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Calendar;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
//SQL imports
import javax.sql.DataSource;
//OODT imports
import org.apache.oodt.cas.catalog.exception.CatalogRepositoryException;
import org.apache.oodt.cas.catalog.page.CatalogReceipt;
import org.apache.oodt.cas.catalog.page.IndexPager;
import org.apache.oodt.cas.catalog.page.IngestReceipt;
import org.apache.oodt.cas.catalog.struct.TransactionId;
import org.apache.oodt.cas.catalog.struct.TransactionIdFactory;
import org.apache.oodt.commons.database.DatabaseConnectionBuilder;
import org.apache.oodt.commons.date.DateUtils;
/**
* @author bfoster
* @version $Revision$
*
* <p>
* A Ingest Mapper that indexes to an DataSource Database
* <p>
*/
public class DataSourceIngestMapper implements IngestMapper {
protected DataSource dataSource;
public DataSourceIngestMapper(String user, String pass, String driver,
String jdbcUrl) throws InstantiationException {
this.dataSource = DatabaseConnectionBuilder.buildDataSource(user, pass,
driver, jdbcUrl);
}
public synchronized void deleteAllMappingsForCatalog(String catalogId)
throws CatalogRepositoryException {
Connection conn = null;
Statement stmt = null;
try {
conn = this.dataSource.getConnection();
stmt = conn.createStatement();
stmt.execute("DELETE FROM CatalogServiceMapper WHERE CATALOG_ID = '" + catalogId + "'");
}catch (Exception e) {
throw new CatalogRepositoryException(e.getMessage(), e);
}finally {
try {
conn.close();
}catch(Exception e) {}
try {
stmt.close();
}catch(Exception e) {}
}
}
public synchronized void deleteAllMappingsForCatalogServiceTransactionId(
TransactionId<?> catalogServiceTransactionId)
throws CatalogRepositoryException {
Connection conn = null;
Statement stmt = null;
try {
conn = this.dataSource.getConnection();
stmt = conn.createStatement();
stmt.execute("DELETE FROM CatalogServiceMapper WHERE CAT_SERV_TRANS_ID = '" + catalogServiceTransactionId + "'");
conn.commit();
}catch (Exception e) {
throw new CatalogRepositoryException(e.getMessage(), e);
}finally {
try {
conn.close();
}catch(Exception e) {}
try {
stmt.close();
}catch(Exception e) {}
}
}
public synchronized void deleteTransactionIdMapping(
TransactionId<?> catalogTransactionId, String catalogId)
throws CatalogRepositoryException {
Connection conn = null;
Statement stmt = null;
try {
conn = this.dataSource.getConnection();
stmt = conn.createStatement();
stmt.execute("DELETE FROM CatalogServiceMapper WHERE CAT_TRANS_ID = '" + catalogTransactionId + "' AND CATALOG_ID = '" + catalogId + "'");
conn.commit();
}catch (Exception e) {
throw new CatalogRepositoryException(e.getMessage(), e);
}finally {
try {
conn.close();
}catch(Exception e) {}
try {
stmt.close();
}catch(Exception e) {}
}
}
public synchronized TransactionId<?> getCatalogServiceTransactionId(
TransactionId<?> catalogTransactionId, String catalogId)
throws CatalogRepositoryException {
Connection conn = null;
Statement stmt = null;
ResultSet rs = null;
try {
conn = this.dataSource.getConnection();
stmt = conn.createStatement();
rs = stmt.executeQuery("SELECT CAT_SERV_TRANS_ID,CAT_SERV_TRANS_FACTORY FROM CatalogServiceMapper WHERE CAT_TRANS_ID = '"+ catalogTransactionId + "' AND CATALOG_ID = '" + catalogId + "'");
while(rs.next())
return ((TransactionIdFactory) Class.forName(rs.getString("CAT_SERV_TRANS_FACTORY")).newInstance()).createTransactionId(rs.getString("CAT_SERV_TRANS_ID"));
return null;
}catch (Exception e) {
throw new CatalogRepositoryException(e.getMessage(), e);
}finally {
try {
conn.close();
}catch(Exception e) {}
try {
stmt.close();
}catch(Exception e) {}
try {
rs.close();
}catch(Exception e) {}
}
}
public synchronized TransactionId<?> getCatalogTransactionId(
TransactionId<?> catalogServiceTransactionId, String catalogId)
throws CatalogRepositoryException {
Connection conn = null;
Statement stmt = null;
ResultSet rs = null;
try {
conn = this.dataSource.getConnection();
stmt = conn.createStatement();
rs = stmt.executeQuery("SELECT CAT_TRANS_ID,CAT_TRANS_FACTORY FROM CatalogServiceMapper WHERE CAT_SERV_TRANS_ID = '"+ catalogServiceTransactionId + "' AND CATALOG_ID = '" + catalogId + "'");
while(rs.next())
return ((TransactionIdFactory) Class.forName(rs.getString("CAT_TRANS_FACTORY")).newInstance()).createTransactionId(rs.getString("CAT_TRANS_ID"));
return null;
}catch (Exception e) {
throw new CatalogRepositoryException(e.getMessage(), e);
}finally {
try {
conn.close();
}catch(Exception e) {}
try {
stmt.close();
}catch(Exception e) {}
try {
rs.close();
}catch(Exception e) {}
}
}
public synchronized Set<String> getCatalogIds(
TransactionId<?> catalogServiceTransactionId)
throws CatalogRepositoryException {
Connection conn = null;
Statement stmt = null;
ResultSet rs = null;
try {
conn = this.dataSource.getConnection();
stmt = conn.createStatement();
rs = stmt.executeQuery("SELECT CATALOG_ID FROM CatalogServiceMapper WHERE CAT_SERV_TRANS_ID = '"+ catalogServiceTransactionId + "'");
Set<String> catalogIds = new HashSet<String>();
while(rs.next())
catalogIds.add(rs.getString("CATALOG_ID"));
return catalogIds;
}catch (Exception e) {
throw new CatalogRepositoryException(e.getMessage(), e);
}finally {
try {
conn.close();
}catch(Exception e) {}
try {
stmt.close();
}catch(Exception e) {}
try {
rs.close();
}catch(Exception e) {}
}
}
public synchronized Set<TransactionId<?>> getPageOfCatalogTransactionIds(
IndexPager indexPager, String catalogId)
throws CatalogRepositoryException {
Connection conn = null;
Statement stmt = null;
ResultSet rs = null;
try {
conn = this.dataSource.getConnection();
stmt = conn.createStatement();
rs = stmt.executeQuery(
"SELECT * FROM "
+"( SELECT a.*, ROWNUM r FROM "
+ "( SELECT CAT_TRANS_FACTORY,CAT_TRANS_ID FROM CatalogServiceMapper WHERE CatalogServiceMapper.CATALOG_ID = '" + catalogId + "' ORDER BY CatalogServiceMapper.CAT_SERV_TRANS_ID DESC ) a "
+ "WHERE ROWNUM <= " + (indexPager.getPageSize() * (indexPager.getPageNum() + 1)) + " ) "
+ "WHERE r >= " + ((indexPager.getPageSize() * indexPager.getPageNum()) + 1));
Set<TransactionId<?>> transactionIds = new HashSet<TransactionId<?>>();
while(rs.next())
transactionIds.add(((TransactionIdFactory) Class.forName(rs.getString("CAT_TRANS_FACTORY")).newInstance()).createTransactionId(rs.getString("CAT_TRANS_ID")));
return transactionIds;
}catch (Exception e) {
throw new CatalogRepositoryException(e.getMessage(), e);
}finally {
try {
conn.close();
}catch(Exception e) {}
try {
stmt.close();
}catch(Exception e) {}
try {
rs.close();
}catch(Exception e) {}
}
}
public synchronized boolean hasCatalogServiceTransactionId(
TransactionId<?> catalogServiceTransactionId)
throws CatalogRepositoryException {
Connection conn = null;
Statement stmt = null;
ResultSet rs = null;
try {
conn = this.dataSource.getConnection();
stmt = conn.createStatement();
rs = stmt.executeQuery("SELECT CAT_SERV_TRANS_ID FROM CatalogServiceMapper WHERE CAT_SERV_TRANS_ID = '"+ catalogServiceTransactionId + "'");
return rs.next();
}catch (Exception e) {
throw new CatalogRepositoryException(e.getMessage(), e);
}finally {
try {
conn.close();
}catch(Exception e) {}
try {
stmt.close();
}catch(Exception e) {}
try {
rs.close();
}catch(Exception e) {}
}
}
public synchronized void storeTransactionIdMapping(
TransactionId<?> catalogServiceTransactionId,
TransactionIdFactory catalogServiceTransactionIdFactory,
CatalogReceipt catalogReceipt,
TransactionIdFactory catalogTransactionIdFactory)
throws CatalogRepositoryException {
Connection conn = null;
Statement stmt = null;
try {
conn = this.dataSource.getConnection();
stmt = conn.createStatement();
Calendar calTime = DateUtils.getCurrentUtcTime();
calTime.setTime(catalogReceipt.getTransactionDate());
stmt.execute("INSERT INTO CatalogServiceMapper (CAT_SERV_TRANS_ID, CAT_SERV_TRANS_FACTORY, CAT_TRANS_ID, CAT_TRANS_FACTORY, CAT_TRANS_DATE, CATALOG_ID) VALUES ('"
+ catalogServiceTransactionId + "', '"
+ catalogServiceTransactionIdFactory.getClass().getName() + "', '"
+ catalogReceipt.getTransactionId() + "', '"
+ catalogTransactionIdFactory.getClass().getName() + "', '"
+ DateUtils.toString(calTime) + "', '"
+ catalogReceipt.getCatalogId() + "')");
conn.commit();
}catch (Exception e) {
throw new CatalogRepositoryException(e.getMessage(), e);
}finally {
try {
conn.close();
}catch(Exception e) {}
try {
stmt.close();
}catch(Exception e) {}
}
}
public CatalogReceipt getCatalogReceipt(
TransactionId<?> catalogServiceTransactionId, String catalogId)
throws CatalogRepositoryException {
Connection conn = null;
Statement stmt = null;
ResultSet rs = null;
try {
conn = this.dataSource.getConnection();
stmt = conn.createStatement();
rs = stmt.executeQuery("SELECT CAT_TRANS_ID, CAT_TRANS_FACTORY, CAT_TRANS_DATE FROM CatalogServiceMapper WHERE CAT_SERV_TRANS_ID = '"+ catalogServiceTransactionId + "' AND CATALOG_ID = '" + catalogId + "'");
if(rs.next()) {
TransactionId<?> catalogTransactionId = ((TransactionIdFactory) Class.forName(rs.getString("CAT_TRANS_FACTORY")).newInstance()).createTransactionId(rs.getString("CAT_TRANS_ID"));
Date transactionDate = DateUtils.toCalendar(rs.getString("CAT_TRANS_DATE"), DateUtils.FormatType.UTC_FORMAT).getTime();
return new CatalogReceipt(new IngestReceipt(catalogTransactionId, transactionDate), catalogId);
}else {
return null;
}
}catch (Exception e) {
throw new CatalogRepositoryException(e.getMessage(), e);
}finally {
try {
conn.close();
}catch(Exception e) {}
try {
stmt.close();
}catch(Exception e) {}
try {
rs.close();
}catch(Exception e) {}
}
}
}