TS
Thomas Schmitz

Freiberuflicher IT-Berater, Softwareentwickler & DevOps. Praxisnahe Artikel zu API-Design, Architektur und Cloud-Workflows.

Serie: Spring Batch · Teil 12 von 12

Spring Batch Teil 12: Remote Partitioning mit JMS

Manager/Worker über JMS koppeln, minimal konfigurieren und Skalierung sauber betreiben.

Praxisnah Checkliste

Lokales Partitioning skaliert innerhalb einer JVM. Remote Partitioning geht weiter: Manager und Worker laufen in getrennten Prozessen oder Hosts und kommunizieren über Messaging. Das ist ideal, wenn ein einzelner Host nicht mehr reicht oder du Jobs horizontal skalieren willst. In diesem Teil nutzen wir JMS als Transport und bauen eine minimal funktionierende Manager/ Worker-Konfiguration ohne Spring Boot.

Zielbild

Nach diesem Kapitel kannst du Remote Partitioning mit JMS aufsetzen, die Rollen von Manager und Worker sauber trennen und weißt, welche Betriebspunkte (Timeouts, Redelivery, Idempotenz) entscheidend sind.

Kernkonzepte

  • Manager-Step: partitioniert die Arbeit und verschickt Requests.
  • Worker-Step: verarbeitet eine Partition und sendet ein Ergebnis zurück.
  • Partitioner: erzeugt deterministische ExecutionContexts pro Partition.
  • Request/Reply-Channels: MessageChannels für Requests und Replies.
  • JMS Queues: Transportebene, entkoppelt Manager und Worker.

Artefakt: Manager/Worker-Architektur-Skizze

Manager JVM                         JMS                           Worker JVMs
-----------                         ---                           -----------
Partitioner -> requests channel -> [requests queue] -> inbound -> worker step
                replies channel <- [replies queue] <- outbound <- results

Abhängigkeiten (minimal)

Für Remote Partitioning brauchst du zusätzlich zu Spring Batch: spring-batch-integration, spring-integration-jms und einen JMS-Provider wie ActiveMQ Artemis.

Praxisblock: Minimal-Konfiguration (Manager)

Ausschnitt: Java Config. Die Batch-Infrastruktur aus Teil 3 bleibt bestehen.

import jakarta.jms.ConnectionFactory;

import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.partition.Partitioner;
import org.springframework.batch.integration.config.annotation.EnableBatchIntegration;
import org.springframework.batch.integration.partition.RemotePartitioningManagerStepBuilderFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.jms.dsl.Jms;
import org.springframework.messaging.MessageChannel;

@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class RemotePartitioningManagerConfig {

    private final RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;

    public RemotePartitioningManagerConfig(
            RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory) {
        this.managerStepBuilderFactory = managerStepBuilderFactory;
    }

    @Bean
    public Step managerStep(Partitioner partitioner) {
        return this.managerStepBuilderFactory
                .get("import.manager")
                .partitioner("import.worker", partitioner)
                .gridSize(6)
                .outputChannel(requests())
                .inputChannel(replies())
                .build();
    }

    @Bean
    public MessageChannel requests() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel replies() {
        return new QueueChannel();
    }

    @Bean
    public IntegrationFlow outboundRequests(ConnectionFactory connectionFactory) {
        return IntegrationFlow.from(requests())
                .handle(Jms.outboundAdapter(connectionFactory)
                        .destination("sb.partition.requests"))
                .get();
    }

    @Bean
    public IntegrationFlow inboundReplies(ConnectionFactory connectionFactory) {
        return IntegrationFlow.from(Jms.messageDrivenChannelAdapter(connectionFactory)
                        .destination("sb.partition.replies"))
                .channel(replies())
                .get();
    }
}

Wichtig: import.worker ist der Step-Name, den die Worker-Seite bereitstellt. gridSize begrenzt die parallelen Partitionen, nicht die Partition-Anzahl. Die Reply-Channel-Queue puffert Antworten, falls der Manager gerade nicht aktiv liest.

Praxisblock: Minimal-Konfiguration (Worker)

Ausschnitt: Java Config für einen Worker-Knoten.

import jakarta.jms.ConnectionFactory;

import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.integration.config.annotation.EnableBatchIntegration;
import org.springframework.batch.integration.partition.RemotePartitioningWorkerStepBuilderFactory;
import org.springframework.batch.infrastructure.item.ItemProcessor;
import org.springframework.batch.infrastructure.item.ItemReader;
import org.springframework.batch.infrastructure.item.ItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.jms.dsl.Jms;
import org.springframework.messaging.MessageChannel;

