一只会飞的旺旺
文章152
标签131
分类7
iot-dc3阅读笔记

iot-dc3阅读笔记

小卖铺上新啦!ChatGPT账号大甩卖! 一键直达

地址

pnoker/iot-dc3

DC3 是基于 Spring Cloud 的开源可分布式物联网 (IOT) 平台,用于快速开发, 部署物联设备接入项目,是一整套物联系统解决方案。

目录结构

.
├── dc3 资源文件,如sh,sql等
├── dc3-api gRpc定义的接口结构
├── dc3-center 平台中心模块
├── dc3-common 平台公共模块
├── dc3-driver 平台驱动模块
├── dc3-driver-sdk 平台驱动SDK模块
└── dc3-gateway 平台网关模块

平台中心模块

.
├── dc3-center-auth 授权模块,主要负责接口权限
├── dc3-center-data 数据模块,主要负责驱动数据处理
└── dc3-center-manager 管理模块

平台公共模块

.
├── dc3 git脚本
├── dc3-common-api api
├── dc3-common-auth 授权相关
├── dc3-common-constant 常量相关
├── dc3-common-exception 异常相关
├── dc3-common-influxdata influxDataDB相关
├── dc3-common-log 日志相关
├── dc3-common-model 模型相关
├── dc3-common-mongo mongoDB相关
├── dc3-common-mqtt mqtt相关
├── dc3-common-mysql 数据库相关
├── dc3-common-public 公共配置相关
├── dc3-common-quartz 定时任务
├── dc3-common-rabbitmq 消息队列相关
├── dc3-common-redis 缓存相关
├── dc3-common-thread 线程相关
└── dc3-common-web web服务配置

平台驱动模块

├── dc3-driver-dtu-yeecom Dtu驱动相关
├── dc3-driver-edge-gateway 边缘网关相关
├── dc3-driver-listening-virtual 虚拟网关相关
├── dc3-driver-lwm2m Lwm2m&Coap相关
├── dc3-driver-modbus-tcp modbusTcp相关
├── dc3-driver-mqtt mqtt相关
├── dc3-driver-opc-da opc-da相关
├── dc3-driver-opc-ua opc-ua相关
├── dc3-driver-plcs7 plcs7相关
├── dc3-driver-virtual 测试驱动相关
└── dc3-driver-weather-amap 高德地图天气相关

授权模块

授权中心模块(dc3-center-auth)负责管理平台的登录授权功能,包括:

  • 用户管理: 包括用户的增删改查,以及密码重置等操作。
  • 租户管理: 包括租户的增删改查,以及租户与用户绑定关系的管理。
  • IP 黑名单: 维护 IP 黑名单,限制非法 IP 的访问。
  • 令牌管理: 生成、校验和注销用户的 Token 令牌,实现基于 Token 的身份验证。

登录流程

  1. 获取租户信息: 网关从请求头中获取租户信息,并调用 TenantApi 查询租户是否存在且已启用。
  2. 获取用户信息: 网关从请求头中获取用户登录名称,并调用 UserLoginApi 查询用户是否存在且已启用。
  3. 校验 Token: 网关从请求头中获取 Token 信息,并调用 TokenApi 校验 Token 是否有效。
  4. 设置用户信息: 如果 Token 校验通过,网关会将用户信息设置到请求头中,并转发到下游服务。
// AuthenticGatewayFilterFactory.java
@Component
static class AuthenticGatewayFilter implements GatewayFilter {

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();

        try {
            // 1. 获取租户信息
            String tenantHeader = GatewayUtil.getRequestHeader(request, RequestConstant.Header.X_AUTH_TENANT);
            String tenant = DecodeUtil.byteToString(DecodeUtil.decode(tenantHeader));
            // ... 校验租户

            // 2. 获取用户信息
            String userHeader = GatewayUtil.getRequestHeader(request, RequestConstant.Header.X_AUTH_LOGIN);
            String user = DecodeUtil.byteToString(DecodeUtil.decode(userHeader));
            // ... 校验用户

            // 3. 校验 Token
            String tokenHeader = GatewayUtil.getRequestHeader(request, RequestConstant.Header.X_AUTH_TOKEN);
            // ... 解析 Token 信息
            // ... 校验 Token 是否有效

            // 4. 设置用户信息
            ServerHttpRequest build = request.mutate().headers(
                httpHeader -> {
                    // ... 设置用户信息到请求头
                }
            ).build();

            exchange.mutate().request(build).build();
        } catch (Exception e) {
            // ... 处理异常
        }

