diff --git a/src/mqtt_client.c b/src/mqtt_client.c index 0a5091bb8..fe3c05f76 100644 --- a/src/mqtt_client.c +++ b/src/mqtt_client.c @@ -1233,8 +1233,10 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, rc = MqttPacket_Write(client, client->tx_buf, client->write.len); #ifdef WOLFMQTT_NONBLOCK - if (rc == MQTT_CODE_CONTINUE) - break; + if (rc == MQTT_CODE_CONTINUE) { + /* keep send mutex locked and return to caller */ + return rc; + } #endif if (rc == client->write.len) { rc = 0; /* success */ @@ -1312,7 +1314,6 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, if (!waitMatchFound) { /* if we get here, then the we are still waiting for a packet */ mms_stat->read = MQTT_MSG_BEGIN; - MQTT_TRACE_MSG("Wait Again"); #ifdef WOLFMQTT_NONBLOCK /* for non-blocking return with code continue instead of waiting again * if called with packet type and id of 'any' */ @@ -1320,6 +1321,7 @@ static int MqttClient_WaitType(MqttClient *client, void *packet_obj, return MQTT_CODE_CONTINUE; } #endif + MQTT_TRACE_MSG("Wait Again"); goto wait_again; } @@ -1679,13 +1681,20 @@ static int MqttClient_Publish_WritePayload(MqttClient *client, if (client == NULL || publish == NULL) return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG); - if (pubCb) { + if (pubCb) { /* use publish callback to get data */ word32 tmp_len = publish->buffer_len; do { - /* Use the callback to get payload */ - if ((client->write.len = pubCb(publish)) < 0) { - return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_CALLBACK); + /* use the client->write.len to handle non-blocking re-entry when + * new publish callback data is needed */ + if (client->write.len == 0) { + /* Use the callback to get payload */ + if ((client->write.len = pubCb(publish)) < 0) { + #ifdef WOLFMQTT_DEBUG_CLIENT + PRINTF("Publish callback error %d", client->write.len); + #endif + return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_CALLBACK); + } } if ((word32)client->write.len < publish->buffer_len) { @@ -1715,6 +1724,7 @@ static int MqttClient_Publish_WritePayload(MqttClient *client, publish->buffer_pos += publish->intBuf_pos; publish->intBuf_pos = 0; + client->write.len = 0; /* reset current write len */ } while (publish->buffer_pos < publish->total_len); } @@ -1746,8 +1756,17 @@ static int MqttClient_Publish_WritePayload(MqttClient *client, /* Check if we are done sending publish message */ if (publish->buffer_pos < publish->buffer_len) { + #ifdef WOLFMQTT_DEBUG_CLIENT + PRINTF("Publish Write: not done (%d remain)", + publish->buffer_len - publish->buffer_pos); + #endif return MQTT_CODE_PUB_CONTINUE; } + #ifdef WOLFMQTT_DEBUG_CLIENT + else { + PRINTF("Publish Write: done"); + } + #endif #else do { rc = MqttPacket_Write(client, client->tx_buf, client->write.len); @@ -1779,6 +1798,11 @@ static int MqttClient_Publish_WritePayload(MqttClient *client, /* If transferring more chunks */ publish->buffer_pos += publish->intBuf_pos; if (publish->buffer_pos < publish->total_len) { + #ifdef WOLFMQTT_DEBUG_CLIENT + PRINTF("Publish Write: chunk (%d remain)", + publish->total_len - publish->buffer_pos); + #endif + /* Build next payload to send */ client->write.len = (publish->total_len - publish->buffer_pos); if (client->write.len > client->tx_buf_len) { @@ -1786,6 +1810,11 @@ static int MqttClient_Publish_WritePayload(MqttClient *client, } rc = MQTT_CODE_PUB_CONTINUE; } + #ifdef WOLFMQTT_DEBUG_CLIENT + else { + PRINTF("Publish Write: chunked done"); + } + #endif } } return rc; @@ -1897,6 +1926,9 @@ static int MqttPublishMsg(MqttClient *client, MqttPublish *publish, return rc; } + /* reset client->write.len */ + client->write.len = 0; + /* advance state */ publish->stat.write = MQTT_MSG_PAYLOAD; } @@ -1906,7 +1938,7 @@ static int MqttPublishMsg(MqttClient *client, MqttPublish *publish, { rc = MqttClient_Publish_WritePayload(client, publish, pubCb); #ifdef WOLFMQTT_NONBLOCK - if (rc == MQTT_CODE_CONTINUE) + if (rc == MQTT_CODE_CONTINUE || rc == MQTT_CODE_PUB_CONTINUE) return rc; #endif #ifdef WOLFMQTT_MULTITHREAD @@ -2562,6 +2594,7 @@ int MqttClient_CancelMessage(MqttClient *client, MqttObject* msg) #ifdef WOLFMQTT_DEBUG_CLIENT PRINTF("Cancel Write Lock"); #endif + client->write.pos = 0; /* reset current write position */ mms_stat->isWriteLocked = 0; wm_SemUnlock(&client->lockSend); } diff --git a/src/mqtt_socket.c b/src/mqtt_socket.c index ce6b2cefb..25bb0950a 100644 --- a/src/mqtt_socket.c +++ b/src/mqtt_socket.c @@ -235,9 +235,7 @@ static int MqttSocket_ReadDo(MqttClient *client, byte* buf, int buf_len, client->tls.sockRcRead = 0; /* init value */ rc = wolfSSL_read(client->tls.ssl, (char*)buf, buf_len); if (rc < 0) { - #if defined(WOLFMQTT_DEBUG_SOCKET) || defined(WOLFSSL_ASYNC_CRYPT) int error = wolfSSL_get_error(client->tls.ssl, 0); - #endif #ifdef WOLFMQTT_DEBUG_SOCKET if (error != WOLFSSL_ERROR_WANT_READ && error != WC_PENDING_E) { @@ -252,7 +250,12 @@ static int MqttSocket_ReadDo(MqttClient *client, byte* buf, int buf_len, if (error == WC_PENDING_E) { rc = MQTT_CODE_CONTINUE; } + else #endif + /* used with compatibility layer to communicate peer close */ + if (error == WOLFSSL_ERROR_ZERO_RETURN) { + rc = MQTT_CODE_ERROR_NETWORK; + } } } else