ASP.NET Core 中使用 dapr:pub/sub 发送与订阅消息

2021年11月23日 阅读数:1
这篇文章主要向大家介绍ASP.NET Core 中使用 dapr:pub/sub 发送与订阅消息,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。

咱们决定从这周开始在实际开发中使用 dapr,先在 pub/sub 场景使用。这篇博文记录一下在 kubernetes 集群中基于 ASP.NET Core 使用 dapr 发送/订阅消息的试验过程。git

Dapr 环境准备

在 k8s 集群上部署好 daprgithub

# dapr status -k                
NAME                   NAMESPACE    HEALTHY  STATUS   REPLICAS  VERSION  AGE  CREATED              
dapr-placement-server  dapr-system  True     Running  3         1.5.0    7d   2021-11-13 11:22.53  
dapr-sentry            dapr-system  True     Running  3         1.5.0    7d   2021-11-13 10:51.45  
dapr-dashboard         dapr-system  True     Running  1         0.9.0    7d   2021-11-13 10:50.39  
dapr-operator          dapr-system  True     Running  3         1.5.0    7d   2021-11-13 10:51.10  
dapr-sidecar-injector  dapr-system  True     Running  3         1.5.0    7d   2021-11-13 10:50.40 

部署 pub/sub component ,这里用 redisweb

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
  namespace: production
spec:
  type: pubsub.redis
  version: v1
  metadata:
    - name: redisHost
      value: redis-master.production.svc.cluster.local:6379
    - name: redisPassword
      secretKeyRef:
        name: redis
        key: redis-password

给发布与订阅消息的应用添加 dapr 注解(k8s deployment),添加以后 dapr 会自动向 pod 中注入 sidecar。redis

spec:
  template:
    metadata:
      annotations:
        dapr.io/app-id: ing-web
        dapr.io/enabled: "true"
        dapr.io/app-port: "80"

注:dapr.io/app-port: "80" 必定不能少,默认端口不是80,开始没有加这个,形成订阅的消息老是收不到。api

应用A发送消息

安装 dapr .net sdk 的 nuget 包 Dapr.AspNetCorebash

dotnet add package

在 Startup 的 ConfigureServices 中添加 AddDaprClientapp

services.AddDaprClient();

注:这个项目只发消息,不订阅消息,因此不须要 AddDapr 与 Configure 的 MapSubscribeHandlercurl

在构造函数中注入 DaprClientide

public IngService(DaprClient daprClient)
{
}

用 PublishEventAsync 方法发消息到消息队列函数

await _daprClient.PublishEventAsync("pubsub", "newIng", ing);

在 redis 中检查消息是否发送成功

kubectl exec -it StatefulSet/redis-master -- redis-cli
127.0.0.1:6379> KEYS *
1) "newIng"
127.0.0.1:6379> TYPE newIng
stream

确认发送成功,消息是以 stream 类型保存在 redis 中的,key 名称就是 topic 名称。

应用B订阅消息

nuget 安装 Dapr.AspNetCore 并在 Startup 的 ConfigureServices 中添加 AddDapr

public void ConfigureServices(IServiceCollection services)
{
    services.AddMvc().AddDapr();
}

在 Configure 中添加 endpoints.MapSubscribeHandler()

app.UseEndpoints(endpoints =>
{
    endpoints.MapSubscribeHandler();
    // ...
});

实现订阅消息的 Controller,并在 Action 处添加 Topic 属性

[ApiController]
public class SubscriptionController : ControllerBase
{
    private readonly ILogger _logger;

    public SubscriptionController(ILogger<SubscriptionController> logger)
    {
        _logger = logger;
    }

    [Topic("pubsub", "newIng")]
    [HttpPost("/sub/newing")]
    public IActionResult NewIng(Ing ing)
    {
        _logger.LogInformation(
            "Received message: {Content} {DateAdded}",
            ing.Content,
            ing.DateAdded.ToString("yyyy-MM-dd HH:mm:ss"));

        return Ok();
    }
}

endpoints.MapSubscribeHandler 的做用是让 dapr 发现消息订阅者,dapr 收到消息后会向应用的 /dapr/subscribe 路径发请求,若是发现有对应消息的订阅者,会向订阅者的请求路径 POST 消息。

对于上面的示例代码,咱们能够进入应用的容器用 curl 命令验证一下

# curl localhost/dapr/subscribe   
[{"topic":"newIng","pubsubName":"pubsub","route":"sub/newing"}]

点火试验

应用A对应的是园子的闪存,我发了一条闪存“[dapr]此闪会经过 dapr 向消息队列发一条消息3”,随即对应的消息被发出,看看应用B的日志中是否记录了这条订阅消息。

Received message: [dapr]此闪会经过 dapr 向消息队列发一条消息3 2021-11-21 15:19:41

收到了,试验初步成功。

待解决问题

  • 很是奇怪的问题:闪存内容若是以中文开头,就收不到订阅消息,好比上面的闪存,若是去掉开头的[dapr] 就收不到消息。
  • 每次 /dapr/subscribe 被请求时,日志中都会出现下面的错误,对应实现代码在 DaprEndpointRouteBuilderExtensions

A default subscription to topic newIng on pubsub pubsub already exists.

  • 调用订阅者 Action 的鉴权问题,好比调用上面加了 Topic 属性 NewIng 方法,若是应用是暴露在公网上的,没有鉴权,就谁均可以调用。