惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

SecWiki News
SecWiki News
H
Help Net Security
罗磊的独立博客
Stack Overflow Blog
Stack Overflow Blog
M
MIT News - Artificial intelligence
Jina AI
Jina AI
L
LangChain Blog
K
Kaspersky official blog
I
Intezer
Martin Fowler
Martin Fowler
爱范儿
爱范儿
AWS News Blog
AWS News Blog
The Hacker News
The Hacker News
Recorded Future
Recorded Future
人人都是产品经理
人人都是产品经理
H
Hackread – Cybersecurity News, Data Breaches, AI and More
C
CXSECURITY Database RSS Feed - CXSecurity.com
Spread Privacy
Spread Privacy
Simon Willison's Weblog
Simon Willison's Weblog
U
Unit 42
N
News and Events Feed by Topic
A
Arctic Wolf
G
GRAHAM CLULEY
Microsoft Azure Blog
Microsoft Azure Blog
博客园 - 聂微东
F
Fortinet All Blogs
C
Cisco Blogs
美团技术团队
Vercel News
Vercel News
K
KPMG report finds enterprise disconnect between AI and its ROI | CIO
H
Hacker News: Front Page
T
Tailwind CSS Blog
I
InfoQ
宝玉的分享
宝玉的分享
Google DeepMind News
Google DeepMind News
博客园 - 司徒正美
P
Palo Alto Networks Blog
A
About on SuperTechFans
Threat Intelligence Blog | Flashpoint
Threat Intelligence Blog | Flashpoint
云风的 BLOG
云风的 BLOG
TaoSecurity Blog
TaoSecurity Blog
Google Online Security Blog
Google Online Security Blog
Exploit-DB.com RSS Feed
Exploit-DB.com RSS Feed
P
Privacy & Cybersecurity Law Blog
H
Heimdal Security Blog
cs.CV updates on arXiv.org
cs.CV updates on arXiv.org
Hacker News: Ask HN
Hacker News: Ask HN
O
OpenAI News
博客园 - Franky
Scott Helme
Scott Helme

博客园 - egmkang

配眼镜最佳实践 [Python]解密pyc文件 C# Protobuf如何做到0分配内存的序列化/反序列化(2) CPU实现原子操作的原理 Flash----一种VirtualActor模式的分布式有状态系统原型 [投资]对价值投资新的理解 通过Consul Raft库打造自己的分布式系统 [06] 优化C#服务器的思路和工具的使用 [05] 通过P/Invoke加速C#程序 [04] C# Alloc Free编程之实践 [03] C# Alloc Free编程 [02] 多线程逻辑编程 [01] C#网络编程的最佳实践 C#如何正确的做深拷贝 C# Protobuf如何做到0分配内存的序列化 最近很火的namebase羊毛, 手把手教你怎么薅 DotNetty发送请求的最佳实践 C# protobuf自动更新cs文件 多读少写场景下多线程锁冲突的降低
Dapr实现分布式有状态服务的细节
egmkang · 2020-11-05 · via 博客园 - egmkang

Dapr是为云上环境设计的跨语言, 事件驱动, 可以便捷的构建微服务的系统. balabala一堆, 有兴趣的小伙伴可以去了解一下.

Dapr提供有状态和无状态的微服务. 大部分人都是做无状态服务(微服务)的, 只是某些领域无状态并不好使, 因为开销实在是太大了; 有状态服务有固定的场景, 就是要求开销小, 延迟和吞吐都比较高. 废话少说, 直接来看Dapr是怎么实现有状态服务的.

先来了解一下有状态服务:

1. 稳定的路由

   发送给A服务器的请求, 不能发给B服务器, 否则就是无状态的

2. 状态

   状态保存在自己服务器内部, 而不是远程存储, 这一点和无状态有很明显的区别, 所以无状态服务需要用redis这种东西加速, 有状态不需要

3. 处理是单线程

   状态一般来讲比较复杂, 想要对一个比较复杂的东西进行并行的计算是比较困难的; 当然A和B的逻辑之间没有关系, 其实是可以并行的, 但是A自己本身的逻辑执行需要串行执行.

对于一个有状态服务来讲(dapr), 实现23实际上是很轻松的, 甚至有一些是用户需要实现的东西, 所以1才是关键, 当前这个消息(请求)需要被发送到哪个服务器上面处理才是最关键的, 甚至决定了他是什么系统.

