地址 pnoker/iot-dc3
DC3 是基于 Spring Cloud 的开源可分布式物联网 (IOT) 平台,用于快速开发, 部署物联设备接入项目,是一整套物联系统解决方案。
目录结构 . ├── dc3 资源文件,如sh,sql等 ├── dc3-api gRpc定义的接口结构 ├── dc3-center 平台中心模块 ├── dc3-common 平台公共模块 ├── dc3-driver 平台驱动模块 ├── dc3-driver-sdk 平台驱动SDK模块 └── dc3-gateway 平台网关模块
平台中心模块 . ├── dc3-center-auth 授权模块,主要负责接口权限 ├── dc3-center-data 数据模块,主要负责驱动数据处理 └── dc3-center-manager 管理模块
平台公共模块 . ├── dc3 git脚本 ├── dc3-common-api api ├── dc3-common-auth 授权相关 ├── dc3-common-constant 常量相关 ├── dc3-common-exception 异常相关 ├── dc3-common-influxdata influxDataDB相关 ├── dc3-common-log 日志相关 ├── dc3-common-model 模型相关 ├── dc3-common-mongo mongoDB相关 ├── dc3-common-mqtt mqtt相关 ├── dc3-common-mysql 数据库相关 ├── dc3-common-public 公共配置相关 ├── dc3-common-quartz 定时任务 ├── dc3-common-rabbitmq 消息队列相关 ├── dc3-common-redis 缓存相关 ├── dc3-common-thread 线程相关 └── dc3-common-web web服务配置
平台驱动模块 ├── dc3-driver-dtu-yeecom Dtu驱动相关 ├── dc3-driver-edge-gateway 边缘网关相关 ├── dc3-driver-listening-virtual 虚拟网关相关 ├── dc3-driver-lwm2m Lwm2m&Coap相关 ├── dc3-driver-modbus-tcp modbusTcp相关 ├── dc3-driver-mqtt mqtt相关 ├── dc3-driver-opc-da opc-da相关 ├── dc3-driver-opc-ua opc-ua相关 ├── dc3-driver-plcs7 plcs7相关 ├── dc3-driver-virtual 测试驱动相关 └── dc3-driver-weather-amap 高德地图天气相关
授权模块 授权中心模块(dc3-center-auth)负责管理平台的登录授权功能,包括:
用户管理 : 包括用户的增删改查,以及密码重置等操作。
租户管理 : 包括租户的增删改查,以及租户与用户绑定关系的管理。
IP 黑名单 : 维护 IP 黑名单,限制非法 IP 的访问。
令牌管理 : 生成、校验和注销用户的 Token 令牌,实现基于 Token 的身份验证。
登录流程
获取租户信息 : 网关从请求头中获取租户信息,并调用 TenantApi 查询租户是否存在且已启用。
获取用户信息 : 网关从请求头中获取用户登录名称,并调用 UserLoginApi 查询用户是否存在且已启用。
校验 Token : 网关从请求头中获取 Token 信息,并调用 TokenApi 校验 Token 是否有效。
设置用户信息 : 如果 Token 校验通过,网关会将用户信息设置到请求头中,并转发到下游服务。
@Component static class AuthenticGatewayFilter implements GatewayFilter { @Override public Mono<Void> filter (ServerWebExchange exchange, GatewayFilterChain chain) { ServerHttpRequest request = exchange.getRequest(); try { String tenantHeader = GatewayUtil.getRequestHeader(request, RequestConstant.Header.X_AUTH_TENANT); String tenant = DecodeUtil.byteToString(DecodeUtil.decode(tenantHeader)); String userHeader = GatewayUtil.getRequestHeader(request, RequestConstant.Header.X_AUTH_LOGIN); String user = DecodeUtil.byteToString(DecodeUtil.decode(userHeader)); String tokenHeader = GatewayUtil.getRequestHeader(request, RequestConstant.Header.X_AUTH_TOKEN); ServerHttpRequest build = request.mutate().headers( httpHeader -> { } ).build(); exchange.mutate().request(build).build(); } catch (Exception e) { } return chain.filter(exchange); } }
Token 生成 用户登录时,会调用 TokenController 的 generateToken 接口生成 Token,具体实现步骤如下:
生成随机盐值 : 调用 TokenService 的 generateSalt 方法生成一个随机盐值,并将其缓存到 Redis 中,有效期为 5 分钟。
生成 Token : 调用 KeyUtil 的 generateToken 方法生成 Token,使用 JWT 算法,并将用户名、盐值和租户 ID 作为 payload,盐值作为密钥进行签名,有效期为 12 小时。
缓存 Token : 将生成的 Token 缓存到 Redis 中,Key 为 PrefixConstant.USER + SuffixConstant.TOKEN + SymbolConstant.DOUBLE_COLON + username + SymbolConstant.HASHTAG + tenant.getId(),有效期为 12 小时。
@Slf4j @RestController @RequestMapping(AuthServiceConstant.TOKEN_URL_PREFIX) public class TokenController { @Resource private TokenService tokenService; @PostMapping("/generate") public R<String> generateToken (@Validated(Auth.class) @RequestBody Login login) { String token = tokenService.generateToken(login.getName(), login.getSalt(), login.getPassword(), login.getTenant()); return ObjectUtil.isNotNull(token) ? R.ok(token, "The token will expire in 12 hours." ) : R.fail(); } } @Slf4j @Service public class TokenServiceImpl implements TokenService { @Override public String generateToken (String username, String salt, String password, String tenantName) { String redisTokenKey = PrefixConstant.USER + SuffixConstant.TOKEN + SymbolConstant.DOUBLE_COLON + username + SymbolConstant.HASHTAG + tenant.getId(); String token = KeyUtil.generateToken(username, redisSaltValue, tenant.getId()); redisUtil.setKey(redisTokenKey, token, TimeoutConstant.TOKEN_CACHE_TIMEOUT, TimeUnit.HOURS); return token; } }
Token校验 网关在收到请求时,会调用 TokenApi 的 checkTokenValid 接口校验 Token,具体实现步骤如下:
从 Redis 中获取 Token : 根据用户名和租户 ID 从 Redis 中获取 Token。
解析 Token : 调用 KeyUtil 的 parserToken 方法解析 Token,并校验签名是否正确。
判断 Token 是否过期 : 如果 Token 未过期,返回 Token 的过期时间。
@Slf4j @GrpcService public class TokenApi extends TokenApiGrpc .TokenApiImplBase { @Override public void checkTokenValid (LoginQuery request, StreamObserver<RTokenDTO> responseObserver) { TokenValid select = tokenService.checkTokenValid(request.getName(), request.getSalt(), request.getToken(), request.getTenant()); if (ObjectUtil.isNull(select)) { } else if (!select.isValid()) { } else { String expireTime = TimeUtil.completeFormat(select.getExpireTime()); } } } @Slf4j @Service public class TokenServiceImpl implements TokenService { @Override public TokenValid checkTokenValid (String username, String salt, String token, String tenantName) { String redisKey = PrefixConstant.USER + SuffixConstant.TOKEN + SymbolConstant.DOUBLE_COLON + username + SymbolConstant.HASHTAG + tenant.getId(); String redisToken = redisUtil.getKey(redisKey); try { Claims claims = KeyUtil.parserToken(username, salt, token, tenant.getId()); return new TokenValid (true , claims.getExpiration()); } catch (Exception e) { return new TokenValid (false , null ); } } }
数据采集 我们以 MQTT 驱动为例,讲解一下数据从采集到入库再到展示的完整流程。
驱动程序配置 MQTT 驱动程序(dc3-driver-mqtt)需要在配置文件中配置以下信息:
MQTT Broker 地址 : 用于连接 MQTT Broker。
驱动属性 : 定义驱动程序自身的属性,例如连接超时时间、重连次数等。
位号属性 : 定义驱动程序支持的位号属性,例如读写权限、数据类型、单位等。
订阅主题 : 用于接收设备上报数据的主题。
发布主题 : 用于向设备发送指令的主题。
接收数据 MQTT 驱动程序通过订阅主题接收设备上报的数据,具体实现步骤如下:
连接 MQTT Broker : 驱动程序启动时,会连接到配置的 MQTT Broker。
订阅主题 : 连接成功后,驱动程序会订阅配置文件中配置的主题。
接收消息 : 当设备向订阅主题发布消息时,驱动程序会接收到消息,并将其解析成 PointValue 对象。
发送数据 : 调用 DriverSenderService 的 pointValueSender 方法将 PointValue 对象发送到数据中心模块。
@Slf4j @Component public class MqttReceiveHandler { @Bean @ServiceActivator(inputChannel = "mqttInboundChannel") public MessageHandler mqttInboundReceive () { return message -> { MessageHeader messageHeader = new MessageHeader (message.getHeaders()); String payload = message.getPayload().toString(); MqttScheduleJob.messageCount.getAndIncrement(); MqttMessage mqttMessage = new MqttMessage (messageHeader, payload); threadPoolExecutor.execute(() -> mqttReceiveService.receiveValue(mqttMessage)); }; } } @Slf4j @Service public class MqttReceiveServiceImpl implements MqttReceiveService { @Override public void receiveValue (MqttMessage mqttMessage) { log.info(JsonUtil.toPrettyJsonString(mqttMessage)); PointValue pointValue = JsonUtil.parseObject(mqttMessage.getPayload(), PointValue.class); pointValue.setOriginTime(new Date ()); driverSenderService.pointValueSender(pointValue); } }
数据入库 数据中心模块(dc3-center-data)需要在配置文件中配置数据存储策略,例如:
存储方式 : 支持 Redis、MongoDB、InfluxDB、TDengine、OpenTSDB、Elasticsearch 等多种存储方式。
批量处理速度 : 当数据采集速度超过该值时,启用批量处理机制。
批量处理间隔 : 批量处理的时间间隔。
@Slf4j @Component public class PointValueReceiver { @RabbitHandler @RabbitListener(queues = "#{pointValueQueue.name}") public void pointValueReceive (Channel channel, Message message, PointValue pointValue) { try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), true ); PointValueScheduleJob.valueCount.getAndIncrement(); log.debug("Point value, From: {}, Received: {}" , message.getMessageProperties().getReceivedRoutingKey(), pointValue); threadPoolExecutor.execute(() -> pointValueService.savePointValue(pointValue)); } catch (Exception e) { log.error(e.getMessage(), e); } } } @Slf4j @Service public class PointValueServiceImpl implements PointValueService { @Override public void savePointValue (PointValue pointValue) { pointValue.setCreateTime(new Date ()); repositoryHandleService.save(pointValue); } } @Slf4j @Service public class RepositoryHandleServiceImpl implements RepositoryHandleService { @Override public void save (PointValue pointValue) { savePointValueToRepository(pointValue, redisRepositoryService, mongoRepositoryService); } }
数据展示 用户可以通过平台提供的 API 接口查询设备数据,具体实现步骤如下:
发送请求 : 用户通过平台 API 接口发送数据查询请求。
接收请求 : 数据中心模块接收到请求后,解析请求参数,例如设备 ID、位号 ID、时间范围等。
查询数据 : 根据请求参数,从数据库中查询数据。
返回数据 : 将查询到的数据返回给用户。
@Slf4j @RestController @RequestMapping(DataServiceConstant.VALUE_URL_PREFIX) public class PointValueController { @PostMapping("/latest") public R<Page<PointValue>> latest (@RequestBody PointValuePageQuery pointValuePageQuery) { Page<PointValue> page = pointValueService.latest(pointValuePageQuery); } } @Slf4j @Service public class PointValueServiceImpl implements PointValueService { @Override public Page<PointValue> latest (PointValuePageQuery pageQuery) { List<PointValue> pointValues = realtime(pageQuery.getDeviceId(), pointIds); if (CollUtil.isEmpty(pointValues)) { pointValues = latest(pageQuery.getDeviceId(), pointIds); } } }
数据可视化 平台可以提供数据可视化功能,将查询到的数据以图表等形式展示给用户,例如:
实时曲线 : 展示位号值的实时变化趋势。
历史曲线 : 展示位号值在一段时间内的变化趋势。
报表 : 以表格形式展示位号值。
状态展示
图解说明:
驱动程序定时发送设备状态事件消息 : 驱动程序会定期执行定时任务,读取设备的实时状态,并将其封装成 DeviceEvent 对象,通过 RabbitMQ 的事件交换机发送到数据中心。
数据中心接收并处理设备状态事件消息 : 数据中心模块订阅了事件交换机的消息,接收到设备状态事件消息后,会将设备状态信息写入 Redis,Key 为 device_status:{deviceId},并设置过期时间。
前端页面请求设备状态 : 前端页面通过 AJAX 等方式请求数据中心的 API 接口,获取设备的实时状态信息。
数据中心从 Redis 中读取并返回设备状态 : 数据中心模块接收到前端请求后,从 Redis 中读取设备状态信息,并将其返回给前端页面。
推送机制
驱动程序定期读取设备状态,并通过 RabbitMQ 推送到数据中心。
数据中心接收到状态信息后,更新 Redis 中的缓存数据。
拉取机制
前端页面定期轮询数据中心 API 接口,获取最新的设备状态信息。
核心代码 1.驱动程序定时发送设备状态
@Slf4j @Service public class DriverCustomServiceImpl implements DriverCustomService { @Override public void schedule () { driverContext.getDriverMetadata().getDeviceMap().keySet() .forEach(id -> driverSenderService.deviceEventSender(new DeviceEvent (id, EventConstant.Device.STATUS, DeviceStatusEnum.ONLINE, 25 , TimeUnit.SECONDS))); } }
2.数据中心接收和处理设备状态事件
@Slf4j @Component public class DeviceEventReceiver { @RabbitHandler @RabbitListener(queues = "#{deviceEventQueue.name}") public void deviceEventReceive (Channel channel, Message message, DeviceEvent deviceEvent) { switch (deviceEvent.getType()) { case EventConstant.Device.STATUS: redisUtil.setKey( PrefixConstant.DEVICE_STATUS_KEY_PREFIX + deviceEvent.getDeviceId(), deviceEvent.getContent(), deviceEvent.getTimeOut(), deviceEvent.getTimeUnit() ); break ; } } }
3.前端请求设备状态
setInterval(function() { $.ajax({ url: '/data/device/status/device' , type: 'GET' , success: function(data) { } }); }, 1000 );
数据展示
图解说明:
驱动程序读取设备数据 : 驱动程序会定期执行定时任务,读取设备的实时数据,并将其封装成 PointValue 对象。
驱动程序发送设备数据消息 : 驱动程序将 PointValue 对象通过 RabbitMQ 的数据交换机发送到数据中心。
数据中心接收并处理设备数据消息 : 数据中心模块订阅了数据交换机的消息,接收到设备数据消息后,会进行以下操作:
将 PointValue 对象写入 Redis,Key 为 realtime_value:{deviceId}.{pointId},用于实时数据展示。
将 PointValue 对象写入 MongoDB,用于存储历史数据。
前端页面请求设备实时数据 : 前端页面通过 AJAX 等方式请求数据中心的 API 接口,获取设备的实时数据。
数据中心从 Redis 中读取并返回设备实时数据 : 数据中心模块接收到前端请求后,从 Redis 中读取设备实时数据,并将其返回给前端页面。
推送机制
驱动程序定期读取设备数据,并通过 RabbitMQ 推送到数据中心。
数据中心接收到数据后,更新 Redis 和 MongoDB 中的缓存数据。
拉取机制
前端页面定期轮询数据中心 API 接口,获取最新的设备数据。
核心代码 1.驱动程序读取并发送设备数据
// DriverReadScheduleJob.java @Slf4j @Component public class DriverReadScheduleJob extends QuartzJobBean { // ... @Override protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException { // ... 遍历设备和位号 threadPoolExecutor.execute(() -> driverCommandService.read(device.getId(), pointId)); // ... } } // DriverCommandServiceImpl.java @Slf4j @Service public class DriverCommandServiceImpl implements DriverCommandService { // ... @Override public PointValue read(String deviceId, String pointId) { // ... 获取设备和位号信息 try { // 调用驱动自定义 read 方法读取数据 String rawValue = driverCustomService.read( // ... ); // ... 校验数据 PointValue pointValue = new PointValue(deviceId, pointId, rawValue, ConvertUtil.convertValue(point, rawValue)); driverSenderService.pointValueSender(pointValue); return pointValue; } catch (Exception e) { // ... 处理异常 } } // ... }
2.数据中心接收和处理设备数据
@Slf4j @Component public class PointValueReceiver { @RabbitHandler @RabbitListener(queues = "#{pointValueQueue.name}") public void pointValueReceive (Channel channel, Message message, PointValue pointValue) { threadPoolExecutor.execute(() -> pointValueService.savePointValue(pointValue)); } } @Slf4j @Service public class PointValueServiceImpl implements PointValueService { @Override public void savePointValue (PointValue pointValue) { pointValue.setCreateTime(new Date ()); repositoryHandleService.save(pointValue); } } @Slf4j @Service public class RepositoryHandleServiceImpl implements RepositoryHandleService { @Override public void save (PointValue pointValue) { savePointValueToRepository(pointValue, redisRepositoryService, mongoRepositoryService); } }
3.前端请求示例
setInterval(function() { $.ajax({ url: '/data/point_value/latest' , type: 'POST' , data: JSON.stringify({ deviceId: 'device1' }), success: function(data) { } }); }, 1000 );
指令下置
图解说明:
用户发送指令下置请求 : 用户通过平台 API 接口发送指令下置请求,例如控制设备开关、设置参数等。
平台网关查询驱动信息 : 平台网关接收到请求后,调用设备管理中心 API 接口,查询该设备绑定的驱动程序信息,例如驱动程序服务名。
平台网关将指令下发到 RabbitMQ : 平台网关将指令内容封装成 DeviceCommandDTO 对象,并将其发送到 RabbitMQ 的指令交换机,RoutingKey 为 dc3.r.command.device.{driverServiceName},例如 dc3.r.command.device.dc3-driver-modbus-tcp。
驱动程序接收指令消息 : 驱动程序订阅了指令交换机上的对应主题,接收到指令消息后,解析指令内容,例如设备 ID、位号 ID、指令类型、指令参数等。
驱动程序将指令发送到设备 : 驱动程序根据解析出的指令内容,通过对应的驱动协议将指令发送到设备。
核心代码 1.平台网关发送指令
@Slf4j @RestController @RequestMapping(DataServiceConstant.VALUE_COMMAND_URL_PREFIX) public class PointValueCommandController { @PostMapping("/write") public R<Boolean> write (@Validated @RequestBody PointValueWriteVO entityVO) { DeviceCommandDTO.DeviceWrite deviceWrite = new DeviceCommandDTO .DeviceWrite(entityVO.getDeviceId(), entityVO.getPointId(), entityVO.getValue()); DeviceCommandDTO deviceCommandDTO = new DeviceCommandDTO (DeviceCommandTypeEnum.WRITE, JsonUtil.toJsonString(deviceWrite)); rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE_COMMAND, RabbitConstant.ROUTING_DEVICE_COMMAND_PREFIX + rDriverDTO.getData().getServiceName(), deviceCommandDTO); } }
2.驱动程序接收指令
@Slf4j @Component public class DeviceCommandReceiver { @RabbitHandler @RabbitListener(queues = "#{deviceCommandQueue.name}") public void deviceCommandReceive (Channel channel, Message message, DeviceCommandDTO entityDTO) { switch (entityDTO.getType()) { case WRITE: driverCommandService.write(entityDTO); break ; } } } @Slf4j @Service public class DriverCommandServiceImpl implements DriverCommandService { @Override public void write (DeviceCommandDTO commandDTO) { log.info("Start command of write: {}" , JsonUtil.toPrettyJsonString(commandDTO)); Boolean write = write(deviceWrite.getDeviceId(), deviceWrite.getPointId(), deviceWrite.getValue()); log.info("End command of write: write {}" , write); } }
MQ订阅设计 项目采用了一种层次化的主题命名规范,主要包含以下几个部分:
部分
说明
示例
应用标识
用于区分不同的应用,通常为 dc3
dc3
消息类型
用于区分不同的消息类型,例如事件、元数据、指令、数据
e (事件), m (元数据), c (指令), v (数据)
操作方向
用于区分消息的发送方和接收方,例如驱动、设备、平台
d (驱动), p (平台)
模块类型
用于区分不同的模块,例如驱动、设备
driver, device
服务名称
用于区分不同的服务,通常为驱动程序或设备的唯一标识
dc3-driver-mqtt
其他信息
可选,用于进一步区分消息,例如设备 ID、位号 ID 等
{deviceId}, {pointId}
例如,驱动程序向数据中心发送设备状态事件消息,主题可以命名为:dc3/e/d/device/{driverServiceName},其中:
dc3: 应用标识
e: 事件消息类型
d: 驱动程序发送方向
device: 模块类型
{driverServiceName}: 驱动程序服务名称
项目中,不同的模块会根据其功能和职责订阅不同的主题,以实现高效的消息传递和处理。
驱动程序:
同步主题 : 订阅 dc3/sync/d/{driverClient} 主题,接收平台下发的同步指令。
元数据主题 : 订阅 dc3/m/p/driver/{driverService} 主题,接收平台下发的元数据更新指令。
指令主题 : 订阅 dc3/c/p/driver/{driverService} 和 dc3/c/p/device/{driverService} 主题,接收平台下发的指令。
数据中心:
事件主题 : 订阅 dc3/e/d/* 主题,接收驱动程序上报的驱动和设备事件消息。
数据主题 : 订阅 dc3/v/d/* 主题,接收驱动程序上报的设备数据消息。
平台网关:
指令主题 : 根据设备绑定的驱动程序,将指令发布到 dc3/c/p/driver/{driverServiceName} 或 dc3/c/p/device/{driverServiceName} 主题。
Modbus实现