Java源码示例:org.apache.accumulo.core.client.ScannerBase

示例1
public Map<String,Double> getStat(Set<String> fields, Set<String> dataTypes, SortedSet<String> dates) throws IOException {
    final ScannerBase scanner;
    try {
        Authorizations auths = con.securityOperations().getUserAuthorizations(con.whoami());
        if (fields.isEmpty()) {
            scanner = con.createScanner(table, auths);
        } else {
            BatchScanner bScanner = con.createBatchScanner(table, auths, fields.size());
            bScanner.setRanges(buildRanges(fields));
            scanner = bScanner;
        }
    } catch (Exception e) {
        log.error(e);
        throw new IOException(e);
    }
    
    configureScanIterators(scanner, dataTypes, dates);
    
    Map<String,Double> results = scanResults(scanner);
    
    if (scanner instanceof BatchScanner) {
        scanner.close();
    }
    
    return results;
}
 
示例2
public static final void setExpansionFields(ShardQueryConfiguration config, ScannerBase bs, boolean reverseIndex, Collection<String> expansionFields) {
    
    // Now restrict the fields returned to those that are specified and then only those that are indexed or reverse indexed
    if (expansionFields == null || expansionFields.isEmpty()) {
        expansionFields = (reverseIndex ? config.getReverseIndexedFields() : config.getIndexedFields());
    } else {
        expansionFields = Sets.newHashSet(expansionFields);
        expansionFields.retainAll(reverseIndex ? config.getReverseIndexedFields() : config.getIndexedFields());
    }
    if (expansionFields.isEmpty()) {
        bs.fetchColumnFamily(new Text(Constants.NO_FIELD));
    } else {
        for (String field : expansionFields) {
            bs.fetchColumnFamily(new Text(field));
        }
    }
    
}
 
示例3
@Override
public void close() {
    super.close();
    
    if (scannerFactory == null) {
        if (log.isDebugEnabled()) {
            log.debug("ScannerFactory is null; not closing it.");
        }
    } else {
        int nClosed = 0;
        scannerFactory.lockdown();
        for (ScannerBase bs : Lists.newArrayList(scannerFactory.currentScanners())) {
            scannerFactory.close(bs);
            ++nClosed;
        }
        if (log.isDebugEnabled()) {
            log.debug("Cleaned up " + nClosed + " batch scanners associated with this query logic.");
        }
    }
}
 
示例4
/**
 * This method calls the base logic's close method, and then attempts to close all batch scanners tracked by the scanner factory, if it is not null.
 */
@Override
public void close() {
    super.close();
    final ScannerFactory factory = this.scannerFactory;
    if (null == factory) {
        log.debug("ScannerFactory is null; not closing it.");
    } else {
        int nClosed = 0;
        factory.lockdown();
        for (final ScannerBase bs : Lists.newArrayList(factory.currentScanners())) {
            factory.close(bs);
            ++nClosed;
        }
        if (log.isDebugEnabled())
            log.debug("Cleaned up " + nClosed + " batch scanners associated with this query logic.");
    }
}
 
示例5
protected static void addVisibilityFilters(Iterator<Authorizations> iter, ScannerBase scanner) {
    for (int priority = 10; iter.hasNext(); priority++) {
        IteratorSetting cfg = new IteratorSetting(priority, ConfigurableVisibilityFilter.class);
        cfg.setName("visibilityFilter" + priority);
        cfg.addOption(ConfigurableVisibilityFilter.AUTHORIZATIONS_OPT, iter.next().toString());
        // Set the visibility filter as a "system" iterator, which means that normal
        // modify, remove, clear operations performed
        // on the scanner will not modify/remove/clear this iterator. This way, if a
        // query logic attempts to reconfigure the
        // scanner's iterators, then this iterator will remain intact.
        if (scanner instanceof ScannerBaseDelegate) {
            ((ScannerBaseDelegate) scanner).addSystemScanIterator(cfg);
        } else {
            logger.warn("Adding system visibility filter to non-wrapped scanner {}.", scanner.getClass(),
                    new Exception());
            scanner.addScanIterator(cfg);
        }
    }
}
 
示例6
/**
 * statements where the datetime is exactly the same as the queryInstant.
 */
