惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

S
Secure Thoughts
S
Securelist
P
Proofpoint News Feed
D
DataBreaches.Net
Cisco Talos Blog
Cisco Talos Blog
C
CXSECURITY Database RSS Feed - CXSecurity.com
Project Zero
Project Zero
A
About on SuperTechFans
罗磊的独立博客
WordPress大学
WordPress大学
月光博客
月光博客
Latest news
Latest news
C
Cyber Attacks, Cyber Crime and Cyber Security
GbyAI
GbyAI
cs.AI updates on arXiv.org
cs.AI updates on arXiv.org
博客园 - 三生石上(FineUI控件)
F
Fortinet All Blogs
W
WeLiveSecurity
Attack and Defense Labs
Attack and Defense Labs
V
Visual Studio Blog
Blog — PlanetScale
Blog — PlanetScale
CTFtime.org: upcoming CTF events
CTFtime.org: upcoming CTF events
P
Privacy International News Feed
AI
AI
博客园 - 司徒正美
freeCodeCamp Programming Tutorials: Python, JavaScript, Git & More
www.infosecurity-magazine.com
www.infosecurity-magazine.com
Stack Overflow Blog
Stack Overflow Blog
M
MIT News - Artificial intelligence
Help Net Security
Help Net Security
T
Tor Project blog
V
Vulnerabilities – Threatpost
C
Cisco Blogs
I
Intezer
Threat Intelligence Blog | Flashpoint
Threat Intelligence Blog | Flashpoint
MyScale Blog
MyScale Blog
雷峰网
雷峰网
MongoDB | Blog
MongoDB | Blog
Forbes - Security
Forbes - Security
V
V2EX
Apple Machine Learning Research
Apple Machine Learning Research
T
Threat Research - Cisco Blogs
B
Blog RSS Feed
博客园 - 叶小钗
N
News and Events Feed by Topic
钛媒体:引领未来商业与生活新知
钛媒体:引领未来商业与生活新知
Simon Willison's Weblog
Simon Willison's Weblog
C
CERT Recently Published Vulnerability Notes
让小产品的独立变现更简单 - ezindie.com
让小产品的独立变现更简单 - ezindie.com
N
News and Events Feed by Topic

博客园 - 灰灰狼

架构与设计概要 IoC概要 需求分析概要 消息队列并发处理基类-简化版 2013年5.28~7.27 Microsoft FTE 微软面试总结 String Format for DateTime 多语言建议 multi-language 问题观 New life I would like About that task about wcf 基于证书的WCF安全开发详解 asp.net缓存(20100804完善版) - 灰灰狼 - 博客园 呼唤程序员精神——关于我今天发起的讨论的总结 asp.net mvc下实现窗口不关闭,就让Session不过期 正确的产品开发策略
接上文,支持并发数量的完美版本
灰灰狼 · 2014-09-15 · via 博客园 - 灰灰狼

