一 、 概述

为了弥补代码的遗失,木舟IOT平台正在加班加点进行研发,后面不只是针对于IOT设备接入上报,告警,视频管理,组态数据可视化大屏,后面还会有快速搭建微服务平台,利用surging.cli工具根据数据库表生成微服务,中间服务,能让程序员快速完成BOSS交给的任务,从而在这个内卷的社会能占有一席之地。这些都是没有完成任务的空话,现在发此篇的目的是作者有能力开发出优秀的IOT平台,先介绍一个比较突出的功能,就是可以基于共享或者独立配置添加网络组件, 下面来介绍一下如何添加网络组件。

一键运行打包成品下载:
https://pan.baidu.com/s/11hcf9ieCkJxlGrzvIuxeQA?pwd=ajsr

测试用户:fanly

测试密码:123456

为了让大家节约时间,能尽快运行产品看到效果,上面有 一键运行打包成品可以进行下载测试运行。

二、如何测试运行

以下是目录结构,

IDE:consul 注册中心

kayak.client: 网关

kayak.server:微服务

apache-skywalking-apm:skywalking链路跟踪

以上是目录结构,大家不需要一个个运行,只需要打开运行startup.bat,如果需要测试skywalking ,只需要apache-skywalking-apm\bin\startup.bat  文件就可以了,以下是运行的界面

三、如何添加组件

1.添加http服务组件,

打开平台界面,然后点击设备接入->网络组件,然后可以看到如下界面

再点击新增组件或者编辑组件,完成后注意启动状态是关闭状态,此时并不能对于该组件功能进行访问调用,只有把启动状态打开,才能访问调用

以上是http服务组件,启动完成后,如果设置了webservice和swagger,你可以访问webservice和swagger,看是否可以访问

2.添加/编辑Tcp服务组件

当添加/编辑Tcp组件时,设置Host:127.0.0.1 ,port:248并且还有解析方式选项,选项里面有不处理,固定长度,分隔符,自定义脚本,下面我们就来看自定义脚本

添加脚本如下:

parser.Fixed(4).Handler(function(buffer){var buf = BytesUtils.Slice(buffer,1,4);
parser.Fixed(buffer.ReadableBytes).Result(buf);
}).Handler(
function(buffer){parser.Fixed(8).Result(buffer);}
).Handler(
function(buffer){
parser.Result(
'处理完成','gb2312').Complete();
}
)

而基于TCP服务代码如下,需要继承于TcpBehavior