@Override
public CloseableIteration<Statement, QueryEvaluationException> queryInstantEqualsInstant(
        final TemporalInstant queryInstant, final StatementConstraints constraints)
        throws QueryEvaluationException {
        // get rows where the repository time is equal to the given time in queryInstant.
        final Query query = new Query() {
            @Override
            public Range getRange(final KeyParts keyParts) {
                //System.out.println("Scanning queryInstantEqualsInstant: prefix:" + KeyParts.toHumanString(keyParts.getQueryKey()));
                return Range.prefix(keyParts.getQueryKey()); // <-- specific logic
            }
        };
        final ScannerBase scanner = query.doQuery(queryInstant, constraints);
        // TODO currently context constraints are filtered on the client.
        return getContextIteratorWrapper(scanner, constraints.getContext());
}
 
示例7
/**
 * get statements where the date object is after the given queryInstant.
 */
@Override
public CloseableIteration<Statement, QueryEvaluationException> queryInstantAfterInstant(
        final TemporalInstant queryInstant, final StatementConstraints constraints)
        throws QueryEvaluationException {
    final Query query = new Query() {
        @Override
        public Range getRange(final KeyParts keyParts) {
            final Text start = Range.followingPrefix(keyParts.getQueryKey());  // <-- specific logic
            Text endAt = null;  // no constraints                            // <-- specific logic
            if (keyParts.constraintPrefix != null ) {
                endAt = Range.followingPrefix(keyParts.constraintPrefix);
            }
            //System.out.println("Scanning queryInstantAfterInstant from after:" + KeyParts.toHumanString(start) + " up to:" + KeyParts.toHumanString(endAt));
            return new Range(start, true, endAt, false);
        }
    };
    final ScannerBase scanner = query.doQuery(queryInstant, constraints);
    return getContextIteratorWrapper(scanner, constraints.getContext());
}
 
示例8
/**
 * Get instances inside a given interval.
 * Returns after interval's beginning time, and before ending time,
 * exclusive (don't match the beginning and ending).
 */
@Override
public CloseableIteration<Statement, QueryEvaluationException> queryInstantInsideInterval(
        final TemporalInterval queryInterval, final StatementConstraints constraints)
        throws QueryEvaluationException {
    // get rows where the time is after the given interval's beginning time and before the ending time.
    final TemporalInterval theQueryInterval = queryInterval;
    final Query query = new Query() {
        private final TemporalInterval queryInterval = theQueryInterval;
        @Override
        public Range getRange(final KeyParts keyParts) {
            final Text start = Range.followingPrefix(new Text(keyParts.getQueryKey(queryInterval.getHasBeginning())));
            final Text endAt = new Text(keyParts.getQueryKey(queryInterval.getHasEnd())); // <-- end specific logic
            //System.out.println("Scanning queryInstantInsideInterval: from excluding:" + KeyParts.toHumanString(start) + " up to:" + KeyParts.toHumanString(endAt));
            return new Range(start, false, endAt, false);
        }
    };
    final ScannerBase scanner = query.doQuery(queryInterval.getHasBeginning(), constraints);
    return getContextIteratorWrapper(scanner, constraints.getContext());
}
 
示例9
protected <T> void addIndexFilterToIterator(
    final ReaderParams<T> params,
    final ScannerBase scanner) {
  final List<MultiDimensionalCoordinateRangesArray> coords = params.getCoordinateRanges();
  if ((coords != null) && !coords.isEmpty()) {
    final IteratorSetting iteratorSetting =
        new IteratorSetting(
            NumericIndexStrategyFilterIterator.IDX_FILTER_ITERATOR_PRIORITY,
            NumericIndexStrategyFilterIterator.IDX_FILTER_ITERATOR_NAME,
            NumericIndexStrategyFilterIterator.class);

    iteratorSetting.addOption(
        NumericIndexStrategyFilterIterator.INDEX_STRATEGY_KEY,
        ByteArrayUtils.byteArrayToString(
            PersistenceUtils.toBinary(params.getIndex().getIndexStrategy())));

    iteratorSetting.addOption(
        NumericIndexStrategyFilterIterator.COORDINATE_RANGE_KEY,
        ByteArrayUtils.byteArrayToString(
            new ArrayOfArrays(
                coords.toArray(new MultiDimensionalCoordinateRangesArray[] {})).toBinary()));
    scanner.addScanIterator(iteratorSetting);
  }
}
 
