blob: 6d696095df2db00edc1c4592e0767185825d039b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicecomb.pack.alpha.spec.tcc.db.service;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.servicecomb.pack.alpha.spec.tcc.db.jpa.EventConverter;
import org.apache.servicecomb.pack.alpha.spec.tcc.db.jpa.GlobalTxEvent;
import org.apache.servicecomb.pack.alpha.spec.tcc.db.jpa.ParticipatedEvent;
import org.apache.servicecomb.pack.alpha.spec.tcc.db.jpa.TccTxEvent;
import org.apache.servicecomb.pack.alpha.spec.tcc.db.jpa.TccTxType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.domain.Pageable;
public class MemoryTxEventRepository implements TccTxEventRepository {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final Map<String, Set<TccTxEvent>> tccEventMap = new ConcurrentHashMap<>();
@Override
public void saveGlobalTxEvent(GlobalTxEvent event) {
save(EventConverter.convertToTccTxEvent(event));
}
@Override
public void saveParticipatedEvent(ParticipatedEvent event) {
save(EventConverter.convertToTccTxEvent(event));
}
@Override
public void updateParticipatedEventStatus(ParticipatedEvent event) {
save(EventConverter.convertToTccTxEvent(event));
}
@Override
public void coordinated(TccTxEvent event) {
}
@Override
public void save(TccTxEvent event) {
tccEventMap
.computeIfAbsent(event.getGlobalTxId(), key-> new LinkedHashSet<>())
.add(event);
}
@Override
public Optional<List<TccTxEvent>> findByGlobalTxId(String globalTxId) {
Set<TccTxEvent> events = tccEventMap.get(globalTxId);
if ( events != null) {
return Optional.of(new ArrayList<>(events));
} else {
return Optional.empty();
}
}
@Override
public Optional<List<ParticipatedEvent>> findParticipatedByGlobalTxId(String globalTxId) {
return Optional.of(
findByGlobalTxId(globalTxId)
.orElse(new ArrayList<>()).stream()
.filter(e -> TccTxType.P_TX_ENDED.name().equals(e.getTxType()))
.map(EventConverter::convertToParticipatedEvent).collect(Collectors.toList())
);
}
@Override
public Optional<List<TccTxEvent>> findByGlobalTxIdAndTxType(String globalTxId, TccTxType tccTxType) {
Set<TccTxEvent> events = tccEventMap.get(globalTxId);
if ( events != null) {
return Optional.of(events.stream().filter(e -> tccTxType.name().equals(e.getTxType())).collect(Collectors.toList()));
} else {
return Optional.empty();
}
}
@Override
public Optional<TccTxEvent> findByUniqueKey(String globalTxId, String localTxId, TccTxType tccTxType) {
Set<TccTxEvent> events = tccEventMap.get(globalTxId);
if (events != null) {
return events.stream().filter(e ->
tccTxType.name().equals(e.getTxType())
&& localTxId.equals(e.getLocalTxId())).findAny();
} else {
return Optional.empty();
}
}
@Override
public Optional<List<GlobalTxEvent>> findTimeoutGlobalTx(Date deadLine, String txType, Pageable pageable) {
return Optional.empty();
}
@Override
public void clearCompletedGlobalTx(Pageable pageable) {
}
@Override
public Iterable<TccTxEvent> findAll() {
List<TccTxEvent> events = new ArrayList<>();
for (String globalTxId : tccEventMap.keySet()) {
events.addAll(tccEventMap.get(globalTxId));
}
return events;
}
@Override
public void deleteAll() {
tccEventMap.clear();
}
}