Java Source Code Examples: parquet.hadoop.metadata.BlockMetaData
Example 1
public ParquetReader(MessageType fileSchema,
                     MessageType requestedSchema,
                     List<BlockMetaData> blocks,
                     ParquetDataSource dataSource,
                     TypeManager typeManager)
{
    this.fileSchema = fileSchema;
    this.requestedSchema = requestedSchema;
    this.blocks = blocks;
    this.dataSource = dataSource;
    this.typeManager = typeManager;
    initializeColumnReaders();
}
Example 2
/**
 * get the blocks (row groups) of the parquet files.
 * @return the list of row group metadata collected from all the files
 */
public List<BlockMetaData> getBlocks ()
{
    if (this.blockMetaDataList == null || this.blockMetaDataList.size() == 0)
    {
        this.blockMetaDataList = new ArrayList<BlockMetaData>();
        for (ParquetFileMetadata meta : this.fileMetaDataList)
        {
            this.blockMetaDataList.addAll(meta.getBlocks());
        }
    }
    return this.blockMetaDataList;
}
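Once every file's row groups have been merged into one list, aggregate statistics reduce to a loop over the BlockMetaData entries. The sketch below is only an illustration: it assumes a stat object that exposes the getBlocks () accessor above and uses the standard BlockMetaData.getRowCount() getter.
// Sketch only: total row count across all collected row groups.
// Assumes `stat` exposes the getBlocks () accessor shown above.
long totalRows = 0;
for (BlockMetaData block : stat.getBlocks())
{
    totalRows += block.getRowCount();   // rows recorded in the row-group metadata
}
System.out.println("total rows: " + totalRows);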
Example 3
/**
 * get the total compressed size of the parquet files.
 * @return the total compressed size in bytes
 */
@Override
public long getTotalSize ()
{
    long size = 0;
    for (BlockMetaData meta : this.getBlocks())
    {
        size += meta.getCompressedSize();
    }
    return size;
}
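The same pattern gives the uncompressed total: BlockMetaData.getTotalByteSize() reports the uncompressed size of a row group (it is printed next to getCompressedSize() in Example 4 below). The method below is a hypothetical companion to getTotalSize(), shown only as a sketch.
/**
 * Sketch: total uncompressed size, a hypothetical companion to getTotalSize() above.
 * @return the total uncompressed size in bytes
 */
public long getTotalUncompressedSize ()
{
    long size = 0;
    for (BlockMetaData meta : this.getBlocks())
    {
        // uncompressed bytes of the row group, vs. getCompressedSize() above
        size += meta.getTotalByteSize();
    }
    return size;
}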
Example 4
@Test
public void testGetStat () throws IOException, MetadataException
{
    //ParquetMetadataStat stat = new ParquetMetadataStat("10.172.96.77", 9000, "/tmp/hive-root/hive_2015-02-04_10-57-36_131_1404874572956637570-1/_tmp.-ext-10002");
    ParquetMetadataStat stat = new ParquetMetadataStat("192.168.124.15", 9000, "/msra/parquet_test");
    double[] columnSize = stat.getAvgColumnChunkSize();
    List<String> names = stat.getFieldNames();
    int i = 0;
    double total = 0;
    for (double size : columnSize)
    {
        //System.out.println(names.get(i) + "\t" + size);
        total += size;
        i++;
    }
    System.out.println(total/1024/1024 + "\t" + stat.getRowGroupCount() + "\t" + stat.getFileCount());

    for (BlockMetaData bm : stat.getBlocks())
    {
        System.out.println(bm.getCompressedSize() + ", " + bm.getTotalByteSize() + ", " + bm.getRowCount());
    }

    List<ParquetFileMetadata> metaDatas = stat.getFileMetaData();
    System.out.println(metaDatas.get(0).getFileMetaData().getCreatedBy());
    Map<String, String> keyValueMetaData = metaDatas.get(0).getFileMetaData().getKeyValueMetaData();
    for (String key : keyValueMetaData.keySet())
    {
        System.out.println(key + "=" + keyValueMetaData.get(key));
    }

    for (int j = 0; j < names.size(); ++j)
    {
        System.out.println(names.get(j) + "\t" + columnSize[j] + "\n");
    }
}
Example 5
public static void showDetails(PrettyPrintWriter out, ParquetMetadata meta) {
    showDetails(out, meta.getFileMetaData());

    long i = 1;
    for (BlockMetaData bmeta : meta.getBlocks()) {
        out.println();
        showDetails(out, bmeta, i++);
    }
}
Example 6
private static void showDetails(PrettyPrintWriter out, BlockMetaData meta, Long num) {
    long rows = meta.getRowCount();
    long tbs = meta.getTotalByteSize();

    out.format("row group%s: RC:%d TS:%d%n", (num == null ? "" : " " + num), rows, tbs);
    out.rule('-');
    showDetails(out, meta.getColumns());
}
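PrettyPrintWriter and the column-level showDetails overload belong to parquet-tools. If that dependency is not available, the same block-level fields can be dumped with plain System.out; the dumpBlock method below is a made-up name for this sketch and uses only standard BlockMetaData and ColumnChunkMetaData getters.
// Sketch only: a plain-System.out variant of the block dump above.
// dumpBlock is a hypothetical name; the getters are standard metadata accessors.
private static void dumpBlock(BlockMetaData meta, long num) {
    System.out.printf("row group %d: RC:%d TS:%d%n", num, meta.getRowCount(), meta.getTotalByteSize());
    for (ColumnChunkMetaData column : meta.getColumns()) {
        System.out.printf("  %s: %s, %d values, %d/%d bytes (compressed/uncompressed)%n",
                column.getPath(), column.getCodec(), column.getValueCount(),
                column.getTotalSize(), column.getTotalUncompressedSize());
    }
}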
Example 7
public static ParquetMetadata readFooter(FileSystem fileSystem, Path file)
        throws IOException
{
    FileStatus fileStatus = fileSystem.getFileStatus(file);
    try (FSDataInputStream inputStream = fileSystem.open(file)) {
        // Parquet File Layout:
        //
        // MAGIC
        // variable: Data
        // variable: Metadata
        // 4 bytes: MetadataLength
        // MAGIC

        long length = fileStatus.getLen();
        validateParquet(length >= MAGIC.length + PARQUET_METADATA_LENGTH + MAGIC.length, "%s is not a valid Parquet File", file);
        long metadataLengthIndex = length - PARQUET_METADATA_LENGTH - MAGIC.length;

        inputStream.seek(metadataLengthIndex);
        int metadataLength = readIntLittleEndian(inputStream);

        byte[] magic = new byte[MAGIC.length];
        inputStream.readFully(magic);
        validateParquet(Arrays.equals(MAGIC, magic), "Not valid Parquet file: %s expected magic number: %s got: %s", file, Arrays.toString(MAGIC), Arrays.toString(magic));

        long metadataIndex = metadataLengthIndex - metadataLength;
        validateParquet(
                metadataIndex >= MAGIC.length && metadataIndex < metadataLengthIndex,
                "Corrupted Parquet file: %s metadata index: %s out of range",
                file,
                metadataIndex);
        inputStream.seek(metadataIndex);
        FileMetaData fileMetaData = readFileMetaData(inputStream);
        List<SchemaElement> schema = fileMetaData.getSchema();
        validateParquet(!schema.isEmpty(), "Empty Parquet schema in file: %s", file);

        MessageType messageType = readParquetSchema(schema);
        List<BlockMetaData> blocks = new ArrayList<>();
        List<RowGroup> rowGroups = fileMetaData.getRow_groups();
        if (rowGroups != null) {
            for (RowGroup rowGroup : rowGroups) {
                BlockMetaData blockMetaData = new BlockMetaData();
                blockMetaData.setRowCount(rowGroup.getNum_rows());
                blockMetaData.setTotalByteSize(rowGroup.getTotal_byte_size());
                List<ColumnChunk> columns = rowGroup.getColumns();
                validateParquet(!columns.isEmpty(), "No columns in row group: %s", rowGroup);
                String filePath = columns.get(0).getFile_path();
                for (ColumnChunk columnChunk : columns) {
                    validateParquet(
                            (filePath == null && columnChunk.getFile_path() == null)
                                    || (filePath != null && filePath.equals(columnChunk.getFile_path())),
                            "all column chunks of the same row group must be in the same file");
                    ColumnMetaData metaData = columnChunk.meta_data;
                    String[] path = metaData.path_in_schema.toArray(new String[metaData.path_in_schema.size()]);
                    ColumnPath columnPath = ColumnPath.get(path);
                    ColumnChunkMetaData column = ColumnChunkMetaData.get(
                            columnPath,
                            messageType.getType(columnPath.toArray()).asPrimitiveType().getPrimitiveTypeName(),
                            CompressionCodecName.fromParquet(metaData.codec),
                            readEncodings(metaData.encodings),
                            readStats(metaData.statistics, messageType.getType(columnPath.toArray()).asPrimitiveType().getPrimitiveTypeName()),
                            metaData.data_page_offset,
                            metaData.dictionary_page_offset,
                            metaData.num_values,
                            metaData.total_compressed_size,
                            metaData.total_uncompressed_size);
                    blockMetaData.addColumn(column);
                }
                blockMetaData.setPath(filePath);
                blocks.add(blockMetaData);
            }
        }

        Map<String, String> keyValueMetaData = new HashMap<>();
        List<KeyValue> keyValueList = fileMetaData.getKey_value_metadata();
        if (keyValueList != null) {
            for (KeyValue keyValue : keyValueList) {
                keyValueMetaData.put(keyValue.key, keyValue.value);
            }
        }
        return new ParquetMetadata(new parquet.hadoop.metadata.FileMetaData(messageType, keyValueMetaData, fileMetaData.getCreated_by()), blocks);
    }
}
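This reader parses the footer by hand (trailing magic, 4-byte little-endian metadata length, then the Thrift-encoded metadata) so that each row group can be rebuilt into a BlockMetaData. When that level of control is not needed, parquet-mr's own footer reader produces the same ParquetMetadata/BlockMetaData objects. The sketch below assumes the pre-Apache parquet.hadoop namespace used throughout this page; the file path is a placeholder.
// Sketch only: let the library read the footer instead of parsing it by hand.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.metadata.ParquetMetadata;

public class FooterDump
{
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        ParquetMetadata footer = ParquetFileReader.readFooter(conf, new Path("/path/to/file.parquet"));
        for (BlockMetaData block : footer.getBlocks()) {
            // same fields the hand-rolled reader above fills in per row group
            System.out.println(block.getRowCount() + " rows, " + block.getCompressedSize() + " compressed bytes");
        }
    }
}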
Example 8
private Optional<ConnectorPageSource> createParaflowPageSource(
        Path path,
        long start,
        long length,
        List<ParaflowColumnHandle> columns)
{
    Optional<FileSystem> fileSystemOptional = fsFactory.getFileSystem();
    FileSystem fileSystem;
    ParquetDataSource dataSource;
    if (fileSystemOptional.isPresent()) {
        fileSystem = fileSystemOptional.get();
    }
    else {
        throw new RuntimeException("Could not find filesystem for path " + path);
    }
    try {
        dataSource = buildHdfsParquetDataSource(fileSystem, path, start, length);
        // default length is file size, which means whole file is a split
        length = dataSource.getSize();
        ParquetMetadata parquetMetadata = ParquetMetadataReader.readFooter(fileSystem, path);
        FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
        MessageType fileSchema = fileMetaData.getSchema();
        List<Type> fields = columns.stream()
                .filter(column -> column.getColType() != ParaflowColumnHandle.ColumnType.NOTVALID)
                .map(column -> getParquetType(column, fileSchema))
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
        MessageType requestedSchema = new MessageType(fileSchema.getName(), fields);

        List<BlockMetaData> blocks = new ArrayList<>();
        for (BlockMetaData block : parquetMetadata.getBlocks()) {
            long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
            if (firstDataPage >= start && firstDataPage < start + length) {
                blocks.add(block);
            }
        }

        ParquetReader parquetReader = new ParquetReader(
                fileSchema,
                requestedSchema,
                blocks,
                dataSource,
                typeManager);
        return Optional.of(new ParaflowPageSource(
                parquetReader,
                dataSource,
                fileSchema,
                requestedSchema,
                length,
                columns,
                typeManager));
    }
    catch (IOException e) {
        log.error(e);
        return Optional.empty();
    }
}
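The pruning loop in the middle is what maps a split onto whole row groups: a block is read by the split whose byte range [start, start + length) contains the first data page of the block's first column chunk. Extracted as a standalone check (splitContainsBlock is a name made up for this sketch), the rule is:
// Sketch only: the split-assignment rule used in the loop above.
private static boolean splitContainsBlock(BlockMetaData block, long start, long length)
{
    long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
    return firstDataPage >= start && firstDataPage < start + length;
}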
Example 9
public List<BlockMetaData> getBlocks ()
{
    return this.metaData.getBlocks();
}
Example 10
@Test
public void test11() throws IOException, InterruptedException {
    // verify locations in order
    ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>();

    // first split is parquetinputsplit
    rawSplits.add(new ParquetInputSplit(new Path("path1"), 0, 100,
            new String[] { "l1", "l2", "l3" },
            new ArrayList<BlockMetaData>(), "", "",
            new HashMap<String, String>(), new HashMap<String, String>()));
    // second split is file split
    rawSplits.add(new FileSplit(new Path("path2"), 0, 400, new String[] {
            "l5", "l6", "l1" }));

    List<InputSplit> result = pigInputFormat.getPigSplits(rawSplits, 0, ok,
            null, true, conf);

    // pig combines two into one pigsplit
    Assert.assertEquals(result.size(), 1);

    for (InputSplit split : result) {
        PigSplit pigSplit = (PigSplit) split;
        // write to a byte array output stream
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        DataOutput out = new DataOutputStream(outputStream);
        pigSplit.write(out);
        // restore the pig split from the byte array
        ByteArrayInputStream inputStream = new ByteArrayInputStream(
                outputStream.toByteArray());
        DataInput in = new DataInputStream(inputStream);
        PigSplit anotherSplit = new PigSplit();
        anotherSplit.setConf(conf);
        anotherSplit.readFields(in);
        Assert.assertEquals(500, anotherSplit.getLength());
        Assert.assertEquals(2, anotherSplit.getNumPaths());
        Assert.assertEquals("parquet.hadoop.ParquetInputSplit",
                (anotherSplit.getWrappedSplit(0).getClass().getName()));
        Assert.assertEquals(
                "org.apache.hadoop.mapreduce.lib.input.FileSplit",
                (anotherSplit.getWrappedSplit(1).getClass().getName()));
    }
}
Example 11
public static void showDetails(PrettyPrintWriter out, BlockMetaData meta) {
    showDetails(out, meta, null);
}