示例10
protected <T> void addFieldSubsettingToIterator(
    final RangeReaderParams<T> params,
    final ScannerBase scanner) {
  if ((params.getFieldSubsets() != null) && !params.isAggregation()) {
    final String[] fieldNames = params.getFieldSubsets().getLeft();
    final DataTypeAdapter<?> associatedAdapter = params.getFieldSubsets().getRight();
    if ((fieldNames != null) && (fieldNames.length > 0) && (associatedAdapter != null)) {
      final IteratorSetting iteratorSetting = AttributeSubsettingIterator.getIteratorSetting();

      AttributeSubsettingIterator.setFieldNames(
          iteratorSetting,
          associatedAdapter,
          fieldNames,
          params.getIndex().getIndexModel());

      iteratorSetting.addOption(
          AttributeSubsettingIterator.WHOLE_ROW_ENCODED_KEY,
          Boolean.toString(params.isMixedVisibility()));
      scanner.addScanIterator(iteratorSetting);
    }
  }
}
 
示例11
public AccumuloReader(
    final ScannerBase scanner,
    final GeoWaveRowIteratorTransformer<T> transformer,
    final int partitionKeyLength,
    final boolean wholeRowEncoding,
    final boolean clientSideRowMerging,
    final boolean parallel) {
  this.scanner = scanner;
  this.partitionKeyLength = partitionKeyLength;
  this.wholeRowEncoding = wholeRowEncoding;
  this.baseIter = scanner.iterator();

  if (parallel) {
    this.parallelDecoder =
        new SimpleParallelDecoder<>(transformer, getIterator(clientSideRowMerging));
    try {
      this.parallelDecoder.startDecode();
    } catch (final Exception e) {
      Throwables.propagate(e);
    }

    this.iterator = parallelDecoder;
  } else {
    this.iterator = transformer.apply(getIterator(clientSideRowMerging));
  }
}
 
示例12
/**
 * Converts a {@link ScannerBase} into a {@link CloseableIterable}
 */
public static CloseableIterable<Entry<Key, Value>> closeableIterable(final ScannerBase scanner) {
    checkNotNull(scanner);
    return new FluentCloseableIterable<Entry<Key, Value>>() {
        @Override
        protected void doClose() throws IOException {
            if (scanner instanceof BatchScanner)
                ((BatchScanner) scanner).close();
        }

        @Override
        protected Iterator<Entry<Key, Value>> retrieveIterator() {
            return scanner.iterator();
        }
    };
}
 
示例13
protected ScannerBase metricScanner(AccumuloFeatureConfig xform, Date start, Date end, String group, Iterable<String> types, String name, TimeUnit timeUnit, Auths auths) {
    checkNotNull(xform);

    try {
        group = defaultString(group);
        timeUnit = (timeUnit == null ? TimeUnit.MINUTES : timeUnit);

        BatchScanner scanner = connector.createBatchScanner(tableName + REVERSE_SUFFIX, auths.getAuths(), config.getMaxQueryThreads());

        Collection<Range> typeRanges = new ArrayList();
        for (String type : types)
            typeRanges.add(buildRange(type, start, end, timeUnit));

        scanner.setRanges(typeRanges);
        scanner.fetchColumn(new Text(combine(timeUnit.toString(), xform.featureName())), new Text(combine(group, name)));

        return scanner;

    } catch (TableNotFoundException e) {
        throw new RuntimeException(e);
    }
}
 
示例14
public static final void configureGlobalIndexDateRangeFilter(ShardQueryConfiguration config, ScannerBase bs, LongRange dateRange) {
    // Setup the GlobalIndexDateRangeFilter
    
    if (log.isTraceEnabled()) {
        log.trace("Configuring GlobalIndexDateRangeFilter with " + dateRange);
    }
    IteratorSetting cfg = configureGlobalIndexDateRangeFilter(config, dateRange);
    bs.addScanIterator(cfg);
}
 
示例15
public static final void configureGlobalIndexDataTypeFilter(ShardQueryConfiguration config, ScannerBase bs, Collection<String> dataTypes) {
    if (dataTypes == null || dataTypes.isEmpty()) {
        return;
    }
    if (log.isTraceEnabled()) {
        log.trace("Configuring GlobalIndexDataTypeFilter with " + dataTypes);
    }
    
    IteratorSetting cfg = configureGlobalIndexDataTypeFilter(config, dataTypes);
    if (cfg == null) {
        return;
    }
    
    bs.addScanIterator(cfg);
}
 
