blob: 99bcdab2ca3725c3f56880005cfd20d7e986b5a5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package org.apache.polygene.index.elasticsearch;
import java.util.HashMap;
import java.util.Map;
import javax.json.JsonArrayBuilder;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import javax.json.JsonValue;
import org.apache.polygene.api.entity.EntityDescriptor;
import org.apache.polygene.api.entity.EntityReference;
import org.apache.polygene.api.identity.Identity;
import org.apache.polygene.api.injection.scope.Service;
import org.apache.polygene.api.injection.scope.Structure;
import org.apache.polygene.api.injection.scope.This;
import org.apache.polygene.api.mixin.Mixins;
import org.apache.polygene.api.serialization.Serializer;
import org.apache.polygene.api.structure.ModuleDescriptor;
import org.apache.polygene.api.time.SystemTime;
import org.apache.polygene.api.usecase.UsecaseBuilder;
import org.apache.polygene.api.util.Classes;
import org.apache.polygene.serialization.javaxjson.JavaxJsonFactories;
import org.apache.polygene.spi.entity.EntityState;
import org.apache.polygene.spi.entity.EntityStatus;
import org.apache.polygene.spi.entity.ManyAssociationState;
import org.apache.polygene.spi.entity.NamedAssociationState;
import org.apache.polygene.spi.entitystore.EntityStore;
import org.apache.polygene.spi.entitystore.EntityStoreUnitOfWork;
import org.apache.polygene.spi.entitystore.StateChangeListener;
import org.apache.polygene.spi.serialization.JsonSerializer;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Listens to Entity state changes and indexes them in ElasticSearch.
 *
 * <p>Open question: use two indices, one for strict queries, one for full text and fuzzy search?</p>
 */
