Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-437 Improve schedulers #486

Merged
merged 32 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
47496d5
build: update Minecraft/Velocity versions in examples
BlackBaroness Nov 12, 2024
f494a65
improve bukkit scheduler
BlackBaroness Nov 12, 2024
bed5711
improve bungee scheduler
BlackBaroness Nov 12, 2024
346c7be
rework core scheduler and use it for bungee
BlackBaroness Nov 13, 2024
365fc0b
implement Sponge scheduler
BlackBaroness Nov 13, 2024
37b3304
Revert "build: update Minecraft/Velocity versions in examples"
BlackBaroness Nov 13, 2024
12b062c
revert disabling examples (i didnt want to commit that)
BlackBaroness Nov 13, 2024
fb35365
fix compilation
BlackBaroness Nov 13, 2024
cbabb71
fix bugs
BlackBaroness Nov 13, 2024
d1f6bb1
Support Fabric Scheduler (#1)
huanmeng-qwq Nov 13, 2024
248da9d
fix tests
BlackBaroness Nov 13, 2024
2bff111
Merge remote-tracking branch 'origin/improve-schedulers' into improve…
BlackBaroness Nov 13, 2024
9f0b06e
Don't use Throwables
Rollczi Nov 14, 2024
ba7773e
Merge branch 'master' into improve-schedulers
Rollczi Nov 14, 2024
19e64fc
Wrap using the default implementation (#2)
huanmeng-qwq Nov 15, 2024
37f415c
Revert SchedulerExecutorPoolImpl
Rollczi Nov 15, 2024
56503a5
Create SchedulerMainThreadBased to simplify implementations
Rollczi Nov 15, 2024
9847c2b
Make fabric implementation more simpler.
Rollczi Nov 15, 2024
dbd4910
Revert unit tests changes.
Rollczi Nov 15, 2024
8b441d5
Revert unit tests changes.
Rollczi Nov 15, 2024
4d5322a
SpongeScheduler code style changes.
Rollczi Nov 15, 2024
b2d71e8
Implement important notes from BlackBaroness to the old scheduler.
Rollczi Nov 15, 2024
7b9ef28
Rename AbstractMainThreadBasedScheduler. Remove unused scheduler.
Rollczi Nov 16, 2024
ac26608
Fix runAsynchronous
Rollczi Nov 16, 2024
020ed33
Fix runAsynchronous
Rollczi Nov 16, 2024
7a3fe1f
improve comments
BlackBaroness Nov 17, 2024
de18f69
improve comments
BlackBaroness Nov 17, 2024
496e3f8
improve Sponge scheduler
BlackBaroness Nov 17, 2024
6357936
remove empty lines
BlackBaroness Nov 17, 2024
e661340
improve Sponge scheduler
BlackBaroness Nov 17, 2024
a51898d
Improve the language of an error
BlackBaroness Nov 17, 2024
ca5cddf
Update litecommands-sponge/src/dev/rollczi/litecommands/sponge/Sponge…
BlackBaroness Nov 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions buildSrc/src/main/kotlin/Versions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ object Versions {
const val FABRIC_LOADER = "0.16.9"
const val FABRIC_COMMAND_API_V2 = "2.2.37+7feeb7331c"
const val FABRIC_COMMAND_API_V1 = "1.2.56+f71b366f73"
const val FABRIC_LIFECYCLE_EVENTS_V1 = "2.3.12+6c1df36019"

// ChatGPT
const val GSON = "2.11.0"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dev.rollczi.example.fabric.client.command;

import dev.rollczi.litecommands.annotations.argument.Arg;
import dev.rollczi.litecommands.annotations.async.Async;
import dev.rollczi.litecommands.annotations.command.Command;
import dev.rollczi.litecommands.annotations.context.Sender;
import dev.rollczi.litecommands.annotations.execute.Execute;
Expand Down Expand Up @@ -36,4 +37,14 @@ String health(@Arg ClientPlayerEntity player) {
return String.valueOf(player.getHealth());
}

@Execute(name = "thread1")
String thread1() {
return Thread.currentThread().getName();
}

@Execute(name = "thread2")
@Async
String thread2() {
return Thread.currentThread().getName();
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dev.rollczi.example.fabric.server.command;

import dev.rollczi.litecommands.annotations.argument.Arg;
import dev.rollczi.litecommands.annotations.async.Async;
import dev.rollczi.litecommands.annotations.command.Command;
import dev.rollczi.litecommands.annotations.execute.Execute;
import dev.rollczi.litecommands.annotations.join.Join;
Expand All @@ -19,4 +20,15 @@ void sendMessage(@Arg("player") ServerPlayerEntity player, @Join("reason") Strin
Text sendMessage(@Quoted @Arg String message) {
return Text.of("You saied: " + message);
}

@Execute(name = "thread1")
String thread1() {
return Thread.currentThread().getName();
}

@Execute(name = "thread2")
@Async
String thread2() {
return Thread.currentThread().getName();
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package dev.rollczi.litecommands.annotations.async;

import dev.rollczi.litecommands.unit.annotations.LiteTestSpec;
import dev.rollczi.litecommands.annotations.argument.Arg;
import dev.rollczi.litecommands.annotations.command.Command;
import dev.rollczi.litecommands.annotations.context.Context;
Expand All @@ -13,7 +12,7 @@
import dev.rollczi.litecommands.scheduler.SchedulerExecutorPoolImpl;
import dev.rollczi.litecommands.unit.AssertExecute;
import dev.rollczi.litecommands.unit.TestSender;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import dev.rollczi.litecommands.unit.annotations.LiteTestSpec;
import org.junit.jupiter.api.Test;

import java.util.Date;
Expand Down Expand Up @@ -79,7 +78,7 @@ public String testAsyncArgs(@Context Date date, @Arg String first, @Async @Arg S

@Async
@Execute(name = "async-args-and-method")
public String testAsyncArgs2(@Context Date date, @Async @Arg String first, @Arg String second) {
public String testAsyncArgs2(@Context Date date, @Async @Arg String first, @Arg String second) {
return Thread.currentThread().getName() + " args [first=" + first + ", second=" + second + "]";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,4 @@ void testAsyncArgsAndMethod() {
}
}


}
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package dev.rollczi.litecommands.bukkit;

import dev.rollczi.litecommands.scheduler.Scheduler;
import dev.rollczi.litecommands.scheduler.SchedulerPoll;
import dev.rollczi.litecommands.shared.ThrowingSupplier;
import dev.rollczi.litecommands.scheduler.AbstractMainThreadBasedScheduler;
import java.util.logging.Level;
import org.bukkit.Bukkit;
import org.bukkit.plugin.Plugin;
import org.bukkit.scheduler.BukkitScheduler;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;

class BukkitSchedulerImpl implements Scheduler {
class BukkitSchedulerImpl extends AbstractMainThreadBasedScheduler {

private final BukkitScheduler bukkitScheduler;
private final Plugin plugin;
Expand All @@ -20,68 +18,36 @@ class BukkitSchedulerImpl implements Scheduler {
this.plugin = plugin;
}

@Override
public <T> CompletableFuture<T> supplyLater(SchedulerPoll type, Duration delay, ThrowingSupplier<T, Throwable> supplier) {
SchedulerPoll resolved = type.resolve(SchedulerPoll.MAIN, SchedulerPoll.ASYNCHRONOUS);

if (resolved.equals(SchedulerPoll.MAIN)) {
return supplySync(type, supplier, delay);
}

if (resolved.equals(SchedulerPoll.ASYNCHRONOUS)) {
return supplyAsync(type, supplier, delay);
}

throw new IllegalArgumentException("Unknown scheduler poll type: " + type);
}

@Override
public void shutdown() {
}

private <T> CompletableFuture<T> supplySync(SchedulerPoll type, ThrowingSupplier<T, Throwable> supplier, Duration delay) {
CompletableFuture<T> future = new CompletableFuture<>();

@Override
protected void runSynchronous(Runnable task, Duration delay) {
if (Bukkit.isPrimaryThread() && delay.isZero()) {
return tryRun(type, future, supplier);
task.run();
return;
}

if (delay.isZero()) {
bukkitScheduler.runTask(plugin, () -> tryRun(type, future, supplier));
}
else {
bukkitScheduler.runTaskLater(plugin, () -> tryRun(type, future, supplier), toTicks(delay));
bukkitScheduler.runTask(plugin, task);
} else {
Rollczi marked this conversation as resolved.
Show resolved Hide resolved
bukkitScheduler.runTaskLater(plugin, task, toTicks(delay));
}

return future;
}

private <T> CompletableFuture<T> supplyAsync(SchedulerPoll type, ThrowingSupplier<T, Throwable> supplier, Duration delay) {
CompletableFuture<T> future = new CompletableFuture<>();

@Override
protected void runAsynchronous(Runnable task, Duration delay) {
if (delay.isZero()) {
bukkitScheduler.runTaskAsynchronously(plugin, () -> tryRun(type, future, supplier));
bukkitScheduler.runTaskAsynchronously(plugin, task);
} else {
bukkitScheduler.runTaskLaterAsynchronously(plugin, task, toTicks(delay));
}
else {
bukkitScheduler.runTaskLaterAsynchronously(plugin, () -> tryRun(type, future, supplier), toTicks(delay));
}

return future;
}

private <T> CompletableFuture<T> tryRun(SchedulerPoll type, CompletableFuture<T> future, ThrowingSupplier<T, Throwable> supplier) {
try {
future.complete(supplier.get());
}
catch (Throwable throwable) {
future.completeExceptionally(throwable);

if (type.isLogging()) {
throwable.printStackTrace();
}
}

return future;
@Override
protected void log(Throwable throwable) {
this.plugin.getLogger().log(Level.SEVERE, "An error occurred while executing a task", throwable);
}

private long toTicks(Duration duration) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package dev.rollczi.litecommands.scheduler;

import dev.rollczi.litecommands.shared.ThrowingSupplier;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

public abstract class AbstractMainThreadBasedScheduler implements Scheduler {

@Override
public final <T> CompletableFuture<T> supplyLater(SchedulerPoll type, Duration delay, ThrowingSupplier<T, Throwable> supplier) {
SchedulerPoll resolved = type.resolve(SchedulerPoll.MAIN, SchedulerPoll.ASYNCHRONOUS);
CompletableFuture<T> future = new CompletableFuture<>();

if (resolved.equals(SchedulerPoll.MAIN)) {
runSynchronous(() -> tryRun(type, future, supplier), delay);
return future;
}

if (resolved.equals(SchedulerPoll.ASYNCHRONOUS)) {
runAsynchronous(() -> tryRun(type, future, supplier), delay);
return future;
}

throw new IllegalArgumentException("Unknown scheduler poll type: " + type);
}

abstract protected void runSynchronous(Runnable task, Duration delay);

protected abstract void runAsynchronous(Runnable task, Duration delay);

private <T> void tryRun(SchedulerPoll type, CompletableFuture<T> future, ThrowingSupplier<T, Throwable> supplier) {
try {
future.complete(supplier.get());
}
catch (Throwable throwable) {
future.completeExceptionally(throwable);

if (type.isLogging()) {
this.log(throwable);
}
}
}

protected void log(Throwable throwable) {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,45 +6,34 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SchedulerExecutorPoolImpl implements Scheduler {

public static final int CACHED_POOL = -1;
private static final int MAIN_POOL_SIZE = 1;

private static final String MAIN_THREAD_NAME_FORMAT = "scheduler-%s-main";
private static final String ASYNC_THREAD_NAME_FORMAT = "scheduler-%s-async-%d";

private final ThreadLocal<Boolean> isMainThread = ThreadLocal.withInitial(() -> false);
protected final ThreadLocal<Boolean> isMainThread = ThreadLocal.withInitial(() -> false);

private final ExecutorService mainExecutor;
private final ExecutorService asyncExecutor;
private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());

public SchedulerExecutorPoolImpl(String name) {
this(name, -1);
this(name, CACHED_POOL);
}

public SchedulerExecutorPoolImpl(String name, int pool) {
this.mainExecutor = Executors.newSingleThreadScheduledExecutor(runnable -> {
Thread thread = new Thread(runnable);
thread.setName(String.format(MAIN_THREAD_NAME_FORMAT, name));

return thread;
});

this.mainExecutor.submit(() -> isMainThread.set(true));

AtomicInteger asyncCount = new AtomicInteger();
ThreadFactory factory = runnable -> {
Thread thread = new Thread(runnable);
thread.setName(String.format(ASYNC_THREAD_NAME_FORMAT, name, asyncCount.getAndIncrement()));

return thread;
};

this.asyncExecutor = pool < 0 ? Executors.newCachedThreadPool(factory) : Executors.newFixedThreadPool(pool, factory);
this.mainExecutor = createMainExecutor(name);
this.asyncExecutor = createAsyncExecutor(name, pool);
}

@Override
Expand Down Expand Up @@ -88,4 +77,54 @@ public void shutdown() {
isMainThread.remove();
}

/**
* Create the main executor.
* In some cases, the main executor might never be used. We don't want to have to create a useless thread
*
* This may look like a premature optimization, but a typical server has 40+ plugins
* As the framework spreads, more and more of them can use LiteCommands,
* so we better handle this situation, there are practically no losses anyway
*
* To improve performance executor set {@link SchedulerExecutorPoolImpl#isMainThread} to true.
*
* @author BlackBaroness
* @return the main executor. (Should have one thread)
*/
protected ExecutorService createMainExecutor(String name) {
ThreadFactory factory = runnable -> new Thread(() -> {
isMainThread.set(true);
runnable.run();
}, String.format(MAIN_THREAD_NAME_FORMAT, name));

ThreadPoolExecutor mainExecutor = new ThreadPoolExecutor(MAIN_POOL_SIZE, MAIN_POOL_SIZE,
1, TimeUnit.HOURS,
new LinkedBlockingQueue<>(),
factory
);

mainExecutor.allowCoreThreadTimeOut(true);
return mainExecutor;
}

/**
* Create async executor.
*
* @author BlackBaroness
* @return async executor.
*/
protected ExecutorService createAsyncExecutor(String name, int pool) {
AtomicInteger asyncThreadCount = new AtomicInteger();
ThreadFactory factory = runnable -> new Thread(runnable, String.format(ASYNC_THREAD_NAME_FORMAT, name, asyncThreadCount.getAndIncrement()));

if (pool < 0) {
return Executors.newCachedThreadPool(factory);
}

return new ThreadPoolExecutor(pool, pool,
3, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(),
factory
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,4 @@
public interface ThrowingRunnable<E extends Throwable> {

void run() throws E;

Rollczi marked this conversation as resolved.
Show resolved Hide resolved
}
1 change: 1 addition & 0 deletions litecommands-fabric/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ dependencies {
modImplementation("net.fabricmc:fabric-loader:${Versions.FABRIC_LOADER}")
modImplementation("net.fabricmc.fabric-api:fabric-command-api-v2:${Versions.FABRIC_COMMAND_API_V2}")
modImplementation("net.fabricmc.fabric-api:fabric-command-api-v1:${Versions.FABRIC_COMMAND_API_V1}")
modImplementation("net.fabricmc.fabric-api:fabric-lifecycle-events-v1:${Versions.FABRIC_LIFECYCLE_EVENTS_V1}")
}

litecommandsPublish {
Expand Down
Loading