blob: 293ce58bed03eea576c9f996119ea9596571b35e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.polygene.entitystore.sql;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.Writer;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
import javax.sql.DataSource;
import liquibase.Contexts;
import liquibase.Liquibase;
import liquibase.database.Database;
import liquibase.database.ObjectQuotingStrategy;
import liquibase.exception.LiquibaseException;
import org.apache.polygene.api.configuration.Configuration;
import org.apache.polygene.api.entity.EntityDescriptor;
import org.apache.polygene.api.entity.EntityReference;
import org.apache.polygene.api.injection.scope.Service;
import org.apache.polygene.api.injection.scope.This;
import org.apache.polygene.api.injection.scope.Uses;
import org.apache.polygene.api.service.ServiceActivation;
import org.apache.polygene.api.service.ServiceDescriptor;
import org.apache.polygene.library.sql.liquibase.LiquibaseService;
import org.apache.polygene.serialization.javaxjson.JavaxJsonFactories;
import org.apache.polygene.spi.entitystore.EntityNotFoundException;
import org.apache.polygene.spi.entitystore.helpers.JSONKeys;
import org.apache.polygene.spi.entitystore.helpers.MapEntityStore;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.Query;
import org.jooq.Record;
import org.jooq.SQLDialect;
import org.jooq.Schema;
import org.jooq.Table;
import org.jooq.conf.Settings;
import org.jooq.impl.DSL;
public class SQLEntityStoreMixin
implements ServiceActivation, MapEntityStore
{
private static final String TABLE_NAME_LIQUIBASE_PARAMETER = "es-sql.table";
private static final String IDENTITY_COLUMN_NAME = "ENTITY_IDENTITY";
private static final String VERSION_COLUMN_NAME = "ENTITY_VERSION";
private static final String STATE_COLUMN_NAME = "ENTITY_STATE";
@Service
private DataSource dataSource;
@Service
private LiquibaseService liquibaseService;
@Service
private JavaxJsonFactories jsonFactories;
@Uses
private ServiceDescriptor descriptor;
@This
private Configuration<SQLEntityStoreConfiguration> configuration;
private Schema schema;
private Table<Record> table;
private Field<String> identityColumn;
private Field<String> versionColumn;
private Field<String> stateColumn;
private DSLContext dsl;
@Override
public void activateService() throws Exception
{
configuration.refresh();
SQLEntityStoreConfiguration config = configuration.get();
// Prepare jooq DSL
SQLDialect dialect = descriptor.metaInfo( SQLDialect.class );
Settings settings = descriptor.metaInfo( Settings.class );
String schemaName = config.schemaName().get();
String tableName = config.entityTableName().get();
schema = DSL.schema( DSL.name( schemaName ) );
table = DSL.table(
dialect.equals( SQLDialect.SQLITE )
? DSL.name( tableName )
: DSL.name( schema.getName(), tableName )
);
identityColumn = DSL.field( DSL.name( IDENTITY_COLUMN_NAME ), String.class );
versionColumn = DSL.field( DSL.name( VERSION_COLUMN_NAME ), String.class );
stateColumn = DSL.field( DSL.name( STATE_COLUMN_NAME ), String.class );
dsl = DSL.using( dataSource, dialect, settings );
// Eventually create schema and apply Liquibase changelog
if( config.createIfMissing().get() )
{
if( !dialect.equals( SQLDialect.SQLITE )
&& dsl.meta().getSchemas().stream().noneMatch( s -> schema.getName().equalsIgnoreCase( s.getName() ) ) )
{
dsl.createSchema( schema ).execute();
}
applyLiquibaseChangelog( dialect );
}
}
private void applyLiquibaseChangelog( SQLDialect dialect ) throws SQLException, LiquibaseException
{
Liquibase liquibase = liquibaseService.newConnectedLiquibase();
Database db = liquibase.getDatabase();
db.setObjectQuotingStrategy( ObjectQuotingStrategy.QUOTE_ALL_OBJECTS );
try
{
if( !dialect.equals( SQLDialect.SQLITE ) )
{
if( db.supportsSchemas() )
{
db.setDefaultSchemaName( schema.getName() );
db.setLiquibaseSchemaName( schema.getName() );
}
if( db.supportsCatalogs() )
{
db.setDefaultCatalogName( schema.getName() );
db.setLiquibaseCatalogName( schema.getName() );
}
}
liquibase.getChangeLogParameters().set( TABLE_NAME_LIQUIBASE_PARAMETER, table.getName() );
liquibase.update( new Contexts() );
}
finally
{
db.close();
}
}
@Override
public void passivateService() throws Exception
{
dsl = null;
schema = null;
table = null;
identityColumn = null;
versionColumn = null;
stateColumn = null;
}
@Override
public Reader get( EntityReference entityReference )
{
String state = dsl.select( stateColumn )
.from( table )
.where( identityColumn.equal( entityReference.identity().toString() ) )
.fetchOptional( stateColumn )
.orElseThrow( () -> new EntityNotFoundException( entityReference ) );
return new StringReader( state );
}
@Override
public Stream<Reader> entityStates()
{
return dsl.select( stateColumn )
.from( table )
.fetch( stateColumn )
.stream()
.map( StringReader::new );
}
@Override
public void applyChanges( MapChanges changes ) throws Exception
{
List<Query> operations = new ArrayList<>();
changes.visitMap( new MapChanger()
{
@Override
public Writer newEntity( EntityReference ref, EntityDescriptor entityDescriptor )
{
return new StringWriter( 1000 )
{
@Override
public void close() throws IOException
{
super.close();
String state = toString();
String version = jsonFactories.readerFactory().createReader( new StringReader( state ) )
.readObject()
.getString( JSONKeys.VERSION );
operations.add(
dsl.insertInto( table )
.columns( identityColumn, versionColumn, stateColumn )
.values( ref.identity().toString(), version, state )
);
}
};
}
@Override
public Writer updateEntity( MapChange mapChange )
{
return new StringWriter( 1000 )
{
@Override
public void close() throws IOException
{
super.close();
String state = toString();
operations.add(
dsl.update( table )
.set( versionColumn, mapChange.newVersion() )
.set( stateColumn, state )
.where( identityColumn.equal( mapChange.reference().identity().toString() ) )
.and( versionColumn.equal( mapChange.previousVersion() ) )
);
}
};
}
@Override
public void removeEntity( EntityReference ref, EntityDescriptor entityDescriptor )
{
operations.add(
dsl.deleteFrom( table )
.where( identityColumn.equal( ref.identity().toString() ) )
);
}
} );
dsl.batch( operations ).execute();
}
}