@Mixins( ElasticSearchIndexer.Mixin.class )
public interface ElasticSearchIndexer extends StateChangeListener
{
    /**
     * Mixin translating entity state changes into a single ElasticSearch bulk request.
     */
    class Mixin
        implements StateChangeListener
    {
        private static final Logger LOGGER = LoggerFactory.getLogger( ElasticSearchIndexer.class );

        @Structure
        private ModuleDescriptor module;

        @Service
        private EntityStore entityStore;

        @Service
        private JsonSerializer jsonSerializer;

        @Service
        private JavaxJsonFactories jsonFactories;

        @This
        private ElasticSearchSupport support;

        /**
         * Deletes the whole index returned by {@code support.index()} and waits for the call to complete.
         */
        public void emptyIndex()
        {
            support.client().admin().indices().prepareDelete( support.index() ).execute().actionGet();
        }

        /**
         * Builds and executes one bulk request reflecting the given state changes.
         *
         * <p>Only states whose entity descriptor is queryable are considered. {@code REMOVED} states are
         * deleted from the index, {@code UPDATED} states are deleted then indexed again, {@code NEW} states
         * are indexed, and any other status is ignored. When at least one action was queued the bulk request
         * is executed and the index is refreshed.</p>
         *
         * @param changedStates the changed entity states; iterated twice
         * @throws ElasticSearchIndexingException if the bulk response reports failures
         */
        @Override
        public void notifyChanges( Iterable<EntityState> changedStates )
        {
            // All updated or new states, keyed by identity, so that associated entities changed in the
            // same batch are serialized from their fresh state instead of being loaded from the store.
            Map<String, EntityState> newStates = new HashMap<>();
            for( EntityState eState : changedStates )
            {
                if( eState.status() == EntityStatus.UPDATED || eState.status() == EntityStatus.NEW )
                {
                    newStates.put( identityOf( eState ), eState );
                }
            }

            // Bulk index request builder
            BulkRequestBuilder bulkBuilder = support.client().prepareBulk();

            // Unit of work used to load associated entity states that are not part of this batch
            EntityStoreUnitOfWork uow = entityStore.newUnitOfWork(
                module,
                UsecaseBuilder.newUsecase( "Load associations for indexing" ),
                SystemTime.now()
            );
            try
            {
                // Handle changed entity states
                for( EntityState changedState : changedStates )
                {
                    if( changedState.entityDescriptor().queryable() )
                    {
                        queueChange( bulkBuilder, changedState, newStates, uow );
                    }
                }
            }
            finally
            {
                // Always discard, even when serializing a state failed, so the unit of work is never
                // left open.
                uow.discard();
            }

            if( bulkBuilder.numberOfActions() > 0 )
            {
                // Execute bulk actions
                BulkResponse bulkResponse = bulkBuilder.execute().actionGet();

                // Handle errors
                if( bulkResponse.hasFailures() )
                {
                    throw new ElasticSearchIndexingException( bulkResponse.buildFailureMessage() );
                }

                LOGGER.debug( "Indexing changed Entity states took {}ms", bulkResponse.getTookInMillis() );

                // Refresh index
                support.client().admin().indices().prepareRefresh( support.index() ).execute().actionGet();
            }
        }

        /**
         * Queues the bulk actions matching the status of a single changed state.
         */
        private void queueChange( BulkRequestBuilder bulkBuilder, EntityState changedState,
                                  Map<String, EntityState> newStates, EntityStoreUnitOfWork uow )
        {
            switch( changedState.status() )
            {
                case REMOVED:
                    LOGGER.trace( "Removing Entity State from Index: {}", changedState );
                    remove( bulkBuilder, identityOf( changedState ) );
                    break;
                case UPDATED:
                    LOGGER.trace( "Updating Entity State in Index: {}", changedState );
                    remove( bulkBuilder, identityOf( changedState ) );
                    String updatedJson = toJSON( changedState, newStates, uow ).toString();
                    LOGGER.trace( "Will index: {}", updatedJson );
                    index( bulkBuilder, identityOf( changedState ), updatedJson );
                    break;
                case NEW:
                    LOGGER.trace( "Creating Entity State in Index: {}", changedState );
                    String newJson = toJSON( changedState, newStates, uow ).toString();
                    LOGGER.trace( "Will index: {}", newJson );
                    index( bulkBuilder, identityOf( changedState ), newJson );
                    break;
                case LOADED:
                default:
                    // Ignored
                    break;
            }
        }

        /**
         * Returns the string form of the identity of the given state's entity reference.
         */
        private static String identityOf( EntityState state )
        {
            return state.entityReference().identity().toString();
        }

        /**
         * Queues a delete action for the document with the given identity.
         */
        private void remove( BulkRequestBuilder bulkBuilder, String identity )
        {
            bulkBuilder.add( support.client().
                prepareDelete( support.index(), support.entitiesType(), identity ) );
        }

        /**
         * Queues an index action storing the given JSON source under the given identity.
         */
        private void index( BulkRequestBuilder bulkBuilder, String identity, String json )
        {
            bulkBuilder.add( support.client().
                prepareIndex( support.index(), support.entitiesType(), identity ).
                setSource( json, XContentType.JSON ) );
        }

        /**
         * Serializes an entity state to the JSON document that gets indexed.
         *
         * <p>Only queryable properties and associations are emitted. Associated entities are embedded as
         * nested documents when the association is aggregated or when
         * {@code support.indexNonAggregatedAssociations()} is true; otherwise only their reference is
         * written, as sketched below.</p>
         *
         * <pre>
         * {
         *  "_identity": "ENTITY-IDENTITY",
         *  "_types": [ "All", "Entity", "types" ],
         *  "property.name": property.value,
         *  "association.name": { "reference": "ASSOCIATED-IDENTITY" },
         *  "manyassociation.name": [ { "reference": "ASSOCIATED" }, { "reference": "IDENTITIES" } ],
         *  "namedassociation.name": [ { "_named": "NAMED", "reference": "IDENTITY" } ]
         * }
         * </pre>
         *
         * @param state     the state to serialize
         * @param newStates new and updated states of the current batch, keyed by identity
         * @param uow       unit of work used to load associated states absent from {@code newStates}
         * @return the JSON document for {@code state}
         */
        private JsonObject toJSON( EntityState state, Map<String, EntityState> newStates, EntityStoreUnitOfWork uow )
        {
            JsonObjectBuilder builder = jsonFactories.builderFactory().createObjectBuilder();
            builder.add( "_identity", identityOf( state ) );

            JsonArrayBuilder typesBuilder = jsonFactories.builderFactory().createArrayBuilder();
            state.entityDescriptor().mixinTypes().map( Classes.toClassName() ).forEach( typesBuilder::add );
            builder.add( "_types", typesBuilder.build() );

            EntityDescriptor entityType = state.entityDescriptor();
            addProperties( builder, state, entityType );
            addAssociations( builder, state, entityType, newStates, uow );
            addManyAssociations( builder, state, entityType, newStates, uow );
            addNamedAssociations( builder, state, entityType, newStates, uow );

            return builder.build();
        }

        /**
         * Adds every queryable property, serialized without type information, under its qualified name's name.
         */
        private void addProperties( JsonObjectBuilder builder, EntityState state, EntityDescriptor entityType )
        {
            entityType.state().properties().forEach(
                propDesc ->
                {
                    if( !propDesc.queryable() )
                    {
                        return;
                    }
                    String key = propDesc.qualifiedName().name();
                    Object value = state.propertyValueOf( propDesc.qualifiedName() );
                    JsonValue jsonValue = jsonSerializer.toJson( Serializer.Options.NO_TYPE_INFO, value );
                    builder.add( key, jsonValue );
                } );
        }

        /**
         * Adds every queryable association as JSON null, an embedded document or a reference object.
         */
        private void addAssociations( JsonObjectBuilder builder, EntityState state, EntityDescriptor entityType,
                                      Map<String, EntityState> newStates, EntityStoreUnitOfWork uow )
        {
            entityType.state().associations().forEach(
                assocDesc ->
                {
                    if( !assocDesc.queryable() )
                    {
                        return;
                    }
                    String key = assocDesc.qualifiedName().name();
                    EntityReference associated = state.associationValueOf( assocDesc.qualifiedName() );
                    if( associated == null )
                    {
                        builder.add( key, JsonValue.NULL );
                    }
                    else if( assocDesc.isAggregated() || support.indexNonAggregatedAssociations() )
                    {
                        builder.add( key, embeddedJson( associated.identity(), entityType, newStates, uow ) );
                    }
                    else
                    {
                        builder.add( key, referenceJson( associated.identity() ) );
                    }
                } );
        }

        /**
         * Adds every queryable many-association as an array of embedded documents or reference objects.
         */
        private void addManyAssociations( JsonObjectBuilder builder, EntityState state, EntityDescriptor entityType,
                                          Map<String, EntityState> newStates, EntityStoreUnitOfWork uow )
        {
            entityType.state().manyAssociations().forEach(
                manyAssocDesc ->
                {
                    if( !manyAssocDesc.queryable() )
                    {
                        return;
                    }
                    String key = manyAssocDesc.qualifiedName().name();
                    JsonArrayBuilder assBuilder = jsonFactories.builderFactory().createArrayBuilder();
                    ManyAssociationState assocs = state.manyAssociationValueOf( manyAssocDesc.qualifiedName() );
                    for( EntityReference associated : assocs )
                    {
                        if( manyAssocDesc.isAggregated() || support.indexNonAggregatedAssociations() )
                        {
                            assBuilder.add( embeddedJson( associated.identity(), entityType, newStates, uow ) );
                        }
                        else
                        {
                            assBuilder.add( referenceJson( associated.identity() ) );
                        }
                    }
                    builder.add( key, assBuilder.build() );
                } );
        }

        /**
         * Adds every queryable named-association as an array of objects carrying a {@code "_named"} entry.
         */
        private void addNamedAssociations( JsonObjectBuilder builder, EntityState state, EntityDescriptor entityType,
                                           Map<String, EntityState> newStates, EntityStoreUnitOfWork uow )
        {
            entityType.state().namedAssociations().forEach(
                namedAssocDesc ->
                {
                    if( !namedAssocDesc.queryable() )
                    {
                        return;
                    }
                    String key = namedAssocDesc.qualifiedName().name();
                    JsonArrayBuilder assBuilder = jsonFactories.builderFactory().createArrayBuilder();
                    NamedAssociationState assocs = state.namedAssociationValueOf(
                        namedAssocDesc.qualifiedName() );
                    for( String name : assocs )
                    {
                        Identity identity = assocs.get( name ).identity();
                        if( namedAssocDesc.isAggregated() || support.indexNonAggregatedAssociations() )
                        {
                            // The embedded document is cloned into a builder so "_named" can be appended
                            assBuilder.add(
                                jsonFactories.cloneBuilder( embeddedJson( identity, entityType, newStates, uow ) )
                                             .add( "_named", name )
                                             .build() );
                        }
                        else
                        {
                            assBuilder.add( jsonFactories.builderFactory().createObjectBuilder()
                                                         .add( "_named", name )
                                                         .add( "reference", identity.toString() )
                                                         .build() );
                        }
                    }
                    builder.add( key, assBuilder.build() );
                } );
        }

        /**
         * Serializes the associated entity with the given identity as a nested document, taking its state
         * from {@code newStates} when present and loading it through {@code uow} otherwise.
         */
        private JsonObject embeddedJson( Identity identity, EntityDescriptor entityType,
                                         Map<String, EntityState> newStates, EntityStoreUnitOfWork uow )
        {
            String identityString = identity.toString();
            if( newStates.containsKey( identityString ) )
            {
                return toJSON( newStates.get( identityString ), newStates, uow );
            }
            EntityReference reference = EntityReference.create( identity );
            EntityState assocState = uow.entityStateOf( entityType.module(), reference );
            return toJSON( assocState, newStates, uow );
        }

        /**
         * Creates a builder for the {@code { "reference": "IDENTITY" }} object used for non-embedded associations.
         */
        private JsonObjectBuilder referenceJson( Identity identity )
        {
            return jsonFactories.builderFactory().createObjectBuilder()
                                .add( "reference", identity.toString() );
        }
    }
}