Lokales Partitioning skaliert innerhalb einer JVM. Remote Partitioning geht weiter: Manager und Worker laufen in getrennten Prozessen oder Hosts und kommunizieren über Messaging. Das ist ideal, wenn ein einzelner Host nicht mehr reicht oder du Jobs horizontal skalieren willst. In diesem Teil nutzen wir JMS als Transport und bauen eine minimal funktionierende Manager/ Worker-Konfiguration ohne Spring Boot.
Zielbild
Nach diesem Kapitel kannst du Remote Partitioning mit JMS aufsetzen, die Rollen von Manager und Worker sauber trennen und weißt, welche Betriebspunkte (Timeouts, Redelivery, Idempotenz) entscheidend sind.
Kernkonzepte
- Manager-Step: partitioniert die Arbeit und verschickt Requests.
- Worker-Step: verarbeitet eine Partition und sendet ein Ergebnis zurück.
- Partitioner: erzeugt deterministische ExecutionContexts pro Partition.
- Request/Reply-Channels: MessageChannels für Requests und Replies.
- JMS Queues: Transportebene, entkoppelt Manager und Worker.
Artefakt: Manager/Worker-Architektur-Skizze
Manager JVM JMS Worker JVMs
----------- --- -----------
Partitioner -> requests channel -> [requests queue] -> inbound -> worker step
replies channel <- [replies queue] <- outbound <- results
Abhängigkeiten (minimal)
Für Remote Partitioning brauchst du zusätzlich zu Spring Batch:
spring-batch-integration, spring-integration-jms und einen JMS-Provider
wie ActiveMQ Artemis.
Praxisblock: Minimal-Konfiguration (Manager)
Ausschnitt: Java Config. Die Batch-Infrastruktur aus Teil 3 bleibt bestehen.
import jakarta.jms.ConnectionFactory;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.partition.Partitioner;
import org.springframework.batch.integration.config.annotation.EnableBatchIntegration;
import org.springframework.batch.integration.partition.RemotePartitioningManagerStepBuilderFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.jms.dsl.Jms;
import org.springframework.messaging.MessageChannel;
@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class RemotePartitioningManagerConfig {
private final RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;
public RemotePartitioningManagerConfig(
RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory) {
this.managerStepBuilderFactory = managerStepBuilderFactory;
}
@Bean
public Step managerStep(Partitioner partitioner) {
return this.managerStepBuilderFactory
.get("import.manager")
.partitioner("import.worker", partitioner)
.gridSize(6)
.outputChannel(requests())
.inputChannel(replies())
.build();
}
@Bean
public MessageChannel requests() {
return new DirectChannel();
}
@Bean
public MessageChannel replies() {
return new QueueChannel();
}
@Bean
public IntegrationFlow outboundRequests(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(requests())
.handle(Jms.outboundAdapter(connectionFactory)
.destination("sb.partition.requests"))
.get();
}
@Bean
public IntegrationFlow inboundReplies(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(Jms.messageDrivenChannelAdapter(connectionFactory)
.destination("sb.partition.replies"))
.channel(replies())
.get();
}
}
Wichtig:
import.worker ist der Step-Name, den die Worker-Seite bereitstellt.
gridSize begrenzt die parallelen Partitionen, nicht die Partition-Anzahl.
Die Reply-Channel-Queue puffert Antworten, falls der Manager gerade nicht
aktiv liest.
Praxisblock: Minimal-Konfiguration (Worker)
Ausschnitt: Java Config für einen Worker-Knoten.
import jakarta.jms.ConnectionFactory;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.integration.config.annotation.EnableBatchIntegration;
import org.springframework.batch.integration.partition.RemotePartitioningWorkerStepBuilderFactory;
import org.springframework.batch.infrastructure.item.ItemProcessor;
import org.springframework.batch.infrastructure.item.ItemReader;
import org.springframework.batch.infrastructure.item.ItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.jms.dsl.Jms;
import org.springframework.messaging.MessageChannel;
@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class RemotePartitioningWorkerConfig {
private final RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;
public RemotePartitioningWorkerConfig(
RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory) {
this.workerStepBuilderFactory = workerStepBuilderFactory;
}
@Bean
public Step workerStep(ItemReader<RawCustomerRow> reader,
ItemProcessor<RawCustomerRow, Customer> processor,
ItemWriter<Customer> writer) {
return this.workerStepBuilderFactory
.get("import.worker")
.inputChannel(incomingRequests())
.outputChannel(outgoingReplies())
.<RawCustomerRow, Customer>chunk(200)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
@Bean
public MessageChannel incomingRequests() {
return new DirectChannel();
}
@Bean
public MessageChannel outgoingReplies() {
return new DirectChannel();
}
@Bean
public IntegrationFlow inboundRequests(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(Jms.messageDrivenChannelAdapter(connectionFactory)
.destination("sb.partition.requests"))
.channel(incomingRequests())
.get();
}
@Bean
public IntegrationFlow outboundReplies(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(outgoingReplies())
.handle(Jms.outboundAdapter(connectionFactory)
.destination("sb.partition.replies"))
.get();
}
}
Der Worker-Step ist ein ganz normaler Chunk-Step. Der Unterschied: Er wird
remote ausgelöst und liefert seine StepExecution über JMS zurück.
Partitioner: deterministisch und serialisierbar
Remote Partitioning transportiert ExecutionContexts über JMS. Darin dürfen nur kleine, serialisierbare Werte stehen (Strings, Zahlen, keine komplexen Objekte). Determinismus ist Pflicht, sonst passen Restart und Re-Runs nicht.
Beispiel: File-Splitting über mehrere CSV-Dateien:
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.Map;
import org.springframework.batch.core.partition.Partitioner;
import org.springframework.batch.infrastructure.item.ExecutionContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.ResourcePatternResolver;
@Configuration
public class FilePartitionerConfig {
@Bean
public Partitioner filePartitioner(ResourcePatternResolver resolver) throws IOException {
Resource[] files = resolver.getResources("file:input/customers_*.csv");
Arrays.sort(files, Comparator.comparing(Resource::getFilename));
return gridSize -> {
Map<String, ExecutionContext> partitions = new LinkedHashMap<>();
int index = 0;
for (Resource file : files) {
ExecutionContext context = new ExecutionContext();
context.putString("file", toPath(file));
partitions.put("partition-" + index, context);
index++;
}
return partitions;
};
}
private String toPath(Resource file) {
try {
return file.getFile().getAbsolutePath();
} catch (IOException ex) {
throw new IllegalStateException("Cannot resolve file path", ex);
}
}
}
Betriebspunkte, die du vorher entscheiden musst
- Redelivery/DLQ: Was passiert bei Poison Messages? Lege Dead-Letter-Queues an.
- Timeouts: Der Manager darf nicht unendlich auf Replies warten.
- Idempotenz: Worker müssen Replays und Duplikate verkraften.
- Skalierung: JMS-Listener-Parallelität und DB-Connection-Pool müssen passen.
- QoS: Für Produktionsdaten sind persistente JMS-Nachrichten Pflicht.
Typische Fehlerbilder
Typische Fehlerbilder sind: Der Manager wartet ewig, weil Replies nicht zurückkommen (Worker down, falsche Queue, Timeout fehlt). Doppelte Writes entstehen durch Redelivery plus nicht-idempotenten Writer. Skew entsteht, wenn eine Partition ewig dauert und alle anderen Worker idle sind. Nicht-deterministische Partitionen führen dazu, dass ein Restart andere Splits erzeugt.
Anti-Patterns
ExecutionContext als Datenspeicher (große Payloads) überlastet JMS. Nicht-persistente Nachrichten riskieren Datenverlust bei Broker-Restarts. Keine DLQ-Strategie macht Fehler unsichtbar. Thread-Safety ignorieren führt zu nondeterministischen Ergebnissen.
Kurzfazit
Remote Partitioning skaliert Partitionen über JVM-Grenzen hinweg. JMS entkoppelt Manager und Worker, verlangt aber klare Betriebsregeln. Deterministische Partitionierung und idempotente Writer sind nicht optional.
Im nächsten Teil geht es um Monitoring und Observability: Metriken, Logs, Dashboards und die Frage, was ein grüner Batch wirklich bedeutet.
Alle Teile der Serie: Serie: Spring Batch