// ==== Source: RecordReaderImpl.java ====
private void readPartialDataStreams(StripeInformation stripe) throws IOException { List<OrcProto.Stream> streamList = stripeFooter.getStreamsList(); //先获取各个 stream 的偏移量(在一个stripe 的偏移量) 等元数据 DiskRangeList toRead = planReadPartialDataStreams(streamList, indexes, fileIncluded, includedRowGroups, dataReader.getCompressionCodec() != null, stripeFooter.getColumnsList(), types, bufferSize, true); if (LOG.isDebugEnabled()) { LOG.debug("chunks = " + RecordReaderUtils.stringifyDiskRanges(toRead)); } // 根据元数据来获取 data bufferChunks = dataReader.readFileData(toRead, stripe.getOffset(), false); if (LOG.isDebugEnabled()) { LOG.debug("merge = " + RecordReaderUtils.stringifyDiskRanges(bufferChunks)); } createStreams(streamList, bufferChunks, fileIncluded, dataReader.getCompressionCodec(), bufferSize, streams); }RecordReaderImpl.java
static DiskRangeList planReadPartialDataStreams( List<OrcProto.Stream> streamList, OrcProto.RowIndex[] indexes, boolean[] includedColumns, boolean[] includedRowGroups, boolean isCompressed, List<OrcProto.ColumnEncoding> encodings, List<OrcProto.Type> types, int compressionSize, boolean doMergeBuffers) { long offset = 0; // figure out which columns have a present stream boolean[] hasNull = RecordReaderUtils.findPresentStreamsByColumn(streamList, types); CreateHelper list = new CreateHelper(); for (OrcProto.Stream stream : streamList) { long length = stream.getLength(); int column = stream.getColumn(); OrcProto.Stream.Kind streamKind = stream.getKind(); // since stream kind is optional, first check if it exists if (stream.hasKind() && (StreamName.getArea(streamKind) == StreamName.Area.DATA) && (column < includedColumns.length && includedColumns[column])) { // if we aren't filtering or it is a dictionary, load it. if (includedRowGroups == null || RecordReaderUtils.isDictionary(streamKind, encodings.get(column))) { // 获取需要的流 RecordReaderUtils.addEntireStreamToRanges(offset, length, list, doMergeBuffers); } else { RecordReaderUtils.addRgFilteredStreamToRanges(stream, includedRowGroups, isCompressed, indexes[column], encodings.get(column), types.get(column), compressionSize, hasNull[column], offset, length, list, doMergeBuffers); } } offset += length; } return list.extract(); }RecordReaderUtils.java
public static void addEntireStreamToRanges(long offset, long length, CreateHelper list, boolean doMergeBuffers) { list.addOrMerge(offset, offset + length, doMergeBuffers, false); }DiskRangeList.java
public void addOrMerge(long offset, long end, boolean doMerge, boolean doLogNew) { // 此处会合并相邻的列或流, 记录整体的偏移和位置 if (!doMerge || this.tail == null || !this.tail.merge(offset, end)) { if (doLogNew) { DiskRangeList.LOG.debug("Creating new range; last range (which can include some previous adds) was " + this.tail); } DiskRangeList node = new DiskRangeList(offset, end); if (this.tail == null) { this.head = this.tail = node; } else { this.tail = this.tail.insertAfter(node); } } }RecordReaderUtils.java
@Override public DiskRangeList readFileData( DiskRangeList range, long baseOffset, boolean doForceDirect) throws IOException { return RecordReaderUtils.readDiskRanges(file, zcr, baseOffset, range, doForceDirect); }里面的逻辑同 readAllDataStreams