近期调研和使用 zeromq 与 cppzmq 的一些问题
关于message
消息分片
消息分片的发送
消息分片允许将多个消息封装成一条消息。在发送自定义协议数据时,我们经常需要在消息前“填充”一个包头。如下代码,在发送的时候加上
zmq::send_flags::sndmore
标识(对应 zeromq
ZMQ_SNDMORE
),表示后面还有消息。这样 zeromq 会将
ZMQ_SNDMORE
的消息和最后一段消息拼装成一条完整的消息发送。
int SendData(char* pMsg, int iMsgLen)
{
tagMsgHead stHead;
bzero(&stHead, sizeof(stHead));
...
stHead.Len = iMsgLen;
stHead.Crc = 0;
try
{
m_socket.send(zmq::const_buffer((const void*)(&stHead), sizeof(stHead)),zmq::send_flags::dontwait|zmq::send_flags::sndmore);
m_socket.send(zmq::const_buffer((const void*)(pMsg), static_cast<size_t>(iMsgLen)), zmq::send_flags::dontwait);
}
catch (...)
{
....
}
return 0;
}
消息分片的接收
需要注意的是,如果发送使用了
ZMQ_SNDMORE
分片,那么在接收时也需要分多次
recv
接收数据(这点比较麻烦)。开始的时候以为
recv
接收的是一个完成的包,后面才知道
recv
接收的其实是“帧”数据,多个“帧”拼装成一个消息。具体接收方法如下:
Buffer buffer;
while (1) {
// 接收消息
zmq_msg_t identify;
zmq_msg_t message;
zmq_msg_init(&identify);
zmq_msg_init(&message);
zmq_recvmsg(socket, &identify, 0);
zmq_recvmsg(socket, &message, 0);
buffer.Append(zmq_msg_data(&message), zmq_msg_size(&message));
// 检查是否还有更多消息可读
while(zmq_msg_more(&message)) {
zmq_recvmsg(socket, &message, 0);
buffer.Append(zmq_msg_data(&message), zmq_msg_size(&message));
}
zmq_msg_close(&identify);
zmq_msg_close(&message);
}
使用 cppzmq 的话,代码如下:
Buffer buffer;
while (1) {
// 接收消息
zmq::message_t identity;
zmq::message_t message;
socket.recv(identity, zmq::recv_flags::none);
socket.recv(message, zmq::recv_flags::none);
buffer.Append(message.data(), message.size());
// 检查是否还有更多消息可读
while(message.more()) {
socket.recv(message, zmq::recv_flags::none);
buffer.Append(message.data(), message.size());
}
}
使用
ZMQ_SNDMORE
后接收也需要分片接收,这个确实是比较麻烦的地方。个人觉得如果改成一次接收会更好,因为这样更符合使用的“直觉”。
后面会不断更新这部分,有新的问题会加进来。