From 642b187de94c4c5a23d0ea30ecf2b411ed9f2256 Mon Sep 17 00:00:00 2001 From: Maksym Ochenashko Date: Tue, 21 Nov 2023 11:30:39 +0200 Subject: [PATCH] sdk-trace: add `SpanExporter` --- .../sdk/trace/exporter/SpanExporter.scala | 197 ++++++++++++++++++ .../trace/exporter/InMemorySpanExporter.scala | 55 +++++ .../trace/exporter/SpanExporterSuite.scala | 169 +++++++++++++++ 3 files changed, 421 insertions(+) create mode 100644 sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/exporter/SpanExporter.scala create mode 100644 sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/exporter/InMemorySpanExporter.scala create mode 100644 sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/exporter/SpanExporterSuite.scala diff --git a/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/exporter/SpanExporter.scala b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/exporter/SpanExporter.scala new file mode 100644 index 000000000..fb25627a9 --- /dev/null +++ b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/exporter/SpanExporter.scala @@ -0,0 +1,197 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.trace +package exporter + +import cats.Applicative +import cats.Foldable +import cats.MonadThrow +import cats.Monoid +import cats.Parallel +import cats.data.NonEmptyList +import cats.syntax.all._ +import org.typelevel.otel4s.sdk.trace.data.SpanData + +/** An interface that allows different tracing services to export recorded data + * for sampled spans in their own format. + * + * @see + * [[https://opentelemetry.io/docs/specs/otel/trace/sdk/#span-exporter]] + * + * @tparam F + * the higher-kinded type of a polymorphic effect + */ +trait SpanExporter[F[_]] { + + /** The name of the exporter. + * + * It will be used in an exception to distinguish individual failures in the + * multi-error scenario. + * + * @see + * [[SpanExporter.ExporterFailure]] + * + * @see + * [[SpanExporter.CompositeExporterFailure]] + */ + def name: String + + /** Called to export sampled [[data.SpanData SpanData]]. + * + * @note + * the export operations can be performed simultaneously depending on the + * type of span processor being used. However, the batch span processor + * will ensure that only one export can occur at a time. + * + * @param spans + * the sampled spans to be exported + */ + def exportSpans[G[_]: Foldable](spans: G[SpanData]): F[Unit] + + /** Exports the collection of sampled [[data.SpanData SpanData]] that have not + * yet been exported. + * + * @note + * the export operations can be performed simultaneously depending on the + * type of span processor being used. However, the batch span processor + * will ensure that only one export can occur at a time. + */ + def flush: F[Unit] + + override def toString: String = + name +} + +object SpanExporter { + + /** Creates a [[SpanExporter]] which delegates all exports to the exporters. + */ + def of[F[_]: MonadThrow: Parallel]( + exporters: SpanExporter[F]* + ): SpanExporter[F] = + if (exporters.sizeIs == 1) exporters.head + else exporters.combineAll + + /** Creates a no-op implementation of the [[SpanExporter]]. + * + * All export operations are no-op. + */ + def noop[F[_]: Applicative]: SpanExporter[F] = + new Noop + + implicit def spanExporterMonoid[F[_]: MonadThrow: Parallel] + : Monoid[SpanExporter[F]] = + new Monoid[SpanExporter[F]] { + val empty: SpanExporter[F] = + noop[F] + + def combine(x: SpanExporter[F], y: SpanExporter[F]): SpanExporter[F] = + (x, y) match { + case (that, _: Noop[F]) => + that + case (_: Noop[F], other) => + other + case (that: Multi[F], other: Multi[F]) => + Multi(that.exporters.concatNel(other.exporters)) + case (that: Multi[F], other) => + Multi(that.exporters :+ other) + case (that, other: Multi[F]) => + Multi(that :: other.exporters) + case (that, other) => + Multi(NonEmptyList.of(that, other)) + } + } + + /** An error occurred when invoking an exporter. + * + * @param exporter + * the name of an exporter that failed. See [[SpanExporter.name]] + * + * @param failure + * the error occurred + */ + final case class ExporterFailure(exporter: String, failure: Throwable) + extends Exception( + s"The exporter [$exporter] has failed due to ${failure.getMessage}", + failure + ) + + /** An composite failure, when '''at least 2''' exporters have failed. + * + * @param first + * the first occurred error + * + * @param rest + * the rest of errors + */ + final case class CompositeExporterFailure( + first: ExporterFailure, + rest: NonEmptyList[ExporterFailure] + ) extends Exception( + s"Multiple exporters [${rest.prepend(first).map(_.exporter).mkString_(", ")}] have failed", + first + ) + + private final class Noop[F[_]: Applicative] extends SpanExporter[F] { + val name: String = "SpanExporter.Noop" + + def exportSpans[G[_]: Foldable](spans: G[SpanData]): F[Unit] = + Applicative[F].unit + + def flush: F[Unit] = + Applicative[F].unit + } + + private final case class Multi[F[_]: MonadThrow: Parallel]( + exporters: NonEmptyList[SpanExporter[F]] + ) extends SpanExporter[F] { + val name: String = + s"SpanExporter.Multi(${exporters.map(_.toString).mkString_(", ")})" + + def exportSpans[G[_]: Foldable](spans: G[SpanData]): F[Unit] = + exporters + .parTraverse(e => e.exportSpans(spans).attempt.tupleLeft(e.toString)) + .flatMap(attempts => handleAttempts(attempts)) + + def flush: F[Unit] = + exporters + .parTraverse(e => e.flush.attempt.tupleLeft(e.toString)) + .flatMap(attempts => handleAttempts(attempts)) + + private def handleAttempts( + results: NonEmptyList[(String, Either[Throwable, Unit])] + ): F[Unit] = { + val failures = results.collect { case (exporter, Left(failure)) => + ExporterFailure(exporter, failure) + } + + failures match { + case Nil => + MonadThrow[F].unit + + case head :: Nil => + MonadThrow[F].raiseError(head) + + case head :: tail => + MonadThrow[F].raiseError( + CompositeExporterFailure(head, NonEmptyList.fromListUnsafe(tail)) + ) + } + } + } + +} diff --git a/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/exporter/InMemorySpanExporter.scala b/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/exporter/InMemorySpanExporter.scala new file mode 100644 index 000000000..9138f22bc --- /dev/null +++ b/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/exporter/InMemorySpanExporter.scala @@ -0,0 +1,55 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.trace.exporter + +import cats.Foldable +import cats.Monad +import cats.effect.Concurrent +import cats.effect.std.Queue +import cats.syntax.foldable._ +import cats.syntax.functor._ +import org.typelevel.otel4s.sdk.trace.data.SpanData + +// todo: should be in the testkit package +final class InMemorySpanExporter[F[_]: Monad] private ( + queue: Queue[F, SpanData] +) extends SpanExporter[F] { + val name: String = "InMemorySpanExporter" + + def exportSpans[G[_]: Foldable](spans: G[SpanData]): F[Unit] = + spans.traverse_(span => queue.offer(span)) + + def flush: F[Unit] = + Monad[F].unit + + def finishedSpans: F[List[SpanData]] = + queue.tryTakeN(None) + + def reset: F[Unit] = + queue.tryTakeN(None).void +} + +object InMemorySpanExporter { + + def create[F[_]: Concurrent]( + capacity: Option[Int] + ): F[InMemorySpanExporter[F]] = + for { + queue <- capacity.fold(Queue.unbounded[F, SpanData])(Queue.bounded(_)) + } yield new InMemorySpanExporter[F](queue) + +} diff --git a/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/exporter/SpanExporterSuite.scala b/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/exporter/SpanExporterSuite.scala new file mode 100644 index 000000000..78b4d1119 --- /dev/null +++ b/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/exporter/SpanExporterSuite.scala @@ -0,0 +1,169 @@ +/* + * Copyright 2023 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.trace +package exporter + +import cats.Foldable +import cats.data.NonEmptyList +import cats.effect.IO +import munit.CatsEffectSuite +import org.typelevel.otel4s.sdk.trace.data.SpanData + +class SpanExporterSuite extends CatsEffectSuite { + + test("create a no-op instance") { + val exporter = SpanExporter.noop[IO] + + assertEquals(exporter.toString, "SpanExporter.Noop") + } + + test("of (empty input) - use noop") { + val exporter = SpanExporter.of[IO]() + + assertEquals(exporter.toString, "SpanExporter.Noop") + } + + test("of (single input) - use this input") { + val data: SpanData = getSpanData + + for { + inMemory <- InMemorySpanExporter.create[IO](None) + exporter <- IO.pure(SpanExporter.of(inMemory)) + _ <- exporter.exportSpans(List(data)) + spans <- inMemory.finishedSpans + } yield { + assertEquals(exporter.toString, "InMemorySpanExporter") + assertEquals(spans, List(data)) + } + } + + test("of (multiple) - create a multi instance") { + val data: SpanData = getSpanData + + for { + inMemoryA <- InMemorySpanExporter.create[IO](None) + inMemoryB <- InMemorySpanExporter.create[IO](None) + exporter <- IO.pure(SpanExporter.of(inMemoryA, inMemoryB)) + _ <- exporter.exportSpans(List(data)) + spansA <- inMemoryA.finishedSpans + spansB <- inMemoryB.finishedSpans + } yield { + assertEquals(spansA, List(data)) + assertEquals(spansB, List(data)) + + assertEquals( + exporter.toString, + "SpanExporter.Multi(InMemorySpanExporter, InMemorySpanExporter)" + ) + } + } + + test("of (multiple) - flatten out nested multi instances") { + val data: SpanData = getSpanData + + for { + inMemoryA <- InMemorySpanExporter.create[IO](None) + inMemoryB <- InMemorySpanExporter.create[IO](None) + + multi1 <- IO.pure(SpanExporter.of(inMemoryA, inMemoryB)) + multi2 <- IO.pure(SpanExporter.of(inMemoryA, inMemoryB)) + + exporter <- IO.pure(SpanExporter.of(multi1, multi2)) + + _ <- exporter.exportSpans(List(data)) + spansA <- inMemoryA.finishedSpans + spansB <- inMemoryB.finishedSpans + } yield { + assertEquals(spansA, List(data, data)) + assertEquals(spansB, List(data, data)) + + assertEquals( + exporter.toString, + "SpanExporter.Multi(InMemorySpanExporter, InMemorySpanExporter, InMemorySpanExporter, InMemorySpanExporter)" + ) + } + } + + test("of (multiple) - single failure - rethrow a single failure") { + val data: SpanData = getSpanData + + val onExport = new RuntimeException("cannot export spans") + val onFlush = new RuntimeException("cannot flush") + + val failing = new FailingExporter("error-prone", onExport, onFlush) + + def expected(e: Throwable) = + SpanExporter.ExporterFailure("error-prone", e) + + for { + inMemoryA <- InMemorySpanExporter.create[IO](None) + inMemoryB <- InMemorySpanExporter.create[IO](None) + exporter <- IO.pure(SpanExporter.of(inMemoryA, failing, inMemoryB)) + exportAttempt <- exporter.exportSpans(List(data)).attempt + flushAttempt <- exporter.flush.attempt + spansA <- inMemoryA.finishedSpans + spansB <- inMemoryB.finishedSpans + } yield { + assertEquals(spansA, List(data)) + assertEquals(spansB, List(data)) + assertEquals(exportAttempt, Left(expected(onExport))) + assertEquals(flushAttempt, Left(expected(onFlush))) + } + } + + test("of (multiple) - multiple failures - rethrow a composite failure") { + val onExport = new RuntimeException("cannot export spans") + val onFlush = new RuntimeException("cannot flush") + + val exporter1 = new FailingExporter("exporter1", onExport, onFlush) + val exporter2 = new FailingExporter("exporter2", onExport, onFlush) + + val exporter = SpanExporter.of(exporter1, exporter2) + + def expected(e: Throwable) = + SpanExporter.CompositeExporterFailure( + SpanExporter.ExporterFailure("exporter1", e), + NonEmptyList.of(SpanExporter.ExporterFailure("exporter2", e)) + ) + + for { + exportAttempt <- exporter.exportSpans(List.empty[SpanData]).attempt + flushAttempt <- exporter.flush.attempt + } yield { + assertEquals(exportAttempt, Left(expected(onExport))) + assertEquals(flushAttempt, Left(expected(onFlush))) + } + } + + private def getSpanData: SpanData = + Gens.spanData.sample.getOrElse(getSpanData) + + private class FailingExporter( + exporterName: String, + onExport: Throwable, + onFlush: Throwable + ) extends SpanExporter[IO] { + def name: String = exporterName + + def exportSpans[G[_]: Foldable](spans: G[SpanData]): IO[Unit] = + IO.raiseError(onExport) + + def flush: IO[Unit] = + IO.raiseError(onFlush) + } + +}