Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix handling of pva_ctrl_msg::SetEndian #27

Merged
merged 1 commit into from
May 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions documentation/releasenotes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Release Notes
0.3.0 (UNRELEASED)
------------------

* Fix protocol **incompatibility** with Big Endian servers.
* Add support for IPv4 multicast and IPv6 uni/multicast for UDP. And IPv6 unicast for TCP.
See :ref:`addrspec` for entries which may now appear in **EPICS_PVA*_ADDR_LIST**.
* PVXS now attempts to fanout unicast searches through the loopback interface, and
Expand Down
2 changes: 1 addition & 1 deletion src/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ void Channel::disconnect(const std::shared_ptr<Channel>& self)
{
(void)evbuffer_drain(current->txBody.get(), evbuffer_get_length(current->txBody.get()));

EvOutBuf R(hostBE, current->txBody.get());
EvOutBuf R(current->sendBE, current->txBody.get());

to_wire(R, sid);
to_wire(R, cid);
Expand Down
10 changes: 5 additions & 5 deletions src/clientconn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void Connection::createChannels()
{
(void)evbuffer_drain(txBody.get(), evbuffer_get_length(txBody.get()));

EvOutBuf R(hostBE, txBody.get());
EvOutBuf R(sendBE, txBody.get());

to_wire(R, uint16_t(1u));
to_wire(R, chan->cid);
Expand All @@ -90,7 +90,7 @@ void Connection::sendDestroyRequest(uint32_t sid, uint32_t ioid)
{
(void)evbuffer_drain(txBody.get(), evbuffer_get_length(txBody.get()));

EvOutBuf R(hostBE, txBody.get());
EvOutBuf R(sendBE, txBody.get());

to_wire(R, sid);
to_wire(R, ioid);
Expand Down Expand Up @@ -232,7 +232,7 @@ void Connection::handle_CONNECTION_VALIDATION()
{
(void)evbuffer_drain(txBody.get(), evbuffer_get_length(txBody.get()));

EvOutBuf R(hostBE, txBody.get());
EvOutBuf R(sendBE, txBody.get());

// serverReceiveBufferSize, not used
to_wire(R, uint32_t(0x10000));
Expand Down Expand Up @@ -316,7 +316,7 @@ void Connection::handle_CREATE_CHANNEL()
{
(void)evbuffer_drain(txBody.get(), evbuffer_get_length(txBody.get()));

EvOutBuf R(hostBE, txBody.get());
EvOutBuf R(sendBE, txBody.get());
to_wire(R, sid);
to_wire(R, cid);
}
Expand Down Expand Up @@ -400,7 +400,7 @@ void Connection::tickEcho()

auto tx = bufferevent_get_output(bev.get());

to_evbuf(tx, Header{CMD_ECHO, 0u, 0u}, hostBE);
to_evbuf(tx, Header{CMD_ECHO, 0u, 0u}, sendBE);

// maybe help reduce latency
bufferevent_flush(bev.get(), EV_WRITE, BEV_FLUSH);
Expand Down
8 changes: 5 additions & 3 deletions src/clientget.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,11 @@ struct GPROp : public OperationBase
// act on new operation state

{
(void)evbuffer_drain(chan->conn->txBody.get(), evbuffer_get_length(chan->conn->txBody.get()));
auto& conn = chan->conn;

EvOutBuf R(hostBE, chan->conn->txBody.get());
(void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get()));

EvOutBuf R(conn->sendBE, conn->txBody.get());

to_wire(R, chan->sid);
to_wire(R, ioid);
Expand Down Expand Up @@ -339,7 +341,7 @@ struct GPROp : public OperationBase
{
(void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get()));

EvOutBuf R(hostBE, conn->txBody.get());
EvOutBuf R(conn->sendBE, conn->txBody.get());

to_wire(R, chan->sid);
to_wire(R, ioid);
Expand Down
2 changes: 1 addition & 1 deletion src/clientintrospect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ struct InfoOp : public OperationBase
{
(void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get()));

EvOutBuf R(hostBE, conn->txBody.get());
EvOutBuf R(conn->sendBE, conn->txBody.get());

to_wire(R, chan->sid);
to_wire(R, ioid);
Expand Down
6 changes: 3 additions & 3 deletions src/clientmon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ struct SubscriptionImpl : public OperationBase, public Subscription

(void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get()));

EvOutBuf R(hostBE, conn->txBody.get());
EvOutBuf R(conn->sendBE, conn->txBody.get());

to_wire(R, chan->sid);
to_wire(R, ioid);
Expand Down Expand Up @@ -272,7 +272,7 @@ struct SubscriptionImpl : public OperationBase, public Subscription

(void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get()));

EvOutBuf R(hostBE, conn->txBody.get());
EvOutBuf R(conn->sendBE, conn->txBody.get());

to_wire(R, chan->sid);
to_wire(R, ioid);
Expand Down Expand Up @@ -360,7 +360,7 @@ struct SubscriptionImpl : public OperationBase, public Subscription
{
(void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get()));

EvOutBuf R(hostBE, conn->txBody.get());
EvOutBuf R(conn->sendBE, conn->txBody.get());

to_wire(R, chan->sid);
to_wire(R, ioid);
Expand Down
22 changes: 21 additions & 1 deletion src/conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ ConnBase::ConnBase(bool isClient, bufferevent* bev, const SockAddr& peerAddr)
,peerName(peerAddr.tostring())
,bev(bev)
,isClient(isClient)
,sendBE(EPICS_BYTE_ORDER==EPICS_ENDIAN_BIG)
,peerBE(true) // arbitrary choice, default should be overwritten before use
,expectSeg(false)
,segCmd(0xff)
Expand All @@ -46,7 +47,7 @@ size_t ConnBase::enqueueTxBody(pva_app_msg_t cmd)
to_evbuf(tx, Header{cmd,
uint8_t(isClient ? 0u : pva_flags::Server),
uint32_t(blen)},
hostBE);
sendBE);
auto err = evbuffer_add_buffer(tx, txBody.get());
assert(!err); // could only fail if frozen/pinned, which is not the case
statTx += 8u + blen;
Expand Down Expand Up @@ -121,6 +122,25 @@ void ConnBase::bevRead()
"%s %s Receive header\n", peerLabel(), peerName.c_str());

