地址
DC3 是基于 Spring Cloud 的开源可分布式物联网 (IOT) 平台,用于快速开发, 部署物联设备接入项目,是一整套物联系统解决方案。
目录结构
.
├── dc3 资源文件,如sh,sql等
├── dc3-api gRpc定义的接口结构
├── dc3-center 平台中心模块
├── dc3-common 平台公共模块
├── dc3-driver 平台驱动模块
├── dc3-driver-sdk 平台驱动SDK模块
└── dc3-gateway 平台网关模块
平台中心模块
.
├── dc3-center-auth 授权模块,主要负责接口权限
├── dc3-center-data 数据模块,主要负责驱动数据处理
└── dc3-center-manager 管理模块
平台公共模块
.
├── dc3 git脚本
├── dc3-common-api api
├── dc3-common-auth 授权相关
├── dc3-common-constant 常量相关
├── dc3-common-exception 异常相关
├── dc3-common-influxdata influxDataDB相关
├── dc3-common-log 日志相关
├── dc3-common-model 模型相关
├── dc3-common-mongo mongoDB相关
├── dc3-common-mqtt mqtt相关
├── dc3-common-mysql 数据库相关
├── dc3-common-public 公共配置相关
├── dc3-common-quartz 定时任务
├── dc3-common-rabbitmq 消息队列相关
├── dc3-common-redis 缓存相关
├── dc3-common-thread 线程相关
└── dc3-common-web web服务配置
平台驱动模块
├── dc3-driver-dtu-yeecom Dtu驱动相关
├── dc3-driver-edge-gateway 边缘网关相关
├── dc3-driver-listening-virtual 虚拟网关相关
├── dc3-driver-lwm2m Lwm2m&Coap相关
├── dc3-driver-modbus-tcp modbusTcp相关
├── dc3-driver-mqtt mqtt相关
├── dc3-driver-opc-da opc-da相关
├── dc3-driver-opc-ua opc-ua相关
├── dc3-driver-plcs7 plcs7相关
├── dc3-driver-virtual 测试驱动相关
└── dc3-driver-weather-amap 高德地图天气相关
授权模块
授权中心模块(dc3-center-auth)负责管理平台的登录授权功能,包括:
- 用户管理: 包括用户的增删改查,以及密码重置等操作。
- 租户管理: 包括租户的增删改查,以及租户与用户绑定关系的管理。
- IP 黑名单: 维护 IP 黑名单,限制非法 IP 的访问。
- 令牌管理: 生成、校验和注销用户的 Token 令牌,实现基于 Token 的身份验证。
登录流程
- 获取租户信息: 网关从请求头中获取租户信息,并调用 TenantApi 查询租户是否存在且已启用。
- 获取用户信息: 网关从请求头中获取用户登录名称,并调用 UserLoginApi 查询用户是否存在且已启用。
- 校验 Token: 网关从请求头中获取 Token 信息,并调用 TokenApi 校验 Token 是否有效。
- 设置用户信息: 如果 Token 校验通过,网关会将用户信息设置到请求头中,并转发到下游服务。
// AuthenticGatewayFilterFactory.java
@Component
static class AuthenticGatewayFilter implements GatewayFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
try {
// 1. 获取租户信息
String tenantHeader = GatewayUtil.getRequestHeader(request, RequestConstant.Header.X_AUTH_TENANT);
String tenant = DecodeUtil.byteToString(DecodeUtil.decode(tenantHeader));
// ... 校验租户
// 2. 获取用户信息
String userHeader = GatewayUtil.getRequestHeader(request, RequestConstant.Header.X_AUTH_LOGIN);
String user = DecodeUtil.byteToString(DecodeUtil.decode(userHeader));
// ... 校验用户
// 3. 校验 Token
String tokenHeader = GatewayUtil.getRequestHeader(request, RequestConstant.Header.X_AUTH_TOKEN);
// ... 解析 Token 信息
// ... 校验 Token 是否有效
// 4. 设置用户信息
ServerHttpRequest build = request.mutate().headers(
httpHeader -> {
// ... 设置用户信息到请求头
}
).build();
exchange.mutate().request(build).build();
} catch (Exception e) {
// ... 处理异常
}
return chain.filter(exchange);
}
}
Token 生成
用户登录时,会调用 TokenController 的 generateToken 接口生成 Token,具体实现步骤如下:
- 生成随机盐值: 调用 TokenService 的 generateSalt 方法生成一个随机盐值,并将其缓存到 Redis 中,有效期为 5 分钟。
- 生成 Token: 调用 KeyUtil 的 generateToken 方法生成 Token,使用 JWT 算法,并将用户名、盐值和租户 ID 作为 payload,盐值作为密钥进行签名,有效期为 12 小时。
- 缓存 Token: 将生成的 Token 缓存到 Redis 中,Key 为 PrefixConstant.USER + SuffixConstant.TOKEN + SymbolConstant.DOUBLE_COLON + username + SymbolConstant.HASHTAG + tenant.getId(),有效期为 12 小时。
// TokenController.java
@Slf4j
@RestController
@RequestMapping(AuthServiceConstant.TOKEN_URL_PREFIX)
public class TokenController {
@Resource
private TokenService tokenService;
@PostMapping("/generate")
public R<String> generateToken(@Validated(Auth.class) @RequestBody Login login) {
String token = tokenService.generateToken(login.getName(), login.getSalt(), login.getPassword(), login.getTenant());
return ObjectUtil.isNotNull(token) ? R.ok(token, "The token will expire in 12 hours.") : R.fail();
}
}
// TokenServiceImpl.java
@Slf4j
@Service
public class TokenServiceImpl implements TokenService {
@Override
public String generateToken(String username, String salt, String password, String tenantName) {
// ... 校验用户信息
String redisTokenKey = PrefixConstant.USER + SuffixConstant.TOKEN + SymbolConstant.DOUBLE_COLON + username + SymbolConstant.HASHTAG + tenant.getId();
String token = KeyUtil.generateToken(username, redisSaltValue, tenant.getId());
redisUtil.setKey(redisTokenKey, token, TimeoutConstant.TOKEN_CACHE_TIMEOUT, TimeUnit.HOURS);
return token;
}
}
Token校验
网关在收到请求时,会调用 TokenApi 的 checkTokenValid 接口校验 Token,具体实现步骤如下:
- 从 Redis 中获取 Token: 根据用户名和租户 ID 从 Redis 中获取 Token。
- 解析 Token: 调用 KeyUtil 的 parserToken 方法解析 Token,并校验签名是否正确。
- 判断 Token 是否过期: 如果 Token 未过期,返回 Token 的过期时间。
// TokenApi.java
@Slf4j
@GrpcService
public class TokenApi extends TokenApiGrpc.TokenApiImplBase {
@Override
public void checkTokenValid(LoginQuery request, StreamObserver<RTokenDTO> responseObserver) {
// ...
TokenValid select = tokenService.checkTokenValid(request.getName(), request.getSalt(), request.getToken(), request.getTenant());
if (ObjectUtil.isNull(select)) {
// ...
} else if (!select.isValid()) {
// ...
} else {
String expireTime = TimeUtil.completeFormat(select.getExpireTime());
// ...
}
// ...
}
}
// TokenServiceImpl.java
@Slf4j
@Service
public class TokenServiceImpl implements TokenService {
@Override
public TokenValid checkTokenValid(String username, String salt, String token, String tenantName) {
// ...
String redisKey = PrefixConstant.USER + SuffixConstant.TOKEN + SymbolConstant.DOUBLE_COLON + username + SymbolConstant.HASHTAG + tenant.getId();
String redisToken = redisUtil.getKey(redisKey);
// ...
try {
Claims claims = KeyUtil.parserToken(username, salt, token, tenant.getId());
return new TokenValid(true, claims.getExpiration());
} catch (Exception e) {
return new TokenValid(false, null);
}
}
}
数据采集
我们以 MQTT 驱动为例,讲解一下数据从采集到入库再到展示的完整流程。
驱动程序配置
MQTT 驱动程序(dc3-driver-mqtt)需要在配置文件中配置以下信息:
- MQTT Broker 地址: 用于连接 MQTT Broker。
- 驱动属性: 定义驱动程序自身的属性,例如连接超时时间、重连次数等。
- 位号属性: 定义驱动程序支持的位号属性,例如读写权限、数据类型、单位等。
- 订阅主题: 用于接收设备上报数据的主题。
- 发布主题: 用于向设备发送指令的主题。
接收数据
MQTT 驱动程序通过订阅主题接收设备上报的数据,具体实现步骤如下:
- 连接 MQTT Broker: 驱动程序启动时,会连接到配置的 MQTT Broker。
- 订阅主题: 连接成功后,驱动程序会订阅配置文件中配置的主题。
- 接收消息: 当设备向订阅主题发布消息时,驱动程序会接收到消息,并将其解析成 PointValue 对象。
- 发送数据: 调用 DriverSenderService 的 pointValueSender 方法将 PointValue 对象发送到数据中心模块。
// MqttReceiveHandler.java
@Slf4j
@Component
public class MqttReceiveHandler {
// ...
@Bean
@ServiceActivator(inputChannel = "mqttInboundChannel")
public MessageHandler mqttInboundReceive() {
return message -> {
// ...
MessageHeader messageHeader = new MessageHeader(message.getHeaders());
String payload = message.getPayload().toString();
// ... 校验消息
MqttScheduleJob.messageCount.getAndIncrement();
MqttMessage mqttMessage = new MqttMessage(messageHeader, payload);
// ...
// ... 根据数据量判断是否批量处理
threadPoolExecutor.execute(() -> mqttReceiveService.receiveValue(mqttMessage));
// ...
};
}
}
// MqttReceiveServiceImpl.java
@Slf4j
@Service
public class MqttReceiveServiceImpl implements MqttReceiveService {
@Override
public void receiveValue(MqttMessage mqttMessage) {
// do something to process your mqtt messages
log.info(JsonUtil.toPrettyJsonString(mqttMessage));
PointValue pointValue = JsonUtil.parseObject(mqttMessage.getPayload(), PointValue.class);
pointValue.setOriginTime(new Date());
driverSenderService.pointValueSender(pointValue);
}
// ...
}
数据入库
数据中心模块(dc3-center-data)需要在配置文件中配置数据存储策略,例如:
- 存储方式: 支持 Redis、MongoDB、InfluxDB、TDengine、OpenTSDB、Elasticsearch 等多种存储方式。
- 批量处理速度: 当数据采集速度超过该值时,启用批量处理机制。
- 批量处理间隔: 批量处理的时间间隔。
// PointValueReceiver.java
@Slf4j
@Component
public class PointValueReceiver {
// ...
@RabbitHandler
@RabbitListener(queues = "#{pointValueQueue.name}")
public void pointValueReceive(Channel channel, Message message, PointValue pointValue) {
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
// ... 校验数据
PointValueScheduleJob.valueCount.getAndIncrement();
log.debug("Point value, From: {}, Received: {}", message.getMessageProperties().getReceivedRoutingKey(), pointValue);
// ... 根据数据量判断是否批量处理
threadPoolExecutor.execute(() -> pointValueService.savePointValue(pointValue));
// ...
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}
// PointValueServiceImpl.java
@Slf4j
@Service
public class PointValueServiceImpl implements PointValueService {
// ...
@Override
public void savePointValue(PointValue pointValue) {
// ... 校验数据
pointValue.setCreateTime(new Date());
repositoryHandleService.save(pointValue);
}
// ...
}
// RepositoryHandleServiceImpl.java
@Slf4j
@Service
public class RepositoryHandleServiceImpl implements RepositoryHandleService {
// ...
@Override
public void save(PointValue pointValue) {
// 保存单个数据到 Redis & Mongo
savePointValueToRepository(pointValue, redisRepositoryService, mongoRepositoryService);
// ... 根据配置选择存储服务
}
// ...
}
数据展示
用户可以通过平台提供的 API 接口查询设备数据,具体实现步骤如下:
- 发送请求: 用户通过平台 API 接口发送数据查询请求。
- 接收请求: 数据中心模块接收到请求后,解析请求参数,例如设备 ID、位号 ID、时间范围等。
- 查询数据: 根据请求参数,从数据库中查询数据。
- 返回数据: 将查询到的数据返回给用户。
// PointValueController.java
@Slf4j
@RestController
@RequestMapping(DataServiceConstant.VALUE_URL_PREFIX)
public class PointValueController {
// ...
@PostMapping("/latest")
public R<Page<PointValue>> latest(@RequestBody PointValuePageQuery pointValuePageQuery) {
// ...
Page<PointValue> page = pointValueService.latest(pointValuePageQuery);
// ...
}
// ...
}
// PointValueServiceImpl.java
@Slf4j
@Service
public class PointValueServiceImpl implements PointValueService {
// ...
@Override
public Page<PointValue> latest(PointValuePageQuery pageQuery) {
// ...
// 查询实时数据
List<PointValue> pointValues = realtime(pageQuery.getDeviceId(), pointIds);
if (CollUtil.isEmpty(pointValues)) {
// 查询最近数据
pointValues = latest(pageQuery.getDeviceId(), pointIds);
}
// ...
// ... 查询历史数据
}
// ...
}
数据可视化
平台可以提供数据可视化功能,将查询到的数据以图表等形式展示给用户,例如:
图解说明:
- 驱动程序定时发送设备状态事件消息: 驱动程序会定期执行定时任务,读取设备的实时状态,并将其封装成 DeviceEvent 对象,通过 RabbitMQ 的事件交换机发送到数据中心。
- 数据中心接收并处理设备状态事件消息: 数据中心模块订阅了事件交换机的消息,接收到设备状态事件消息后,会将设备状态信息写入 Redis,Key 为 device_status:{deviceId},并设置过期时间。
- 前端页面请求设备状态: 前端页面通过 AJAX 等方式请求数据中心的 API 接口,获取设备的实时状态信息。
- 数据中心从 Redis 中读取并返回设备状态: 数据中心模块接收到前端请求后,从 Redis 中读取设备状态信息,并将其返回给前端页面。
推送机制
- 驱动程序定期读取设备状态,并通过 RabbitMQ 推送到数据中心。
- 数据中心接收到状态信息后,更新 Redis 中的缓存数据。
拉取机制
- 前端页面定期轮询数据中心 API 接口,获取最新的设备状态信息。
核心代码
1.驱动程序定时发送设备状态
// DriverCustomServiceImpl.java
@Slf4j
@Service
public class DriverCustomServiceImpl implements DriverCustomService {
// ...
@Override
public void schedule() {
// 定时任务逻辑,例如每隔 10 秒执行一次
driverContext.getDriverMetadata().getDeviceMap().keySet()
.forEach(id -> driverSenderService.deviceEventSender(new DeviceEvent(id, EventConstant.Device.STATUS, DeviceStatusEnum.ONLINE, 25, TimeUnit.SECONDS)));
}
// ...
}
2.数据中心接收和处理设备状态事件
// DeviceEventReceiver.java
@Slf4j
@Component
public class DeviceEventReceiver {
// ...
@RabbitHandler
@RabbitListener(queues = "#{deviceEventQueue.name}")
public void deviceEventReceive(Channel channel, Message message, DeviceEvent deviceEvent) {
// ...
switch (deviceEvent.getType()) {
case EventConstant.Device.STATUS:
redisUtil.setKey(
PrefixConstant.DEVICE_STATUS_KEY_PREFIX + deviceEvent.getDeviceId(),
deviceEvent.getContent(),
deviceEvent.getTimeOut(),
deviceEvent.getTimeUnit()
);
break;
// ...
}
// ...
}
}
3.前端请求设备状态
// 前端代码示例,使用 AJAX 请求数据
setInterval(function() {
$.ajax({
url: '/data/device/status/device',
type: 'GET',
success: function(data) {
// 更新页面上的设备状态信息
// ...
}
});
}, 1000); // 每隔 1 秒请求一次
数据展示
图解说明:
- 驱动程序读取设备数据: 驱动程序会定期执行定时任务,读取设备的实时数据,并将其封装成 PointValue 对象。
- 驱动程序发送设备数据消息: 驱动程序将 PointValue 对象通过 RabbitMQ 的数据交换机发送到数据中心。
- 数据中心接收并处理设备数据消息: 数据中心模块订阅了数据交换机的消息,接收到设备数据消息后,会进行以下操作:
- 将 PointValue 对象写入 Redis,Key 为 realtime_value:{deviceId}.{pointId},用于实时数据展示。
- 将 PointValue 对象写入 MongoDB,用于存储历史数据。
- 前端页面请求设备实时数据: 前端页面通过 AJAX 等方式请求数据中心的 API 接口,获取设备的实时数据。
- 数据中心从 Redis 中读取并返回设备实时数据: 数据中心模块接收到前端请求后,从 Redis 中读取设备实时数据,并将其返回给前端页面。
推送机制
- 驱动程序定期读取设备数据,并通过 RabbitMQ 推送到数据中心。
- 数据中心接收到数据后,更新 Redis 和 MongoDB 中的缓存数据。
拉取机制
- 前端页面定期轮询数据中心 API 接口,获取最新的设备数据。
核心代码
1.驱动程序读取并发送设备数据
// DriverReadScheduleJob.java
@Slf4j
@Component
public class DriverReadScheduleJob extends QuartzJobBean {
// ...
@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
// ... 遍历设备和位号
threadPoolExecutor.execute(() -> driverCommandService.read(device.getId(), pointId));
// ...
}
}
// DriverCommandServiceImpl.java
@Slf4j
@Service
public class DriverCommandServiceImpl implements DriverCommandService {
// ...
@Override
public PointValue read(String deviceId, String pointId) {
// ... 获取设备和位号信息
try {
// 调用驱动自定义 read 方法读取数据
String rawValue = driverCustomService.read(
// ...
);
// ... 校验数据
PointValue pointValue = new PointValue(deviceId, pointId, rawValue, ConvertUtil.convertValue(point, rawValue));
driverSenderService.pointValueSender(pointValue);
return pointValue;
} catch (Exception e) {
// ... 处理异常
}
}
// ...
}
2.数据中心接收和处理设备数据
// PointValueReceiver.java
@Slf4j
@Component
public class PointValueReceiver {
// ...
@RabbitHandler
@RabbitListener(queues = "#{pointValueQueue.name}")
public void pointValueReceive(Channel channel, Message message, PointValue pointValue) {
// ...
// ... 根据数据量判断是否批量处理
threadPoolExecutor.execute(() -> pointValueService.savePointValue(pointValue));
// ...
}
}
// PointValueServiceImpl.java
@Slf4j
@Service
public class PointValueServiceImpl implements PointValueService {
// ...
@Override
public void savePointValue(PointValue pointValue) {
// ...
pointValue.setCreateTime(new Date());
repositoryHandleService.save(pointValue);
}
// ...
}
// RepositoryHandleServiceImpl.java
@Slf4j
@Service
public class RepositoryHandleServiceImpl implements RepositoryHandleService {
// ...
@Override
public void save(PointValue pointValue) {
// 保存单个数据到 Redis & Mongo
savePointValueToRepository(pointValue, redisRepositoryService, mongoRepositoryService);
// ... 根据配置选择其他存储服务
}
// ...
}
3.前端请求示例
// 前端代码示例,使用 AJAX 请求数据
setInterval(function() {
$.ajax({
url: '/data/point_value/latest', // 假设 API 接口为 /data/point_value/latest
type: 'POST',
data: JSON.stringify({ deviceId: 'device1' }), // 假设需要查询 deviceId 为 device1 的设备数据
success: function(data) {
// 更新页面上的设备数据
// ...
}
});
}, 1000); // 每隔 1 秒请求一次
指令下置
图解说明:
- 用户发送指令下置请求: 用户通过平台 API 接口发送指令下置请求,例如控制设备开关、设置参数等。
- 平台网关查询驱动信息: 平台网关接收到请求后,调用设备管理中心 API 接口,查询该设备绑定的驱动程序信息,例如驱动程序服务名。
- 平台网关将指令下发到 RabbitMQ: 平台网关将指令内容封装成 DeviceCommandDTO 对象,并将其发送到 RabbitMQ 的指令交换机,RoutingKey 为 dc3.r.command.device.{driverServiceName},例如 dc3.r.command.device.dc3-driver-modbus-tcp。
- 驱动程序接收指令消息: 驱动程序订阅了指令交换机上的对应主题,接收到指令消息后,解析指令内容,例如设备 ID、位号 ID、指令类型、指令参数等。
- 驱动程序将指令发送到设备: 驱动程序根据解析出的指令内容,通过对应的驱动协议将指令发送到设备。
核心代码
1.平台网关发送指令
// PointValueCommandController.java
@Slf4j
@RestController
@RequestMapping(DataServiceConstant.VALUE_COMMAND_URL_PREFIX)
public class PointValueCommandController {
// ...
@PostMapping("/write")
public R<Boolean> write(@Validated @RequestBody PointValueWriteVO entityVO) {
// ...
DeviceCommandDTO.DeviceWrite deviceWrite = new DeviceCommandDTO.DeviceWrite(entityVO.getDeviceId(), entityVO.getPointId(), entityVO.getValue());
DeviceCommandDTO deviceCommandDTO = new DeviceCommandDTO(DeviceCommandTypeEnum.WRITE, JsonUtil.toJsonString(deviceWrite));
rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE_COMMAND, RabbitConstant.ROUTING_DEVICE_COMMAND_PREFIX + rDriverDTO.getData().getServiceName(), deviceCommandDTO);
// ...
}
// ...
}
2.驱动程序接收指令
// DeviceCommandReceiver.java
@Slf4j
@Component
public class DeviceCommandReceiver {
// ...
@RabbitHandler
@RabbitListener(queues = "#{deviceCommandQueue.name}")
public void deviceCommandReceive(Channel channel, Message message, DeviceCommandDTO entityDTO) {
// ...
switch (entityDTO.getType()) {
case WRITE:
driverCommandService.write(entityDTO);
break;
// ...
}
// ...
}
}
// DriverCommandServiceImpl.java
@Slf4j
@Service
public class DriverCommandServiceImpl implements DriverCommandService {
// ...
@Override
public void write(DeviceCommandDTO commandDTO) {
// ... 解析指令内容
log.info("Start command of write: {}", JsonUtil.toPrettyJsonString(commandDTO));
Boolean write = write(deviceWrite.getDeviceId(), deviceWrite.getPointId(), deviceWrite.getValue()); // 调用驱动程序自定义的 write 方法
log.info("End command of write: write {}", write);
}
// ...
}
MQ订阅设计
项目采用了一种层次化的主题命名规范,主要包含以下几个部分:
部分 | 说明 | 示例 |
---|---|---|
应用标识 | 用于区分不同的应用,通常为 dc3 | dc3 |
消息类型 | 用于区分不同的消息类型,例如事件、元数据、指令、数据 | e (事件), m (元数据), c (指令), v (数据) |
操作方向 | 用于区分消息的发送方和接收方,例如驱动、设备、平台 | d (驱动), p (平台) |
模块类型 | 用于区分不同的模块,例如驱动、设备 | driver, device |
服务名称 | 用于区分不同的服务,通常为驱动程序或设备的唯一标识 | dc3-driver-mqtt |
其他信息 | 可选,用于进一步区分消息,例如设备 ID、位号 ID 等 | {deviceId}, {pointId} |
例如,驱动程序向数据中心发送设备状态事件消息,主题可以命名为:dc3/e/d/device/{driverServiceName},其中:
- dc3: 应用标识
- e: 事件消息类型
- d: 驱动程序发送方向
- device: 模块类型
- {driverServiceName}: 驱动程序服务名称
项目中,不同的模块会根据其功能和职责订阅不同的主题,以实现高效的消息传递和处理。
驱动程序:
- 同步主题: 订阅 dc3/sync/d/{driverClient} 主题,接收平台下发的同步指令。
- 元数据主题: 订阅 dc3/m/p/driver/{driverService} 主题,接收平台下发的元数据更新指令。
- 指令主题: 订阅 dc3/c/p/driver/{driverService} 和 dc3/c/p/device/{driverService} 主题,接收平台下发的指令。
数据中心:
- 事件主题: 订阅 dc3/e/d/* 主题,接收驱动程序上报的驱动和设备事件消息。
- 数据主题: 订阅 dc3/v/d/* 主题,接收驱动程序上报的设备数据消息。
平台网关:
- 指令主题: 根据设备绑定的驱动程序,将指令发布到 dc3/c/p/driver/{driverServiceName} 或 dc3/c/p/device/{driverServiceName} 主题。