Java源码示例:org.apache.parquet.io.SeekableInputStream

示例1
/**
 * Fills {@code buf} with {@code len} bytes read from the stream returned by
 * {@code getInputStream()}.
 *
 * <p>The buffer is cleared first and data is always written starting at index 0;
 * the {@code off} argument is not used by this implementation. Reads are repeated
 * until {@code len} bytes have been consumed. Per-read timing is emitted at TRACE level.
 *
 * @param buf destination buffer; its writer index is set to {@code len} on success
 * @param off unused
 * @param len number of bytes to read
 * @return {@code len} on success, or the negative value reported by the underlying
 *         stream as soon as a read returns one
 * @throws IOException if the underlying stream fails
 */
public synchronized int read(DrillBuf buf, int off, int len) throws IOException {
  buf.clear();
  final ByteBuffer target = buf.nioBuffer(0, len);
  final SeekableInputStream source = HadoopStreams.wrap(getInputStream());
  for (int remaining = len; remaining > 0; ) {
    if (logger.isTraceEnabled()) {
      logger.trace("PERF: Disk read start. {}, StartOffset: {}, TotalByteSize: {}", this.streamId, this.startOffset, this.totalByteSize);
    }
    final Stopwatch readTimer = Stopwatch.createStarted();
    final int count = source.read(target);
    if (count < 0) {
      // Propagate the stream's end-of-data indicator unchanged.
      return count;
    }
    remaining -= count;
    if (logger.isTraceEnabled()) {
      final double elapsedMillis = ((double) readTimer.elapsed(TimeUnit.MICROSECONDS)) / 1000;
      logger.trace(
          "PERF: Disk read complete. {}, StartOffset: {}, TotalByteSize: {}, BytesRead: {}, Time: {} ms",
          this.streamId, this.startOffset, this.totalByteSize, count, elapsedMillis);
    }
  }
  buf.writerIndex(len);
  return len;
}
 
示例2
/**
 * Reads the dictionary page of a column chunk when the chunk metadata reports a
 * positive dictionary page offset; otherwise does nothing.
 *
 * <p>The time (in microseconds) and byte count spent on the page header are
 * forwarded to {@code updateStats} before the dictionary page itself is read.
 *
 * @param parentStatus column reader passed on to {@code readDictionaryPage}
 * @param columnChunkMetaData metadata supplying the dictionary page offset and encodings
 * @param f stream that is repositioned to the dictionary page offset
 * @throws IOException if seeking or reading fails
 */
private void loadDictionaryIfExists(final ColumnReader<?> parentStatus,
    final ColumnChunkMetaData columnChunkMetaData, final SeekableInputStream f) throws IOException {
  final Stopwatch headerTimer = Stopwatch.createUnstarted();
  if (columnChunkMetaData.getDictionaryPageOffset() <= 0) {
    return;
  }

  f.seek(columnChunkMetaData.getDictionaryPageOffset());
  final long headerStart = f.getPos();
  headerTimer.start();
  final PageHeader dictionaryHeader = Util.readPageHeader(f);
  final long headerReadMicros = headerTimer.elapsed(TimeUnit.MICROSECONDS);
  final long headerLength = f.getPos() - headerStart;
  this.updateStats(dictionaryHeader, "Page Header", headerStart, headerReadMicros, headerLength, headerLength);

  assert dictionaryHeader.type == PageType.DICTIONARY_PAGE;
  assert isDictionaryEncoded(columnChunkMetaData.getEncodings()) :
    format("Missing dictionary encoding for dictionary page %s, in column chunk %s", dictionaryHeader, columnChunkMetaData);
  readDictionaryPage(dictionaryHeader, parentStatus);
}
 
示例3
/**
 * Reads the bytes of a dictionary page from {@code fin} without decompressing them.
 *
 * @param pageHeader header supplying the compressed size, uncompressed size and
 *                   dictionary page header
 * @param fin stream from which exactly the compressed page size in bytes is read
 * @return a {@link DictionaryPage} wrapping the bytes as read from the stream
 * @throws IOException if the page bytes cannot be read in full
 */