if(header[2]&pva_flags::Control) {
if(header[3]==pva_ctrl_msg::SetEndian) {
/* This should be the first message sent by a (supposedly) server.
* However, old pvAccess* accepts it from either peer at any time.
*
* The protocol spec. claims that we should inspect the size field
* (bytes 4-7) and act as follows.
* 0x00000000 - Send future messages using endianness on this (received)
* message. Peer will ignore MSB flag in our headers!
* 0xffffffff - Send future messages as we like. Peer will test the
* MSB flag.
*
* However, neither pvAccessCPP nor pvAccessJava actually test this.
* Instead the 0x00000000 behavior is assumed.
*
* So we latch the byte order here, as the peer should ignore the MSB
* flag subsequent messages...
*/
sendBE = header[2]&pva_flags::MSB;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new java implementation behaves like this:
The 'size' field 0x00000000 or 0xffffffff is indeed ignored.
The MSB flag is used to set the equivalent of the sendBE flag, and future messages that the client sends will always use the endianess that the server configured via the set-endian command. For received messages, however, we continue to check the byte order flag in each received messages,

This specific PVXS change is only about the sendBE, locking the byte order of messages that the client sends to the server from now on, making 0x00000000 the default and only option.
What about received messages? Should the set-endian command also affect received messages, we trust the server to from now on only give us messages in that same byte order, or should the client still check the byte order flag of each received message?

}
// Control messages are not actually useful
evbuffer_drain(rx, 8);
statRx += 8u;
Expand Down
1 change: 1 addition & 0 deletions src/conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ struct ConnBase
TypeStore rxRegistry;

const bool isClient;
bool sendBE;
bool peerBE;
bool expectSeg;

Expand Down
8 changes: 4 additions & 4 deletions src/serverchan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ void ServerChannelControl::close()
conn->peerName.c_str(), ch->name.c_str());

