Iteration 10 从 3/26 开始到 4/8 结束,为期两周。这个周期里面我觉得这些工作比较有意思:

OpenDAL 错误处理改进

OpenDAL 最开始的定位就是要做一个独立的数据层,所以错误处理也是使用了自己内部的一套,没有复用 Databend 现有库。这导致上层业务逻辑在判断的时候需要做很多额外的工作,比如在 PR ISSUE-4241: version of storage layout 中,为了处理 error 的转换,贡献者需要额外做判断。配合 OpenDAL 的 API 调整,这个周期中我将 OpenDAL 返回的错误类型修改为 srd::io::Error。主要的考量是基于两个方面:API 设计与用户体验。

首先考虑的是 API 的设计是否自洽。

在 OpenDAL v0.4 的重构中,Accessor 中的 write 函数从接收一个 BytesReader 变为返回一个 BytesWriter

pub trait BytesWrite: AsyncWrite + Unpin + Send { }

pub type BytesWriter = Box<dyn BytesWrite>;

async fn write(&self, args: &OpWrite) -> Result<BytesWriter> {
    let _ = args;
    unimplemented!()
}

BytesWriter 本质上就是一个被 Box 的 AsyncWrite,而 AsyncWrite 中返回的 futures::io::Error 是到 std::io::Error 的 alias。

