Skip to content

Commit

Permalink
Update to draft-05 wire image
Browse files Browse the repository at this point in the history
Summary: Adds/renames old priority fields

Reviewed By: roticv

Differential Revision:
D59661631

Privacy Context Container: L1222497

fbshipit-source-id: af399ce1e6dee2c5a52875d9345603f848738d82
  • Loading branch information
afrind authored and facebook-github-bot committed Jul 30, 2024
1 parent 239f3ff commit 0bc3ae9
Show file tree
Hide file tree
Showing 19 changed files with 202 additions and 87 deletions.
2 changes: 1 addition & 1 deletion moxygen/MoQCodec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ folly::Expected<folly::Unit, ErrorCode> MoQCodec::parseFrame(
break;
}
case FrameType::SUBSCRIBE_UPDATE: {
auto res = parseSubscribeUpdateRequest(cursor);
auto res = parseSubscribeUpdate(cursor);
if (res) {
if (callback_) {
callback_->onSubscribeUpdate(std::move(res.value()));
Expand Down
3 changes: 1 addition & 2 deletions moxygen/MoQCodec.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ class MoQCodec {
std::unique_ptr<folly::IOBuf> payload,
bool eom) = 0;
virtual void onSubscribe(SubscribeRequest subscribeRequest) = 0;
virtual void onSubscribeUpdate(
SubscribeUpdateRequest subscribeUpdateRequest) = 0;
virtual void onSubscribeUpdate(SubscribeUpdate subscribeUpdate) = 0;
virtual void onSubscribeOk(SubscribeOk subscribeOk) = 0;
virtual void onSubscribeError(SubscribeError subscribeError) = 0;
virtual void onSubscribeDone(SubscribeDone subscribeDone) = 0;
Expand Down
98 changes: 67 additions & 31 deletions moxygen/MoQFramer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,10 @@ folly::Expected<ObjectHeader, ErrorCode> parseObjectHeader(
return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW);
}
objectHeader.id = id->first;
auto sendOrder = quic::decodeQuicInteger(cursor);
if (!sendOrder) {
if (!cursor.canAdvance(1)) {
return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW);
}
objectHeader.sendOrder = sendOrder->first;
objectHeader.priority = cursor.readBE<uint8_t>();
auto objectStatus = quic::decodeQuicInteger(cursor);
if (!objectStatus) {
return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW);
Expand Down Expand Up @@ -208,11 +207,11 @@ folly::Expected<ObjectHeader, ErrorCode> parseStreamHeader(
} else {
objectHeader.forwardPreference = ForwardPreference::Track;
}
auto sendOrder = quic::decodeQuicInteger(cursor);
if (!sendOrder) {
auto priority = quic::decodeQuicInteger(cursor);
if (!priority) {
return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW);
}
objectHeader.sendOrder = sendOrder->first;
objectHeader.priority = priority->first;
return objectHeader;
}

Expand Down Expand Up @@ -297,6 +296,15 @@ folly::Expected<SubscribeRequest, ErrorCode> parseSubscribeRequest(
return folly::makeUnexpected(res.error());
}
subscribeRequest.fullTrackName = std::move(res.value());
if (!cursor.canAdvance(2)) {
return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW);
}
subscribeRequest.priority = cursor.readBE<uint8_t>();
auto order = cursor.readBE<uint8_t>();
if (order > folly::to_underlying(GroupOrder::NewestFirst)) {
return folly::makeUnexpected(ErrorCode::INVALID_MESSAGE);
}
subscribeRequest.groupOrder = static_cast<GroupOrder>(order);
auto locType = quic::decodeQuicInteger(cursor);
if (!locType) {
return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW);
Expand Down Expand Up @@ -332,44 +340,38 @@ folly::Expected<SubscribeRequest, ErrorCode> parseSubscribeRequest(
return subscribeRequest;
}

