The Right Way to Use Task Queues and Asynchronous APIs (.NET Core Edition)
李国宝 · 2019-01-13 · via 博客园 - 李国宝

What is an asynchronous API?

Asynchronous Operations

Certain types of operations might require processing of the request in an asynchronous manner (e.g. validating a bank account, processing an image, etc.) in order to avoid long delays on the client side and prevent long-standing open client connections waiting for the operations to complete. For such use cases, APIs MUST employ the following pattern:

For POST requests:

  • Return the 202 Accepted HTTP response code.

  • In the response body, include one or more URIs as hypermedia links, which could include:

    • The final URI of the resource where it will be available in future if the ID and path are already known. Clients can then make an HTTP GET request to that URI in order to obtain the completed resource. Until the resource is ready, the final URI SHOULD return the HTTP status code 404 Not Found.

    { "rel": "self", "href": "/v1/namespace/resources/{resource_id}", "method": "GET" }

    • A temporary request queue URI where the status of the operation may be obtained via some temporary identifier. Clients SHOULD make an HTTP GET request to obtain the status of the operation which MAY include such information as completion state, ETA, and final URI once it is completed.

    { "rel": "self", "href": "/v1/queue/requests/{request_id}", "method": "GET" }

For PUT/PATCH/DELETE/GET requests:

Like POST, you can support PUT/PATCH/DELETE/GET to be asynchronous. The behaviour would be as follows:

  • Return the 202 Accepted HTTP response code.

  • In the response body, include one or more URIs as hypermedia links, which could include:

    • A temporary request queue URI where the status of the operation may be obtained via some temporary identifier. Clients SHOULD make an HTTP GET request to obtain the status of the operation which MAY include such information as completion state, ETA, and final URI once it is completed.

    { "rel": "self", "href": "/v1/queue/requests/{request_id}", "method": "GET" }

APIs that support both synchronous and asynchronous processing for an URI:

APIs that support both synchronous and asynchronous operations for a particular URI and an HTTP method combination, MUST recognize the Prefer header and exhibit following behavior:

  • If the request contains a Prefer=respond-async header, the service MUST switch the processing to asynchronous mode.
  • If the request doesn't contain a Prefer=respond-async header, the service MUST process the request synchronously.

It is desirable that all APIs that implement asynchronous processing, also support webhooks as a mechanism of pushing the processing status to the client.

Source: paypal/API Design Patterns And Use Cases: asynchronous-operations

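In plain language

  • Put simply: when a request arrives, the API immediately returns the corresponding resourceId/request_id, and the client can then use that resourceId/request_id to query the processing result.

  • The processing itself may go through a queue, or it may simply be an asynchronous operation.

  • If processing has not finished yet, return 404; once it has finished, return the data as normal.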
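From the client's side, the three points above amount to a polling loop. The following is a minimal sketch under stated assumptions, not code from the sample repository: AsyncApiClient, PollAsync and the fetch delegate are hypothetical names, and fetch stands in for an HTTP GET against the status URI so that the example runs without a server.

```csharp
using System;
using System.Threading.Tasks;

public static class AsyncApiClient
{
    // Polls until the status is 200, or gives up after maxAttempts.
    // fetch is a hypothetical delegate standing in for GET /v1/requests/{id};
    // it returns the HTTP status code and the response body.
    public static async Task<string> PollAsync(
        Func<Task<(int Status, string Body)>> fetch, int maxAttempts, TimeSpan delay)
    {
        for (var attempt = 0; attempt < maxAttempts; attempt++)
        {
            var (status, body) = await fetch();
            if (status == 200) return body;   // processing finished
            if (status != 404)
                throw new InvalidOperationException("Unexpected status " + status);
            await Task.Delay(delay);          // 404: still processing, wait and retry
        }
        throw new TimeoutException("Result not ready after " + maxAttempts + " attempts.");
    }
}

public static class PollingDemo
{
    public static async Task Main()
    {
        var calls = 0;
        // Fake server: 404 for the first two polls, then 200 with the result.
        Func<Task<(int Status, string Body)>> fakeFetch = () =>
        {
            calls++;
            if (calls < 3) return Task.FromResult((Status: 404, Body: (string)null));
            return Task.FromResult((Status: 200, Body: "{\"done\":true}"));
        };

        var result = await AsyncApiClient.PollAsync(fakeFetch, 10, TimeSpan.FromMilliseconds(10));
        Console.WriteLine(result + " after " + calls + " polls");
        // prints: {"done":true} after 3 polls
    }
}
```

A real client would probably also cap the total waiting time and back off between polls; the fixed delay here only keeps the sketch short.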

That seems to be about all there is to say...

Let's call it the end of the article.

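On to the sample code

Implementation logic

  • Create the task: generate a "request-id" and store it in the corresponding Redis sorted set (zset) queue.

  • At the same time, publish a task message to a Redis channel; the background task-processing service picks up and handles this message on its own (producer-consumer pattern).

  • After the task-processing service has handled the message, it writes the result to Redis with the request-id as the key and the result as the value, then removes the "request-id" from the Redis zset.

  • When fetching the result for a request-id: if a processing result can be found for the request-id, return the processed data directly; if the request-id is still in the sorted set queue, return 404 plus its position n, meaning the request is still being processed and there are n requests ahead of it.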
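The bookkeeping in these steps can be sketched without Redis. The class below is an in-memory stand-in for illustration only: InMemoryRequestQueue and its members are hypothetical names, a List plays the role of the sorted set, a Dictionary plays the role of the result keys, and the pub/sub step is left out. It is not thread-safe and is not the code from the sample repository.

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-in for the Redis structures described above (illustration only).
public class InMemoryRequestQueue
{
    // Plays the role of the sorted set: pending request IDs in arrival order.
    private readonly List<string> _pending = new List<string>();

    // Plays the role of the string keys: request ID -> finished result.
    private readonly Dictionary<string, string> _results = new Dictionary<string, string>();

    // Enqueue the request (idempotent per ID) and return its 0-based position.
    public int Add(string requestId)
    {
        var index = _pending.IndexOf(requestId);
        if (index >= 0) return index;
        _pending.Add(requestId);
        return _pending.Count - 1;
    }

    // Store the result first, then remove the ID from the pending queue.
    public void Complete(string requestId, string result)
    {
        _results[requestId] = result;
        _pending.Remove(requestId);
    }

    // 200 + data when finished; otherwise 404 + queue position (null if the ID is unknown).
    public (int Status, string Body, int? Position) Get(string requestId)
    {
        if (_results.TryGetValue(requestId, out var body)) return (200, body, null);
        var index = _pending.IndexOf(requestId);
        return (404, null, index >= 0 ? index : (int?)null);
    }
}

public static class QueueDemo
{
    public static void Main()
    {
        var queue = new InMemoryRequestQueue();
        queue.Add("req-a");
        queue.Add("req-b");
        Console.WriteLine(queue.Get("req-b").Position);   // 1: one request ahead
        queue.Complete("req-a", "{\"ok\":true}");
        Console.WriteLine(queue.Get("req-a").Status);     // 200
        Console.WriteLine(queue.Get("req-b").Position);   // 0: now at the front
    }
}
```

Writing the result before removing the ID from the pending list mirrors the order in the steps above, so a reader never sees a request that is neither pending nor finished.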

The sequence diagram looks roughly like this: [figure: sequence diagram of the flow described above]

Everyone's favorite part: code time

RequestService.cs

// RequestService.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using CorrelationId;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using StackExchange.Redis;
using static StackExchange.Redis.RedisChannel;

namespace MTQueue.Service
{
    public class RequestService
    {


        private readonly ICorrelationContextAccessor _correlationContext;

        private readonly ConnectionMultiplexer _redisMultiplexer;

        private readonly IServiceProvider _services;

        private readonly ILogger<RequestService> _logger;

        public RequestService(ICorrelationContextAccessor correlationContext,
        ConnectionMultiplexer redisMultiplexer, IServiceProvider services,
        ILogger<RequestService> logger)
        {
            _correlationContext = correlationContext;
            _redisMultiplexer = redisMultiplexer;
            _services = services;
            _logger = logger;
        }

        public long? AddRequest(JToken data)
        {
            var requestId = _correlationContext.CorrelationContext.CorrelationId;
            var redisDB = _redisMultiplexer.GetDatabase(CommonConst.DEFAULT_DB);
            var index = redisDB.SortedSetRank(CommonConst.REQUESTS_SORT_SETKEY, requestId);
            if (index == null)
            {
                data["requestId"] = requestId;
                redisDB.SortedSetAdd(CommonConst.REQUESTS_SORT_SETKEY, requestId, GetTotalSeconds());
                PushRedisMessage(data.ToString());
            }
            return redisDB.SortedSetRank(CommonConst.REQUESTS_SORT_SETKEY, requestId);
        }

        public static long GetTotalSeconds()
        {
            // Unix time in seconds (UTC), used as the sorted-set score so earlier requests rank first.
            return DateTimeOffset.UtcNow.ToUnixTimeSeconds();
        }

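        private void PushRedisMessage(string message)
        {
            // Publish in the background so the HTTP request is not held up by Redis.
            Task.Run(async () =>
            {
                try
                {
                    using (var scope = _services.CreateScope())
                    {
                        var multiplexer = scope.ServiceProvider.GetRequiredService<ConnectionMultiplexer>();
                        // Await the publish so that a failure reaches the catch block below.
                        await multiplexer.GetSubscriber().PublishAsync(CommonConst.REQUEST_CHANNEL, message);
                    }
                }
                catch (Exception ex)
                {
                    // Pass the JSON payload as an argument; its braces must not be read as template placeholders.
                    _logger.LogError(-1, ex, "Failed to publish request message: {Message}", message);
                }
            });
        }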

        public Tuple<JToken, long?> GetRequest(string requestId)
        {
            var redisDB = _redisMultiplexer.GetDatabase(CommonConst.DEFAULT_DB);
            var keyIndex = redisDB.SortedSetRank(CommonConst.REQUESTS_SORT_SETKEY, requestId);
            var response = redisDB.StringGet(requestId);
            if (response.IsNull)
            {
                return Tuple.Create<JToken, long?>(default(JToken), keyIndex);
            }
            return Tuple.Create<JToken, long?>(JToken.Parse(response), keyIndex);
        }

    }
}

// RedisMQListener.cs

using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MTQueue.Model;
using MTQueue.Service;
using Newtonsoft.Json.Linq;
using StackExchange.Redis;
using static StackExchange.Redis.RedisChannel;

namespace MTQueue.Listener
{
    public class RedisMQListener : IHostedService
    {
        private readonly ConnectionMultiplexer _redisMultiplexer;

        private readonly IServiceProvider _services;

        private readonly ILogger<RedisMQListener> _logger;

        public RedisMQListener(IServiceProvider services, ConnectionMultiplexer redisMultiplexer,
        ILogger<RedisMQListener> logger)
        {
            _services = services;
            _redisMultiplexer = redisMultiplexer;
            _logger = logger;
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            Register();
            return Task.CompletedTask;
        }


        public virtual bool Process(RedisChannel ch, RedisValue message)
        {
            // Pass the JSON payload as an argument; its braces must not be read as template placeholders.
            _logger.LogInformation("Process start, message: {Message}", (string)message);
            var redisDB = _services.GetRequiredService<ConnectionMultiplexer>()
            .GetDatabase(CommonConst.DEFAULT_DB);
            var messageJson = JToken.Parse(message);
            var requestId = messageJson["requestId"]?.ToString();
            if (string.IsNullOrEmpty(requestId))
            {
                _logger.LogWarning("requestId not in message.");
                return false;
            }
            var mtAgent = _services.GetRequiredService<ZhihuClient>();
            var text = mtAgent.GetZhuanlan(messageJson);
            // Store the result under the request ID, with CommonConst.RESPONSE_TS as its expiry.
            redisDB.StringSet(requestId, text.ToString(), CommonConst.RESPONSE_TS);
            _logger.LogInformation("Process finish, requestId: {RequestId}", requestId);
            // Remove the ID from the pending set only after the result has been stored.
            redisDB.SortedSetRemove(CommonConst.REQUESTS_SORT_SETKEY, requestId);
            return true;
        }


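        public void Register()
        {
            var sub = _redisMultiplexer.GetSubscriber();
            var channel = CommonConst.REQUEST_CHANNEL;
            // Redis pub/sub does not persist messages: anything published while no subscriber is connected is lost.
            sub.SubscribeAsync(channel, (ch, value) =>
            {
                try
                {
                    Process(ch, value);
                }
                catch (Exception ex)
                {
                    // Log the failure here so that a bad message does not go unnoticed.
                    _logger.LogError(ex, "Process failed for a message on channel {Channel}", (string)ch);
                }
            });
        }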

        public void DeRegister()
        {
            // this.connection.Close();
        }


        public Task StopAsync(CancellationToken cancellationToken)
        {
            // this.connection.Close();
            return Task.CompletedTask;
        }
    }

}


// RequestsController.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using CorrelationId;
using Microsoft.AspNetCore.Mvc;
using MTQueue.Service;
using Newtonsoft.Json.Linq;

namespace MTQueue.Controllers
{
    [Route("v1/[controller]")]
    [ApiController]
    public class RequestsController : ControllerBase
    {

        private readonly ICorrelationContextAccessor _correlationContext;

        private readonly RequestService _requestService;

        private readonly ZhihuClient _mtAgentClient;

        public RequestsController(ICorrelationContextAccessor correlationContext,
         RequestService requestService, ZhihuClient mtAgentClient)
        {
            _correlationContext = correlationContext;
            _requestService = requestService;
            _mtAgentClient = mtAgentClient;
        }



        [HttpGet("{requestId}")]
        public IActionResult Get(string requestId)
        {
            var result = _requestService.GetRequest(requestId);
            var resource = $"/v1/requests/{requestId}";
            if (result.Item1 == default(JToken))
            {
                return NotFound(new { rel = "self", href = resource, method = "GET", index = result.Item2 });
            }
            return Ok(result.Item1);
        }

        [HttpPost]
        public IActionResult Post([FromBody] JToken data, [FromHeader(Name = "Prefer")]string prefer)
        {
            if (!string.IsNullOrEmpty(prefer) && prefer == "respond-async")
            {
                var index = _requestService.AddRequest(data);
                var requestId = _correlationContext.CorrelationContext.CorrelationId;
                var resource = $"/v1/requests/{requestId}";
                return Accepted(resource, new { rel = "self", href = resource, method = "GET", index = index });
            }
            return Ok(_mtAgentClient.GetZhuanlan(data));
        }
    }
}
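The controller above switches to asynchronous mode only when the Prefer header equals "respond-async" exactly. RFC 7240 allows several comma-separated preferences in one header value (for example "respond-async, wait=10"), and the exact comparison would treat such a request as synchronous. A more tolerant check might look like the sketch below. PreferHeader and WantsAsync are hypothetical names that are not part of the sample repository, the case-insensitive comparison is an assumption, and the simple Split does not handle quoted values that contain commas.

```csharp
using System;
using System.Linq;

public static class PreferHeader
{
    // Returns true when the Prefer header value lists the respond-async preference.
    public static bool WantsAsync(string prefer)
    {
        if (string.IsNullOrWhiteSpace(prefer)) return false;
        return prefer
            .Split(',')                            // preferences are comma-separated
            .Select(p => p.Split(';')[0].Trim())   // drop any ";parameter" suffix
            .Any(p => string.Equals(p, "respond-async", StringComparison.OrdinalIgnoreCase));
    }
}

public static class PreferDemo
{
    public static void Main()
    {
        Console.WriteLine(PreferHeader.WantsAsync("respond-async"));           // True
        Console.WriteLine(PreferHeader.WantsAsync("wait=10, Respond-Async"));  // True
        Console.WriteLine(PreferHeader.WantsAsync("return=minimal"));          // False
        Console.WriteLine(PreferHeader.WantsAsync(null));                      // False
    }
}
```

In the Post action, the condition could then be written as PreferHeader.WantsAsync(prefer) in place of the string comparison.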

Full code: https://github.com/liguobao/TaskQueueSample