private DictionaryPage readCompressedDictionary(
    PageHeader pageHeader, SeekableInputStream fin) throws IOException {
  final DictionaryPageHeader dictionaryHeader = pageHeader.getDictionary_page_header();
  final int uncompressedSize = pageHeader.getUncompressed_page_size();

  final byte[] compressedBytes = new byte[pageHeader.getCompressed_page_size()];
  fin.readFully(compressedBytes);

  return new DictionaryPage(
      BytesInput.from(compressedBytes),
      uncompressedSize,
      dictionaryHeader.getNum_values(),
      converter.getEncoding(dictionaryHeader.getEncoding()));
}
 
示例4
/**
 * Copies {@code length} bytes from {@code from}, beginning at offset {@code start},
 * to the current position of {@code to}.
 *
 * <p>The scratch buffer is obtained from {@code COPY_BUFFER.get()}. The original
 * documentation describes this method as thread-safe; that presumably relies on
 * {@code COPY_BUFFER} handing out a per-thread buffer (confirm against its declaration).
 *
 * @param from a {@link SeekableInputStream}
 * @param to any {@link PositionOutputStream}
 * @param start where in the from stream to start copying
 * @param length the number of bytes to copy
 * @throws IOException if there is an error while reading or writing
 * @throws IllegalArgumentException if the input ends before {@code length} bytes were copied
 */
private static void copy(SeekableInputStream from, PositionOutputStream to,
                         long start, long length) throws IOException{
  LOG.debug("Copying {} bytes at {} to {}" ,length , start , to.getPos());
  from.seek(start);
  long bytesCopied = 0;
  byte[] buffer = COPY_BUFFER.get();
  while (bytesCopied < length) {
    long bytesLeft = length - bytesCopied;
    // The result never exceeds buffer.length, so narrowing to int is safe.
    int bytesRead = from.read(buffer, 0, (int) Math.min(buffer.length, bytesLeft));
    if (bytesRead < 0) {
      // Parenthesized so the absolute file offset is reported; without the
      // parentheses the two numbers were concatenated as text.
      throw new IllegalArgumentException(
          "Unexpected end of input file at " + (start + bytesCopied));
    }
    to.write(buffer, 0, bytesRead);
    bytesCopied += bytesRead;
  }
}
 
