blob: 2e131a253b0d07ae56c058b55986b559e7eb4bd2 [file] [log] [blame]
// Copyright 2014-2015 Quickstep Technologies LLC.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tmb.voltdb;
import org.voltdb.*;
public class Receive extends VoltProcedure {
public final SQLStmt checkConnectedSQL = new SQLStmt(
"SELECT client_id FROM client WHERE client_id = ? "
+ "AND disconnect_time IS NULL;");
// TODO(chasseur): The ORDER BY clause in these queries necessitates a sort,
// even though tuples should come out of the index in the correct order.
// Figure out a way to elide the sort (thus far omitting the ORDER BY clause
// causes priority order to be reversed, regardless of whether ASC or DESC is
// specified for the priority order in the schema).
public final SQLStmt getMessagesSQL = new SQLStmt(
"SELECT message_id, sender_id, send_time, type_id, payload, "
+ "payload_bytes "
+ "FROM queued_message "
+ "WHERE receiver_id = ? AND priority >= ? "
+ "AND NOW < expiration_time "
+ "ORDER BY priority DESC, expiration_time ASC, send_time ASC, "
+ "message_id ASC, receiver_id;");
public final SQLStmt getMessagesWithLimitSQL = new SQLStmt(
"SELECT message_id, sender_id, send_time, type_id, payload, "
+ "payload_bytes "
+ "FROM queued_message "
+ "WHERE receiver_id = ? AND priority >= ? "
+ "AND NOW < expiration_time "
+ "ORDER BY priority DESC, expiration_time ASC, send_time ASC, "
+ "message_id ASC, receiver_id "
+ "LIMIT ?;");
public VoltTable run(int client_id,
short minimum_priority,
long max_messages) throws VoltAbortException {
voltQueueSQL(checkConnectedSQL, EXPECT_ONE_ROW, client_id);
if (max_messages == 0) {
voltQueueSQL(getMessagesSQL, client_id, minimum_priority);
} else {
voltQueueSQL(getMessagesWithLimitSQL, client_id,
minimum_priority,
max_messages);
}
VoltTable[] results = voltExecuteSQL();
return results[1];
}
}