译:流和 React 服务器组件

原文:https://minimalistweb.dev/p/streams-and-react-server-components
作者:Mohammad
译者:ChatGPT 4 Turbo

许多开发者在构建技术时使用了流,但有多少人真正理解了它们的复杂性及其与 React 服务器组件的联系?就我个人而言,这个概念从未真正为我所理解。直到我为 Waku 做出贡献,并对 RSCs 如何流式传输 html 感到好奇——这要求我认真对待它们。Waku 是一个在 React 服务器组件之上使用 Vite 的最小化层。

在这篇文章中,我将更多地讨论这个概念,Streams API 围绕这个概念提供的抽象,以及 React 服务器组件如何利用这个 API。

概念

在增量方式下创建、处理和消费的数据,而无需将所有数据一次性读入内存。whatwg

嗯,流是一种顺序处理数据的方式!例如,你可以逐块处理文件(通过划分),而不是一次性将整个文件读入内存,这样可以避免耗尽资源。

const response = await fetch(url);
for await (const chunk of response.body) {
  // Do something with each small "chunk"
}

这样可以节省内存,因为你一次只处理一个数据块,而不是整个文件。读取该文件几乎变得瞬间完成,只要你的电脑处理了第一个数据块,它就会立即传递给你,无需等待所有数据都被处理。

一个 chunk 是写入或从流中读取的单个数据块。它可以是任何类型;流甚至可以包含不同类型的 chunk。whatwg

Web Streams API 为这一概念提供了一个标准 API,它不仅处理文件,还处理任何形式的数据!

// All these methods embed the whole response in memory and return all of the results as opposed to response.body
response.blob()
response.text()
response.json()

RSC 和 Transfer-Encoding: chunked

在好奇 React 团队如何流式传输 html 的同时,大多数 Google 文章都提到了使用 HLS(HTTP 实时流)流式传输视频,这并不是我想要的信息。直到我偶然发现了这篇 mdn 文档

这个头表示我发送的响应被分成了多个块,我们可以在初始响应之后以非阻塞方式向这些块中添加内容。关键是 React 只返回一个流(对于 edge 是 ReadableStream,对于 Nodejs Streams 是一个 pipe 方法),并让服务器处理剩下的部分!

在 Nodejs 或 Cloudflare workers 中,由于响应体是一个流,因此会为响应设置 Transfer-Encoding: chunked 头。

RSC 在 Waku 中使用 ReadableStream 流式传输 html 文档。

ReadableStream

// Fetch the original image
fetch("./tortoise.png")
  // Retrieve its body as ReadableStream
  .then((response) => response.body);

// https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_streams#consuming_a_fetch_as_a_stream

这是一个你应该从中读取的流,就这么简单,除非你是提供该流的人,否则你不能写入(入队)到它。如果你是提供者,那么你将可以访问一个叫做控制器的对象!

// 你可能想要打开控制台并尝试运行这段代码
let controller;
const readable = new ReadableStream({
  start(c) {
    controller = c;
  },
});

当你实例化 ReadableStream 类时, start 方法会运行,然后你会得到控制器( ReadableStreamDefaultController )。

controller.enqueue('chunk 1')
controller.enqueue('chunk 2')
controller.enqueue('chunk 3')
controller.close() // no more data

在从另一个源,如 fetch API 接收 ReadableStream 时,你将无法写入数据,因为你无法访问控制器。此外,你也不能修改来自服务器的响应,这是有道理的 🙂

Pull

Matteo 强调了 pull 方法作为向可读流添加数据的更好方式,这是我不熟悉的。 pull 方法在消费者主动想要接收数据时触发,不同于 start ,后者在可读流创建时激活。

在前面的例子中,数据的添加并没有考虑消费者是否需要它。在这种情况下,流的行为就像一个推送源。通过使用 pull 方法,我们将我们的流转换成一个拉取源。这允许流只在需要时提供必要的数据。这里有一个例子。

Reader

为了从可读流中读取数据,我们需要一个读取器。

const reader = readable.getReader()

await reader.read()
// {done: false, value: 'chunk 1'}
await reader.read()
// {done: false, value: 'chunk 2'}
await reader.read()
// {done: false, value: 'chunk 3'}
await reader.read()
// {done: true, value: undefined}

读取器所做的是将流锁定给自己,这样就没有其他读取器能从该流中读取数据。我猜是因为它嫉妒,开个玩笑 😄。原因是流一次处理一个数据块,如果我们有两个读取器,这两个读取器可能处于不同的读取(消费)阶段,或者其中一个可能会取消流,这将干扰另一个读取器(消费者)。

Teeing (T)