folly::Expected<SubscribeUpdateRequest, ErrorCode> parseSubscribeUpdateRequest(
folly::Expected<SubscribeUpdate, ErrorCode> parseSubscribeUpdate(
folly::io::Cursor& cursor) noexcept {
SubscribeUpdateRequest subscribeUpdateRequest;
SubscribeUpdate subscribeUpdate;
auto subscribeID = quic::decodeQuicInteger(cursor);
if (!subscribeID) {
return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW);
}
subscribeUpdateRequest.subscribeID = subscribeID->first;
auto startGroup = quic::decodeQuicInteger(cursor);
if (!startGroup) {
return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW);
}
subscribeUpdateRequest.startGroup = startGroup->first;
auto startObject = quic::decodeQuicInteger(cursor);
if (!startObject) {
return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW);
subscribeUpdate.subscribeID = subscribeID->first;
auto start = parseAbsoluteLocation(cursor);
if (!start) {
return folly::makeUnexpected(start.error());
}
subscribeUpdateRequest.startObject = startObject->first;
auto endGroup = quic::decodeQuicInteger(cursor);
if (!endGroup) {
return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW);
subscribeUpdate.start = start.value();
auto end = parseAbsoluteLocation(cursor);
if (!end) {
return folly::makeUnexpected(end.error());
}
subscribeUpdateRequest.endGroup = endGroup->first;
auto endObject = quic::decodeQuicInteger(cursor);
if (!endObject) {
subscribeUpdate.end = end.value();
if (!cursor.canAdvance(1)) {
return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW);
}
subscribeUpdateRequest.endObject = endObject->first;
subscribeUpdate.priority = cursor.readBE<uint8_t>();
auto numParams = quic::decodeQuicInteger(cursor);
if (!numParams) {
return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW);
}
auto res2 = parseTrackRequestParams(
cursor, numParams->first, subscribeUpdateRequest.params);
auto res2 =
parseTrackRequestParams(cursor, numParams->first, subscribeUpdate.params);
if (!res2) {
return folly::makeUnexpected(res2.error());
}
return subscribeUpdateRequest;
return subscribeUpdate;
}