增加了并发数量的支持,做了些完美主义化:

    public abstract class MessageQueueConcurrentHandlerBase<T> : IMessageQueueHandler
    {
        public MessageQueueConcurrentHandlerBase(string queueName, int maxConcurrency = 5, Action<string> logDelegate = null)
        {
            if (!MessageQueue.Exists(queueName))
                throw new Exception(string.Format("No such a queue: {0}", queueName));
            if (maxConcurrency < 1)
                throw new ArgumentOutOfRangeException("maxConcurrency");

            this._queueName = queueName;
            this._poolForConsumer = new Semaphore(0, maxConcurrency);
            this._producerAutoResetEvent = new AutoResetEvent(false);
            this._maxConcurrency = maxConcurrency;
            this._logDelegate = logDelegate;
        }

        public void StartRead()
        {
            this._queue = new MessageQueue(this._queueName) { Formatter = new XmlMessageFormatter(new Type[] { typeof(long) }) };
            this._queue.PeekCompleted += new PeekCompletedEventHandler(Produce);
            this._producerAutoResetEvent.Set();
            this._poolForConsumer.Release(this._maxConcurrency);

            this._queue.BeginPeek();
        }

        public override string ToString()
        {
            return string.Format("{0}-{1}", this.HandlerName, this._queueName);
        }

        public long CurrentWorkerCount { get { return Interlocked.Read(ref this._currentWorkerCount); } }

        public int MaxConcurrency { get { return _maxConcurrency; } }

        protected abstract string HandlerName { get; }

        protected abstract void MainProcess(T messageItem, string consumerName);

        protected void LogInfo(string msg)
        {
            if (_logDelegate != null)
            {
                this._logDelegate(msg);
            }
        }

        #region private
        private void Produce(object sender, PeekCompletedEventArgs e)
        {
            this._producerAutoResetEvent.WaitOne();

            var message = this._queue.EndPeek(e.AsyncResult);

            long consumerIndex = Interlocked.Increment(ref this._consumerIndex);
            ThreadPool.QueueUserWorkItem(new WaitCallback(this.Consume), consumerIndex);
            this._queue.BeginPeek();
        }

        private void Consume(object stateInfo)
        {
            long consumerIndex = (long)stateInfo;
            string consumerName = string.Format("{0}-{1}", this.HandlerName, consumerIndex);

            this._poolForConsumer.WaitOne();

            var message = this._queue.Receive();
            this._producerAutoResetEvent.Set();

            T messageItem = (T)message.Body;

            this.LogInfo(string.Format("{0} Received a message, MessageItem = {1}", consumerName, messageItem));
            Interlocked.Increment(ref this._currentWorkerCount);

            try
            {
                this.LogInfo(string.Format("{0} will run MainProcess, MessageItem = {1}, CurrentWorkerCount = {2}", consumerName, messageItem, this.CurrentWorkerCount));
                MainProcess(messageItem, consumerName);
            }
            catch (Exception ex)
            {
                this.HandleException(ex, messageItem);
            }
            finally
            {
                Interlocked.Decrement(ref this._currentWorkerCount);

                this.LogInfo(string.Format("{0} run over, messageItem = {1}, CurrentWorkerCount = {2}", consumerName, messageItem, this.CurrentWorkerCount));
            }

            this._poolForConsumer.Release();
        }

        private void HandleException(Exception ex, T messageItem)
        {
            this.LogInfo(string.Format("Exception in {0}:[Message]={1},[StackTrace]={2},[Type]={3},[CurrentWorkerCount]={4},[messageItem]={5}", this.HandlerName, ex.GetBaseException().Message, ex.StackTrace, ex.GetType(), this.CurrentWorkerCount, messageItem));
        }

        private readonly string _queueName;
        private MessageQueue _queue;
        private long _currentWorkerCount;
        private Semaphore _poolForConsumer;
        private AutoResetEvent _producerAutoResetEvent;
        private int _maxConcurrency;
        private Action<string> _logDelegate;
        private long _consumerIndex = 0;
        #endregion
    }

以下是测试代码,觉得好就表扬一下啊:)

    class Program : MessageQueueConcurrentHandlerBase<long>
    {
        public Program(string queueName, int maxConcurrency, Action<string> logDelegate)
            : base(queueName, maxConcurrency, logDelegate)
        {

        }

        protected override void MainProcess(long messageItem, string consumerName)
        {
            Thread.Sleep(5 * 1000);
            this.LogInfo(string.Format("{0}-MainProcess Over:\tmessageItem = {1}, CurrentWorkerCount = {2}", consumerName, messageItem, this.CurrentWorkerCount));
        }

        protected override string HandlerName { get { return "TestQueueHandler"; } }

        static void Main(string[] args)
        {
            string queuePath = CreateQueueIfNotExist("MyQueue");
            MessageQueue myQueue = new MessageQueue(queuePath);

            IMessageQueueHandler messageQueueHandler = new Program(queuePath, 3, s => { Console.WriteLine(s); });
            messageQueueHandler.StartRead();
            long x = 0;
            do
            {
                foreach (var s in Console.ReadLine().Split(",".ToCharArray()))
                {
                    if (long.TryParse(s.Trim(), out x))
                        myQueue.Send(x);
                }
            } while (x > 0);

            Console.ReadLine();
        }

        private static string CreateQueueIfNotExist(string queueName)
        {
            string queuePath = string.Format(@".\Private$\{0}", queueName);
            if (!MessageQueue.Exists(queuePath))
                MessageQueue.Create(queuePath);

            return queuePath;
        }
    }