diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5cfda504e..b29f7a909 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -91,11 +91,11 @@ jobs: - name: Make target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: mkdir -p semconv/stable/.jvm/target oteljava/metrics/target oteljava/context-storage/target sdk/common/native/target core/trace/.js/target semconv/metrics/stable/.jvm/target semconv/metrics/experimental/.jvm/target semconv/metrics/stable/.native/target sdk-exporter/all/.jvm/target semconv/experimental/.js/target core/common/.jvm/target oteljava/common-testkit/target sdk/metrics/.native/target sdk-exporter/metrics/.jvm/target sdk-exporter/trace/.jvm/target unidocs/target sdk-contrib/aws/resource/.jvm/target oteljava/trace-testkit/target core/metrics/.native/target core/all/.native/target sdk/trace-testkit/.jvm/target sdk/trace-testkit/.native/target sdk/testkit/.native/target sdk-exporter/prometheus/.jvm/target semconv/experimental/.native/target core/metrics/.jvm/target core/all/.js/target sdk-exporter/proto/.jvm/target sdk-exporter/proto/.js/target semconv/stable/.native/target sdk/all/.native/target sdk/metrics-testkit/.js/target sdk-contrib/aws/xray-propagator/.native/target core/metrics/.js/target sdk/testkit/.js/target core/all/.jvm/target sdk/common/jvm/target core/trace/.native/target oteljava/metrics-testkit/target sdk/trace/.native/target semconv/experimental/.jvm/target sdk/metrics-testkit/.native/target sdk/metrics/.jvm/target oteljava/common/target scalafix/rules/target sdk-exporter/proto/.native/target core/trace/.jvm/target sdk-exporter/common/.jvm/target sdk/metrics-testkit/.jvm/target core/common/.native/target sdk/trace-testkit/.js/target core/common/.js/target oteljava/trace/target semconv/metrics/experimental/.native/target oteljava/testkit/target sdk/testkit/.jvm/target sdk-exporter/all/.js/target sdk-contrib/aws/xray/.native/target semconv/metrics/experimental/.js/target semconv/metrics/stable/.js/target sdk/all/.jvm/target sdk-exporter/all/.native/target oteljava/all/target sdk/trace/.jvm/target sdk-contrib/aws/xray-propagator/.jvm/target semconv/stable/.js/target sdk-contrib/aws/xray/.jvm/target project/target + run: mkdir -p semconv/stable/.jvm/target oteljava/metrics/target instrumentation/metrics/js/target sdk-exporter/common/.js/target sdk/common/native/target sdk/common/js/target core/trace/.js/target semconv/metrics/stable/.jvm/target semconv/metrics/experimental/.jvm/target semconv/metrics/stable/.native/target sdk-exporter/all/.jvm/target sdk-exporter/prometheus/.js/target semconv/experimental/.js/target sdk/trace/.js/target core/common/.jvm/target oteljava/common-testkit/target sdk/metrics/.native/target sdk-exporter/metrics/.jvm/target sdk-exporter/trace/.jvm/target unidocs/target sdk-contrib/aws/resource/.jvm/target oteljava/trace-testkit/target core/metrics/.native/target core/all/.native/target sdk/trace-testkit/.jvm/target sdk/trace-testkit/.native/target sdk/testkit/.native/target sdk-exporter/prometheus/.jvm/target sdk-contrib/aws/resource/.js/target semconv/experimental/.native/target core/metrics/.jvm/target core/all/.js/target sdk-exporter/proto/.jvm/target sdk-exporter/proto/.js/target sdk-exporter/metrics/.js/target semconv/stable/.native/target sdk/all/.native/target sdk/metrics-testkit/.js/target sdk-contrib/aws/xray-propagator/.native/target core/metrics/.js/target sdk/testkit/.js/target core/all/.jvm/target sdk/common/jvm/target core/trace/.native/target oteljava/metrics-testkit/target instrumentation/metrics/jvm/target sdk/trace/.native/target semconv/experimental/.jvm/target sdk/metrics-testkit/.native/target sdk/metrics/.jvm/target oteljava/common/target scalafix/rules/target sdk-exporter/proto/.native/target core/trace/.jvm/target sdk-exporter/common/.jvm/target sdk/metrics-testkit/.jvm/target sdk/metrics/.js/target sdk-exporter/trace/.js/target core/common/.native/target sdk/trace-testkit/.js/target core/common/.js/target oteljava/trace/target semconv/metrics/experimental/.native/target oteljava/testkit/target sdk/testkit/.jvm/target sdk-exporter/all/.js/target sdk-contrib/aws/xray/.native/target sdk-contrib/aws/xray/.js/target sdk-contrib/aws/xray-propagator/.js/target semconv/metrics/experimental/.js/target semconv/metrics/stable/.js/target instrumentation/metrics/native/target sdk/all/.js/target sdk/all/.jvm/target sdk-exporter/all/.native/target oteljava/all/target sdk/trace/.jvm/target sdk-contrib/aws/xray-propagator/.jvm/target semconv/stable/.js/target sdk-contrib/aws/xray/.jvm/target project/target - name: Compress target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: tar cf targets.tar semconv/stable/.jvm/target oteljava/metrics/target oteljava/context-storage/target sdk/common/native/target core/trace/.js/target semconv/metrics/stable/.jvm/target semconv/metrics/experimental/.jvm/target semconv/metrics/stable/.native/target sdk-exporter/all/.jvm/target semconv/experimental/.js/target core/common/.jvm/target oteljava/common-testkit/target sdk/metrics/.native/target sdk-exporter/metrics/.jvm/target sdk-exporter/trace/.jvm/target unidocs/target sdk-contrib/aws/resource/.jvm/target oteljava/trace-testkit/target core/metrics/.native/target core/all/.native/target sdk/trace-testkit/.jvm/target sdk/trace-testkit/.native/target sdk/testkit/.native/target sdk-exporter/prometheus/.jvm/target semconv/experimental/.native/target core/metrics/.jvm/target core/all/.js/target sdk-exporter/proto/.jvm/target sdk-exporter/proto/.js/target semconv/stable/.native/target sdk/all/.native/target sdk/metrics-testkit/.js/target sdk-contrib/aws/xray-propagator/.native/target core/metrics/.js/target sdk/testkit/.js/target core/all/.jvm/target sdk/common/jvm/target core/trace/.native/target oteljava/metrics-testkit/target sdk/trace/.native/target semconv/experimental/.jvm/target sdk/metrics-testkit/.native/target sdk/metrics/.jvm/target oteljava/common/target scalafix/rules/target sdk-exporter/proto/.native/target core/trace/.jvm/target sdk-exporter/common/.jvm/target sdk/metrics-testkit/.jvm/target core/common/.native/target sdk/trace-testkit/.js/target core/common/.js/target oteljava/trace/target semconv/metrics/experimental/.native/target oteljava/testkit/target sdk/testkit/.jvm/target sdk-exporter/all/.js/target sdk-contrib/aws/xray/.native/target semconv/metrics/experimental/.js/target semconv/metrics/stable/.js/target sdk/all/.jvm/target sdk-exporter/all/.native/target oteljava/all/target sdk/trace/.jvm/target sdk-contrib/aws/xray-propagator/.jvm/target semconv/stable/.js/target sdk-contrib/aws/xray/.jvm/target project/target + run: tar cf targets.tar semconv/stable/.jvm/target oteljava/metrics/target instrumentation/metrics/js/target sdk-exporter/common/.js/target sdk/common/native/target sdk/common/js/target core/trace/.js/target semconv/metrics/stable/.jvm/target semconv/metrics/experimental/.jvm/target semconv/metrics/stable/.native/target sdk-exporter/all/.jvm/target sdk-exporter/prometheus/.js/target semconv/experimental/.js/target sdk/trace/.js/target core/common/.jvm/target oteljava/common-testkit/target sdk/metrics/.native/target sdk-exporter/metrics/.jvm/target sdk-exporter/trace/.jvm/target unidocs/target sdk-contrib/aws/resource/.jvm/target oteljava/trace-testkit/target core/metrics/.native/target core/all/.native/target sdk/trace-testkit/.jvm/target sdk/trace-testkit/.native/target sdk/testkit/.native/target sdk-exporter/prometheus/.jvm/target sdk-contrib/aws/resource/.js/target semconv/experimental/.native/target core/metrics/.jvm/target core/all/.js/target sdk-exporter/proto/.jvm/target sdk-exporter/proto/.js/target sdk-exporter/metrics/.js/target semconv/stable/.native/target sdk/all/.native/target sdk/metrics-testkit/.js/target sdk-contrib/aws/xray-propagator/.native/target core/metrics/.js/target sdk/testkit/.js/target core/all/.jvm/target sdk/common/jvm/target core/trace/.native/target oteljava/metrics-testkit/target instrumentation/metrics/jvm/target sdk/trace/.native/target semconv/experimental/.jvm/target sdk/metrics-testkit/.native/target sdk/metrics/.jvm/target oteljava/common/target scalafix/rules/target sdk-exporter/proto/.native/target core/trace/.jvm/target sdk-exporter/common/.jvm/target sdk/metrics-testkit/.jvm/target sdk/metrics/.js/target sdk-exporter/trace/.js/target core/common/.native/target sdk/trace-testkit/.js/target core/common/.js/target oteljava/trace/target semconv/metrics/experimental/.native/target oteljava/testkit/target sdk/testkit/.jvm/target sdk-exporter/all/.js/target sdk-contrib/aws/xray/.native/target sdk-contrib/aws/xray/.js/target sdk-contrib/aws/xray-propagator/.js/target semconv/metrics/experimental/.js/target semconv/metrics/stable/.js/target instrumentation/metrics/native/target sdk/all/.js/target sdk/all/.jvm/target sdk-exporter/all/.native/target oteljava/all/target sdk/trace/.jvm/target sdk-contrib/aws/xray-propagator/.jvm/target semconv/stable/.js/target sdk-contrib/aws/xray/.jvm/target project/target - name: Upload target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') diff --git a/build.sbt b/build.sbt index 20ee3fa72..cb8503011 100644 --- a/build.sbt +++ b/build.sbt @@ -132,6 +132,7 @@ lazy val root = tlCrossRootProject `core-metrics`, `core-trace`, core, + `instrumentation-metrics`, `sdk-common`, `sdk-metrics`, `sdk-metrics-testkit`, @@ -237,6 +238,24 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform) ) .settings(scalafixSettings) +// +// Instrumentation +// + +lazy val `instrumentation-metrics` = crossProject(JVMPlatform, JSPlatform, NativePlatform) + .crossType(CrossType.Full) + .in(file("instrumentation/metrics")) + .dependsOn(`core-metrics`, `core-common` % "test->test", `sdk-metrics-testkit` % Test) + .settings(munitDependencies) + .settings( + name := "otel4s-instrumentation-metrics", + startYear := Some(2024), + libraryDependencies ++= Seq( + "org.typelevel" %%% "scalacheck-effect-munit" % MUnitScalaCheckEffectVersion % Test + ) + ) + .settings(scalafixSettings) + // // SDK // @@ -883,6 +902,7 @@ lazy val docs = project oteljava, `oteljava-context-storage`, `oteljava-testkit`, + `instrumentation-metrics`.jvm, sdk.jvm, `sdk-exporter`.jvm, `sdk-exporter-prometheus`.jvm, @@ -954,6 +974,7 @@ lazy val unidocs = project `core-metrics`.jvm, `core-trace`.jvm, core.jvm, + `instrumentation-metrics`.jvm, `sdk-common`.jvm, `sdk-metrics`.jvm, `sdk-metrics-testkit`.jvm, diff --git a/core/common/src/main/scala/org/typelevel/otel4s/context/LocalProvider.scala b/core/common/src/main/scala/org/typelevel/otel4s/context/LocalProvider.scala index 688a1b522..5749c36c8 100644 --- a/core/common/src/main/scala/org/typelevel/otel4s/context/LocalProvider.scala +++ b/core/common/src/main/scala/org/typelevel/otel4s/context/LocalProvider.scala @@ -21,7 +21,6 @@ import cats.effect.IOLocal import cats.effect.LiftIO import cats.effect.MonadCancelThrow import cats.mtl.Local -import org.typelevel.otel4s.instances.local._ /** A utility class to simplify the creation of the [[cats.mtl.Local Local]]. * @@ -73,15 +72,9 @@ object LocalProvider extends LocalProviderLowPriority { * @tparam Ctx * the type of the context */ - def fromIOLocal[F[_]: MonadCancelThrow: LiftIO, Ctx]( - ioLocal: IOLocal[Ctx] - ): LocalProvider[F, Ctx] = + def fromIOLocal[F[_]: MonadCancelThrow: LiftIO, Ctx](ioLocal: IOLocal[Ctx]): LocalProvider[F, Ctx] = new LocalProvider[F, Ctx] { - val local: F[Local[F, Ctx]] = - MonadCancelThrow[F].pure( - localForIOLocal[F, Ctx](implicitly, implicitly, ioLocal) - ) - + val local: F[Local[F, Ctx]] = MonadCancelThrow[F].pure(localForIOLocal[F, Ctx](ioLocal)) override def toString: String = "LocalProvider.fromIOLocal" } @@ -96,9 +89,7 @@ object LocalProvider extends LocalProviderLowPriority { * @tparam Ctx * the type of the context */ - def fromLocal[F[_]: Applicative, Ctx]( - l: Local[F, Ctx] - ): LocalProvider[F, Ctx] = + def fromLocal[F[_]: Applicative, Ctx](l: Local[F, Ctx]): LocalProvider[F, Ctx] = new LocalProvider[F, Ctx] { val local: F[Local[F, Ctx]] = Applicative[F].pure(l) override def toString: String = "LocalProvider.fromLocal" @@ -118,16 +109,11 @@ object LocalProvider extends LocalProviderLowPriority { * @tparam Ctx * the type of the context */ - def fromLiftIO[ - F[_]: MonadCancelThrow: LiftIO, - Ctx: Contextual - ]: LocalProvider[F, Ctx] = + def fromLiftIO[F[_]: MonadCancelThrow: LiftIO, Ctx: Contextual]: LocalProvider[F, Ctx] = new LocalProvider[F, Ctx] { def local: F[Local[F, Ctx]] = IOLocal(Contextual[Ctx].root) - .map { implicit ioLocal: IOLocal[Ctx] => - localForIOLocal[F, Ctx](implicitly, implicitly, ioLocal) - } + .map(ioLocal => localForIOLocal[F, Ctx](ioLocal)) .to[F] override def toString: String = "LocalProvider.fromLiftIO" @@ -139,19 +125,25 @@ object LocalProvider extends LocalProviderLowPriority { ](implicit ioLocal: IOLocal[Ctx]): LocalProvider[F, Ctx] = LocalProvider.fromIOLocal(ioLocal) - implicit def liftFromLocal[ - F[_]: Applicative, - Ctx - ](implicit local: Local[F, Ctx]): LocalProvider[F, Ctx] = + implicit def liftFromLocal[F[_]: Applicative, Ctx](implicit local: Local[F, Ctx]): LocalProvider[F, Ctx] = LocalProvider.fromLocal(local) + + /** Cats Effect 3.6 introduced `IOLocal#asLocal`. However, we need a variation for a polymorphic type. + */ + private def localForIOLocal[F[_]: MonadCancelThrow: LiftIO, Ctx](ioLocal: IOLocal[Ctx]): Local[F, Ctx] = + new Local[F, Ctx] { + def applicative: Applicative[F] = + Applicative[F] + def ask[E2 >: Ctx]: F[E2] = + MonadCancelThrow[F].widen[Ctx, E2](ioLocal.get.to[F]) + def local[A](fa: F[A])(f: Ctx => Ctx): F[A] = + MonadCancelThrow[F].bracket(ioLocal.modify(e => (f(e), e)).to[F])(_ => fa)(ioLocal.set(_).to[F]) + } } sealed trait LocalProviderLowPriority { self: LocalProvider.type => - implicit def liftFromLiftIO[ - F[_]: MonadCancelThrow: LiftIO, - Ctx: Contextual - ]: LocalProvider[F, Ctx] = + implicit def liftFromLiftIO[F[_]: MonadCancelThrow: LiftIO, Ctx: Contextual]: LocalProvider[F, Ctx] = LocalProvider.fromLiftIO } diff --git a/core/common/src/main/scala/org/typelevel/otel4s/instances/AllInstances.scala b/core/common/src/main/scala/org/typelevel/otel4s/instances/AllInstances.scala deleted file mode 100644 index 83702f62e..000000000 --- a/core/common/src/main/scala/org/typelevel/otel4s/instances/AllInstances.scala +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright 2022 Typelevel - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.typelevel.otel4s.instances - -trait AllInstances extends LocalInstances diff --git a/core/common/src/main/scala/org/typelevel/otel4s/instances/LocalInstances.scala b/core/common/src/main/scala/org/typelevel/otel4s/instances/LocalInstances.scala deleted file mode 100644 index 1c48dc03a..000000000 --- a/core/common/src/main/scala/org/typelevel/otel4s/instances/LocalInstances.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright 2022 Typelevel - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.typelevel.otel4s.instances - -import cats.Applicative -import cats.Functor -import cats.effect.IOLocal -import cats.effect.LiftIO -import cats.effect.MonadCancelThrow -import cats.mtl.Local - -trait LocalInstances { - - // We hope this instance is moved into Cats Effect. - // See https://github.com/typelevel/cats-effect/pull/3429 - implicit final def localForIOLocal[F[_]: MonadCancelThrow: LiftIO, E](implicit - ioLocal: IOLocal[E] - ): Local[F, E] = - new Local[F, E] { - def applicative: Applicative[F] = - Applicative[F] - def ask[E2 >: E]: F[E2] = - Functor[F].widen[E, E2](ioLocal.get.to[F]) - def local[A](fa: F[A])(f: E => E): F[A] = - MonadCancelThrow[F].bracket(ioLocal.modify(e => (f(e), e)).to[F])(_ => fa)(ioLocal.set(_).to[F]) - } - -} diff --git a/core/common/src/main/scala/org/typelevel/otel4s/instances/package.scala b/core/common/src/main/scala/org/typelevel/otel4s/instances/package.scala deleted file mode 100644 index 97ac89a29..000000000 --- a/core/common/src/main/scala/org/typelevel/otel4s/instances/package.scala +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright 2022 Typelevel - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.typelevel.otel4s - -package object instances { - - object all extends AllInstances - - object local extends LocalInstances - -} diff --git a/core/common/src/test/scala/org/typelevel/otel4s/context/LocalProviderSuite.scala b/core/common/src/test/scala/org/typelevel/otel4s/context/LocalProviderSuite.scala index e85259df0..b7e326f4d 100644 --- a/core/common/src/test/scala/org/typelevel/otel4s/context/LocalProviderSuite.scala +++ b/core/common/src/test/scala/org/typelevel/otel4s/context/LocalProviderSuite.scala @@ -36,9 +36,7 @@ class LocalProviderSuite extends CatsEffectSuite { test("lift LocalProvider from implicit Local (1)") { IOLocal(VaultContext.root).map { ioLocal => - import org.typelevel.otel4s.instances.local.localForIOLocal - implicit val local: Local[IO, VaultContext] = - localForIOLocal(implicitly, implicitly, ioLocal) + implicit val local: Local[IO, VaultContext] = ioLocal.asLocal assertEquals( LocalProvider[IO, VaultContext].toString, diff --git a/core/common/src/test/scala/org/typelevel/otel4s/instances/LocalInstancesSuite.scala b/core/common/src/test/scala/org/typelevel/otel4s/instances/LocalInstancesSuite.scala deleted file mode 100644 index ab79a734a..000000000 --- a/core/common/src/test/scala/org/typelevel/otel4s/instances/LocalInstancesSuite.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 2022 Typelevel - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.typelevel.otel4s.instances - -import cats.effect.IO -import cats.effect.IOLocal -import cats.effect.testkit.TestInstances -import cats.mtl.laws.discipline.LocalTests -import munit.DisciplineSuite -import org.scalacheck.Arbitrary.arbString -import org.typelevel.otel4s.instances.local._ - -class LocalInstancesSuite extends DisciplineSuite with TestInstances { - implicit val ticker: Ticker = Ticker() - - unsafeRun { - IOLocal("").map { implicit ioLocal => - checkAll("IOLocal.LocalLaws", LocalTests[IO, String].local[String, Int]) - } - } -} diff --git a/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/BatchCallback.scala b/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/BatchCallback.scala index e90af39d5..069d8b7dc 100644 --- a/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/BatchCallback.scala +++ b/core/metrics/src/main/scala/org/typelevel/otel4s/metrics/BatchCallback.scala @@ -51,7 +51,7 @@ trait BatchCallback[F[_]] { * }}} * * @param callback - * the callback to to observe values on-demand + * the callback to observe values on-demand * * @param observable * the instrument for which the callback may observe values diff --git a/docs/instrumentation/directory.conf b/docs/instrumentation/directory.conf index 563e85cfe..83c8e3255 100644 --- a/docs/instrumentation/directory.conf +++ b/docs/instrumentation/directory.conf @@ -2,6 +2,7 @@ laika.title = Instrumentation laika.navigationOrder = [ metrics.md + metrics-cats-effect-io-runtime.md tracing.md tracing-cross-service-propagation.md ] diff --git a/docs/instrumentation/metrics-cats-effect-io-runtime.md b/docs/instrumentation/metrics-cats-effect-io-runtime.md new file mode 100644 index 000000000..2b25f4add --- /dev/null +++ b/docs/instrumentation/metrics-cats-effect-io-runtime.md @@ -0,0 +1,573 @@ +# Metrics | Cats Effect IO runtime + +## Available metrics + +```scala mdoc:invisible +import cats.effect.IO +import cats.effect.unsafe.implicits.global +import org.typelevel.otel4s.instrumentation.ce.IORuntimeMetrics +import org.typelevel.otel4s.metrics.MeterProvider +import org.typelevel.otel4s.sdk.testkit.metrics.MetricsTestkit +import IORuntimeMetrics.Config._ + +def printMetrics(config: IORuntimeMetrics.Config): Unit = { + val metrics = MetricsTestkit.inMemory[IO]().use { testkit => + implicit val mp: MeterProvider[IO] = testkit.meterProvider + + IORuntimeMetrics + .register[IO](global.metrics, config) + .surround(testkit.collectMetrics) + }.unsafeRunSync() + + println("| Name | Description | Unit |") + println("|-|-|-|") + println(metrics.sortBy(_.name).map(m => s"${m.name} | ${m.description.getOrElse("")} | ${m.unit.getOrElse("")}").mkString("\n")) +} +``` + +### CPU Starvation + +**Platforms**: JVM, Scala.js, Scala Native. + +These metrics could help identify performance bottlenecks caused by an overloaded compute pool, +excessive task scheduling, or lack of CPU resources. + +```scala mdoc:passthrough +printMetrics(IORuntimeMetrics.Config(CpuStarvationConfig.enabled, WorkStealingThreadPoolConfig.disabled)) +``` + +### Work-stealing thread pool - compute + +**Platforms**: JVM. + +**Built-in attributes**: +* `pool.id` - the id of the work-stealing thread pool + +These metrics provide insights about fibers and threads within the compute pool. +They help diagnose load distribution, identify bottlenecks, and monitor the pool’s efficiency in handling tasks. + +```scala mdoc:passthrough +printMetrics( + IORuntimeMetrics.Config( + CpuStarvationConfig.disabled, + WorkStealingThreadPoolConfig( + WorkStealingThreadPoolConfig.ComputeConfig.enabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig.disabled, + ) + ) +) +``` + +### Work-stealing thread pool - thread + +**Platforms**: JVM. + +**Built-in attributes**: +* `pool.id` - the id of the work-stealing thread pool the worker is used by +* `worker.index` - the index of the worker thread +* `thread.event` - the thread event + * `parked` - a thread is parked + * `polled` - a thread is polled for I/O events + * `blocked` - a thread is switched to a blocking thread and been replaced + * `respawn` - a thread is replaced by a newly spawned thread + +These metrics provide detailed information about threads state within the compute pool. + +```scala mdoc:passthrough +printMetrics( + IORuntimeMetrics.Config( + CpuStarvationConfig.disabled, + WorkStealingThreadPoolConfig( + WorkStealingThreadPoolConfig.ComputeConfig.disabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig( + WorkStealingThreadPoolConfig.WorkerThreadsConfig.ThreadConfig.enabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig.LocalQueueConfig.disabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig.TimerHeapConfig.disabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig.PollerConfig.disabled + ), + ) + ) +) +``` + +### Work-stealing thread pool - local queue + +**Platforms**: JVM. + +**Built-in attributes**: +* `pool.id` - the id of the work-stealing thread pool the queue is used by +* `worker.index` - the index of the worker thread the queue is used by + +These metrics provide a detailed view of fiber distribution within the pool. They help diagnose +load imbalances and system inefficiency. + +```scala mdoc:passthrough +printMetrics( + IORuntimeMetrics.Config( + CpuStarvationConfig.disabled, + WorkStealingThreadPoolConfig( + WorkStealingThreadPoolConfig.ComputeConfig.disabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig( + WorkStealingThreadPoolConfig.WorkerThreadsConfig.ThreadConfig.disabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig.LocalQueueConfig.enabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig.TimerHeapConfig.disabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig.PollerConfig.disabled + ), + ) + ) +) +``` + +### Work-stealing thread pool - timer heap + +**Platforms**: JVM. + +**Built-in attributes**: +* `pool.id` - the id of the work-stealing thread pool the timer heap is used by +* `worker.index` - the index of the worker thread the timer heap is used by +* `timer.state` - the state of the timer + * `executed` - the successfully executed timer + * `scheduled` - the scheduled timer + * `canceled` - the canceled timer + +These metrics provide a detailed view of timer stats within the pool. + +```scala mdoc:passthrough +printMetrics( + IORuntimeMetrics.Config( + CpuStarvationConfig.disabled, + WorkStealingThreadPoolConfig( + WorkStealingThreadPoolConfig.ComputeConfig.disabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig( + WorkStealingThreadPoolConfig.WorkerThreadsConfig.ThreadConfig.disabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig.LocalQueueConfig.disabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig.TimerHeapConfig.enabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig.PollerConfig.disabled + ), + ) + ) +) +``` + +### Work-stealing thread pool - poller + +**Platforms**: JVM. + +**Built-in attributes**: +* `pool.id` - the id of the work-stealing thread pool the poller is used by +* `worker.index` - the index of the worker thread the poller is used by +* `poller.operation` - the operation performed by the poller + * `accept` + * `connect` + * `read` + * `write` +* `poller.operation.status` - the status of the operation + * `submitted` - the operation has been submitted + * `succeeded` - the operation has errored + * `errored` - the operation has errored + * `canceled` - the operation has been canceled + +These metrics provide a detailed view of poller stats within the pool. + +```scala mdoc:passthrough +printMetrics( + IORuntimeMetrics.Config( + CpuStarvationConfig.disabled, + WorkStealingThreadPoolConfig( + WorkStealingThreadPoolConfig.ComputeConfig.disabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig( + WorkStealingThreadPoolConfig.WorkerThreadsConfig.ThreadConfig.disabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig.LocalQueueConfig.disabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig.TimerHeapConfig.disabled, + WorkStealingThreadPoolConfig.WorkerThreadsConfig.PollerConfig.enabled + ), + ) + ) +) +``` + +## Getting started + +Add the following configuration to the favorite build tool: + +@:select(build-tool) + +@:choice(sbt) + +Add settings to the `build.sbt`: + +```scala +libraryDependencies ++= Seq( + "org.typelevel" %%% "otel4s-instrumentation-metrics" % "@VERSION@" // <1> +) +``` + +@:choice(scala-cli) + +Add directives to the `*.scala` file: + +```scala +//> using dep "org.typelevel::otel4s-instrumentation-metrics::@VERSION@" // <1> +``` + +@:@ + +1. Add the `otel4s-instrumentation-metrics` library + +## Registering metrics collectors + +`IORuntimeMetrics.register` takes care of the metrics lifecycle management. + +@:select(otel-backend) + +@:choice(oteljava) + +```scala mdoc:reset:silent +import cats.effect._ +import org.typelevel.otel4s.instrumentation.ce.IORuntimeMetrics +import org.typelevel.otel4s.metrics.MeterProvider +import org.typelevel.otel4s.trace.TracerProvider +import org.typelevel.otel4s.oteljava.OtelJava + +object Main extends IOApp.Simple { + + def run: IO[Unit] = + OtelJava.autoConfigured[IO]().use { otel4s => + implicit val mp: MeterProvider[IO] = otel4s.meterProvider + IORuntimeMetrics + .register[IO](runtime.metrics, IORuntimeMetrics.Config.default) + .surround { + program(otel4s.meterProvider, otel4s.tracerProvider) + } + } + + def program( + meterProvider: MeterProvider[IO], + tracerProvider: TracerProvider[IO] + ): IO[Unit] = { + val _ = (meterProvider, tracerProvider) + IO.unit + } + +} +``` + +@:choice(sdk) + +```scala mdoc:reset:silent +import cats.effect._ +import org.typelevel.otel4s.instrumentation.ce.IORuntimeMetrics +import org.typelevel.otel4s.metrics.MeterProvider +import org.typelevel.otel4s.trace.TracerProvider +import org.typelevel.otel4s.sdk.OpenTelemetrySdk + +object Main extends IOApp.Simple { + + def run: IO[Unit] = + OpenTelemetrySdk.autoConfigured[IO]().use { autoConfigured => + val sdk = autoConfigured.sdk + implicit val mp: MeterProvider[IO] = sdk.meterProvider + IORuntimeMetrics + .register[IO](runtime.metrics, IORuntimeMetrics.Config.default) + .surround { + program(sdk.meterProvider, sdk.tracerProvider) + } + } + + def program( + meterProvider: MeterProvider[IO], + tracerProvider: TracerProvider[IO] + ): IO[Unit] = { + val _ = (meterProvider, tracerProvider) + IO.unit + } + +} +``` + +@:@ + + +## Customization + +The behavior of the `IORuntimeMetrics.register` can be customized via `IORuntimeMetrics.Config`. + +### CPU Starvation + +```scala mdoc:reset:invisible +import cats.effect.IO +import org.typelevel.otel4s.{Attribute, Attributes} +import org.typelevel.otel4s.instrumentation.ce.IORuntimeMetrics +import org.typelevel.otel4s.metrics.MeterProvider + +val runtime = cats.effect.unsafe.implicits.global +implicit val mp: MeterProvider[IO] = MeterProvider.noop[IO] +``` + +To disable CPU starvation metrics: +```scala mdoc:silent +val config: IORuntimeMetrics.Config = { + import IORuntimeMetrics.Config._ + IORuntimeMetrics.Config( + CpuStarvationConfig.disabled, // disable CPU starvation metrics + WorkStealingThreadPoolConfig.enabled + ) +} + +IORuntimeMetrics.register[IO](runtime.metrics, config) +``` + +To attach attributes to CPU starvation metrics: +```scala mdoc:nest:silent +val config: IORuntimeMetrics.Config = { + import IORuntimeMetrics.Config._ + IORuntimeMetrics.Config( + CpuStarvationConfig.enabled( + Attributes(Attribute("key", "value")) // the attributes + ), + WorkStealingThreadPoolConfig.enabled + ) +} + +IORuntimeMetrics.register[IO](runtime.metrics, config) +``` + +### Work-stealing thread pool - compute + +To disable worker metrics: +```scala mdoc:nest:silent +val config: IORuntimeMetrics.Config = { + import IORuntimeMetrics.Config._ + import WorkStealingThreadPoolConfig._ + + IORuntimeMetrics.Config( + CpuStarvationConfig.enabled, + WorkStealingThreadPoolConfig( + ComputeConfig.disabled, // disable compute metrics + WorkStealingThreadPoolConfig.WorkerThreadsConfig.enabled + ) + ) +} + +IORuntimeMetrics.register[IO](runtime.metrics, config) +``` + +To attach attributes to compute metrics: +```scala mdoc:nest:silent +val config: IORuntimeMetrics.Config = { + import IORuntimeMetrics.Config._ + import WorkStealingThreadPoolConfig._ + + IORuntimeMetrics.Config( + CpuStarvationConfig.enabled, + WorkStealingThreadPoolConfig( + ComputeConfig.enabled( + Attributes(Attribute("key", "value")) // attributes + ), + WorkStealingThreadPoolConfig.WorkerThreadsConfig.enabled + ) + ) +} + +IORuntimeMetrics.register[IO](runtime.metrics, config) +``` + +### Work-stealing thread pool - thread + +To disable thread metrics: +```scala mdoc:nest:silent +val config: IORuntimeMetrics.Config = { + import IORuntimeMetrics.Config._ + import WorkStealingThreadPoolConfig._ + + IORuntimeMetrics.Config( + CpuStarvationConfig.enabled, + WorkStealingThreadPoolConfig( + ComputeConfig.enabled, + WorkerThreadsConfig( + WorkerThreadsConfig.ThreadConfig.disabled, // disable worker thread metrics + WorkerThreadsConfig.LocalQueueConfig.enabled, + WorkerThreadsConfig.TimerHeapConfig.enabled, + WorkerThreadsConfig.PollerConfig.enabled + ) + ) + ) +} + +IORuntimeMetrics.register[IO](runtime.metrics, config) +``` + +To attach attributes to thread metrics: +```scala mdoc:nest:silent +val config: IORuntimeMetrics.Config = { + import IORuntimeMetrics.Config._ + import WorkStealingThreadPoolConfig._ + + IORuntimeMetrics.Config( + CpuStarvationConfig.enabled, + WorkStealingThreadPoolConfig( + ComputeConfig.enabled, + WorkerThreadsConfig( + WorkerThreadsConfig.ThreadConfig.enabled( + Attributes(Attribute("key", "value")) // the attributes + ), + WorkerThreadsConfig.LocalQueueConfig.enabled, + WorkerThreadsConfig.TimerHeapConfig.enabled, + WorkerThreadsConfig.PollerConfig.enabled + ) + ) + ) +} + +IORuntimeMetrics.register[IO](runtime.metrics, config) +``` + +### Work-stealing thread pool - local queue + +To disable local queue metrics: +```scala mdoc:nest:silent +val config: IORuntimeMetrics.Config = { + import IORuntimeMetrics.Config._ + import WorkStealingThreadPoolConfig._ + + IORuntimeMetrics.Config( + CpuStarvationConfig.enabled, + WorkStealingThreadPoolConfig( + ComputeConfig.enabled, + WorkerThreadsConfig( + WorkerThreadsConfig.ThreadConfig.enabled, + WorkerThreadsConfig.LocalQueueConfig.disabled, // disable local queue metrics + WorkerThreadsConfig.TimerHeapConfig.enabled, + WorkerThreadsConfig.PollerConfig.enabled + ) + ) + ) +} + +IORuntimeMetrics.register[IO](runtime.metrics, config) +``` + +To attach attributes to local queue metrics: +```scala mdoc:nest:silent +val config: IORuntimeMetrics.Config = { + import IORuntimeMetrics.Config._ + import WorkStealingThreadPoolConfig._ + + IORuntimeMetrics.Config( + CpuStarvationConfig.enabled, + WorkStealingThreadPoolConfig( + ComputeConfig.enabled, + WorkerThreadsConfig( + WorkerThreadsConfig.ThreadConfig.enabled, + WorkerThreadsConfig.LocalQueueConfig.enabled( + Attributes(Attribute("key", "value")) // the attributes + ), + WorkerThreadsConfig.TimerHeapConfig.enabled, + WorkerThreadsConfig.PollerConfig.enabled + ) + ) + ) +} + +IORuntimeMetrics.register[IO](runtime.metrics, config) +``` + +### Work-stealing thread pool - timer heap + +To disable timer heap metrics: +```scala mdoc:nest:silent +val config: IORuntimeMetrics.Config = { + import IORuntimeMetrics.Config._ + import WorkStealingThreadPoolConfig._ + + IORuntimeMetrics.Config( + CpuStarvationConfig.enabled, + WorkStealingThreadPoolConfig( + ComputeConfig.enabled, + WorkerThreadsConfig( + WorkerThreadsConfig.ThreadConfig.enabled, + WorkerThreadsConfig.LocalQueueConfig.enabled, + WorkerThreadsConfig.TimerHeapConfig.enabled, // disable timer heap metrics + WorkerThreadsConfig.PollerConfig.enabled + ) + ) + ) +} + +IORuntimeMetrics.register[IO](runtime.metrics, config) +``` + +To attach attributes to timer heap metrics: +```scala mdoc:nest:silent +val config: IORuntimeMetrics.Config = { + import IORuntimeMetrics.Config._ + import WorkStealingThreadPoolConfig._ + + IORuntimeMetrics.Config( + CpuStarvationConfig.enabled, + WorkStealingThreadPoolConfig( + ComputeConfig.enabled, + WorkerThreadsConfig( + WorkerThreadsConfig.ThreadConfig.enabled, + WorkerThreadsConfig.LocalQueueConfig.enabled, + WorkerThreadsConfig.TimerHeapConfig.enabled( + Attributes(Attribute("key", "value")) // the attributes + ), + WorkerThreadsConfig.PollerConfig.enabled + ) + ) + ) +} + +IORuntimeMetrics.register[IO](runtime.metrics, config) +``` + +### Work-stealing thread pool - poller + +To disable poller metrics: +```scala mdoc:nest:silent +val config: IORuntimeMetrics.Config = { + import IORuntimeMetrics.Config._ + import WorkStealingThreadPoolConfig._ + + IORuntimeMetrics.Config( + CpuStarvationConfig.enabled, + WorkStealingThreadPoolConfig( + ComputeConfig.enabled, + WorkerThreadsConfig( + WorkerThreadsConfig.ThreadConfig.enabled, + WorkerThreadsConfig.LocalQueueConfig.enabled, + WorkerThreadsConfig.TimerHeapConfig.enabled, + WorkerThreadsConfig.PollerConfig.disabled // disable poller metrics + ) + ) + ) +} + +IORuntimeMetrics.register[IO](runtime.metrics, config) +``` + +To attach attributes to poller metrics: +```scala mdoc:nest:silent +val config: IORuntimeMetrics.Config = { + import IORuntimeMetrics.Config._ + import WorkStealingThreadPoolConfig._ + + IORuntimeMetrics.Config( + CpuStarvationConfig.enabled, + WorkStealingThreadPoolConfig( + ComputeConfig.enabled, + WorkerThreadsConfig( + WorkerThreadsConfig.ThreadConfig.enabled, + WorkerThreadsConfig.LocalQueueConfig.enabled, + WorkerThreadsConfig.TimerHeapConfig.enabled, + WorkerThreadsConfig.PollerConfig.enabled( + Attributes(Attribute("key", "value")) // the attributes + ) + ) + ) + ) +} + +IORuntimeMetrics.register[IO](runtime.metrics, config) +``` diff --git a/docs/tracing-context-propagation.md b/docs/tracing-context-propagation.md index e401a1cf4..3da35e4b0 100644 --- a/docs/tracing-context-propagation.md +++ b/docs/tracing-context-propagation.md @@ -33,7 +33,6 @@ You can find both examples below and choose which one suits your requirements. import cats.effect._ import cats.mtl.Local import cats.syntax.flatMap._ -import org.typelevel.otel4s.instances.local._ // brings Local derived from IOLocal import org.typelevel.otel4s.oteljava.context.Context import org.typelevel.otel4s.oteljava.OtelJava import io.opentelemetry.api.GlobalOpenTelemetry @@ -47,7 +46,7 @@ def program[F[_]: Async](otel4s: OtelJava[F]): F[Unit] = { } val run: IO[Unit] = - IOLocal(Context.root).flatMap { implicit ioLocal: IOLocal[Context] => + IOLocal(Context.root).map(_.asLocal).flatMap { implicit local: Local[IO, Context] => createOtel4s[IO].flatMap(otel4s => program(otel4s)) } ``` diff --git a/instrumentation/metrics/js-native/src/main/scala/org/typelevel/otel4s/instrumentation/ce/IORuntimeMetricsPlatform.scala b/instrumentation/metrics/js-native/src/main/scala/org/typelevel/otel4s/instrumentation/ce/IORuntimeMetricsPlatform.scala new file mode 100644 index 000000000..75421fa42 --- /dev/null +++ b/instrumentation/metrics/js-native/src/main/scala/org/typelevel/otel4s/instrumentation/ce/IORuntimeMetricsPlatform.scala @@ -0,0 +1,173 @@ +/* + * Copyright 2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.instrumentation.ce + +import cats.Show +import cats.effect.Resource +import cats.effect.Sync +import cats.effect.unsafe.metrics.{IORuntimeMetrics => CatsIORuntimeMetrics} +import cats.syntax.applicative._ +import org.typelevel.otel4s.Attributes +import org.typelevel.otel4s.metrics.MeterProvider + +private[ce] trait IORuntimeMetricsPlatform { + self: IORuntimeMetrics.type => + + sealed trait Config { + + /** The configuration of the CPU starvation metrics. + */ + def cpuStarvation: Config.CpuStarvationConfig + + override final def toString: String = + Show[Config].show(this) + } + + object Config { + + sealed trait CpuStarvationConfig { + + /** Indicates whether metrics are enabled. + */ + def enabled: Boolean + + /** The attributes to attach to the metrics. + */ + def attributes: Attributes + + override final def toString: String = + Show[CpuStarvationConfig].show(this) + } + + object CpuStarvationConfig { + + /** The metrics are enabled. + */ + def enabled: CpuStarvationConfig = + Impl(enabled = true, Attributes.empty) + + /** The metrics are enabled and the given `attributes` will be attached. + * + * @param attributes + * the attributes to attach to the metrics + */ + def enabled(attributes: Attributes): CpuStarvationConfig = + Impl(enabled = true, attributes) + + /** The metrics are disabled. + */ + def disabled: CpuStarvationConfig = + Impl(enabled = false, Attributes.empty) + + implicit val cpuStarvationConfigShow: Show[CpuStarvationConfig] = { cfg => + s"CpuStarvationConfig{enabled=${cfg.enabled}, attributes=${cfg.attributes}}" + } + + private case class Impl(enabled: Boolean, attributes: Attributes) extends CpuStarvationConfig + } + + /** The default configuration, the following metrics are enabled: + * - CPU starvation + */ + def default: Config = + Impl(CpuStarvationConfig.enabled) + + /** A configuration with the given `cpuStarvation`. + * + * @param cpuStarvation + * the CPU starvation configuration to use + */ + def apply(cpuStarvation: CpuStarvationConfig): Config = + Impl(cpuStarvation) + + implicit val configShow: Show[Config] = { cfg => + s"IORuntimeMetrics.Config{cpuStarvation=${cfg.cpuStarvation}}" + } + + private case class Impl(cpuStarvation: CpuStarvationConfig) extends Config + } + + /** Registers the following collectors depending on the `config`: + * - CPU starvation + * + * @example + * {{{ + * object Main extends IOApp.Simple { + * def program( + * meterProvider: MeterProvider[IO], + * tracerProvider: TracerProvider[IO] + * ): IO[Unit] = ??? + * + * def run: IO[Unit] = + * OpenTelemetrySdk.autoConfigured[IO]().use { autoConfigured => + * val sdk = autoConfigured.sdk + * implicit val mp: MeterProvider[IO] = sdk.meterProvider + * + * IORuntimeMetrics + * .register[IO](runtime.metrics, IORuntimeMetrics.Config.default) + * .surround { + * program(sdk.meterProvider, sdk.tracerProvider) + * } + * } + * } + * }}} + * + * =CPU starvation metrics= + * + * Registers the CPU starvation: + * - `cats.effect.runtime.cpu.starvation.count` + * - `cats.effect.runtime.cpu.starvation.clock.drift.current` + * - `cats.effect.runtime.cpu.starvation.clock.drift.max` + * + * To disable CPU starvation metrics, customize a config: + * {{{ + * val config: IORuntimeMetrics.Config = { + * import IORuntimeMetrics.Config._ + * IORuntimeMetrics.Config( + * CpuStarvationConfig.disabled // disable CPU starvation metrics + * ) + * } + * + * IORuntimeMetrics.register[IO](runtime.metrics, config) + * }}} + * + * To attach attributes to CPU starvation metrics, customize a config: + * {{{ + * val config: IORuntimeMetrics.Config = { + * import IORuntimeMetrics.Config._ + * IORuntimeMetrics.Config( + * CpuStarvationConfig.enabled( + * Attributes(Attribute("key", "value")) // the attributes + * ) + * ) + * } + * + * IORuntimeMetrics.register[IO](runtime.metrics, config) + * }}} + */ + def register[F[_]: Sync: MeterProvider]( + metrics: CatsIORuntimeMetrics, + config: Config + ): Resource[F, Unit] = + Resource.eval(MeterProvider[F].get(Const.MeterNamespace)).flatMap { implicit meter => + cpuStarvationMetrics( + metrics.cpuStarvation, + config.cpuStarvation.attributes + ).whenA(config.cpuStarvation.enabled) + } + +} diff --git a/instrumentation/metrics/js-native/src/test/scala/org/typelevel/otel4s/instrumentation/ce/IORuntimeMetricsSuite.scala b/instrumentation/metrics/js-native/src/test/scala/org/typelevel/otel4s/instrumentation/ce/IORuntimeMetricsSuite.scala new file mode 100644 index 000000000..d876a48d4 --- /dev/null +++ b/instrumentation/metrics/js-native/src/test/scala/org/typelevel/otel4s/instrumentation/ce/IORuntimeMetricsSuite.scala @@ -0,0 +1,118 @@ +/* + * Copyright 2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.instrumentation.ce + +import cats.Show +import cats.effect.IO +import munit.CatsEffectSuite +import munit.ScalaCheckEffectSuite +import org.scalacheck.Arbitrary +import org.scalacheck.Prop +import org.scalacheck.effect.PropF +import org.typelevel.otel4s.Attributes +import org.typelevel.otel4s.metrics.MeterProvider +import org.typelevel.otel4s.scalacheck.Arbitraries._ +import org.typelevel.otel4s.sdk.metrics.data.MetricData +import org.typelevel.otel4s.sdk.testkit.metrics.MetricsTestkit + +class IORuntimeMetricsSuite extends CatsEffectSuite with ScalaCheckEffectSuite { + import IORuntimeMetrics.Config.CpuStarvationConfig + + test("register metrics using default config") { + MetricsTestkit.inMemory[IO]().use { testkit => + implicit val meterProvider: MeterProvider[IO] = testkit.meterProvider + + val expected = cpuStarvationMetrics + + for { + metrics <- IORuntimeMetrics + .register[IO](munitIORuntime.metrics, IORuntimeMetrics.Config.default) + .surround(testkit.collectMetrics) + } yield assertEquals(metrics.map(toMetric).sortBy(_.name), expected.sortBy(_.name)) + } + } + + test("register metrics according to the config") { + PropF.forAllF { (config: IORuntimeMetrics.Config) => + MetricsTestkit.inMemory[IO]().use { testkit => + implicit val meterProvider: MeterProvider[IO] = testkit.meterProvider + + val expected = List( + config.cpuStarvation.enabled -> cpuStarvationMetrics, + ).collect { case (true, metrics) => metrics }.flatten + + for { + metrics <- IORuntimeMetrics + .register[IO](munitIORuntime.metrics, config) + .surround(testkit.collectMetrics) + } yield assertEquals(metrics.map(toMetric).sortBy(_.name), expected.sortBy(_.name)) + } + } + } + + test("Show[IORuntimeMetrics.Config]") { + Prop.forAll { (config: IORuntimeMetrics.Config) => + val cpuStarvation = config.cpuStarvation + + val expected = "IORuntimeMetrics.Config{" + + s"cpuStarvation=CpuStarvationConfig{enabled=${cpuStarvation.enabled}, attributes=${cpuStarvation.attributes}}" + + "}" + + assertEquals(Show[IORuntimeMetrics.Config].show(config), expected) + assertEquals(config.toString, expected) + } + } + + private case class Metric(name: String, description: Option[String], unit: Option[String]) + + private def toMetric(metric: MetricData): Metric = + Metric(metric.name, metric.description, metric.unit) + + private val cpuStarvationMetrics = List( + Metric( + "cats.effect.runtime.cpu.starvation.count", + Some("The number of CPU starvation events."), + None + ), + Metric( + "cats.effect.runtime.cpu.starvation.clock.drift.current", + Some("The current CPU drift in milliseconds."), + Some("ms") + ), + Metric( + "cats.effect.runtime.cpu.starvation.clock.drift.max", + Some("The max CPU drift in milliseconds."), + Some("ms") + ) + ) + + private implicit val cpuStarvationArbitrary: Arbitrary[CpuStarvationConfig] = + Arbitrary( + for { + enabled <- Arbitrary.arbitrary[Boolean] + attributes <- Arbitrary.arbitrary[Attributes] + } yield if (enabled) CpuStarvationConfig.enabled(attributes) else CpuStarvationConfig.disabled + ) + + private implicit val configArbitrary: Arbitrary[IORuntimeMetrics.Config] = + Arbitrary( + for { + cpuStarvation <- Arbitrary.arbitrary[CpuStarvationConfig] + } yield IORuntimeMetrics.Config(cpuStarvation) + ) + +} diff --git a/instrumentation/metrics/jvm/src/main/scala/org/typelevel/otel4s/instrumentation/ce/IORuntimeMetricsPlatform.scala b/instrumentation/metrics/jvm/src/main/scala/org/typelevel/otel4s/instrumentation/ce/IORuntimeMetricsPlatform.scala new file mode 100644 index 000000000..43d422a83 --- /dev/null +++ b/instrumentation/metrics/jvm/src/main/scala/org/typelevel/otel4s/instrumentation/ce/IORuntimeMetricsPlatform.scala @@ -0,0 +1,1183 @@ +/* + * Copyright 2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.instrumentation.ce + +import cats.Show +import cats.effect.Resource +import cats.effect.Sync +import cats.effect.unsafe.metrics.{IORuntimeMetrics => CatsIORuntimeMetrics} +import cats.effect.unsafe.metrics.WorkStealingPoolMetrics +import cats.effect.unsafe.metrics.WorkerThreadMetrics +import cats.syntax.applicative._ +import cats.syntax.flatMap._ +import cats.syntax.foldable._ +import cats.syntax.functor._ +import org.typelevel.otel4s.Attribute +import org.typelevel.otel4s.Attributes +import org.typelevel.otel4s.metrics.Meter +import org.typelevel.otel4s.metrics.MeterProvider + +private[ce] trait IORuntimeMetricsPlatform { + self: IORuntimeMetrics.type => + + sealed trait Config { + + /** The configuration of the CPU starvation metrics. + */ + def cpuStarvation: Config.CpuStarvationConfig + + /** The configuration of the work-stealing thread pool (WSTP) metrics. + */ + def workStealingThreadPool: Config.WorkStealingThreadPoolConfig + + override final def toString: String = + Show[Config].show(this) + } + + object Config { + + sealed trait CpuStarvationConfig { + + /** Indicates whether metrics are enabled. + */ + def enabled: Boolean + + /** The attributes to attach to the metrics. + */ + def attributes: Attributes + + override final def toString: String = + Show[CpuStarvationConfig].show(this) + } + + object CpuStarvationConfig { + + /** The metrics are enabled. + */ + def enabled: CpuStarvationConfig = + Impl(enabled = true, Attributes.empty) + + /** The metrics are enabled and the given `attributes` will be attached. + * + * @param attributes + * the attributes to attach to the metrics + */ + def enabled(attributes: Attributes): CpuStarvationConfig = + Impl(enabled = true, attributes) + + /** The metrics are disabled. + */ + def disabled: CpuStarvationConfig = + Impl(enabled = false, Attributes.empty) + + implicit val cpuStarvationConfigShow: Show[CpuStarvationConfig] = { cfg => + s"CpuStarvationConfig{enabled=${cfg.enabled}, attributes=${cfg.attributes}}" + } + + private case class Impl(enabled: Boolean, attributes: Attributes) extends CpuStarvationConfig + } + + sealed trait WorkStealingThreadPoolConfig { + + /** The configuration of the pool metrics. + */ + def compute: WorkStealingThreadPoolConfig.ComputeConfig + + /** The configuration of the worker thread metrics. + */ + def workerThreads: WorkStealingThreadPoolConfig.WorkerThreadsConfig + + override final def toString: String = + Show[WorkStealingThreadPoolConfig].show(this) + } + + object WorkStealingThreadPoolConfig { + sealed trait ComputeConfig { + + /** Indicates whether metrics are enabled. + */ + def enabled: Boolean + + /** The attributes to attach to the metrics. + */ + def attributes: Attributes + + override final def toString: String = + Show[ComputeConfig].show(this) + } + + object ComputeConfig { + + /** The metrics are enabled. + */ + def enabled: ComputeConfig = + Impl(enabled = true, Attributes.empty) + + /** The metrics are enabled and the given `attributes` will be attached. + * + * @param attributes + * the attributes to attach to the metrics + */ + def enabled(attributes: Attributes): ComputeConfig = + Impl(enabled = true, attributes) + + /** The metrics are disabled. + */ + def disabled: ComputeConfig = + Impl(enabled = false, Attributes.empty) + + implicit val computeConfigShow: Show[ComputeConfig] = { cfg => + s"ComputeConfig{enabled=${cfg.enabled}, attributes=${cfg.attributes}}" + } + + private case class Impl(enabled: Boolean, attributes: Attributes) extends ComputeConfig + } + + sealed trait WorkerThreadsConfig { + + /** The configuration of the worker thread metrics. + */ + def thread: WorkerThreadsConfig.ThreadConfig + + /** The configuration of the local queue metrics. + */ + def localQueue: WorkerThreadsConfig.LocalQueueConfig + + /** The configuration of the timer heap metrics. + */ + def timerHeap: WorkerThreadsConfig.TimerHeapConfig + + /** The configuration of the ploler metrics. + */ + def poller: WorkerThreadsConfig.PollerConfig + + override final def toString: String = + Show[WorkerThreadsConfig].show(this) + } + + object WorkerThreadsConfig { + + sealed trait ThreadConfig { + + /** Indicates whether metrics are enabled. + */ + def enabled: Boolean + + /** The attributes to attach to the metrics. + */ + def attributes: Attributes + + override final def toString: String = + Show[ThreadConfig].show(this) + } + + object ThreadConfig { + + /** The metrics are enabled. + */ + def enabled: ThreadConfig = + Impl(enabled = true, Attributes.empty) + + /** The metrics are enabled and the given `attributes` will be attached. + * + * @param attributes + * the attributes to attach to the metrics + */ + def enabled(attributes: Attributes): ThreadConfig = + Impl(enabled = true, attributes) + + /** The metrics are disabled. + */ + def disabled: ThreadConfig = + Impl(enabled = false, Attributes.empty) + + implicit val threadConfigShow: Show[ThreadConfig] = { cfg => + s"ThreadConfig{enabled=${cfg.enabled}, attributes=${cfg.attributes}}" + } + + private case class Impl(enabled: Boolean, attributes: Attributes) extends ThreadConfig + + } + + sealed trait LocalQueueConfig { + + /** Indicates whether metrics are enabled. + */ + def enabled: Boolean + + /** The attributes to attach to the metrics. + */ + def attributes: Attributes + + override final def toString: String = + Show[LocalQueueConfig].show(this) + } + + object LocalQueueConfig { + + /** The metrics are enabled. + */ + def enabled: LocalQueueConfig = + Impl(enabled = true, Attributes.empty) + + /** The metrics are enabled and the given `attributes` will be attached. + * + * @param attributes + * the attributes to attach to the metrics + */ + def enabled(attributes: Attributes): LocalQueueConfig = + Impl(enabled = true, attributes) + + /** The metrics are disabled. + */ + def disabled: LocalQueueConfig = + Impl(enabled = false, Attributes.empty) + + implicit val localQueueConfigShow: Show[LocalQueueConfig] = { cfg => + s"LocalQueueConfig{enabled=${cfg.enabled}, attributes=${cfg.attributes}}" + } + + private case class Impl(enabled: Boolean, attributes: Attributes) extends LocalQueueConfig + } + + sealed trait TimerHeapConfig { + + /** Indicates whether metrics are enabled. + */ + def enabled: Boolean + + /** The attributes to attach to the metrics. + */ + def attributes: Attributes + + override final def toString: String = + Show[TimerHeapConfig].show(this) + } + + object TimerHeapConfig { + + /** The metrics are enabled. + */ + def enabled: TimerHeapConfig = + Impl(enabled = true, Attributes.empty) + + /** The metrics are enabled and the given `attributes` will be attached. + * + * @param attributes + * the attributes to attach to the metrics + */ + def enabled(attributes: Attributes): TimerHeapConfig = + Impl(enabled = true, attributes) + + /** The metrics are disabled. + */ + def disabled: TimerHeapConfig = + Impl(enabled = false, Attributes.empty) + + implicit val timerHeapConfigShow: Show[TimerHeapConfig] = { cfg => + s"TimerHeapConfig{enabled=${cfg.enabled}, attributes=${cfg.attributes}}" + } + + private case class Impl(enabled: Boolean, attributes: Attributes) extends TimerHeapConfig + } + + sealed trait PollerConfig { + + /** Indicates whether metrics are enabled. + */ + def enabled: Boolean + + /** The attributes to attach to the metrics. + */ + def attributes: Attributes + + override final def toString: String = + Show[PollerConfig].show(this) + } + + object PollerConfig { + + /** The metrics are enabled. + */ + def enabled: PollerConfig = + Impl(enabled = true, Attributes.empty) + + /** The metrics are enabled and the given `attributes` will be attached. + * + * @param attributes + * the attributes to attach to the metrics + */ + def enabled(attributes: Attributes): PollerConfig = + Impl(enabled = true, attributes) + + /** The metrics are disabled. + */ + def disabled: PollerConfig = + Impl(enabled = false, Attributes.empty) + + implicit val pollerConfigShow: Show[PollerConfig] = { cfg => + s"PollerConfig{enabled=${cfg.enabled}, attributes=${cfg.attributes}}" + } + + private case class Impl(enabled: Boolean, attributes: Attributes) extends PollerConfig + } + + /** A configuration with the given configs. + * + * @param thread + * the worker configuration to use + * + * @param localQueue + * the local queue configuration to use + * + * @param timerHeap + * the timer heap configuration to use + * + * @param poller + * the poller configuration to use + */ + def apply( + thread: ThreadConfig, + localQueue: LocalQueueConfig, + timerHeap: TimerHeapConfig, + poller: PollerConfig + ): WorkerThreadsConfig = + Impl(thread, localQueue, timerHeap, poller) + + /** All metrics (worker, local queue, timer heap, poller) are enabled. + */ + def enabled: WorkerThreadsConfig = + Impl(ThreadConfig.enabled, LocalQueueConfig.enabled, TimerHeapConfig.enabled, PollerConfig.enabled) + + /** All metrics (worker, local queue, timer heap, poller) are disabled. + */ + def disabled: WorkerThreadsConfig = + Impl(ThreadConfig.disabled, LocalQueueConfig.disabled, TimerHeapConfig.disabled, PollerConfig.disabled) + + implicit val workerThreadsConfigShow: Show[WorkerThreadsConfig] = { cfg => + "WorkerThreadsConfig{" + + s"thread=${cfg.thread}, " + + s"localQueue=${cfg.localQueue}, " + + s"timerHeap=${cfg.timerHeap}, " + + s"poller=${cfg.poller}}" + } + + private case class Impl( + thread: ThreadConfig, + localQueue: LocalQueueConfig, + timerHeap: TimerHeapConfig, + poller: PollerConfig + ) extends WorkerThreadsConfig + } + + /** A configuration with the given `compute` and `workerThreads` configurations. + * + * @param compute + * the compute configuration to use + * + * @param workerThreads + * the worker threads configuration to use + */ + def apply(compute: ComputeConfig, workerThreads: WorkerThreadsConfig): WorkStealingThreadPoolConfig = + Impl(compute, workerThreads) + + /** All metrics (pool, worker threads) are enabled. + */ + def enabled: WorkStealingThreadPoolConfig = + Impl(ComputeConfig.enabled, WorkerThreadsConfig.enabled) + + /** All metrics (pool, worker threads) are disabled. + */ + def disabled: WorkStealingThreadPoolConfig = + Impl(ComputeConfig.disabled, WorkerThreadsConfig.disabled) + + implicit val workStealingThreadPoolConfigShow: Show[WorkStealingThreadPoolConfig] = { cfg => + s"WorkStealingThreadPoolConfig{compute=${cfg.compute}, workerThreads=${cfg.workerThreads}}" + } + + private case class Impl( + compute: ComputeConfig, + workerThreads: WorkerThreadsConfig + ) extends WorkStealingThreadPoolConfig + } + + /** The default configuration, the following metrics are enabled: + * - CPU starvation + * - Work-stealing thread pool - worker + * - Work-stealing thread pool - local queue + * - Work-stealing thread pool - timer heap + * - Work-stealing thread pool - poller + */ + def default: Config = + Config(CpuStarvationConfig.enabled, WorkStealingThreadPoolConfig.enabled) + + /** A configuration with the given `cpuStarvation` and `workStealingThreadPool`. + * + * @param cpuStarvation + * the CPU starvation configuration to use + * + * @param workStealingThreadPool + * the work stealing thread pool configuration to use + */ + def apply(cpuStarvation: CpuStarvationConfig, workStealingThreadPool: WorkStealingThreadPoolConfig): Config = + Impl(cpuStarvation, workStealingThreadPool) + + implicit val configShow: Show[Config] = { cfg => + s"IORuntimeMetrics.Config{cpuStarvation=${cfg.cpuStarvation}, workStealingThreadPool=${cfg.workStealingThreadPool}}" + } + + private case class Impl( + cpuStarvation: CpuStarvationConfig, + workStealingThreadPool: WorkStealingThreadPoolConfig + ) extends Config + } + + /** Registers the following collectors depending on the `config`: + * - runtime pool metrics + * - runtime worker thread metrics + * - runtime local queue metrics + * - runtime timer heap metrics + * - runtime poller metrics + * - CPU starvation + * + * By default, all metrics are enabled. + * + * =CPU starvation metrics= + * + * Registers the CPU starvation: + * - `cats.effect.runtime.cpu.starvation.count` + * - `cats.effect.runtime.cpu.starvation.clock.drift.current` + * - `cats.effect.runtime.cpu.starvation.clock.drift.max` + * + * To disable CPU starvation metrics, customize a config: + * {{{ + * val config: IORuntimeMetrics.Config = { + * import IORuntimeMetrics.Config._ + * IORuntimeMetrics.Config( + * CpuStarvationConfig.disabled, // disable CPU starvation metrics + * WorkStealingThreadPoolConfig.enabled + * ) + * } + * + * IORuntimeMetrics.register[IO](runtime.metrics, config) + * }}} + * + * To attach attributes to CPU starvation metrics, customize a config: + * {{{ + * val config: IORuntimeMetrics.Config = { + * import IORuntimeMetrics.Config._ + * IORuntimeMetrics.Config( + * CpuStarvationConfig.enabled( + * Attributes(Attribute("key", "value")) // the attributes + * ), + * WorkStealingThreadPoolConfig.enabled + * ) + * } + * + * IORuntimeMetrics.register[IO](runtime.metrics, config) + * }}} + * + * =WSTP metrics= + * + * ==Compute metrics== + * + * Registers the runtime compute metrics: + * - `cats.effect.runtime.wstp.compute.thread.count` + * - `cats.effect.runtime.wstp.compute.thread.active.count` + * - `cats.effect.runtime.wstp.compute.thread.blocked.count` + * - `cats.effect.runtime.wstp.compute.thread.searching.count` + * - `cats.effect.runtime.wstp.compute.fiber.enqueued.count` + * + * Built-in attributes: + * - `pool.id` - the id of the work-stealing thread pool + * + * To disable WSTP pool metrics, customize a config: + * {{{ + * val config: IORuntimeMetrics.Config = { + * import IORuntimeMetrics.Config._ + * import WorkStealingThreadPoolConfig._ + * + * IORuntimeMetrics.Config( + * CpuStarvationConfig.enabled, + * WorkStealingThreadPoolConfig( + * ComputeConfig.disabled, // disable compute metrics + * WorkerThreadsConfig.enabled + * ) + * ) + * } + * + * IORuntimeMetrics.register[IO](runtime.metrics, config) + * }}} + * + * To attach attributes to WSTP pool metrics, customize a config: + * {{{ + * val config: IORuntimeMetrics.Config = { + * import IORuntimeMetrics.Config._ + * import WorkStealingThreadPoolConfig._ + * + * IORuntimeMetrics.Config( + * CpuStarvationConfig.enabled, + * WorkStealingThreadPoolConfig( + * ComputeConfig.enabled( + * Attributes(Attribute("key", "value")) // the attributes + * ), + * WorkerThreadsConfig.enabled + * ) + * ) + * } + * + * IORuntimeMetrics.register[IO](runtime.metrics, config) + * }}} + * + * ==Worker thread metrics== + * + * Registers the runtime worker metrics: + * - `cats.effect.runtime.wstp.worker.thread.idle.duration` + * - `cats.effect.runtime.wstp.worker.thread.event.count` + * + * Built-in attributes: + * - `pool.id` - the id of the work-stealing thread pool the queue is used by + * - `worker.index` - the index of the worker + * - `thread.event` - the thread event + * - `parked` - a thread is parked + * - `polled` - a thread is polled for I/O events + * - `blocked` - a thread is switched to a blocking thread and been replaced + * - `respawn` - a thread is replaced by a newly spawned thread + * + * To disable WSTP worker thread metrics, customize a config: + * {{{ + * val config: IORuntimeMetrics.Config = { + * import IORuntimeMetrics.Config._ + * import WorkStealingThreadPoolConfig._ + * + * IORuntimeMetrics.Config( + * CpuStarvationConfig.enabled, + * WorkStealingThreadPoolConfig( + * ComputeConfig.enabled, + * WorkerThreadsConfig( + * WorkerThreadsConfig.ThreadConfig.disabled, // disable worker thread metrics + * WorkerThreadsConfig.LocalQueueConfig.enabled, + * WorkerThreadsConfig.TimerHeapConfig.enabled, + * WorkerThreadsConfig.PollerConfig.enabled + * ) + * ) + * ) + * } + * + * IORuntimeMetrics.register[IO](runtime.metrics, config) + * }}} + * + * To attach attributes to WSTP worker thread metrics, customize a config: + * {{{ + * val config: IORuntimeMetrics.Config = { + * import IORuntimeMetrics.Config._ + * import WorkStealingThreadPoolConfig._ + * + * IORuntimeMetrics.Config( + * CpuStarvationConfig.enabled, + * WorkStealingThreadPoolConfig( + * ComputeConfig.enabled, + * WorkerThreadsConfig( + * WorkerThreadsConfig.ThreadConfig.enabled( + * Attributes(Attribute("key", "value")) // the attributes + * ), + * WorkerThreadsConfig.LocalQueueConfig.enabled, + * WorkerThreadsConfig.TimerHeapConfig.enabled, + * WorkerThreadsConfig.PollerConfig.enabled + * ) + * ) + * ) + * } + * + * IORuntimeMetrics.register[IO](runtime.metrics, config) + * }}} + * + * ==Local queue metrics== + * + * Registers the runtime local queue metrics: + * - `cats.effect.runtime.wstp.worker.localqueue.fiber.enqueued.count` + * - `cats.effect.runtime.wstp.worker.localqueue.fiber.spillover.count` + * - `cats.effect.runtime.wstp.worker.localqueue.fiber.steal.attempt.count` + * - `cats.effect.runtime.wstp.worker.localqueue.fiber.stolen.count` + * - `cats.effect.runtime.wstp.worker.localqueue.fiber.count` + * + * Built-in attributes: + * - `pool.id` - the id of the work-stealing thread pool the queue is used by + * - `worker.index` - the index of the worker the queue is used by + * + * To disable WSTP local queue metrics, customize a config: + * {{{ + * val config: IORuntimeMetrics.Config = { + * import IORuntimeMetrics.Config._ + * import WorkStealingThreadPoolConfig._ + * + * IORuntimeMetrics.Config( + * CpuStarvationConfig.enabled, + * WorkStealingThreadPoolConfig( + * ComputeConfig.enabled, + * WorkerThreadsConfig( + * WorkerThreadsConfig.ThreadConfig.enabled, + * WorkerThreadsConfig.LocalQueueConfig.disabled, // disable local queue metrics + * WorkerThreadsConfig.TimerHeapConfig.enabled, + * WorkerThreadsConfig.PollerConfig.enabled + * ) + * ) + * ) + * } + * + * IORuntimeMetrics.register[IO](runtime.metrics, config) + * }}} + * + * To attach attributes to WSTP local queue metrics, customize a config: + * {{{ + * val config: IORuntimeMetrics.Config = { + * import IORuntimeMetrics.Config._ + * import WorkStealingThreadPoolConfig._ + * + * IORuntimeMetrics.Config( + * CpuStarvationConfig.enabled, + * WorkStealingThreadPoolConfig( + * ComputeConfig.enabled, + * WorkerThreadsConfig( + * WorkerThreadsConfig.ThreadConfig.enabled, + * WorkerThreadsConfig.LocalQueueConfig.enabled( + * Attributes(Attribute("key", "value")) // the attributes + * ), + * WorkerThreadsConfig.TimerHeapConfig.enabled, + * WorkerThreadsConfig.PollerConfig.enabled + * ) + * ) + * ) + * } + * + * IORuntimeMetrics.register[IO](runtime.metrics, config) + * }}} + * + * ==Timer heap metrics== + * + * Registers the runtime time heap metrics: + * - `cats.effect.runtime.wstp.worker.timerheap.outstanding.count` + * - `cats.effect.runtime.wstp.worker.timerheap.packed.count` + * - `cats.effect.runtime.wstp.worker.timerheap.timer.count` + * - `cats.effect.runtime.wstp.worker.timerheap.next.due` + * + * Built-in attributes: + * - `pool.id` - the id of the work-stealing thread pool the time heap is used by + * - `worker.index` - the index of the worker the timer heap is used by + * - `timer.state` - the state of the timer + * - `executed` - the successfully executed timer + * - `scheduled` - the scheduled timer + * - `canceled` - the canceled timer + * + * To disable WSTP timer heap metrics, customize a config: + * {{{ + * val config: IORuntimeMetrics.Config = { + * import IORuntimeMetrics.Config._ + * import WorkStealingThreadPoolConfig._ + * + * IORuntimeMetrics.Config( + * CpuStarvationConfig.enabled, + * WorkStealingThreadPoolConfig( + * ComputeConfig.enabled, + * WorkerThreadsConfig( + * WorkerThreadsConfig.ThreadConfig.enabled, + * WorkerThreadsConfig.LocalQueueConfig.enabled, + * WorkerThreadsConfig.TimerHeapConfig.enabled, // disable timer heap metrics + * WorkerThreadsConfig.PollerConfig.enabled + * ) + * ) + * ) + * } + * + * IORuntimeMetrics.register[IO](runtime.metrics, config) + * }}} + * + * To attach attributes to WSTP timer heap metrics, customize a config: + * {{{ + * val config: IORuntimeMetrics.Config = { + * import IORuntimeMetrics.Config._ + * import WorkStealingThreadPoolConfig._ + * + * IORuntimeMetrics.Config( + * CpuStarvationConfig.enabled, + * WorkStealingThreadPoolConfig( + * ComputeConfig.enabled, + * WorkerThreadsConfig( + * WorkerThreadsConfig.ThreadConfig.enabled, + * WorkerThreadsConfig.LocalQueueConfig.enabled, + * WorkerThreadsConfig.TimerHeapConfig.enabled( + * Attributes(Attribute("key", "value")) // the attributes + * ), + * WorkerThreadsConfig.PollerConfig.enabled + * ) + * ) + * ) + * } + * + * IORuntimeMetrics.register[IO](runtime.metrics, config) + * }}} + * + * ==Poller metrics== + * + * Registers the runtime poller metrics: + * - `cats.effect.runtime.wstp.worker.poller.operation.outstanding.count` + * - `cats.effect.runtime.wstp.worker.poller.operation.count` + * + * Built-in attributes: + * - `pool.id` - the id of the work-stealing thread pool the poller is used by + * - `worker.index` - the index of the worker thread the poller is used by + * - `poller.operation` - the operation performed by the poller + * - `accept` + * - `connect` + * - `read` + * - `write` + * - `poller.operation.status` - the status of the operation + * - `submitted` - the operation has been submitted + * - `succeeded` - the operation has errored + * - `errored` - the operation has errored + * - `canceled` - the operation has been canceled + * + * To disable WSTP poller metrics, customize a config: + * {{{ + * val config: IORuntimeMetrics.Config = { + * import IORuntimeMetrics.Config._ + * import WorkStealingThreadPoolConfig._ + * + * IORuntimeMetrics.Config( + * CpuStarvationConfig.enabled, + * WorkStealingThreadPoolConfig( + * ComputeConfig.enabled, + * WorkerThreadsConfig( + * WorkerThreadsConfig.ThreadConfig.enabled, + * WorkerThreadsConfig.LocalQueueConfig.enabled, + * WorkerThreadsConfig.TimerHeapConfig.enabled, + * WorkerThreadsConfig.PollerConfig.disabled // disable poller metrics + * ) + * ) + * ) + * } + * + * IORuntimeMetrics.register[IO](runtime.metrics, config) + * }}} + * + * To attach attributes to WSTP poller metrics, customize a config: + * {{{ + * val config: IORuntimeMetrics.Config = { + * import IORuntimeMetrics.Config._ + * import WorkStealingThreadPoolConfig._ + * + * IORuntimeMetrics.Config( + * CpuStarvationConfig.enabled, + * WorkStealingThreadPoolConfig( + * ComputeConfig.enabled, + * WorkerThreadsConfig( + * WorkerThreadsConfig.ThreadConfig.enabled, + * WorkerThreadsConfig.LocalQueueConfig.enabled, + * WorkerThreadsConfig.TimerHeapConfig.enabled, + * WorkerThreadsConfig.PollerConfig.enabled( + * Attributes(Attribute("key", "value")) // the attributes + * ) + * ) + * ) + * ) + * } + * + * IORuntimeMetrics.register[IO](runtime.metrics, config) + * }}} + * + * @example + * {{{ + * object Main extends IOApp.Simple { + * def program( + * meterProvider: MeterProvider[IO], + * tracerProvider: TracerProvider[IO] + * ): IO[Unit] = ??? + * + * def run: IO[Unit] = + * OtelJava.autoConfigured[IO]().use { otel4s => + * implicit val mp: MeterProvider[IO] = otel4s.meterProvider + * + * IORuntimeMetrics + * .register[IO](runtime.metrics, IORuntimeMetrics.Config.default) + * .surround { + * program(otel4s.meterProvider, otel4s.tracerProvider) + * } + * } + * } + * }}} + */ + def register[F[_]: Sync: MeterProvider]( + metrics: CatsIORuntimeMetrics, + config: Config + ): Resource[F, Unit] = + Resource.eval(MeterProvider[F].get(Const.MeterNamespace)).flatMap { implicit meter => + val wstpMetrics = metrics.workStealingThreadPool match { + case Some(pool) => + val poolId = pool.identifier + + val computeConfig = config.workStealingThreadPool.compute + val workerThreadsConfig = config.workStealingThreadPool.workerThreads + + for { + _ <- computeMetrics(poolId, pool, computeConfig.attributes) + .whenA(computeConfig.enabled) + + _ <- threadMetrics(poolId, pool.workerThreads, workerThreadsConfig.thread.attributes) + .whenA(workerThreadsConfig.thread.enabled) + + _ <- localQueueMetrics(poolId, pool.workerThreads, workerThreadsConfig.localQueue.attributes) + .whenA(workerThreadsConfig.localQueue.enabled) + + _ <- timerHeapMetrics(poolId, pool.workerThreads, workerThreadsConfig.timerHeap.attributes) + .whenA(workerThreadsConfig.timerHeap.enabled) + + _ <- pollerMetrics(poolId, pool.workerThreads, workerThreadsConfig.poller.attributes) + .whenA(workerThreadsConfig.poller.enabled) + } yield () + + case None => + Resource.unit[F] + } + + for { + _ <- cpuStarvationMetrics(metrics.cpuStarvation, config.cpuStarvation.attributes) + .whenA(config.cpuStarvation.enabled) + + _ <- wstpMetrics + } yield () + } + + private def computeMetrics[F[_]: Sync: Meter]( + poolId: String, + metrics: WorkStealingPoolMetrics, + extraAttributes: Attributes + ): Resource[F, Unit] = { + val prefix = s"${Const.MeterNamespace}.wstp.compute" + + Meter[F].batchCallback.of( + Meter[F] + .observableGauge[Long](s"$prefix.thread.count") + .withDescription( + "The number of worker thread instances backing the work-stealing thread pool (WSTP)." + ) + .withUnit("{thread}") + .createObserver, + Meter[F] + .observableGauge[Long](s"$prefix.thread.active.count") + .withDescription( + "The number of active worker thread instances currently executing fibers on the compute thread pool." + ) + .withUnit("{thread}") + .createObserver, + Meter[F] + .observableGauge[Long](s"$prefix.thread.searching.count") + .withDescription( + "The number of worker thread instances currently searching for fibers to steal from other worker threads." + ) + .withUnit("{thread}") + .createObserver, + Meter[F] + .observableGauge[Long](s"$prefix.thread.blocked.count") + .withDescription( + "The number of worker thread instances that can run blocking actions on the compute thread pool." + ) + .withUnit("{thread}") + .createObserver, + Meter[F] + .observableGauge[Long](s"$prefix.fiber.enqueued.count") + .withDescription("The total number of fibers enqueued on all local queues.") + .withUnit("{fiber}") + .createObserver, + Meter[F] + .observableGauge[Long](s"$prefix.fiber.suspended.count") + .withDescription("The number of fibers which are currently asynchronously suspended.") + .withUnit("{fiber}") + .createObserver + ) { (total, active, searching, blocked, enqueued, suspended) => + val attributes = Attributes(Attribute("pool.id", poolId)) ++ extraAttributes + + for { + snapshot <- Sync[F].delay( + ( + metrics.workerThreadCount(), + metrics.activeThreadCount(), + metrics.searchingThreadCount(), + metrics.blockedWorkerThreadCount(), + metrics.localQueueFiberCount(), + metrics.suspendedFiberCount(), + ) + ) + _ <- total.record(snapshot._1, attributes) + _ <- active.record(snapshot._2, attributes) + _ <- searching.record(snapshot._3, attributes) + _ <- blocked.record(snapshot._4, attributes) + _ <- enqueued.record(snapshot._5, attributes) + _ <- suspended.record(snapshot._6, attributes) + } yield () + } + } + + private def threadMetrics[F[_]: Sync: Meter]( + poolId: String, + metrics: List[WorkerThreadMetrics], + extraAttributes: Attributes + ): Resource[F, Unit] = { + val prefix = s"${Const.MeterNamespace}.wstp.worker.thread" + + Meter[F].batchCallback.of( + Meter[F] + .observableCounter[Long](s"$prefix.idle.duration") + .withDescription("The total amount of time in nanoseconds that this WorkerThread has been idle.") + .withUnit("ns") + .createObserver, + Meter[F] + .observableCounter[Long](s"$prefix.event.count") + .withDescription("The total number of events that happened to this WorkerThread.") + .withUnit("{event}") + .createObserver, + ) { (idleDuration, eventCount) => + metrics.traverse_ { workerMetrics => + val attributes = Attributes( + Attribute("pool.id", poolId), + Attribute("worker.index", workerMetrics.index.toLong) + ) ++ extraAttributes + + def recordCount(value: Long, state: String): F[Unit] = + eventCount.record( + value, + attributes ++ Attributes(Attribute("thread.event", state)) + ) + + for { + snapshot <- Sync[F].delay( + ( + workerMetrics.idleTime(), + workerMetrics.parkedCount(), + workerMetrics.polledCount(), + workerMetrics.blockingCount(), + workerMetrics.respawnCount() + ) + ) + _ <- idleDuration.record(snapshot._1, attributes) + _ <- recordCount(snapshot._2, "parked") + _ <- recordCount(snapshot._3, "polled") + _ <- recordCount(snapshot._4, "blocked") + _ <- recordCount(snapshot._5, "respawn") + } yield () + } + } + } + + private def localQueueMetrics[F[_]: Sync: Meter]( + poolId: String, + metrics: List[WorkerThreadMetrics], + extraAttributes: Attributes + ): Resource[F, Unit] = { + val prefix = s"${Const.MeterNamespace}.wstp.worker.localqueue" + + Meter[F].batchCallback.of( + Meter[F] + .observableUpDownCounter[Long](s"$prefix.fiber.enqueued.count") + .withDescription("The current number of enqueued fibers.") + .withUnit("{fiber}") + .createObserver, + Meter[F] + .observableCounter[Long](s"$prefix.fiber.count") + .withDescription( + "The total number of fibers enqueued during the lifetime of the local queue." + ) + .withUnit("{fiber}") + .createObserver, + Meter[F] + .observableCounter[Long](s"$prefix.fiber.spillover.count") + .withDescription("The total number of fibers spilt over to the external queue.") + .withUnit("{fiber}") + .createObserver, + Meter[F] + .observableCounter[Long](s"$prefix.fiber.steal_attempt.count") + .withDescription("The total number of successful steal attempts by other worker threads.") + .withUnit("{fiber}") + .createObserver, + Meter[F] + .observableCounter[Long](s"$prefix.fiber.stolen.count") + .withDescription("The total number of stolen fibers by other worker threads.") + .withUnit("{fiber}") + .createObserver + ) { (fiberEnqueued, fiberTotal, fiberSpillover, stealAttemptCount, stolenCount) => + metrics.traverse_ { workerMetrics => + val attributes = Attributes( + Attribute("pool.id", poolId), + Attribute("worker.index", workerMetrics.index.toLong) + ) ++ extraAttributes + + for { + snapshot <- Sync[F].delay( + ( + workerMetrics.localQueue.fiberCount(), + workerMetrics.localQueue.totalFiberCount(), + workerMetrics.localQueue.totalSpilloverCount(), + workerMetrics.localQueue.successfulStealAttemptCount(), + workerMetrics.localQueue.stolenFiberCount() + ) + ) + _ <- fiberEnqueued.record(snapshot._1, attributes) + _ <- fiberTotal.record(snapshot._2, attributes) + _ <- fiberSpillover.record(snapshot._3, attributes) + _ <- stealAttemptCount.record(snapshot._4, attributes) + _ <- stolenCount.record(snapshot._5, attributes) + } yield () + } + } + } + + private def timerHeapMetrics[F[_]: Sync: Meter]( + poolId: String, + metrics: List[WorkerThreadMetrics], + extraAttributes: Attributes + ): Resource[F, Unit] = { + val prefix = s"${Const.MeterNamespace}.wstp.worker.timerheap" + + Meter[F].batchCallback.of( + Meter[F] + .observableUpDownCounter[Long](s"$prefix.outstanding.count") + .withDescription("The current number of the outstanding timers, that remain to be executed.") + .withUnit("{timer}") + .createObserver, + Meter[F] + .observableCounter[Long](s"$prefix.timer.count") + .withDescription("The total number of the timers per state.") + .withUnit("{timer}") + .createObserver, + Meter[F] + .observableCounter[Long](s"$prefix.packed.count") + .withDescription("The total number of times the heap packed itself to remove canceled timers.") + .withUnit("{event}") + .createObserver, + Meter[F] + .observableGauge[Long](s"$prefix.next.due") + .withDescription("Returns the time in nanoseconds till the next due to fire.") + .withUnit("ns") + .createObserver + ) { (outstanding, timerCount, packedCount, nextDue) => + metrics.traverse_ { workerMetrics => + val attributes = Attributes( + Attribute("pool.id", poolId), + Attribute("worker.index", workerMetrics.index.toLong) + ) ++ extraAttributes + + def recordCount(value: Long, state: String): F[Unit] = + timerCount.record( + value, + attributes ++ Attributes(Attribute("timer.state", state)) + ) + + for { + snapshot <- Sync[F].delay( + ( + workerMetrics.timerHeap.timersOutstandingCount(), + workerMetrics.timerHeap.totalTimersExecutedCount(), + workerMetrics.timerHeap.totalTimersScheduledCount(), + workerMetrics.timerHeap.totalTimersCanceledCount(), + workerMetrics.timerHeap.packCount(), + workerMetrics.timerHeap.nextTimerDue(), + ) + ) + _ <- outstanding.record(snapshot._1, attributes) + _ <- recordCount(snapshot._2, "executed") + _ <- recordCount(snapshot._3, "scheduled") + _ <- recordCount(snapshot._4, "canceled") + _ <- packedCount.record(snapshot._5, attributes) + _ <- nextDue.record(snapshot._6.getOrElse(0L), attributes) + } yield () + } + } + } + + private def pollerMetrics[F[_]: Sync: Meter]( + poolId: String, + metrics: List[WorkerThreadMetrics], + extraAttributes: Attributes + ): Resource[F, Unit] = { + val prefix = s"${Const.MeterNamespace}.wstp.worker.poller" + + Meter[F].batchCallback.of( + Meter[F] + .observableUpDownCounter[Long](s"$prefix.operation.outstanding.count") + .withDescription("The current number of outstanding operations per category and outcome.") + .withUnit("{operation}") + .createObserver, + Meter[F] + .observableCounter[Long](s"$prefix.operation.count") + .withDescription("The total number of the operations per category and outcome.") + .withUnit("{operation}") + .createObserver + ) { (operationCount, operationOutstanding) => + metrics.traverse_ { workerMetrics => + val attributes = Attributes( + Attribute("pool.id", poolId), + Attribute("worker.index", workerMetrics.index.toLong) + ) ++ extraAttributes + + val poller = workerMetrics.poller + + def recordOutstanding(value: Long, operation: String): F[Unit] = + operationOutstanding.record( + value, + attributes ++ Attributes(Attribute("poller.operation", operation)) + ) + + def recordTotal(value: Long, operation: String, status: String): F[Unit] = + operationCount.record( + value, + attributes ++ Attributes( + Attribute("poller.operation", operation), + Attribute("poller.operation.status", status) + ) + ) + + Sync[F].defer { + for { + _ <- recordOutstanding(poller.totalAcceptOperationsSubmittedCount(), "accept") + _ <- recordTotal(poller.totalAcceptOperationsSubmittedCount(), "accept", "submitted") + _ <- recordTotal(poller.totalAcceptOperationsSucceededCount(), "accept", "succeeded") + _ <- recordTotal(poller.totalAcceptOperationsErroredCount(), "accept", "errored") + _ <- recordTotal(poller.totalAcceptOperationsCanceledCount(), "accept", "canceled") + + _ <- recordOutstanding(poller.totalConnectOperationsSubmittedCount(), "connect") + _ <- recordTotal(poller.totalConnectOperationsSubmittedCount(), "connect", "submitted") + _ <- recordTotal(poller.totalConnectOperationsSucceededCount(), "connect", "succeeded") + _ <- recordTotal(poller.totalConnectOperationsErroredCount(), "connect", "errored") + _ <- recordTotal(poller.totalConnectOperationsCanceledCount(), "connect", "canceled") + + _ <- recordOutstanding(poller.totalReadOperationsSubmittedCount(), "read") + _ <- recordTotal(poller.totalReadOperationsSubmittedCount(), "read", "submitted") + _ <- recordTotal(poller.totalReadOperationsSucceededCount(), "read", "succeeded") + _ <- recordTotal(poller.totalReadOperationsErroredCount(), "read", "errored") + _ <- recordTotal(poller.totalReadOperationsCanceledCount(), "read", "canceled") + + _ <- recordOutstanding(poller.totalWriteOperationsSubmittedCount(), "write") + _ <- recordTotal(poller.totalWriteOperationsSubmittedCount(), "write", "submitted") + _ <- recordTotal(poller.totalWriteOperationsSucceededCount(), "write", "succeeded") + _ <- recordTotal(poller.totalWriteOperationsErroredCount(), "write", "errored") + _ <- recordTotal(poller.totalWriteOperationsCanceledCount(), "write", "canceled") + } yield () + } + } + } + } + +} diff --git a/instrumentation/metrics/jvm/src/test/scala/org/typelevel/otel4s/instrumentation/ce/IORuntimeMetricsSuite.scala b/instrumentation/metrics/jvm/src/test/scala/org/typelevel/otel4s/instrumentation/ce/IORuntimeMetricsSuite.scala new file mode 100644 index 000000000..2eb2338dd --- /dev/null +++ b/instrumentation/metrics/jvm/src/test/scala/org/typelevel/otel4s/instrumentation/ce/IORuntimeMetricsSuite.scala @@ -0,0 +1,297 @@ +/* + * Copyright 2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.instrumentation.ce + +import cats.Show +import cats.effect.IO +import munit.CatsEffectSuite +import munit.ScalaCheckEffectSuite +import org.scalacheck.Arbitrary +import org.scalacheck.Prop +import org.scalacheck.effect.PropF +import org.typelevel.otel4s.Attributes +import org.typelevel.otel4s.metrics.MeterProvider +import org.typelevel.otel4s.scalacheck.Arbitraries._ +import org.typelevel.otel4s.sdk.metrics.data.MetricData +import org.typelevel.otel4s.sdk.testkit.metrics.MetricsTestkit + +class IORuntimeMetricsSuite extends CatsEffectSuite with ScalaCheckEffectSuite { + import IORuntimeMetrics.Config.{CpuStarvationConfig, WorkStealingThreadPoolConfig} + import IORuntimeMetrics.Config.WorkStealingThreadPoolConfig.{ComputeConfig, WorkerThreadsConfig} + import IORuntimeMetrics.Config.WorkStealingThreadPoolConfig.WorkerThreadsConfig._ + + test("register metrics using default config") { + MetricsTestkit.inMemory[IO]().use { testkit => + implicit val meterProvider: MeterProvider[IO] = testkit.meterProvider + + val expected = + cpuStarvationMetrics ++ computeMetrics ++ threadMetrics ++ localQueueMetrics ++ timerHeapMetrics ++ pollerMetrics + + for { + metrics <- IORuntimeMetrics + .register[IO](munitIORuntime.metrics, IORuntimeMetrics.Config.default) + .surround(testkit.collectMetrics) + } yield assertEquals(metrics.map(toMetric).sortBy(_.name), expected.sortBy(_.name)) + } + } + + test("register metrics according to the config") { + PropF.forAllF { (config: IORuntimeMetrics.Config) => + MetricsTestkit.inMemory[IO]().use { testkit => + implicit val meterProvider: MeterProvider[IO] = testkit.meterProvider + + val expected = List( + config.cpuStarvation.enabled -> cpuStarvationMetrics, + config.workStealingThreadPool.compute.enabled -> computeMetrics, + config.workStealingThreadPool.workerThreads.thread.enabled -> threadMetrics, + config.workStealingThreadPool.workerThreads.localQueue.enabled -> localQueueMetrics, + config.workStealingThreadPool.workerThreads.timerHeap.enabled -> timerHeapMetrics, + config.workStealingThreadPool.workerThreads.poller.enabled -> pollerMetrics + ).collect { case (true, metrics) => metrics }.flatten + + for { + metrics <- IORuntimeMetrics + .register[IO](munitIORuntime.metrics, config) + .surround(testkit.collectMetrics) + } yield assertEquals(metrics.map(toMetric).sortBy(_.name), expected.sortBy(_.name)) + } + } + } + + test("Show[IORuntimeMetrics.Config]") { + Prop.forAll { (config: IORuntimeMetrics.Config) => + val cpuStarvation = config.cpuStarvation + val compute = config.workStealingThreadPool.compute + val workerThreads = config.workStealingThreadPool.workerThreads + + val expected = "IORuntimeMetrics.Config{" + + s"cpuStarvation=CpuStarvationConfig{enabled=${cpuStarvation.enabled}, attributes=${cpuStarvation.attributes}}, " + + "workStealingThreadPool=WorkStealingThreadPoolConfig{" + + s"compute=ComputeConfig{enabled=${compute.enabled}, attributes=${compute.attributes}}, " + + "workerThreads=WorkerThreadsConfig{" + + s"thread=ThreadConfig{enabled=${workerThreads.thread.enabled}, attributes=${workerThreads.thread.attributes}}, " + + s"localQueue=LocalQueueConfig{enabled=${workerThreads.localQueue.enabled}, attributes=${workerThreads.localQueue.attributes}}, " + + s"timerHeap=TimerHeapConfig{enabled=${workerThreads.timerHeap.enabled}, attributes=${workerThreads.timerHeap.attributes}}, " + + s"poller=PollerConfig{enabled=${workerThreads.poller.enabled}, attributes=${workerThreads.poller.attributes}}}}" + + "}" + + assertEquals(Show[IORuntimeMetrics.Config].show(config), expected) + assertEquals(config.toString, expected) + } + } + + private case class Metric(name: String, description: Option[String], unit: Option[String]) + + private def toMetric(metric: MetricData): Metric = + Metric(metric.name, metric.description, metric.unit) + + private val cpuStarvationMetrics = List( + Metric( + "cats.effect.runtime.cpu.starvation.count", + Some("The number of CPU starvation events."), + None + ), + Metric( + "cats.effect.runtime.cpu.starvation.clock.drift.current", + Some("The current CPU drift in milliseconds."), + Some("ms") + ), + Metric( + "cats.effect.runtime.cpu.starvation.clock.drift.max", + Some("The max CPU drift in milliseconds."), + Some("ms") + ) + ) + + private val computeMetrics = List( + Metric( + "cats.effect.runtime.wstp.compute.thread.count", + Some("The number of worker thread instances backing the work-stealing thread pool (WSTP)."), + Some("{thread}") + ), + Metric( + "cats.effect.runtime.wstp.compute.thread.active.count", + Some("The number of active worker thread instances currently executing fibers on the compute thread pool."), + Some("{thread}") + ), + Metric( + "cats.effect.runtime.wstp.compute.thread.searching.count", + Some("The number of worker thread instances currently searching for fibers to steal from other worker threads."), + Some("{thread}") + ), + Metric( + "cats.effect.runtime.wstp.compute.thread.blocked.count", + Some("The number of worker thread instances that can run blocking actions on the compute thread pool."), + Some("{thread}") + ), + Metric( + "cats.effect.runtime.wstp.compute.fiber.enqueued.count", + Some("The total number of fibers enqueued on all local queues."), + Some("{fiber}") + ), + Metric( + "cats.effect.runtime.wstp.compute.fiber.suspended.count", + Some("The number of fibers which are currently asynchronously suspended."), + Some("{fiber}") + ) + ) + + private val threadMetrics = List( + Metric( + "cats.effect.runtime.wstp.worker.thread.idle.duration", + Some("The total amount of time in nanoseconds that this WorkerThread has been idle."), + Some("ns") + ), + Metric( + "cats.effect.runtime.wstp.worker.thread.event.count", + Some("The total number of events that happened to this WorkerThread."), + Some("{event}") + ), + ) + + private val localQueueMetrics = List( + Metric( + "cats.effect.runtime.wstp.worker.localqueue.fiber.count", + Some("The total number of fibers enqueued during the lifetime of the local queue."), + Some("{fiber}") + ), + Metric( + "cats.effect.runtime.wstp.worker.localqueue.fiber.enqueued.count", + Some("The current number of enqueued fibers."), + Some("{fiber}") + ), + Metric( + "cats.effect.runtime.wstp.worker.localqueue.fiber.spillover.count", + Some("The total number of fibers spilt over to the external queue."), + Some("{fiber}") + ), + Metric( + "cats.effect.runtime.wstp.worker.localqueue.fiber.steal_attempt.count", + Some("The total number of successful steal attempts by other worker threads."), + Some("{fiber}") + ), + Metric( + "cats.effect.runtime.wstp.worker.localqueue.fiber.stolen.count", + Some("The total number of stolen fibers by other worker threads."), + Some("{fiber}") + ), + ) + + private val timerHeapMetrics = List( + Metric( + "cats.effect.runtime.wstp.worker.timerheap.outstanding.count", + Some("The current number of the outstanding timers, that remain to be executed."), + Some("{timer}") + ), + Metric( + "cats.effect.runtime.wstp.worker.timerheap.timer.count", + Some("The total number of the timers per state."), + Some("{timer}") + ), + Metric( + "cats.effect.runtime.wstp.worker.timerheap.packed.count", + Some("The total number of times the heap packed itself to remove canceled timers."), + Some("{event}") + ), + Metric( + "cats.effect.runtime.wstp.worker.timerheap.next.due", + Some("Returns the time in nanoseconds till the next due to fire."), + Some("ns") + ), + ) + + private val pollerMetrics = List( + Metric( + "cats.effect.runtime.wstp.worker.poller.operation.outstanding.count", + Some("The current number of outstanding operations per category and outcome."), + Some("{operation}") + ), + Metric( + "cats.effect.runtime.wstp.worker.poller.operation.count", + Some("The total number of the operations per category and outcome."), + Some("{operation}") + ) + ) + + private implicit val cpuStarvationConfigArbitrary: Arbitrary[CpuStarvationConfig] = + Arbitrary( + for { + enabled <- Arbitrary.arbitrary[Boolean] + attributes <- Arbitrary.arbitrary[Attributes] + } yield if (enabled) CpuStarvationConfig.enabled(attributes) else CpuStarvationConfig.disabled + ) + + private implicit val computeConfigArbitrary: Arbitrary[ComputeConfig] = + Arbitrary( + for { + enabled <- Arbitrary.arbitrary[Boolean] + attributes <- Arbitrary.arbitrary[Attributes] + } yield if (enabled) ComputeConfig.enabled(attributes) else ComputeConfig.disabled + ) + + private implicit val localQueueConfigArbitrary: Arbitrary[LocalQueueConfig] = + Arbitrary( + for { + enabled <- Arbitrary.arbitrary[Boolean] + attributes <- Arbitrary.arbitrary[Attributes] + } yield if (enabled) LocalQueueConfig.enabled(attributes) else LocalQueueConfig.disabled + ) + + private implicit val threadConfigArbitrary: Arbitrary[ThreadConfig] = + Arbitrary( + for { + enabled <- Arbitrary.arbitrary[Boolean] + attributes <- Arbitrary.arbitrary[Attributes] + } yield if (enabled) ThreadConfig.enabled(attributes) else ThreadConfig.disabled + ) + + private implicit val timerHeapConfigArbitrary: Arbitrary[TimerHeapConfig] = + Arbitrary( + for { + enabled <- Arbitrary.arbitrary[Boolean] + attributes <- Arbitrary.arbitrary[Attributes] + } yield if (enabled) TimerHeapConfig.enabled(attributes) else TimerHeapConfig.disabled + ) + + private implicit val pollerConfigArbitrary: Arbitrary[PollerConfig] = + Arbitrary( + for { + enabled <- Arbitrary.arbitrary[Boolean] + attributes <- Arbitrary.arbitrary[Attributes] + } yield if (enabled) PollerConfig.enabled(attributes) else PollerConfig.disabled + ) + + private implicit val workerThreadsConfigArbitrary: Arbitrary[WorkerThreadsConfig] = + Arbitrary( + for { + thread <- Arbitrary.arbitrary[ThreadConfig] + localQueue <- Arbitrary.arbitrary[LocalQueueConfig] + timerHeap <- Arbitrary.arbitrary[TimerHeapConfig] + poller <- Arbitrary.arbitrary[PollerConfig] + } yield WorkerThreadsConfig(thread, localQueue, timerHeap, poller) + ) + + private implicit val configArbitrary: Arbitrary[IORuntimeMetrics.Config] = + Arbitrary( + for { + cpuStarvation <- Arbitrary.arbitrary[CpuStarvationConfig] + pool <- Arbitrary.arbitrary[ComputeConfig] + workerThreads <- Arbitrary.arbitrary[WorkerThreadsConfig] + } yield IORuntimeMetrics.Config(cpuStarvation, WorkStealingThreadPoolConfig(pool, workerThreads)) + ) + +} diff --git a/instrumentation/metrics/shared/src/main/scala/org/typelevel/otel4s/instrumentation/ce/IORuntimeMetrics.scala b/instrumentation/metrics/shared/src/main/scala/org/typelevel/otel4s/instrumentation/ce/IORuntimeMetrics.scala new file mode 100644 index 000000000..3560a2bc3 --- /dev/null +++ b/instrumentation/metrics/shared/src/main/scala/org/typelevel/otel4s/instrumentation/ce/IORuntimeMetrics.scala @@ -0,0 +1,78 @@ +/* + * Copyright 2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.instrumentation.ce + +import cats.effect.Resource +import cats.effect.Sync +import cats.effect.unsafe.metrics.CpuStarvationMetrics +import cats.syntax.flatMap._ +import cats.syntax.functor._ +import org.typelevel.otel4s.Attributes +import org.typelevel.otel4s.metrics.Meter + +object IORuntimeMetrics extends IORuntimeMetricsPlatform { + + protected object Const { + val MeterNamespace = "cats.effect.runtime" + } + + /** Registers the CPU starvation: + * - `cats.effect.runtime.cpu.starvation.count` + * - `cats.effect.runtime.cpu.starvation.clock.drift.current` + * - `cats.effect.runtime.cpu.starvation.clock.drift.max` + * + * @param attributes + * the attributes to attach to the metrics + */ + protected def cpuStarvationMetrics[F[_]: Sync: Meter]( + metrics: CpuStarvationMetrics, + attributes: Attributes + ): Resource[F, Unit] = { + val prefix = s"${Const.MeterNamespace}.cpu.starvation" + + Meter[F].batchCallback.of( + Meter[F] + .observableCounter[Long](s"$prefix.count") + .withDescription("The number of CPU starvation events.") + .createObserver, + Meter[F] + .observableGauge[Long](s"$prefix.clock.drift.current") + .withDescription("The current CPU drift in milliseconds.") + .withUnit("ms") + .createObserver, + Meter[F] + .observableGauge[Long](s"$prefix.clock.drift.max") + .withDescription("The max CPU drift in milliseconds.") + .withUnit("ms") + .createObserver, + ) { (count, driftCurrent, driftMax) => + for { + snapshot <- Sync[F].delay( + ( + metrics.starvationCount(), + metrics.clockDriftCurrent(), + metrics.clockDriftMax() + ) + ) + _ <- count.record(snapshot._1, attributes) + _ <- driftCurrent.record(snapshot._2.toMillis, attributes) + _ <- driftMax.record(snapshot._3.toMillis, attributes) + } yield () + } + } + +} diff --git a/oteljava/trace/src/test/scala/org/typelevel/otel4s/oteljava/trace/TraceScopeImplSuite.scala b/oteljava/trace/src/test/scala/org/typelevel/otel4s/oteljava/trace/TraceScopeImplSuite.scala index c3aa06d73..dfcb0db30 100644 --- a/oteljava/trace/src/test/scala/org/typelevel/otel4s/oteljava/trace/TraceScopeImplSuite.scala +++ b/oteljava/trace/src/test/scala/org/typelevel/otel4s/oteljava/trace/TraceScopeImplSuite.scala @@ -18,7 +18,6 @@ package org.typelevel.otel4s.oteljava.trace import cats.effect.IO import cats.effect.IOLocal -import org.typelevel.otel4s.instances.local._ import org.typelevel.otel4s.oteljava.context.Context import org.typelevel.otel4s.trace.TraceScope import org.typelevel.otel4s.trace.TraceScopeSuite @@ -26,7 +25,7 @@ import org.typelevel.otel4s.trace.TraceScopeSuite class TraceScopeImplSuite extends TraceScopeSuite[Context, Context.Key] { protected def createTraceScope: IO[TraceScope[IO, Context]] = - IOLocal(Context.root).map { implicit ioLocal => + IOLocal(Context.root).map(_.asLocal).map { implicit local => TraceScopeImpl.fromLocal[IO] } diff --git a/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/SdkSpanBuilderSuite.scala b/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/SdkSpanBuilderSuite.scala index 5ff4d4e27..960ac3ca3 100644 --- a/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/SdkSpanBuilderSuite.scala +++ b/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/SdkSpanBuilderSuite.scala @@ -26,7 +26,6 @@ import org.scalacheck.Gen import org.scalacheck.Test import org.scalacheck.effect.PropF import org.typelevel.otel4s.Attributes -import org.typelevel.otel4s.instances.local._ import org.typelevel.otel4s.sdk.TelemetryResource import org.typelevel.otel4s.sdk.common.InstrumentationScope import org.typelevel.otel4s.sdk.context.Context @@ -146,7 +145,7 @@ class SdkSpanBuilderSuite extends CatsEffectSuite with ScalaCheckEffectSuite { } private def createTraceScope: IO[TraceScope[IO, Context]] = - IOLocal(Context.root).map { implicit ioLocal => + IOLocal(Context.root).map(_.asLocal).map { implicit local => SdkTraceScope.fromLocal[IO] } diff --git a/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/SdkTraceScopeSuite.scala b/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/SdkTraceScopeSuite.scala index 7df55e65b..afe70a953 100644 --- a/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/SdkTraceScopeSuite.scala +++ b/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/SdkTraceScopeSuite.scala @@ -18,7 +18,6 @@ package org.typelevel.otel4s.sdk.trace import cats.effect.IO import cats.effect.IOLocal -import org.typelevel.otel4s.instances.local._ import org.typelevel.otel4s.sdk.context.Context import org.typelevel.otel4s.trace.TraceScope import org.typelevel.otel4s.trace.TraceScopeSuite @@ -26,7 +25,7 @@ import org.typelevel.otel4s.trace.TraceScopeSuite class SdkTraceScopeSuite extends TraceScopeSuite[Context, Context.Key] { protected def createTraceScope: IO[TraceScope[IO, Context]] = - IOLocal(Context.root).map { implicit ioLocal => + IOLocal(Context.root).map(_.asLocal).map { implicit local => SdkTraceScope.fromLocal[IO] }