Java source code examples: org.apache.parquet.io.SeekableInputStream
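Before the numbered examples, here is a minimal usage sketch (not taken from any of the projects quoted below; the file path and class name are hypothetical) showing how a SeekableInputStream is typically obtained from an InputFile and driven with seek, readFully, and getPos:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream;

public class SeekableInputStreamSketch {
  public static void main(String[] args) throws IOException {
    // Hypothetical file path; any InputFile implementation works the same way.
    InputFile file = HadoopInputFile.fromPath(new Path("/tmp/data.parquet"), new Configuration());
    try (SeekableInputStream in = file.newStream()) {
      byte[] tail = new byte[8];
      in.seek(file.getLength() - tail.length); // jump to the 4-byte footer length + "PAR1" magic
      in.readFully(tail);                      // fill the buffer completely or throw EOFException
      System.out.println("position after read: " + in.getPos());
    }
  }
}

Many of the examples below follow exactly this pattern: position the stream with seek, then fill a byte[] or ByteBuffer with readFully.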
Example 1
public synchronized int read(DrillBuf buf, int off, int len) throws IOException {
  buf.clear();
  ByteBuffer directBuffer = buf.nioBuffer(0, len);
  int lengthLeftToRead = len;
  SeekableInputStream seekableInputStream = HadoopStreams.wrap(getInputStream());
  while (lengthLeftToRead > 0) {
    if (logger.isTraceEnabled()) {
      logger.trace("PERF: Disk read start. {}, StartOffset: {}, TotalByteSize: {}", this.streamId, this.startOffset, this.totalByteSize);
    }
    Stopwatch timer = Stopwatch.createStarted();
    int bytesRead = seekableInputStream.read(directBuffer);
    if (bytesRead < 0) {
      return bytesRead;
    }
    lengthLeftToRead -= bytesRead;
    if (logger.isTraceEnabled()) {
      logger.trace(
          "PERF: Disk read complete. {}, StartOffset: {}, TotalByteSize: {}, BytesRead: {}, Time: {} ms",
          this.streamId, this.startOffset, this.totalByteSize, bytesRead,
          ((double) timer.elapsed(TimeUnit.MICROSECONDS)) / 1000);
    }
  }
  buf.writerIndex(len);
  return len;
}
Example 2
private void loadDictionaryIfExists(final ColumnReader<?> parentStatus,
    final ColumnChunkMetaData columnChunkMetaData, final SeekableInputStream f) throws IOException {
  Stopwatch timer = Stopwatch.createUnstarted();
  if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
    f.seek(columnChunkMetaData.getDictionaryPageOffset());
    long start = f.getPos();
    timer.start();
    final PageHeader pageHeader = Util.readPageHeader(f);
    long timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
    long pageHeaderBytes = f.getPos() - start;
    this.updateStats(pageHeader, "Page Header", start, timeToRead, pageHeaderBytes, pageHeaderBytes);
    assert pageHeader.type == PageType.DICTIONARY_PAGE;
    assert isDictionaryEncoded(columnChunkMetaData.getEncodings()) :
        format("Missing dictionary encoding for dictionary page %s, in column chunk %s", pageHeader, columnChunkMetaData);
    readDictionaryPage(pageHeader, parentStatus);
  }
}
Example 3
private DictionaryPage readCompressedDictionary(
    PageHeader pageHeader, SeekableInputStream fin) throws IOException {
  DictionaryPageHeader dictHeader = pageHeader.getDictionary_page_header();
  int uncompressedPageSize = pageHeader.getUncompressed_page_size();
  int compressedPageSize = pageHeader.getCompressed_page_size();
  byte[] dictPageBytes = new byte[compressedPageSize];
  fin.readFully(dictPageBytes);
  BytesInput bin = BytesInput.from(dictPageBytes);
  return new DictionaryPage(
      bin, uncompressedPageSize, dictHeader.getNum_values(),
      converter.getEncoding(dictHeader.getEncoding()));
}
Example 4
/**
 * Copy from a FS input stream to an output stream. Thread-safe
 *
 * @param from a {@link SeekableInputStream}
 * @param to any {@link PositionOutputStream}
 * @param start where in the from stream to start copying
 * @param length the number of bytes to copy
 * @throws IOException if there is an error while reading or writing
 */
private static void copy(SeekableInputStream from, PositionOutputStream to,
    long start, long length) throws IOException {
  LOG.debug("Copying {} bytes at {} to {}", length, start, to.getPos());
  from.seek(start);
  long bytesCopied = 0;
  byte[] buffer = COPY_BUFFER.get();
  while (bytesCopied < length) {
    long bytesLeft = length - bytesCopied;
    int bytesRead = from.read(buffer, 0,
        (buffer.length < bytesLeft ? buffer.length : (int) bytesLeft));
    if (bytesRead < 0) {
      throw new IllegalArgumentException(
          "Unexpected end of input file at " + (start + bytesCopied));
    }
    to.write(buffer, 0, bytesRead);
    bytesCopied += bytesRead;
  }
}
Example 5
DeprecatedSingleStreamPageReader(ColumnReader<?> parentStatus, SeekableInputStream inputStream, Path path, ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException {
  super(parentStatus, inputStream, path, columnChunkMetaData);
  try {
    lastPosition = inputStream.getPos();
  } catch (IOException e) {
    throw new ExecutionSetupException("Error in getting current position for parquet file at location: " + path, e);
  }
  this.inputStream = inputStream;
}
Example 6
PageReader(ColumnReader<?> parentStatus, SeekableInputStream inputStream, Path path, ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException {
  this.parentColumnReader = parentStatus;
  allocatedDictionaryBuffers = new ArrayList<>();
  codecFactory = parentColumnReader.parentReader.getCodecFactory();
  this.stats = parentColumnReader.parentReader.parquetReaderStats;
  long start = columnChunkMetaData.getFirstDataPageOffset();
  this.inputStream = inputStream;
  try {
    this.dataReader = new ColumnDataReader(inputStream, start, columnChunkMetaData.getTotalSize());
    loadDictionaryIfExists(parentStatus, columnChunkMetaData, inputStream);
  } catch (IOException e) {
    throw new ExecutionSetupException("Error opening or reading metadata for parquet file at location: "
        + path.getName(), e);
  }
}
Example 7
private static SeekableInputStream openFile(FileSystem fs, Path path) throws ExecutionSetupException {
  try {
    return Streams.wrap(fs.open(path));
  } catch (IOException e) {
    throw new ExecutionSetupException("Error opening or reading metadata for parquet file at location: "
        + path.getName(), e);
  }
}
Example 8
@Override
public SeekableInputStream newStream() {
  return new DelegatingSeekableInputStream(Channels.newInputStream(seekableByteChannel)) {
    @Override
    public long getPos() throws IOException {
      return seekableByteChannel.position();
    }

    @Override
    public void seek(long newPos) throws IOException {
      seekableByteChannel.position(newPos);
    }
  };
}
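For reference, here is a self-contained sketch of the same adapter pattern, assuming the channel comes from a java.nio FileChannel (the path and class name are hypothetical and not part of the quoted source):

import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Paths;
import org.apache.parquet.io.DelegatingSeekableInputStream;
import org.apache.parquet.io.SeekableInputStream;

public class ChannelSeekableInputStreamSketch {
  // Adapts any SeekableByteChannel to a Parquet SeekableInputStream by delegating
  // position tracking (getPos/seek) to the channel, as in Example 8 above.
  static SeekableInputStream wrap(SeekableByteChannel channel) {
    return new DelegatingSeekableInputStream(Channels.newInputStream(channel)) {
      @Override
      public long getPos() throws IOException {
        return channel.position();
      }

      @Override
      public void seek(long newPos) throws IOException {
        channel.position(newPos);
      }
    };
  }

  public static void main(String[] args) throws IOException {
    // Hypothetical local file; FileChannel.open defaults to read-only access.
    try (FileChannel channel = FileChannel.open(Paths.get("/tmp/data.parquet"));
         SeekableInputStream in = wrap(channel)) {
      in.seek(4);                      // skip the leading "PAR1" magic
      System.out.println(in.getPos()); // prints 4
    }
  }
}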
Example 9
/**
 * Reads the meta data block in the footer of the file using provided input stream
 * @param file a {@link InputFile} to read
 * @param filter the filter to apply to row groups
 * @return the metadata blocks in the footer
 * @throws IOException if an error occurs while reading the file
 * @deprecated will be removed in 2.0.0;
 *             use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)}
 */
@Deprecated
public static final ParquetMetadata readFooter(InputFile file, MetadataFilter filter) throws IOException {
  ParquetReadOptions options;
  if (file instanceof HadoopInputFile) {
    options = HadoopReadOptions.builder(((HadoopInputFile) file).getConfiguration())
        .withMetadataFilter(filter).build();
  } else {
    options = ParquetReadOptions.builder().withMetadataFilter(filter).build();
  }

  try (SeekableInputStream in = file.newStream()) {
    return readFooter(file, options, in);
  }
}
Example 10
private static final ParquetMetadata readFooter(InputFile file, ParquetReadOptions options, SeekableInputStream f, ParquetMetadataConverter converter) throws IOException {
  long fileLen = file.getLength();
  LOG.debug("File length {}", fileLen);
  int FOOTER_LENGTH_SIZE = 4;
  if (fileLen < MAGIC.length + FOOTER_LENGTH_SIZE + MAGIC.length) { // MAGIC + data + footer + footerIndex + MAGIC
    throw new RuntimeException(file.toString() + " is not a Parquet file (too small length: " + fileLen + ")");
  }
  long footerLengthIndex = fileLen - FOOTER_LENGTH_SIZE - MAGIC.length;
  LOG.debug("reading footer index at {}", footerLengthIndex);

  f.seek(footerLengthIndex);
  int footerLength = readIntLittleEndian(f);
  byte[] magic = new byte[MAGIC.length];
  f.readFully(magic);
  if (!Arrays.equals(MAGIC, magic)) {
    throw new RuntimeException(file.toString() + " is not a Parquet file. expected magic number at tail " + Arrays.toString(MAGIC) + " but found " + Arrays.toString(magic));
  }
  long footerIndex = footerLengthIndex - footerLength;
  LOG.debug("read footer length: {}, footer index: {}", footerLength, footerIndex);
  if (footerIndex < MAGIC.length || footerIndex >= footerLengthIndex) {
    throw new RuntimeException("corrupted file: the footer index is not within the file: " + footerIndex);
  }
  f.seek(footerIndex);
  // Read all the footer bytes in one go to avoid multiple read operations,
  // since a single read can be pretty time consuming in HDFS.
  ByteBuffer footerBytesBuffer = ByteBuffer.allocate(footerLength);
  f.readFully(footerBytesBuffer);
  LOG.debug("Finished reading all footer bytes.");
  footerBytesBuffer.flip();
  InputStream footerBytesStream = ByteBufferInputStream.wrap(footerBytesBuffer);
  return converter.readParquetMetadata(footerBytesStream, options.getMetadataFilter());
}
Example 11
void add(ChunkDescriptor descriptor, List<ByteBuffer> buffers, SeekableInputStream f) {
  ChunkData data = map.get(descriptor);
  if (data == null) {
    data = new ChunkData();
    map.put(descriptor, data);
  }
  data.buffers.addAll(buffers);
  lastDescriptor = descriptor;
  this.f = f;
}
Example 12
/**
 * @param f file to read the chunks from
 * @param builder used to build chunk list to read the pages for the different columns
 * @throws IOException if there is an error while reading from the stream
 */
public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOException {
  List<Chunk> result = new ArrayList<Chunk>(chunks.size());
  f.seek(offset);

  int fullAllocations = length / options.getMaxAllocationSize();
  int lastAllocationSize = length % options.getMaxAllocationSize();
  int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
  List<ByteBuffer> buffers = new ArrayList<>(numAllocations);

  for (int i = 0; i < fullAllocations; i += 1) {
    buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize()));
  }
  if (lastAllocationSize > 0) {
    buffers.add(options.getAllocator().allocate(lastAllocationSize));
  }

  for (ByteBuffer buffer : buffers) {
    f.readFully(buffer);
    buffer.flip();
  }

  // report in a counter the data we just scanned
  BenchmarkCounter.incrementBytesRead(length);
  ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffers);
  for (int i = 0; i < chunks.size(); i++) {
    ChunkDescriptor descriptor = chunks.get(i);
    builder.add(descriptor, stream.sliceBuffers(descriptor.size), f);
  }
}
Example 13
public void appendRowGroups(SeekableInputStream file,
                            List<BlockMetaData> rowGroups,
                            boolean dropColumns) throws IOException {
  for (BlockMetaData block : rowGroups) {
    appendRowGroup(file, block, dropColumns);
  }
}
Example 14
@SuppressWarnings("unchecked")
private static Class<SeekableInputStream> getH2SeekableClass() {
  try {
    return (Class<SeekableInputStream>) Class.forName(
        "org.apache.parquet.hadoop.util.H2SeekableInputStream");
  } catch (ClassNotFoundException | NoClassDefFoundError e) {
    return null;
  }
}
Example 15
private static Constructor<SeekableInputStream> getH2SeekableConstructor() {
  Class<SeekableInputStream> h2SeekableClass = getH2SeekableClass();
  if (h2SeekableClass != null) {
    try {
      return h2SeekableClass.getConstructor(FSDataInputStream.class);
    } catch (NoSuchMethodException e) {
      return null;
    }
  }
  return null;
}
Example 16
private ParquetInputStreamAdapter(org.apache.iceberg.io.SeekableInputStream delegate) {
  super(delegate);
  this.delegate = delegate;
}
Example 17
@Override
public SeekableInputStream newStream() throws IOException {
  return stream(file.newStream());
}
Example 18
private ParquetInputStreamAdapter(com.netflix.iceberg.io.SeekableInputStream delegate) {
  super(delegate);
  this.delegate = delegate;
}
Example 19
@Override
public SeekableInputStream newStream() throws IOException {
  return stream(file.newStream());
}
Example 20
public static void readFromStream(SeekableInputStream input, final ArrowBuf outputBuffer, final int bytesToRead) throws IOException {
  final ByteBuffer directBuffer = outputBuffer.nioBuffer(0, bytesToRead);
  input.readFully(directBuffer);
  outputBuffer.writerIndex(bytesToRead);
}
Example 21
public static BulkInputStream wrap(SeekableInputStream is) {
  return new SeekableBulkInputStream(is);
}
Example 22
public SeekableBulkInputStream(SeekableInputStream is) {
  super();
  this.is = is;
}
Example 23
SeekableInputStream getSingleStream() {
  return singleInputStream;
}
Example 24
public ColumnDataReader(SeekableInputStream input, long start, long length) throws IOException {
  this.input = input;
  this.input.seek(start);
  this.endPosition = start + length;
}
Example 25
public SeekableInputStream getInputStream() {
  return input;
}
Example 26
private void testSeekableStream(SeekableInputStream inputStream) throws IOException {
  int streamPos = 0;
  assertEquals(streamPos, inputStream.getPos());

  // Read some bytes from the start
  final byte[] buf = new byte[1000];
  inputStream.readFully(buf, 0, 88);
  compareData(buf, 0, streamPos, 88);
  streamPos += 88;
  assertEquals(streamPos, inputStream.getPos());

  final byte[] shortBuf = new byte[17];
  inputStream.readFully(shortBuf);
  compareData(shortBuf, 0, streamPos, 17);
  streamPos += 17;
  assertEquals(streamPos, inputStream.getPos());

  // test ByteBuffer interfaces
  final ByteBuffer shortByteBuf = ByteBuffer.allocate(25);
  inputStream.read(shortByteBuf);
  compareData(shortByteBuf.array(), 0, streamPos, 25);
  streamPos += 25;
  assertEquals(streamPos, inputStream.getPos());

  final ByteBuffer shortByteBuf2 = ByteBuffer.allocateDirect(71);
  inputStream.read(shortByteBuf2);
  final ByteBuf compareBuf = Unpooled.directBuffer(100);
  shortByteBuf2.flip();
  compareBuf.writeBytes(shortByteBuf2);
  compareData(compareBuf, streamPos, 71);
  streamPos += 71;
  assertEquals(streamPos, inputStream.getPos());

  final ByteBuffer shortByteBuf3 = ByteBuffer.allocate(66);
  inputStream.readFully(shortByteBuf3);
  compareData(shortByteBuf3.array(), 0, streamPos, 66);
  streamPos += 66;
  assertEquals(streamPos, inputStream.getPos());

  // Test plain old read interface
  buf[0] = (byte) inputStream.read();
  buf[1] = (byte) inputStream.read();
  buf[2] = (byte) inputStream.read();
  compareData(buf, 0, streamPos, 3);
  streamPos += 3;
  assertEquals(streamPos, inputStream.getPos());

  // Skip some, then read
  streamPos += 50; // skip 50 bytes
  inputStream.seek(streamPos);
  inputStream.readFully(buf, 0, 37);
  compareData(buf, 0, streamPos, 37);
  streamPos += 37;
  assertEquals(streamPos, inputStream.getPos());

  // skip to near the end, then read
  streamPos = TEST_DATA_SIZE - 100;
  inputStream.seek(streamPos);
  inputStream.readFully(buf, 0, 100);
  compareData(buf, 0, streamPos, 100);
  streamPos += 100;
  assertEquals(streamPos, inputStream.getPos());
}
Example 27
private static final ParquetMetadata readFooter(InputFile file, ParquetReadOptions options, SeekableInputStream f) throws IOException {
  ParquetMetadataConverter converter = new ParquetMetadataConverter(options);
  return readFooter(file, options, f, converter);
}
Example 28
/**
 * @param descriptor the descriptor of the chunk
 * @param f the file stream positioned at the end of this chunk
 */
private WorkaroundChunk(ChunkDescriptor descriptor, List<ByteBuffer> buffers, SeekableInputStream f, OffsetIndex offsetIndex) {
  super(descriptor, buffers, offsetIndex);
  this.f = f;
}
Example 29
@Override
public SeekableInputStream newStream() throws IOException {
  return HadoopStreams.wrap(fs.open(stat.getPath()));
}
Example 30
/**
 * Test whether corruption in the page content is detected by checksum verification
 */
@Test
public void testCorruptedPage() throws IOException {
  Configuration conf = new Configuration();
  conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);

  Path path = writeSimpleParquetFile(conf, CompressionCodecName.UNCOMPRESSED);

  InputFile inputFile = HadoopInputFile.fromPath(path, conf);
  try (SeekableInputStream inputStream = inputFile.newStream()) {
    int fileLen = (int) inputFile.getLength();

    byte[] fileBytes = new byte[fileLen];
    inputStream.readFully(fileBytes);
    inputStream.close();

    // There are 4 pages in total (2 per column), we corrupt the first page of the first column
    // and the second page of the second column. We do this by altering a byte roughly in the
    // middle of each page to be corrupted
    fileBytes[fileLen / 8]++;
    fileBytes[fileLen / 8 + ((fileLen / 4) * 3)]++;

    OutputFile outputFile = HadoopOutputFile.fromPath(path, conf);
    try (PositionOutputStream outputStream = outputFile.createOrOverwrite(1024 * 1024)) {
      outputStream.write(fileBytes);
      outputStream.close();

      // First we disable checksum verification, the corruption will go undetected as it is in the
      // data section of the page
      conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false);
      try (ParquetFileReader reader = getParquetFileReader(path, conf,
          Arrays.asList(colADesc, colBDesc))) {
        PageReadStore pageReadStore = reader.readNextRowGroup();

        DataPageV1 colAPage1 = readNextPage(colADesc, pageReadStore);
        assertFalse("Data in page was not corrupted",
            Arrays.equals(colAPage1.getBytes().toByteArray(), colAPage1Bytes));
        readNextPage(colADesc, pageReadStore);
        readNextPage(colBDesc, pageReadStore);
        DataPageV1 colBPage2 = readNextPage(colBDesc, pageReadStore);
        assertFalse("Data in page was not corrupted",
            Arrays.equals(colBPage2.getBytes().toByteArray(), colBPage2Bytes));
      }

      // Now we enable checksum verification, the corruption should be detected
      conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);
      try (ParquetFileReader reader =
          getParquetFileReader(path, conf, Arrays.asList(colADesc, colBDesc))) {
        // We expect an exception on the first encountered corrupt page (in readAllPages)
        assertVerificationFailed(reader);
      }
    }
  }
}