Socket开发框架之数据采集客户端
虽然目前.NET的主流的开发基本上是基于Web方式(传统的Web方式和Silvelight方式)、基于Winform方式(传统的Winform模式和WPF方式等)、基于服务应用方式(传统的WebService和WCF服务方式)等主要几种开发,另外,还有一种就是基于Socket协议的开发方式,不同于高级服务层的WebService和WCF服务,基于Socket协议开发是较为底层的开发方式,它往往具有更加灵活,可控性更高的优点,不过相对来说,开发难度也会大一些。
我由于工作需要,需要开发一个数据采集客户端,监控指定目录的文件变化,登陆服务器端并传送数据内容,本人将其重构优化,逐步提炼把它升级为一种Socket框架的方式来对待,其中涉及一些有意思的技术应用,在此分享讨论下。首先我们看来看数据采集客户端的主界面。
一般的Socket客户端需要涉及到服务启动、服务停止、状态自动检查、用户登录、用户注销、日志记录、协议数据组装拆包、协议数据收发、多线程数据处理、数据展示通知、参数配置等等功能。本文命名为Socket开发框架之数据采集客户端,就是想站在一个较为通用、完好封装的角度上介绍这些功能点的实现。
1、 服务启动及服务停止
根据不同的业务需要,我创建几个业务管理类,分别是CommonManager、DispatchManager、FileWatcherManger、LoginManager、ReqAnsManager。
其中CommonManager是其他业务管理类的总管,并且包含和Socket服务器连接的控制管理, DispatchManager则是负责解析收到的数据并对不同数据进行分发处理,FileWatcherManger是实现对指定目录的文件进行监控并对其数据进行管理,LoginManager主要是实现登录控制管理、ReqAnsManager是对发送数据包需要后续验证处理的管理业务类。他们的关系如图所示:
其中重要的总管CommonManager类负责管理各类的协调,其中启动、停止及退出实现如下所示:
#region
启动和退出
bool
_isStart
=
false
;
public
DateTime StartTime;
public
DateTime StopTime;
public
bool
IsStart
{
get
{
return
_isStart; }
set
{ _isStart
=
value; }
}
public
void
Start()
{
if
(
!
IsStart)
{
StartTime
=
DateTime.Now;
DispatchManager.Instance.Start();
LoginManager.Instance.Login();
//
使用默认账号密码登录
this
.CheckTimer();
TimeSpan ts
=
(TimeSpan)(DateTime.Now
-
this
.StartTime);
Log.WriteInfo(
string
.Format(
"
[{0}] 结束启动服务. | cost: {1} ms
"
,
this
._Name, ts.TotalMilliseconds));
IsStart
=
true
;
}
}
public
void
Stop()
{
if
(pcClient.Connected)
{
pcClient.DisConnect();
}
this
.StopBaseDataRefresh();
FileWatcherManger.Instance.StopFileWatcher();
this
.StopCheckTimer();
StopTime
=
DateTime.Now;
Portal.gc.MainDialog.SetNotifyStatus(NotifyIconHelper.Status.Offline);
Log.WriteInfo(
string
.Format(
"
操作者主动停止服务
"
));
IsStart
=
false
;
}
public
void
Exit()
{
if
(IsProcess)
{
Application.ExitThread();
System.Diagnostics.Process.GetCurrentProcess().Kill();
}
else
{
Stop();
}
}
#endregion
#region
终端通信
PCDataClient pcClient
=
new
PCDataClient();
public
bool
Connected
{
get
{
return
pcClient.Connected; }
}
///
<summary>
///
连接服务器
///
</summary>
public
void
Connect(
string
ip,
int
port)
{
pcClient.Connect(ip, port);
}
///
<summary>
///
不指定服务器ip和port时,使用默认值
///
</summary>
public
void
Connect()
{
this
.Connect(PCDataCollector_Config.Default.ServerIP, PCDataCollector_Config.Default.ServerPort);
}
public
bool
Send(
string
send)
{
if
(
!
this
.Connected)
{
Connect();
//
确保连接上
}
return
pcClient.SendTo(send);
}
#endregion
2、 状态自动检查
另外,CommonManager类需要启用一个定时器,来定时检测Socket客户端的连接情况,数据接收线程的正常情况,数据处理分派业务类的正常情况,把它放到一个Timer处理定时检测的实现。
///
<summary>
///
服务检查
///
</summary>
public
void
CheckTimer()
{
if
(_CheckTimer
==
null
)
{
_CheckTimer
=
new
System.Threading.Timer(
new
TimerCallback(Check));
_CheckTimer.Change(PCDataCollector_Config.Default.CheckTimerSpan, PCDataCollector_Config.Default.CheckTimerSpan);
}
Log.WriteInfo(
string
.Format(
"
[{0}] 服务检查线程启动.....
"
,
this
._Name));
}
///
<summary>
///
停止服务器检查
///
</summary>
public
void
StopCheckTimer()
{
if
(_CheckTimer
!=
null
)
{
_CheckTimer.Change(Timeout.Infinite, Timeout.Infinite);
}
}
///
<summary>
///
检查
///
</summary>
///
<param name="stateInfo"></param>
private
void
Check(Object stateInfo)
{
if
(_CheckTimer
!=
null
)
{
_CheckTimer.Change(Timeout.Infinite, Timeout.Infinite);
}
Check();
if
(_CheckTimer
!=
null
)
{
_CheckTimer.Change(PCDataCollector_Config.Default.CheckTimerSpan, PCDataCollector_Config.Default.CheckTimerSpan);
}
}
public
void
Check()
{
pcClient.CheckConnect();
ReceiverForServer.Instance.Check();
DispatchManager.Instance.Check();
PostServerInfo();
}
这样终端就能定时的检测和通讯服务器之间的连接以及数据处理线程的正常运行。
3、 用户登录及用户注销
用户登录,首先通过发送登录协议指令并在内存中记录发送的协议包,等待服务器的响应,当收到服务器的登录响应后 ,执行相关的操作,如启动文件目录监控,为发送数据做好准备。
public
void
Login()
{
Login(PCDataCollector_Config.Default.UserName, PCDataCollector_Config.Default.Password);
}
///
<summary>
///
使用账号密码登录
///
</summary>
public
void
Login(
string
userNo,
string
password)
{
if
(
string
.IsNullOrEmpty(userNo)
&&
string
.IsNullOrEmpty(password))
{
throw
new
Exception(
"
用户名或密码不能为空!
"
);
}
this
.userNo
=
userNo;
this
.password
=
password;
SendLogin();
}
///
<summary>
///
重新登录
///
</summary>
public
void
ReLogin()
{
SendLogin();
}
private
void
SendLogin()
{
string
seqNo
=
DateTime.Now.ToString(
"
yyyyMMdd
"
)
+
new
Random().Next(
99999
).ToString().PadLeft(
5
,
'
0
'
);
//
发送请求
AuthenticationRequest requestData
=
new
AuthenticationRequest(seqNo, userNo, password);
CommonManager.Instance.Send(requestData.ToString());
//
记录请求
ReqAnsManager.Instance.Add(
new
RequestRecord(DataTypeKey.AuthenticationRequest, seqNo, DateTime.Now.AddSeconds(
10
)));
Log.WriteInfo(
string
.Format(
"
正在登录。。。。{0}
"
, tryCount));
Interlocked.Increment(
ref
tryCount);
//
计数
}
///
<summary>
///
服务器响应处理
///
</summary>
public
void
HandleLoginResult(AuthenticationAnswerData data)
{
try
{
RequestRecord record
=
ReqAnsManager.Instance.Find(data.SeqNo, DataTypeKey.AuthenticationRequest);
if
(record
!=
null
)
{
ReqAnsManager.Instance.Remove(record);
if
(data.ValidateResult
==
0
)
{
tryCount
=
0
;
//
重置失败次数为0
lastLoginTime
=
DateTime.Now;
isLogined
=
true
;
Portal.gc.MainDialog.SetNotifyStatus(NotifyIconHelper.Status.Online);
Log.WriteInfo(
"
登录成功!
"
);
//
ThreadPool.QueueUserWorkItem(DataAccess.Instance.LoadBaseData);
//
CommonManager.Instance.StarBaseDataRefresh();
//
登录成功后,对指定文件夹进行监控,自动发送数据
FileWatcherManger.Instance.StartFileWatcher();
}
else
{
lastLoginTime
=
DateTime.Now;
isLogined
=
false
;
Portal.gc.MainDialog.SetNotifyStatus(NotifyIconHelper.Status.Offline);
Log.WriteError(
"
登录失败:
"
+
data.Message);
if
(tryCount
<
PCDataCollector_Config.Default.TryLoginCount)
{
Thread.Sleep(
100
);
LoginManager.Instance.ReLogin();
}
else
{
Log.WriteInfo(
string
.Format(
"
尝试登录失败超过【{0}】次,等待【{1}】秒后再进行连接!
"
,
PCDataCollector_Config.Default.TryLoginCount, PCDataCollector_Config.Default.ReConnectSecconds));
tryCount
=
0
;
//
重置失败次数为0
Thread.Sleep(PCDataCollector_Config.Default.ReConnectSecconds
*
1000
);
LoginManager.Instance.ReLogin();
}
}
}
}
catch
(Exception ex)
{
Log.WriteError(
"
初始化异常:
"
+
ex.Message);
CommonManager.Instance.Exit();
}
}
用户的注销及断开,是通过客户端连接类来处理,该类命名为PCDataClient,其继承自Socket客户端处理类BaseSocketClient类,基类封装了对一般Socket类的连接、接收、发送等操作。该类只需要把解析好的数据传送给
ReceiverForServer类进行处理,而该类会把
数据分派给处理类DispatchManager类进行进一步处理。
public
class
PCDataClient : BaseSocketClient
{
protected
override
void
OnRead(PreData data)
{
ReceiverForServer.Instance.AppendPreData(data);
ReceiverForServer.Instance.Check();
}
///
<summary>
///
断开连接
///
</summary>
public
override
void
DisConnect()
{
base
.DisConnect();
LoginManager.Instance.isLogined
=
false
;
LoginManager.Instance.tryCount
=
0
;
//
重置失败次数为0
//
客户端和服务器连接中断
Log.WriteError(
string
.Format(
"
客户端和服务器连接中断
"
));
}
///
<summary>
///
由于基类只是在调用CheckConnect()函数时确保连接,并没有重新登录,
///
因此需要重载基类,如果断网了,连接后重新执行登录,否则收发数据不成功
///
</summary>
public
override
void
CheckConnect()
{
if
(
!
this
.Connected)
{
Connect(IP, Port);
LoginManager.Instance.Login();
}
Log.WriteInfo(
string
.Format(
"
[{0}] 服务连接 | IP:{1} | Port:{2} | receive:{3} | send:{4}
"
,
_Name, IP, Port, ReceivePackCount, SendPackCount));
}
}
对接收到的数据进行统一处理,只需要继承基类
BaseReceiver即可,相关基类的实现可以参考随笔《
Socket开发探秘--基类及公共类的定义
》。
public
class
ReceiverForServer : BaseReceiver
{
#region
单件
///
<summary>
///
Receiver实例
///
</summary>
private
static
ReceiverForServer _manager;
///
<summary>
///
锁定实例
///
</summary>
private
static
object
oClassLock
=
typeof
(ReceiverForServer);
///
<summary>
///
得到该实例
///
</summary>
///
<returns></returns>
public
static
ReceiverForServer Instance
{
get
{
lock
(oClassLock)
//
加锁只生成一个实例
{
if
(_manager
==
null
)
{
_manager
=
new
ReceiverForServer();
}
}
return
_manager;
}
}
///
<summary>
///
私有的构造函数,防止从外部实例化
///
</summary>
private
ReceiverForServer()
{
}
#endregion
public
override
void
PreDataHandle(PreData data)
{
DispatchManager.Instance.Dispatch(data);
}
}
4、 协议数据组装拆包
数据的组包和拆包,在较早的随笔《
Socket开发探秘--数据封包和拆包
》有详细的介绍,数据的组装和拆包主要通过反射原理,把字符串数据转换为对应的实体类,或者把实体类组装成字符串。
5、 数据分派处理
public
sealed
class
DispatchManager
{
#region
单例
private
static
DispatchManager _manager
=
new
DispatchManager();
public
static
DispatchManager Instance
{
get
{
return
_manager;
}
}
private
DispatchManager() { }
#endregion
#region
委托、事件
//
认证应答事件
public
delegate
void
AuthenticationAnswerDelegate(AuthenticationAnswerData data);
public
event
AuthenticationAnswerDelegate SignalAuthenticationAnswer;
//
空车位上传数据应答事件
public
delegate
void
PCParkingDataAnswerDelegate(PCParkingInfoAnswer data);
public
event
PCParkingDataAnswerDelegate parkingDataAnswer;
//
明细信息上传的应答事件
public
delegate
void
PCTicketDataAnswerDelegate(PCTicketDataAnswer data);
public
event
PCTicketDataAnswerDelegate ticketDataAnswer;
#endregion
///
<summary>
///
启动,关联所有socket接收过来分发的事件
///
</summary>
public
void
Start()
{
//
注册登录结果处理方法
SignalAuthenticationAnswer
+=
new
DispatchManager.AuthenticationAnswerDelegate(LoginManager.Instance.HandleLoginResult);
parkingDataAnswer
+=
new
PCParkingDataAnswerDelegate(FileWatcherManger.Instance.HandleAnswerResult);
ticketDataAnswer
+=
new
PCTicketDataAnswerDelegate(FileWatcherManger.Instance.HandleAnswerResult);
}
#region
方法
///
<summary>
///
分发应答数据
///
对于部分类型的应答,需要进行内部广播
///
</summary>
///
<param name="predata"></param>
public
void
Dispatch(PreData predata)
{
if
(predata
==
null
)
return
;
switch
(predata.Key)
{
//
身份认证应答
case
DataTypeKey.AuthenticationAnswer:
AuthenticationAnswerData authData
=
new
AuthenticationAnswerData(predata.Content);
SignalAuthenticationAnswer(authData);
break
;
case
DataTypeKey.PCParkingInfoAnswer:
PCParkingInfoAnswer spaceData
=
new
PCParkingInfoAnswer(predata.Content);
parkingDataAnswer(spaceData);
break
;
case
DataTypeKey.PCTicketDataAnswer:
PCTicketDataAnswer pickBaseData
=
new
PCTicketDataAnswer(predata.Content);
ticketDataAnswer(pickBaseData);
break
;
default
:
break
;
}
}
///
<summary>
///
系统检查
///
</summary>
public
void
Check()
{
}
#endregion
}
由于涉及内容较多,篇幅太长,这些相关的内容将逐步介绍,期间可能会逐步对代码进行优化和提炼,以求达到通用、良好封装的目的,能促进其他项目的使用及处理。
希望本文对于从事Socket开发处理的读者有一个好的引导和启发,大家一起交流,互通有无,各取所长。