Revert "McpExecutor: Run steps in parallel (#295)"

This reverts commit ea2490b84b.

The parallel execution seems to be incompatible with Java toolchains
as the toolchain detector code's event emitter doesn't work on custom threads.
This commit is contained in:
Juuz
2025-08-27 22:48:21 +03:00
parent ea2490b84b
commit 58d5476ea4
4 changed files with 23 additions and 76 deletions

View File

@@ -29,7 +29,6 @@ import static net.fabricmc.loom.configuration.providers.forge.ConfigValue.PREVIO
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -80,9 +79,8 @@ public final class DependencySet {
this.ignoreDependenciesFilter = ignoreDependenciesFilter;
}
public BuiltDependencies buildExecutionSet() {
public SortedSet<String> buildExecutionSet() {
SortedSet<String> steps = new TreeSet<>(Comparator.comparingInt(stepNames::indexOf));
Map<String, Set<String>> dependenciesByStep = new HashMap<>();
Queue<String> queue = new ArrayDeque<>(this.steps);
while (!queue.isEmpty()) {
@@ -99,16 +97,12 @@ public final class DependencySet {
if (name.endsWith(PREVIOUS_OUTPUT_SUFFIX) && name.length() > PREVIOUS_OUTPUT_SUFFIX.length()) {
String substep = name.substring(0, name.length() - PREVIOUS_OUTPUT_SUFFIX.length());
queue.offer(substep);
dependenciesByStep.computeIfAbsent(step, x -> new HashSet<>()).add(substep);
}
}
});
}
}
return new BuiltDependencies(steps, dependenciesByStep);
}
public record BuiltDependencies(SortedSet<String> stepsToExecute, Map<String, Set<String>> dependenciesByStep) {
return steps;
}
}

View File

