Skip to content

Commit

Permalink
Better logging and exception messages
Browse files Browse the repository at this point in the history
  • Loading branch information
erikvanoosten committed Nov 13, 2024
1 parent 0bc1aeb commit dee9aaf
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,7 @@ final class PartitionStreamControl private (

/** To be invoked when the stream is no longer processing. */
private[internal] def halt: UIO[Unit] = {
val timeOutMessage = s"No records were pulled for more than $maxStreamPullInterval for topic partition $tp. " +
"Use ConsumerSettings.withMaxPollInterval to set a longer interval if processing a batch of records " +
"needs more time."
val timeOutMessage = s"No records were pulled for more than $maxStreamPullInterval for topic partition $tp."
val consumeTimeout = new TimeoutException(timeOutMessage) with NoStackTrace
interruptionPromise.fail(consumeTimeout).unit
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -671,8 +671,10 @@ private[consumer] final class Runloop private (
*/
private def checkStreamPullInterval(streams: Chunk[PartitionStreamControl]): ZIO[Any, Nothing, Unit] = {
def logShutdown(stream: PartitionStreamControl): ZIO[Any, Nothing, Unit] =
ZIO.logWarning(
s"Stream for ${stream.tp} has not pulled chunks for more than $maxStreamPullInterval, shutting down"
ZIO.logError(
s"Stream for ${stream.tp} has not pulled chunks for more than $maxStreamPullInterval, shutting down. " +
"Use ConsumerSettings.withMaxPollInterval or .withMaxStreamPullInterval to set a longer interval when " +
"processing a batch of records needs more time."
)

for {
Expand Down

0 comments on commit dee9aaf

Please sign in to comment.