blob: 7c1eaf923835f176852e59299f9864f25f1a82fb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicecomb.config.kie.client;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import org.apache.servicecomb.config.common.ConfigConverter;
import org.apache.servicecomb.config.common.ConfigurationChangedEvent;
import org.apache.servicecomb.config.kie.client.model.ConfigurationsRequest;
import org.apache.servicecomb.config.kie.client.model.ConfigurationsRequestFactory;
import org.apache.servicecomb.config.kie.client.model.ConfigurationsResponse;
import org.apache.servicecomb.config.kie.client.model.KieConfiguration;
import org.apache.servicecomb.http.client.task.AbstractTask;
import org.apache.servicecomb.http.client.task.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.eventbus.EventBus;
public class KieConfigManager extends AbstractTask {
private static final Logger LOGGER = LoggerFactory.getLogger(KieConfigManager.class);
private static final long LONG_POLLING_INTERVAL = 1000;
private final KieConfigOperation configKieClient;
private final EventBus eventBus;
private final ConfigConverter configConverter;
private final List<ConfigurationsRequest> configurationsRequests;
private final KieConfiguration kieConfiguration;
public KieConfigManager(KieConfigOperation configKieClient, EventBus eventBus,
KieConfiguration kieConfiguration,
ConfigConverter configConverter) {
super("config-center-configuration-task");
this.configurationsRequests = ConfigurationsRequestFactory.buildConfigurationRequests(kieConfiguration);
this.configurationsRequests.sort(ConfigurationsRequest::compareTo);
this.configKieClient = configKieClient;
this.eventBus = eventBus;
this.configConverter = configConverter;
this.kieConfiguration = kieConfiguration;
}
public void firstPull() {
try {
Map<String, Object> data = new HashMap<>();
this.configurationsRequests.forEach(r -> {
r.setRevision(ConfigurationsRequest.INITIAL_REVISION);
ConfigurationsResponse response = configKieClient.queryConfigurations(r);
if (response.isChanged()) {
r.setRevision(response.getRevision());
r.setLastRawData(response.getConfigurations());
data.putAll(response.getConfigurations());
}
});
this.configConverter.updateData(data);
} catch (RuntimeException e) {
if (this.kieConfiguration.isFirstPullRequired()) {
throw e;
} else {
LOGGER.warn("first pull failed, and ignore {}", e.getMessage());
}
}
}
private void onDataChanged() {
Map<String, Object> latestData = new HashMap<>();
this.configurationsRequests.forEach(r -> latestData.putAll(r.getLastRawData()));
Map<String, Object> lastData = configConverter.updateData(latestData);
ConfigurationChangedEvent event = ConfigurationChangedEvent
.createIncremental(configConverter.getCurrentData(), lastData);
if (event.isChanged()) {
eventBus.post(event);
}
}
@Override
protected void initTaskPool(String taskName) {
this.taskPool = Executors.newFixedThreadPool(3, (task) ->
new Thread(task, taskName));
}
public void startConfigKieManager() {
this.configurationsRequests.forEach((t) ->
this.startTask(new PollConfigurationTask(0, t)));
}
class PollConfigurationTask implements Task {
final int failCount;
ConfigurationsRequest configurationsRequest;
public PollConfigurationTask(int failCount, ConfigurationsRequest configurationsRequest) {
this.failCount = failCount;
this.configurationsRequest = configurationsRequest;
}
@Override
public void execute() {
try {
ConfigurationsResponse response = configKieClient.queryConfigurations(configurationsRequest);
if (response.isChanged()) {
configurationsRequest.setRevision(response.getRevision());
configurationsRequest.setLastRawData(response.getConfigurations());
onDataChanged();
}
if (KieConfigManager.this.kieConfiguration.isEnableLongPolling()) {
startTask(new BackOffSleepTask(LONG_POLLING_INTERVAL, new PollConfigurationTask(0, this.configurationsRequest)));
} else {
startTask(new BackOffSleepTask(kieConfiguration.getRefreshIntervalInMillis(), new PollConfigurationTask(0, this.configurationsRequest)));
}
} catch (Exception e) {
LOGGER.error("get configurations from KieConfigCenter failed, and will try again.", e);
startTask(
new BackOffSleepTask(failCount + 1, new PollConfigurationTask(failCount + 1, this.configurationsRequest)));
}
}
}
}