Java源码示例:com.datastax.driver.core.PagingState
示例1
protected void setPageState(Query query, List<Select> selects) {
if (query.nolimit()) {
return;
}
for (Select select : selects) {
long total = query.total();
String page = query.page();
if (page == null) {
// Set limit
select.limit((int) total);
} else {
select.setFetchSize((int) total);
// It's the first time if page is empty
if (!page.isEmpty()) {
byte[] position = PageState.fromString(page).position();
try {
select.setPagingState(PagingState.fromBytes(position));
} catch (PagingStateException e) {
throw new BackendException(e);
}
}
}
}
}
示例2
@Override
protected PageState pageState() {
PagingState page = this.results.getExecutionInfo().getPagingState();
if (page == null || this.results.isExhausted()) {
return new PageState(PageState.EMPTY_BYTES, 0, (int) this.count());
}
byte[] position = page.toBytes();
return new PageState(position, 0, (int) this.count());
}
示例3
/**
* Reduces the fetch size and retries the query. Returns true if the query succeeded, false if the root cause
* of the exception does not indicate a frame size issue, if the frame size cannot be adjusted down any further,
* or if the retried query fails for an unrelated reason.
*/
private boolean reduceFetchSize(Throwable reason) {
if (!isAdaptiveException(reason) || --_remainingAdaptations == 0) {
return false;
}
ExecutionInfo executionInfo = _delegate.getExecutionInfo();
Statement statement = executionInfo.getStatement();
PagingState pagingState = executionInfo.getPagingState();
int fetchSize = statement.getFetchSize();
while (fetchSize > MIN_FETCH_SIZE) {
fetchSize = Math.max(fetchSize / 2, MIN_FETCH_SIZE);
_log.debug("Retrying query at next page with fetch size {} due to {}", fetchSize, reason.getMessage());
statement.setFetchSize(fetchSize);
statement.setPagingState(pagingState);
try {
_delegate = _session.execute(statement);
return true;
} catch (Throwable t) {
// Exit the adaptation loop if the exception isn't one where adapting further may help
if (!isAdaptiveException(t) || --_remainingAdaptations == 0) {
return false;
}
}
}
return false;
}
示例4
@Override
public Result<AuditLog> getAuditLogs( UUID messageId ) {
Statement query = QueryBuilder.select().all().from(TABLE_AUDIT_LOG)
.where( QueryBuilder.eq( COLUMN_MESSAGE_ID, messageId ) );
ResultSet rs = cassandraClient.getApplicationSession().execute( query );
final List<AuditLog> auditLogs = rs.all().stream().map( row ->
new AuditLog(
AuditLog.Action.valueOf( row.getString( COLUMN_ACTION )),
AuditLog.Status.valueOf( row.getString( COLUMN_STATUS )),
row.getString( COLUMN_QUEUE_NAME ),
row.getString( COLUMN_REGION ),
row.getUUID( COLUMN_MESSAGE_ID ),
row.getUUID( COLUMN_QUEUE_MESSAGE_ID ),
row.getLong( COLUMN_TRANSFER_TIME ) )
).collect( Collectors.toList() );
return new Result<AuditLog>() {
@Override
public PagingState getPagingState() {
return null; // no paging
}
@Override
public List<AuditLog> getEntities() {
return auditLogs;
}
};
}
示例5
@Override
public Result<TransferLog> getAllTransferLogs(PagingState pagingState, int fetchSize ) {
Statement query = QueryBuilder.select().all().from(TABLE_TRANSFER_LOG);
query.setFetchSize( fetchSize );
if ( pagingState != null ) {
query.setPagingState( pagingState );
}
ResultSet rs = cassandraClient.getApplicationSession().execute( query );
final PagingState newPagingState = rs.getExecutionInfo().getPagingState();
final List<TransferLog> transferLogs = new ArrayList<>();
int numReturned = rs.getAvailableWithoutFetching();
for ( int i=0; i<numReturned; i++ ) {
Row row = rs.one();
TransferLog tlog = new TransferLog(
row.getString( COLUMN_QUEUE_NAME ),
row.getString( COLUMN_SOURCE_REGION ),
row.getString( COLUMN_DEST_REGION ),
row.getUUID( COLUMN_MESSAGE_ID ),
row.getLong( COLUMN_TRANSFER_TIME ));
transferLogs.add( tlog );
}
return new Result<TransferLog>() {
@Override
public PagingState getPagingState() {
return newPagingState;
}
@Override
public List<TransferLog> getEntities() {
return transferLogs;
}
};
}
示例6
@Test
public void recordTransferLog() throws Exception {
TransferLogSerialization logSerialization = getInjector().getInstance( TransferLogSerialization.class );
CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
String queueName = "tlst_queue_" + RandomStringUtils.randomAlphanumeric( 15 );
String source = RandomStringUtils.randomAlphanumeric( 15 );
String dest = RandomStringUtils.randomAlphanumeric( 15 );
int numLogs = 100;
for ( int i=0; i<numLogs; i++ ) {
logSerialization.recordTransferLog( queueName, source, dest, UUIDGen.getTimeUUID());
}
int count = 0;
int fetchCount = 0;
PagingState pagingState = null;
while ( true ) {
Result<TransferLog> all = logSerialization.getAllTransferLogs( pagingState, 10 );
// we only want entities for our queue
List<TransferLog> logs = all.getEntities().stream()
.filter( log -> log.getQueueName().equals( queueName ) ).collect( Collectors.toList() );
count += logs.size();
fetchCount++;
if ( all.getPagingState() == null ) {
break;
}
pagingState = all.getPagingState();
}
Assert.assertEquals( numLogs, count );
}
示例7
private List<TransferLog> getTransferLogs(TransferLogSerialization logSerialization) {
PagingState pagingState = null;
List<TransferLog> allLogs = new ArrayList<>();
while ( true ) {
Result<TransferLog> result = logSerialization.getAllTransferLogs( pagingState, 100 );
allLogs.addAll( result.getEntities() );
if ( result.getPagingState() == null ) {
break;
}
pagingState = result.getPagingState();
}
return allLogs;
}
示例8
/**
* Get all transfer logs (for testing purposes)
*
* @param pagingState Paging state (or null if none)
* @param fetchSize Number of rows to be fetched per page (or -1 for default)
*/
Result<TransferLog> getAllTransferLogs(PagingState pagingState, int fetchSize);
示例9
PagingState getPagingState();