如何在 RxJS 中使用 Promise

温馨提示:本文使用 ChatGPT 润色

参考链接:

RxJS:https://rxjs.tech/guide/overview

Promise:https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Global_Objects/Promise

服务器端事件发送(SSE):https://docs.nestjs.cn/9/techniques?id=%e6%9c%8d%e5%8a%a1%e5%99%a8%e7%ab%af%e4%ba%8b%e4%bb%b6%e5%8f%91%e9%80%81

前言

最近在研究如何在 Nest.js 中实现 SSE,结果发现需要使用 RxJS。由于需要执行异步操作,不可避免地使用了 Promise。因此,我研究了一下如何在 RxJS 中使用 Promise,并有一些心得,现在特此记录。

SSE

服务器端事件发送(SSE)是一个服务器推送技术,用来使客户端在HTTP连接下自动接收服务器更新消息。每个消息以一个由一对新行符号作为结束的文字块发送(参见这里)。

如果要在 Nest.js 中使用 SSE, 需要返回一个Observable流,例如:

export interface MessageEvent {
  data: string | object;
  id?: string;
  type?: string;
  retry?: number;
}

/// 省略其他部分代码

@Sse('sse')
sse(): Observable<MessageEvent> {
  return interval(1000).pipe(map((_) => ({ data: { hello: 'world' } })));
}

这样就可以实现一个每秒发送一次 { hello: 'world' }的 SSE 路由。

在浏览器端/客户端只要如下使用即可:

const eventSource = new EventSource('/sse');
eventSource.onmessage = ({ data }) => {
  console.log('New message', JSON.parse(data));
};

使用 Promise

在具体的业务逻辑中,返回的 Observable 里必然会包含异步逻辑,其中就有可能用到 Promise。

处理单个 Promise

处理单个 Promise 非常简单,只需要使用 RxJS 提供的 from 操作符即可:

import { from } from 'rxjs';

const promise = new Promise((resolve, reject) => {
    resolve('ok')
})
const observable = from(promise)

observable.subscribe((e) => console.log(e)) // ok 

通过 from 操作符,可以直接获取到 Promise 的 resolve 值。

处理多个 Promise

但在更多情况下,Observable 中会返回多个 Promise,例如:

@Sse('sse')
sse(): Observable<MessageEvent> {
  return interval(1000)
      .pipe(
        map(async () => {
            const data = await client.get("key") // 进行异步操作
            return {
                data
            }
        }),
    )
}

上述代码会每秒执行一次异步操作,然后返回一个 Promise,此时就得到了多个 Promise。

在这种情况下,要获取 Promise 的 resolve 值就会稍微麻烦些,需要使用mergeAll操作符 来将高阶 Observable 转换为一阶 Observable。,从而获取到 Promise 的 resolve 值。

这其中的原理就在于 Promise 实际上也会被视为一个 Observable ,因此多个 Promise 就构成了高阶 Observable,故需要用 mergeAll操作符来展平 Promise 数组。

@Sse('sse')
sse(): Observable<MessageEvent> {
  return interval(1000)
      .pipe(
        map(async () => {
            const data = await client.get("key") // 进行异步操作
            return {
                data
            }
        }),
        mergeAll(),
        tap((e)=>{
            console.log(e) // 此时就能看到 Promise的resolve 值了,e 的类型为原始值,而不是 Promise包装后的值
        })
    )
}

总结

Promise 和 Observable 都是 JavaScript 中常用的异步编程工具。
Promise 简单易用,常用于处理单个异步操作,但当需要同时处理多个异步操作时,Promise 就显得力不从心了。
Observable 相对于 Promise 具有更多的特性和功能,能够处理多个异步操作并发执行,包括错误处理、取消操作等。在 RxJS 中,Observable 是核心概念,提供了许多操作符来处理各种异步操作,例如过滤、映射、合并等。Observable 是一个被广泛应用于前端开发中的工具。
在处理多个 Promise 的情况下,可以将多个 Promise 封装到一个 Observable 中,然后使用 mergeAll 操作符将高阶 Observable 转换为一阶 Observable,以便获取 Promise 的 resolve 值。
总之,当需要处理多个异步操作时,Observable 是一种更为强大和灵活的异步编程工具,而 Promise 则是一种更为简单和易用的异步编程工具,需要根据实际情况选择适合的工具来处理异步操作。


评论

发表回复