diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0277ddd..ccba01d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -28,9 +28,9 @@ jobs: strategy: matrix: os: [ubuntu-latest] - scala: [2.12.18, 2.13.12, 3.3.1] + scala: [2.13.12, 3.3.1] java: [temurin@17] - project: [rootJS, rootJVM, rootNative] + project: [rootJVM] runs-on: ${{ matrix.os }} steps: - name: Checkout current branch (full) @@ -69,14 +69,6 @@ jobs: - name: Check that workflows are up to date run: sbt githubWorkflowCheck - - name: scalaJSLink - if: matrix.project == 'rootJS' - run: sbt 'project ${{ matrix.project }}' '++ ${{ matrix.scala }}' Test/scalaJSLinkerResult - - - name: nativeLink - if: matrix.project == 'rootNative' - run: sbt 'project ${{ matrix.project }}' '++ ${{ matrix.scala }}' Test/nativeLink - - name: Test run: sbt 'project ${{ matrix.project }}' '++ ${{ matrix.scala }}' test @@ -90,11 +82,11 @@ jobs: - name: Make target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: mkdir -p examples/target target .js/target core/.native/target site/target core/.js/target core/.jvm/target .jvm/target .native/target project/target + run: mkdir -p examples/target target .js/target site/target core/.jvm/target .jvm/target .native/target project/target - name: Compress target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: tar cf targets.tar examples/target target .js/target core/.native/target site/target core/.js/target core/.jvm/target .jvm/target .native/target project/target + run: tar cf targets.tar examples/target target .js/target site/target core/.jvm/target .jvm/target .native/target project/target - name: Upload target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') @@ -146,46 +138,6 @@ jobs: ~/Library/Caches/Coursier/v1 key: ${{ runner.os }}-sbt-cache-v2-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }} - - name: Download target directories (2.12.18, rootJS) - uses: actions/download-artifact@v3 - with: - name: target-${{ matrix.os }}-${{ matrix.java }}-2.12.18-rootJS - - - name: Inflate target directories (2.12.18, rootJS) - run: | - tar xf targets.tar - rm targets.tar - - - name: Download target directories (2.12.18, rootJVM) - uses: actions/download-artifact@v3 - with: - name: target-${{ matrix.os }}-${{ matrix.java }}-2.12.18-rootJVM - - - name: Inflate target directories (2.12.18, rootJVM) - run: | - tar xf targets.tar - rm targets.tar - - - name: Download target directories (2.12.18, rootNative) - uses: actions/download-artifact@v3 - with: - name: target-${{ matrix.os }}-${{ matrix.java }}-2.12.18-rootNative - - - name: Inflate target directories (2.12.18, rootNative) - run: | - tar xf targets.tar - rm targets.tar - - - name: Download target directories (2.13.12, rootJS) - uses: actions/download-artifact@v3 - with: - name: target-${{ matrix.os }}-${{ matrix.java }}-2.13.12-rootJS - - - name: Inflate target directories (2.13.12, rootJS) - run: | - tar xf targets.tar - rm targets.tar - - name: Download target directories (2.13.12, rootJVM) uses: actions/download-artifact@v3 with: @@ -196,26 +148,6 @@ jobs: tar xf targets.tar rm targets.tar - - name: Download target directories (2.13.12, rootNative) - uses: actions/download-artifact@v3 - with: - name: target-${{ matrix.os }}-${{ matrix.java }}-2.13.12-rootNative - - - name: Inflate target directories (2.13.12, rootNative) - run: | - tar xf targets.tar - rm targets.tar - - - name: Download target directories (3.3.1, rootJS) - uses: actions/download-artifact@v3 - with: - name: target-${{ matrix.os }}-${{ matrix.java }}-3.3.1-rootJS - - - name: Inflate target directories (3.3.1, rootJS) - run: | - tar xf targets.tar - rm targets.tar - - name: Download target directories (3.3.1, rootJVM) uses: actions/download-artifact@v3 with: @@ -226,16 +158,6 @@ jobs: tar xf targets.tar rm targets.tar - - name: Download target directories (3.3.1, rootNative) - uses: actions/download-artifact@v3 - with: - name: target-${{ matrix.os }}-${{ matrix.java }}-3.3.1-rootNative - - - name: Inflate target directories (3.3.1, rootNative) - run: | - tar xf targets.tar - rm targets.tar - - name: Import signing key if: env.PGP_SECRET != '' && env.PGP_PASSPHRASE == '' run: echo $PGP_SECRET | base64 -di | gpg --import diff --git a/build.sbt b/build.sbt index 0b0cc6a..9471bfd 100644 --- a/build.sbt +++ b/build.sbt @@ -15,8 +15,10 @@ ThisBuild / tlCiReleaseBranches := Seq("main") // true by default, set to false to publish to s01.oss.sonatype.org ThisBuild / tlSonatypeUseLegacyHost := true -ThisBuild / crossScalaVersions := Seq("2.12.18", "2.13.12", "3.3.1") -ThisBuild / scalaVersion := "3.3.1" +val scala213 = "2.13.12" +val scala3 = "3.3.1" +ThisBuild / crossScalaVersions := Seq(scala213, scala3) +ThisBuild / scalaVersion := scala213 ThisBuild / githubWorkflowJavaVersions := Seq(JavaSpec.temurin("17")) ThisBuild / tlJdkRelease := Some(8) @@ -25,10 +27,13 @@ ThisBuild / testFrameworks += new TestFramework("munit.Framework") val catsV = "2.10.0" val catsEffectV = "3.5.2" -val fs2V = "3.7.0" -val http4sV = "0.23.19" -val fiberLocalV = "0.1.2" -val natchezV = "0.3.2" +val catsMtlV = "1.3.1" +val fs2V = "3.9.2" +val http4sV = "0.23.23" + +val openTelemetryV = "1.31.0" +val otel4sV = "0.3.0" + val munitCatsEffectV = "2.0.0-M3" val slf4jV = "1.7.36" @@ -38,15 +43,11 @@ val slf4jV = "1.7.36" lazy val `natchez-http4s-otel` = tlCrossRootProject .aggregate(core, examples) -lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform) +lazy val core = crossProject(JVMPlatform) .crossType(CrossType.Pure) .in(file("core")) .settings( - name := "natchez-http4s-otel", - mimaBinaryIssueFilters ++= Seq( - ProblemFilters.exclude[DirectMissingMethodProblem]("io.chrisdavenport.natchezhttp4sotel.ClientMiddleware.request"), - ProblemFilters.exclude[DirectMissingMethodProblem]("io.chrisdavenport.natchezhttp4sotel.ServerMiddleware.request") - ), + name := "http4s-otel4s", libraryDependencies ++= Seq( "org.typelevel" %%% "cats-core" % catsV, @@ -55,18 +56,17 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform) "co.fs2" %%% "fs2-core" % fs2V, "co.fs2" %%% "fs2-io" % fs2V, - "org.http4s" %%% "http4s-server" % http4sV, - "org.http4s" %%% "http4s-client" % http4sV, + "org.http4s" %%% "http4s-server" % http4sV, + "org.http4s" %%% "http4s-client" % http4sV, - "io.chrisdavenport" %%% "fiberlocal" % fiberLocalV, - "org.tpolecat" %%% "natchez-core" % natchezV, + "org.typelevel" %%% "otel4s-core-trace" % otel4sV, + "org.typelevel" %%% "otel4s-java" % otel4sV, + "org.typelevel" %%% "cats-mtl" % catsMtlV, - - "org.typelevel" %%% "munit-cats-effect" % munitCatsEffectV % Test, - "org.tpolecat" %%% "natchez-testkit" % natchezV % Test, + "io.opentelemetry" % "opentelemetry-sdk-testing" % openTelemetryV % Test, + "org.typelevel" %%% "cats-effect-testkit" % catsEffectV % Test, + "org.typelevel" %%% "munit-cats-effect" % munitCatsEffectV % Test, ) - ).jsSettings( - scalaJSLinkerConfig ~= { _.withModuleKind(ModuleKind.CommonJSModule)}, ) lazy val examples = project.in(file("examples")) @@ -75,12 +75,18 @@ lazy val examples = project.in(file("examples")) .settings( scalacOptions -= "-Xfatal-warnings", libraryDependencies ++= Seq( - "org.tpolecat" %% "natchez-jaeger" % natchezV, + "org.typelevel" %% "otel4s-java" % otel4sV, + "io.opentelemetry" % "opentelemetry-exporter-otlp" % openTelemetryV % Runtime, + "io.opentelemetry" % "opentelemetry-sdk-extension-autoconfigure" % openTelemetryV % Runtime, "org.http4s" %% "http4s-dsl" % http4sV, "org.http4s" %% "http4s-ember-server" % http4sV, "org.http4s" %% "http4s-ember-client" % http4sV, "org.slf4j" % "slf4j-simple" % slf4jV, - ) + ), + run / fork := true, + javaOptions += "-Dotel.service.name=jaeger-example", + javaOptions += "-Dotel.metrics.exporter=none", + javaOptions += "-Dotel.java.global-autoconfigure.enabled=true", ) lazy val site = project.in(file("site")) diff --git a/core/src/main/scala/io/chrisdavenport/http4sotel4s/ClientMiddleware.scala b/core/src/main/scala/io/chrisdavenport/http4sotel4s/ClientMiddleware.scala new file mode 100644 index 0000000..2bc405a --- /dev/null +++ b/core/src/main/scala/io/chrisdavenport/http4sotel4s/ClientMiddleware.scala @@ -0,0 +1,185 @@ +package io.chrisdavenport.http4sotel4s + +import cats.Applicative +import cats.effect.kernel.Outcome +import cats.effect.{Concurrent, MonadCancelThrow, Resource, SyncIO} +import cats.syntax.flatMap._ +import org.http4s.client.{Client, RequestKey} +import org.http4s.{Headers, Request, Response} +import org.http4s.client.middleware.Retry +import org.http4s.headers.{Host, `User-Agent`} +import org.typelevel.ci.CIString +import org.typelevel.otel4s.Attribute +import org.typelevel.otel4s.trace.Tracer +import org.typelevel.vault.{Key, Vault} + +import scala.collection.mutable.ListBuffer + +object ClientMiddleware { + + def default[F[_]: Tracer: Concurrent]: ClientMiddlewareBuilder[F] = { + new ClientMiddlewareBuilder[F](Defaults.reqHeaders, Defaults.respHeaders, Defaults.clientSpanName, Defaults.additionalRequestTags, Defaults.additionalResponseTags, Defaults.includeUrl) + } + + object Defaults { + val reqHeaders = OTHttpTags.Headers.defaultHeadersIncluded + val respHeaders = OTHttpTags.Headers.defaultHeadersIncluded + def clientSpanName[F[_]]: Request[F] => String = {(req: Request[F]) => s"Http Client - ${req.method}"} + def additionalRequestTags[F[_]]: Request[F] => Seq[Attribute[_]] = {(_: Request[F]) => Seq()} + def additionalResponseTags[F[_]]: Response[F] => Seq[Attribute[_]] = {(_: Response[F]) => Seq()} + def includeUrl[F[_]]: Request[F] => Boolean = {(_: Request[F]) => true} + } + + final class ClientMiddlewareBuilder[F[_]: Tracer: Concurrent] private[ClientMiddleware] ( + private val reqHeaders: Set[CIString], + private val respHeaders: Set[CIString], + private val clientSpanName: Request[F] => String, + private val additionalRequestTags: Request[F] => Seq[Attribute[_]], + private val additionalResponseTags: Response[F] => Seq[Attribute[_]], + private val includeUrl: Request[F] => Boolean, + ){ self => + private def copy( + reqHeaders: Set[CIString] = self.reqHeaders, + respHeaders: Set[CIString] = self.respHeaders, + clientSpanName: Request[F] => String = self.clientSpanName, + additionalRequestTags: Request[F] => Seq[Attribute[_]] = self.additionalRequestTags, + additionalResponseTags: Response[F] => Seq[Attribute[_]] = self.additionalResponseTags , + includeUrl: Request[F] => Boolean = self.includeUrl, + ): ClientMiddlewareBuilder[F] = + new ClientMiddlewareBuilder[F](reqHeaders, respHeaders, clientSpanName, additionalRequestTags, additionalResponseTags, includeUrl) + + def withRequestHeaders(reqHeaders: Set[CIString]) = copy(reqHeaders = reqHeaders) + + def withResponseHeaders(respHeaders: Set[CIString]) = copy(respHeaders = respHeaders) + + def withClientSpanName(clientSpanName: Request[F] => String) = copy(clientSpanName = clientSpanName) + + def withAdditionalRequestTags(additionalRequestTags: Request[F] => Seq[Attribute[_]]) = + copy(additionalRequestTags = additionalRequestTags) + + def withAdditionalResponseTags(additionalResponseTags: Response[F] => Seq[Attribute[_]]) = + copy(additionalResponseTags = additionalResponseTags) + + def withIncludeUrl(includeUrl: Request[F] => Boolean ) = copy(includeUrl = includeUrl) + + def build: Client[F] => Client[F] = { (client: Client[F]) => + Client[F] { (req: Request[F]) => // Resource[F, Response[F]] + + val base = request(req, reqHeaders, includeUrl) ++ additionalRequestTags(req) + MonadCancelThrow[Resource[F, *]].uncancelable { poll => + + for { + res <- Tracer[F].span(clientSpanName(req)).resource + span = res.span + _ <- Resource.eval(span.addAttributes(base: _*)) + traceHeaders <- Resource.eval(Tracer[F].propagate(Headers.empty)) + newReq = req.withHeaders(traceHeaders ++ req.headers) + resp <- poll(client.run(newReq)).guaranteeCase { + case Outcome.Succeeded(fa) => + Resource.eval(span.addAttribute(Attribute("exit.case", "succeeded"))) >> + fa.flatMap(resp => + Resource.eval( + span.addAttributes(response(resp, respHeaders) ++ additionalResponseTags(resp): _*) + ) + ) + case Outcome.Errored(e) => + Resource.eval( + span.recordException(e) >> + span.addAttribute(Attribute("exit.case", "errored")) + ) + case Outcome.Canceled() => + // Canceled isn't always an error, but it generally is for http + // TODO decide if this should add error, we do for the server side. + Resource.eval(span.addAttributes(Attribute("exit.case", "canceled"), Attribute("canceled", true))) + + } + // Automatically handle client processing errors. Since this is after the response, + // the error case will only get hit if the use block of the resulting resource happens, + // which is the request processing stage. + _ <- Resource.makeCase(Applicative[F].unit){ + case (_, Resource.ExitCase.Errored(e)) => span.recordException(e) + case (_, _) => Applicative[F].unit + } + } yield resp + } + } + } + } + + val ExtraTagsKey: Key[List[Attribute[_]]] = Key.newKey[SyncIO, List[Attribute[_]]].unsafeRunSync() + + private[http4sotel4s] def request[F[_]](req: Request[F], headers: Set[CIString]): List[Attribute[_]] = { + request(req, headers, Function.const[Boolean, Request[F]](true)(_)) + } + def request[F[_]](request: Request[F], headers: Set[CIString], includeUrl: Request[F] => Boolean): List[Attribute[_]] = { + val builder = new ListBuffer[Attribute[_]]() + builder += OTHttpTags.Common.kind("client") + builder += OTHttpTags.Common.method(request.method) + if (includeUrl(request)) { + builder += OTHttpTags.Common.url(request.uri) + builder += OTHttpTags.Common.target(request.uri) + } + val host = request.headers.get[Host].getOrElse{ + val key = RequestKey.fromRequest(request) + Host(key.authority.host.value, key.authority.port) + } + builder += OTHttpTags.Common.host(host) + request.uri.scheme.foreach( s => + builder += OTHttpTags.Common.scheme(s) + ) + request.headers.get[`User-Agent`].foreach( ua => + builder += OTHttpTags.Common.userAgent(ua) + ) + + request.contentLength.foreach(l => + builder += OTHttpTags.Common.requestContentLength(l) + ) + + request.remote.foreach{sa => + builder += + OTHttpTags.Common.peerIp(sa.host) + + builder += + OTHttpTags.Common.peerPort(sa.port) + + } + retryCount(request.attributes).foreach{count => + builder += OTHttpTags.Common.retryCount(count) + } + builder ++= + OTHttpTags.Headers.request(request.headers, headers) + + builder ++= request.attributes.lookup(ExtraTagsKey).toList.flatten + + builder.toList + } + + + + def response[F[_]](response: Response[F], headers: Set[CIString]): List[Attribute[_]] = { + val builder = new ListBuffer[Attribute[_]]() + + builder += OTHttpTags.Common.status(response.status) + response.contentLength.foreach{l => + builder += OTHttpTags.Common.responseContentLength(l) + } + // Due to negotiation. Only the response knows what protocol was selected + builder += OTHttpTags.Common.flavor(response.httpVersion) + retryCount(response.attributes).foreach{count => + builder += OTHttpTags.Common.retryCount(count) + } + + builder ++= + OTHttpTags.Headers.response(response.headers, headers) + builder ++= response.attributes.lookup(ExtraTagsKey).toList.flatten + + builder.toList + } + + private def retryCount(vault: Vault): Option[Int] = { + // AttemptCountKey is 1,2,3,4 for the initial request, + // since we want to do retries. We substract by 1 to get 0,1,2,3. + vault.lookup(Retry.AttemptCountKey).map(i => i - 1) + } + +} diff --git a/core/src/main/scala/io/chrisdavenport/natchezhttp4sotel/OTHttpTags.scala b/core/src/main/scala/io/chrisdavenport/http4sotel4s/OTHttpTags.scala similarity index 57% rename from core/src/main/scala/io/chrisdavenport/natchezhttp4sotel/OTHttpTags.scala rename to core/src/main/scala/io/chrisdavenport/http4sotel4s/OTHttpTags.scala index 50e8521..4ea0e3f 100644 --- a/core/src/main/scala/io/chrisdavenport/natchezhttp4sotel/OTHttpTags.scala +++ b/core/src/main/scala/io/chrisdavenport/http4sotel4s/OTHttpTags.scala @@ -1,49 +1,47 @@ -package io.chrisdavenport.natchezhttp4sotel +package io.chrisdavenport.http4sotel4s import org.http4s._ -import natchez._ +import org.typelevel.otel4s.Attribute import org.http4s.headers._ import com.comcast.ip4s._ import org.typelevel.ci.CIString -import cats.syntax.all._ -import io.chrisdavenport.natchezhttp4sotel.helpers.printStackTrace // This follows the documents here // https://github.com/open-telemetry/opentelemetry-specification/blob/a50def370ef444029a12ea637769229768daeaf8/specification/trace/semantic_conventions/http.md // We can update both the link and the tags as standards develop out of experimental object OTHttpTags { object Common { - def kind(kind: String): (String, TraceValue) = ("span.kind", kind) - def method(m: Method): (String, TraceValue) = ("http.method", m.name) - def url(url: Uri): (String, TraceValue) = ("http.url", url.renderString) - def target(url: Uri): (String, TraceValue) = ("http.target", url.copy(scheme = None, authority = None).renderString) - def host(host: org.http4s.headers.Host): (String, TraceValue) = ("http.host", org.http4s.headers.Host.headerInstance.value(host)) - def scheme(scheme: Uri.Scheme): (String, TraceValue) = ("http.scheme", scheme.value) + def kind(kind: String): Attribute[String] = Attribute("span.kind" , kind) + def method(m: Method): Attribute[String] = Attribute("http.method" , m.name) + def url(url: Uri): Attribute[String] = Attribute("http.url" , url.renderString) + def target(url: Uri): Attribute[String] = Attribute("http.target" , url.copy(scheme = None, authority = None).renderString) + def host(host: org.http4s.headers.Host): Attribute[String] = Attribute("http.host" , org.http4s.headers.Host.headerInstance.value(host)) + def scheme(scheme: Uri.Scheme): Attribute[String] = Attribute("http.scheme" , scheme.value) - def status(status: Status): (String, TraceValue) = ("http.status_code", status.code) + def status(status: Status): Attribute[Long] = Attribute("http.status_code" , status.code.toLong) // Need to check both request and response in case negotiation happens - def flavor(httpVersion: HttpVersion): (String, TraceValue) = ("http.flavor", httpVersion.major.toString() ++ "." ++ httpVersion.minor.toString()) - def userAgent(userAgent: `User-Agent`): (String, TraceValue) = ("http.user_agent", `User-Agent`.headerInstance.value(userAgent)) - def requestContentLength(cl: Long): (String, TraceValue) = ("http.request_content_length", cl.toInt) - def responseContentLength(cl: Long): (String, TraceValue) = ("http.response_content_length", cl.toInt) - def retryCount(i: Int): (String, TraceValue) = ("http.retry_count", i) - def peerIp(ip: IpAddress): (String, TraceValue) = ("net.peer.ip", ip.toString()) // TODO: Check that this is the right way - def peerPort(port: Port): (String, TraceValue) = ("net.peer.port", port.value) + def flavor(httpVersion: HttpVersion): Attribute[String] = Attribute("http.flavor" , httpVersion.major.toString() ++ "." ++ httpVersion.minor.toString()) + def userAgent(userAgent: `User-Agent`): Attribute[String] = Attribute("http.user_agent" , `User-Agent`.headerInstance.value(userAgent)) + def requestContentLength(cl: Long): Attribute[Long] = Attribute("http.request_content_length" , cl) + def responseContentLength(cl: Long): Attribute[Long] = Attribute("http.response_content_length" , cl) + def retryCount(i: Int): Attribute[Long] = Attribute("http.retry_count" , i.toLong) + def peerIp(ip: IpAddress): Attribute[String] = Attribute("net.peer.ip" , ip.toString()) // TODO: Check that this is the right way + def peerPort(port: Port): Attribute[Long] = Attribute("net.peer.port" , port.value.toLong) } object Client { - def peerName(uri: Uri): Option[(String, TraceValue)] = uri.host.map(h => "net.peer.name" -> h.value) + def peerName(uri: Uri): Option[Attribute[String]] = uri.host.map(h => Attribute("net.peer.name" , h.value)) } object Server { - def serverName(s: String): (String, TraceValue) = ("http.server_name", s) + def serverName(s: String): Attribute[String] = Attribute("http.server_name" , s) // The route template. Since http4s uses unapplies by default this is non-trivial. // Accept-List for segements are better to get coherent routes, but block-list segments // take much less effort while allowing unexpected variables. We will provide options // for this - def route(s: String): (String, TraceValue) = ("http.route", s) + def route(s: String): Attribute[String] = Attribute("http.route" , s) // This is either the net ip, OR the x-forwarded for depending on what is available. - def clientIp(ip: IpAddress): (String, TraceValue) = ("http.client_ip", ip.toString()) + def clientIp(ip: IpAddress): Attribute[String] = Attribute("http.client_ip" , ip.toString()) } @@ -51,22 +49,22 @@ object OTHttpTags { // TODO: Otel here is a []string, not a single string. I have chosen this for simplicity, but we can do better. // s is a whitelisted set of Headers to include, any headers not there will not appear. - private def generic(headers: Headers, s: Set[CIString], messageType: String): List[(String, TraceValue)] = { + private def generic(headers: Headers, s: Set[CIString], messageType: String): List[Attribute[_]] = { headers.headers - .groupBy(r => (r.name)) + .groupBy(_.name) .toList - .map{ + .map { case (name, list) => val key = "http." ++ messageType ++ ".header.string." ++ name.toString.toLowerCase.replace("-", "_") if (s.contains(name)) (key, list.map(_.value).mkString(", ")) else (key, "") - }.map{ case (name, s) => name -> TraceableValue[String].toTraceValue(s)} // We add a string as a prefix, because the otel standard is an array so + }.map { case (name, s) => Attribute(name, s) } // We add a string as a prefix, because the otel standard is an array so // that way we don't have bad values in the canonical space we'll want to use when we can. } - def request(headers: Headers, s: Set[CIString]): List[(String, TraceValue)] = + def request(headers: Headers, s: Set[CIString]): List[Attribute[_]] = generic(headers, s, "request") - def response(headers: Headers, s: Set[CIString]): List[(String, TraceValue)] = + def response(headers: Headers, s: Set[CIString]): List[Attribute[_]] = generic(headers, s, "response") lazy val defaultHeadersIncluded = Set( @@ -152,16 +150,4 @@ object OTHttpTags { "X-B3-TraceId", ).map(CIString(_)) } - - // https://github.com/open-telemetry/opentelemetry-specification/blob/a50def370ef444029a12ea637769229768daeaf8/specification/trace/semantic_conventions/exceptions.md - object Errors { - def error(e: Throwable): List[(String, TraceValue)] = { - val error = ("error", TraceableValue[Boolean].toTraceValue(true)).some - val message: Option[(String, TraceValue)] = Option(e.getMessage()).map(m => "exception.message" -> m) - val className: Option[(String, TraceValue)] = Option(e.getClass()).flatMap(c => Option(c.getName())).map(c => "exception.type" -> c) - val stacktrace = ("exception.stacktrace" -> TraceableValue[String].toTraceValue(printStackTrace(e))).some - List(error, message, className, stacktrace).flatten // List[Option[A]] => List[A] using internal speedery - } - } - } diff --git a/core/src/main/scala/io/chrisdavenport/http4sotel4s/ServerMiddleware.scala b/core/src/main/scala/io/chrisdavenport/http4sotel4s/ServerMiddleware.scala new file mode 100644 index 0000000..8168250 --- /dev/null +++ b/core/src/main/scala/io/chrisdavenport/http4sotel4s/ServerMiddleware.scala @@ -0,0 +1,192 @@ +package io.chrisdavenport.http4sotel4s + +import cats.data.Kleisli +import cats.effect.kernel.{MonadCancelThrow, Outcome} +import cats.effect.syntax.all._ +import cats.syntax.all._ +import org.http4s.client.RequestKey +import org.http4s.headers.{Host, `User-Agent`} +import org.http4s._ +import org.typelevel.ci.CIString +import org.typelevel.otel4s.{Attribute, KindTransformer} +import org.typelevel.otel4s.trace.Tracer + +object ServerMiddleware { + + def default[F[_]: Tracer: MonadCancelThrow]: ServerMiddlewareBuilder[F] = + new ServerMiddlewareBuilder[F](Defaults.isKernelHeader, Defaults.reqHeaders, Defaults.respHeaders, Defaults.routeClassifier, Defaults.serverSpanName, Defaults.additionalRequestTags, Defaults.additionalResponseTags, Defaults.includeUrl, Defaults.doNotTrace) + + object Defaults { + val isKernelHeader: CIString => Boolean = name => !ExcludedHeaders.contains(name) + val reqHeaders: Set[CIString] = OTHttpTags.Headers.defaultHeadersIncluded + val respHeaders: Set[CIString] = OTHttpTags.Headers.defaultHeadersIncluded + def routeClassifier[F[_]]: Request[F] => Option[String] = {(_: Request[F]) => None} + def serverSpanName[F[_]]: Request[F] => String = {(req: Request[F]) => s"Http Server - ${req.method}"} + def additionalRequestTags[F[_]]: Request[F] => Seq[Attribute[_]] = {(_: Request[F]) => Seq()} + def additionalResponseTags[F[_]]: Response[F] => Seq[Attribute[_]] = {(_: Response[F]) => Seq()} + def includeUrl[F[_]]: Request[F] => Boolean = {(_: Request[F]) => true} + def doNotTrace: RequestPrelude => Boolean = {(_: RequestPrelude) => false} + } + + final class ServerMiddlewareBuilder[F[_]: Tracer: MonadCancelThrow] private[ServerMiddleware] ( + isKernelHeader: CIString => Boolean, + reqHeaders: Set[CIString], + respHeaders: Set[CIString], + routeClassifier: Request[F] => Option[String], + serverSpanName: Request[F] => String, + additionalRequestTags: Request[F] => Seq[Attribute[_]], + additionalResponseTags: Response[F] => Seq[Attribute[_]], + includeUrl: Request[F] => Boolean, + doNotTrace: RequestPrelude => Boolean, + ){ self => + + private def copy( + isKernelHeader: CIString => Boolean = self.isKernelHeader, + reqHeaders: Set[CIString] = self.reqHeaders, + respHeaders: Set[CIString] = self.respHeaders, + routeClassifier: Request[F] => Option[String] = self.routeClassifier, + serverSpanName: Request[F] => String = self.serverSpanName, + additionalRequestTags: Request[F] => Seq[Attribute[_]] = self.additionalRequestTags, + additionalResponseTags: Response[F] => Seq[Attribute[_]] = self.additionalResponseTags, + includeUrl: Request[F] => Boolean = self.includeUrl, + doNotTrace: RequestPrelude => Boolean = self.doNotTrace, + ): ServerMiddlewareBuilder[F] = + new ServerMiddlewareBuilder[F](isKernelHeader, reqHeaders, respHeaders, routeClassifier, serverSpanName, additionalRequestTags, additionalResponseTags, includeUrl, doNotTrace) + + def withIsKernelHeader(isKernelHeader: CIString => Boolean) = copy(isKernelHeader = isKernelHeader) + def withRequestHeaders(reqHeaders: Set[CIString]) = copy(reqHeaders = reqHeaders) + def withResponseHeaders(respHeaders: Set[CIString]) = copy(respHeaders = respHeaders) + def withRouteClassifier(routeClassifier: Request[F] => Option[String]) = copy(routeClassifier = routeClassifier) + def withServerSpanName(serverSpanName: Request[F] => String) = copy(serverSpanName = serverSpanName) + def withAdditionalRequestTags(additionalRequestTags: Request[F] => Seq[Attribute[_]]) = copy(additionalRequestTags = additionalRequestTags) + def withAdditionalResponseTags(additionalResponseTags: Response[F] => Seq[Attribute[_]]) = copy(additionalResponseTags = additionalResponseTags) + def withIncludeUrl(includeUrl: Request[F] => Boolean) = copy(includeUrl = includeUrl) + def withDoNotTrace(doNotTrace: RequestPrelude => Boolean) = copy(doNotTrace = doNotTrace) + + + private def buildTracedF[G[_]: MonadCancelThrow](f: Http[G, F])(implicit kt: KindTransformer[F, G]): Http[G, F] = + Kleisli { (req: Request[F]) => + if (doNotTrace(req.requestPrelude)) f(req) + else { + val init = request(req, reqHeaders, routeClassifier, includeUrl) ++ additionalRequestTags(req) + MonadCancelThrow[G].uncancelable { poll => + val tracerG = Tracer[F].mapK[G] + tracerG.joinOrRoot(req.headers) { + tracerG.span(serverSpanName(req), init: _*).use { span => + poll(f.run(req)) + .guaranteeCase { + case Outcome.Succeeded(fa) => + span.addAttribute(Attribute("exit.case", "succeeded")) >> + fa.flatMap { resp => + val out = response(resp, respHeaders) ++ additionalResponseTags(resp) + span.addAttributes(out: _*) + } + case Outcome.Errored(e) => + span.recordException(e) >> + span.addAttribute(Attribute("exit.case", "errored")) + case Outcome.Canceled() => + span.addAttributes( + Attribute("exit.case", "canceled"), + Attribute("canceled", true), + Attribute("error", true) // A cancelled http is an error for the server. The connection got cut for some reason. + ) + } + } + } + } + } + } + + def buildHttpApp(f: HttpApp[F]): HttpApp[F] = + buildTracedF(f) + + def buildHttpRoutes(f: HttpRoutes[F]): HttpRoutes[F] = + buildTracedF(f) + } + + private[http4sotel4s] def request[F[_]](req: Request[F], headers: Set[CIString], routeClassifier: Request[F] => Option[String]): List[Attribute[_]] = { + request(req, headers, routeClassifier, Function.const[Boolean, Request[F]](true)) + } + + def request[F[_]](request: Request[F], headers: Set[CIString], routeClassifier: Request[F] => Option[String], includeUrl: Request[F] => Boolean): List[Attribute[_]] = { + val builder = List.newBuilder[Attribute[_]] + builder += OTHttpTags.Common.kind("server") + builder += OTHttpTags.Common.method(request.method) + if (includeUrl(request)) { + builder += OTHttpTags.Common.url(request.uri) + builder += OTHttpTags.Common.target(request.uri) + } + val host = request.headers.get[Host].getOrElse{ + val key = RequestKey.fromRequest(request) + Host(key.authority.host.value, key.authority.port) + } + builder += OTHttpTags.Common.host(host) + request.uri.scheme.foreach( s => + builder += OTHttpTags.Common.scheme(s) + ) + request.headers.get[`User-Agent`].foreach( ua => + builder += OTHttpTags.Common.userAgent(ua) + ) + + request.contentLength.foreach(l => + builder += OTHttpTags.Common.requestContentLength(l) + ) + routeClassifier(request).foreach(s => + builder += OTHttpTags.Server.route(s) + ) + + builder += OTHttpTags.Common.flavor(request.httpVersion) + + request.remote.foreach{sa => + builder += + OTHttpTags.Common.peerIp(sa.host) + + builder += + OTHttpTags.Common.peerPort(sa.port) + } + // Special Server + request.from.foreach(ip => + builder += OTHttpTags.Server.clientIp(ip) + ) + builder ++= + OTHttpTags.Headers.request(request.headers, headers) + + builder.result() + } + + def response[F[_]](response: Response[F], headers: Set[CIString]): List[Attribute[_]] = { + val builder = List.newBuilder[Attribute[_]] + + builder += OTHttpTags.Common.status(response.status) + response.contentLength.foreach(l => + builder += OTHttpTags.Common.responseContentLength(l) + ) + builder ++= + OTHttpTags.Headers.response(response.headers, headers) + + builder.result() + } + + val ExcludedHeaders: Set[CIString] = { + import org.http4s.headers._ + import org.typelevel.ci._ + + val payload = Set( + `Content-Length`.name, + ci"Content-Type", + `Content-Range`.name, + ci"Trailer", + `Transfer-Encoding`.name, + ) + + val security = Set( + Authorization.name, + Cookie.name, + `Set-Cookie`.name, + ) + + payload ++ security + } + + +} \ No newline at end of file diff --git a/core/src/main/scala/io/chrisdavenport/natchezhttp4sotel/helpers.scala b/core/src/main/scala/io/chrisdavenport/http4sotel4s/helpers.scala similarity index 84% rename from core/src/main/scala/io/chrisdavenport/natchezhttp4sotel/helpers.scala rename to core/src/main/scala/io/chrisdavenport/http4sotel4s/helpers.scala index d763053..cf9fd54 100644 --- a/core/src/main/scala/io/chrisdavenport/natchezhttp4sotel/helpers.scala +++ b/core/src/main/scala/io/chrisdavenport/http4sotel4s/helpers.scala @@ -1,16 +1,16 @@ -package io.chrisdavenport.natchezhttp4sotel +package io.chrisdavenport.http4sotel4s -private[natchezhttp4sotel] object helpers { - import java.io.{OutputStream, FilterOutputStream, ByteArrayOutputStream, PrintStream} +import java.io.{OutputStream, FilterOutputStream, ByteArrayOutputStream, PrintStream} +import scala.util.Using +private[http4sotel4s] object helpers { def printStackTrace(e: Throwable): String = { val baos = new ByteArrayOutputStream - val fs = new AnsiFilterStream(baos) - val ps = new PrintStream(fs, true, "UTF-8") - e.printStackTrace(ps) - ps.close - fs.close - baos.close + Using.resource(new AnsiFilterStream(baos)) { fs => + Using.resource(new PrintStream(fs, true, "UTF-8")) { ps => + e.printStackTrace(ps) + } + } new String(baos.toByteArray, "UTF-8") } diff --git a/core/src/main/scala/io/chrisdavenport/http4sotel4s/package.scala b/core/src/main/scala/io/chrisdavenport/http4sotel4s/package.scala new file mode 100644 index 0000000..4738f0b --- /dev/null +++ b/core/src/main/scala/io/chrisdavenport/http4sotel4s/package.scala @@ -0,0 +1,17 @@ +package io.chrisdavenport + +import org.http4s.{Header, Headers} +import org.typelevel.ci.CIString +import org.typelevel.otel4s.context.propagation.{TextMapGetter, TextMapUpdater} + +package object http4sotel4s { + implicit val headersTMU: TextMapUpdater[Headers] = + (carrier: Headers, key: String, value: String) => carrier.put(Header.Raw(CIString(key), value)) + implicit val headersTMG: TextMapGetter[Headers] = + new TextMapGetter[Headers] { + def get(carrier: Headers, key: String): Option[String] = + carrier.get(CIString(key)).map(_.head.value) + def keys(carrier: Headers): Iterable[String] = + carrier.headers.view.map(_.name).distinct.map(_.toString).toSeq + } +} diff --git a/core/src/main/scala/io/chrisdavenport/natchezhttp4sotel/ClientMiddleware.scala b/core/src/main/scala/io/chrisdavenport/natchezhttp4sotel/ClientMiddleware.scala deleted file mode 100644 index d80d0c5..0000000 --- a/core/src/main/scala/io/chrisdavenport/natchezhttp4sotel/ClientMiddleware.scala +++ /dev/null @@ -1,209 +0,0 @@ -package io.chrisdavenport.natchezhttp4sotel - -import cats._ -import cats.syntax.all._ -import cats.effect.kernel._ -import org.http4s._ -import org.typelevel.ci.CIString -import natchez._ -import scala.collection.mutable.ListBuffer -import org.http4s.headers._ -import org.http4s.client._ -import org.typelevel.vault.Key -import org.http4s.client.middleware.Retry - -object ClientMiddleware { - - @deprecated("0.3.0", "Use default without entrypoint") - def default[F[_]: natchez.Trace: MonadCancelThrow](ep: EntryPoint[F]): ClientMiddlewareBuilder[F] = { - new ClientMiddlewareBuilder[F](Defaults.reqHeaders, Defaults.respHeaders, Defaults.clientSpanName, Defaults.additionalRequestTags, Defaults.additionalResponseTags, Defaults.includeUrl) - } - - def default[F[_]: natchez.Trace: MonadCancelThrow]: ClientMiddlewareBuilder[F] = { - new ClientMiddlewareBuilder[F](Defaults.reqHeaders, Defaults.respHeaders, Defaults.clientSpanName, Defaults.additionalRequestTags, Defaults.additionalResponseTags, Defaults.includeUrl) - } - - object Defaults { - val reqHeaders = OTHttpTags.Headers.defaultHeadersIncluded - val respHeaders = OTHttpTags.Headers.defaultHeadersIncluded - def clientSpanName[F[_]]: Request[F] => String = {(req: Request[F]) => s"Http Client - ${req.method}"} - def additionalRequestTags[F[_]]: Request[F] => Seq[(String, TraceValue)] = {(_: Request[F]) => Seq()} - def additionalResponseTags[F[_]]: Response[F] => Seq[(String, TraceValue)] = {(_: Response[F]) => Seq()} - def includeUrl[F[_]]: Request[F] => Boolean = {(_: Request[F]) => true} - } - - object Keys { - private val internalSpanKey = Key.newKey[cats.effect.SyncIO, Any].unsafeRunSync() - - def spanKey[F[_]]: Key[F ~> F] = internalSpanKey.asInstanceOf[Key[F ~> F]] - } - - final class ClientMiddlewareBuilder[F[_]: Trace: MonadCancelThrow] private[ClientMiddleware] ( - private val reqHeaders: Set[CIString], - private val respHeaders: Set[CIString], - private val clientSpanName: Request[F] => String, - private val additionalRequestTags: Request[F] => Seq[(String, TraceValue)], - private val additionalResponseTags: Response[F] => Seq[(String, TraceValue)], - private val includeUrl: Request[F] => Boolean - ){ self => - private def copy( - reqHeaders: Set[CIString] = self.reqHeaders, - respHeaders: Set[CIString] = self.respHeaders, - clientSpanName: Request[F] => String = self.clientSpanName, - additionalRequestTags: Request[F] => Seq[(String, TraceValue)] = self.additionalRequestTags, - additionalResponseTags: Response[F] => Seq[(String, TraceValue)] = self.additionalResponseTags , - includeUrl: Request[F] => Boolean = self.includeUrl, - ): ClientMiddlewareBuilder[F] = - new ClientMiddlewareBuilder[F](reqHeaders, respHeaders, clientSpanName, additionalRequestTags, additionalResponseTags, includeUrl) - - def withRequestHeaders(reqHeaders: Set[CIString]) = copy(reqHeaders = reqHeaders) - - def withResponseHeaders(respHeaders: Set[CIString]) = copy(respHeaders = respHeaders) - - def withClientSpanName(clientSpanName: Request[F] => String) = copy(clientSpanName = clientSpanName) - - def withAdditionalRequestTags(additionalRequestTags: Request[F] => Seq[(String, TraceValue)]) = - copy(additionalRequestTags = additionalRequestTags) - - def withAdditionalResponseTags(additionalResponseTags: Response[F] => Seq[(String, TraceValue)]) = - copy(additionalResponseTags = additionalResponseTags) - - def withIncludeUrl(includeUrl: Request[F] => Boolean ) = copy(includeUrl = includeUrl) - - def build: Client[F] => Client[F] = { (client: Client[F]) => - Client[F]{(req: Request[F]) => - val base = request(req, reqHeaders, includeUrl) ++ additionalRequestTags(req) - MonadCancelThrow[Resource[F, *]].uncancelable(poll => - for { - fk <- natchez.Trace[F].spanR(clientSpanName(req)) - _ <- Resource.eval(fk(Trace[F].put(base:_*))) - knl <- Resource.eval(fk(Trace[F].kernel)) - knlHeaders = Headers(knl.toHeaders.map { case (k, v) => Header.Raw(k, v) } .toSeq) - newReq = req.withHeaders(knlHeaders ++ req.headers) - resp <- poll(client.run(newReq)).guaranteeCase{ - case Outcome.Succeeded(fa) => - Resource.eval(fk(Trace[F].put("exit.case" -> "succeeded"))) >> - fa.flatMap(resp => - Resource.eval( - fk(Trace[F].put((response(resp, respHeaders) ++ additionalResponseTags(resp)):_*)) - ) - ) - case Outcome.Errored(e) => - val exitCase: (String, TraceValue) = ("exit.case" -> TraceableValue[String].toTraceValue("errored")) - val error = OTHttpTags.Errors.error(e) - Resource.eval( - fk(Trace[F].put((exitCase :: error):_*)) - ) - case Outcome.Canceled() => - // Canceled isn't always an error, but it generally is for http - // TODO decide if this should add error, we do for the server side. - Resource.eval(fk(Trace[F].put("exit.case" -> "canceled", "canceled" -> true))) - - } - // Automatically handle client processing errors. Since this is after the response, - // the error case will only get hit if the use block of the resulting resource happens, - // which is the request processing stage. - _ <- Resource.makeCase(Applicative[F].unit){ - case (_, Resource.ExitCase.Errored(e)) => fk(Trace[F].put(OTHttpTags.Errors.error(e):_*)) - case (_, _) => Applicative[F].unit - } - } yield resp.withAttribute(Keys.spanKey[F], fk) - ) - } - } - } - - val ExtraTagsKey: Key[List[(String, TraceValue)]] = Key.newKey[cats.effect.SyncIO, List[(String, TraceValue)]].unsafeRunSync() - - @deprecated("0.2.1", "Direct Method is Deprecated, use default with the builder instead.") - def trace[F[_]: natchez.Trace: MonadCancelThrow]( - ep: EntryPoint[F], // This is to escape from F Trace to Resource[F, *] timing. Which is critical - reqHeaders: Set[CIString] = OTHttpTags.Headers.defaultHeadersIncluded, - respHeaders: Set[CIString] = OTHttpTags.Headers.defaultHeadersIncluded, - clientSpanName: Request[F] => String = {(req: Request[F]) => s"Http Client - ${req.method}"}, - additionalRequestTags: Request[F] => Seq[(String, TraceValue)] = {(_: Request[F]) => Seq()}, - additionalResponseTags: Response[F] => Seq[(String, TraceValue)] = {(_: Response[F]) => Seq()}, - )(client: Client[F]): Client[F] = - default(ep) - .withRequestHeaders(reqHeaders) - .withResponseHeaders(respHeaders) - .withClientSpanName(clientSpanName) - .withAdditionalRequestTags(additionalRequestTags) - .withAdditionalResponseTags(additionalResponseTags) - .build(client) - - private[natchezhttp4sotel] def request[F[_]](req: Request[F], headers: Set[CIString]): List[(String, TraceValue)] = { - request(req, headers, Function.const[Boolean, Request[F]](true)(_)) - } - def request[F[_]](request: Request[F], headers: Set[CIString], includeUrl: Request[F] => Boolean): List[(String, TraceValue)] = { - val builder = new ListBuffer[(String, TraceValue)]() - builder += OTHttpTags.Common.kind("client") - builder += OTHttpTags.Common.method(request.method) - if (includeUrl(request)) { - builder += OTHttpTags.Common.url(request.uri) - builder += OTHttpTags.Common.target(request.uri) - } - val host = request.headers.get[Host].getOrElse{ - val key = RequestKey.fromRequest(request) - Host(key.authority.host.value, key.authority.port) - } - builder += OTHttpTags.Common.host(host) - request.uri.scheme.foreach( s => - builder += OTHttpTags.Common.scheme(s) - ) - request.headers.get[`User-Agent`].foreach( ua => - builder += OTHttpTags.Common.userAgent(ua) - ) - - request.contentLength.foreach(l => - builder += OTHttpTags.Common.requestContentLength(l) - ) - - request.remote.foreach{sa => - builder += - OTHttpTags.Common.peerIp(sa.host) - - builder += - OTHttpTags.Common.peerPort(sa.port) - - } - retryCount(request.attributes).foreach{count => - builder += OTHttpTags.Common.retryCount(count) - } - builder ++= - OTHttpTags.Headers.request(request.headers, headers) - - builder ++= request.attributes.lookup(ExtraTagsKey).toList.flatten - - builder.toList - } - - - - def response[F[_]](response: Response[F], headers: Set[CIString]): List[(String, TraceValue)] = { - val builder = new ListBuffer[(String, TraceValue)]() - - builder += OTHttpTags.Common.status(response.status) - response.contentLength.foreach{l => - builder += OTHttpTags.Common.responseContentLength(l) - } - // Due to negotiation. Only the response knows what protocol was selected - builder += OTHttpTags.Common.flavor(response.httpVersion) - retryCount(response.attributes).foreach{count => - builder += OTHttpTags.Common.retryCount(count) - } - - builder ++= - OTHttpTags.Headers.response(response.headers, headers) - builder ++= response.attributes.lookup(ExtraTagsKey).toList.flatten - - builder.toList - } - - private def retryCount(vault: org.typelevel.vault.Vault): Option[Int] = { - // AttemptCountKey is 1,2,3,4 for the initial request, - // since we want to do retries. We substract by 1 to get 0,1,2,3. - vault.lookup(Retry.AttemptCountKey).map(i => i - 1) - } - -} \ No newline at end of file diff --git a/core/src/main/scala/io/chrisdavenport/natchezhttp4sotel/ServerMiddleware.scala b/core/src/main/scala/io/chrisdavenport/natchezhttp4sotel/ServerMiddleware.scala deleted file mode 100644 index 249d948..0000000 --- a/core/src/main/scala/io/chrisdavenport/natchezhttp4sotel/ServerMiddleware.scala +++ /dev/null @@ -1,327 +0,0 @@ -package io.chrisdavenport.natchezhttp4sotel - -import cats._ -import cats.syntax.all._ -import cats.effect.kernel._ -import cats.effect.syntax.all._ -import org.http4s._ -import org.typelevel.ci.CIString -import natchez._ -import scala.collection.mutable.ListBuffer -import org.http4s.headers._ -import org.http4s.client._ -import io.chrisdavenport.fiberlocal._ -import cats.data.OptionT -import java.net.URI -import cats.arrow.FunctionK - -object ServerMiddleware { - - def default[F[_]: MonadCancelThrow: GenFiberLocal](ep: EntryPoint[F]): ServerMiddlewareBuilder[F] = - new ServerMiddlewareBuilder[F](ep, Defaults.isKernelHeader, Defaults.reqHeaders, Defaults.respHeaders, Defaults.routeClassifier, Defaults.serverSpanName, Defaults.additionalRequestTags, Defaults.additionalResponseTags, Defaults.includeUrl) - - object Defaults { - val isKernelHeader: CIString => Boolean = name => !ExcludedHeaders.contains(name) - val reqHeaders: Set[CIString] = OTHttpTags.Headers.defaultHeadersIncluded - val respHeaders: Set[CIString] = OTHttpTags.Headers.defaultHeadersIncluded - def routeClassifier[F[_]]: Request[F] => Option[String] = {(_: Request[F]) => None} - def serverSpanName[F[_]]: Request[F] => String = {(req: Request[F]) => s"Http Server - ${req.method}"} - def additionalRequestTags[F[_]]: Request[F] => Seq[(String, TraceValue)] = {(_: Request[F]) => Seq()} - def additionalResponseTags[F[_]]: Response[F] => Seq[(String, TraceValue)] = {(_: Response[F]) => Seq()} - def includeUrl[F[_]]: Request[F] => Boolean = {(_: Request[F]) => true} - } - - final class ServerMiddlewareBuilder[F[_]: MonadCancelThrow: GenFiberLocal] private[ServerMiddleware] ( - ep: EntryPoint[F], - isKernelHeader: CIString => Boolean, - reqHeaders: Set[CIString], - respHeaders: Set[CIString], - routeClassifier: Request[F] => Option[String], - serverSpanName: Request[F] => String, - additionalRequestTags: Request[F] => Seq[(String, TraceValue)], - additionalResponseTags: Response[F] => Seq[(String, TraceValue)], - includeUrl: Request[F] => Boolean, - ){ self => - - private def copy( - ep: EntryPoint[F] = self.ep, - isKernelHeader: CIString => Boolean = self.isKernelHeader, - reqHeaders: Set[CIString] = self.reqHeaders, - respHeaders: Set[CIString] = self.respHeaders, - routeClassifier: Request[F] => Option[String] = self.routeClassifier, - serverSpanName: Request[F] => String = self.serverSpanName, - additionalRequestTags: Request[F] => Seq[(String, TraceValue)] = self.additionalRequestTags, - additionalResponseTags: Response[F] => Seq[(String, TraceValue)] = self.additionalResponseTags, - includeUrl: Request[F] => Boolean = self.includeUrl, - ): ServerMiddlewareBuilder[F] = - new ServerMiddlewareBuilder[F](ep, isKernelHeader, reqHeaders, respHeaders, routeClassifier, serverSpanName, additionalRequestTags, additionalResponseTags, includeUrl) - - def withIsKernelHeader(isKernelHeader: CIString => Boolean) = copy(isKernelHeader = isKernelHeader) - def withRequestHeaders(reqHeaders: Set[CIString]) = copy(reqHeaders = reqHeaders) - def withResponseHeaders(respHeaders: Set[CIString]) = copy(respHeaders = respHeaders) - def withRouteClassifier(routeClassifier: Request[F] => Option[String]) = copy(routeClassifier = routeClassifier) - def withServerSpanName(serverSpanName: Request[F] => String) = copy(serverSpanName = serverSpanName) - def withAdditionalRequestTags(additionalRequestTags: Request[F] => Seq[(String, TraceValue)]) = copy(additionalRequestTags = additionalRequestTags) - def withAdditionalResponseTags(additionalResponseTags: Response[F] => Seq[(String, TraceValue)]) = copy(additionalResponseTags = additionalResponseTags) - def withIncludeUrl(includeUrl: Request[F] => Boolean) = copy(includeUrl = includeUrl) - - def buildHttpApp(f: Trace[F] => HttpApp[F]): HttpApp[F] = - MakeSureYouKnowWhatYouAreDoing.buildTracedF(FunctionK.id)(f.andThen(_.pure[F])) - - def buildHttpRoutes(f: Trace[F] => HttpRoutes[F]): HttpRoutes[F] = - MakeSureYouKnowWhatYouAreDoing.buildTracedF(OptionT.liftK)(f.andThen(OptionT.pure[F](_))) - - final class MakeSureYouKnowWhatYouAreDoing{ - def buildTracedF[G[_]: MonadCancelThrow](fk: F ~> G)(f: Trace[F] => G[Http[G, F]]): Http[G, F] = { - cats.data.Kleisli{(req: Request[F]) => - val kernelHeaders = req.headers.headers - .collect { - case header if isKernelHeader(header.name) => header.name -> header.value - } - .toMap - - val kernel = Kernel(kernelHeaders) - - MonadCancelThrow[G].uncancelable(poll => - ep.continueOrElseRoot(serverSpanName(req), kernel).mapK(fk).use{span => - val init = request(req, reqHeaders, routeClassifier, includeUrl) ++ additionalRequestTags(req) - fk(span.put(init:_*)) >> - fk(GenFiberLocal[F].local(span)).map(fromFiberLocal(_)) - .flatMap( trace => - poll(f(trace).flatMap(_.run(req))).guaranteeCase{ - case Outcome.Succeeded(fa) => - fk(span.put("exit.case" -> "succeeded")) >> - fa.flatMap{resp => - val out = response(resp, respHeaders) ++ additionalResponseTags(resp) - fk(span.put(out:_*)) - } - case Outcome.Errored(e) => - fk(span.put("exit.case" -> "errored")) >> - fk(span.put(OTHttpTags.Errors.error(e):_*)) - case Outcome.Canceled() => - fk(span.put( - "exit.case" -> "canceled", - "canceled" -> true, - "error" -> true // A cancelled http is an error for the server. The connection got cut for some reason. - )) - } - ) - } - ) - } - } - } - - def MakeSureYouKnowWhatYouAreDoing = new MakeSureYouKnowWhatYouAreDoing - - } - - // Recommended to get best tracing - @deprecated("0.2.1", "Direct Method is Deprecated, use default with the builder instead.") - def httpApp[F[_]: MonadCancelThrow: GenFiberLocal]( - ep: EntryPoint[F], - isKernelHeader: CIString => Boolean = name => !ExcludedHeaders.contains(name), - reqHeaders: Set[CIString] = OTHttpTags.Headers.defaultHeadersIncluded, - respHeaders: Set[CIString] = OTHttpTags.Headers.defaultHeadersIncluded, - routeClassifier: Request[F] => Option[String] = {(_: Request[F]) => None}, - serverSpanName: Request[F] => String = {(req: Request[F]) => s"Http Server - ${req.method}"}, - additionalRequestTags: Request[F] => Seq[(String, TraceValue)] = {(_: Request[F]) => Seq()}, - additionalResponseTags: Response[F] => Seq[(String, TraceValue)] = {(_: Response[F]) => Seq()}, - )(f: Trace[F] => HttpApp[F]): HttpApp[F] = - default(ep) - .withIsKernelHeader(isKernelHeader) - .withRequestHeaders(reqHeaders) - .withResponseHeaders(respHeaders) - .withRouteClassifier(routeClassifier) - .withServerSpanName(serverSpanName) - .withAdditionalRequestTags(additionalRequestTags) - .withAdditionalResponseTags(additionalResponseTags) - .buildHttpApp(f) - - @deprecated("0.2.1", "Direct Method is Deprecated, use default with the builder instead.") - def httpRoutes[F[_]: MonadCancelThrow: GenFiberLocal]( - ep: EntryPoint[F], - isKernelHeader: CIString => Boolean = name => !ExcludedHeaders.contains(name), - reqHeaders: Set[CIString] = OTHttpTags.Headers.defaultHeadersIncluded, - respHeaders: Set[CIString] = OTHttpTags.Headers.defaultHeadersIncluded, - routeClassifier: Request[F] => Option[String] = {(_: Request[F]) => None}, - serverSpanName: Request[F] => String = {(req: Request[F]) => s"Http Server - ${req.method}"}, - additionalRequestTags: Request[F] => Seq[(String, TraceValue)] = {(_: Request[F]) => Seq()}, - additionalResponseTags: Response[F] => Seq[(String, TraceValue)] = {(_: Response[F]) => Seq()}, - )(f: Trace[F] => HttpRoutes[F]): HttpRoutes[F] = - default(ep) - .withIsKernelHeader(isKernelHeader) - .withRequestHeaders(reqHeaders) - .withResponseHeaders(respHeaders) - .withRouteClassifier(routeClassifier) - .withServerSpanName(serverSpanName) - .withAdditionalRequestTags(additionalRequestTags) - .withAdditionalResponseTags(additionalResponseTags) - .buildHttpRoutes(f) - - - object MakeSureYouKnowWhatYouAreDoing { - // This effect to generate routes will run on every request. - // This is often undesired and can generate a lot of wasted state if used - // incorrectly. Should never be used to instantiate global state across request, - // the effect is scoped to a single request in. But as we see for the fiberlocal - // with this can be pretty useful when its what you need. - @deprecated("0.2.1", "Direct Method is Deprecated, use default with the builder instead.") - def tracedF[F[_]: MonadCancelThrow: GenFiberLocal, G[_]: MonadCancelThrow]( - ep: EntryPoint[F], - fk: F ~> G, - isKernelHeader: CIString => Boolean = name => !ExcludedHeaders.contains(name), - reqHeaders: Set[CIString] = OTHttpTags.Headers.defaultHeadersIncluded, - respHeaders: Set[CIString] = OTHttpTags.Headers.defaultHeadersIncluded, - routeClassifier: Request[F] => Option[String] = {(_: Request[F]) => None}, - serverSpanName: Request[F] => String = {(req: Request[F]) => s"Http Server - ${req.method}"}, - additionalRequestTags: Request[F] => Seq[(String, TraceValue)] = {(_: Request[F]) => Seq()}, - additionalResponseTags: Response[F] => Seq[(String, TraceValue)] = {(_: Response[F]) => Seq()}, - )( - f: Trace[F] => G[Http[G, F]] - ): Http[G, F] = - default(ep) - .withIsKernelHeader(isKernelHeader) - .withRequestHeaders(reqHeaders) - .withResponseHeaders(respHeaders) - .withRouteClassifier(routeClassifier) - .withServerSpanName(serverSpanName) - .withAdditionalRequestTags(additionalRequestTags) - .withAdditionalResponseTags(additionalResponseTags) - .MakeSureYouKnowWhatYouAreDoing - .buildTracedF(fk)(f) - } - - private[natchezhttp4sotel] def request[F[_]](req: Request[F], headers: Set[CIString], routeClassifier: Request[F] => Option[String]): List[(String, TraceValue)] = { - request(req, headers, routeClassifier, Function.const[Boolean, Request[F]](true)) - } - - def request[F[_]](request: Request[F], headers: Set[CIString], routeClassifier: Request[F] => Option[String], includeUrl: Request[F] => Boolean): List[(String, TraceValue)] = { - val builder = new ListBuffer[(String, TraceValue)]() - builder += OTHttpTags.Common.kind("server") - builder += OTHttpTags.Common.method(request.method) - if (includeUrl(request)) { - builder += OTHttpTags.Common.url(request.uri) - builder += OTHttpTags.Common.target(request.uri) - } - val host = request.headers.get[Host].getOrElse{ - val key = RequestKey.fromRequest(request) - Host(key.authority.host.value, key.authority.port) - } - builder += OTHttpTags.Common.host(host) - request.uri.scheme.foreach( s => - builder += OTHttpTags.Common.scheme(s) - ) - request.headers.get[`User-Agent`].foreach( ua => - builder += OTHttpTags.Common.userAgent(ua) - ) - - request.contentLength.foreach(l => - builder += OTHttpTags.Common.requestContentLength(l) - ) - routeClassifier(request).foreach(s => - builder += OTHttpTags.Server.route(s) - ) - - - builder += OTHttpTags.Common.flavor(request.httpVersion) - - request.remote.foreach{sa => - builder += - OTHttpTags.Common.peerIp(sa.host) - - builder += - OTHttpTags.Common.peerPort(sa.port) - } - // Special Server - request.from.foreach(ip => - builder += OTHttpTags.Server.clientIp(ip) - ) - builder ++= - OTHttpTags.Headers.request(request.headers, headers) - - - builder.toList - } - - def response[F[_]](response: Response[F], headers: Set[CIString]): List[(String, TraceValue)] = { - val builder = new ListBuffer[(String, TraceValue)]() - - builder += OTHttpTags.Common.status(response.status) - response.contentLength.foreach(l => - builder += OTHttpTags.Common.responseContentLength(l) - ) - builder ++= - OTHttpTags.Headers.response(response.headers, headers) - - - builder.toList - } - - private def fromFiberLocal[F[_]: MonadCancelThrow](local: FiberLocal[F, Span[F]]): natchez.Trace[F] = { - new Trace[F] { - - override def put(fields: (String, TraceValue)*): F[Unit] = - local.get.flatMap(_.put(fields: _*)) - - override def attachError(err: Throwable, fields: (String, TraceValue)*): F[Unit] = - local.get.flatMap(_.attachError(err, fields: _*)) - - override def log(fields: (String, TraceValue)*): F[Unit] = - local.get.flatMap(_.log(fields: _*)) - - override def log(event: String): F[Unit] = - local.get.flatMap(_.log(event)) - - override def kernel: F[Kernel] = - local.get.flatMap(_.kernel) - - override def spanR(name: String, options: Span.Options): Resource[F, F ~> F] = - for { - parent <- Resource.eval(local.get) - child <- parent.span(name, options) - } yield new (F ~> F) { - def apply[A](fa: F[A]): F[A] = - local.get.flatMap { old => - local - .set(child) - .bracket(_ => fa.onError{ case e => child.attachError(e)})(_ => local.set(old)) - } - - } - - override def span[A](name: String, options: Span.Options)(k: F[A]): F[A] = - spanR(name, options).use(_(k)) - - override def traceId: F[Option[String]] = - local.get.flatMap(_.traceId) - - override def traceUri: F[Option[URI]] = - local.get.flatMap(_.traceUri) - } - } - - - val ExcludedHeaders: Set[CIString] = { - import org.http4s.headers._ - import org.typelevel.ci._ - - val payload = Set( - `Content-Length`.name, - ci"Content-Type", - `Content-Range`.name, - ci"Trailer", - `Transfer-Encoding`.name, - ) - - val security = Set( - Authorization.name, - Cookie.name, - `Set-Cookie`.name, - ) - - payload ++ security - } - - -} \ No newline at end of file diff --git a/core/src/test/scala/io/chrisdavenport/http4sotel4s/ClientMiddlewareTests.scala b/core/src/test/scala/io/chrisdavenport/http4sotel4s/ClientMiddlewareTests.scala new file mode 100644 index 0000000..a60cdfd --- /dev/null +++ b/core/src/test/scala/io/chrisdavenport/http4sotel4s/ClientMiddlewareTests.scala @@ -0,0 +1,93 @@ +package io.chrisdavenport.http4sotel4s + +import munit.CatsEffectSuite +import cats.effect.{IO, IOLocal} +import io.opentelemetry.api.common.{AttributeKey => JAttributeKey} +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator +import io.opentelemetry.context.propagation.{ContextPropagators => JContextPropagators} +import io.opentelemetry.sdk.{OpenTelemetrySdk => JOpenTelemetrySdk} +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter +import io.opentelemetry.sdk.trace.SdkTracerProvider +import io.opentelemetry.sdk.trace.`export`.SimpleSpanProcessor +import io.opentelemetry.sdk.trace.data.SpanData +import org.http4s._ +import org.http4s.client._ +import org.typelevel.otel4s.java.OtelJava +import org.typelevel.otel4s.java.context.Context +import org.typelevel.otel4s.java.instances._ +import org.typelevel.otel4s.trace.{Tracer, TracerProvider} + +import scala.jdk.CollectionConverters._ + +class ClientMiddlewareTests extends CatsEffectSuite { + import ClientMiddlewareTests._ + + test("ClientMiddleware") { + for { + sdk <- makeSdk + tracerIO <- sdk.provider.get("tracer") + _ <- { + implicit val tracer: Tracer[IO] = tracerIO + val fakeClient = + Client.fromHttpApp[IO] { + HttpApp[IO] { _.body.compile.drain.as(Response[IO](Status.Ok)) } + } + val tracedClient = ClientMiddleware.default[IO].build(fakeClient) + + tracedClient + .run(Request[IO](Method.GET)) + .use(_.body.compile.drain) + } + spans <- sdk.finishedSpans + } yield { + assertEquals(spans.length, 1) + val List(span) = spans + assertEquals(span.getName, "Http Client - GET") + val attributes = span.getAttributes + + def getStringAttribute(name: String): Option[String] = + Option(attributes.get(JAttributeKey.stringKey(name))) + def getLongAttribute(name: String): Option[Long] = + Option(attributes.get(JAttributeKey.longKey(name))).map(_.longValue()) + + assertEquals(getStringAttribute("span.kind"), Some("client")) + assertEquals(getStringAttribute("http.method"), Some("GET")) + assertEquals(getStringAttribute("http.url"), Some("/")) + assertEquals(getStringAttribute("http.target"), Some("/")) + assertEquals(getStringAttribute("http.host"), Some("localhost")) + assertEquals(getStringAttribute("exit.case"), Some("succeeded")) + assertEquals(getLongAttribute("http.status_code"), Some(200L)) + assertEquals(getStringAttribute("http.flavor"), Some("1.1")) + } + } +} + +object ClientMiddlewareTests { + private def makeSdk: IO[Sdk] = { + val exporter = InMemorySpanExporter.create() + + val tracerProvider = SdkTracerProvider + .builder() + .addSpanProcessor(SimpleSpanProcessor.create(exporter)) + .build() + + val jOtel = JOpenTelemetrySdk + .builder() + .setTracerProvider(tracerProvider) + .setPropagators( + JContextPropagators.create(W3CTraceContextPropagator.getInstance()) + ) + .build() + + IOLocal(Context.root).map { implicit ioLocal: IOLocal[Context] => + new Sdk(OtelJava.local[IO](jOtel).tracerProvider, exporter) + } + } + + final class Sdk( + val provider: TracerProvider[IO], + exporter: InMemorySpanExporter) { + def finishedSpans: IO[List[SpanData]] = + IO.delay(exporter.getFinishedSpanItems.asScala.toList) + } +} diff --git a/core/src/test/scala/io/chrisdavenport/natchezhttp4sotel/ClientMiddewareTests.scala b/core/src/test/scala/io/chrisdavenport/natchezhttp4sotel/ClientMiddewareTests.scala deleted file mode 100644 index ee69fc1..0000000 --- a/core/src/test/scala/io/chrisdavenport/natchezhttp4sotel/ClientMiddewareTests.scala +++ /dev/null @@ -1,114 +0,0 @@ -package io.chrisdavenport.natchezhttp4sotel - -import cats._ -import cats.syntax.all._ -import cats.effect.{Trace => _, _} -import natchez._ -import munit.CatsEffectSuite -import cats.data.Kleisli -import org.http4s._ -import org.http4s.client._ - -class ClientMiddewareTests extends TraceSuite { - traceTest( - "ClientMiddleware", - new TraceTest { - def program[F[_]: Async: Trace] = { - val fakeClient = Client.fromHttpApp(HttpApp{(req: Request[F]) => req.body.compile.drain.as(Response[F](Status.Ok))}) - val tracedClient = ClientMiddleware.default[F].build(fakeClient) - - tracedClient.run(Request[F](Method.GET)).use{ resp => - val fk = resp.attributes.lookup(ClientMiddleware.Keys.spanKey[F]).get - - fk(Trace[F].put("mainUse" -> "test")) >> - Trace[F].put("mainUseNoFk" -> "test") >> - resp.body.compile.drain - } - } - - def expectedHistory = List( - (Lineage.Root, NatchezCommand.CreateRootSpan("root", Kernel(Map()), Span.Options.Defaults)), - (Lineage.Root, NatchezCommand.CreateSpan("Http Client - GET", None, Span.Options.Defaults)), - - // Outgoing Request - (Lineage.Root / "Http Client - GET", NatchezCommand.Put(List( - "span.kind" -> "client", - "http.method" -> "GET", - "http.url" -> "/", - "http.target" -> "/", - "http.host" -> "localhost" - ))), - (Lineage.Root / "Http Client - GET", NatchezCommand.AskKernel(Kernel(Map.empty))), - - // Talk to Remote - (Lineage.Root / "Http Client - GET", NatchezCommand.Put(List("exit.case" -> "succeeded"))), - (Lineage.Root / "Http Client - GET", NatchezCommand.Put(List( - "http.status_code" -> 200, - "http.flavor" -> "1.1" - ))), - // Use Block - (Lineage.Root / "Http Client - GET", NatchezCommand.Put(List("mainUse" -> "test"))), - (Lineage.Root, NatchezCommand.Put(List("mainUseNoFk" -> "test"))), - - // Cleaning up - (Lineage.Root, NatchezCommand.ReleaseSpan("Http Client - GET")), - (Lineage.Root, NatchezCommand.ReleaseRootSpan("root")) - ) - } - - ) - -} - -trait InMemorySuite extends CatsEffectSuite { - type Lineage = InMemory.Lineage - val Lineage = InMemory.Lineage - type NatchezCommand = InMemory.NatchezCommand - val NatchezCommand = InMemory.NatchezCommand -} - -trait TraceSuite extends InMemorySuite { - val defaultRootName = "root" - trait TraceTest { - def program[F[_]: Async: Trace]: F[Unit] - - def expectedHistory: List[(Lineage, NatchezCommand)] - } - - def traceTest(name: String, tt: TraceTest): Unit = { - test(s"$name - Kleisli")( - testTraceKleisli(tt.program[Kleisli[IO, Span[IO], *]](implicitly, _), tt.expectedHistory) - ) - test(s"$name - IOLocal")(testTraceIoLocal(tt.program[IO](implicitly, _), tt.expectedHistory)) - } - - def testTraceKleisli( - traceProgram: Trace[Kleisli[IO, Span[IO], *]] => Kleisli[IO, Span[IO], Unit], - expectedHistory: List[(Lineage, NatchezCommand)] - ): IO[Unit] = testTrace[Kleisli[IO, Span[IO], *]]( - traceProgram, - root => IO.pure(Trace[Kleisli[IO, Span[IO], *]] -> (k => k.run(root))), - expectedHistory - ) - - def testTraceIoLocal( - traceProgram: Trace[IO] => IO[Unit], - expectedHistory: List[(Lineage, NatchezCommand)] - ): IO[Unit] = testTrace[IO](traceProgram, Trace.ioTrace(_).map(_ -> identity), expectedHistory) - - def testTrace[F[_]]( - traceProgram: Trace[F] => F[Unit], - makeTraceAndResolver: Span[IO] => IO[(Trace[F], F[Unit] => IO[Unit])], - expectedHistory: List[(Lineage, NatchezCommand)] - ): IO[Unit] = - InMemory.EntryPoint.create[IO].flatMap { ep => - val traced = ep.root(defaultRootName).use { r => - makeTraceAndResolver(r).flatMap { case (traceInstance, resolve) => - resolve(traceProgram(traceInstance)) - } - } - traced *> ep.ref.get.map { history => - assertEquals(history.toList, expectedHistory) - } - } -} \ No newline at end of file diff --git a/core/src/test/scala/io/chrisdavenport/natchezhttp4sotel/MainSpec.scala b/core/src/test/scala/io/chrisdavenport/natchezhttp4sotel/MainSpec.scala deleted file mode 100644 index 7592fa0..0000000 --- a/core/src/test/scala/io/chrisdavenport/natchezhttp4sotel/MainSpec.scala +++ /dev/null @@ -1,12 +0,0 @@ -package io.chrisdavenport.natchezhttp4sotel - -import munit.CatsEffectSuite -// import cats.effect._ - -class MainSpec extends CatsEffectSuite { - - test("Main should exit succesfully") { - assertEquals(true, true) - } - -} diff --git a/examples/src/main/scala/example/Common.scala b/examples/src/main/scala/example/Common.scala index 3f829d0..deeed64 100644 --- a/examples/src/main/scala/example/Common.scala +++ b/examples/src/main/scala/example/Common.scala @@ -1,12 +1,10 @@ package example import cats._ -import cats.effect.{ Trace => _, _ } +import cats.effect.{Trace => _, _} import cats.syntax.all._ -import io.jaegertracing.Configuration.ReporterConfiguration -import io.jaegertracing.Configuration.SamplerConfiguration -import natchez._ -import natchez.jaeger.Jaeger +import org.typelevel.otel4s.Attribute +import org.typelevel.otel4s.trace.Tracer import org.http4s.dsl.Http4sDsl import org.http4s.HttpRoutes import org.http4s.client.Client @@ -16,15 +14,15 @@ import org.http4s.implicits._ trait Common { // A dumb subroutine that does some tracing - def greet[F[_]: Monad: Trace](input: String) = - Trace[F].span("greet") { + def greet[F[_]: Monad: Tracer](input: String) = + Tracer[F].span("greet").use { span => for { - _ <- Trace[F].put("input" -> input) + _ <- span.addAttribute(Attribute("input", input)) } yield s"Hello $input!\n" } // Our routes, in abstract F with a Trace constraint. - def routes[F[_]: Trace: Concurrent](client: Client[F]): HttpRoutes[F] = { + def routes[F[_]: Tracer: Concurrent](client: Client[F]): HttpRoutes[F] = { object dsl extends Http4sDsl[F]; import dsl._ // bleh HttpRoutes.of[F] { @@ -44,16 +42,4 @@ trait Common { } } - // A Jaeger entry point - def entryPoint[F[_]: Sync]: Resource[F, EntryPoint[F]] = - Jaeger.entryPoint[F]( - system = "Http4sExample", - uriPrefix = Some(new java.net.URI("http://localhost:16686")), - ) { c => - Sync[F].delay { - c.withSampler(SamplerConfiguration.fromEnv) - .withReporter(ReporterConfiguration.fromEnv) - .getTracer - } - } -} \ No newline at end of file +} diff --git a/examples/src/main/scala/example/Http4sExample.scala b/examples/src/main/scala/example/Http4sExample.scala index cff8c83..663787a 100644 --- a/examples/src/main/scala/example/Http4sExample.scala +++ b/examples/src/main/scala/example/Http4sExample.scala @@ -1,13 +1,20 @@ package example import cats.effect._ +import cats.syntax.all._ import org.http4s.ember.server.EmberServerBuilder import org.http4s.ember.client.EmberClientBuilder import org.http4s.server.Server import org.http4s.implicits._ -import io.chrisdavenport.natchezhttp4sotel._ -import io.chrisdavenport.fiberlocal.GenFiberLocal +import io.chrisdavenport.http4sotel4s._ import com.comcast.ip4s._ +import fs2.io.net.Network +import io.opentelemetry.api.GlobalOpenTelemetry +import org.typelevel.otel4s.Otel4s +import org.typelevel.otel4s.java.OtelJava +import org.typelevel.otel4s.java.instances._ +import org.typelevel.otel4s.trace.Tracer +import org.typelevel.otel4s.java.context.{Context, LocalContext} /** * Start up Jaeger thus: @@ -28,13 +35,25 @@ import com.comcast.ip4s._ */ object Http4sExample extends IOApp with Common { + def globalOtel4s[F[_]: Async: LiftIO]: F[(OtelJava[F], LocalContext[F])] = + Sync[F].delay(GlobalOpenTelemetry.get) + .flatMap { jOtel => + IOLocal(Context.root) + .map { implicit ioLocal => + OtelJava.local[F](jOtel) -> implicitly[LocalContext[F]] + } + .to[F] + } + + def tracer[F[_]](otel: Otel4s[F]): F[Tracer[F]] = + otel.tracerProvider.tracer("Http4sExample").get + // Our main app resource - def server[F[_]: Async: GenFiberLocal]: Resource[F, Server] = + def server[F[_]: Async: Network: Tracer]: Resource[F, Server] = for { - iClient <- EmberClientBuilder.default[F].build - ep <- entryPoint[F] - app = ServerMiddleware.default(ep).buildHttpApp{implicit T: natchez.Trace[F] => - val client = ClientMiddleware.default.build(iClient) + client <- EmberClientBuilder.default[F].build + .map(ClientMiddleware.default.build) + app = ServerMiddleware.default[F].buildHttpApp { routes(client).orNotFound } sv <- EmberServerBuilder.default[F].withPort(port"8080").withHttpApp(app).build @@ -42,6 +61,11 @@ object Http4sExample extends IOApp with Common { // Done! def run(args: List[String]): IO[ExitCode] = - server[IO].use(_ => IO.never) + Resource.eval(globalOtel4s[IO]).flatMap { + case (otel4s, _) => + Resource.eval(tracer(otel4s)).flatMap { implicit T: Tracer[IO] => + server[IO] + } + }.use(_ => IO.never) } \ No newline at end of file diff --git a/jaeger.sh b/jaeger.sh new file mode 100755 index 0000000..3eac6fc --- /dev/null +++ b/jaeger.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash +docker run --name jaeger \ + -e COLLECTOR_OTLP_ENABLED=true \ + -p 16686:16686 \ + -p 4317:4317 \ + -p 4318:4318 \ + jaegertracing/all-in-one:1.35 \ No newline at end of file