示例16
public static final void configureGlobalIndexTermMatchingIterator(ShardQueryConfiguration config, ScannerBase bs, Collection<String> literals,
                Collection<String> patterns, boolean reverseIndex, boolean limitToUniqueTerms, Collection<String> expansionFields) {
    if (CollectionUtils.isEmpty(literals) && CollectionUtils.isEmpty(patterns)) {
        return;
    }
    if (log.isTraceEnabled()) {
        log.trace("Configuring GlobalIndexTermMatchingIterator with " + literals + " and " + patterns);
    }
    
    IteratorSetting cfg = configureGlobalIndexTermMatchingIterator(config, literals, patterns, reverseIndex, limitToUniqueTerms);
    
    bs.addScanIterator(cfg);
    
    setExpansionFields(config, bs, reverseIndex, expansionFields);
}
 
示例17
public synchronized boolean close(ScannerBase bs) {
    boolean removed = instances.remove(bs);
    if (removed) {
        log.debug("Closed scanner " + System.identityHashCode(bs));
        if (log.isTraceEnabled()) {
            log.trace("Closing instance " + bs.hashCode());
        }
        bs.close();
    }
    return removed;
}
 
示例18
@Override
public void close() {
    super.close();
    
    if (null != scannerFactory) {
        scannerFactory.lockdown();
        for (ScannerBase scanner : scannerFactory.currentScanners()) {
            scanner.close();
        }
    }
}
 
示例19
public List<FieldStat> getStat(Set<String> fields, Set<String> dataTypes, SortedSet<String> dates) throws IOException {
    // To allow "fields" to be empty and configure the scanners the same,
    // I have to have to references to the Scanner implementation because
    // ScannerBase does not implement Iterable<Entry<Key, Value>>
    final ScannerBase scanner;
    final Iterable<Entry<Key,Value>> dataSource;
    try {
        Set<Authorizations> auths = Collections.singleton(con.securityOperations().getUserAuthorizations(con.whoami()));
        if (fields.isEmpty()) {
            Scanner simpleScanner = ScannerHelper.createScanner(con, table, auths);
            dataSource = simpleScanner;
            scanner = simpleScanner;
        } else {
            BatchScanner bScanner = ScannerHelper.createBatchScanner(con, table, auths, fields.size());
            bScanner.setRanges(buildRanges(fields));
            scanner = bScanner;
            dataSource = bScanner;
        }
    } catch (Exception e) {
        log.error(e);
        throw new IOException(e);
    }
    
    configureScanIterators(scanner, dataTypes, dates);
    
    List<FieldStat> results = scanResults(dataSource);
    
    if (scanner instanceof BatchScanner) {
        scanner.close();
    }
    
    return results;
}
 
示例20
@SuppressWarnings("unchecked")
public void logStartIterator(String table, ScannerBase scanner) {
    if (!queryLogger.isTraceEnabled()) {
        return;
    }

    SortedSet<Column> fetchedColumns = null;
    if (scanner instanceof ScannerOptions) {
        fetchedColumns = ((ScannerOptions) scanner).getFetchedColumns();
    }

    if (scanner instanceof BatchScanner) {
        try {
            Field rangesField = scanner.getClass().getDeclaredField("ranges");
            rangesField.setAccessible(true);
            ArrayList<Range> ranges = (ArrayList<Range>) rangesField.get(scanner);
            if (ranges.size() == 0) {
                logStartIterator(table, (Range) null, fetchedColumns);
            } else if (ranges.size() == 1) {
                logStartIterator(table, ranges.iterator().next(), fetchedColumns);
            } else {
                logStartIterator(table, ranges, fetchedColumns);
            }
        } catch (Exception e) {
            queryLogger.trace("Could not get ranges from BatchScanner", e);
        }
    } else if (scanner instanceof Scanner) {
        Range range = ((Scanner) scanner).getRange();
        logStartIterator(table, range, fetchedColumns);
    } else {
        queryLogger.trace("begin accumulo iterator: %s", scanner.getClass().getName());
    }
}
 
