惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

T
Tenable Blog
H
Heimdal Security Blog
K
Kaspersky official blog
奇客Solidot–传递最新科技情报
奇客Solidot–传递最新科技情报
S
Schneier on Security
G
GRAHAM CLULEY
U
Unit 42
OSCHINA 社区最新新闻
OSCHINA 社区最新新闻
C
CERT Recently Published Vulnerability Notes
Google DeepMind News
Google DeepMind News
罗磊的独立博客
Stack Overflow Blog
Stack Overflow Blog
阮一峰的网络日志
阮一峰的网络日志
Simon Willison's Weblog
Simon Willison's Weblog
C
Cisco Blogs
Cyberwarzone
Cyberwarzone
T
The Exploit Database - CXSecurity.com
Project Zero
Project Zero
Security Archives - TechRepublic
Security Archives - TechRepublic
www.infosecurity-magazine.com
www.infosecurity-magazine.com
博客园 - 司徒正美
Exploit-DB.com RSS Feed
Exploit-DB.com RSS Feed
V
Visual Studio Blog
博客园 - Franky
Engineering at Meta
Engineering at Meta
WordPress大学
WordPress大学
Jina AI
Jina AI
P
Proofpoint News Feed
P
Proofpoint News Feed
有赞技术团队
有赞技术团队
L
LINUX DO - 最新话题
宝玉的分享
宝玉的分享
N
News and Events Feed by Topic
cs.CV updates on arXiv.org
cs.CV updates on arXiv.org
博客园 - 聂微东
T
The Blog of Author Tim Ferriss
Spread Privacy
Spread Privacy
Application and Cybersecurity Blog
Application and Cybersecurity Blog
IT之家
IT之家
S
Security Affairs
博客园 - 叶小钗
freeCodeCamp Programming Tutorials: Python, JavaScript, Git & More
小众软件
小众软件
N
News | PayPal Newsroom
Cloudbric
Cloudbric
AWS News Blog
AWS News Blog
W
WeLiveSecurity
The Last Watchdog
The Last Watchdog
Cyber Security Advisories - MS-ISAC
Cyber Security Advisories - MS-ISAC
NISL@THU
NISL@THU

博客园 - Heclei

WebDeploy 同步IIS6 bindingRedirect 相当奇怪的设置 - Heclei - 博客园 C# 使用WM_NCLBUTTONDOWN消息实现任意位置移动窗体 发布开源程序:数据库结构比较 VS2005 WEB F5调试自动退出调试状态 JS整理,getCookie, getElementsByClassName , 添加删除事件 (转)C# 单实例 - Heclei - 博客园 (转)VC++ 2005 用程式建立 Paradox ( *.db ) 资料库档桉 - Heclei C# 判断操作系统类型 Service Broker实现发布-订阅(Publish-Subscribe)框架(3) Service Broker实现发布-订阅(Publish-Subscribe)框架(1) .net sqlserver 关于连接池没事释放的注意项 mysql ERROR 1045 JS实现局部打印和预览 perforce,P4,权限设置 Visual Studio 2005 sp1 新愁加就恨 将一个数据库的登录账户转移到另一个账户中(Trans logins from one database to another ) Find the need for WinFx, understanding for very very beginners 使用vsto制作的outlook插件,安装加载失败
Service Broker实现发布-订阅(Publish-Subscribe)框架(2)
Heclei · 2008-09-05 · via 博客园 - Heclei

关于发布-订阅框架的介绍,请访问前一篇 Service Broker实现发布-订阅(Publish-Subscribe)框架(1) 。

应用发布者逻辑

在存储过程sp_PublisherService 中实现PublisherService服务的入口点(entry point)。当一有新的消息到达PublisherQueue队列是,sp_PublisherService存储过程自动激活,并开始处理消息。这一存储过程能够处理如下消息类型:

[http://ssb.csharp.at/SSB_Book/c10/PublishMessage]: 在存储过程开始发布article消息时,它需要从AuthorService接收该消息类型。这一消息包含有如下article消息相同的主题(Subject)。

[http://ssb.csharp.at/SSB_Book/c10/SubscribeMessage]: 在存储过程准备订阅一个主题时,它需要从订阅者服务方接收该消息类型。这一消息包含有订阅者请求的主题。

[http://ssb.csharp.at/SSB_Book/c10/ArticleMessage]: 在存储过程发布article消息时,它会从AuthorService方接收这一消息类型。

[http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog]: 在存储过程准备关闭PublisherService打开的会话时,存储过程会从AuthorService或订阅方服务接收这一消息类型。

[http://schemas.microsoft.com/SQL/ServiceBroker/Error]: 在请求的主题不存在是,存储过程将从PublisherService接收这一消息类型。

下面的script显示sp_PublisherService存储过程如何处理这些消息类型。

sp_PublisherService 服务处理程序:

CREATE PROCEDURE sp_PublisherService

AS

BEGIN

DECLARE @Conversation UNIQUEIDENTIFIER;

DECLARE @Message VARBINARY(MAX);

DECLARE @MessageTypeName SYSNAME;

BEGIN TRANSACTION;

WAITFOR

(

RECEIVE TOP(1)

@Conversation = conversation_handle,

@Message = message_body,

@MessageTypeName = message_type_name

FROM PublisherQueue

), TIMEOUT 1000;

WHILE (@Conversation IS NOT NULL)

BEGIN

IF (@MessageTypeName = 'http://ssb.csharp.at/SSB_Book/c10/PublishMessage')

BEGIN

EXEC sp_ProcessPublicationRequest @Conversation, @Message;

END

ELSE IF (@MessageTypeName ='http://ssb.csharp.at/SSB_Book/c10/SubscribeMessage')

BEGIN

EXEC sp_ProcessSubscriptionRequest @Conversation, @Message;

END

ELSE IF (@MessageTypeName ='http://ssb.csharp.at/SSB_Book/c10/ArticleMessage')

BEGIN

EXEC sp_SendOnPublication @Conversation, @Message;

END

ELSE IF (@MessageTypeName IN (

N'http://schemas.microsoft.com/SQL/ServiceBroker/Error',

N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'))

BEGIN

END CONVERSATION @Conversation;

IF (EXISTS (SELECT * FROM Publications WHERE Publication = @Conversation))

BEGIN

EXEC sp_RemovePublication @Conversation;

END

IF (EXISTS (SELECT * FROM Subscribers))

BEGIN

EXEC sp_RemoveSubscriber @Conversation;

END

END

ELSE

BEGIN

-- Unexpected message

RAISERROR (N'Received unexpected message type: %s', 16, 1,

@MessageTypeName);

ROLLBACK;

RETURN;

END

COMMIT;

SELECT @Conversation = NULL;

BEGIN TRANSACTION;

WAITFOR

(

RECEIVE TOP(1)

@Conversation = conversation_handle,

@Message = message_body,

@MessageTypeName = message_type_name

FROM PublisherQueue

), TIMEOUT 1000;

END

COMMIT;

END

GO

在上面的存储过程中,先从PublisherQueue队列中接收一个新的消息。如下消息类型为[http://ssb.csharp.at/SSB_Book/c10/PublishMessage],就调用sp_ProcessPublicationRequest 存储过程,将接收到的发布数据记录到Publications表中。如果为[http://ssb.csharp.at/SSB_Book/c10/SubscribeMessage] 消息类型,则调用sp_ProcessSubscriptionRequest 存储过程,负责将接收到的订阅数据记录到Subscriptions表中。最后,如果为 [http://ssb.csharp.at/SSB_Book/c10/ArticleMessage]消息类型,则sp_PublisherService存储过程负责调用sp_SendOnPublication 分发该消息给所有匹配的订阅者。

通过sp_ProcessPublicationRequest 和sp_ProcessSubscriptionRequest 存储过程来管理发布和订阅记录。这2个存储过程分别调用其他的存储过程,将接收到的消息插入到Publications 或 Subscriptions 表中。如下显示 the sp_ProcessPublicationRequest 存储过程。因为sp_ProcessSubscriptionRequest 存储过程比较相似,这里忽略该存储过程。

sp_ProcessPublicationRequest 存储过程脚本:

CREATE PROCEDURE sp_ProcessPublicationRequest

@Conversation UNIQUEIDENTIFIER,

@Message VARBINARY(MAX)

AS

BEGIN

DECLARE @Request XML;

DECLARE @Subject NVARCHAR(MAX);

SELECT @Request = CAST(@Message AS XML);

WITH XMLNAMESPACES (DEFAULT 'http://ssb.csharp.at/SSB_Book/c10/PublishSubscribe')

SELECT @Subject = @Request.value(N'(//Publish/Subject)[1]', N'NVARCHAR(MAX)');

IF (@Subject IS NOT NULL)

BEGIN

EXEC sp_PublishPublication @Conversation, @Subject, @Message;

END

ELSE

BEGIN

END CONVERSATION @Conversation

WITH ERROR = 1

DESCRIPTION = N'The publication is missing a subject';

EXEC sp_RemovePublication @Conversation;

END

END

GO

sp_ProcessPublicationRequest存储过程调用sp_PublishPublication,并传入@Conversation, @Subject, @Message 三个参数。如下是sp_PublishPublication 存储过程。

sp_PublishPublication 存储过程脚本:

CREATE PROCEDURE sp_PublishPublication

@Publication UNIQUEIDENTIFIER,

@Subject NVARCHAR(MAX),

@OriginalXml XML

AS

BEGIN

INSERT INTO Publications (Publication, Subject, OriginalXml)

VALUES

(

@Publication,

@Subject,

@OriginalXml

)

END

GO

Publications 表中的Publication列和Subscriptions表中的Subscription 列都存放会话ID,你需要这些会话ID来发送article消息到订阅方。最后一个存储过程是sp_SendOnPublication,在有[http://ssb.csharp.at/SSB_Book/c10/ArticleMessage] 消息从 AuthorService接收到后,调用才存储过程。

sp_SendOnPublication 存储过程脚本:

CREATE PROCEDURE sp_SendOnPublication

@Publication UNIQUEIDENTIFIER,

@Article VARBINARY(MAX)

AS

BEGIN

DECLARE @Subscription UNIQUEIDENTIFIER;

DECLARE @cursorSubscriptions CURSOR;

SET @cursorSubscriptions = CURSOR LOCAL SCROLL FOR

SELECT Subscriber

FROM Subscriptions s

JOIN Publications p ON s.Subject = p.Subject

WHERE p.Publication = @Publication;

BEGIN TRANSACTION;

OPEN @cursorSubscriptions;

FETCH NEXT FROM @cursorSubscriptions

INTO @Subscription;

WHILE (@@fetch_status = 0)

BEGIN

IF (@Article IS NOT NULL)

BEGIN

SEND ON CONVERSATION @Subscription

MESSAGE TYPE [http://ssb.csharp.at/SSB_Book/c10/ArticleMessage] (@Article);

END

ELSE

BEGIN

SEND ON CONVERSATION @Subscription

MESSAGE TYPE [http://ssb.csharp.at/SSB_Book/c10/ArticleMessage];

END

FETCH NEXT FROM @cursorSubscriptions INTO @Subscription;

END

CLOSE @cursorSubscriptions;

DEALLOCATE @cursorSubscriptions;

COMMIT;

END

GO

sp_SendOnPublication 存储过程使用一个cusor,发送从AuthorService接收到的article消息,给匹配的订阅者。

SEND ON CONVERSATION @Subscription

MESSAGE TYPE [http://ssb.csharp.at/SSB_Book/c10/ArticleMessage] (@Article);

通过对Publications 和 Subscriptions 表的Subject 字段进行连接,进行匹配:

SELECT Subscriber

FROM Subscriptions s

JOIN Publications p ON s.Subject = p.Subject

WHERE p.Publication = @Publication;

当接收到的是[http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog] 消息类型时(从AuthorService或从订阅者服务方),相应的发布或订阅数据则需要从Publications或Subscriptions表中删除。分别通过sp_RemovePublication 或 sp_RemoveSubscriptions 存储过程来实现。

sp_RemovePublication 存储过程脚本:

CREATE PROCEDURE sp_RemovePublication

@Publication UNIQUEIDENTIFIER

AS

BEGIN

DELETE FROM Publications

WHERE Publication = @Publication

END

GO

EntLib.com译者注:本文翻译Apress出版《Pro SQL Server 2005 Service Broker》的其中一个章节。后面的内容会尽快发布,欢迎交流、分享。谢谢!