Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sdk-trace: add SpanExporter #373

Merged
merged 1 commit into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Copyright 2023 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.typelevel.otel4s.sdk.trace
package exporter

import cats.Applicative
import cats.Foldable
import cats.MonadThrow
import cats.Monoid
import cats.Parallel
import cats.data.NonEmptyList
import cats.syntax.all._
import org.typelevel.otel4s.sdk.trace.data.SpanData

/** An interface that allows different tracing services to export recorded data
* for sampled spans in their own format.
*
* @see
* [[https://opentelemetry.io/docs/specs/otel/trace/sdk/#span-exporter]]
*
* @tparam F
* the higher-kinded type of a polymorphic effect
*/
trait SpanExporter[F[_]] {

/** The name of the exporter.
*
* It will be used in an exception to distinguish individual failures in the
* multi-error scenario.
*
* @see
* [[SpanExporter.ExporterFailure]]
*
* @see
* [[SpanExporter.CompositeExporterFailure]]
*/
def name: String

/** Called to export sampled [[data.SpanData SpanData]].
*
* @note
* the export operations can be performed simultaneously depending on the
* type of span processor being used. However, the batch span processor
* will ensure that only one export can occur at a time.
*
* @param spans
* the sampled spans to be exported
*/
def exportSpans[G[_]: Foldable](spans: G[SpanData]): F[Unit]

/** Exports the collection of sampled [[data.SpanData SpanData]] that have not
* yet been exported.
*
* @note
* the export operations can be performed simultaneously depending on the
* type of span processor being used. However, the batch span processor
* will ensure that only one export can occur at a time.
*/
def flush: F[Unit]

override def toString: String =
name
}

object SpanExporter {

/** Creates a [[SpanExporter]] which delegates all exports to the exporters.
*/
def of[F[_]: MonadThrow: Parallel](
exporters: SpanExporter[F]*
): SpanExporter[F] =
if (exporters.sizeIs == 1) exporters.head
else exporters.combineAll

/** Creates a no-op implementation of the [[SpanExporter]].
*
* All export operations are no-op.
*/
def noop[F[_]: Applicative]: SpanExporter[F] =
new Noop

implicit def spanExporterMonoid[F[_]: MonadThrow: Parallel]
: Monoid[SpanExporter[F]] =
new Monoid[SpanExporter[F]] {
val empty: SpanExporter[F] =
noop[F]

def combine(x: SpanExporter[F], y: SpanExporter[F]): SpanExporter[F] =
(x, y) match {
case (that, _: Noop[F]) =>
that
case (_: Noop[F], other) =>
other
case (that: Multi[F], other: Multi[F]) =>
Multi(that.exporters.concatNel(other.exporters))
case (that: Multi[F], other) =>
Multi(that.exporters :+ other)
case (that, other: Multi[F]) =>
Multi(that :: other.exporters)
case (that, other) =>
Multi(NonEmptyList.of(that, other))
}
}

/** An error occurred when invoking an exporter.
*
* @param exporter
* the name of an exporter that failed. See [[SpanExporter.name]]
*
* @param failure
* the error occurred
*/
final case class ExporterFailure(exporter: String, failure: Throwable)
extends Exception(
s"The exporter [$exporter] has failed due to ${failure.getMessage}",
failure
)

/** An composite failure, when '''at least 2''' exporters have failed.
*
* @param first
* the first occurred error
*
* @param rest
* the rest of errors
*/
final case class CompositeExporterFailure(
first: ExporterFailure,
rest: NonEmptyList[ExporterFailure]
) extends Exception(
s"Multiple exporters [${rest.prepend(first).map(_.exporter).mkString_(", ")}] have failed",
first
)

private final class Noop[F[_]: Applicative] extends SpanExporter[F] {
val name: String = "SpanExporter.Noop"

def exportSpans[G[_]: Foldable](spans: G[SpanData]): F[Unit] =
Applicative[F].unit

def flush: F[Unit] =
Applicative[F].unit
}

private final case class Multi[F[_]: MonadThrow: Parallel](
exporters: NonEmptyList[SpanExporter[F]]
) extends SpanExporter[F] {
val name: String =
s"SpanExporter.Multi(${exporters.map(_.toString).mkString_(", ")})"

def exportSpans[G[_]: Foldable](spans: G[SpanData]): F[Unit] =
exporters
.parTraverse(e => e.exportSpans(spans).attempt.tupleLeft(e.toString))
.flatMap(attempts => handleAttempts(attempts))

def flush: F[Unit] =
exporters
.parTraverse(e => e.flush.attempt.tupleLeft(e.toString))
.flatMap(attempts => handleAttempts(attempts))

private def handleAttempts(
results: NonEmptyList[(String, Either[Throwable, Unit])]
): F[Unit] = {
val failures = results.collect { case (exporter, Left(failure)) =>
ExporterFailure(exporter, failure)
}

failures match {
case Nil =>
MonadThrow[F].unit

case head :: Nil =>
MonadThrow[F].raiseError(head)

case head :: tail =>
MonadThrow[F].raiseError(
CompositeExporterFailure(head, NonEmptyList.fromListUnsafe(tail))
)
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2023 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.typelevel.otel4s.sdk.trace.exporter

import cats.Foldable
import cats.Monad
import cats.effect.Concurrent
import cats.effect.std.Queue
import cats.syntax.foldable._
import cats.syntax.functor._
import org.typelevel.otel4s.sdk.trace.data.SpanData

// todo: should be in the testkit package
final class InMemorySpanExporter[F[_]: Monad] private (
queue: Queue[F, SpanData]
) extends SpanExporter[F] {
val name: String = "InMemorySpanExporter"

def exportSpans[G[_]: Foldable](spans: G[SpanData]): F[Unit] =
spans.traverse_(span => queue.offer(span))

def flush: F[Unit] =
Monad[F].unit

def finishedSpans: F[List[SpanData]] =
queue.tryTakeN(None)

def reset: F[Unit] =
queue.tryTakeN(None).void
}

object InMemorySpanExporter {

def create[F[_]: Concurrent](
capacity: Option[Int]
): F[InMemorySpanExporter[F]] =
for {
queue <- capacity.fold(Queue.unbounded[F, SpanData])(Queue.bounded(_))
} yield new InMemorySpanExporter[F](queue)

}
Loading