java – Pass Spring Batch Tasklet Failure Message to Reporting Step.

java – Pass Spring Batch Tasklet Failure Message to Reporting Step.

Im not sure how to pass in the exception message or if there is defined method to passing in the failure message without using the Step Execution Context.

You can get access to the exception thrown in a previous step from the job execution. Here is an example:

import java.util.List;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class MyJob {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean
    public Step step1() {
        return steps.get(step1)
                .tasklet((contribution, chunkContext) -> {
                    System.out.println(hello);
                    throw new Exception(Boom!);
                })
                .build();
    }

    @Bean
    public Step step2() {
        return steps.get(step2)
                .tasklet((contribution, chunkContext) -> {
                    JobExecution jobExecution = chunkContext.getStepContext().getStepExecution().getJobExecution();
                    StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next(); // TODO properly get the stepExecution of the previous step
                    List<Throwable> failureExceptions = stepExecution.getFailureExceptions();
                    if (!failureExceptions.isEmpty()) {
                        Throwable throwable = failureExceptions.get(0);
                        System.out.println(Looks like step1 has thrown an exception:  + throwable.getMessage());
                    }
                    System.out.println(world);
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public Job job() {
        return jobs.get(job)
                    .flow(step1())
                    .on(*).to(step2())
                    .build()
                .build();
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

}

This sample prints:

hello
Looks like step1 has thrown an exception: Boom!
world

Obviously, you need to make sure step1 flows to step2 in all cases (Hence the flow definition).

Hope this helps.

Consider making errorMessages a bean:

// somewhere convenient...
@Bean
public List<String> errorMessages() {
    return new ArrayList<>();
}

and inject errorMessages into both tasklets.

In the CsvBatchReader tasklet, inspect for any exceptions thrown by execute(...), and update errorMessages as needed:

public class CsvBatchReader<T> implements Tasklet, StepExecutionListener {

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        List<Throwable> failures = stepExecution.getFailureExceptions();
        if (!failures.isEmpty())  {
            errorMessages.add(...);
        }

        logger.info(Reader ended -  + clazz.getSimpleName());
        return ExitStatus.COMPLETED;  // return INVALID if !failures.isEmpty() ?
    }
}

And since errorMessages was injected into CsvBatchReporting, then:

public class CsvBatchReporting implements Tasklet, StepExecutionListener {

@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
    logger.info(Reporting execute);

    //Email Error 
    String body = null;
    String subject = job finished: ;
    if (!errorMessages.isEmpty())  {
       subject += ERROR;
       body = ... // from errorMessages
    } else {
       subject += SUCCESS;
    }

    return RepeatStatus.FINISHED;
}

}

java – Pass Spring Batch Tasklet Failure Message to Reporting Step.

Although my original question was in reference to passing exceptions from one step to the next, I would like to point out an alternative approach using exitStatus

In my reader step I can catch my csvToBean exception and create an exist status like so.

@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws UnexpectedJobExecutionException {
    logger.info(Reader execute -  + clazz.getSimpleName());

    ICsvToBean csvToBean = new CsvToBean(clazz, path);

    try {
        batch = csvToBean.readCsv();
    } catch(IOException ex) {
        chunkContext.getStepContext().getStepExecution().setExitStatus(new ExitStatus(FAILED, ex.getMessage()));
    }

    return RepeatStatus.FINISHED;
}

I would then create conditional flow with in my job like so.

@Bean
public Job primaryCareJob(@Qualifier(reportingStep) Step reportingStep, @Qualifier(crossValidationStep) Step crossValidationStep) {
    logger.info(Start PrimaryCare Job);

    return jobs.get(primaryCareJob)
            .start(readPrimaryCareStep()).on(FAILED).to(reportingStep)
            .from(readPrimaryCareStep()).on(*).to(readPrimaryCareDetailStep())

            .from(readPrimaryCareDetailStep()).on(FAILED).to(reportingStep)
            .from(readPrimaryCareDetailStep()).on(*).to(processPrimaryCareStep())

            .from(processPrimaryCareStep()).on(INVALID).to(reportingStep)
            .from(processPrimaryCareStep()).on(*).to(processPrimaryCareDetailStep())

            .from(processPrimaryCareDetailStep()).on(INVALID).to(reportingStep)
            //Other steps

            .from(reportingStep).on(*).end()
            .from(reportingStep).on(*).fail()

            .build()
        .build();
}

Lastly in my reader tasklet step I would retrieve the existStatus like so

@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
    JobExecution jobExecution = chunkContext.getStepContext().getStepExecution().getJobExecution();

    Collection<StepExecution> stepExecutions = jobExecution.getStepExecutions();

    Map<String, String> result = stepExecutions.stream()
        .filter(x -> x.getExitStatus().getExitCode().equals(FAILED))
        .collect(Collectors.toMap(StepExecution::getStepName, x -> x.getExitStatus().getExitDescription()));

    result.forEach((k, v) -> {
        System.out.println(Step  + k +  failure  + v);
    });
}

Leave a Reply

Your email address will not be published. Required fields are marked *