        return chain.filter(exchange);
    }
}

Token 生成

用户登录时,会调用 TokenController 的 generateToken 接口生成 Token,具体实现步骤如下:

  1. 生成随机盐值: 调用 TokenService 的 generateSalt 方法生成一个随机盐值,并将其缓存到 Redis 中,有效期为 5 分钟。
  2. 生成 Token: 调用 KeyUtil 的 generateToken 方法生成 Token,使用 JWT 算法,并将用户名、盐值和租户 ID 作为 payload,盐值作为密钥进行签名,有效期为 12 小时。
  3. 缓存 Token: 将生成的 Token 缓存到 Redis 中,Key 为 PrefixConstant.USER + SuffixConstant.TOKEN + SymbolConstant.DOUBLE_COLON + username + SymbolConstant.HASHTAG + tenant.getId(),有效期为 12 小时。
// TokenController.java
@Slf4j
@RestController
@RequestMapping(AuthServiceConstant.TOKEN_URL_PREFIX)
public class TokenController {

    @Resource
    private TokenService tokenService;

    @PostMapping("/generate")
    public R<String> generateToken(@Validated(Auth.class) @RequestBody Login login) {
        String token = tokenService.generateToken(login.getName(), login.getSalt(), login.getPassword(), login.getTenant());
        return ObjectUtil.isNotNull(token) ? R.ok(token, "The token will expire in 12 hours.") : R.fail();
    }
}

// TokenServiceImpl.java
@Slf4j
@Service
public class TokenServiceImpl implements TokenService {

    @Override
    public String generateToken(String username, String salt, String password, String tenantName) {
        // ... 校验用户信息

        String redisTokenKey = PrefixConstant.USER + SuffixConstant.TOKEN + SymbolConstant.DOUBLE_COLON + username + SymbolConstant.HASHTAG + tenant.getId();
        String token = KeyUtil.generateToken(username, redisSaltValue, tenant.getId());
        redisUtil.setKey(redisTokenKey, token, TimeoutConstant.TOKEN_CACHE_TIMEOUT, TimeUnit.HOURS);
        return token;
    }
}

Token校验

网关在收到请求时,会调用 TokenApi 的 checkTokenValid 接口校验 Token,具体实现步骤如下:

  1. 从 Redis 中获取 Token: 根据用户名和租户 ID 从 Redis 中获取 Token。
  2. 解析 Token: 调用 KeyUtil 的 parserToken 方法解析 Token,并校验签名是否正确。
  3. 判断 Token 是否过期: 如果 Token 未过期,返回 Token 的过期时间。
// TokenApi.java
@Slf4j
@GrpcService
public class TokenApi extends TokenApiGrpc.TokenApiImplBase {

    @Override
    public void checkTokenValid(LoginQuery request, StreamObserver<RTokenDTO> responseObserver) {
        // ...
        TokenValid select = tokenService.checkTokenValid(request.getName(), request.getSalt(), request.getToken(), request.getTenant());
        if (ObjectUtil.isNull(select)) {
            // ...
        } else if (!select.isValid()) {
            // ...
        } else {
            String expireTime = TimeUtil.completeFormat(select.getExpireTime());
            // ...
        }

        // ...
    }
}

// TokenServiceImpl.java
@Slf4j
@Service
public class TokenServiceImpl implements TokenService {

    @Override
    public TokenValid checkTokenValid(String username, String salt, String token, String tenantName) {
        // ...
        String redisKey = PrefixConstant.USER + SuffixConstant.TOKEN + SymbolConstant.DOUBLE_COLON + username + SymbolConstant.HASHTAG + tenant.getId();
        String redisToken = redisUtil.getKey(redisKey);
        // ...
        try {
            Claims claims = KeyUtil.parserToken(username, salt, token, tenant.getId());
            return new TokenValid(true, claims.getExpiration());
        } catch (Exception e) {
            return new TokenValid(false, null);
        }
    }
}

数据采集

我们以 MQTT 驱动为例,讲解一下数据从采集到入库再到展示的完整流程。

image-20240702151346536

驱动程序配置

MQTT 驱动程序(dc3-driver-mqtt)需要在配置文件中配置以下信息:

  • MQTT Broker 地址: 用于连接 MQTT Broker。
  • 驱动属性: 定义驱动程序自身的属性,例如连接超时时间、重连次数等。
  • 位号属性: 定义驱动程序支持的位号属性,例如读写权限、数据类型、单位等。
  • 订阅主题: 用于接收设备上报数据的主题。
  • 发布主题: 用于向设备发送指令的主题。

