一、概述

上篇文章介绍了木舟如何上传模块热部署,那么此篇文章将介绍如何利用HTTP网络组件接入设备,那么有些人会问木舟又是什么,是什么架构为基础,能做什么呢?

木舟 (Kayak) 是什么?

木舟(Kayak)是基于.NET6.0软件环境下的surging微服务引擎进行开发的, 平台包含了微服务和物联网平台。支持异步和响应式编程开发,功能包含了物模型,设备,产品,网络组件的统一管理和微服务平台下的注册中心,服务路由,模块,中间服务等管理。还有多协议适配(TCP,MQTT,UDP,CoAP,HTTP,Grpc,websocket,rtmp,httpflv,webservice,等),通过灵活多样的配置适配能够接入不同厂家不同协议等设备。并且通过设备告警,消息通知,数据可视化等功能。能够让你能快速建立起微服务物联网平台系统。

那么下面就为大家介绍如何从创建组件、协议、设备网关,设备到设备网关接入,再到设备数据上报,把整个流程通过此篇文章进行阐述。

二、网络组件

1.编辑创建HTTP协议的网络组件,可以选择共享配置和独立配置(独立配置是集群模式),然后可以选择开启swagger和webservice.

开启成功后,可以看看swagger 是否可以访问

又或者是访问一下中间服务,以上篇文章上传的Testapi 模块为例:

三、自定义协议

  • 如何创建自定义协议模块

如果是网络编程开发,必然会涉及到协议报文的编码解码处理,那么对于平台也是做到了灵活处理,首先是协议模块创建,通过以下代码看出协议模块可以添加协议说明md文档, 身份鉴权处理,HTTP路由,消息编解码,元数据配置。下面一一介绍如何进行编写

  public classDemo5ProtocolSupportProvider : ProtocolSupportProvider
{
public override IObservable<ProtocolSupport>Create(ProtocolContext context)
{
      var support = new ComplexProtocolSupport();
    support.Id = "demo5";
    support.Name = "演示协议5";
    support.Description = "演示协议5";
support.AddDocument(MessageTransport.Http,
"Document/document-http.md");
    support.AddAuthenticator(MessageTransport.Http,
newDemo5Authenticator());
     support.AddRoutes(MessageTransport.Http,
new List<BasicMessageCodec>() {
   BasicMessageCodec.DeviceOnline,
   BasicMessageCodec.ReportProperty,
  BasicMessageCodec.WriteProperty,
  BasicMessageCodec.ReadProperty,
BasicMessageCodec.Event
}.Select(p
=>HttpDescriptor.Instance(p.Pattern)
.GroupName(p.Route.GroupName())
.HttpMethod(p.Route.HttpMethod())
.Path(p.Pattern)
.ContentType(MediaType.ToString(MediaType.ApplicationJson))
.Description(p.Route.Description())
.Example(p.Route.Example())
).ToList());
support.AddMessageCodecSupport(MessageTransport.Http, ()
=> Observable.Return(newHttpDeviceMessageCodec()));
support.AddConfigMetadata(MessageTransport.Http, _httpConfig);
returnObservable.Return(support);

}

}

1. 添加协议说明文档如代码:
support.AddDocument(MessageTransport.Http,
"
Document/document-http.md
"
);,文档仅支持
markdown
文件,如下所示

### 使用HTTP推送设备数据

上报属性例子:

POST
/{productId}/{deviceId}/properties/report
Authorization:{产品或者设备中配置的Token}
Content
-Type: application/json

{
"properties":{"temp":11.5}
}

上报事件例子:

POST
/{productId}/{deviceId}/event/{eventId}
Authorization:{产品或者设备中配置的Token}
Content
-Type: application/json

{
"data":{"createtime": ""}
}

2. 添加身份鉴权如代码:
support.AddAuthenticator(MessageTransport.Http, new Demo5Authenticator()) ,自定义身份鉴权
Demo5Authenticator
代码如下:

       public classDemo5Authenticator : IAuthenticator
{
public IObservable<AuthenticationResult>Authenticate(IAuthenticationRequest request, IDeviceOperator deviceOperator)
{
var result = Observable.Return<AuthenticationResult>(default);if (request isDefaultAuthRequest)
{
var authRequest = request asDefaultAuthRequest;
deviceOperator.GetConfig(authRequest.GetTransport()
==MessageTransport.Http?"token": "key").Subscribe( config =>{var password = config.Convert<string>();if(authRequest.Password.Equals(password))
{
result
=result.Publish(AuthenticationResult.Success(authRequest.DeviceId));
}
else{
result
= result.Publish(AuthenticationResult.Failure(StatusCode.CUSTOM_ERROR, "验证失败,密码错误"));
}
});
}
elseresult= Observable.Return<AuthenticationResult>(AuthenticationResult.Failure(StatusCode.CUSTOM_ERROR, "不支持请求参数类型"));returnresult;
}
public IObservable<AuthenticationResult>Authenticate(IAuthenticationRequest request, IDeviceRegistry registry)
{
var result = Observable.Return<AuthenticationResult>(default);var authRequest = request asDefaultAuthRequest;
registry
.GetDevice(authRequest.DeviceId)
.Subscribe(
async p =>{var config= await p.GetConfig(authRequest.GetTransport() == MessageTransport.Http ? "token" : "key");var password= config.Convert<string>();if(authRequest.Password.Equals(password))
{
result
=result.Publish(AuthenticationResult.Success(authRequest.DeviceId));
}
else{
result
= result.Publish(AuthenticationResult.Failure(StatusCode.CUSTOM_ERROR, "验证失败,密码错误"));
}
});
returnresult;
}
}

