Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix LCI on RoCE #73

Merged
merged 4 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

# Vscode related
.vscode/
build/

# CLion related
.idea
Expand Down
12 changes: 12 additions & 0 deletions lci/api/lci.h
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,18 @@ extern int LCI_RECV_SLOW_DOWN_USEC;
*/
extern bool LCI_IBV_ENABLE_TD;

/**
* @ingroup LCI_COMM
* @brief Which gid index to use for the ibv backend.
*/
extern int LCI_IBV_GID_IDX;

/**
* @ingroup LCI_COMM
* @brief Enable gid index auto selection for both ib and RoCE.
*/
extern int LCI_IBV_FORCE_GID_AUTO_SELECT;

/**
* @ingroup LCI_COMM
* @brief Whether to enable the progress specific network endpoint.
Expand Down
143 changes: 138 additions & 5 deletions lci/backend/ibv/lcisi_ibv_detail.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,8 @@ static double translate_speed(uint8_t speed)
}
}

bool LCISI_ibv_select_best_device_port(struct ibv_device** dev_list,
int num_devices,
struct ibv_device** device_o,
uint8_t* port_o)
bool select_best_device_port(struct ibv_device** dev_list, int num_devices,
struct ibv_device** device_o, uint8_t* port_o)
{
struct ibv_device* best_device;
uint8_t best_port;
Expand Down Expand Up @@ -92,7 +90,7 @@ bool LCISI_ibv_select_best_device_port(struct ibv_device** dev_list,
continue;
}
// Check whether we can get its lid
if (port_attr.link_layer != IBV_LINK_LAYER_ETHERNET && !port_attr.lid) {
if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND && !port_attr.lid) {
fprintf(stderr, "Couldn't get local LID\n");
continue;
}
Expand Down Expand Up @@ -136,4 +134,139 @@ bool LCISI_ibv_select_best_device_port(struct ibv_device** dev_list,
LCI_Log(LCI_LOG_INFO, "ibv", "No device is available!\n");
return false;
}
}

typedef enum roce_version_t {
ROCE_V2,
ROCE_V1,
ROCE_VER_UNKNOWN,
ROCE_VER_MAX,
} roce_version_t;

roce_version_t query_gid_roce_version(LCISI_server_t* server,
unsigned gid_index)
{
char buf[16];
int ret;
char* dev_name = ibv_get_device_name(server->ib_dev);

union ibv_gid gid;
ret = ibv_query_gid(server->dev_ctx, server->dev_port, gid_index, &gid);
if (ret == 0) {
ret = LCT_read_file(buf, sizeof(buf),
"/sys/class/infiniband/%s/ports/%d/gid_attrs/types/%d",
dev_name, server->dev_port, gid_index);
if (ret > 0) {
if (!strncmp(buf, "IB/RoCE v1", 10)) {
LCI_Log(LCI_LOG_DEBUG, "ibv",
"dev %s port %d index %d uses IB/Roce v1\n", dev_name,
server->dev_port, gid_index);
return ROCE_V1;
} else if (!strncmp(buf, "RoCE v2", 7)) {
LCI_Log(LCI_LOG_DEBUG, "ibv", "dev %s port %d index %d uses Roce v2\n",
dev_name, server->dev_port, gid_index);
return ROCE_V2;
}
}
}
LCI_Log(LCI_LOG_DEBUG, "ibv",
"failed to parse gid type '%s' (dev=%s port=%d index=%d)\n", buf,
dev_name, server->dev_port, gid_index);
return ROCE_VER_UNKNOWN;
}

bool test_roce_gid_index(LCISI_server_t* server, uint8_t gid_index)
{
struct ibv_ah_attr ah_attr;
struct ibv_ah* ah;
union ibv_gid gid;

IBV_SAFECALL(
ibv_query_gid(server->dev_ctx, server->dev_port, gid_index, &gid));

memset(&ah_attr, 0, sizeof(ah_attr));
ah_attr.port_num = server->dev_port;
ah_attr.is_global = 1;
ah_attr.grh.dgid = gid;
ah_attr.grh.sgid_index = gid_index;
ah_attr.grh.hop_limit = 255;
ah_attr.grh.flow_label = 1;
ah_attr.dlid = 0xC000;

ah = ibv_create_ah(server->dev_pd, &ah_attr);
if (ah == NULL) {
LCI_Log(LCI_LOG_DEBUG, "ibv", "gid entry %d is not operational\n",
gid_index);
return false;
}

ibv_destroy_ah(ah);
return true;
}

int select_best_gid_for_roce(LCISI_server_t* server)
{
static const int roce_prio[] = {
[ROCE_V2] = 0,
[ROCE_V1] = 1,
[ROCE_VER_UNKNOWN] = 2,
};
int gid_tbl_len = server->port_attr.gid_tbl_len;
int best_priority = 100;
int best_gid_idx = -1;

LCI_Log(LCI_LOG_DEBUG, "ibv", "RoCE gid auto selection among %d gids\n",
gid_tbl_len);
for (int i = 0; i < gid_tbl_len; ++i) {
roce_version_t version = query_gid_roce_version(server, i);
int priority = roce_prio[version];

if (priority == 0 && test_roce_gid_index(server, i)) {
best_gid_idx = i;
best_priority = priority;
break;
} else if (priority < best_priority && test_roce_gid_index(server, i)) {
best_gid_idx = i;
best_priority = priority;
}
}
if (best_gid_idx >= 0) {
LCI_Log(LCI_LOG_INFO, "ibv", "RoCE gid auto selection: use gid %d\n",
best_gid_idx);
return best_gid_idx;
}

const int default_gid = 0;
LCI_Log(LCI_LOG_INFO, "ibv",
"RoCE gid auto selection: fall back to the default gid %d\n",
default_gid);
return default_gid; // default gid for roce
}

void gid_to_wire_gid(const union ibv_gid* gid, char wgid[])
{
LCI_Assert(sizeof(union ibv_gid) == 16, "Unexpected ibv_gid size %d\n",
sizeof(union ibv_gid));
uint32_t tmp_gid[4];
int i;

memcpy(tmp_gid, gid, sizeof(tmp_gid));
for (i = 0; i < 4; ++i) sprintf(&wgid[i * 8], "%08x", htobe32(tmp_gid[i]));
}

void wire_gid_to_gid(const char* wgid, union ibv_gid* gid)
{
LCI_Assert(sizeof(union ibv_gid) == 16, "Unexpected ibv_gid size %d\n",
sizeof(union ibv_gid));
char tmp[9];
__be32 v32;
int i;
uint32_t tmp_gid[4];

for (tmp[8] = 0, i = 0; i < 4; ++i) {
memcpy(tmp, wgid + i * 8, 8);
sscanf(tmp, "%x", &v32);
tmp_gid[i] = be32toh(v32);
}
memcpy(gid, tmp_gid, sizeof(*gid));
}
14 changes: 10 additions & 4 deletions lci/backend/ibv/lcisi_ibv_detail.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@
#include "infiniband/verbs.h"
#include <stdbool.h>

bool LCISI_ibv_select_best_device_port(struct ibv_device** dev_list,
int num_devices,
struct ibv_device** device_o,
uint8_t* port_o);
bool select_best_device_port(struct ibv_device** dev_list, int num_devices,
struct ibv_device** device_o, uint8_t* port_o);

int select_best_gid_for_roce(LCISI_server_t* server);

const int WIRE_GID_NBYTES = 32;

void gid_to_wire_gid(const union ibv_gid* gid, char wgid[]);

void wire_gid_to_gid(const char* wgid, union ibv_gid* gid);

#endif
72 changes: 66 additions & 6 deletions lci/backend/ibv/server_ibv.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "runtime/lcii.h"
#include "backend/ibv/lcisi_ibv_detail.h"

static const int max_sge_num = 1;
static const int inline_size = 236;
Expand Down Expand Up @@ -85,8 +86,8 @@ void LCISD_server_init(LCIS_server_t* s)
exit(EXIT_FAILURE);
}

bool ret = LCISI_ibv_select_best_device_port(
server->dev_list, num_devices, &server->ib_dev, &server->dev_port);
bool ret = select_best_device_port(server->dev_list, num_devices,
&server->ib_dev, &server->dev_port);
LCI_Assert(ret, "Cannot find available ibv device/port!\n");

// ibv_open_device provides the user with a verbs context which is the object
Expand Down Expand Up @@ -117,6 +118,32 @@ void LCISD_server_init(LCIS_server_t* s)
exit(EXIT_FAILURE);
}

// adjust max_send, max_recv, max_cqe
// Note: the max value reported by device attributes might still not be the
// strict upper bound.
if (LCI_SERVER_MAX_SENDS > server->dev_attr.max_qp_wr) {
LCI_Log(LCI_LOG_INFO, "ibv",
"The configured LCI_SERVER_MAX_SENDS (%d) is adjusted as it is "
"larger than the maximum allowable value by the device (%d).\n",
LCI_SERVER_MAX_SENDS, server->dev_attr.max_qp_wr);
LCI_SERVER_MAX_SENDS = server->dev_attr.max_qp_wr;
}
if (LCI_SERVER_MAX_RECVS > server->dev_attr.max_srq_wr) {
LCI_Log(LCI_LOG_INFO, "ibv",
"The configured LCI_SERVER_MAX_RECVS (%d) is adjusted as it is "
"larger than the maximum allowable value by the device (%d).\n",
LCI_SERVER_MAX_RECVS, server->dev_attr.max_srq_wr);
LCI_SERVER_MAX_RECVS = server->dev_attr.max_srq_wr;
}
if (LCI_SERVER_MAX_CQES > server->dev_attr.max_cqe) {
LCI_Log(LCI_LOG_INFO, "ibv",
"The configured LCI_SERVER_MAX_CQES (%d) is adjusted as it is "
"larger than the maximum allowable value by the device (%d).\n",
LCI_SERVER_MAX_CQES, server->dev_attr.max_cqe);
LCI_SERVER_MAX_CQES = server->dev_attr.max_cqe;
}

// configure on-demand paging
server->odp_mr = NULL;
if (LCI_IBV_USE_ODP == 2) {
#ifdef IBV_ODP_SUPPORT_SRQ_RECV
Expand Down Expand Up @@ -158,7 +185,7 @@ void LCISD_server_init(LCIS_server_t* s)
if (rc != 0) {
fprintf(stderr, "Unable to query port\n");
exit(EXIT_FAILURE);
} else if (server->port_attr.link_layer != IBV_LINK_LAYER_ETHERNET &&
} else if (server->port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND &&
!server->port_attr.lid) {
fprintf(stderr, "Couldn't get local LID\n");
exit(EXIT_FAILURE);
Expand All @@ -167,6 +194,26 @@ void LCISD_server_init(LCIS_server_t* s)
LCI_Log(LCI_LOG_INFO, "ibv", "Maximum MTU: %s; Active MTU: %s\n",
mtu_str(server->port_attr.max_mtu),
mtu_str(server->port_attr.active_mtu));

// query the gid
server->gid_idx = LCI_IBV_GID_IDX;
if (server->gid_idx < 0 &&
(LCI_IBV_FORCE_GID_AUTO_SELECT ||
server->port_attr.link_layer == IBV_LINK_LAYER_ETHERNET)) {
// User did not explicitly specify the gid to use and we are using RoCE
server->gid_idx = select_best_gid_for_roce(server);
}
if (server->gid_idx >= 0) {
LCI_Log(LCI_LOG_INFO, "ibv", "Use GID index: %d\n", server->gid_idx);
if (ibv_query_gid(server->dev_ctx, server->dev_port, server->gid_idx,
&server->gid)) {
fprintf(stderr, "can't read sgid of index %d\n", server->gid_idx);
exit(EXIT_FAILURE);
}
} else
memset(&server->gid, 0, sizeof(server->gid));

// Initialize the event polling thread
LCISI_event_polling_thread_init(server);
}

Expand Down Expand Up @@ -337,12 +384,15 @@ void LCISD_endpoint_init(LCIS_server_t server_pp, LCIS_endpoint_t* endpoint_pp,
exit(EXIT_FAILURE);
}
}
char wgid[WIRE_GID_NBYTES + 1];
memset(wgid, 0, sizeof(wgid));
gid_to_wire_gid(&endpoint_p->server->gid, wgid);
// Use this queue pair "i" to connect to rank e.
char key[LCT_PMI_STRING_LIMIT + 1];
sprintf(key, "LCI_KEY_%d_%d_%d", endpoint_id, LCI_RANK, i);
char value[LCT_PMI_STRING_LIMIT + 1];
sprintf(value, "%x:%hx", endpoint_p->qps[i]->qp_num,
endpoint_p->server->port_attr.lid);
sprintf(value, "%x:%hx:%s", endpoint_p->qps[i]->qp_num,
endpoint_p->server->port_attr.lid, wgid);
LCT_pmi_publish(key, value);
}
LCI_Log(LCI_LOG_INFO, "ibv", "Current inline data size is %d\n", inline_size);
Expand All @@ -356,7 +406,10 @@ void LCISD_endpoint_init(LCIS_server_t server_pp, LCIS_endpoint_t* endpoint_pp,
LCT_pmi_getname(i, key, value);
uint32_t dest_qpn;
uint16_t dest_lid;
sscanf(value, "%x:%hx", &dest_qpn, &dest_lid);
union ibv_gid gid;
char wgid[WIRE_GID_NBYTES + 1];
sscanf(value, "%x:%hx:%s", &dest_qpn, &dest_lid, wgid);
wire_gid_to_gid(wgid, &gid);
// Once a queue pair (QP) has receive buffers posted to it, it is now
// possible to transition the QP into the ready to receive (RTR) state.
{
Expand All @@ -383,6 +436,13 @@ void LCISD_endpoint_init(LCIS_server_t server_pp, LCIS_endpoint_t* endpoint_pp,
attr.min_rnr_timer = 12;
// should not be necessary to set these, given is_global = 0
memset(&attr.ah_attr.grh, 0, sizeof attr.ah_attr.grh);
// If we are using gid
if (gid.global.interface_id) {
attr.ah_attr.is_global = 1;
attr.ah_attr.grh.hop_limit = 1;
attr.ah_attr.grh.dgid = gid;
attr.ah_attr.grh.sgid_index = endpoint_p->server->gid_idx;
}

int flags = IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN |
IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC |
Expand Down
2 changes: 2 additions & 0 deletions lci/backend/ibv/server_ibv.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ typedef struct __attribute__((aligned(LCI_CACHE_LINE))) LCISI_server_t {
uint8_t dev_port;
struct ibv_mr* odp_mr;
size_t max_inline;
int gid_idx;
union ibv_gid gid;
// event polling thread
pthread_t event_polling_thread;
atomic_bool event_polling_thread_run;
Expand Down
1 change: 0 additions & 1 deletion lci/backend/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ static inline LCI_error_t LCISD_post_recv(LCIS_endpoint_t endpoint_pp,
#endif
#ifdef LCI_USE_SERVER_IBV
#include "backend/ibv/server_ibv.h"
#include "backend/ibv/lcisi_ibv_detail.h"
#endif
#ifdef LCI_USE_SERVER_UCX
#include "backend/ucx/server_ucx.h"
Expand Down
5 changes: 5 additions & 0 deletions lci/runtime/env.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ LCI_API bool LCI_IBV_ENABLE_EVENT_POLLING_THREAD;
LCI_API int LCI_SEND_SLOW_DOWN_USEC;
LCI_API int LCI_RECV_SLOW_DOWN_USEC;
LCI_API bool LCI_IBV_ENABLE_TD;
LCI_API int LCI_IBV_GID_IDX;
LCI_API int LCI_IBV_FORCE_GID_AUTO_SELECT;
LCI_API bool LCI_ENABLE_PRG_NET_ENDPOINT;
LCI_API LCI_rdv_protocol_t LCI_RDV_PROTOCOL;
LCI_API bool LCI_OFI_CXI_TRY_NO_HACK;
Expand Down Expand Up @@ -85,6 +87,9 @@ void LCII_env_init(int num_proc, int rank)
#endif
LCI_IBV_ENABLE_TD =
LCIU_getenv_or("LCI_IBV_ENABLE_TD", LCI_IBV_ENABLE_TD_DEFAULT);
LCI_IBV_GID_IDX = LCIU_getenv_or("LCI_IBV_GID_IDX", -1);
LCI_IBV_FORCE_GID_AUTO_SELECT =
LCIU_getenv_or("LCI_IBV_FORCE_GID_AUTO_SELECT", 0);
LCI_ENABLE_PRG_NET_ENDPOINT = LCIU_getenv_or(
"LCI_ENABLE_PRG_NET_ENDPOINT", LCI_ENABLE_PRG_NET_ENDPOINT_DEFAULT);
LCI_MEDIUM_SIZE = LCI_PACKET_SIZE - sizeof(struct LCII_packet_context);
Expand Down
3 changes: 2 additions & 1 deletion lct/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ target_sources_relative(
tbarrier/tbarrier.cpp
util/thread.cpp
util/time.cpp
util/string.cpp)
util/string.cpp
util/io.cpp)

target_include_directories(LCT PRIVATE ${CMAKE_CURRENT_BINARY_DIR})

Expand Down
4 changes: 4 additions & 0 deletions lct/api/lct.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ LCT_API bool LCT_tbarrier_test(LCT_tbarrier_t tbarrier, int64_t ticket);
LCT_API void LCT_tbarrier_wait(LCT_tbarrier_t tbarrier, int64_t ticket);
LCT_API void LCT_tbarrier_arrive_and_wait(LCT_tbarrier_t tbarrier);

// File IO
LCT_API ssize_t LCT_read_file(char* buffer, size_t max,
const char* filename_fmt, ...);

#ifdef __cplusplus
}
#endif
Expand Down
Loading
Loading