惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

罗磊的独立博客
SecWiki News
SecWiki News
酷 壳 – CoolShell
酷 壳 – CoolShell
爱范儿
爱范儿
量子位
M
MIT News - Artificial intelligence
GbyAI
GbyAI
cs.AI updates on arXiv.org
cs.AI updates on arXiv.org
K
KPMG report finds enterprise disconnect between AI and its ROI | CIO
TaoSecurity Blog
TaoSecurity Blog
博客园 - 【当耐特】
H
Heimdal Security Blog
腾讯CDC
The Last Watchdog
The Last Watchdog
Security Archives - TechRepublic
Security Archives - TechRepublic
Hacker News: Ask HN
Hacker News: Ask HN
S
Schneier on Security
Microsoft Security Blog
Microsoft Security Blog
WordPress大学
WordPress大学
博客园 - 司徒正美
Recent Commits to openclaw:main
Recent Commits to openclaw:main
C
Cybersecurity and Infrastructure Security Agency CISA
S
SegmentFault 最新的问题
大猫的无限游戏
大猫的无限游戏
Application and Cybersecurity Blog
Application and Cybersecurity Blog
F
Full Disclosure
有赞技术团队
有赞技术团队
T
Tailwind CSS Blog
Engineering at Meta
Engineering at Meta
cs.CV updates on arXiv.org
cs.CV updates on arXiv.org
T
Threatpost
月光博客
月光博客
A
Arctic Wolf
OSCHINA 社区最新新闻
OSCHINA 社区最新新闻
雷峰网
雷峰网
T
Troy Hunt's Blog
freeCodeCamp Programming Tutorials: Python, JavaScript, Git & More
The Cloudflare Blog
D
DataBreaches.Net
O
OpenAI News
L
LINUX DO - 最新话题
宝玉的分享
宝玉的分享
小众软件
小众软件
V
Vulnerabilities – Threatpost
A
About on SuperTechFans
人人都是产品经理
人人都是产品经理
T
The Exploit Database - CXSecurity.com
Martin Fowler
Martin Fowler
美团技术团队
P
Privacy International News Feed

博客园 - 草珊瑚

前端git开发分支各种场景管理 微信小程序登陆流程(20200322) vue依赖收集的策略 eggjs2.x版本异步获取config配置方案 - 草珊瑚 - 博客园 dubbo连接过程 计算机中对流的理解 Egg.js运行环境配置场景 Promise和Observable的映射 eggjs异常捕获机制 极客时间数据结构与算法之美笔记7 JS项目快速压缩(windows平台) - 草珊瑚 - 博客园 JS项目快速压缩(windows平台) - 草珊瑚 - 博客园 Maven工程的POM继承 Docker构建一个node镜像 win10家庭版安装Docker for Windows linux,vim和bash命令小册 vue文档阅读笔记——计算属性和侦听器 nodejs的jekins部署 `vue-router`的`History`模式下的项目发布
RxJS Subject学习
草珊瑚 · 2020-04-26 · via 博客园 - 草珊瑚

一个Observable的例子

import { interval } from "rxjs";
import { take } from "rxjs/operators";

const interval$ = interval(1000).pipe(take(3));

interval$.subscribe(value => console.log("Observer A get value: " + value));

setTimeout(() => {
  interval$.subscribe(value => console.log("Observer B get value: " + value));
}, 1000);

输出

Observer A get value: 0
Observer A get value: 1
Observer B get value: 0
Observer A get value: 2
Observer B get value: 1
Observer B get value: 2

可以看到

  • Observable 对象可以被重复订阅。
  • Observable 对象每次被订阅后,都会重新执行。

一个Subject的例子

import { interval, Subject } from "rxjs";
import { take } from "rxjs/operators";

const interval$ = interval(1000).pipe(take(3));
const subject = new Subject();

const observerA = {
  next: value => console.log("Observer A get value: " + value),
  error: error => console.log("Observer A error: " + error),
  complete: () => console.log("Observer A complete!")
};

const observerB = {
  next: value => console.log("Observer B get value: " + value),
  error: error => console.log("Observer B error: " + error),
  complete: () => console.log("Observer B complete!")
};

subject.subscribe(observerA); // 添加观察者A
interval$.subscribe(subject); // 订阅interval$对象
setTimeout(() => {
  subject.subscribe(observerB); // 添加观察者B
}, 1000);

输出