接收数据

MQTT 驱动程序通过订阅主题接收设备上报的数据,具体实现步骤如下:

  1. 连接 MQTT Broker: 驱动程序启动时,会连接到配置的 MQTT Broker。
  2. 订阅主题: 连接成功后,驱动程序会订阅配置文件中配置的主题。
  3. 接收消息: 当设备向订阅主题发布消息时,驱动程序会接收到消息,并将其解析成 PointValue 对象。
  4. 发送数据: 调用 DriverSenderService 的 pointValueSender 方法将 PointValue 对象发送到数据中心模块。
// MqttReceiveHandler.java
@Slf4j
@Component
public class MqttReceiveHandler {

    // ...

    @Bean
    @ServiceActivator(inputChannel = "mqttInboundChannel")
    public MessageHandler mqttInboundReceive() {
        return message -> {
            // ...
            
            MessageHeader messageHeader = new MessageHeader(message.getHeaders());
            String payload = message.getPayload().toString();
            // ... 校验消息

            MqttScheduleJob.messageCount.getAndIncrement();
            MqttMessage mqttMessage = new MqttMessage(messageHeader, payload);
            // ...

            // ... 根据数据量判断是否批量处理

            threadPoolExecutor.execute(() -> mqttReceiveService.receiveValue(mqttMessage));
            // ...
        };
    }
}

// MqttReceiveServiceImpl.java
@Slf4j
@Service
public class MqttReceiveServiceImpl implements MqttReceiveService {

    @Override
    public void receiveValue(MqttMessage mqttMessage) {
        // do something to process your mqtt messages
        log.info(JsonUtil.toPrettyJsonString(mqttMessage));
        PointValue pointValue = JsonUtil.parseObject(mqttMessage.getPayload(), PointValue.class);
        pointValue.setOriginTime(new Date());
        driverSenderService.pointValueSender(pointValue);
    }

    // ...
}

数据入库

数据中心模块(dc3-center-data)需要在配置文件中配置数据存储策略,例如:

  • 存储方式: 支持 Redis、MongoDB、InfluxDB、TDengine、OpenTSDB、Elasticsearch 等多种存储方式。
  • 批量处理速度: 当数据采集速度超过该值时,启用批量处理机制。
  • 批量处理间隔: 批量处理的时间间隔。
// PointValueReceiver.java
@Slf4j
@Component
public class PointValueReceiver {

    // ...

    @RabbitHandler
    @RabbitListener(queues = "#{pointValueQueue.name}")
    public void pointValueReceive(Channel channel, Message message, PointValue pointValue) {
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
            // ... 校验数据

            PointValueScheduleJob.valueCount.getAndIncrement();
            log.debug("Point value, From: {}, Received: {}", message.getMessageProperties().getReceivedRoutingKey(), pointValue);

            // ... 根据数据量判断是否批量处理

            threadPoolExecutor.execute(() -> pointValueService.savePointValue(pointValue));
            // ...
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }
}

// PointValueServiceImpl.java
@Slf4j
@Service
public class PointValueServiceImpl implements PointValueService {

    // ...

    @Override
    public void savePointValue(PointValue pointValue) {
        // ... 校验数据

        pointValue.setCreateTime(new Date());
        repositoryHandleService.save(pointValue);
    }

    // ...
}

// RepositoryHandleServiceImpl.java
@Slf4j
@Service
public class RepositoryHandleServiceImpl implements RepositoryHandleService {

    // ...

    @Override
    public void save(PointValue pointValue) {
        // 保存单个数据到 Redis & Mongo
        savePointValueToRepository(pointValue, redisRepositoryService, mongoRepositoryService);

        // ... 根据配置选择存储服务

    }

    // ...
}

数据展示

用户可以通过平台提供的 API 接口查询设备数据,具体实现步骤如下:

  1. 发送请求: 用户通过平台 API 接口发送数据查询请求。
  2. 接收请求: 数据中心模块接收到请求后,解析请求参数,例如设备 ID、位号 ID、时间范围等。
  3. 查询数据: 根据请求参数,从数据库中查询数据。
  4. 返回数据: 将查询到的数据返回给用户。
// PointValueController.java
@Slf4j
@RestController
@RequestMapping(DataServiceConstant.VALUE_URL_PREFIX)
public class PointValueController {

