Skip to content

Commit

Permalink
🐳 #350 修复请求消息在 Broker 环节乱序的问题
Browse files Browse the repository at this point in the history
  • Loading branch information
iohao committed Aug 10, 2024
1 parent 5f6ba93 commit 0cc8181
Showing 1 changed file with 18 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,30 @@ public Executor getExecutor(UserProcessorExecutorAware userProcessorExecutorAwar
}

String userProcessorName = userProcessorExecutorAware.getClass().getSimpleName();
if ("RequestMessageClientProcessor".equals(userProcessorName)) {
// 单独一个池
return ofExecutor("RequestMessage");
}

return ofExecutor("common");
return switch (userProcessorName) {
// RequestMessage 相关的单独一个池,使用单线程传递请求消息。
case "RequestMessageClientProcessor" -> ofExecutorRequestMessage();
case "RequestMessageBrokerProcessor" -> ofExecutorRequestMessage();
// 其他 UserProcessor 使用 common 多线程消费任务
default -> ofExecutorCommon();
};
}

private Executor ofExecutorRequestMessage() {
// 使用单线程传递请求消息
return ofExecutorCommon("RequestMessage", 1);
}

Executor ofExecutor(String name) {
private Executor ofExecutorCommon() {
int corePoolSize = RuntimeKit.availableProcessors;
return ofExecutorCommon("common", corePoolSize);
}

private Executor ofExecutorCommon(String name, int corePoolSize) {
Executor executor = this.executorMap.get(name);

if (Objects.isNull(executor)) {
int corePoolSize = RuntimeKit.availableProcessors;
var tempExecutor = createExecutor(name, corePoolSize, corePoolSize);
executor = MoreKit.firstNonNull(this.executorMap.putIfAbsent(name, tempExecutor), tempExecutor);

Expand All @@ -84,7 +94,7 @@ Executor ofExecutor(String name) {
return executor;
}

Executor createExecutor(String userProcessorName, int corePoolSize, int maximumPoolSize) {
private Executor createExecutor(String userProcessorName, int corePoolSize, int maximumPoolSize) {

/*
* 下面对于 UserProcessor 提供了一些默认的 Executor 配置,
Expand Down

0 comments on commit 0cc8181

Please sign in to comment.