原文:https://minimalistweb.dev/p/streams-and-react-server-components
作者:Mohammad
译者:ChatGPT 4 Turbo
许多开发者在构建技术时使用了流,但有多少人真正理解了它们的复杂性及其与 React 服务器组件的联系?就我个人而言,这个概念从未真正为我所理解。直到我为 Waku 做出贡献,并对 RSCs 如何流式传输 html 感到好奇——这要求我认真对待它们。Waku 是一个在 React 服务器组件之上使用 Vite 的最小化层。
在这篇文章中,我将更多地讨论这个概念,Streams API 围绕这个概念提供的抽象,以及 React 服务器组件如何利用这个 API。
概念
在增量方式下创建、处理和消费的数据,而无需将所有数据一次性读入内存。whatwg
嗯,流是一种顺序处理数据的方式!例如,你可以逐块处理文件(通过划分),而不是一次性将整个文件读入内存,这样可以避免耗尽资源。
const response = await fetch(url);
for await (const chunk of response.body) {
// Do something with each small "chunk"
}
这样可以节省内存,因为你一次只处理一个数据块,而不是整个文件。读取该文件几乎变得瞬间完成,只要你的电脑处理了第一个数据块,它就会立即传递给你,无需等待所有数据都被处理。
一个 chunk 是写入或从流中读取的单个数据块。它可以是任何类型;流甚至可以包含不同类型的 chunk。whatwg
Web Streams API 为这一概念提供了一个标准 API,它不仅处理文件,还处理任何形式的数据!
// All these methods embed the whole response in memory and return all of the results as opposed to response.body
response.blob()
response.text()
response.json()
RSC 和 Transfer-Encoding: chunked
在好奇 React 团队如何流式传输 html 的同时,大多数 Google 文章都提到了使用 HLS(HTTP 实时流)流式传输视频,这并不是我想要的信息。直到我偶然发现了这篇 mdn 文档。
这个头表示我发送的响应被分成了多个块,我们可以在初始响应之后以非阻塞方式向这些块中添加内容。关键是 React 只返回一个流(对于 edge 是 ReadableStream,对于 Nodejs Streams 是一个 pipe 方法),并让服务器处理剩下的部分!
在 Nodejs 或 Cloudflare workers 中,由于响应体是一个流,因此会为响应设置 Transfer-Encoding: chunked
头。
RSC 在 Waku 中使用 ReadableStream 流式传输 html 文档。
ReadableStream
// Fetch the original image
fetch("./tortoise.png")
// Retrieve its body as ReadableStream
.then((response) => response.body);
// https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_streams#consuming_a_fetch_as_a_stream
这是一个你应该从中读取的流,就这么简单,除非你是提供该流的人,否则你不能写入(入队)到它。如果你是提供者,那么你将可以访问一个叫做控制器的对象!
// 你可能想要打开控制台并尝试运行这段代码
let controller;
const readable = new ReadableStream({
start(c) {
controller = c;
},
});
当你实例化 ReadableStream
类时, start
方法会运行,然后你会得到控制器( ReadableStreamDefaultController
)。
controller.enqueue('chunk 1')
controller.enqueue('chunk 2')
controller.enqueue('chunk 3')
controller.close() // no more data
在从另一个源,如 fetch
API 接收 ReadableStream
时,你将无法写入数据,因为你无法访问控制器。此外,你也不能修改来自服务器的响应,这是有道理的 🙂
Pull
Matteo 强调了 pull 方法作为向可读流添加数据的更好方式,这是我不熟悉的。 pull
方法在消费者主动想要接收数据时触发,不同于 start
,后者在可读流创建时激活。
在前面的例子中,数据的添加并没有考虑消费者是否需要它。在这种情况下,流的行为就像一个推送源。通过使用 pull
方法,我们将我们的流转换成一个拉取源。这允许流只在需要时提供必要的数据。这里有一个例子。
Reader
为了从可读流中读取数据,我们需要一个读取器。
const reader = readable.getReader()
await reader.read()
// {done: false, value: 'chunk 1'}
await reader.read()
// {done: false, value: 'chunk 2'}
await reader.read()
// {done: false, value: 'chunk 3'}
await reader.read()
// {done: true, value: undefined}
读取器所做的是将流锁定给自己,这样就没有其他读取器能从该流中读取数据。我猜是因为它嫉妒,开个玩笑 😄。原因是流一次处理一个数据块,如果我们有两个读取器,这两个读取器可能处于不同的读取(消费)阶段,或者其中一个可能会取消流,这将干扰另一个读取器(消费者)。
Teeing (T
)
我不会深入讨论这个问题,但由于我们不能同时从一个可读流中有两个读取器,我们可以对那个可读流进行 tee 操作,创建两个新的可读流来提供相同的数据。Tee 操作会迫使我们使用这两个新流中的任何一个。而原始流将会为它们锁定。
readable.tee();
// [ReadableStream, ReadableStream]
WritableStream
这是一个你应该写入的流(即使你没有访问控制器的权限),所以它与 ReadableStream
正好相反。但这里有个问题,你也可以读取和跟踪正在写入此流的值,但前提是你提供了这个流。如果你得到写入数据到这个流的机会,你将被称为生产者。它通常是对底层接收器的封装,接收器是数据的目的地,比如文件系统。我们有两种方法将数据写入可写流!
Readable Streams
当数据流入可读流时,我们可以将该流导入可写流。就像从网络(可读流)读取并写入磁盘上的某个文件(可写流)。连接这两个的过程称为管道。
在 Waku 这样的项目中,这种方法更常见!
// readable: our readable stream from the previous example
readable.pipeTo(new WritableStream({
write(chunk) {
console.log(chunk)
}
}))
// log: chunk 1
// log: chunk 2
// log: chunk 3
Writer
对于可读流,我们已经能够锁定流并直接从中读取。对于可写流也是如此,有了写入器,我们将能够直接写入可写流!
const writable = new WritableStream({
write(chunk) {
console.log(chunk)
// imagine everytime we get a chunk, we write it directly to a file
}
})
const writer = writable.getWriter()
writer.write('chunk 1')
// log: chunk 1
writer.write('chunk 2')
// log: chunk 2
writer.write('chunk 3')
// log: chunk 3
TransformStream
它是一个主要用于转换我们接收到的数据块的流。它给我们两个流,一个可读的,一个可写的!我们写入可写流,数据块将通过我们传递给 TransformStream 的 transform
函数进行处理(转换),然后任何结果都将排队到可读流中。如果没有 transform
函数,它将仅作为可写流和可读流之间的桥梁。
// https://streams.spec.whatwg.org/#example-transform-identity
const { writable, readable } = new TransformStream();
fetch("...", { body: readable }).then(response => /* ... */);
const writer = writable.getWriter();
writer.write(new Uint8Array([0x73, 0x74, 0x72, 0x65, 0x61, 0x6D, 0x73, 0x21])); // "streams!"
writer.close();
这是一个很好的例子,在这里我们将一个 readable 作为请求的 body 传递,然后我们使用 writable stream 的 writer 来写入 body 内容。
const { readable, writable} = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.split('').reverse().join(''))
}
})
const writer = writable.getWriter()
const reader = readable.getReader()
await writer.write('olleh')
// 'olleh' -> transform() -> 'hello'
await reader.read()
// {done: false, value: 'hello'}
这是一个使用 TransformStream 反转字符串的例子!
pipeThrough
这是可读流上的一种方法,当我们想要将可读流管道到 TransformStream
的可写侧时,这个方法很有用。它会返回 TransformStream
的可读侧。
const transformer = new TransformStream()
// instead of this
readable.pipeTo(transformer.writable)
// we can do this
readable.pipeThrough(transformer) // This would return transformer.readable
使用
pipeThrough(transform, options)
构建管道链的典型示例看起来像这样。whatwg
// https://streams.spec.whatwg.org/#example-pipe-chain
httpResponseBody
.pipeThrough(decompressorTransform)
.pipeThrough(ignoreNonImageFilesTransform)
.pipeTo(mediaGallery); // a pipe chain
定义
内部队列
对于可读流,它是已经入队到流中但尚未被消费者(一个读取器或另一个可写/转换流)读取的数据。对于可写流,它是已经写入到流本身但仍未写入底层接收器(例如文件系统)的数据。
高水位标记
流在内部队列中可以处理的数据量的边界称为高水位标记。如果我们越过那个边界,那么就会施加背压。
Backpressure 背压
消费者流向父流发送的信号减缓了我们接收(读取或写入)的数据量,因此我们首先处理当前内部队列中的现有数据。
就是这样!Streams 的概念在不同的上下文中可能意味着不同的事情,但核心思想是顺序处理小块数据,无论是视频流还是 React 服务器组件。这个概念并不复杂,但生成兼容流的数据是具有挑战性的部分,我猜这就是为什么 React 团队花了多年时间才达到我们今天所说的 React 服务器组件。
我是 Mohammad,这是我在博客上的第一篇文章和帖子,很乐意听听你的想法。欢迎在我的私信里打招呼!我也在找工作机会,如果你有机会的话,我们可以讨论一下 🙂
Reviewers
感谢所有审阅此文档的朋友们。