Multi-threaded Steps sind der schnelle Einstieg in Parallelisierung, aber sie haben Grenzen: stateful Reader, shared State, schwer determinierbares Verhalten. Partitioning löst das sauberer, weil jeder Thread seine eigene Step-Execution, einen eigenen ExecutionContext und isolierte Komponenten bekommt. Einfach gesagt: Jeder Worker verarbeitet seinen Teil der Daten.
In diesem Teil bauen wir lokales Partitioning auf: Ein Manager-Step teilt die Arbeit auf, Worker-Steps bearbeiten ihre Partitionen parallel.
Zielbild
Nach diesem Kapitel kannst du einen Partitioned Step konfigurieren, Partitionen deterministisch schneiden und Reader und Writer mit Step-Scope isolieren.
Das Prinzip
Ein Partitioned Step besteht aus einem Manager-Step, der die Arbeit in Partitionen teilt, einem Partitioner, der ExecutionContexts erzeugt, den Worker-Steps, die Partitionen verarbeiten, und einem TaskExecutor, der die Worker parallel ausführt. Beispiel: 8 CSV-Dateien → 8 Partitionen → 8 Worker.
Minimal-Konfiguration (Manager)
Ausschnitt: Bean-Methode in einer @Configuration-Klasse.
import org.springframework.batch.core.Step;
import org.springframework.batch.core.partition.Partitioner;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.core.task.TaskExecutor;
@Bean
public Step importManagerStep(JobRepository jobRepository,
TaskExecutor taskExecutor,
Partitioner partitioner,
Step importWorkerStep) {
return new StepBuilder("import.manager", jobRepository)
.partitioner("import.worker", partitioner)
.step(importWorkerStep)
.gridSize(8)
.taskExecutor(taskExecutor)
.build();
}
Die gridSize definiert die maximale Parallelität. Sie sollte zur Anzahl
der verfügbaren Threads und DB-Verbindungen passen. Beispiel: gridSize(4)
heißt: max. 4 Worker parallel.
gridSize vs. Partition-Anzahl
Die gridSize ist nur die maximale Parallelität. Wie viele Partitionen
es gibt, bestimmt der Partitioner. Wenn der Partitioner 20 Partitionen liefert
und gridSize auf 4 steht, werden 4 gleichzeitig ausgeführt, die restlichen
16 warten in der Queue. Umgekehrt gilt: Wenn es nur 2 Partitionen gibt, helfen
dir 8 Threads nicht weiter.
Kurz: Partition-Anzahl bestimmt die Arbeitsteilung, gridSize die Parallelität.
Partitioner: deterministische Splits
Der Partitioner entscheidet, wie du die Arbeit aufteilst. Beispiele sind Range-Partitioning mit Kundennummern 1 bis 10000, File-Splitting mit mehreren CSV-Dateien oder Keyset-Partitioning auf Hash-Basis. Einsteiger- Beispiel: Jede Datei bekommt einen eigenen Worker.
Minimalinterface:
Ausschnitt aus Spring Batch (Interface-Signatur).
public interface Partitioner {
Map<String, ExecutionContext> partition(int gridSize);
}
Jeder ExecutionContext enthält Parameter für genau eine Partition.
Partitionierungsstrategien im Vergleich
| Strategie | Idee | Gut für | Risiko |
|---|---|---|---|
| File-Splitting | eine Datei = eine Partition | viele CSV-Dateien | sehr ungleiche Dateigrößen |
| Range-Partitioning | IDs in Bereiche schneiden | DB-Tabellen mit IDs | Skew bei ungleich verteilten IDs |
| Hash-Partitioning | Hash(id) % n | gleichmäßige Verteilung | schwieriger Debug, nicht fachlich |
Wichtig ist: Der Split muss deterministisch sein, sonst werden Re-Runs schwer reproduzierbar.
Skew: wenn Partitionen ungleich groß sind
Skew bedeutet: eine Partition ist viel größer als die anderen. Ergebnis: 7 Worker sind fertig, 1 läuft noch. Der Job endet erst, wenn der große Brocken fertig ist. Gegenmaßnahmen sind kleinere Partitionen, File-Splitting nach Größe oder Hash-Partitioning für eine gleichmäßige Verteilung.
Beispiel: File-Splitting für CSV
Für unsere Domäne liegt ein typisches Muster nahe: mehrere CSV-Dateien, je Partition genau eine Datei. Der Partitioner legt pro Datei den Pfad im ExecutionContext ab.
Ausschnitt: Bean-Methode plus Hilfsmethode in einer @Configuration-Klasse.
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.Map;
import org.springframework.batch.core.partition.Partitioner;
import org.springframework.batch.infrastructure.item.ExecutionContext;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.ResourcePatternResolver;
@Bean
public Partitioner filePartitioner(ResourcePatternResolver resolver)
throws IOException {
Resource[] files = resolver.getResources("file:input/customers_*.csv");
Arrays.sort(files, Comparator.comparing(Resource::getFilename));
return gridSize -> {
Map<String, ExecutionContext> partitions = new LinkedHashMap<>();
int index = 0;
for (Resource file : files) {
ExecutionContext context = new ExecutionContext();
context.putString("file", toPath(file));
partitions.put("partition-" + index, context);
index++;
}
return partitions;
};
}
private String toPath(Resource file) {
try {
return file.getFile().getAbsolutePath();
} catch (IOException ex) {
throw new IllegalStateException("Cannot resolve file path", ex);
}
}
Step-Scoped Reader/Writer
Partitioning funktioniert nur, wenn Reader/Writer pro Partition
eigene Instanzen haben. Dafür nutzt du @StepScope:
Ausschnitt: Bean-Methode in einer @Configuration-Klasse.
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.infrastructure.item.file.FlatFileItemReader;
import org.springframework.batch.infrastructure.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.Resource;
@Bean
@StepScope
public FlatFileItemReader<RawCustomerRow> partitionedReader(
@Value("#{stepExecutionContext['file']}") Resource file) {
return new FlatFileItemReaderBuilder<RawCustomerRow>()
.name("partitionedReader")
.resource(file)
.linesToSkip(1)
.delimited()
.delimiter(";")
.names("customer_id", "first_name", "last_name",
"email", "birth_date", "signup_date")
.fieldSetMapper(new CustomerFieldSetMapper())
.build();
}
Damit liest jede Partition ihre Datei bzw. ihren Bereich. Das verhindert, dass sich mehrere Threads denselben Reader teilen.
Fehlerverhalten und Restart
Jede Partition hat eine eigene StepExecution. Wenn eine Partition scheitert, scheitert der Manager-Step und damit der Job. Das ist gut: du bekommst einen klaren Fehler, statt stille Datenlücken.
Beim Restart werden die Partitionen erneut berechnet. Deshalb ist Determinismus entscheidend: gleiche Eingaben müssen zu denselben Partitionen führen, sonst arbeitest du an der falschen Stelle weiter.
Artefakt: Partitioner-Template
Eine einfache Template-Struktur für ExecutionContexts:
partitionKey: "partition-1"
minId: 1
maxId: 10000
file: "customers_01.csv"
Die Felder hängen von deiner Partitionierungsstrategie ab.
Monitoring pro Partition
Für Betrieb und Debugging ist es hilfreich, pro Partition den Key und die Counts zu loggen. Typische Fragen:
- Welche Partition war am langsamsten?
- Wo gab es die meisten Skips?
- Welche Partition ist wiederholt fehlgeschlagen?
Mit diesen Informationen kannst du Skew erkennen und die Partitionierung gezielt verbessern.
Anti-Patterns
Keine Step-Scope-Komponenten führen über shared state zu Datenkorruption. Grid-Size zu hoch macht den DB-Pool zum Bottleneck. Nicht-deterministische Partitionierung erschwert reproduzierbare Re-Runs.
Kurzfazit
Partitioning ist die saubere Parallelisierung für stateful Reader und Writer. Deterministische Splits machen Re-Runs stabil. Step-Scope ist Pflicht, nicht Option.
Im nächsten Teil geht es um Remote Partitioning: Manager/Worker, Messaging und Skalierung über JVM-Grenzen hinweg.
Alle Teile der Serie: Serie: Spring Batch