Skip to content

Commit

Permalink
Update Scala, SBT, remove context applied
Browse files Browse the repository at this point in the history
  • Loading branch information
psisoyev committed Aug 6, 2024
1 parent 0d768e1 commit e8bd7ff
Show file tree
Hide file tree
Showing 14 changed files with 42 additions and 38 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ out/
.bloop
result
.bsp
.idea
2 changes: 0 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ lazy val `neutron-core` = (project in file("core"))
.settings(
libraryDependencies ++= List(
CompilerPlugins.betterMonadicFor,
CompilerPlugins.contextApplied,
CompilerPlugins.kindProjector,
Libraries.cats,
Libraries.catsEffect,
Expand Down Expand Up @@ -57,7 +56,6 @@ lazy val tests = (project in file("tests"))
Defaults.itSettings,
libraryDependencies ++= List(
CompilerPlugins.betterMonadicFor,
CompilerPlugins.contextApplied,
CompilerPlugins.kindProjector,
Libraries.avro4s % "it,test",
Libraries.circeCore % "it,test",
Expand Down
18 changes: 9 additions & 9 deletions core/src/main/scala/cr/pulsar/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ object Consumer {
opts: Options[F, E]
): Resource[F, Consumer[F, E]] = {

val acquire = F.futureLift {
val c = client.newConsumer(E.schema)
val acquire = FutureLift[F].lift {
val c = client.newConsumer(Schema[E].schema)
val z = topic match {
case s: Topic.Single => c.topic(s.url.value)
case m: Topic.Multi => c.topicsPattern(m.url.value.r.pattern)
Expand All @@ -93,16 +93,16 @@ object Consumer {
}

def release(c: JConsumer[E]): F[Unit] =
F.futureLift(c.unsubscribeAsync()).attempt.unlessA(opts.manualUnsubscribe) >>
F.futureLift(c.closeAsync()).void
FutureLift[F].lift(c.unsubscribeAsync()).attempt.unlessA(opts.manualUnsubscribe) >>
FutureLift[F].lift(c.closeAsync()).void

Resource
.make(acquire)(release)
.map { c =>
new Consumer[F, E] {
private def subscribeInternal(autoAck: Boolean): Stream[F, Message[E]] =
Stream.repeatEval {
F.futureLift(c.receiveAsync()).flatMap { m =>
FutureLift[F].lift(c.receiveAsync()).flatMap { m =>
val e = m.getValue()

opts.logger.log(Topic.URL(m.getTopicName), e) >>
Expand All @@ -113,10 +113,10 @@ object Consumer {

}

override def ack(id: MessageId): F[Unit] = F.delay(c.acknowledge(id))
override def nack(id: MessageId): F[Unit] = F.delay(c.negativeAcknowledge(id))
override def ack(id: MessageId): F[Unit] = Sync[F].delay(c.acknowledge(id))
override def nack(id: MessageId): F[Unit] = Sync[F].delay(c.negativeAcknowledge(id))
override def unsubscribe: F[Unit] =
F.futureLift(c.unsubscribeAsync()).void
FutureLift[F].lift(c.unsubscribeAsync()).void
override def subscribe: Stream[F, Message[E]] =
subscribeInternal(autoAck = false)
override def autoSubscribe: Stream[F, E] =
Expand All @@ -125,7 +125,7 @@ object Consumer {
override def process[T](processor: E => F[T]): Stream[F, T] =
Stream
.repeatEval {
F.futureLift(c.receiveAsync()).flatMap { m =>
FutureLift[F].lift(c.receiveAsync()).flatMap { m =>
opts.logger.log(Topic.URL(m.getTopicName), m.getValue()).as(m)
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/cr/pulsar/Logger.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ trait Logger[F[_], E] {

object Logger {
def noop[F[_]: Applicative, E]: Logger[F, E] = new Logger[F, E] {
override def log(topic: Topic.URL, e: E): F[Unit] = F.unit
override def log(topic: Topic.URL, e: E): F[Unit] = Applicative[F].unit
}
}
8 changes: 4 additions & 4 deletions core/src/main/scala/cr/pulsar/Producer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,13 @@ object Producer {

Resource
.make {
F.delay(
Sync[F].delay(
configureBatching(
_opts.batching,
client.newProducer(E.schema).topic(topic.url.value)
client.newProducer(Schema[E].schema).topic(topic.url.value)
).create
)
}(p => F.futureLift(p.closeAsync()).void)
}(p => FutureLift[F].lift(p.closeAsync()).void)
.map { prod =>
new Producer[F, E] {
private def buildMessage(
Expand All @@ -121,7 +121,7 @@ object Producer {
}

private def _send(msg: TypedMessageBuilder[E]): F[MessageId] =
F.futureLift(msg.sendAsync())
FutureLift[F].lift(msg.sendAsync())

override def send(msg: E, key: MessageKey): F[MessageId] =
buildMessage(msg, key, None) >>= _send
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/cr/pulsar/Pulsar.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ object Pulsar {
opts: Options = Options()
): Resource[F, Underlying] =
Resource.fromAutoCloseable(
F.delay(
Sync[F].delay(
JavaPulsar.builder
.serviceUrl(url.value)
.connectionTimeout(
Expand Down
12 changes: 6 additions & 6 deletions core/src/main/scala/cr/pulsar/Reader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,23 +63,23 @@ object Reader {
): Resource[F, JReader[E]] =
Resource
.make {
F.delay {
Sync[F].delay {
client
.newReader(E.schema)
.newReader(Schema[E].schema)
.topic(topic.url.value)
.startMessageId(opts.startMessageId)
.startMessageIdInclusive()
.readCompacted(opts.readCompacted)
.create()
}
}(c => F.futureLift(c.closeAsync()).void)
}(c => FutureLift[F].lift(c.closeAsync()).void)

private def mkMessageReader[
F[_]: Sync: FutureLift,
E: Schema
](c: JReader[E]): MessageReader[F, E] =
new MessageReader[F, E] {
private def readMsg: F[Message[E]] = F.delay {
private def readMsg: F[Message[E]] = Sync[F].delay {
val m = c.readNext()
Message(m.getMessageId, MessageKey(m.getKey), m.getValue)
}
Expand All @@ -88,13 +88,13 @@ object Reader {
Stream.repeatEval(readMsg)

override def readUntil(timeout: FiniteDuration): F[Option[Message[E]]] =
F.delay(c.readNext(timeout.length.toInt, timeout.unit)).map {
Sync[F].delay(c.readNext(timeout.length.toInt, timeout.unit)).map {
Option(_).map { m =>
Message(m.getMessageId, MessageKey(m.getKey), m.getValue)
}
}

override def messageAvailable: F[MessageAvailable] = F.delay {
override def messageAvailable: F[MessageAvailable] = Sync[F].delay {
if (c.hasMessageAvailable) MessageAvailable.Yes else MessageAvailable.No
}
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/cr/pulsar/internal/FutureLift.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ import cats.effect._
import java.util.concurrent._

private[pulsar] trait FutureLift[F[_]] {
def futureLift[A](fa: => CompletableFuture[A]): F[A]
def lift[A](fa: => CompletableFuture[A]): F[A]
}

private[pulsar] object FutureLift {
implicit def instance[F[_]: Async]: FutureLift[F] = new FutureLift[F] {
override def futureLift[A](fa: => CompletableFuture[A]): F[A] =
F.fromCompletableFuture(F.delay(fa))
override def lift[A](fa: => CompletableFuture[A]): F[A] =
Async[F].fromCompletableFuture(Sync[F].delay(fa))
}

def apply[F[_]](implicit fl: FutureLift[F]): FutureLift[F] = fl
Expand Down
6 changes: 4 additions & 2 deletions core/src/main/scala/cr/pulsar/schema/Schema.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ object Schema {
new Schema[E] {
def schema: JSchema[E] =
new JSchema[E] {
override def encode(message: E): Array[Byte] = E.inj(message)
override def encode(message: E): Array[Byte] = Inject[E, Array[Byte]].inj(message)
override def decode(bytes: Array[Byte]): E =
E.prj(bytes)
Inject[E, Array[Byte]]
.prj(bytes)
.getOrElse(throw new DecodingFailure(s"Could not decode bytes: $bytes"))
override def getSchemaInfo(): SchemaInfo = JSchema.BYTES.getSchemaInfo()
override def clone(): JSchema[E] = this
}
}
}
2 changes: 2 additions & 0 deletions core/src/main/scala/cr/pulsar/schema/utf8.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ object utf8 {
.catchNonFatal(new String(bytes, UTF_8))
.getOrElse(throw new DecodingFailure(s"Could not decode bytes: $bytes"))
override def getSchemaInfo(): SchemaInfo = JSchema.BYTES.getSchemaInfo()

override def clone(): JSchema[String] = this
}
}
}
11 changes: 8 additions & 3 deletions function/src/test/scala/cr/pulsar/FunctionInput.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,20 @@ package cr.pulsar

import cr.pulsar.JavaConversions._
import org.apache.pulsar.client.admin.PulsarAdmin
import org.apache.pulsar.client.api.{ConsumerBuilder, Schema, TypedMessageBuilder}
import org.apache.pulsar.client.api.{ ConsumerBuilder, Schema, TypedMessageBuilder }
import org.apache.pulsar.functions.api.utils.FunctionRecord.FunctionRecordBuilder
import org.apache.pulsar.functions.api.{StateStore, Context => JavaContext, Record => JavaRecord, WindowContext => JavaWindowContext}
import org.apache.pulsar.functions.api.{
StateStore,
Context => JavaContext,
Record => JavaRecord,
WindowContext => JavaWindowContext
}
import org.slf4j.Logger

import java.nio.ByteBuffer
import java.util.Optional
import java.util.concurrent.CompletableFuture
import java.{lang, util}
import java.{ lang, util }

object FunctionInput {
def emptyWindowCtx: JavaWindowContext = new JavaWindowContext {
Expand Down
6 changes: 1 addition & 5 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ object Dependencies {
val circe = "0.14.9"

val newtype = "0.4.4"
val pulsar = "3.1.1"
val pulsar = "3.3.1"
val weaver = "0.8.4"

val betterMonadicFor = "0.3.1"
val contextApplied = "0.1.4"
val kindProjector = "0.13.3"
val macroParadise = "2.1.1"
}
Expand Down Expand Up @@ -48,9 +47,6 @@ object Dependencies {
val betterMonadicFor = compilerPlugin(
"com.olegpy" %% "better-monadic-for" % V.betterMonadicFor
)
val contextApplied = compilerPlugin(
"org.augustjune" %% "context-applied" % V.contextApplied
)
val kindProjector = compilerPlugin(
"org.typelevel" %% "kind-projector" % V.kindProjector cross CrossVersion.full
)
Expand Down
2 changes: 1 addition & 1 deletion project/Settings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import de.heikoseeberger.sbtheader.HeaderPlugin.autoImport._
import Dependencies.CompilerPlugins

object Settings {
val supportedScala = "2.13.5"
val supportedScala = "2.13.14"

val commonSettings = Seq(
scalacOptions ++= compilerFlags(scalaVersion.value),
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.9.9
sbt.version=1.10.1

0 comments on commit e8bd7ff

Please sign in to comment.