OpenDAL RangeReader 的奥妙

Weny,RustOpenDAL

前情提要

我们(GreptimeDB (opens in a new tab))把 OpenDAL (opens in a new tab) 作为统一的数据访问层。前段时间同事告诉我,数据库执行 Copy From 语句从 S3 导入一个 800 KiB 的 Parquet 文件需要 10s;经过一些调查,研读了相关 Reader 的文档和具体实现 (暴露了之前没有 RTFSC 🥲),谨以本文做一个记录和简单的总结。

ℹ️

本文涉及的 OpenDAL 源码 Commit: 6980cd1 (opens in a new tab)

TL;DR

先来聊聊该怎么读 OpenDAL 源码?

坦白说,我也是最近才理清楚 OpenDAL 的源码和其调用关系,之前都是一知半解。

从 Operator 开始

我们所有的 IO 操作都是围绕着 Operator 展开的,先来看下 Operator 是怎么构建的。 以 main.rs 为例,首先我们在 L7 创建了一个基于文件系统的 Backend Builder在 L13 将其构建为 accessor(实现了 Accessor trait)L14 我们将该 accessor 传入了 OperatorBuilder::new,最后调用了 finish

ℹ️

OpenDAL 通过 Accessor trait 统一了不同存储后端(Backend)的行为,并向上层暴露统一的 IO 接口,例如 create_dir, read, write 等。

main.rs

_17
use opendal::services::Fs;
_17
use opendal::Operator;
_17
_17
#[tokio::main]
_17
async fn main() -> Result<()> {
_17
// Create fs backend builder.
_17
let mut builder = Fs::default();
_17
// Set the root for fs, all operations will happen under this root.
_17
//
_17
// NOTE: the root must be absolute path.
_17
builder.root("/tmp");
_17
_17
let accessor = builder.build()?;
_17
let op: Operator = OperatorBuilder::new(accessor)?.finish();
_17
_17
Ok(())
_17
}

在 OperatorBuilder::new 发生了什么

我们传入的 accessor 在调用 new 时,被追加了两层 Layer,并在调用 finish 时,被追加了一层内部 Layer

main.rs
src/types/operator/builder.rs

_18
impl<A: Accessor> OperatorBuilder<A> {
_18
/// Create a new operator builder.
_18
#[allow(clippy::new_ret_no_self)]
_18
pub fn new(accessor: A) -> OperatorBuilder<impl Accessor> {
_18
// Make sure error context layer has been attached.
_18
OperatorBuilder { accessor }
_18
.layer(ErrorContextLayer)
_18
.layer(CompleteLayer)
_18
}
_18
_18
...
_18
_18
/// Finish the building to construct an Operator.
_18
pub fn finish(self) -> Operator {
_18
let ob = self.layer(TypeEraseLayer);
_18
Operator::from_inner(Arc::new(ob.accessor) as FusedAccessor)
_18
}
_18
}

追加 Layer 后,当我们调用 Operator 暴露出来的接口时,调用会从最外层 CompleteLayer开始,并最终抵达最内层 FsAccessor


_10
FsAccessor
_10
ErrorContextLayer
_10
CompleteLayer
_10
^
_10
|
_10
| Invoking (`read`, `reader_with`, `stat`...)

🤣