    // ...

    @PostMapping("/latest")
    public R<Page<PointValue>> latest(@RequestBody PointValuePageQuery pointValuePageQuery) {
        // ...
        Page<PointValue> page = pointValueService.latest(pointValuePageQuery);
        // ...
    }

    // ...
}

// PointValueServiceImpl.java
@Slf4j
@Service
public class PointValueServiceImpl implements PointValueService {

    // ...

    @Override
    public Page<PointValue> latest(PointValuePageQuery pageQuery) {
        // ...
        // 查询实时数据
        List<PointValue> pointValues = realtime(pageQuery.getDeviceId(), pointIds);
        if (CollUtil.isEmpty(pointValues)) {
            // 查询最近数据
            pointValues = latest(pageQuery.getDeviceId(), pointIds);
        }
        // ...

        // ... 查询历史数据
    }

    // ...
}

数据可视化

平台可以提供数据可视化功能,将查询到的数据以图表等形式展示给用户,例如:

  • 实时曲线: 展示位号值的实时变化趋势。
  • 历史曲线: 展示位号值在一段时间内的变化趋势。
  • 报表: 以表格形式展示位号值。

    状态展示

image-20240702154506368

图解说明:

  1. 驱动程序定时发送设备状态事件消息: 驱动程序会定期执行定时任务,读取设备的实时状态,并将其封装成 DeviceEvent 对象,通过 RabbitMQ 的事件交换机发送到数据中心。
  2. 数据中心接收并处理设备状态事件消息: 数据中心模块订阅了事件交换机的消息,接收到设备状态事件消息后,会将设备状态信息写入 Redis,Key 为 device_status:{deviceId},并设置过期时间。
  3. 前端页面请求设备状态: 前端页面通过 AJAX 等方式请求数据中心的 API 接口,获取设备的实时状态信息。
  4. 数据中心从 Redis 中读取并返回设备状态: 数据中心模块接收到前端请求后,从 Redis 中读取设备状态信息,并将其返回给前端页面。

推送机制

  • 驱动程序定期读取设备状态,并通过 RabbitMQ 推送到数据中心。
  • 数据中心接收到状态信息后,更新 Redis 中的缓存数据。

拉取机制

  • 前端页面定期轮询数据中心 API 接口,获取最新的设备状态信息。

核心代码

1.驱动程序定时发送设备状态

// DriverCustomServiceImpl.java
@Slf4j
@Service
public class DriverCustomServiceImpl implements DriverCustomService {

    // ...

    @Override
    public void schedule() {
        // 定时任务逻辑,例如每隔 10 秒执行一次
        driverContext.getDriverMetadata().getDeviceMap().keySet()
                .forEach(id -> driverSenderService.deviceEventSender(new DeviceEvent(id, EventConstant.Device.STATUS, DeviceStatusEnum.ONLINE, 25, TimeUnit.SECONDS)));
    }

    // ...
}

2.数据中心接收和处理设备状态事件

// DeviceEventReceiver.java
@Slf4j
@Component
public class DeviceEventReceiver {

    // ...

    @RabbitHandler
    @RabbitListener(queues = "#{deviceEventQueue.name}")
    public void deviceEventReceive(Channel channel, Message message, DeviceEvent deviceEvent) {
        // ...

        switch (deviceEvent.getType()) {
            case EventConstant.Device.STATUS:
                redisUtil.setKey(
                        PrefixConstant.DEVICE_STATUS_KEY_PREFIX + deviceEvent.getDeviceId(),
                        deviceEvent.getContent(),
                        deviceEvent.getTimeOut(),
                        deviceEvent.getTimeUnit()
                );
                break;
            // ...
        }

        // ...
    }
}

3.前端请求设备状态

// 前端代码示例,使用 AJAX 请求数据
setInterval(function() {
  $.ajax({
    url: '/data/device/status/device',
    type: 'GET',
    success: function(data) {
      // 更新页面上的设备状态信息
      // ...
    }
  });
}, 1000); // 每隔 1 秒请求一次

数据展示

image-20240702154521066