我不会深入讨论这个问题,但由于我们不能同时从一个可读流中有两个读取器,我们可以对那个可读流进行 tee 操作,创建两个新的可读流来提供相同的数据。Tee 操作会迫使我们使用这两个新流中的任何一个。而原始流将会为它们锁定。

readable.tee();
// [ReadableStream, ReadableStream]

WritableStream

这是一个你应该写入的流(即使你没有访问控制器的权限),所以它与 ReadableStream 正好相反。但这里有个问题,你也可以读取和跟踪正在写入此流的值,但前提是你提供了这个流。如果你得到写入数据到这个流的机会,你将被称为生产者。它通常是对底层接收器的封装,接收器是数据的目的地,比如文件系统。我们有两种方法将数据写入可写流!

Readable Streams

当数据流入可读流时,我们可以将该流导入可写流。就像从网络(可读流)读取并写入磁盘上的某个文件(可写流)。连接这两个的过程称为管道。

在 Waku 这样的项目中,这种方法更常见!

// readable: our readable stream from the previous example
readable.pipeTo(new WritableStream({
    write(chunk) {
        console.log(chunk)
    }
}))
// log: chunk 1
// log: chunk 2
// log: chunk 3

Writer

对于可读流,我们已经能够锁定流并直接从中读取。对于可写流也是如此,有了写入器,我们将能够直接写入可写流!

const writable = new WritableStream({
    write(chunk) {
        console.log(chunk)
        // imagine everytime we get a chunk, we write it directly to a file
    }
})

const writer = writable.getWriter()

writer.write('chunk 1')
// log: chunk 1
writer.write('chunk 2')
// log: chunk 2
writer.write('chunk 3')
// log: chunk 3

TransformStream

它是一个主要用于转换我们接收到的数据块的流。它给我们两个流,一个可读的,一个可写的!我们写入可写流,数据块将通过我们传递给 TransformStream 的 transform 函数进行处理(转换),然后任何结果都将排队到可读流中。如果没有 transform 函数,它将仅作为可写流和可读流之间的桥梁。

// https://streams.spec.whatwg.org/#example-transform-identity
const { writable, readable } = new TransformStream();

fetch("...", { body: readable }).then(response => /* ... */);

const writer = writable.getWriter();
writer.write(new Uint8Array([0x73, 0x74, 0x72, 0x65, 0x61, 0x6D, 0x73, 0x21])); // "streams!"
writer.close();

这是一个很好的例子,在这里我们将一个 readable 作为请求的 body 传递,然后我们使用 writable stream 的 writer 来写入 body 内容。

const { readable, writable} = new TransformStream({
    transform(chunk, controller) {
        controller.enqueue(chunk.split('').reverse().join(''))
    }
})
const writer = writable.getWriter()
const reader = readable.getReader()

await writer.write('olleh')
// 'olleh' -> transform() -> 'hello'
await reader.read()
// {done: false, value: 'hello'}

这是一个使用 TransformStream 反转字符串的例子!

pipeThrough

这是可读流上的一种方法,当我们想要将可读流管道到 TransformStream 的可写侧时,这个方法很有用。它会返回 TransformStream 的可读侧。

const transformer = new TransformStream()

// instead of this
readable.pipeTo(transformer.writable)
// we can do this
readable.pipeThrough(transformer) // This would return transformer.readable

使用 pipeThrough(transform, options) 构建管道链的典型示例看起来像这样。whatwg

// https://streams.spec.whatwg.org/#example-pipe-chain
httpResponseBody
  .pipeThrough(decompressorTransform)
  .pipeThrough(ignoreNonImageFilesTransform)
  .pipeTo(mediaGallery); // a pipe chain

定义

内部队列

对于可读流,它是已经入队到流中但尚未被消费者(一个读取器或另一个可写/转换流)读取的数据。对于可写流,它是已经写入到流本身但仍未写入底层接收器(例如文件系统)的数据。

高水位标记

流在内部队列中可以处理的数据量的边界称为高水位标记。如果我们越过那个边界,那么就会施加背压。

Backpressure 背压

消费者流向父流发送的信号减缓了我们接收(读取或写入)的数据量,因此我们首先处理当前内部队列中的现有数据。

就是这样!Streams 的概念在不同的上下文中可能意味着不同的事情,但核心思想是顺序处理小块数据,无论是视频流还是 React 服务器组件。这个概念并不复杂,但生成兼容流的数据是具有挑战性的部分,我猜这就是为什么 React 团队花了多年时间才达到我们今天所说的 React 服务器组件。

我是 Mohammad,这是我在博客上的第一篇文章和帖子,很乐意听听你的想法。欢迎在我的私信里打招呼!我也在找工作机会,如果你有机会的话,我们可以讨论一下 🙂

Reviewers

感谢所有审阅此文档的朋友们。

Resources