internal class TcpDeviceDataService : TcpBehavior, ITcpDeviceDataService
{
private readonly IDeviceProvider _deviceProvider;
public TcpDeviceDataService(IDeviceProvider deviceProvider)
{
_deviceProvider
=deviceProvider;
}

public override
voidLoad(string clientId, NetworkProperties tcpServerProperties)
{
var deviceStatus =_deviceProvider.IsConnected(clientId);this.Parser.HandlePayload().Subscribe(async buffer =>await ParserBuffer(buffer));
}

public override
voidDeviceStatusProcess(DeviceStatus status, string clientId, NetworkProperties tcpServerProperties)
{
//throw new NotImplementedException(); }

public async Task ParserBuffer(IByteBuffer buffer)
{
List
<string> result = new List<string>();while (buffer.ReadableBytes > 0)
{
result.Add(buffer.ReadString(
this.Parser.GetNextFixedRecordLength(),
Encoding.GetEncoding(
"gb2312")));
}
//var str= buffer.ReadString(buffer.ReadableBytes, Encoding.UTF8); var byteBuffer =Unpooled.Buffer();
byteBuffer.WriteString(
"\r\n", Encoding.UTF8);
byteBuffer.WriteString(
"处理完成", Encoding.GetEncoding("gb2312"));
await Sender.SendAndFlushAsync(byteBuffer);
//await Sender.SendAndFlushAsync("消息已接收",Encoding.GetEncoding("gb2312")); this.Parser.Close();
}

public Task
<bool>ChangeDeviceStage(string deviceId)
{
throw newNotImplementedException();
}
}

用测试Tcp调试工具结果如下

3.添加/编辑UDP服务组件

当添加/编辑UDP组件时, 设置Host:127.0.0.1 ,port:267 并且可以是否开启组播

而基于udp服务代码如下,需要继承于
UdpBehavior

internal class UdpDeviceDataService : UdpBehavior, IUdpDeviceDataService
{
public Task
<bool>ChangeDeviceStage(string deviceId)
{
throw newNotImplementedException();
}

public override async Task Dispatch(IEnumerable
<byte>bytes)
{
await Sender.SendAndFlushAsync(
"\r\n", Encoding.UTF8);
await Sender.SendAndFlushAsync(
"处理完成", Encoding.GetEncoding("gb2312"));
}
}

测试结果如下:

4.添加/编辑WebSocket服务组件

当添加/编辑WebSocket组件时, 设置Host:127.0.0.1 ,port:55

而基于websocket服务代码如下,需要继承于WSBehavior

internal classWSDeviceDataService : WSBehavior, IWSDeviceDataService
{
protected override voidOnMessage(MessageEventArgs e)
{
this.Client.Value.SendTo($"send:{e.Data},\r\n reply:hello,welcome to you!",ID);
}
protected override voidOnOpen()
{

}
}

测试结果如下:

5.添加/编辑UDP服务组件

当添加/编辑WebSocket组件时, 设置Host:127.0.0.1 ,port:345

添加greet.proto文件,脚本如下:

syntax = "proto3";

package Greet;

service Greeter {
//Sends a greeting rpc ChangeDeviceStage (DeviceRequest) returns (DeviceReply) {}
}

message DeviceRequest {
string deviceId = 1;
}

message DeviceReply {
bool message = 1;
}

然后再创建GreeterBehavior,继承Greeter.GreeterBase, IServiceBehavior,代码如下

public partial classGreeterBehavior : Greeter.GreeterBase, IServiceBehavior
{
privateServerReceivedDelegate received;public eventServerReceivedDelegate Received
{
add
{
if (value == null)
{
received
+=value;
}
}
remove
{
received
-=value;
}
}
public string MessageId { get; } = Guid.NewGuid().ToString("N");public async Task Write(object result, int statusCode = 200, string exceptionMessage = "")
{
if (received == null)return;var message = new TransportMessage(MessageId, newReactiveResultMessage
{
ExceptionMessage
=exceptionMessage,
StatusCode
=statusCode,
Result
=result

});
awaitreceived(message);
}
public T CreateProxy<T>(string key) where T : class{return ServiceLocator.GetService<IServiceProxyFactory>().CreateProxy<T>(key);
}
public objectCreateProxy(Type type)
{
return ServiceLocator.GetService<IServiceProxyFactory>().CreateProxy(type);
}
public object CreateProxy(stringkey, Type type)
{
return ServiceLocator.GetService<IServiceProxyFactory>().CreateProxy(key, type);
}
public T CreateProxy<T>() where T : class{return ServiceLocator.GetService<IServiceProxyFactory>().CreateProxy<T>();
}
public T GetService<T>(string key) where T : class{if (ServiceLocator.Current.IsRegisteredWithKey<T>(key))return ServiceLocator.GetService<T>(key);else return ServiceLocator.GetService<IServiceProxyFactory>().CreateProxy<T>(key);
}
public T GetService<T>() where T : class{if (ServiceLocator.Current.IsRegistered<T>())return ServiceLocator.GetService<T>();else return ServiceLocator.GetService<IServiceProxyFactory>().CreateProxy<T>();

}
public objectGetService(Type type)
{
if(ServiceLocator.Current.IsRegistered(type))returnServiceLocator.GetService(type);else return ServiceLocator.GetService<IServiceProxyFactory>().CreateProxy(type);
}
public object GetService(stringkey, Type type)
{
if(ServiceLocator.Current.IsRegisteredWithKey(key, type))returnServiceLocator.GetService(key, type);else return ServiceLocator.GetService<IServiceProxyFactory>().CreateProxy(key, type);

}
public voidPublish(IntegrationEvent @event)
{
GetService
<IEventBus>().Publish(@event);
}

}

而基于grpc服务代码如下,需要继承于刚刚创建的GreeterBehavior

    public classGrpcDeviceDataService : GreeterBehavior, IGrpcDeviceDataService
{
public override Task<DeviceReply>ChangeDeviceStage(DeviceRequest request, ServerCallContext context)
{
return Task.FromResult(newDeviceReply
{
Message
= true}) ;
}
}

以下是测试结果:

6.添加/编辑MQTT服务组件

当添加/编辑MQTT组件时, 设置Host:127.0.0.1 ,port:425

而基于mqtt服务代码如下,需要继承于MqttBehavior

 public classMQTTDeviceDataService : MqttBehavior, IMQTTDeviceDataService
{
public override async Task<bool> Authorized(string username, stringpassword)
{
bool result = false;if (username == "admin" && password == "123456")
result
= true;return awaitTask.FromResult(result);
}
public async Task<bool> IsOnline(stringdeviceId)
{
return await base.GetDeviceIsOnine(deviceId);
}
public async Task Publish(stringdeviceId, WillMessage message)
{
var willMessage = newMqttWillMessage
{
WillMessage
=message.Message,
Qos
=message.Qos,
Topic
=message.Topic,
WillRetain
=message.WillRetain
};
awaitPublish(deviceId, willMessage);awaitRemotePublish(deviceId, willMessage);
}
}

以下是测试结果:

三、总结

木舟IOT平台会在github开源社区版本,可以自由更改代码,用于商业项目,但不能自营平台,如低代码平台,IOT平台等,如有违反,后果自负,还有最好不要更改命名空间,然后跟公司说是自己研发的,如果知道后,我在博客全网通报此人,以前surging相关的事件就算了,就当没发生过。,如果碰到困难,比较紧急的话,可以联系作者,加群:744677125

标签: none

添加新评论