pub trait AsyncWrite {
    fn poll_write(
        self: Pin<&mut Self>, 
        cx: &mut Context<'_>, 
        buf: &[u8]
    ) -> Poll<Result<usize, Error>>;
    fn poll_flush(
        self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Result<(), Error>>;
    fn poll_close(
        self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Result<(), Error>>;

    fn poll_write_vectored(
        self: Pin<&mut Self>, 
        cx: &mut Context<'_>, 
        bufs: &[IoSlice<'_>]
    ) -> Poll<Result<usize, Error>> { ... }
}

如果我们需要传递自己的 error 类型的话,就需要反复的进行与 io::Error 的类型转换,不仅实现上特别罗嗦,而且会导致信息的丢失。通过分析返回的 error 的情况,还发现 OpenDAL 定义的 error 实际上就是 io::Error 的子集。所以不如直接修改成返回 io::Error,把内部 error 中的 kind 去掉,作为 context 传递。

/// ObjectError carries object related context.
///
/// # Notes
///
/// This error is used to carry context only, and should never be returned to users.
/// Please wrap in [`std::io::Error`] with correct [`std::io::ErrorKind`] instead.
#[derive(Error, Debug)]
#[error("object error: (op: {op}, path: {path}, source: {source})")]
pub(crate) struct ObjectError {
    op: &'static str,
    path: String,
    source: anyhow::Error,
}

impl ObjectError {
    pub fn new(op: &'static str, path: &str, source: impl Into<anyhow::Error>) -> Self {
        ObjectError {
            op,
            path: path.to_string(),
            source: source.into(),
        }
    }
}

其次是考虑用户体验。

std::io::Error 已经在各个 IO 相关的库中被广泛使用:返回 io::Error 能够极大地简化用户的处理逻辑,不需要再针对 OpenDAL 做特别的处理。而且用户的心智负担也会特别小,之前使用 std::fs 是如何检查错误的,使用 opendal 也是如何检查:

if let Err(e) = op.object("test_file").metadata().await {
    if e.kind() == ErrorKind::NotFound {
        println!("object not exist")
    }
}

综合以上改动,我在 PR deps: Bump to OpenDAL v0.4 中调整了 Databend 关于存储层的错误处理逻辑:

清理了没有被良好定义的错误:

- UnknownStorageSchemeName(3001),
- SecretKeyNotSet(3002),
- DalTransportError(3003),
- DalPathNotFound(3004),
- SerdeError(3005),
- DalError(3006),
- DalStatError(3007),
+ StorageNotFound(3001),
+ StoragePermissionDenied(3002),
+ StorageOther(4000)

实现了 io::Error 到内置 ErrorCode 的无缝转换

impl From<std::io::Error> for ErrorCode {
    fn from(error: std::io::Error) -> Self {
        use std::io::ErrorKind;

        let msg = format!("{} ({})", error.kind(), &error);

        match error.kind() {
            ErrorKind::NotFound => ErrorCode::StorageNotFound(msg),
            ErrorKind::PermissionDenied => ErrorCode::StoragePermissionDenied(msg),
            _ => ErrorCode::StorageOther(msg),
        }
    }
}

这样 Databend 就能够比较优雅地处理涉及到 opendal 的错误了:

- let meta = object.metadata().await.map_err(|e| match e.kind() {
-      DalErrorKind::ObjectNotExist => ErrorCode::DalPathNotFound(e.to_string()),
-      _ => ErrorCode::DalTransportError(e.to_string()),
- })?;
+ let meta = object.metadata().await?;

舒服了!

修改 Databend storage.disk 为 storage.fs

命名是头等大事:好名字能够准确直接地反应自身的功用,降低用户的理解成本;而坏名字则会引起混淆,让用户产生错误的理解。

长期以来,Databend 的 storage type 就一直在错误地混淆 aws_s3s3diskfs,前者在 opendal 立项的时候就予以纠正了,而后者直到最近才在 PR *: Rename storage.disk to storage.fs 中修复。

首先聊聊 aws_s3s3

s3 已经是一个事实上的业界标准,而不仅仅是 AWS 的某个产品。当我们提到 s3 时,更多的时候是在指兼容 s3 API 的对象存储服务,包括但不限于 AWS S3Aliyun OSSTencent COSminiodigitialocean S3 等一系列服务。而使用 aws_s3 时,则通常是在特指 AWS 提供的 S3 服务。

因此如果我们在代码和配置中强调 aws_s3,就会引起用户的混淆。

其次聊聊 diskfs

Proposal: Rename storage.disk to storage.fs 中,我提到了 diskfs 的区别:disk 通常用来指代块设备,比如 SSDHDD,在云上则是指形如 EBS 这样的服务,他们对外暴露的是块接口;而 fs 则是用来指代文件系统,比如 ext4btrfs,在云上指形如 EFS 这样的服务,他们对外暴露的接口是(类)POSIX 语义的文件接口。在 从应用接口视角看存储系统 中,我对这个话题有更详细的阐述。

显然的,Databend 不能直接基于块设备工作:它需要文件系统提供支持。不仅如此,disk 还抹除了更多的可能性,比如:NFS,基于 cephs3fsjuicefsFUSE 等等。

使用 disk 作为 storage type 会引起用户的混淆。

好在这些问题已经得到了纠正:)

mergify 踩坑小技巧

mergify 是一个用于 Pull Requests 自动化的服务,它支持按照预先制定好的策略对 PR 排队并 merge。由于 Github Action API 的限制,在指定这些策略的时候需要注意 Validating All Status Checks 的坑:

There is no such thing as "every status check" in GitHub.

  • 每个 PR 有自己的 status checks 列表
  • 在创建/有新提交的时候,PR 是没有 status checks 的
  • status checks 可能没有被正确汇报(比如 jenkins 挂了),导致它没有出现在列表中

基于以上限制,如果使用

  • #check-failure=0
  • check-success~=build

这样的判断,就会很容易导致测试实际上并没有跑完就 pass 了。

Databend 之前也踩过一些坑,使用 '#check-success>=12' 来保证通过的测试数量大于 12 个这样的 workaround 来解决。但是这个解决方案并不彻底,随着时间的推移,新测试的增加,我们发现有些 PR 在测试没有通过的情况下也被 merge 了。为了彻底解决这个问题,我在 PR ci: Fix mergify status check 中按照文档的描述显式指定了所有必须通过的测试列表。

RisingWave 蹭贡献

RisingWave 是一个 Rust 开发的云原生流式数据库,主打物化视图增量更新,兼容 PostgreSQL 接口,用来构建实时应用。我也抓着开源的机会蹭了一些贡献。

在 PR refactor(pgwire): Match on char to avoid utf8 convert 中,我修复了一处没有必要的 utf-8 convert:

-        let val = &[stream.read_u8().await?];
-        let tag = std::str::from_utf8(val).unwrap();
+        let val = stream.read_u8().await?;

这里其实可以直接对 char 做判断,不需要转换成 str。

在 PR refactor(storage/object/s3): Remove not needed check on ranged request 中,我尝试去掉一个不必要的 content length 检查,但最终发现这个检查是有必要的:因为 hyper 不会检查返回的 body content length 长度,所以如果服务器端返回的 body length 和 content-length 不一致,应用就会出现非预期的行为。

使用 minio 进行高可用测试,频繁地在读取数据时 kill -9 就会很容易遇到这样的情况:server 已经返回了 HTTP status code 和 headers 但是 body 因为服务器已经被 kill 了没有发过来,用户的读取就这样直接完成了。我在 PR 中给出了完整的复现代码:

async fn handle(_: Request<Body>) -> Result<Response<Body>, Infallible> {
    let resp = Response::builder()
        .status(StatusCode::OK)
        .header(CONTENT_LENGTH, 1024)
        .body(Body::empty())
        .expect("must success");

    Ok(resp)
}

#[tokio::test]
async fn test_hyper() -> Result<()> {
    let addr = SocketAddr::from(([127, 0, 0, 1], 9900));

    let make_svc = hyper::service::make_service_fn(|_conn| async {
        Ok::<_, Infallible>(hyper::service::service_fn(handle))
    });

    let server = hyper::Server::bind(&addr).serve(make_svc);

    tokio::spawn(async move {
        if let Err(e) = server.await {
            eprintln!("server error: {}", e);
        }
    });

    let client: hyper::Client<
        hyper_tls::HttpsConnector<hyper::client::HttpConnector>,
        hyper::Body,
    > = hyper::Client::builder().build(hyper_tls::HttpsConnector::new());

    let req = client
        .get(http::Uri::from_static("http://127.0.0.1:9900"))
        .await
        .expect("must success");

    let bs = hyper::body::to_bytes(req.into_body())
        .await
        .expect("must success");

    println!("{:?}", bs);

    Ok(())
}

不知道能否在 hyper 修复这个问题?


这个周期有意思的工作就是这些,下个周期再见~