Skip to content

Commit

Permalink
MoQDate refactor, stream_per_group option and publish END_OF_GROUP
Browse files Browse the repository at this point in the history
Summary: Use a single-session Forwarder to unify the catchup and live subscription cases

Reviewed By: roticv

Differential Revision:
D59650063

Privacy Context Container: L1222497

fbshipit-source-id: f2bf5d069656356f31017b23034496753fe00dd1
  • Loading branch information
afrind authored and facebook-github-bot committed Jul 12, 2024
1 parent 659d006 commit bba553f
Showing 1 changed file with 30 additions and 63 deletions.
93 changes: 30 additions & 63 deletions moxygen/samples/date/MoQDateServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,17 @@ DEFINE_int32(relay_transaction_timeout, 120, "Transaction timeout (s)");
DEFINE_string(cert, "", "Cert path");
DEFINE_string(key, "", "Key path");
DEFINE_int32(port, 9667, "Server Port");
DEFINE_bool(stream_per_group, false, "Use one stream for each group");

namespace {
using namespace moxygen;

class MoQDateServer : MoQServer {
public:
explicit MoQDateServer(folly::EventBase* evb)
explicit MoQDateServer(folly::EventBase* evb, ForwardPreference pref)
: MoQServer(FLAGS_port, FLAGS_cert, FLAGS_key, "/moq-date"),
forwarder_(dateTrackName()) {
forwarder_(dateTrackName()),
pref_(pref) {
if (!FLAGS_relay_url.empty()) {
proxygen::URL url(FLAGS_relay_url);
if (!url.isValid() || !url.hasHost()) {
Expand Down Expand Up @@ -141,11 +143,13 @@ class MoQDateServer : MoQServer {
if (range.start >= now) {
return false;
}
MoQForwarder forwarder(dateTrackName());
forwarder.addSubscriber({clientSession, subscribeID, trackAlias, range});
time_t t =
range.start.group * 60 + std::max(range.start.object, (uint64_t)1) - 1;
while (range.start < now && range.start < range.end) {
auto n = publishDate(
t, false, clientSession, subscribeID, trackAlias, range.end);
auto n =
publishDate(forwarder, t, false, subscribeID, trackAlias, range.end);
t++;
// publishDate publishes two objects for obj = 0
range.start.object += n;
Expand All @@ -154,15 +158,7 @@ class MoQDateServer : MoQServer {
range.start.object = 0;
}
}
if (range.end <= now) {
clientSession->subscribeDone(
{subscribeID,
SubscribeDoneStatusCode::SUBSCRIPTION_ENDED,
"",
range.start});
return true;
}
return false;
return forwarder.empty();
}

folly::coro::Task<void> publishDateLoop() {
Expand All @@ -173,17 +169,17 @@ class MoQDateServer : MoQServer {
auto now = std::chrono::system_clock::now();
auto in_time_t = std::chrono::system_clock::to_time_t(now);

publishDate(in_time_t, first, nullptr, 0, 0, folly::none);
publishDate(forwarder_, in_time_t, first, 0, 0, folly::none);
first = false;
}
co_await folly::coro::sleep(std::chrono::seconds(1));
}
}

size_t publishDate(
MoQForwarder& forwarder,
time_t in_time_t,
bool forceGroup,
const std::shared_ptr<MoQSession>& session,
uint64_t subscribeID,
uint64_t trackAlias,
folly::Optional<AbsoluteLocation> end) {
Expand All @@ -197,72 +193,39 @@ class MoQDateServer : MoQServer {
{uint64_t(in_time_t / 60), uint64_t(lt->tm_sec + 1)});
if (lt->tm_sec == 0 || forceGroup) {
ObjectHeader objHeader(
{0,
0,
{subscribeID,
trackAlias,
nowLoc.group,
0,
0,
ForwardPreference::Object,
pref_,
ObjectStatus::NORMAL,
folly::none});
if (session) {
publishObjectToSession(
session,
subscribeID,
trackAlias,
std::move(objHeader),
folly::IOBuf::copyBuffer(ss.str()));
} else {
forwarder_.publish(
std::move(objHeader), folly::IOBuf::copyBuffer(ss.str()));
}
forwarder.publish(objHeader, folly::IOBuf::copyBuffer(ss.str()));
objectsPublished++;
}
if (!end || nowLoc < *end) {
auto secBuf = folly::to<std::string>(lt->tm_sec);
ObjectHeader objHeader(
{0,
0,
{subscribeID,
trackAlias,
nowLoc.group,
nowLoc.object,
0,
ForwardPreference::Object,
pref_,
ObjectStatus::NORMAL,
folly::none});
if (session) {
publishObjectToSession(
session,
subscribeID,
trackAlias,
std::move(objHeader),
folly::IOBuf::copyBuffer(secBuf));
} else {
forwarder_.publish(
std::move(objHeader), folly::IOBuf::copyBuffer(secBuf));
}
forwarder.publish(std::move(objHeader), folly::IOBuf::copyBuffer(secBuf));
objectsPublished++;
if (nowLoc.object == 60) {
objHeader.status = ObjectStatus::END_OF_GROUP;
objHeader.id++;
forwarder.publish(std::move(objHeader), nullptr);
}
}
return objectsPublished;
}

void publishObjectToSession(
const std::shared_ptr<MoQSession>& session,
uint64_t subscribeID,
uint64_t trackAlias,
ObjectHeader inObjHeader,
std::unique_ptr<folly::IOBuf> payload) {
session->getEventBase()->runImmediatelyOrRunInEventBaseThread(
[session,
subscribeID,
trackAlias,
objHeader = std::move(inObjHeader),
buf = payload->clone()]() mutable {
objHeader.subscribeID = subscribeID;
objHeader.trackAlias = trackAlias;
session->publish(std::move(objHeader), 0, std::move(buf), true);
});
}

void unsubscribe(
std::shared_ptr<MoQSession> session,
Unsubscribe unsubscribe) {
Expand All @@ -275,17 +238,21 @@ class MoQDateServer : MoQServer {
}

private:
[[nodiscard]] FullTrackName dateTrackName() const {
static FullTrackName dateTrackName() {
return FullTrackName({"moq-date", "/date"});
}
MoQForwarder forwarder_;
std::unique_ptr<MoQRelayClient> relayClient_;
ForwardPreference pref_{ForwardPreference::Group};
};
} // namespace
int main(int argc, char* argv[]) {
folly::Init init(&argc, &argv, true);
folly::EventBase evb;
MoQDateServer moqDateServer(&evb);
MoQDateServer moqDateServer(
&evb,
FLAGS_stream_per_group ? ForwardPreference::Group
: ForwardPreference::Object);
moqDateServer.publishDateLoop().scheduleOn(&evb).start();
evb.loopForever();
return 0;
Expand Down

0 comments on commit bba553f

Please sign in to comment.