Observer A get value: 0
Observer A get value: 1
Observer B get value: 1
Observer A get value: 2
Observer B get value: 2
Observer A complete!
Observer B complete!

可以看到

  • Subject 是 Observable 对象。
  • Subject 是保持内部状态的 Observable 对象。
  • Subject 还是 Observe 对象。

除了 Subject 之外,还有BehaviorSubject、ReplaySubject 和 AsyncSubject。

BehaviorSubject

先看一个Subject的例子。

import { Subject } from "rxjs";

const subject = new Subject();

const observerA = {
  next: value => console.log("Observer A get value: " + value),
  error: error => console.log("Observer A error: " + error),
  complete: () => console.log("Observer A complete!")
};

const observerB = {
  next: value => console.log("Observer B get value: " + value),
  error: error => console.log("Observer B error: " + error),
  complete: () => console.log("Observer B complete!")
};

subject.subscribe(observerA);

subject.next(1);
subject.next(2);
subject.next(3);

setTimeout(() => {
  subject.subscribe(observerB); // 1秒后订阅
}, 1000);

输出

Observer A get value: 1
Observer A get value: 2
Observer A get value: 3

这里的observerB没有订阅。
因为 Subject 对象没有再调用 next() 方法。
这里的Subject 不能保存当前的最新状态。
如果希望 Subject 对象能够保存当前的状态,当新增订阅者的时候,自动把当前最新的值发送给订阅者。
使用 BehaviorSubject。

import { BehaviorSubject } from "rxjs";
const subject = new BehaviorSubject(0);

const observerA = {
  next: value => console.log("Observer A get value: " + value),
  error: error => console.log("Observer A error: " + error),
  complete: () => console.log("Observer A complete!")
};

const observerB = {
  next: value => console.log("Observer B get value: " + value),
  error: error => console.log("Observer B error: " + error),
  complete: () => console.log("Observer B complete!")
};

subject.subscribe(observerA);

subject.next(1);
subject.next(2);
subject.next(3);

setTimeout(() => {
  subject.subscribe(observerB); // 1秒后订阅
}, 1000);

输出

Observer A get value: 0
Observer A get value: 1
Observer A get value: 2
Observer A get value: 3
Observer B get value: 3

同时我们看到const subject = new BehaviorSubject(0);有一个初始值为0,它用于表示 Subject 对象当前的状态。
subject.subscribe(observerA);这条语句执行后,便会输出Observer A get value: 0

ReplaySubject

如果我们希望新增的订阅者,可以接收到数据源最近发送的几个值。
可以使用ReplaySubject。

import { ReplaySubject } from "rxjs";
const subject = new ReplaySubject(2);

const observerA = {
  next: value => console.log("Observer A get value: " + value),
  error: error => console.log("Observer A error: " + error),
  complete: () => console.log("Observer A complete!")
};

const observerB = {
  next: value => console.log("Observer B get value: " + value),
  error: error => console.log("Observer B error: " + error),
  complete: () => console.log("Observer B complete!")
};

subject.subscribe(observerA);

subject.next(1);
subject.next(2);
subject.next(3);

setTimeout(() => {
  subject.subscribe(observerB); // 1秒后订阅
}, 1000);

输出

Observer A get value: 1
Observer A get value: 2
Observer A get value: 3
Observer B get value: 2
Observer B get value: 3

当你把const subject = new ReplaySubject(2);改为const subject = new ReplaySubject(1);
输出

Observer A get value: 1
Observer A get value: 2
Observer A get value: 3
Observer B get value: 3

AsyncSubject

AsyncSubject 类似于 last 操作符,它会在 Subject 结束后发出最后一个值。

import { AsyncSubject } from "rxjs";
const subject = new AsyncSubject();

const observerA = {
  next: value => console.log("Observer A get value: " + value),
  error: error => console.log("Observer A error: " + error),
  complete: () => console.log("Observer A complete!")
};

const observerB = {
  next: value => console.log("Observer B get value: " + value),
  error: error => console.log("Observer B error: " + error),
  complete: () => console.log("Observer B complete!")
};

subject.subscribe(observerA);

subject.next(1);
subject.next(2);
subject.next(3);

subject.complete();

setTimeout(() => {
  subject.subscribe(observerB); // 1秒后订阅
}, 1000);

输出

Observer A get value: 3
Observer A complete!
Observer B get value: 3
Observer B complete!

当你注释掉subject.complete();则什么也不会输出。
因为subject没有结束。

参考:
RxJS Subject