@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class RemotePartitioningWorkerConfig {

    private final RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;

    public RemotePartitioningWorkerConfig(
            RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory) {
        this.workerStepBuilderFactory = workerStepBuilderFactory;
    }

    @Bean
    public Step workerStep(ItemReader<RawCustomerRow> reader,
                           ItemProcessor<RawCustomerRow, Customer> processor,
                           ItemWriter<Customer> writer) {

        return this.workerStepBuilderFactory
                .get("import.worker")
                .inputChannel(incomingRequests())
                .outputChannel(outgoingReplies())
                .<RawCustomerRow, Customer>chunk(200)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    @Bean
    public MessageChannel incomingRequests() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel outgoingReplies() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow inboundRequests(ConnectionFactory connectionFactory) {
        return IntegrationFlow.from(Jms.messageDrivenChannelAdapter(connectionFactory)
                        .destination("sb.partition.requests"))
                .channel(incomingRequests())
                .get();
    }

    @Bean
    public IntegrationFlow outboundReplies(ConnectionFactory connectionFactory) {
        return IntegrationFlow.from(outgoingReplies())
                .handle(Jms.outboundAdapter(connectionFactory)
                        .destination("sb.partition.replies"))
                .get();
    }
}

Der Worker-Step ist ein ganz normaler Chunk-Step. Der Unterschied: Er wird remote ausgelöst und liefert seine StepExecution über JMS zurück.

Partitioner: deterministisch und serialisierbar

Remote Partitioning transportiert ExecutionContexts über JMS. Darin dürfen nur kleine, serialisierbare Werte stehen (Strings, Zahlen, keine komplexen Objekte). Determinismus ist Pflicht, sonst passen Restart und Re-Runs nicht.

Beispiel: File-Splitting über mehrere CSV-Dateien:

import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.Map;

import org.springframework.batch.core.partition.Partitioner;
import org.springframework.batch.infrastructure.item.ExecutionContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.ResourcePatternResolver;

@Configuration
public class FilePartitionerConfig {

    @Bean
    public Partitioner filePartitioner(ResourcePatternResolver resolver) throws IOException {
        Resource[] files = resolver.getResources("file:input/customers_*.csv");
        Arrays.sort(files, Comparator.comparing(Resource::getFilename));
        return gridSize -> {
            Map<String, ExecutionContext> partitions = new LinkedHashMap<>();
            int index = 0;
            for (Resource file : files) {
                ExecutionContext context = new ExecutionContext();
                context.putString("file", toPath(file));
                partitions.put("partition-" + index, context);
                index++;
            }
            return partitions;
        };
    }

    private String toPath(Resource file) {
        try {
            return file.getFile().getAbsolutePath();
        } catch (IOException ex) {
            throw new IllegalStateException("Cannot resolve file path", ex);
        }
    }
}

Betriebspunkte, die du vorher entscheiden musst

  • Redelivery/DLQ: Was passiert bei Poison Messages? Lege Dead-Letter-Queues an.
  • Timeouts: Der Manager darf nicht unendlich auf Replies warten.
  • Idempotenz: Worker müssen Replays und Duplikate verkraften.
  • Skalierung: JMS-Listener-Parallelität und DB-Connection-Pool müssen passen.
  • QoS: Für Produktionsdaten sind persistente JMS-Nachrichten Pflicht.

Typische Fehlerbilder

Typische Fehlerbilder sind: Der Manager wartet ewig, weil Replies nicht zurückkommen (Worker down, falsche Queue, Timeout fehlt). Doppelte Writes entstehen durch Redelivery plus nicht-idempotenten Writer. Skew entsteht, wenn eine Partition ewig dauert und alle anderen Worker idle sind. Nicht-deterministische Partitionen führen dazu, dass ein Restart andere Splits erzeugt.

Anti-Patterns

ExecutionContext als Datenspeicher (große Payloads) überlastet JMS. Nicht-persistente Nachrichten riskieren Datenverlust bei Broker-Restarts. Keine DLQ-Strategie macht Fehler unsichtbar. Thread-Safety ignorieren führt zu nondeterministischen Ergebnissen.

Kurzfazit

Remote Partitioning skaliert Partitionen über JVM-Grenzen hinweg. JMS entkoppelt Manager und Worker, verlangt aber klare Betriebsregeln. Deterministische Partitionierung und idempotente Writer sind nicht optional.

Im nächsten Teil geht es um Monitoring und Observability: Metriken, Logs, Dashboards und die Frage, was ein grüner Batch wirklich bedeutet.

Alle Teile der Serie: Serie: Spring Batch

Mehr Beiträge aus dem Blog.