TL;DR 其实说了半天,想强调一下,代码应该从 CompleteLayer 开始读(大雾

从 Operator 开始

我们所有的 IO 操作都是围绕着 Operator 展开的,先来看下 Operator 是怎么构建的。 以 main.rs 为例,首先我们在 L7 创建了一个基于文件系统的 Backend Builder在 L13 将其构建为 accessor(实现了 Accessor trait)L14 我们将该 accessor 传入了 OperatorBuilder::new,最后调用了 finish

ℹ️

OpenDAL 通过 Accessor trait 统一了不同存储后端(Backend)的行为,并向上层暴露统一的 IO 接口,例如 create_dir, read, write 等。

在 OperatorBuilder::new 发生了什么

我们传入的 accessor 在调用 new 时,被追加了两层 Layer,并在调用 finish 时,被追加了一层内部 Layer

追加 Layer 后,当我们调用 Operator 暴露出来的接口时,调用会从最外层 CompleteLayer开始,并最终抵达最内层 FsAccessor


_10
FsAccessor
_10
ErrorContextLayer
_10
CompleteLayer
_10
^
_10
|
_10
| Invoking (`read`, `reader_with`, `stat`...)

🤣

TL;DR 其实说了半天,想强调一下,代码应该从 CompleteLayer 开始读(大雾

main.rs

_17
use opendal::services::Fs;
_17
use opendal::Operator;
_17
_17
#[tokio::main]
_17
async fn main() -> Result<()> {
_17
// Create fs backend builder.
_17
let mut builder = Fs::default();
_17
// Set the root for fs, all operations will happen under this root.
_17
//
_17
// NOTE: the root must be absolute path.
_17
builder.root("/tmp");
_17
_17
let accessor = builder.build()?;
_17
let op: Operator = OperatorBuilder::new(accessor)?.finish();
_17
_17
Ok(())
_17
}

我们的上下文

这里我们补充一些必要的上下文信息,以便理解后文内容。

LruCacheLayer

目前,在查询场景,我们追加了一层 LruCacheLayer,那么我们 Operator 就如下图所示:


_17
S3Accessor FsAccessor
_17
ErrorContextLayer ErrorContextLayer
_17
CompleteLayer CompleteLayer
_17
▲ ▲ │
_17
│ │ │
_17
│`inner` `cache`│ │
_17
│ │ │
_17
│ │ │
_17
│ │ │
_17
└───── LruCacheLayer ─────┘ │
_17
▲ │
_17
│ │
_17
│ │
_17
│ ▼
_17
│ FileReader::new(oio::TokioReader<tokio::fs::File>)
_17
_17
Invoking(`reader`, `reader_with`)

read 接口为例,LruCacheLayer会将 S3 的文件缓存到文件系统中, 并向上层返回缓存的基于文件系统的 Box<dyn oio::Read>(FileReader::new(oio::TokioReader<tokio::fs::File>)); 当然如果读取的文件不存在于缓存时,则先全量从 S3 加载文件至本地的文件系统中

🥺

当 RangeReader 的 read 被调用时,会建立取回文件的 TCP 连接,以字节流的形式返回给上层, 上层应用可能只需要读取若干字节就关闭了字节流(并关闭了 TCP 连接)。 然而当我们调用 let reader = op.reader_with() 时,我们的缓存层会全量加载对应文件并缓存整个文件。 这里未来可以做一个懒加载的优化(即,在 reader.read() 被调用时才去请求对应字节或加载对应缓存)。


_18
struct LruCacheLayer {
_18
inner: Operator, // S3Backend
_18
cache: Operator, // FsBackend
_18
index: CacheIndex
_18
}
_18
_18
impl LayeredAccessor for LruCacheLayer {
_18
...
_18
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
_18
if self.index.hit(path, args) {
_18
// Returns `Box<dyn oio::Read>`
_18
self.cache.read(path, args).await
_18
} else {
_18
// Fetches cache and stores...
_18
}
_18
}
_18
...
_18
}

Copy From 的场景

Copy From 场景,我并没有加这一层 LruCacheLayer。那么我们 Operator 就如下图所示:


_10
S3Accessor
_10
ErrorContextLayer
_10
CompleteLayer
_10
▲ │
_10
│ │
_10
│ │
_10
│ ▼
_10
│ RangeReader::new(IncomingAsyncBody)
_10
_10
Invoking (`reader`, `reader_with`)

关于为什么没有加 Cache 原因如下:

  1. 错误理解 OpenDAL Reader 工作方式,当时以为 Reader 在其生命周期内,在其全量读取 S3 文件后,调用其 seek, read 不需要额外的 S3 get 请求。
  2. Copy From 的文件通常只会被读取一次,当时认为加一层 Cache 并没有不大。

总之就是我当时写的不行🥹

在使用 RangeReader 时遇到的问题

从构建 ParquetRecordBatchStream 说起

Copy From 中,我们拿到文件信息后,首先会调用 operator.reader 返回一个实现 AsyncReader + AsyncSeekreader再套一层 BufReader最终将该 reader 传入至 ParquetRecordBatchStreamBuilder

🥲

这里面 BufReader 也是多此一举,我们在后面会提到。(算了,不想写了;自己看代码吧😇)

operator/src/statement/copy_table_from.rs

_17
...
_17
let reader = operator
_17
.reader(path)
_17
.await
_17
.context(error::ReadObjectSnafu { path })?;
_17
_17
let buf_reader = BufReader::new(reader.compat());
_17
_17
let builder = ParquetRecordBatchStreamBuilder::new(buf_reader)
_17
.await
_17
.context(error::ReadParquetSnafu)?;
_17
_17
let upstream = builder
_17
.build()
_17
.context(error::BuildParquetRecordBatchStreamSnafu)?;
_17
_17
...

ParquetRecordBatchStream::new 读取元信息

读取元信息逻辑如下,首先调用 seek(SeekFrom::End(-FOOTER_SIZE_I64)) ,读取 FOOTER_SIZE 字节后解析出 metadata_len随后再一次调用 seek,并读取 metadata_len 字节后解析出元信息

operator/src/statement/copy_table_from.rs
parquet/arrow/async_reader/mod.rs

_37
impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
_37
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
_37
async move {
_37
self.seek(SeekFrom::Start(range.start as u64)).await?;
_37
_37
let to_read = range.end - range.start;
_37
let mut buffer = Vec::with_capacity(to_read);
_37
let read = self.take(to_read as u64).read_to_end(&mut buffer).await?;
_37
if read != to_read {
_37
return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
_37
}
_37
_37
Ok(buffer.into())
_37
}
_37
.boxed()
_37
}
_37
_37
fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
_37
const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64;
_37
async move {
_37
self.seek(SeekFrom::End(-FOOTER_SIZE_I64)).await?;
_37
_37
let mut buf = [0_u8; FOOTER_SIZE];
_37
self.read_exact(&mut buf).await?;
_37
_37
let metadata_len = decode_footer(&buf)?;
_37
self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64))
_37
.await?;
_37
_37
let mut buf = Vec::with_capacity(metadata_len);
_37
self.take(metadata_len as _).read_to_end(&mut buf).await?;
_37
_37
Ok(Arc::new(decode_metadata(&buf)?))
_37
}
_37
.boxed()
_37
}
_37
}

