OpenDAL RangeReader 的奥妙
前情提要
我们(GreptimeDB (opens in a new tab))把 OpenDAL (opens in a new tab) 作为统一的数据访问层。前段时间同事告诉我,数据库执行 Copy From
语句从 S3 导入一个 800 KiB 的 Parquet 文件需要 10s;经过一些调查,研读了相关 Reader 的文档和具体实现 (暴露了之前没有 RTFSC 🥲),谨以本文做一个记录和简单的总结。
本文涉及的 OpenDAL 源码 Commit: 6980cd1 (opens in a new tab)
TL;DR
- OpenDAL
RangeReader
调用seek
方法后会重置内部状态,下一次调用read
调用会有一次远程调用请求(后端为 S3 的场景)。(相关 issue (opens in a new tab) 和讨论 (opens in a new tab)) std::io::BufReader
和tokio::io::BufReader
都会在seek
后清除内部Buffer
,如果希望继续读Buffer
内的内容,应该调用seek_relative
。- 从 S3 读取元信息在尾部文件格式,可以考虑加一层
Tailing Buffer
(即,任何落入用户定义的Tailing Bytes
范围内的seek
的操作,只在第一次加载并缓存Tailing Bytes
),一些场景可以减少一次 S3 get 请求(这个本质上是一种分开读尾部元信息的替换方案)。
先来聊聊该怎么读 OpenDAL 源码?
坦白说,我也是最近才理清楚 OpenDAL 的源码和其调用关系,之前都是一知半解。
从 Operator 开始
我们所有的 IO 操作都是围绕着 Operator
展开的,先来看下 Operator
是怎么构建的。
以 main.rs
为例,首先我们在 L7 创建了一个基于文件系统的 Backend Builder
;
在 L13 将其构建为 accessor
(实现了 Accessor
trait);
L14 我们将该 accessor
传入了 OperatorBuilder::new
,最后调用了 finish
。
OpenDAL 通过 Accessor
trait 统一了不同存储后端(Backend)的行为,并向上层暴露统一的 IO 接口,例如 create_dir
, read
, write
等。
在 OperatorBuilder::new 发生了什么
我们传入的 accessor
在调用 new
时,被追加了两层 Layer
,并在调用 finish
时,被追加了一层内部 Layer
。
追加 Layer
后,当我们调用 Operator
暴露出来的接口时,调用会从最外层 CompleteLayer
开始,并最终抵达最内层 FsAccessor
。
_10FsAccessor_10ErrorContextLayer_10CompleteLayer_10^_10|_10| Invoking (`read`, `reader_with`, `stat`...)
TL;DR 其实说了半天,想强调一下,代码应该从 CompleteLayer
开始读(大雾
从 Operator 开始
我们所有的 IO 操作都是围绕着 Operator
展开的,先来看下 Operator
是怎么构建的。
以 main.rs
为例,首先我们在 L7 创建了一个基于文件系统的 Backend Builder
;
在 L13 将其构建为 accessor
(实现了 Accessor
trait);
L14 我们将该 accessor
传入了 OperatorBuilder::new
,最后调用了 finish
。
OpenDAL 通过 Accessor
trait 统一了不同存储后端(Backend)的行为,并向上层暴露统一的 IO 接口,例如 create_dir
, read
, write
等。
在 OperatorBuilder::new 发生了什么
我们传入的 accessor
在调用 new
时,被追加了两层 Layer
,并在调用 finish
时,被追加了一层内部 Layer
。
追加 Layer
后,当我们调用 Operator
暴露出来的接口时,调用会从最外层 CompleteLayer
开始,并最终抵达最内层 FsAccessor
。
_10FsAccessor_10ErrorContextLayer_10CompleteLayer_10^_10|_10| Invoking (`read`, `reader_with`, `stat`...)
TL;DR 其实说了半天,想强调一下,代码应该从 CompleteLayer
开始读(大雾
我们的上下文
这里我们补充一些必要的上下文信息,以便理解后文内容。
LruCacheLayer
目前,在查询场景,我们追加了一层 LruCacheLayer
,那么我们 Operator
就如下图所示:
_17S3Accessor FsAccessor_17ErrorContextLayer ErrorContextLayer_17CompleteLayer CompleteLayer_17 ▲ ▲ │_17 │ │ │_17 │`inner` `cache`│ │_17 │ │ │_17 │ │ │_17 │ │ │_17 └───── LruCacheLayer ─────┘ │_17 ▲ │ _17 │ │_17 │ │ _17 │ ▼_17 │ FileReader::new(oio::TokioReader<tokio::fs::File>) _17 │ _17 Invoking(`reader`, `reader_with`)
以 read
接口为例,LruCacheLayer
会将 S3 的文件缓存到文件系统中,
并向上层返回缓存的基于文件系统的 Box<dyn oio::Read>
(FileReader::new(oio::TokioReader<tokio::fs::File>)
);
当然如果读取的文件不存在于缓存时,则先全量从 S3 加载文件至本地的文件系统中。
当 RangeReader 的 read
被调用时,会建立取回文件的 TCP 连接,以字节流的形式返回给上层,
上层应用可能只需要读取若干字节就关闭了字节流(并关闭了 TCP 连接)。
然而当我们调用 let reader = op.reader_with()
时,我们的缓存层会全量加载对应文件并缓存整个文件。
这里未来可以做一个懒加载的优化(即,在 reader.read()
被调用时才去请求对应字节或加载对应缓存)。
_18struct LruCacheLayer {_18 inner: Operator, // S3Backend_18 cache: Operator, // FsBackend_18 index: CacheIndex_18}_18_18impl LayeredAccessor for LruCacheLayer {_18 ..._18 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {_18 if self.index.hit(path, args) {_18 // Returns `Box<dyn oio::Read>`_18 self.cache.read(path, args).await _18 } else {_18 // Fetches cache and stores..._18 }_18 }_18 ..._18}
Copy From 的场景
在 Copy From
场景,我并没有加这一层 LruCacheLayer
。那么我们 Operator
就如下图所示:
_10S3Accessor_10ErrorContextLayer_10CompleteLayer_10 ▲ │_10 │ │_10 │ │_10 │ ▼ _10 │ RangeReader::new(IncomingAsyncBody)_10 │_10 Invoking (`reader`, `reader_with`)
关于为什么没有加 Cache
原因如下:
- 错误理解 OpenDAL
Reader
工作方式,当时以为Reader
在其生命周期内,在其全量读取 S3 文件后,调用其seek
,read
不需要额外的 S3 get 请求。 Copy From
的文件通常只会被读取一次,当时认为加一层Cache
并没有不大。
总之就是我当时写的不行🥹
在使用 RangeReader 时遇到的问题
从构建 ParquetRecordBatchStream 说起
在 Copy From
中,我们拿到文件信息后,首先会调用 operator.reader
返回一个实现 AsyncReader
+ AsyncSeek
的 reader
,再套一层 BufReader
;
最终将该 reader
传入至 ParquetRecordBatchStreamBuilder
中。
这里面 BufReader
也是多此一举,我们在后面会提到。(算了,不想写了;自己看代码吧😇)
ParquetRecordBatchStream::new 读取元信息
读取元信息逻辑如下,首先调用 seek(SeekFrom::End(-FOOTER_SIZE_I64))
,读取 FOOTER_SIZE
字节后解析出 metadata_len
;
随后再一次调用 seek
,并读取 metadata_len
字节后解析出元信息。
真正的问题
到上面为止,都是一些小问题。真正比较棘手的问题发生在这里,这里变量 stream
就是我们上面构建的 ParquetRecordBatchStream
,当我们调用 next
时,ParquetRecordBatchStream
会调用多次 reader
(RangeReader
)的 seek
和 read
。
然而每次调用 seek
都会重置 RangeReader
的内部状态(丢弃掉之前的字节流),并在下次调用 read
时,重新发起一个远程请求(后端为 S3 的场景)。(相关 issue (opens in a new tab) 和讨论 (opens in a new tab))
ParquetRecordBatchStream
在取回每列数据时:会先调用 RangeReader seek
,随后调用 read
读取一些字节。那么总共需要发起的远程调用次数为 RowGroup 数
乘上 RowGroup 内列的数
。
我们 800KiB 包含了 50 个 RowGroup 和 12 列,也就是发起了 600 次 S3 get 请求!
从构建 ParquetRecordBatchStream 说起
在 Copy From
中,我们拿到文件信息后,首先会调用 operator.reader
返回一个实现 AsyncReader
+ AsyncSeek
的 reader
,再套一层 BufReader
;
最终将该 reader
传入至 ParquetRecordBatchStreamBuilder
中。
这里面 BufReader
也是多此一举,我们在后面会提到。(算了,不想写了;自己看代码吧😇)
ParquetRecordBatchStream::new 读取元信息
读取元信息逻辑如下,首先调用 seek(SeekFrom::End(-FOOTER_SIZE_I64))
,读取 FOOTER_SIZE
字节后解析出 metadata_len
;
随后再一次调用 seek
,并读取 metadata_len
字节后解析出元信息。
真正的问题
到上面为止,都是一些小问题。真正比较棘手的问题发生在这里,这里变量 stream
就是我们上面构建的 ParquetRecordBatchStream
,当我们调用 next
时,ParquetRecordBatchStream
会调用多次 reader
(RangeReader
)的 seek
和 read
。
然而每次调用 seek
都会重置 RangeReader
的内部状态(丢弃掉之前的字节流),并在下次调用 read
时,重新发起一个远程请求(后端为 S3 的场景)。(相关 issue (opens in a new tab) 和讨论 (opens in a new tab))
ParquetRecordBatchStream
在取回每列数据时:会先调用 RangeReader seek
,随后调用 read
读取一些字节。那么总共需要发起的远程调用次数为 RowGroup 数
乘上 RowGroup 内列的数
。
我们 800KiB 包含了 50 个 RowGroup 和 12 列,也就是发起了 600 次 S3 get 请求!
读一读 RangeReader 的源码
RangeReader
其核心功能是将 non-seekable 的 Reader
变成一个 seekable 的 Reader
。在 seek(pos)
被调用后,下次 read
调用便会请求底层服务重新返回一个包含 [pos,size)
的 Reader
。
看看 self.poll_read()
RangeReader
其 self.state
初始值为 State::Idle
,首先我们假设 self.offset
为 Some(0)
;
随后 self.state
被设置为 State::SendRead(BoxFuture<'static, Result<(RpRead, R)>>)
,
并再次调用 self.poll_read(cx, buf)
。
在 self.read_future()
发生了什么
显而易见,self.read_future()
返回了一个 BoxedFuture
;
在 BoxedFuture
中调用底层的 Accessor
的 read
接口(acc.read(&path, op).await
)。
Accessor
可以是 S3 的存储后端实现,也可以是 OSS 实现等;当它的 read
接口被调用时,会建立取回文件的 TCP 连接,并将响应以字节流的形式返回给上层。
再次调用 self.poll_read()
到此为止,poll_read
还没有返回;在上文中 self.poll_read()
被再次调用,此时 self.state
为 State::SendRead(BoxFuture<'static, Result<(RpRead, R)>>)
。
这里的 ready!(Pin::new(fut).poll(cx))
返回值就是上文中 acc.read(&path, op).await
调用的返回值。(对于 S3 存储后端,远程调用发生在这里)
最后内部状态 self.state
被设置为 State::Read(r)
,并再次调用 self.poll_read()
。
再次调用 self.poll_read()
后,RangeReader
内部状态被设置为 State::Reader(R)
。
这里的 R(r)
便是读取请求响应的字节流,对于 S3 存储后端,Pin::new(r).poll_read(cx, buf)
将 TCP 缓冲区的字节数据写入到上层应用中。
最后看下 self.poll_seek()
还记得刚才我们 RangeReader
内部状态吗?没错,是State::Reader(R)
。
如果我们在 read
之后在调用 seek
,RangeReader
内部的字节流会被丢弃,状态重新设置为 State::Idle
。
也就是说,在每次 seek
调用后再次调用 read
,RangeReader
便会请求底层 Accessor
的 read
接口(acc.read(&path, op).await
)
发起一个远程调用,返回一个包含 [Pos, size)
的 Reader
;然而对于 S3 存储后端,调用这个接口的开销是非常昂贵的(TTFB 通常高达百毫秒)。
另外还有一个性能相关的重点,当我们尝试 SeekFrom::End()
的时,且 self.size
未知时,会有一次额外的 stat
操作。
self.poll_seek()
调用后 self.cur
会被设置为 base.checked_add(amt)
。
看看 self.poll_read()
RangeReader
其 self.state
初始值为 State::Idle
,首先我们假设 self.offset
为 Some(0)
;
随后 self.state
被设置为 State::SendRead(BoxFuture<'static, Result<(RpRead, R)>>)
,
并再次调用 self.poll_read(cx, buf)
。
在 self.read_future()
发生了什么
显而易见,self.read_future()
返回了一个 BoxedFuture
;
在 BoxedFuture
中调用底层的 Accessor
的 read
接口(acc.read(&path, op).await
)。
Accessor
可以是 S3 的存储后端实现,也可以是 OSS 实现等;当它的 read
接口被调用时,会建立取回文件的 TCP 连接,并将响应以字节流的形式返回给上层。
再次调用 self.poll_read()
到此为止,poll_read
还没有返回;在上文中 self.poll_read()
被再次调用,此时 self.state
为 State::SendRead(BoxFuture<'static, Result<(RpRead, R)>>)
。
这里的 ready!(Pin::new(fut).poll(cx))
返回值就是上文中 acc.read(&path, op).await
调用的返回值。(对于 S3 存储后端,远程调用发生在这里)
最后内部状态 self.state
被设置为 State::Read(r)
,并再次调用 self.poll_read()
。
再次调用 self.poll_read()
后,RangeReader
内部状态被设置为 State::Reader(R)
。
这里的 R(r)
便是读取请求响应的字节流,对于 S3 存储后端,Pin::new(r).poll_read(cx, buf)
将 TCP 缓冲区的字节数据写入到上层应用中。
最后看下 self.poll_seek()
还记得刚才我们 RangeReader
内部状态吗?没错,是State::Reader(R)
。
如果我们在 read
之后在调用 seek
,RangeReader
内部的字节流会被丢弃,状态重新设置为 State::Idle
。
也就是说,在每次 seek
调用后再次调用 read
,RangeReader
便会请求底层 Accessor
的 read
接口(acc.read(&path, op).await
)
发起一个远程调用,返回一个包含 [Pos, size)
的 Reader
;然而对于 S3 存储后端,调用这个接口的开销是非常昂贵的(TTFB 通常高达百毫秒)。
另外还有一个性能相关的重点,当我们尝试 SeekFrom::End()
的时,且 self.size
未知时,会有一次额外的 stat
操作。
self.poll_seek()
调用后 self.cur
会被设置为 base.checked_add(amt)
。
后续
RFC (opens in a new tab) 会为 CompleteLayer Reader 引入一个 Buffer
,缓解频繁 IO (以及远程调用)带来的开销。
_10 let reader = op.reader_with("path").buffer(32 * 1024 * 1024).await;
最后,Reader 会支持并行加载数据,主要是针对 S3 这类存储后端的优化。
_10 let reader = op.reader_with("path").buffer(32 * 1024 * 1024).concurrent(4).await;