folly::Expected<SubscribeOk, ErrorCode> parseSubscribeOk(
Expand All @@ -386,9 +388,14 @@ folly::Expected<SubscribeOk, ErrorCode> parseSubscribeOk(
return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW);
}
subscribeOk.expires = std::chrono::milliseconds(expires->first);
if (!cursor.canAdvance(1)) {
if (!cursor.canAdvance(2)) {
return folly::makeUnexpected(ErrorCode::PARSE_UNDERFLOW);
}
auto order = cursor.readBE<uint8_t>();
if (order == 0 || order > folly::to_underlying(GroupOrder::NewestFirst)) {
return folly::makeUnexpected(ErrorCode::INVALID_MESSAGE);
}
subscribeOk.groupOrder = static_cast<GroupOrder>(order);
auto contentExists = cursor.readBE<uint8_t>();
if (contentExists) {
auto res = parseAbsoluteLocation(cursor);
Expand Down Expand Up @@ -728,7 +735,7 @@ WriteResult writeStreamHeader(
if (objectHeader.forwardPreference == ForwardPreference::Group) {
writeVarint(writeBuf, objectHeader.group, size, error);
}
writeVarint(writeBuf, objectHeader.sendOrder, size, error);
writeVarint(writeBuf, objectHeader.priority, size, error);
if (error) {
return folly::makeUnexpected(quic::TransportErrorCode::INTERNAL_ERROR);
}
Expand Down Expand Up @@ -764,7 +771,7 @@ WriteResult writeObject(
}
writeVarint(writeBuf, objectHeader.id, size, error);
if (!multiObject) {
writeVarint(writeBuf, objectHeader.sendOrder, size, error);
writeBuf.append(&objectHeader.priority, 1);
writeVarint(
writeBuf, folly::to_underlying(objectHeader.status), size, error);
} else {
Expand Down Expand Up @@ -794,6 +801,9 @@ WriteResult writeSubscribeRequest(
writeVarint(writeBuf, subscribeRequest.subscribeID, size, error);
writeVarint(writeBuf, subscribeRequest.trackAlias, size, error);
writeFullTrackName(writeBuf, subscribeRequest.fullTrackName, size, error);
writeBuf.append(&subscribeRequest.priority, 1);
uint8_t order = folly::to_underlying(subscribeRequest.groupOrder);
writeBuf.append(&order, 1);
writeVarint(
writeBuf, folly::to_underlying(subscribeRequest.locType), size, error);
if (subscribeRequest.locType == LocationType::AbsoluteStart ||
Expand All @@ -816,6 +826,30 @@ WriteResult writeSubscribeRequest(
return size;
}

WriteResult writeSubscribeUpdate(
folly::IOBufQueue& writeBuf,
const SubscribeUpdate& update) noexcept {
size_t size = 0;
bool error = false;
writeVarint(
writeBuf, folly::to_underlying(FrameType::SUBSCRIBE_UPDATE), size, error);
writeVarint(writeBuf, update.subscribeID, size, error);
writeVarint(writeBuf, update.start.group, size, error);
writeVarint(writeBuf, update.start.object, size, error);
writeVarint(writeBuf, update.end.group, size, error);
writeVarint(writeBuf, update.end.object, size, error);
writeBuf.append(&update.priority, 1);
writeVarint(writeBuf, update.params.size(), size, error);
for (auto& param : update.params) {
writeVarint(writeBuf, param.key, size, error);
writeFixedString(writeBuf, param.value, size, error);
}
if (error) {
return folly::makeUnexpected(quic::TransportErrorCode::INTERNAL_ERROR);
}
return size;
}

WriteResult writeSubscribeOk(
folly::IOBufQueue& writeBuf,
const SubscribeOk& subscribeOk) noexcept {
Expand All @@ -825,6 +859,8 @@ WriteResult writeSubscribeOk(
writeBuf, folly::to_underlying(FrameType::SUBSCRIBE_OK), size, error);
writeVarint(writeBuf, subscribeOk.subscribeID, size, error);
writeVarint(writeBuf, subscribeOk.expires.count(), size, error);
auto order = folly::to_underlying(subscribeOk.groupOrder);
writeBuf.append(&order, 1);
if (subscribeOk.latest) {
writeVarint(writeBuf, 1, size, error); // content exists
writeVarint(writeBuf, subscribeOk.latest->group, size, error);
Expand Down
29 changes: 21 additions & 8 deletions moxygen/MoQFramer.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ constexpr uint64_t kVersionDraft01 = 0xff000001;
constexpr uint64_t kVersionDraft02 = 0xff000002;
constexpr uint64_t kVersionDraft03 = 0xff000003;
constexpr uint64_t kVersionDraft04 = 0xff000004;
constexpr uint64_t kVersionDraftCurrent = kVersionDraft04;
constexpr uint64_t kVersionDraft05 = 0xff000005;
constexpr uint64_t kVersionDraftCurrent = kVersionDraft05;

struct ClientSetup {
std::vector<uint64_t> supportedVersions;
Expand Down Expand Up @@ -138,7 +139,7 @@ struct ObjectHeader {
uint64_t trackAlias;
uint64_t group;
uint64_t id;
uint64_t sendOrder;
uint64_t priority;
ForwardPreference forwardPreference;
ObjectStatus status{ObjectStatus::NORMAL};
folly::Optional<uint64_t> length{folly::none};
Expand Down Expand Up @@ -220,10 +221,18 @@ struct FullTrackName {
};
};

enum class GroupOrder : uint8_t {
Default = 0x0,
OldestFirst = 0x1,
NewestFirst = 0x2
};

struct SubscribeRequest {
uint64_t subscribeID;
uint64_t trackAlias;
FullTrackName fullTrackName;
uint8_t priority;
GroupOrder groupOrder;
LocationType locType;
folly::Optional<AbsoluteLocation> start;
folly::Optional<AbsoluteLocation> end;
Expand All @@ -233,21 +242,21 @@ struct SubscribeRequest {
folly::Expected<SubscribeRequest, ErrorCode> parseSubscribeRequest(
folly::io::Cursor& cursor) noexcept;

struct SubscribeUpdateRequest {
struct SubscribeUpdate {
uint64_t subscribeID;
uint64_t startGroup;
uint64_t startObject;
uint64_t endGroup;
uint64_t endObject;
AbsoluteLocation start;
AbsoluteLocation end;
uint8_t priority;
std::vector<TrackRequestParameter> params;
};

folly::Expected<SubscribeUpdateRequest, ErrorCode> parseSubscribeUpdateRequest(
folly::Expected<SubscribeUpdate, ErrorCode> parseSubscribeUpdate(
folly::io::Cursor& cursor) noexcept;

struct SubscribeOk {
uint64_t subscribeID;
std::chrono::milliseconds expires;
GroupOrder groupOrder;
// context exists is inferred from presence of latest
folly::Optional<AbsoluteLocation> latest;
};
Expand Down Expand Up @@ -366,6 +375,10 @@ WriteResult writeSubscribeRequest(
folly::IOBufQueue& writeBuf,
const SubscribeRequest& subscribeRequest) noexcept;

WriteResult writeSubscribeUpdate(
folly::IOBufQueue& writeBuf,
const SubscribeUpdate& update) noexcept;

WriteResult writeSubscribeOk(
folly::IOBufQueue& writeBuf,
const SubscribeOk& subscribeOk) noexcept;
Expand Down
2 changes: 1 addition & 1 deletion moxygen/MoQServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ void MoQServer::ControlVisitor::operator()(
}

void MoQServer::ControlVisitor::operator()(
SubscribeUpdateRequest subscribeUpdate) const {
SubscribeUpdate subscribeUpdate) const {
XLOG(INFO) << "SubscribeRequest id=" << subscribeUpdate.subscribeID;
}

Expand Down
2 changes: 1 addition & 1 deletion moxygen/MoQServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class MoQServer {
void operator()(ServerSetup) const override;
void operator()(Announce announce) const override;
void operator()(SubscribeRequest subscribeReq) const override;
void operator()(SubscribeUpdateRequest subscribeUpdate) const override;
void operator()(SubscribeUpdate subscribeUpdate) const override;
void operator()(Unannounce unannounce) const override;
void operator()(AnnounceCancel announceCancel) const override;
void operator()(SubscribeDone subscribeDone) const override;
Expand Down
8 changes: 3 additions & 5 deletions moxygen/MoQSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,9 @@ void MoQSession::onSubscribe(SubscribeRequest subscribeRequest) {
controlMessages_.enqueue(std::move(subscribeRequest));
}

void MoQSession::onSubscribeUpdate(
SubscribeUpdateRequest subscribeUpdateRequest) {
void MoQSession::onSubscribeUpdate(SubscribeUpdate subscribeUpdate) {
XLOG(DBG1) << __func__;
controlMessages_.enqueue(std::move(subscribeUpdateRequest));
controlMessages_.enqueue(std::move(subscribeUpdate));
}

void MoQSession::onUnsubscribe(Unsubscribe unsubscribe) {
Expand All @@ -295,7 +294,7 @@ void MoQSession::onSubscribeOk(SubscribeOk subOk) {
XLOG(ERR) << "No matching subscribe ID=" << subOk.subscribeID;
return;
}
subIt->second->subscribeOK(subIt->second, subOk.latest);
subIt->second->subscribeOK(subIt->second, subOk.groupOrder, subOk.latest);
}

void MoQSession::onSubscribeError(SubscribeError subErr) {
Expand Down Expand Up @@ -534,7 +533,6 @@ void MoQSession::publishImpl(
PublishKey publishKey(
{objHeader.subscribeID,
objHeader.group,
objHeader.sendOrder,
objHeader.forwardPreference,
objHeader.id});
auto pubDataIt = publishDataMap_.find(publishKey);
Expand Down
Loading

0 comments on commit 0bc3ae9

Please sign in to comment.