Java源码示例:org.apache.phoenix.iterate.TableResultIterator
示例1
@Test
public void testRenewLeaseTaskBehaviorOnError() throws Exception {
// add connection to the queue
PhoenixConnection pconn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class);
LinkedBlockingQueue<WeakReference<PhoenixConnection>> connectionsQueue = new LinkedBlockingQueue<>();
connectionsQueue.add(new WeakReference<PhoenixConnection>(pconn));
// create a scanner and add it to the queue
int numLeaseRenewals = 4;
int lockNotAcquiredAt = 1;
int thresholdNotReachedCount = 2;
int failLeaseRenewalAt = 3;
RenewLeaseOnlyTableIterator itr = new RenewLeaseOnlyTableIterator(numLeaseRenewals, thresholdNotReachedCount, lockNotAcquiredAt, failLeaseRenewalAt);
LinkedBlockingQueue<WeakReference<TableResultIterator>> scannerQueue = pconn.getScanners();
scannerQueue.add(new WeakReference<TableResultIterator>(itr));
RenewLeaseTask task = new RenewLeaseTask(connectionsQueue);
assertTrue(connectionsQueue.size() == 1);
assertTrue(scannerQueue.size() == 1);
task.run();
assertTrue(connectionsQueue.size() == 1);
assertTrue(scannerQueue.size() == 1); // lock not acquired
assertEquals(LOCK_NOT_ACQUIRED, itr.getLastRenewLeaseStatus());
task.run();
assertTrue(scannerQueue.size() == 1);
assertTrue(connectionsQueue.size() == 1); // renew lease skipped but scanner still in the queue
assertEquals(THRESHOLD_NOT_REACHED, itr.getLastRenewLeaseStatus());
task.run();
assertTrue(scannerQueue.size() == 0);
assertTrue(connectionsQueue.size() == 0); // there was only one connection in the connectionsQueue and it wasn't added back because of error
pconn.close();
task.run();
assertTrue(scannerQueue.size() == 0);
assertTrue("Closing the connection should have removed it from the queue", connectionsQueue.size() == 0);
}
示例2
public void addIteratorForLeaseRenewal(@Nonnull TableResultIterator itr) {
if (services.isRenewingLeasesEnabled()) {
checkNotNull(itr);
scannerQueue.add(new WeakReference<TableResultIterator>(itr));
}
}
示例3
public LinkedBlockingQueue<WeakReference<TableResultIterator>> getScanners() {
return scannerQueue;
}
示例4
@Test
public void testRenewLeaseTaskBehavior() throws Exception {
// add connection to the queue
PhoenixConnection pconn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class);
LinkedBlockingQueue<WeakReference<PhoenixConnection>> connectionsQueue = new LinkedBlockingQueue<>();
connectionsQueue.add(new WeakReference<PhoenixConnection>(pconn));
// create a scanner and add it to the queue
int numLeaseRenewals = 4;
int skipRenewLeaseCount = 2;
int failToAcquireLockAt = 3;
RenewLeaseOnlyTableIterator itr = new RenewLeaseOnlyTableIterator(numLeaseRenewals, skipRenewLeaseCount, failToAcquireLockAt, -1);
LinkedBlockingQueue<WeakReference<TableResultIterator>> scannerQueue = pconn.getScanners();
scannerQueue.add(new WeakReference<TableResultIterator>(itr));
RenewLeaseTask task = new RenewLeaseTask(connectionsQueue);
assertTrue(connectionsQueue.size() == 1);
assertTrue(scannerQueue.size() == 1);
task.run();
assertTrue(connectionsQueue.size() == 1);
assertTrue(scannerQueue.size() == 1); // lease renewed
assertEquals(RENEWED, itr.getLastRenewLeaseStatus());
task.run();
assertTrue(scannerQueue.size() == 1);
assertTrue(connectionsQueue.size() == 1); // renew lease skipped but scanner still in the queue
assertEquals(THRESHOLD_NOT_REACHED, itr.getLastRenewLeaseStatus());
task.run();
assertTrue(scannerQueue.size() == 1);
assertTrue(connectionsQueue.size() == 1);
assertEquals(LOCK_NOT_ACQUIRED, itr.getLastRenewLeaseStatus()); // lock couldn't be acquired
task.run();
assertTrue(scannerQueue.size() == 1);
assertTrue(connectionsQueue.size() == 1);
assertEquals(RENEWED, itr.getLastRenewLeaseStatus()); // lease renewed
task.run();
assertTrue(scannerQueue.size() == 0);
assertTrue(connectionsQueue.size() == 1);
assertEquals(CLOSED, itr.getLastRenewLeaseStatus()); // scanner closed and removed from the queue
pconn.close();
task.run();
assertTrue(scannerQueue.size() == 0);
assertTrue("Closing the connection should have removed it from the queue", connectionsQueue.size() == 0);
}