-
Notifications
You must be signed in to change notification settings - Fork 40
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
3 changed files
with
424 additions
and
0 deletions.
There are no files selected for viewing
200 changes: 200 additions & 0 deletions
200
sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/exporter/SpanExporter.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,200 @@ | ||
/* | ||
* Copyright 2023 Typelevel | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.typelevel.otel4s.sdk.trace | ||
package exporter | ||
|
||
import cats.Applicative | ||
import cats.Foldable | ||
import cats.MonadThrow | ||
import cats.Monoid | ||
import cats.Parallel | ||
import cats.data.NonEmptyList | ||
import cats.syntax.all._ | ||
import org.typelevel.otel4s.sdk.trace.data.SpanData | ||
|
||
/** An interface that allows different tracing services to export recorded data | ||
* for sampled spans in their own format. | ||
* | ||
* @see | ||
* [[https://opentelemetry.io/docs/specs/otel/trace/sdk/#span-exporter]] | ||
* | ||
* @tparam F | ||
* the higher-kinded type of a polymorphic effect | ||
*/ | ||
trait SpanExporter[F[_]] { | ||
|
||
/** The name of the exporter. | ||
* | ||
* It will be used in an exception to distinguish individual failures in the | ||
* multi-error scenario. | ||
* | ||
* @see | ||
* [[SpanExporter.ExporterFailure]] | ||
* | ||
* @see | ||
* [[SpanExporter.CompositeExporterFailure]] | ||
* | ||
* @see | ||
* [[SpanExporter.Multi]] | ||
*/ | ||
def name: String | ||
|
||
/** Called to export sampled [[data.SpanData SpanData]]. | ||
* | ||
* @note | ||
* the export operations can be performed simultaneously depending on the | ||
* type of span processor being used. However, the batch span processor | ||
* will ensure that only one export can occur at a time. | ||
* | ||
* @param spans | ||
* the sampled spans to be exported | ||
*/ | ||
def exportSpans[G[_]: Foldable](spans: G[SpanData]): F[Unit] | ||
|
||
/** Exports the collection of sampled [[data.SpanData SpanData]] that have not | ||
* yet been exported. | ||
* | ||
* @note | ||
* the export operations can be performed simultaneously depending on the | ||
* type of span processor being used. However, the batch span processor | ||
* will ensure that only one export can occur at a time. | ||
*/ | ||
def flush: F[Unit] | ||
|
||
override def toString: String = | ||
name | ||
} | ||
|
||
object SpanExporter { | ||
|
||
/** Creates a [[SpanExporter]] which delegates all exports to the exporters. | ||
*/ | ||
def of[F[_]: MonadThrow: Parallel]( | ||
exporters: SpanExporter[F]* | ||
): SpanExporter[F] = | ||
if (exporters.sizeIs == 1) exporters.head | ||
else exporters.combineAll | ||
|
||
/** Creates a no-op implementation of the [[SpanExporter]]. | ||
* | ||
* All export operations are no-op. | ||
*/ | ||
def noop[F[_]: Applicative]: SpanExporter[F] = | ||
new Noop | ||
|
||
implicit def spanExporterMonoid[F[_]: MonadThrow: Parallel] | ||
: Monoid[SpanExporter[F]] = | ||
new Monoid[SpanExporter[F]] { | ||
val empty: SpanExporter[F] = | ||
noop[F] | ||
|
||
def combine(x: SpanExporter[F], y: SpanExporter[F]): SpanExporter[F] = | ||
(x, y) match { | ||
case (that, _: Noop[F]) => | ||
that | ||
case (_: Noop[F], other) => | ||
other | ||
case (that: Multi[F], other: Multi[F]) => | ||
Multi(that.exporters.concatNel(other.exporters)) | ||
case (that: Multi[F], other) => | ||
Multi(that.exporters :+ other) | ||
case (that, other: Multi[F]) => | ||
Multi(that :: other.exporters) | ||
case (that, other) => | ||
Multi(NonEmptyList.of(that, other)) | ||
} | ||
} | ||
|
||
/** An error occurred when invoking an exporter. | ||
* | ||
* @param exporter | ||
* the name of an exporter that failed. See [[SpanExporter.name]] | ||
* | ||
* @param failure | ||
* the error occurred | ||
*/ | ||
final case class ExporterFailure(exporter: String, failure: Throwable) | ||
extends Throwable( | ||
s"The exporter [$exporter] has failed due to ${failure.getMessage}", | ||
failure | ||
) | ||
|
||
/** An composite failure, when '''at least 2''' exporters have failed. | ||
* | ||
* @param first | ||
* the first occurred error | ||
* | ||
* @param rest | ||
* the rest of errors | ||
*/ | ||
final case class CompositeExporterFailure( | ||
first: ExporterFailure, | ||
rest: NonEmptyList[ExporterFailure] | ||
) extends Throwable( | ||
s"Multiple exporters [${rest.prepend(first).map(_.exporter).mkString_(", ")}] have failed", | ||
first | ||
) | ||
|
||
private final class Noop[F[_]: Applicative] extends SpanExporter[F] { | ||
val name: String = "SpanExporter.Noop" | ||
|
||
def exportSpans[G[_]: Foldable](spans: G[SpanData]): F[Unit] = | ||
Applicative[F].unit | ||
|
||
def flush: F[Unit] = | ||
Applicative[F].unit | ||
} | ||
|
||
private final case class Multi[F[_]: MonadThrow: Parallel]( | ||
exporters: NonEmptyList[SpanExporter[F]] | ||
) extends SpanExporter[F] { | ||
val name: String = | ||
s"SpanExporter.Multi(${exporters.map(_.toString).mkString_(", ")})" | ||
|
||
def exportSpans[G[_]: Foldable](spans: G[SpanData]): F[Unit] = | ||
exporters | ||
.parTraverse(e => e.exportSpans(spans).attempt.tupleLeft(e.toString)) | ||
.flatMap(attempts => handleAttempts(attempts)) | ||
|
||
def flush: F[Unit] = | ||
exporters | ||
.parTraverse(e => e.flush.attempt.tupleLeft(e.toString)) | ||
.flatMap(attempts => handleAttempts(attempts)) | ||
|
||
private def handleAttempts( | ||
results: NonEmptyList[(String, Either[Throwable, Unit])] | ||
): F[Unit] = { | ||
val failures = results.collect { case (exporter, Left(failure)) => | ||
ExporterFailure(exporter, failure) | ||
} | ||
|
||
failures match { | ||
case Nil => | ||
MonadThrow[F].unit | ||
|
||
case head :: Nil => | ||
MonadThrow[F].raiseError(head) | ||
|
||
case head :: tail => | ||
MonadThrow[F].raiseError( | ||
CompositeExporterFailure(head, NonEmptyList.fromListUnsafe(tail)) | ||
) | ||
} | ||
} | ||
} | ||
|
||
} |
55 changes: 55 additions & 0 deletions
55
sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/exporter/InMemorySpanExporter.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
/* | ||
* Copyright 2023 Typelevel | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.typelevel.otel4s.sdk.trace.exporter | ||
|
||
import cats.Foldable | ||
import cats.Monad | ||
import cats.effect.Concurrent | ||
import cats.effect.std.Queue | ||
import cats.syntax.foldable._ | ||
import cats.syntax.functor._ | ||
import org.typelevel.otel4s.sdk.trace.data.SpanData | ||
|
||
// todo: should be in the testkit package | ||
final class InMemorySpanExporter[F[_]: Monad] private ( | ||
queue: Queue[F, SpanData] | ||
) extends SpanExporter[F] { | ||
val name: String = "InMemorySpanExporter" | ||
|
||
def exportSpans[G[_]: Foldable](spans: G[SpanData]): F[Unit] = | ||
spans.traverse_(span => queue.offer(span)) | ||
|
||
def flush: F[Unit] = | ||
Monad[F].unit | ||
|
||
def finishedSpans: F[List[SpanData]] = | ||
queue.tryTakeN(None) | ||
|
||
def reset: F[Unit] = | ||
queue.tryTakeN(None).void | ||
} | ||
|
||
object InMemorySpanExporter { | ||
|
||
def create[F[_]: Concurrent]( | ||
capacity: Option[Int] | ||
): F[InMemorySpanExporter[F]] = | ||
for { | ||
queue <- capacity.fold(Queue.unbounded[F, SpanData])(Queue.bounded(_)) | ||
} yield new InMemorySpanExporter[F](queue) | ||
|
||
} |
Oops, something went wrong.