图解说明:

  1. 驱动程序读取设备数据: 驱动程序会定期执行定时任务,读取设备的实时数据,并将其封装成 PointValue 对象。
  2. 驱动程序发送设备数据消息: 驱动程序将 PointValue 对象通过 RabbitMQ 的数据交换机发送到数据中心。
  3. 数据中心接收并处理设备数据消息: 数据中心模块订阅了数据交换机的消息,接收到设备数据消息后,会进行以下操作:
    • 将 PointValue 对象写入 Redis,Key 为 realtime_value:{deviceId}.{pointId},用于实时数据展示。
    • 将 PointValue 对象写入 MongoDB,用于存储历史数据。
  4. 前端页面请求设备实时数据: 前端页面通过 AJAX 等方式请求数据中心的 API 接口,获取设备的实时数据。
  5. 数据中心从 Redis 中读取并返回设备实时数据: 数据中心模块接收到前端请求后,从 Redis 中读取设备实时数据,并将其返回给前端页面。

推送机制

  • 驱动程序定期读取设备数据,并通过 RabbitMQ 推送到数据中心。
  • 数据中心接收到数据后,更新 Redis 和 MongoDB 中的缓存数据。

拉取机制

  • 前端页面定期轮询数据中心 API 接口,获取最新的设备数据。

核心代码

1.驱动程序读取并发送设备数据

// DriverReadScheduleJob.java
@Slf4j
@Component
public class DriverReadScheduleJob extends QuartzJobBean {
    
    // ...

    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        // ... 遍历设备和位号

        threadPoolExecutor.execute(() -> driverCommandService.read(device.getId(), pointId));

        // ...
    }
}

// DriverCommandServiceImpl.java
@Slf4j
@Service
public class DriverCommandServiceImpl implements DriverCommandService {

    // ...

    @Override
    public PointValue read(String deviceId, String pointId) {
        // ... 获取设备和位号信息

        try {
            // 调用驱动自定义 read 方法读取数据
            String rawValue = driverCustomService.read(
                // ...
            );

            // ... 校验数据

            PointValue pointValue = new PointValue(deviceId, pointId, rawValue, ConvertUtil.convertValue(point, rawValue));
            driverSenderService.pointValueSender(pointValue);
            return pointValue;
        } catch (Exception e) {
            // ... 处理异常
        }
    }
    
    // ...
}

2.数据中心接收和处理设备数据

// PointValueReceiver.java
@Slf4j
@Component
public class PointValueReceiver {

    // ...

    @RabbitHandler
    @RabbitListener(queues = "#{pointValueQueue.name}")
    public void pointValueReceive(Channel channel, Message message, PointValue pointValue) {
        // ...

        // ... 根据数据量判断是否批量处理

        threadPoolExecutor.execute(() -> pointValueService.savePointValue(pointValue));

        // ...
    }
}

// PointValueServiceImpl.java
@Slf4j
@Service
public class PointValueServiceImpl implements PointValueService {

    // ...

    @Override
    public void savePointValue(PointValue pointValue) {
        // ...
        pointValue.setCreateTime(new Date());
        repositoryHandleService.save(pointValue);
    }

    // ...
}

// RepositoryHandleServiceImpl.java
@Slf4j
@Service
public class RepositoryHandleServiceImpl implements RepositoryHandleService {

    // ...

    @Override
    public void save(PointValue pointValue) {
        // 保存单个数据到 Redis & Mongo
        savePointValueToRepository(pointValue, redisRepositoryService, mongoRepositoryService);

        // ... 根据配置选择其他存储服务
    }

    // ...
}

3.前端请求示例

// 前端代码示例,使用 AJAX 请求数据
setInterval(function() {
  $.ajax({
    url: '/data/point_value/latest', // 假设 API 接口为 /data/point_value/latest
    type: 'POST',
    data: JSON.stringify({ deviceId: 'device1' }), // 假设需要查询 deviceId 为 device1 的设备数据
    success: function(data) {
      // 更新页面上的设备数据
      // ...
    }
  });
}, 1000); // 每隔 1 秒请求一次

指令下置

image-20240702160343549

图解说明:

  1. 用户发送指令下置请求: 用户通过平台 API 接口发送指令下置请求,例如控制设备开关、设置参数等。
  2. 平台网关查询驱动信息: 平台网关接收到请求后,调用设备管理中心 API 接口,查询该设备绑定的驱动程序信息,例如驱动程序服务名。
  3. 平台网关将指令下发到 RabbitMQ: 平台网关将指令内容封装成 DeviceCommandDTO 对象,并将其发送到 RabbitMQ 的指令交换机,RoutingKey 为 dc3.r.command.device.{driverServiceName},例如 dc3.r.command.device.dc3-driver-modbus-tcp。
  4. 驱动程序接收指令消息: 驱动程序订阅了指令交换机上的对应主题,接收到指令消息后,解析指令内容,例如设备 ID、位号 ID、指令类型、指令参数等。
  5. 驱动程序将指令发送到设备: 驱动程序根据解析出的指令内容,通过对应的驱动协议将指令发送到设备。