真正的问题

到上面为止,都是一些小问题。真正比较棘手的问题发生在这里,这里变量 stream 就是我们上面构建的 ParquetRecordBatchStream,当我们调用 next 时,ParquetRecordBatchStream 会调用多次 reader (RangeReader)的 seekread。 然而每次调用 seek 都会重置 RangeReader 的内部状态(丢弃掉之前的字节流),并在下次调用 read 时,重新发起一个远程请求(后端为 S3 的场景)。(相关 issue (opens in a new tab)讨论 (opens in a new tab)

🥵

ParquetRecordBatchStream 在取回每列数据时:会先调用 RangeReader seek,随后调用 read 读取一些字节。那么总共需要发起的远程调用次数为 RowGroup 数乘上 RowGroup 内列的数。 我们 800KiB 包含了 50 个 RowGroup 和 12 列,也就是发起了 600 次 S3 get 请求!

operator/src/statement/copy_table_from.rs
parquet/arrow/async_reader/mod.rs

_31
pub async fn copy_table_from(
_31
...
_31
while let Some(r) = stream.next().await {
_31
let record_batch = r.context(error::ReadDfRecordBatchSnafu)?;
_31
let vectors =
_31
Helper::try_into_vectors(record_batch.columns()).context(IntoVectorsSnafu)?;
_31
_31
pending_mem_size += vectors.iter().map(|v| v.memory_size()).sum::<usize>();
_31
_31
let columns_values = fields
_31
.iter()
_31
.cloned()
_31
.zip(vectors)
_31
.collect::<HashMap<_, _>>();
_31
_31
pending.push(self.inserter.handle_table_insert(
_31
InsertRequest {
_31
catalog_name: req.catalog_name.to_string(),
_31
schema_name: req.schema_name.to_string(),
_31
table_name: req.table_name.to_string(),
_31
columns_values,
_31
},
_31
query_ctx.clone(),
_31
));
_31
_31
if pending_mem_size as u64 >= pending_mem_threshold {
_31
rows_inserted += batch_insert(&mut pending, &mut pending_mem_size).await?;
_31
}
_31
}
_31
_31
...

从构建 ParquetRecordBatchStream 说起

Copy From 中,我们拿到文件信息后,首先会调用 operator.reader 返回一个实现 AsyncReader + AsyncSeekreader再套一层 BufReader最终将该 reader 传入至 ParquetRecordBatchStreamBuilder

🥲

这里面 BufReader 也是多此一举,我们在后面会提到。(算了,不想写了;自己看代码吧😇)

ParquetRecordBatchStream::new 读取元信息

读取元信息逻辑如下,首先调用 seek(SeekFrom::End(-FOOTER_SIZE_I64)) ,读取 FOOTER_SIZE 字节后解析出 metadata_len随后再一次调用 seek,并读取 metadata_len 字节后解析出元信息

真正的问题

到上面为止,都是一些小问题。真正比较棘手的问题发生在这里,这里变量 stream 就是我们上面构建的 ParquetRecordBatchStream,当我们调用 next 时,ParquetRecordBatchStream 会调用多次 reader (RangeReader)的 seekread。 然而每次调用 seek 都会重置 RangeReader 的内部状态(丢弃掉之前的字节流),并在下次调用 read 时,重新发起一个远程请求(后端为 S3 的场景)。(相关 issue (opens in a new tab)讨论 (opens in a new tab)

🥵

ParquetRecordBatchStream 在取回每列数据时:会先调用 RangeReader seek,随后调用 read 读取一些字节。那么总共需要发起的远程调用次数为 RowGroup 数乘上 RowGroup 内列的数。 我们 800KiB 包含了 50 个 RowGroup 和 12 列,也就是发起了 600 次 S3 get 请求!

operator/src/statement/copy_table_from.rs

_17
...
_17
let reader = operator
_17
.reader(path)
_17
.await
_17
.context(error::ReadObjectSnafu { path })?;
_17
_17
let buf_reader = BufReader::new(reader.compat());
_17
_17
let builder = ParquetRecordBatchStreamBuilder::new(buf_reader)
_17
.await
_17
.context(error::ReadParquetSnafu)?;
_17
_17
let upstream = builder
_17
.build()
_17
.context(error::BuildParquetRecordBatchStreamSnafu)?;
_17
_17
...

读一读 RangeReader 的源码

RangeReader 其核心功能是将 non-seekable 的 Reader 变成一个 seekable 的 Reader。在 seek(pos) 被调用后,下次 read 调用便会请求底层服务重新返回一个包含 [pos,size)Reader

看看 self.poll_read()

RangeReaderself.state 初始值为 State::Idle,首先我们假设 self.offsetSome(0)随后 self.state 被设置为 State::SendRead(BoxFuture<'static, Result<(RpRead, R)>>)并再次调用 self.poll_read(cx, buf)

core/src/raw/oio/read/range_read.rs

_77
impl<A, R> oio::Read for RangeReader<A, R>
_77
where
_77
A: Accessor<Reader = R>,
_77
R: oio::Read,
_77
{
_77
fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
_77
// Sanity check for normal cases.
_77
if buf.is_empty() || self.cur >= self.size.unwrap_or(u64::MAX) {
_77
return Poll::Ready(Ok(0));
_77
}
_77
_77
match &mut self.state {
_77
State::Idle => {
_77
self.state = if self.offset.is_none() {
_77
// Offset is none means we are doing tailing reading.
_77
// we should stat first to get the correct offset.
_77
State::SendStat(self.stat_future())
_77
} else {
_77
State::SendRead(self.read_future())
_77
};
_77
_77
self.poll_read(cx, buf)
_77
}
_77
State::SendStat(fut) => {
_77
let rp = ready!(Pin::new(fut).poll(cx)).map_err(|err| {
_77
// If stat future returns an error, we should reset
_77
// state to Idle so that we can retry it.
_77
self.state = State::Idle;
_77
err
_77
})?;
_77
_77
let length = rp.into_metadata().content_length();
_77
self.fill_range(length).map_err(|err| {
_77
// If stat future returns an error, we should reset
_77
// state to Idle so that we can retry it.
_77
self.state = State::Idle;
_77
err
_77
})?;
_77
_77
self.state = State::Idle;
_77
self.poll_read(cx, buf)
_77
}
_77
State::SendRead(fut) => {
_77
let (rp, r) = ready!(Pin::new(fut).poll(cx)).map_err(|err| {
_77
// If read future returns an error, we should reset
_77
// state to Idle so that we can retry it.
_77
self.state = State::Idle;
_77
err
_77
})?;
_77
_77
// Set size if read returns size hint.
_77
if let Some(size) = rp.size() {
_77
if size != 0 && self.size.is_none() {
_77
self.size = Some(size + self.cur);
_77
}
_77
}
_77
self.state = State::Read(r);
_77
self.poll_read(cx, buf)
_77
}
_77
State::Read(r) => match ready!(Pin::new(r).poll_read(cx, buf)) {
_77
Ok(0) => {
_77
// Reset state to Idle after all data has been consumed.
_77
self.state = State::Idle;
_77
Poll::Ready(Ok(0))
_77
}
_77
Ok(n) => {
_77
self.cur += n as u64;
_77
Poll::Ready(Ok(n))
_77
}
_77
Err(e) => {
_77
self.state = State::Idle;
_77
Poll::Ready(Err(e))
_77
}
_77
},
_77
}
_77
}
_77
}

self.read_future() 发生了什么

显而易见,self.read_future() 返回了一个 BoxedFutureBoxedFuture 中调用底层的 Accessorread 接口(acc.read(&path, op).await)Accessor 可以是 S3 的存储后端实现,也可以是 OSS 实现等;当它的 read 接口被调用时,会建立取回文件的 TCP 连接,并将响应以字节流的形式返回给上层。

core/src/raw/oio/read/range_read.rs

_40
impl<A, R> RangeReader<A, R>
_40
where
_40
A: Accessor<Reader = R>,
_40
R: oio::Read,
_40
{
_40
fn read_future(&self) -> BoxFuture<'static, Result<(RpRead, R)>> {
_40
let acc = self.acc.clone();
_40
let path = self.path.clone();
_40
_40
let mut op = self.op.clone();
_40
// cur != 0 means we have read some data out, we should convert
_40
// the op into deterministic to avoid ETag changes.
_40
if self.cur != 0 {
_40
op = op.into_deterministic();
_40
}
_40
// Alter OpRead with correct calculated range.
_40
op = op.with_range(self.calculate_range());
_40
_40
Box::pin(async move { acc.read(&path, op).await })
_40
}
_40
_40
fn stat_future(&self) -> BoxFuture<'static, Result<RpStat>> {
_40
let acc = self.acc.clone();
_40
let path = self.path.clone();
_40
_40
// Handle if-match and if-none-match correctly.
_40
let mut args = OpStat::default();
_40
// TODO: stat should support range to check if ETag matches.
_40
if self.op.range().is_full() {
_40
if let Some(v) = self.op.if_match() {
_40
args = args.with_if_match(v);
_40
}
_40
if let Some(v) = self.op.if_none_match() {
_40
args = args.with_if_none_match(v);
_40
}
_40
}
_40
_40
Box::pin(async move { acc.stat(&path, args).await })
_40
}
_40
}

再次调用 self.poll_read()

到此为止,poll_read 还没有返回;在上文中 self.poll_read() 被再次调用,此时 self.stateState::SendRead(BoxFuture<'static, Result<(RpRead, R)>>)。 这里的 ready!(Pin::new(fut).poll(cx)) 返回值就是上文中 acc.read(&path, op).await 调用的返回值。(对于 S3 存储后端,远程调用发生在这里) 最后内部状态 self.state 被设置为 State::Read(r),并再次调用 self.poll_read()再次调用 self.poll_read() 后,RangeReader 内部状态被设置为 State::Reader(R)这里的 R(r) 便是读取请求响应的字节流,对于 S3 存储后端,Pin::new(r).poll_read(cx, buf) 将 TCP 缓冲区的字节数据写入到上层应用中

core/src/raw/oio/read/range_read.rs

_77
impl<A, R> oio::Read for RangeReader<A, R>
_77
where
_77
A: Accessor<Reader = R>,
_77
R: oio::Read,
_77
{
_77
fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
_77
// Sanity check for normal cases.
_77
if buf.is_empty() || self.cur >= self.size.unwrap_or(u64::MAX) {
_77
return Poll::Ready(Ok(0));
_77
}
_77
_77
match &mut self.state {
_77
State::Idle => {
_77
self.state = if self.offset.is_none() {
_77
// Offset is none means we are doing tailing reading.
_77
// we should stat first to get the correct offset.
_77
State::SendStat(self.stat_future())
_77
} else {
_77
State::SendRead(self.read_future())
_77
};
_77
_77
self.poll_read(cx, buf)
_77
}
_77
State::SendStat(fut) => {
_77
let rp = ready!(Pin::new(fut).poll(cx)).map_err(|err| {
_77
// If stat future returns an error, we should reset
_77
// state to Idle so that we can retry it.
_77
self.state = State::Idle;
_77
err
_77
})?;
_77
_77
let length = rp.into_metadata().content_length();
_77
self.fill_range(length).map_err(|err| {
_77
// If stat future returns an error, we should reset
_77
// state to Idle so that we can retry it.
_77
self.state = State::Idle;
_77
err
_77
})?;
_77
_77
self.state = State::Idle;
_77
self.poll_read(cx, buf)
_77
}
_77
State::SendRead(fut) => {
_77
let (rp, r) = ready!(Pin::new(fut).poll(cx)).map_err(|err| {
_77
// If read future returns an error, we should reset
_77
// state to Idle so that we can retry it.
_77
self.state = State::Idle;
_77
err
_77
})?;
_77
_77
// Set size if read returns size hint.
_77
if let Some(size) = rp.size() {
_77
if size != 0 && self.size.is_none() {
_77
self.size = Some(size + self.cur);
_77
}
_77
}
_77
self.state = State::Read(r);
_77
self.poll_read(cx, buf)
_77
}
_77
State::Read(r) => match ready!(Pin::new(r).poll_read(cx, buf)) {
_77
Ok(0) => {
_77
// Reset state to Idle after all data has been consumed.
_77
self.state = State::Idle;
_77
Poll::Ready(Ok(0))
_77
}
_77
Ok(n) => {
_77
self.cur += n as u64;
_77
Poll::Ready(Ok(n))
_77
}
_77
Err(e) => {
_77
self.state = State::Idle;
_77
Poll::Ready(Err(e))
_77
}
_77
},
_77
}
_77
}
_77
}

最后看下 self.poll_seek()

还记得刚才我们 RangeReader 内部状态吗?没错,是State::Reader(R)。 如果我们在 read 之后在调用 seekRangeReader 内部的字节流会被丢弃,状态重新设置为 State::Idle。 也就是说,在每次 seek 调用后再次调用 readRangeReader 便会请求底层 Accessorread 接口(acc.read(&path, op).await) 发起一个远程调用,返回一个包含 [Pos, size)Reader;然而对于 S3 存储后端,调用这个接口的开销是非常昂贵的(TTFB 通常高达百毫秒)。

另外还有一个性能相关的重点,当我们尝试 SeekFrom::End() 的时,且 self.size 未知时,会有一次额外的 stat 操作self.poll_seek() 调用后 self.cur 会被设置为 base.checked_add(amt)

core/src/raw/oio/read/range_read.rs

_68
impl<A, R> oio::Read for RangeReader<A, R>
_68
where
_68
A: Accessor<Reader = R>,
_68
R: oio::Read,
_68
{
_68
fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> {
_68
match &mut self.state {
_68
State::Idle => {
_68
let (base, amt) = match pos {
_68
SeekFrom::Start(n) => (0, n as i64),
_68
SeekFrom::Current(n) => (self.cur as i64, n),
_68
SeekFrom::End(n) => {
_68
if let Some(size) = self.size {
_68
(size as i64, n)
_68
} else {
_68
self.state = State::SendStat(self.stat_future());
_68
return self.poll_seek(cx, pos);
_68
}
_68
}
_68
};
_68
_68
let seek_pos = match base.checked_add(amt) {
_68
Some(n) if n >= 0 => n as u64,
_68
_ => {
_68
return Poll::Ready(Err(Error::new(
_68
ErrorKind::InvalidInput,
_68
"invalid seek to a negative or overflowing position",
_68
)))
_68
}
_68
};
_68
_68
self.cur = seek_pos;
_68
Poll::Ready(Ok(self.cur))
_68
}
_68
State::SendStat(fut) => {
_68
let rp = ready!(Pin::new(fut).poll(cx)).map_err(|err| {
_68
// If stat future returns an error, we should reset
_68
// state to Idle so that we can retry it.
_68
self.state = State::Idle;
_68
err
_68
})?;
_68
_68
let length = rp.into_metadata().content_length();
_68
self.fill_range(length)?;
_68
_68
self.state = State::Idle;
_68
self.poll_seek(cx, pos)
_68
}
_68
State::SendRead(_) => {
_68
// It's impossible for us to go into this state while
_68
// poll_seek. We can just drop this future and check state.
_68
self.state = State::Idle;
_68
self.poll_seek(cx, pos)
_68
}
_68
State::Read(_) => {
_68
// There is an optimization here that we can calculate if users trying to seek
_68
// the same position, for example, `reader.seek(SeekFrom::Current(0))`.
_68
// In this case, we can just return current position without dropping reader.
_68
if pos == SeekFrom::Current(0) || pos == SeekFrom::Start(self.cur) {
_68
return Poll::Ready(Ok(self.cur));
_68
}
_68
_68
self.state = State::Idle;
_68
self.poll_seek(cx, pos)
_68
}
_68
}
_68
}
_68
}

看看 self.poll_read()

RangeReaderself.state 初始值为 State::Idle,首先我们假设 self.offsetSome(0)随后 self.state 被设置为 State::SendRead(BoxFuture<'static, Result<(RpRead, R)>>)并再次调用 self.poll_read(cx, buf)

self.read_future() 发生了什么

显而易见,self.read_future() 返回了一个 BoxedFutureBoxedFuture 中调用底层的 Accessorread 接口(acc.read(&path, op).await)Accessor 可以是 S3 的存储后端实现,也可以是 OSS 实现等;当它的 read 接口被调用时,会建立取回文件的 TCP 连接,并将响应以字节流的形式返回给上层。

再次调用 self.poll_read()

到此为止,poll_read 还没有返回;在上文中 self.poll_read() 被再次调用,此时 self.stateState::SendRead(BoxFuture<'static, Result<(RpRead, R)>>)。 这里的 ready!(Pin::new(fut).poll(cx)) 返回值就是上文中 acc.read(&path, op).await 调用的返回值。(对于 S3 存储后端,远程调用发生在这里) 最后内部状态 self.state 被设置为 State::Read(r),并再次调用 self.poll_read()再次调用 self.poll_read() 后,RangeReader 内部状态被设置为 State::Reader(R)这里的 R(r) 便是读取请求响应的字节流,对于 S3 存储后端,Pin::new(r).poll_read(cx, buf) 将 TCP 缓冲区的字节数据写入到上层应用中

最后看下 self.poll_seek()

还记得刚才我们 RangeReader 内部状态吗?没错,是State::Reader(R)。 如果我们在 read 之后在调用 seekRangeReader 内部的字节流会被丢弃,状态重新设置为 State::Idle。 也就是说,在每次 seek 调用后再次调用 readRangeReader 便会请求底层 Accessorread 接口(acc.read(&path, op).await) 发起一个远程调用,返回一个包含 [Pos, size)Reader;然而对于 S3 存储后端,调用这个接口的开销是非常昂贵的(TTFB 通常高达百毫秒)。

另外还有一个性能相关的重点,当我们尝试 SeekFrom::End() 的时,且 self.size 未知时,会有一次额外的 stat 操作self.poll_seek() 调用后 self.cur 会被设置为 base.checked_add(amt)

core/src/raw/oio/read/range_read.rs

_77
impl<A, R> oio::Read for RangeReader<A, R>
_77
where
_77
A: Accessor<Reader = R>,
_77
R: oio::Read,
_77
{
_77
fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
_77
// Sanity check for normal cases.
_77
if buf.is_empty() || self.cur >= self.size.unwrap_or(u64::MAX) {
_77
return Poll::Ready(Ok(0));
_77
}
_77
_77
match &mut self.state {
_77
State::Idle => {
_77
self.state = if self.offset.is_none() {
_77
// Offset is none means we are doing tailing reading.
_77
// we should stat first to get the correct offset.
_77
State::SendStat(self.stat_future())
_77
} else {
_77
State::SendRead(self.read_future())
_77
};
_77
_77
self.poll_read(cx, buf)
_77
}
_77
State::SendStat(fut) => {
_77
let rp = ready!(Pin::new(fut).poll(cx)).map_err(|err| {
_77
// If stat future returns an error, we should reset
_77
// state to Idle so that we can retry it.
_77
self.state = State::Idle;
_77
err
_77
})?;
_77
_77
let length = rp.into_metadata().content_length();
_77
self.fill_range(length).map_err(|err| {
_77
// If stat future returns an error, we should reset
_77
// state to Idle so that we can retry it.
_77
self.state = State::Idle;
_77
err
_77
})?;
_77
_77
self.state = State::Idle;
_77
self.poll_read(cx, buf)
_77
}
_77
State::SendRead(fut) => {
_77
let (rp, r) = ready!(Pin::new(fut).poll(cx)).map_err(|err| {
_77
// If read future returns an error, we should reset
_77
// state to Idle so that we can retry it.
_77
self.state = State::Idle;
_77
err
_77
})?;
_77
_77
// Set size if read returns size hint.
_77
if let Some(size) = rp.size() {
_77
if size != 0 && self.size.is_none() {
_77
self.size = Some(size + self.cur);
_77
}
_77
}
_77
self.state = State::Read(r);
_77
self.poll_read(cx, buf)
_77
}
_77
State::Read(r) => match ready!(Pin::new(r).poll_read(cx, buf)) {
_77
Ok(0) => {
_77
// Reset state to Idle after all data has been consumed.
_77
self.state = State::Idle;
_77
Poll::Ready(Ok(0))
_77
}
_77
Ok(n) => {
_77
self.cur += n as u64;
_77
Poll::Ready(Ok(n))
_77
}
_77
Err(e) => {
_77
self.state = State::Idle;
_77
Poll::Ready(Err(e))
_77
}
_77
},
_77
}
_77
}
_77
}

后续

RFC (opens in a new tab) 会为 CompleteLayer Reader 引入一个 Buffer,缓解频繁 IO (以及远程调用)带来的开销。


_10
let reader = op.reader_with("path").buffer(32 * 1024 * 1024).await;

最后,Reader 会支持并行加载数据,主要是针对 S3 这类存储后端的优化。


_10
let reader = op.reader_with("path").buffer(32 * 1024 * 1024).concurrent(4).await;

2023 © Grep.ing