blob: 2969fdc31a91a71c55662decbd70e2bf1aa028bc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.query;
import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.CLOSED;
import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.LOCK_NOT_ACQUIRED;
import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.RENEWED;
import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.THRESHOLD_NOT_REACHED;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.lang.ref.WeakReference;
import java.sql.DriverManager;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.phoenix.iterate.RenewLeaseOnlyTableIterator;
import org.apache.phoenix.iterate.TableResultIterator;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.ConnectionQueryServicesImpl.RenewLeaseTask;
import org.apache.phoenix.util.PropertiesUtil;
import org.junit.Test;
public class ScannerLeaseRenewalTest extends BaseConnectionlessQueryTest {
@Test
public void testRenewLeaseTaskBehavior() throws Exception {
// add connection to the queue
PhoenixConnection pconn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class);
LinkedBlockingQueue<WeakReference<PhoenixConnection>> connectionsQueue = new LinkedBlockingQueue<>();
connectionsQueue.add(new WeakReference<PhoenixConnection>(pconn));
// create a scanner and add it to the queue
int numLeaseRenewals = 4;
int skipRenewLeaseCount = 2;
int failToAcquireLockAt = 3;
RenewLeaseOnlyTableIterator itr = new RenewLeaseOnlyTableIterator(numLeaseRenewals, skipRenewLeaseCount, failToAcquireLockAt, -1);
LinkedBlockingQueue<WeakReference<TableResultIterator>> scannerQueue = pconn.getScanners();
scannerQueue.add(new WeakReference<TableResultIterator>(itr));
RenewLeaseTask task = new RenewLeaseTask(connectionsQueue);
assertTrue(connectionsQueue.size() == 1);
assertTrue(scannerQueue.size() == 1);
task.run();
assertTrue(connectionsQueue.size() == 1);
assertTrue(scannerQueue.size() == 1); // lease renewed
assertEquals(RENEWED, itr.getLastRenewLeaseStatus());
task.run();
assertTrue(scannerQueue.size() == 1);
assertTrue(connectionsQueue.size() == 1); // renew lease skipped but scanner still in the queue
assertEquals(THRESHOLD_NOT_REACHED, itr.getLastRenewLeaseStatus());
task.run();
assertTrue(scannerQueue.size() == 1);
assertTrue(connectionsQueue.size() == 1);
assertEquals(LOCK_NOT_ACQUIRED, itr.getLastRenewLeaseStatus()); // lock couldn't be acquired
task.run();
assertTrue(scannerQueue.size() == 1);
assertTrue(connectionsQueue.size() == 1);
assertEquals(RENEWED, itr.getLastRenewLeaseStatus()); // lease renewed
task.run();
assertTrue(scannerQueue.size() == 0);
assertTrue(connectionsQueue.size() == 1);
assertEquals(CLOSED, itr.getLastRenewLeaseStatus()); // scanner closed and removed from the queue
pconn.close();
task.run();
assertTrue(scannerQueue.size() == 0);
assertTrue("Closing the connection should have removed it from the queue", connectionsQueue.size() == 0);
}
@Test
public void testRenewLeaseTaskBehaviorOnError() throws Exception {
// add connection to the queue
PhoenixConnection pconn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class);
LinkedBlockingQueue<WeakReference<PhoenixConnection>> connectionsQueue = new LinkedBlockingQueue<>();
connectionsQueue.add(new WeakReference<PhoenixConnection>(pconn));
// create a scanner and add it to the queue
int numLeaseRenewals = 4;
int lockNotAcquiredAt = 1;
int thresholdNotReachedCount = 2;
int failLeaseRenewalAt = 3;
RenewLeaseOnlyTableIterator itr = new RenewLeaseOnlyTableIterator(numLeaseRenewals, thresholdNotReachedCount, lockNotAcquiredAt, failLeaseRenewalAt);
LinkedBlockingQueue<WeakReference<TableResultIterator>> scannerQueue = pconn.getScanners();
scannerQueue.add(new WeakReference<TableResultIterator>(itr));
RenewLeaseTask task = new RenewLeaseTask(connectionsQueue);
assertTrue(connectionsQueue.size() == 1);
assertTrue(scannerQueue.size() == 1);
task.run();
assertTrue(connectionsQueue.size() == 1);
assertTrue(scannerQueue.size() == 1); // lock not acquired
assertEquals(LOCK_NOT_ACQUIRED, itr.getLastRenewLeaseStatus());
task.run();
assertTrue(scannerQueue.size() == 1);
assertTrue(connectionsQueue.size() == 1); // renew lease skipped but scanner still in the queue
assertEquals(THRESHOLD_NOT_REACHED, itr.getLastRenewLeaseStatus());
task.run();
assertTrue(scannerQueue.size() == 0);
assertTrue(connectionsQueue.size() == 0); // there was only one connection in the connectionsQueue and it wasn't added back because of error
pconn.close();
task.run();
assertTrue(scannerQueue.size() == 0);
assertTrue("Closing the connection should have removed it from the queue", connectionsQueue.size() == 0);
}
}