核心代码

1.平台网关发送指令

// PointValueCommandController.java
@Slf4j
@RestController
@RequestMapping(DataServiceConstant.VALUE_COMMAND_URL_PREFIX)
public class PointValueCommandController {

    // ...

    @PostMapping("/write")
    public R<Boolean> write(@Validated @RequestBody PointValueWriteVO entityVO) {
        // ...

        DeviceCommandDTO.DeviceWrite deviceWrite = new DeviceCommandDTO.DeviceWrite(entityVO.getDeviceId(), entityVO.getPointId(), entityVO.getValue());
        DeviceCommandDTO deviceCommandDTO = new DeviceCommandDTO(DeviceCommandTypeEnum.WRITE, JsonUtil.toJsonString(deviceWrite));
        rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE_COMMAND, RabbitConstant.ROUTING_DEVICE_COMMAND_PREFIX + rDriverDTO.getData().getServiceName(), deviceCommandDTO);

        // ...
    }

    // ...
}

2.驱动程序接收指令

// DeviceCommandReceiver.java
@Slf4j
@Component
public class DeviceCommandReceiver {

    // ...

    @RabbitHandler
    @RabbitListener(queues = "#{deviceCommandQueue.name}")
    public void deviceCommandReceive(Channel channel, Message message, DeviceCommandDTO entityDTO) {
        // ...

        switch (entityDTO.getType()) {
            case WRITE:
                driverCommandService.write(entityDTO);
                break;
            // ...
        }

        // ...
    }
}

// DriverCommandServiceImpl.java
@Slf4j
@Service
public class DriverCommandServiceImpl implements DriverCommandService {

    // ...

    @Override
    public void write(DeviceCommandDTO commandDTO) {
        // ... 解析指令内容

        log.info("Start command of write: {}", JsonUtil.toPrettyJsonString(commandDTO));
        Boolean write = write(deviceWrite.getDeviceId(), deviceWrite.getPointId(), deviceWrite.getValue()); // 调用驱动程序自定义的 write 方法
        log.info("End command of write: write {}", write);
    }

    // ...
}

MQ订阅设计

项目采用了一种层次化的主题命名规范,主要包含以下几个部分:

部分 说明 示例
应用标识 用于区分不同的应用,通常为 dc3 dc3
消息类型 用于区分不同的消息类型,例如事件、元数据、指令、数据 e (事件), m (元数据), c (指令), v (数据)
操作方向 用于区分消息的发送方和接收方,例如驱动、设备、平台 d (驱动), p (平台)
模块类型 用于区分不同的模块,例如驱动、设备 driver, device
服务名称 用于区分不同的服务,通常为驱动程序或设备的唯一标识 dc3-driver-mqtt
其他信息 可选,用于进一步区分消息,例如设备 ID、位号 ID 等 {deviceId}, {pointId}

例如,驱动程序向数据中心发送设备状态事件消息,主题可以命名为:dc3/e/d/device/{driverServiceName},其中:

  • dc3: 应用标识
  • e: 事件消息类型
  • d: 驱动程序发送方向
  • device: 模块类型
  • {driverServiceName}: 驱动程序服务名称

项目中,不同的模块会根据其功能和职责订阅不同的主题,以实现高效的消息传递和处理。

image-20240702161250757

驱动程序:

  • 同步主题: 订阅 dc3/sync/d/{driverClient} 主题,接收平台下发的同步指令。
  • 元数据主题: 订阅 dc3/m/p/driver/{driverService} 主题,接收平台下发的元数据更新指令。
  • 指令主题: 订阅 dc3/c/p/driver/{driverService} 和 dc3/c/p/device/{driverService} 主题,接收平台下发的指令。

数据中心:

  • 事件主题: 订阅 dc3/e/d/* 主题,接收驱动程序上报的驱动和设备事件消息。
  • 数据主题: 订阅 dc3/v/d/* 主题,接收驱动程序上报的设备数据消息。

平台网关:

  • 指令主题: 根据设备绑定的驱动程序,将指令发布到 dc3/c/p/driver/{driverServiceName} 或 dc3/c/p/device/{driverServiceName} 主题。

Modbus实现

image-20240702163901323

微信支付码 微信支付
支付宝支付码 支付宝支付