示例5
DeprecatedSingleStreamPageReader(ColumnReader<?> parentStatus, SeekableInputStream inputStream, Path path, ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException {
  super(parentStatus, inputStream, path, columnChunkMetaData);
  try {
    lastPosition = inputStream.getPos();
  } catch (IOException e) {
    throw new ExecutionSetupException("Error in getting current position for parquet file at location: " + path, e);
  }
  this.inputStream = inputStream;
}
 
示例6
/**
 * Sets up a page reader for one column chunk: stores the collaborators taken from
 * the parent reader, creates a {@code ColumnDataReader} starting at the chunk's
 * first data page offset, and loads the dictionary page if one exists.
 *
 * @throws ExecutionSetupException if creating the data reader or loading the
 *         dictionary raises an {@link IOException}
 */
PageReader(ColumnReader<?> parentStatus, SeekableInputStream inputStream, Path path, ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException {
  this.parentColumnReader = parentStatus;
  this.allocatedDictionaryBuffers = new ArrayList<>();
  this.codecFactory = parentColumnReader.parentReader.getCodecFactory();
  this.stats = parentColumnReader.parentReader.parquetReaderStats;
  final long firstDataPageOffset = columnChunkMetaData.getFirstDataPageOffset();
  this.inputStream = inputStream;
  try {
    this.dataReader =
        new ColumnDataReader(inputStream, firstDataPageOffset, columnChunkMetaData.getTotalSize());
    loadDictionaryIfExists(parentStatus, columnChunkMetaData, inputStream);
  } catch (IOException e) {
    final String location = path.getName();
    throw new ExecutionSetupException(
        "Error opening or reading metadata for parquet file at location: " + location, e);
  }
}
 
示例7
/**
 * Opens {@code path} on {@code fs} and wraps the result with {@code Streams.wrap}.
 *
 * @param fs file system used to open the file
 * @param path file to open
 * @return the wrapped stream
 * @throws ExecutionSetupException if opening the file raises an {@link IOException}
 */
private static SeekableInputStream openFile(FileSystem fs, Path path) throws ExecutionSetupException {
  final SeekableInputStream opened;
  try {
    opened = Streams.wrap(fs.open(path));
  } catch (IOException e) {
    throw new ExecutionSetupException(
        "Error opening or reading metadata for parquet file at location: " + path.getName(), e);
  }
  return opened;
}
 
示例8
/**
 * Returns a stream that reads from {@code seekableByteChannel} and maps position
 * queries and seeks directly onto the channel's own position.
 */
@Override
public SeekableInputStream newStream() {
  return new DelegatingSeekableInputStream(Channels.newInputStream(seekableByteChannel)) {

    /** Moves the channel to {@code newPos}. */
    @Override
    public void seek(long newPos) throws IOException {
      seekableByteChannel.position(newPos);
    }

    /** Reports the channel's current position. */
    @Override
    public long getPos() throws IOException {
      return seekableByteChannel.position();
    }
  };
}
 
示例9
/**
 * Reads the metadata block in the footer of {@code file}.
 *
 * <p>Read options are built from the file's Hadoop configuration when the file is a
 * {@link HadoopInputFile}, and from the default {@link ParquetReadOptions} builder
 * otherwise; in both cases {@code filter} is applied. The stream opened for the read
 * is closed before this method returns.
 *
 * @param file a {@link InputFile} to read
 * @param filter the filter to apply to row groups
 * @return the metadata blocks in the footer
 * @throws IOException if an error occurs while reading the file
 * @deprecated will be removed in 2.0.0;
 *             use {@link ParquetFileReader#open(InputFile, ParquetReadOptions)}
 */
@Deprecated
public static final ParquetMetadata readFooter(InputFile file, MetadataFilter filter) throws IOException {
  final ParquetReadOptions options = (file instanceof HadoopInputFile)
      ? HadoopReadOptions.builder(((HadoopInputFile) file).getConfiguration())
          .withMetadataFilter(filter).build()
      : ParquetReadOptions.builder().withMetadataFilter(filter).build();

  try (SeekableInputStream footerStream = file.newStream()) {
    return readFooter(file, options, footerStream);
  }
}
 
示例10
/**
 * Locates, validates and parses the footer of {@code file} using stream {@code f}.
 *
 * <p>The tail of the file is expected to hold a 4-byte little-endian footer length
 * followed by {@code MAGIC}. The footer bytes themselves sit immediately before that
 * length field and are read in a single call before being handed to {@code converter}.
 *
 * @param file supplies the file length and the name used in error messages
 * @param options supplies the metadata filter
 * @param f stream used for all seeks and reads
 * @param converter parses the footer bytes
 * @return the parsed footer metadata
 * @throws IOException if seeking or reading fails
 * @throws RuntimeException if the file is too short, the trailing magic does not
 *         match, or the computed footer start lies outside the valid range
 */
private static final ParquetMetadata readFooter(InputFile file, ParquetReadOptions options, SeekableInputStream f, ParquetMetadataConverter converter) throws IOException {
  final long fileLength = file.getLength();
  LOG.debug("File length {}", fileLength);
  final int footerLengthFieldSize = 4;
  // Smallest possible file: leading MAGIC + footer length field + trailing MAGIC.
  final int minimumFileLength = MAGIC.length + footerLengthFieldSize + MAGIC.length;
  if (fileLength < minimumFileLength) {
    throw new RuntimeException(file.toString() + " is not a Parquet file (too small length: " + fileLength + ")");
  }
  final long footerLengthPosition = fileLength - footerLengthFieldSize - MAGIC.length;
  LOG.debug("reading footer index at {}", footerLengthPosition);

  f.seek(footerLengthPosition);
  final int footerLength = readIntLittleEndian(f);
  final byte[] trailingMagic = new byte[MAGIC.length];
  f.readFully(trailingMagic);
  if (!Arrays.equals(MAGIC, trailingMagic)) {
    throw new RuntimeException(file.toString() + " is not a Parquet file. expected magic number at tail " + Arrays.toString(MAGIC) + " but found " + Arrays.toString(trailingMagic));
  }

  final long footerStart = footerLengthPosition - footerLength;
  LOG.debug("read footer length: {}, footer index: {}", footerLength, footerStart);
  final boolean footerWithinFile = footerStart >= MAGIC.length && footerStart < footerLengthPosition;
  if (!footerWithinFile) {
    throw new RuntimeException("corrupted file: the footer index is not within the file: " + footerStart);
  }

  f.seek(footerStart);
  // Read all the footer bytes in one time to avoid multiple read operations,
  // since it can be pretty time consuming for a single read operation in HDFS.
  final ByteBuffer footerBytes = ByteBuffer.allocate(footerLength);
  f.readFully(footerBytes);
  LOG.debug("Finished to read all footer bytes.");
  footerBytes.flip();
  final InputStream footerInput = ByteBufferInputStream.wrap(footerBytes);
  return converter.readParquetMetadata(footerInput, options.getMetadataFilter());
}
 
示例11
/**
 * Appends {@code buffers} to the data registered for {@code descriptor}, creating
 * the entry in {@code map} on first use, and remembers the descriptor and stream
 * passed in this call.
 *
 * @param descriptor key under which the buffers are collected
 * @param buffers buffers to append
 * @param f stream stored in this instance's {@code f} field
 */
void add(ChunkDescriptor descriptor, List<ByteBuffer> buffers, SeekableInputStream f) {
  final ChunkData existing = map.get(descriptor);
  final ChunkData target;
  if (existing != null) {
    target = existing;
  } else {
    target = new ChunkData();
    map.put(descriptor, target);
  }
  target.buffers.addAll(buffers);

  lastDescriptor = descriptor;
  this.f = f;
}
 
示例12
/**
 * Reads {@code length} bytes starting at {@code offset} and hands each chunk
 * descriptor its slice of the data.
 *
 * <p>The bytes are read into buffers of at most {@code options.getMaxAllocationSize()}
 * bytes each, obtained from {@code options.getAllocator()}. Slices are taken from the
 * combined buffers in the order the descriptors appear in {@code chunks}.
 *
 * @param f file to read the chunks from
 * @param builder used to build chunk list to read the pages for the different columns
 * @throws IOException if there is an error while reading from the stream
 */
public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOException {
  f.seek(offset);

  final int maxAllocationSize = options.getMaxAllocationSize();
  int fullAllocations = length / maxAllocationSize;
  int lastAllocationSize = length % maxAllocationSize;

  int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
  List<ByteBuffer> buffers = new ArrayList<>(numAllocations);

  for (int i = 0; i < fullAllocations; i += 1) {
    buffers.add(options.getAllocator().allocate(maxAllocationSize));
  }

  if (lastAllocationSize > 0) {
    buffers.add(options.getAllocator().allocate(lastAllocationSize));
  }

  for (ByteBuffer buffer : buffers) {
    f.readFully(buffer);
    // Switch the buffer from being filled to being read.
    buffer.flip();
  }

  // report in a counter the data we just scanned
  BenchmarkCounter.incrementBytesRead(length);
  ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffers);
  for (ChunkDescriptor descriptor : chunks) {
    builder.add(descriptor, stream.sliceBuffers(descriptor.size), f);
  }
}
 
示例13
/**
 * Appends every row group in {@code rowGroups}, in list order, by delegating to
 * {@code appendRowGroup} for each one.
 *
 * @param file stream passed to each {@code appendRowGroup} call
 * @param rowGroups row groups to append
 * @param dropColumns flag passed through unchanged to {@code appendRowGroup}
 * @throws IOException if appending any row group fails
 */
public void appendRowGroups(SeekableInputStream file,
                            List<BlockMetaData> rowGroups,
                            boolean dropColumns) throws IOException {
  for (int index = 0; index < rowGroups.size(); index++) {
    appendRowGroup(file, rowGroups.get(index), dropColumns);
  }
}
 
示例14
/**
 * Looks up {@code org.apache.parquet.hadoop.util.H2SeekableInputStream} by name.
 *
 * @return the class, or {@code null} when it (or one of its dependencies) cannot be loaded
 */
@SuppressWarnings("unchecked")
private static Class<SeekableInputStream> getH2SeekableClass() {
  final String className = "org.apache.parquet.hadoop.util.H2SeekableInputStream";
  try {
    final Class<?> loaded = Class.forName(className);
    return (Class<SeekableInputStream>) loaded;
  } catch (ClassNotFoundException | NoClassDefFoundError e) {
    return null;
  }
}
 
示例15
/**
 * Finds the constructor of the class returned by {@code getH2SeekableClass()} that
 * takes a single {@code FSDataInputStream}.
 *
 * @return the constructor, or {@code null} when the class is unavailable or has no
 *         such public constructor
 */
private static Constructor<SeekableInputStream> getH2SeekableConstructor() {
  final Class<SeekableInputStream> h2Class = getH2SeekableClass();
  if (h2Class == null) {
    return null;
  }
  try {
    return h2Class.getConstructor(FSDataInputStream.class);
  } catch (NoSuchMethodException e) {
    return null;
  }
}
 
示例16
/**
 * Creates an adapter around an {@code org.apache.iceberg.io.SeekableInputStream}.
 *
 * @param delegate stream passed to the superclass constructor and also stored in
 *                 this instance's {@code delegate} field
 */
private ParquetInputStreamAdapter(org.apache.iceberg.io.SeekableInputStream delegate) {
  super(delegate);
  this.delegate = delegate;
}
 
示例17
/**
 * Opens a new stream on {@code file} and returns the result of passing it to
 * {@code stream(...)}.
 *
 * @throws IOException declared by the overridden method
 */
@Override
public SeekableInputStream newStream() throws IOException {
  return stream(file.newStream());
}
 
示例18
/**
 * Creates an adapter around a {@code com.netflix.iceberg.io.SeekableInputStream}.
 *
 * @param delegate stream passed to the superclass constructor and also stored in
 *                 this instance's {@code delegate} field
 */
private ParquetInputStreamAdapter(com.netflix.iceberg.io.SeekableInputStream delegate) {
  super(delegate);
  this.delegate = delegate;
}
 
示例19
/**
 * Opens a new stream on {@code file} and returns the result of passing it to
 * {@code stream(...)}.
 *
 * @throws IOException declared by the overridden method
 */
@Override
public SeekableInputStream newStream() throws IOException {
  return stream(file.newStream());
}
 
示例20
/**
 * Reads exactly {@code bytesToRead} bytes from {@code input} into the start of
 * {@code outputBuffer} and sets the buffer's writer index to {@code bytesToRead}.
 *
 * @param input stream to read from
 * @param outputBuffer destination; filled through its NIO view of range {@code [0, bytesToRead)}
 * @param bytesToRead number of bytes to read
 * @throws IOException if the bytes cannot be read in full
 */
public static void readFromStream(SeekableInputStream input, final ArrowBuf outputBuffer, final int bytesToRead) throws IOException{
  input.readFully(outputBuffer.nioBuffer(0, bytesToRead));
  outputBuffer.writerIndex(bytesToRead);
}
 
示例21
/**
 * Wraps {@code is} in a new {@code SeekableBulkInputStream}.
 *
 * @param is stream to wrap
 * @return the wrapping {@code BulkInputStream}
 */
public static BulkInputStream wrap(SeekableInputStream is) {
  return new SeekableBulkInputStream(is);
}
 
示例22
/**
 * Creates a bulk stream backed by {@code is}.
 *
 * @param is stream stored in this instance's {@code is} field
 */
public SeekableBulkInputStream(SeekableInputStream is) {
  super();
  this.is = is;
}
 
示例23
/** Returns the value of the {@code singleInputStream} field. */
SeekableInputStream getSingleStream() {
  return singleInputStream;
}
 
示例24
/**
 * Creates a reader over the byte range {@code [start, start + length)} of {@code input}
 * and positions the stream at {@code start}.
 *
 * @param input stream to read from
 * @param start offset the stream is moved to
 * @param length number of bytes in the range; {@code start + length} is kept as the end position
 * @throws IOException if seeking fails
 */
public ColumnDataReader(SeekableInputStream input, long start, long length) throws IOException{
  input.seek(start);
  this.input = input;
  this.endPosition = start + length;
}
 
示例25
/** Returns the value of the {@code input} field. */
public SeekableInputStream getInputStream() {
  return input;
}
 
示例26
/**
 * Exercises the read and seek methods of {@code inputStream} in sequence, checking
 * after every step that the data matches {@code compareData} and that
 * {@code getPos()} equals the locally tracked position.
 *
 * <p>The steps depend on each other: {@code streamPos} mirrors the expected stream
 * position and is advanced by exactly the number of bytes each step consumes.
 *
 * @param inputStream stream under test; expected to start at position 0 and to
 *                    contain at least {@code TEST_DATA_SIZE} bytes
 * @throws IOException if any read or seek fails
 */
private void testSeekableStream(SeekableInputStream inputStream) throws IOException {
  int streamPos = 0;
  assertEquals(streamPos, inputStream.getPos());

  // Read some bytes from the start
  final byte[] buf = new byte[1000];
  inputStream.readFully(buf, 0, 88);
  compareData(buf, 0, streamPos, 88);
  streamPos += 88;
  assertEquals(streamPos, inputStream.getPos());

  // readFully(byte[]) without offset/length fills the whole 17-byte array
  final byte[] shortBuf = new byte[17];
  inputStream.readFully(shortBuf);
  compareData(shortBuf, 0, streamPos, 17);
  streamPos += 17;
  assertEquals(streamPos, inputStream.getPos());

  // test ByteBuffer interfaces
  // Heap buffer via read(ByteBuffer); the checks below assume all 25 bytes arrive in one call
  final ByteBuffer shortByteBuf = ByteBuffer.allocate(25);
  inputStream.read(shortByteBuf);
  compareData(shortByteBuf.array(), 0, streamPos, 25);
  streamPos += 25;
  assertEquals(streamPos, inputStream.getPos());

  // Direct buffer via read(ByteBuffer); contents are copied into a ByteBuf for comparison
  final ByteBuffer shortByteBuf2 = ByteBuffer.allocateDirect(71);
  inputStream.read(shortByteBuf2);
  final ByteBuf compareBuf = Unpooled.directBuffer(100);
  shortByteBuf2.flip();
  compareBuf.writeBytes(shortByteBuf2);
  compareData(compareBuf, streamPos, 71);
  streamPos += 71;
  assertEquals(streamPos, inputStream.getPos());

  // Heap buffer via readFully(ByteBuffer)
  final ByteBuffer shortByteBuf3 = ByteBuffer.allocate(66);
  inputStream.readFully(shortByteBuf3);
  compareData(shortByteBuf3.array(), 0, streamPos, 66);
  streamPos += 66;
  assertEquals(streamPos, inputStream.getPos());

  // Test plain old read interface
  buf[0] = (byte) inputStream.read();
  buf[1] = (byte) inputStream.read();
  buf[2] = (byte) inputStream.read();
  compareData(buf, 0, streamPos, 3);
  streamPos += 3;
  assertEquals(streamPos, inputStream.getPos());

  // Skip some, then read
  streamPos += 50;  // skip 50 bytes
  inputStream.seek(streamPos);
  inputStream.readFully(buf, 0, 37);
  compareData(buf, 0, streamPos, 37);
  streamPos += 37;
  assertEquals(streamPos, inputStream.getPos());

  // skip to near the end, then read
  streamPos = TEST_DATA_SIZE - 100;
  inputStream.seek(streamPos);
  inputStream.readFully(buf, 0, 100);
  compareData(buf, 0, streamPos,100);
  streamPos += 100;
  assertEquals(streamPos, inputStream.getPos());
}
 
示例27
/**
 * Reads the footer of {@code file} from {@code f} using a
 * {@link ParquetMetadataConverter} created from {@code options}.
 *
 * @param file file whose footer is read
 * @param options read options, also used to configure the converter
 * @param f stream to read from
 * @return the parsed footer metadata
 * @throws IOException if reading fails
 */
private static final ParquetMetadata readFooter(InputFile file, ParquetReadOptions options, SeekableInputStream f) throws IOException {
  return readFooter(file, options, f, new ParquetMetadataConverter(options));
}
 
示例28
/**
 * Creates a chunk that additionally keeps a reference to the file stream.
 *
 * @param descriptor the descriptor of the chunk
 * @param buffers passed unchanged to the superclass constructor
 * @param f the file stream positioned at the end of this chunk
 * @param offsetIndex passed unchanged to the superclass constructor
 */
private WorkaroundChunk(ChunkDescriptor descriptor, List<ByteBuffer> buffers, SeekableInputStream f, OffsetIndex offsetIndex) {
  super(descriptor, buffers, offsetIndex);
  this.f = f;
}
 
示例29
/**
 * Opens the path reported by {@code stat} on {@code fs} and wraps the opened
 * stream with {@code HadoopStreams.wrap}.
 *
 * @throws IOException if the file cannot be opened
 */
@Override
public SeekableInputStream newStream() throws IOException {
  return HadoopStreams.wrap(fs.open(stat.getPath()));
}
 
示例30
/**
 * Test whether corruption in the page content is detected by checksum verification.
 *
 * <p>The test writes a file with page checksums enabled, reads it back as raw bytes,
 * increments two bytes, and overwrites the file. It then reads the file twice: once
 * with verification disabled (corrupted data is returned without error) and once with
 * verification enabled (the read is expected to fail).
 *
 * <p>Each stream is held open only for the I/O it performs and is closed by its
 * try-with-resources block, so the file is fully written before any reader opens it.
 */
@Test
public void testCorruptedPage() throws IOException {
  Configuration conf = new Configuration();
  conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);

  Path path = writeSimpleParquetFile(conf, CompressionCodecName.UNCOMPRESSED);

  // Load the whole file into memory; the stream is closed when the block exits.
  InputFile inputFile = HadoopInputFile.fromPath(path, conf);
  final int fileLen = (int) inputFile.getLength();
  final byte[] fileBytes = new byte[fileLen];
  try (SeekableInputStream inputStream = inputFile.newStream()) {
    inputStream.readFully(fileBytes);
  }

  // There are 4 pages in total (2 per column), we corrupt the first page of the first column
  // and the second page of the second column. We do this by altering a byte roughly in the
  // middle of each page to be corrupted
  fileBytes[fileLen / 8]++;
  fileBytes[fileLen / 8 + ((fileLen / 4) * 3)]++;

  // Overwrite the file with the corrupted bytes; closing the stream completes the write.
  OutputFile outputFile = HadoopOutputFile.fromPath(path, conf);
  try (PositionOutputStream outputStream = outputFile.createOrOverwrite(1024 * 1024)) {
    outputStream.write(fileBytes);
  }

  // First we disable checksum verification, the corruption will go undetected as it is in the
  // data section of the page
  conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false);
  try (ParquetFileReader reader = getParquetFileReader(path, conf,
    Arrays.asList(colADesc, colBDesc))) {
    PageReadStore pageReadStore = reader.readNextRowGroup();

    DataPageV1 colAPage1 = readNextPage(colADesc, pageReadStore);
    assertFalse("Data in page was not corrupted",
      Arrays.equals(colAPage1.getBytes().toByteArray(), colAPage1Bytes));
    readNextPage(colADesc, pageReadStore);
    readNextPage(colBDesc, pageReadStore);
    DataPageV1 colBPage2 = readNextPage(colBDesc, pageReadStore);
    assertFalse("Data in page was not corrupted",
      Arrays.equals(colBPage2.getBytes().toByteArray(), colBPage2Bytes));
  }

  // Now we enable checksum verification, the corruption should be detected
  conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);
  try (ParquetFileReader reader =
         getParquetFileReader(path, conf, Arrays.asList(colADesc, colBDesc))) {
    // We expect an exception on the first encountered corrupt page (in readAllPages)
    assertVerificationFailed(reader);
  }
}