blob: b0b8cedffbfd26d1e70a2bb557e3fd6cad36c7cb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.tests.queryablestate;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.queryablestate.client.QueryableStateClient;
import org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* A simple implementation of a queryable state client.
* This client queries the state for a while (~2.5 mins) and prints
* out the values that it found in the map state
*
* <p>Usage: java -jar QsStateClient.jar --host HOST --port PORT --job-id JOB_ID
*/
public class QsStateClient {
private static final int BOOTSTRAP_RETRIES = 240;
public static void main(final String[] args) throws Exception {
ParameterTool parameters = ParameterTool.fromArgs(args);
// setup values
String jobId = parameters.getRequired("job-id");
String host = parameters.get("host", "localhost");
int port = parameters.getInt("port", 9069);
int numIterations = parameters.getInt("iterations", 1500);
QueryableStateClient client = new QueryableStateClient(host, port);
client.setExecutionConfig(new ExecutionConfig());
MapStateDescriptor<EmailId, EmailInformation> stateDescriptor =
new MapStateDescriptor<>(
QsConstants.STATE_NAME,
TypeInformation.of(new TypeHint<EmailId>() {
}),
TypeInformation.of(new TypeHint<EmailInformation>() {
})
);
// wait for state to exist
for (int i = 0; i < BOOTSTRAP_RETRIES; i++) { // ~120s
try {
getMapState(jobId, client, stateDescriptor);
break;
} catch (ExecutionException e) {
if (e.getCause() instanceof UnknownKeyOrNamespaceException) {
System.err.println("State does not exist yet; sleeping 500ms");
Thread.sleep(500L);
} else {
throw e;
}
}
if (i == (BOOTSTRAP_RETRIES - 1)) {
throw new RuntimeException("Timeout: state doesn't exist after 120s");
}
}
// query state
for (int iterations = 0; iterations < numIterations; iterations++) {
MapState<EmailId, EmailInformation> mapState =
getMapState(jobId, client, stateDescriptor);
int counter = 0;
for (Map.Entry<EmailId, EmailInformation> entry: mapState.entries()) {
// this is to force deserialization
entry.getKey();
entry.getValue();
counter++;
}
System.out.println("MapState has " + counter + " entries"); // we look for it in the test
Thread.sleep(100L);
}
}
private static MapState<EmailId, EmailInformation> getMapState(
String jobId,
QueryableStateClient client,
MapStateDescriptor<EmailId, EmailInformation> stateDescriptor) throws InterruptedException, ExecutionException {
CompletableFuture<MapState<EmailId, EmailInformation>> resultFuture =
client.getKvState(
JobID.fromHexString(jobId),
QsConstants.QUERY_NAME,
QsConstants.KEY, // which key of the keyed state to access
BasicTypeInfo.STRING_TYPE_INFO,
stateDescriptor);
return resultFuture.get();
}
}