@@ -25,20 +25,12 @@
package net.fabricmc.loom.configuration.providers.forge.mcpconfig;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SequencedMap;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.base.Stopwatch;
import dev.architectury.loom.forge.tool.ForgeToolExecutor;
@@ -76,11 +68,8 @@ public final class McpExecutor extends Service<McpExecutor.Options> {
private static final Logger LOGGER = Logging.getLogger(McpExecutor.class);
private static final LogLevel STEP_LOG_LEVEL = LogLevel.LIFECYCLE;
private final Path cache;
// The initial config set before executing
private final Map<String, String> config;
private final ConcurrentMap<String, String> outputsByStep = new ConcurrentHashMap<>();
private final Map<String, String> extraConfig = new HashMap<>();
public interface Options extends Service.Options {
// Steps
@@ -97,12 +86,6 @@ public final class McpExecutor extends Service<McpExecutor.Options> {
@Input
ListProperty<McpConfigStep> getStepsToExecute();
/**
* Each step's dependencies.
*/
@Input
MapProperty<String, Set<String>> getDependenciesByStep();
// Config data
/**
@@ -134,7 +117,7 @@ public final class McpExecutor extends Service<McpExecutor.Options> {
public McpExecutor(Options options, ServiceFactory serviceFactory) {
super(options, serviceFactory);
this.config = Map.copyOf(options.getInitialConfig().get());
this.config = new HashMap<>(options.getInitialConfig().get());
this.cache = options.getCache().get().getAsFile().toPath();
}
@@ -162,10 +145,8 @@ public final class McpExecutor extends Service<McpExecutor.Options> {
if (config.containsKey(name)) {
return config.get(name);
} else if (name.equals(ConfigValue.OUTPUT)) {
return outputsByStep.get(step.name());
} else if (name.endsWith(ConfigValue.PREVIOUS_OUTPUT_SUFFIX)) {
return outputsByStep.get(name.substring(0, name.length() - ConfigValue.PREVIOUS_OUTPUT_SUFFIX.length()));
} else if (extraConfig.containsKey(name)) {
return extraConfig.get(name);
} else if (name.equals(ConfigValue.LOG)) {
return cache.resolve("log.log").toAbsolutePath().toString();
}
@@ -182,48 +163,21 @@ public final class McpExecutor extends Service<McpExecutor.Options> {
public Path execute() throws IOException {
List<McpConfigStep> steps = getOptions().getStepsToExecute().get();
int totalSteps = steps.size();
int currentStepIndex = 0;
LOGGER.log(STEP_LOG_LEVEL, ":executing {} MCP steps", totalSteps);
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
AtomicInteger currentStepIndex = new AtomicInteger(1); // used for progress counter
SequencedMap<String, CompletableFuture<?>> stepFutures = new LinkedHashMap<>();
for (McpConfigStep currentStep : steps) {
currentStepIndex++;
StepLogic<?> stepLogic = getStepLogic(currentStep.name());
LOGGER.log(STEP_LOG_LEVEL, ":step {}/{} - {}", currentStepIndex, totalSteps, stepLogic.getDisplayName(currentStep.name()));
for (McpConfigStep currentStep : steps) {
StepLogic<?> stepLogic = getStepLogic(currentStep.name());
// Resolve all futures that need to complete before this one.
Set<String> dependencyNames = getOptions().getDependenciesByStep()
.getting(currentStep.name())
.getOrElse(Set.of());
CompletableFuture<?>[] dependencies = dependencyNames.stream()
.map(stepFutures::get)
.toArray(CompletableFuture[]::new);
// Create this step's future and store it.
CompletableFuture<?> future = CompletableFuture.allOf(dependencies)
.thenRunAsync(() -> {
try {
int index = currentStepIndex.getAndIncrement();
String displayName = stepLogic.getDisplayName(currentStep.name());
LOGGER.log(STEP_LOG_LEVEL, ":step {}/{} - {}", index, totalSteps, displayName);
Stopwatch stopwatch = Stopwatch.createStarted();
stepLogic.execute(new ExecutionContextImpl(currentStep));
LOGGER.log(STEP_LOG_LEVEL, ":{} done in {}", currentStep.name(), stopwatch.stop());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}, executor);
stepFutures.put(currentStep.name(), future);
}
// Wait for all the futures to complete. Closing the executor isn't enough
// since the unstarted ones haven't even reached the executor yet.
stepFutures.sequencedValues().reversed().forEach(CompletableFuture::join);
Stopwatch stopwatch = Stopwatch.createStarted();
stepLogic.execute(new ExecutionContextImpl(currentStep));
LOGGER.log(STEP_LOG_LEVEL, ":{} done in {}", currentStep.name(), stopwatch.stop());
}
return Path.of(outputsByStep.get(steps.getLast().name()));
return Path.of(extraConfig.get(ConfigValue.OUTPUT));
}
private StepLogic<?> getStepLogic(String name) {
@@ -251,7 +205,8 @@ public final class McpExecutor extends Service<McpExecutor.Options> {
@Override
public Path setOutput(Path output) {
String absolutePath = output.toAbsolutePath().toString();
outputsByStep.put(step.name(), absolutePath);
extraConfig.put(ConfigValue.OUTPUT, absolutePath);
extraConfig.put(step.name() + ConfigValue.PREVIOUS_OUTPUT_SUFFIX, absolutePath);
return output;
}

View File

@@ -177,8 +177,7 @@ public final class McpExecutorBuilder {
* @return the options
*/
public Provider<McpExecutor.Options> build() throws IOException {
DependencySet.BuiltDependencies builtDependencies = dependencySet.buildExecutionSet();
SortedSet<String> stepNames = builtDependencies.stepsToExecute();
SortedSet<String> stepNames = dependencySet.buildExecutionSet();
dependencySet.clear();
List<McpConfigStep> toExecute = new ArrayList<>();
@@ -196,7 +195,6 @@ public final class McpExecutorBuilder {
}
options.getStepsToExecute().set(toExecute);
options.getDependenciesByStep().set(builtDependencies.dependenciesByStep());
options.getMappings().set(extension.getMcpConfigProvider().getMappings().toFile());
options.getInitialConfig().set(config);
options.getOffline().set(project.getGradle().getStartParameter().isOffline());

View File

@@ -57,7 +57,7 @@ class DependencySetTest extends Specification {
def "single child"() {
when:
dependencySet.add('childAB')
def executedSteps = dependencySet.buildExecutionSet().stepsToExecute()
def executedSteps = dependencySet.buildExecutionSet()
then:
executedSteps.toList() == [
'root',
@@ -72,7 +72,7 @@ class DependencySetTest extends Specification {
when:
dependencySet.add('childA1')
dependencySet.add('orphanB')
def executedSteps = dependencySet.buildExecutionSet().stepsToExecute()
def executedSteps = dependencySet.buildExecutionSet()
then:
executedSteps.toList() == ['orphanB', 'root', 'childA1']
}
@@ -81,7 +81,7 @@ class DependencySetTest extends Specification {
when:
dependencySet.add('childAB')
dependencySet.skip('childA2')
def executedSteps = dependencySet.buildExecutionSet().stepsToExecute()
def executedSteps = dependencySet.buildExecutionSet()
then:
executedSteps.toList() == ['root', 'childB', 'childAB']
}
@@ -90,7 +90,7 @@ class DependencySetTest extends Specification {
when:
dependencySet.add('childAB')
dependencySet.ignoreDependenciesFilter = { it.name() == 'childA2' }
def executedSteps = dependencySet.buildExecutionSet().stepsToExecute()
def executedSteps = dependencySet.buildExecutionSet()
then:
executedSteps.toList() == [
'root',