示例21
private Map<String, byte[]> streamingPropertyValueTableDatas(List<String> dataRowKeys) {
    try {
        if (dataRowKeys.size() == 0) {
            return Collections.emptyMap();
        }

        List<org.apache.accumulo.core.data.Range> ranges = dataRowKeys.stream()
            .map(RangeUtils::createRangeFromString)
            .collect(Collectors.toList());

        final long timerStartTime = System.currentTimeMillis();
        ScannerBase scanner = graph.createBatchScanner(graph.getDataTableName(), ranges, new org.apache.accumulo.core.security.Authorizations());

        graph.getGraphLogger().logStartIterator(graph.getDataTableName(), scanner);
        Span trace = Trace.start("streamingPropertyValueTableData");
        trace.data("dataRowKeyCount", Integer.toString(dataRowKeys.size()));
        try {
            Map<String, byte[]> results = new HashMap<>();
            for (Map.Entry<Key, Value> col : scanner) {
                results.put(col.getKey().getRow().toString(), col.getValue().get());
            }
            return results;
        } finally {
            scanner.close();
            trace.stop();
            graph.getGraphLogger().logEndIterator(System.currentTimeMillis() - timerStartTime);
        }
    } catch (Exception ex) {
        throw new VertexiumException(ex);
    }
}
 
示例22
private ScannerBase getScanner() throws IOException {
    if (closed) {
        throw new IOException("stream already closed");
    }
    if (scanner != null) {
        return scanner;
    }
    ArrayList<Range> ranges = Lists.newArrayList(RangeUtils.createRangeFromString(dataRowKey));

    timerStartTime = System.currentTimeMillis();
    try {
        scanner = graph.createBatchScanner(graph.getDataTableName(), ranges, new org.apache.accumulo.core.security.Authorizations());
    } catch (TableNotFoundException ex) {
        throw new VertexiumException("Could not create scanner", ex);
    }

    IteratorSetting iteratorSetting = new IteratorSetting(
        80,
        TimestampFilter.class.getSimpleName(),
        TimestampFilter.class
    );
    TimestampFilter.setStart(iteratorSetting, timestamp, true);
    TimestampFilter.setEnd(iteratorSetting, timestamp, true);
    scanner.addScanIterator(iteratorSetting);

    graph.getGraphLogger().logStartIterator(graph.getDataTableName(), scanner);
    trace = Trace.start("streamingPropertyValueTableData");
    trace.data("dataRowKeyCount", Integer.toString(1));
    return scanner;
}
 
示例23
public ScannerBase doQuery(final TemporalInstant queryInstant, final StatementConstraints constraints) throws QueryEvaluationException {
    // key is contraintPrefix + time, or just time.
    // Any constraints handled here, if the constraints are empty, the
    // thisKeyParts.contraintPrefix will be null.
    final List<KeyParts> keyParts = KeyParts.keyPartsForQuery(queryInstant, constraints);
    ScannerBase scanner = null;
    if (keyParts.size() > 1) {
        scanner = getBatchScanner();
    } else {
        scanner = getScanner();
    }

    final Collection<Range> ranges = new HashSet<Range>();
    KeyParts lastKeyParts = null;
    Range range = null;
    for (final KeyParts thisKeyParts : keyParts) {
        range = getRange(thisKeyParts);
        ranges.add(range);
        lastKeyParts = thisKeyParts;
    }
    if (lastKeyParts == null || scanner == null) {
        throw new NullPointerException("lastkeyParts or scanner is null, impossible! keyParts.size()= " + keyParts.size() + " scanner= " + scanner);
    }
    //System.out.println("Scanning columns, cf:" + lastKeyParts.cf + "CQ:" + lastKeyParts.cq);
    scanner.fetchColumn(new Text(lastKeyParts.cf), new Text(lastKeyParts.cq));
    if (scanner instanceof BatchScanner) {
        ((BatchScanner) scanner).setRanges(ranges);
    } else if (range != null) {
        ((Scanner) scanner).setRange(range);
    }
    return scanner;
}
 
示例24
/**
 * An iteration wrapper for a loaded scanner that is returned for queries above.
 * Currently, this temporal index supports contexts only on the client, using this filter.
 *
 * @param scanner  the results to iterate, then close.
 * @param constraints  limit statements returned by next() to those matching the constraints.
 * @return an anonymous object that will iterate the resulting statements from a given scanner.
 * @throws QueryEvaluationException
 */
private static CloseableIteration<Statement, QueryEvaluationException> getContextIteratorWrapper(final ScannerBase scanner, final Resource context) {
    if (context==null) {
        return getIteratorWrapper(scanner);
    }
    return new ConstrainedIteratorWrapper(scanner) {
        @Override
        public boolean allowedBy(final Statement statement) {
            return allowedByContext(statement,  context);
        }
    };
}
 