3. 添加Http路由代码
support.AddRoutes,那么如何配置呢,代码如下:

    public static BasicMessageCodec ReportProperty =>
 new BasicMessageCodec("/*/properties/report", typeof(ReadPropertyMessage), route => route.GroupName("属性上报")
.HttpMethod(
"Post")
.Description(
"上报物模型属性数据")
.Example(
"{\"properties\":{\"属性ID\":\"属性值\"}}"));

4.添加消息编解码代码
support.AddMessageCodecSupport(MessageTransport.Http, () => Observable.Return(
new
HttpDeviceMessageCodec())), 可以自定义编解码,
HttpDeviceMessageCodec
代码如下:

  public classHttpDeviceMessageCodec : DeviceMessageCodec
{
private readonlyMessageTransport _transport;public HttpDeviceMessageCodec() : this(MessageTransport.Http)
{
}
private staticDefaultHttpResponseMessage Unauthorized(String msg)
{
return newDefaultHttpResponseMessage()
.ContentType(MediaType.ApplicationJson)
.Body(
"{\"success\":false,\"code\":\"unauthorized\",\"message\":\"" + msg + "\"}")
.Status(HttpStatus.AuthorizationFailed);
}
private staticDefaultHttpResponseMessage BadRequest()
{
return newDefaultHttpResponseMessage()
.ContentType(MediaType.ApplicationJson)
.Body(
"{\"success\":false,\"code\":\"bad_request\"}")
.Status(HttpStatus.RequestError);
}
publicHttpDeviceMessageCodec(MessageTransport transport)
{
_transport
=transport;
}
public override IObservable<IDeviceMessage>Decode(MessageDecodeContext context)
{
if (context.GetMessage() isHttpRequestMessage)
{
returnDecodeHttpRequestMessage(context);
}
return Observable.Return<IDeviceMessage>(default);
}
public override IObservable<IEncodedMessage>Encode(MessageEncodeContext context)
{
return Observable.Return<IEncodedMessage>(default);
}
private IObservable<IDeviceMessage>DecodeHttpRequestMessage(MessageDecodeContext context)
{
var result = Observable.Return<IDeviceMessage>(default);var message =(HttpExchangeMessage)context.GetMessage();

Header
? header = message.Request.GetHeader("Authorization");if (header == null || header.Value == null || header.Value.Length == 0)
{
message
.Response(Unauthorized(
"Authorization header is required")).ToObservable()
.Subscribe(p
=> result = result.Publish(default));returnresult;
}
var httpToken = header.Value[0];var paths = message.Path.Split("/");if (paths.Length == 0)
{
message.Response(BadRequest()).ToObservable()
.Subscribe(p
=> result = result.Publish(default));returnresult;
}
String deviceId
= paths[1];
context.GetDevice(deviceId).Subscribe(
async deviceOperator =>{var config = deviceOperator==null?null: await deviceOperator.GetConfig("token");var token = config?.Convert<string>();if (token == null || !httpToken.Equals(token))
{
awaitmessage
.Response(Unauthorized(
"Device not registered or authentication failed"));
}
else{var deviceMessage = awaitDecodeBody(message, deviceId);if (deviceMessage != null)
{
await message.Success("{\"success\":true,\"code\":\"success\"}");
result
=result.Publish(deviceMessage);
}
else{awaitmessage.Response(BadRequest());
}
}
});
returnresult;
}
private async Task<IDeviceMessage> DecodeBody(HttpExchangeMessage message,stringdeviceId)
{
byte[] body = new byte[message.Payload.ReadableBytes];
message.Payload.ReadBytes(body);
var deviceMessage = awaitTopicMessageCodec.Dodecode(message.Path, body);
deviceMessage.DeviceId
=deviceId;returndeviceMessage;
}
}

5.添加元数据配置代码
support.AddConfigMetadata(MessageTransport.Http, _httpConfig);
_httpConfig
代码如下

        private readonly DefaultConfigMetadata _httpConfig = newDefaultConfigMetadata("Http认证配置","token为http认证令牌")
.Add(
"token", "token", "http令牌", StringType.Instance);
  • 如何加载协议模块,协议模块包含了协议模块支持添加引用加载和上传热部署加载。

引用加载模块

上传热部署协议模块

四、设备网关

创建设备网关

五、产品管理

以下是添加产品。

设备接入

六、设备管理

添加设备

HTTP认证配置

创建告警阈值

七、测试

利用Postman 进行测试,以调用http://127.0.0.1:168/{productid}/{deviceid}/properties/report 为例,Authorization设置:123456

1.正常数据测试

2. 如果是选用Get方式调用,会因为找不到ServiceRoute而返回错误。

3. 把Authorization改成1111,会返回错误
Device not registered or authentication failed
,从而上报数据失败

以上上传的数据可以在设备信息-》运行状态中查看

告警信息可以在超临界数据中查看

七、总结

以上是基于HTTP网络组件设备接入,现有平台网络组件可以支持TCP,MQTT,UDP,CoAP,HTTP,Grpc,websocket,rtmp,httpflv,webservice,tcpclient, 而设备接入支持TCP,UDP,HTTP网络组件,后面会陆续添加支持所有网络组件接入,后面我也会陆续介绍其它网路组件设备接入 ,  然后定于11月20日发布1.0测试版平台。也请大家到时候关注捧场。

标签: none

添加新评论