auto tx = bufferevent_get_output(conn->bev.get());
EvOutBuf R(hostBE, tx);
EvOutBuf R(conn->sendBE, tx);
to_wire(R, Header{CMD_DESTROY_CHANNEL, pva_flags::Server, 8});
to_wire(R, ch->sid);
to_wire(R, ch->cid);
Expand Down Expand Up @@ -251,7 +251,7 @@ void ServerConn::handle_SEARCH()
{
(void)evbuffer_drain(txBody.get(), evbuffer_get_length(txBody.get()));

EvOutBuf R(hostBE, txBody.get());
EvOutBuf R(sendBE, txBody.get());

_to_wire<12>(R, iface->server->effective.guid.data(), false, __FILE__, __LINE__);
to_wire(R, searchID);
Expand Down Expand Up @@ -363,7 +363,7 @@ void ServerConn::handle_CREATE_CHANNEL()
{
(void)evbuffer_drain(txBody.get(), evbuffer_get_length(txBody.get()));

EvOutBuf R(hostBE, txBody.get());
EvOutBuf R(sendBE, txBody.get());
to_wire(R, cid);
to_wire(R, sid);
to_wire(R, sts);
Expand Down Expand Up @@ -417,7 +417,7 @@ void ServerConn::handle_DESTROY_CHANNEL()

{
auto tx = bufferevent_get_output(bev.get());
EvOutBuf R(hostBE, tx);
EvOutBuf R(sendBE, tx);
to_wire(R, Header{CMD_DESTROY_CHANNEL, pva_flags::Server, 8});
to_wire(R, sid);
to_wire(R, cid);
Expand Down
8 changes: 4 additions & 4 deletions src/serverconn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ ServerConn::ServerConn(ServIface* iface, evutil_socket_t sock, struct sockaddr *

// queue connection validation message
{
VectorOutBuf M(hostBE, buf);
VectorOutBuf M(sendBE, buf);
to_wire(M, Header{pva_ctrl_msg::SetEndian, pva_flags::Control|pva_flags::Server, 0});

auto save = M.save();
Expand All @@ -87,7 +87,7 @@ ServerConn::ServerConn(ServIface* iface, evutil_socket_t sock, struct sockaddr *
to_wire(M, "ca");
auto bend = M.save();

FixedBuf H(hostBE, save, 8);
FixedBuf H(sendBE, save, 8);
to_wire(H, Header{CMD_CONNECTION_VALIDATION, pva_flags::Server, uint32_t(bend-bstart)});

assert(M.good() && H.good());
Expand Down Expand Up @@ -123,7 +123,7 @@ void ServerConn::handle_ECHO()
auto tx = bufferevent_get_output(bev.get());
uint32_t len = evbuffer_get_length(segBuf.get());

to_evbuf(tx, Header{CMD_ECHO, pva_flags::Server, len}, hostBE);
to_evbuf(tx, Header{CMD_ECHO, pva_flags::Server, len}, sendBE);

auto err = evbuffer_add_buffer(tx, segBuf.get());
assert(!err);
Expand All @@ -140,7 +140,7 @@ void auth_complete(ServerConn *self, const Status& sts)
(void)evbuffer_drain(self->txBody.get(), evbuffer_get_length(self->txBody.get()));

{
EvOutBuf M(hostBE, self->txBody.get());
EvOutBuf M(self->sendBE, self->txBody.get());
to_wire(M, sts);
}

Expand Down
2 changes: 1 addition & 1 deletion src/serverget.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ struct ServerGPR : public ServerOp
{
(void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get()));

EvOutBuf R(hostBE, conn->txBody.get());
EvOutBuf R(conn->sendBE, conn->txBody.get());
to_wire(R, uint32_t(ioid));
to_wire(R, subcmd);
to_wire(R, sts);
Expand Down
2 changes: 1 addition & 1 deletion src/serverintrospect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ struct ServerIntrospect : public ServerOp
{
(void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get()));

EvOutBuf R(hostBE, conn->txBody.get());
EvOutBuf R(conn->sendBE, conn->txBody.get());
to_wire(R, uint32_t(ioid));
to_wire(R, sts);
if(type)
Expand Down
2 changes: 1 addition & 1 deletion src/servermon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ struct MonitorOp : public ServerOp,
{
(void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get()));

EvOutBuf R(hostBE, conn->txBody.get());
EvOutBuf R(conn->sendBE, conn->txBody.get());
to_wire(R, uint32_t(ioid));
to_wire(R, subcmd);
if(subcmd&0x08) {
Expand Down