Java source code examples: org.apache.hive.hcatalog.data.transfer.ReaderContext
Example 1
private static ReaderContext getHiveReaderContext(String database, String table, Map<String, String> partitionKV) throws Exception {
    HiveConf hiveConf = new HiveConf(HiveTableReader.class);
    // Copy the Hive configuration into a plain map for the HCatalog data-transfer API.
    Iterator<Entry<String, String>> itr = hiveConf.iterator();
    Map<String, String> map = new HashMap<String, String>();
    while (itr.hasNext()) {
        Entry<String, String> kv = itr.next();
        map.put(kv.getKey(), kv.getValue());
    }
    // Read the whole table, or only the given partition if one is specified.
    ReadEntity entity;
    if (partitionKV == null || partitionKV.size() == 0) {
        entity = new ReadEntity.Builder().withDatabase(database).withTable(table).build();
    } else {
        entity = new ReadEntity.Builder().withDatabase(database).withTable(table).withPartition(partitionKV).build();
    }
    HCatReader reader = DataTransferFactory.getHCatReader(entity, map);
    ReaderContext cntxt = reader.prepareRead();
    return cntxt;
}
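A minimal usage sketch for the helper above (the database, table, and partition-key names here are hypothetical): prepareRead() runs once on the coordinator side, and the resulting ReaderContext is what later drives each per-split read, as Examples 6 and 8 below also show.

// Prepare the read once, typically on the coordinator/master side.
Map<String, String> partitionKV = new HashMap<String, String>();
partitionKV.put("dt", "2023-01-01"); // hypothetical partition key and value
ReaderContext cntxt = getHiveReaderContext("mydb", "mytable", partitionKV);
// Read one split (typically on a worker) through the same context.
HCatReader reader = DataTransferFactory.getHCatReader(cntxt, 0);
Iterator<HCatRecord> itr = reader.read();
while (itr.hasNext()) {
    System.out.println(itr.next());
}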
Example 2
/**
 * Calculates the 'desired' number of splits based on desiredBundleSizeBytes, which is passed
 * as a hint to the native API. Retrieves the actual splits generated by the native API, which
 * may differ from the 'desired' split count calculated from desiredBundleSizeBytes.
 */
@Override
public List<BoundedSource<HCatRecord>> split(
    long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
  int desiredSplitCount = 1;
  long estimatedSizeBytes = getEstimatedSizeBytes(options);
  if (desiredBundleSizeBytes > 0 && estimatedSizeBytes > 0) {
    desiredSplitCount = (int) Math.ceil((double) estimatedSizeBytes / desiredBundleSizeBytes);
  }
  ReaderContext readerContext = getReaderContext(desiredSplitCount);
  // Process the splits returned by the native API;
  // their count may differ from the 'desiredSplitCount' calculated above.
  LOG.info(
      "Splitting into bundles of {} bytes: "
          + "estimated size {}, desired split count {}, actual split count {}",
      desiredBundleSizeBytes,
      estimatedSizeBytes,
      desiredSplitCount,
      readerContext.numSplits());
  List<BoundedSource<HCatRecord>> res = new ArrayList<>();
  for (int split = 0; split < readerContext.numSplits(); split++) {
    res.add(new BoundedHCatalogSource(spec.withContext(readerContext).withSplitId(split)));
  }
  return res;
}
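For instance, an estimated size of 1 GiB with a desired bundle size of 64 MiB yields Math.ceil((double) 1073741824 / 67108864) = 16 desired splits; the native API receives that only as a hint, so readerContext.numSplits() may still report a different actual count.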
Example 3
/** Test of Read using SourceTestUtils.readFromSource(..). */
@Test
@NeedsTestData
public void testReadFromSource() throws Exception {
  ReaderContext context = getReaderContext(getConfigPropertiesAsMap(service.getHiveConf()));
  HCatalogIO.Read spec =
      HCatalogIO.read()
          .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
          .withContext(context)
          .withTable(TEST_TABLE);
  List<String> records = new ArrayList<>();
  for (int i = 0; i < context.numSplits(); i++) {
    BoundedHCatalogSource source = new BoundedHCatalogSource(spec.withSplitId(i));
    for (HCatRecord record : SourceTestUtils.readFromSource(source, OPTIONS)) {
      records.add(record.get(0).toString());
    }
  }
  assertThat(records, containsInAnyOrder(getExpectedRecords(TEST_RECORDS_COUNT).toArray()));
}
Example 4
/** Test of Read using SourceTestUtils.assertSourcesEqualReferenceSource(..). */
@Test
@NeedsTestData
public void testSourceEqualsSplits() throws Exception {
  final int numRows = 1500;
  final int numSamples = 10;
  final long bytesPerRow = 15;
  ReaderContext context = getReaderContext(getConfigPropertiesAsMap(service.getHiveConf()));
  HCatalogIO.Read spec =
      HCatalogIO.read()
          .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
          .withContext(context)
          .withTable(TEST_TABLE);
  BoundedHCatalogSource source = new BoundedHCatalogSource(spec);
  List<BoundedSource<HCatRecord>> unSplitSource = source.split(-1, OPTIONS);
  assertEquals(1, unSplitSource.size());
  List<BoundedSource<HCatRecord>> splits =
      source.split(numRows * bytesPerRow / numSamples, OPTIONS);
  assertTrue(splits.size() >= 1);
  SourceTestUtils.assertSourcesEqualReferenceSource(unSplitSource.get(0), splits, OPTIONS);
}
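Note that SourceTestUtils.assertSourcesEqualReferenceSource passes only if the records read from all the splits, taken together, equal the records read from the unsplit reference source.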
Example 5
private ReaderContext getReaderContext(Read readRequest, Integer partitionIndexToRead)
    throws Exception {
  final List<Partition> partitions =
      metaStoreClient.listPartitions(
          readRequest.getDatabase(), readRequest.getTable(), Short.MAX_VALUE);
  final Partition partition = partitions.get(partitionIndexToRead);
  checkArgument(
      partition != null, "Unable to find a partition to read at index " + partitionIndexToRead);
  final int desiredSplitCount = HCatalogUtils.getSplitCount(readRequest, partition);
  final List<String> values = partition.getValues();
  final List<String> partitionCols = readRequest.getPartitionCols();
  checkArgument(
      values.size() == partitionCols.size(),
      "Number of partition values should equal the number of partition columns.");
  // Build a filter that pins the read to exactly this partition.
  List<String> filter = new ArrayList<>();
  for (int i = 0; i < partitionCols.size(); i++) {
    filter.add(partitionCols.get(i) + "=" + "'" + values.get(i) + "'");
  }
  final String filterString = String.join(" and ", filter);
  ReadEntity entity =
      new ReadEntity.Builder()
          .withDatabase(readRequest.getDatabase())
          .withFilter(filterString)
          .withTable(readRequest.getTable())
          .build();
  // Pass the 'desired' split count as a hint to the API.
  Map<String, String> configProps = new HashMap<>(readRequest.getConfigProperties());
  configProps.put(
      HCatConstants.HCAT_DESIRED_PARTITION_NUM_SPLITS, String.valueOf(desiredSplitCount));
  return DataTransferFactory.getHCatReader(entity, configProps).prepareRead();
}
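For example, with partitionCols = [year, month] and partition values [2023, 05], the loop above yields the filter string year='2023' and month='05', restricting the ReadEntity to exactly that partition.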
Example 6
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
  final Read readRequest = c.element().getKey();
  final Integer partitionIndexToRead = c.element().getValue();
  ReaderContext readerContext = getReaderContext(readRequest, partitionIndexToRead);
  for (int i = 0; i < readerContext.numSplits(); i++) {
    HCatReader reader = DataTransferFactory.getHCatReader(readerContext, i);
    Iterator<HCatRecord> hcatIterator = reader.read();
    while (hcatIterator.hasNext()) {
      final HCatRecord record = hcatIterator.next();
      c.output(record);
    }
  }
}
Example 7
private ReaderContext getReaderContext(long desiredSplitCount) throws HCatException {
  ReadEntity entity =
      new ReadEntity.Builder()
          .withDatabase(spec.getDatabase())
          .withTable(spec.getTable())
          .withFilter(spec.getFilter())
          .build();
  // Pass the 'desired' split count as a hint to the API.
  Map<String, String> configProps = new HashMap<>(spec.getConfigProperties());
  configProps.put(
      HCatConstants.HCAT_DESIRED_PARTITION_NUM_SPLITS, String.valueOf(desiredSplitCount));
  return DataTransferFactory.getHCatReader(entity, configProps).prepareRead();
}
Example 8
private static Iterator<HCatRecord> loadHCatRecordItr(ReaderContext readCntxt, int dataSplit) throws HCatException {
    HCatReader currentHCatReader = DataTransferFactory.getHCatReader(readCntxt, dataSplit);
    return currentHCatReader.read();
}
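A sketch of how this helper might be driven across all splits, assuming readCntxt came from an earlier prepareRead() call as in the other examples (the processing body is a placeholder):

// Iterate every split exposed by the prepared ReaderContext.
for (int split = 0; split < readCntxt.numSplits(); split++) {
    Iterator<HCatRecord> records = loadHCatRecordItr(readCntxt, split);
    while (records.hasNext()) {
        HCatRecord record = records.next();
        // process the record here
    }
}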
Example 9
/** Returns a ReaderContext instance for the passed datastore config params. */
public static ReaderContext getReaderContext(Map<String, String> config) throws HCatException {
  return DataTransferFactory.getHCatReader(READ_ENTITY, config).prepareRead();
}
Example 10
@Nullable
abstract ReaderContext getContext();
Example 11
Read withContext(ReaderContext context) {
  return toBuilder().setContext(context).build();
}
Example 12
abstract Builder setContext(ReaderContext context);