blob: fc0f8c3a694620c63f3a02d1c5eed1525bcecdc4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.usergrid.corepersistence.migration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservable;
import org.apache.usergrid.corepersistence.service.ConnectionService;
import org.apache.usergrid.persistence.core.migration.data.DataMigration;
import org.apache.usergrid.persistence.core.migration.data.ProgressObserver;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@Singleton
public class DeDupConnectionDataMigration implements DataMigration {
private static final Logger logger = LoggerFactory.getLogger(DeDupConnectionDataMigration.class);
private static final long UPDATE_COUNT = 1000;
private final ConnectionService connectionService;
private final AllApplicationsObservable allApplicationsObservable;
@Inject
public DeDupConnectionDataMigration( final ConnectionService connectionService,
final AllApplicationsObservable allApplicationsObservable ) {
this.connectionService = connectionService;
this.allApplicationsObservable = allApplicationsObservable;
}
@Override
public int migrate( final int currentVersion, final ProgressObserver observer ) {
final int migrationVersion = getMaxVersion();
observer.start();
connectionService.deDupeConnections( allApplicationsObservable.getData() ).reduce( 0l, ( count, deDuped ) -> {
final long newCount = count + 1;
/**
* Update our progress observer
*/
if ( newCount % UPDATE_COUNT == 0 ) {
logger.info( "De duped {} edges", newCount );
observer.update( migrationVersion, String.format( "De duped %d edges", newCount ) );
}
return newCount;
} ).doOnNext( total -> {
logger.info( "Completed de-duping {} edges", total );
observer.complete();
} ).toBlocking().lastOrDefault( null ); //want this to run through all records
return migrationVersion;
}
@Override
public boolean supports( final int currentVersion ) {
return currentVersion <= getMaxVersion() - 1;
}
@Override
public int getMaxVersion() {
//needs to be 2 b/c our obsolete EntityTypeMappingMigration was 1
return 2;
}
}