示例25
protected <T> void addConstraintsScanIteratorSettings(
    final RecordReaderParams params,
    final ScannerBase scanner,
    final DataStoreOptions options) {
  addFieldSubsettingToIterator(params, scanner);
  if (params.isMixedVisibility()) {
    // we have to at least use a whole row iterator
    final IteratorSetting iteratorSettings =
        new IteratorSetting(
            QueryFilterIterator.QUERY_ITERATOR_PRIORITY,
            QueryFilterIterator.QUERY_ITERATOR_NAME,
            WholeRowIterator.class);
    scanner.addScanIterator(iteratorSettings);
  }
}
 
示例26
protected <T> void addRowScanIteratorSettings(
    final ReaderParams<T> params,
    final ScannerBase scanner) {
  addFieldSubsettingToIterator(params, scanner);
  if (params.isMixedVisibility()) {
    // we have to at least use a whole row iterator
    final IteratorSetting iteratorSettings =
        new IteratorSetting(
            QueryFilterIterator.QUERY_ITERATOR_PRIORITY,
            QueryFilterIterator.QUERY_ITERATOR_NAME,
            WholeRowIterator.class);
    scanner.addScanIterator(iteratorSettings);
  }
}
 
示例27
@SuppressWarnings("unchecked")
@Override
public <T> RowReader<T> createReader(final ReaderParams<T> params) {
  final ScannerBase scanner = getScanner(params, false);

  addConstraintsScanIteratorSettings(params, scanner, options);

  return new AccumuloReader<>(
      scanner,
      params.getRowTransformer(),
      params.getIndex().getIndexStrategy().getPartitionKeyLength(),
      params.isMixedVisibility() && !params.isServersideAggregation(),
      params.isClientsideRowMerging(),
      true);
}
 
示例28
@SuppressWarnings("unchecked")
@Override
public RowReader<GeoWaveRow> createReader(final RecordReaderParams readerParams) {
  final ScannerBase scanner = getScanner(readerParams);
  addConstraintsScanIteratorSettings(readerParams, scanner, options);
  return new AccumuloReader<>(
      scanner,
      GeoWaveRowIteratorTransformer.NO_OP_TRANSFORMER,
      readerParams.getIndex().getIndexStrategy().getPartitionKeyLength(),
      readerParams.isMixedVisibility(),
      readerParams.isClientsideRowMerging(),
      false);
}
 
示例29
private static CloseableIterator<Entry<Key, Value>> getIterator(
    final Connector connector,
    final String namespace,
    final Index index)
    throws AccumuloException, AccumuloSecurityException, IOException, TableNotFoundException {
  CloseableIterator<Entry<Key, Value>> iterator = null;
  final AccumuloOptions options = new AccumuloOptions();
  final AccumuloOperations operations =
      new AccumuloOperations(connector, namespace, new AccumuloOptions());
  final IndexStore indexStore = new IndexStoreImpl(operations, options);
  final PersistentAdapterStore adapterStore = new AdapterStoreImpl(operations, options);

  if (indexStore.indexExists(index.getName())) {
    final ScannerBase scanner = operations.createBatchScanner(index.getName());
    ((BatchScanner) scanner).setRanges(AccumuloUtils.byteArrayRangesToAccumuloRanges(null));
    final IteratorSetting iteratorSettings =
        new IteratorSetting(10, "GEOWAVE_WHOLE_ROW_ITERATOR", WholeRowIterator.class);
    scanner.addScanIterator(iteratorSettings);

    final Iterator<Entry<Key, Value>> it =
        new IteratorWrapper(
            adapterStore,
            index,
            scanner.iterator(),
            new QueryFilter[] {new DedupeFilter()});

    iterator = new CloseableIteratorWrapper<>(new ScannerClosableWrapper(scanner), it);
  }
  return iterator;
}
 
示例30
static void setupScanner(ScannerBase scanner, Collection<Column> columns, long startTs,
    boolean showReadLocks) {
  for (Column col : columns) {
    if (col.isQualifierSet()) {
      scanner.fetchColumn(ByteUtil.toText(col.getFamily()), ByteUtil.toText(col.getQualifier()));
    } else {
      scanner.fetchColumnFamily(ByteUtil.toText(col.getFamily()));
    }
  }

  IteratorSetting iterConf = new IteratorSetting(10, SnapshotIterator.class);
  SnapshotIterator.setSnaptime(iterConf, startTs);
  SnapshotIterator.setReturnReadLockPresent(iterConf, showReadLocks);
  scanner.addScanIterator(iterConf);
}