决定哪个请求的目标地址, 这个东西在分布式系统里面叫Placement, 有时候也叫Naming. TiDB里面有一个Server叫PlacementDriver, 简称PD, 其实就是在干同样的事情.

好了, 开始研究Dapr的Placement是怎么实现的.

有一个Placement的进程, 2333, 目录cmd/placement, 就看他了

func main() {
	log.Infof("starting Dapr Placement Service -- version %s -- commit %s", version.Version(), version.Commit())

	cfg := newConfig()

	// Apply options to all loggers.
	if err := logger.ApplyOptionsToLoggers(&cfg.loggerOptions); err != nil {
		log.Fatal(err)
	}
	log.Infof("log level set to: %s", cfg.loggerOptions.OutputLevel)

	// Initialize dapr metrics for placement.
	if err := cfg.metricsExporter.Init(); err != nil {
		log.Fatal(err)
	}

	if err := monitoring.InitMetrics(); err != nil {
		log.Fatal(err)
	}

	// Start Raft cluster.
	raftServer := raft.New(cfg.raftID, cfg.raftInMemEnabled, cfg.raftBootStrap, cfg.raftPeers)
	if raftServer == nil {
		log.Fatal("failed to create raft server.")
	}

	if err := raftServer.StartRaft(nil); err != nil {
		log.Fatalf("failed to start Raft Server: %v", err)
	}

	// Start Placement gRPC server.
	hashing.SetReplicationFactor(cfg.replicationFactor)
	apiServer := placement.NewPlacementService(raftServer)

可以看到main函数里面启动了一个raft server, 一般这样的话, 就说明在某些能力方面做到了强一致性.

raft库用的是consul实现的raft, 而不是etcd, 因为etcd的raft不是库, 只能是一个服务器(包括etcd embed), 你不能定制里面的协议, 你只能使用etcd提供给你的client来访问他. 这一点etcd做的非常不友好.

如果用raft库来做placement, 那么协议可以定制, 可以找Apply相关的函数, 因为raft状态机只是负责log的一致性, log即消息, 消息的处理则表现出来状态, Apply函数就是需要用户做消息处理的地方. 幸亏之前有做过MIT 6.824的lab, 对这个稍微有一点了解.

// Apply log is invoked once a log entry is committed.
func (c *FSM) Apply(log *raft.Log) interface{} {
	buf := log.Data
	cmdType := CommandType(buf[0])

	if log.Index < c.state.Index {
		logging.Warnf("old: %d, new index: %d. skip apply", c.state.Index, log.Index)
		return nil
	}

	var err error
	var updated bool
	switch cmdType {
	case MemberUpsert:
		updated, err = c.upsertMember(buf[1:])
	case MemberRemove:
		updated, err = c.removeMember(buf[1:])
	default:
		err = errors.New("unimplemented command")
	}

	if err != nil {
		return err
	}

	return updated
}

在pkg/placement/raft文件夹下面找到raft相关的代码, fsm.go里面有对消息的处理函数.

可以看到, 消息的处理非常简单, 里面只有MemberUpsert, 和MemberRemove两个消息.  FSM状态机内保存的状态只有:

// DaprHostMemberState is the state to store Dapr runtime host and
// consistent hashing tables.
type DaprHostMemberState struct {
	// Index is the index number of raft log.
	Index uint64
	// Members includes Dapr runtime hosts.
	Members map[string]*DaprHostMember

	// TableGeneration is the generation of hashingTableMap.
	// This is increased whenever hashingTableMap is updated.
	TableGeneration uint64

	// hashingTableMap is the map for storing consistent hashing data
	// per Actor types.
	hashingTableMap map[string]*hashing.Consistent
}

很明显, 这里面只有DaprHostMember这个有用的信息, 而DaprHostMember就是集群内的节点.

这里可以分析出来, Dapr通过Raft协议来维护了一个强一致性的Membership, 除此之外什么也没干....据我的朋友说, 跟Orleans是有一点类似的, 只是Orleans是AP系统.

再通过对一致性Hash的分析, 可以看到:

func (a *actorsRuntime) lookupActorAddress(actorType, actorID string) (string, string) {
	if a.placementTables == nil {
		return "", ""
	}

	t := a.placementTables.Entries[actorType]
	if t == nil {
		return "", ""
	}
	host, err := t.GetHost(actorID)
	if err != nil || host == nil {
		return "", ""
	}
	return host.Name, host.AppID
}

通过 ActorType和ActorID到一致性的Hash表中去找host, 那个GetHost实现就是一致性Hash表实现的.

Actor RPC Call的实现:

func (a *actorsRuntime) Call(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
	if a.placementBlock {
		<-a.placementSignal
	}

	actor := req.Actor()
	targetActorAddress, appID := a.lookupActorAddress(actor.GetActorType(), actor.GetActorId())
	if targetActorAddress == "" {
		return nil, errors.Errorf("error finding address for actor type %s with id %s", actor.GetActorType(), actor.GetActorId())
	}

	var resp *invokev1.InvokeMethodResponse
	var err error

	if a.isActorLocal(targetActorAddress, a.config.HostAddress, a.config.Port) {
		resp, err = a.callLocalActor(ctx, req)
	} else {
		resp, err = a.callRemoteActorWithRetry(ctx, retry.DefaultLinearRetryCount, retry.DefaultLinearBackoffInterval, a.callRemoteActor, targetActorAddress, appID, req)
	}

	if err != nil {
		return nil, err
	}
	return resp, nil
}

通过刚才我们看到loopupActorAddress函数找到的Host, 然后判断是否是在当前Host宿主内, 否则就发送到远程, 对当前宿主做了优化, 实际上没鸡儿用, 因为分布式系统里面, 一般都会有很多个host, 在当前host内的概率实际上是非常低的.

从这边, 我们大概就能分析到全貌, 即Dapr实现分布式有状态服务的细节:

1. 通过Consul Raft库维护Membership

2. 集群和Placement组件通讯, 获取到Membership

3. 寻找Actor的算法实现在Host内, 而不是Placement组件. 通过ActorType找到可以提供某种服务的Host, 然后组成一个一致性Hash表, 到该表内查找Host, 进而转发请求

对Host内一致性Hash表的查找引用, 找到了修改内容的地方:

func (a *actorsRuntime) updatePlacements(in *placementv1pb.PlacementTables) {
	a.placementTableLock.Lock()
	defer a.placementTableLock.Unlock()

	if in.Version != a.placementTables.Version {
		for k, v := range in.Entries {
			loadMap := map[string]*hashing.Host{}
			for lk, lv := range v.LoadMap {
				loadMap[lk] = hashing.NewHost(lv.Name, lv.Id, lv.Load, lv.Port)
			}
			c := hashing.NewFromExisting(v.Hosts, v.SortedSet, loadMap)
			a.placementTables.Entries[k] = c
		}

		a.placementTables.Version = in.Version
		a.drainRebalancedActors()

		log.Infof("placement tables updated, version: %s", in.GetVersion())

		a.evaluateReminders()
	}
}

从这几行代码可以看出, 版本不不一样, 就会全更新, 而且还会进行rehash, 就是a.drainRebalanceActors. 

如果学过数据结构, 那么肯定学到过一种东西叫HashTable, HashTable在扩容的时候需要rehash, 需要构建一个更大的table, 然后把所有元素重新放进去, 位置会和原先的大不一样. 而一致性Hash可以解决全rehash的情况, 只让部分内容rehash, 失效的内容会比较少.

但是, 凡事都有一个但是, 所有的节点都同时rehash还好, 可一个分布式系统怎么做到所有node都同时rehash, 很显然是做不到的, 所以Dapr维护的Actor Address目录, 是最终一致的, 也就是系统里面会存在多个ID相同的Actor(短暂的), 还是会导致不一致.

对dapr/proto/placement/v1/placement.proto查看, 验证了我的猜想

// Placement service is used to report Dapr runtime host status.
service Placement {
  rpc ReportDaprStatus(stream Host) returns (stream PlacementOrder) {}
}

message PlacementOrder {
  PlacementTables tables = 1;
  string operation = 2;
}

Host启动, 就去placement那边通过gRPC Stream订阅了集群的变动. 懒到极点了, 居然是把整个membership发送过来, 而不是发送的diff.

总结一下, 从上面的源码分析我们可以知道, Dapr的Membership是CP系统, 但是Actor的Placement不是, 是一个最终一致的AP系统. 而TiDB的PD是一个CP系统, 只不过是通过etcd embed做的. 希望对大家有一点帮助.

对我有帮助的, 可能就是Dapr对于Consul raft的使用.

参考:

1. Dapr

2. Etcd Embed

3. Consul Raft