[Spring Batch] Job / Step 아키텍처
스프링배치 5 버전부터는 JobBuilderFactory와 StepBuilderFactory등 Factory 클래스가 Deprecate 됐고 Builder를 직접 사용하는 방식으로 변경됐다.
JobRepository와 TransactionManager를 Builder에 직접 주입받아 설정을 좀 더 편하게 할 수 있다.
JobBuilder는 실제 Job의 생성을 위임한다.
// JobBuilder
public class JobBuilder extends JobBuilderHelper<JobBuilder> {
@Deprecated(since = "5.0", forRemoval = true)
public JobBuilder(String name) {
super(name);
}
public JobBuilder(String name, JobRepository jobRepository) {
super(name, jobRepository);
}
public SimpleJobBuilder start(Step step) {
return new SimpleJobBuilder(this).start(step);
}
public JobFlowBuilder start(Flow flow) {
return new FlowJobBuilder(this).start(flow);
}
public JobFlowBuilder start(JobExecutionDecider decider) {
return new FlowJobBuilder(this).start(decider);
}
public JobFlowBuilder flow(Step step) {
return new FlowJobBuilder(this).start(step);
}
}
// JobBuilderHelper
public abstract class JobBuilderHelper<B extends JobBuilderHelper<B>> {
protected final Log logger = LogFactory.getLog(getClass());
private final CommonJobProperties properties;
public JobBuilderHelper(String name, JobRepository jobRepository) {
this.properties = new CommonJobProperties();
properties.name = name;
properties.jobRepository = jobRepository;
}
}
실제 Job 생성은 SimpleJobBuilder와 FlowJobBuilder가 처리한다.(각각 SimpleJob과 FlowJob을 생성한다)
JobBuilderHelper는 Job을 생성하는 공통 기능을 포함하며 SimpleJobRepository를 참조하고 있어 메타데이터를 다룰 수 있다.
JobRepository는 Builder를 통해 실제 Job 객체에 전달되고 메타데이터를 기록할 때 사용된다.
@Bean
public Job job() {
return new JobBuilder("job", jobRepository)
.start(step1())
.next(step2())
.incrementer(null)
.preventRestart()
.validator(new CustomJobParameterValidator())
.listener(null)
.build();
}
// JobBuilder.java
public SimpleJobBuilder start(Step step) {
return new SimpleJobBuilder(this).start(step);
}
public class CustomJobParameterValidator implements JobParametersValidator {
@Override
public void validate(JobParameters parameters) throws JobParametersInvalidException {
if (parameters.getString("name") == null) {
throw new JobParametersInvalidException("name parameter not found");
}
throw new UnsupportedOperationException("Unimplemented method 'validate'");
}
}
// JobBuilderHelper.java
public B preventRestart() {
properties.restartable = false;
@SuppressWarnings("unchecked")
B result = (B) this;
return result;
}
처음 start 메서드로 SimpleJobBuilder를 반환하고 next 메서드로 다음에 실행할 Step을 설정한다.
JobParameter 값을 증가시켜주는 incrementer를 설정할 수 있고 JobParameter를 검증하는 Validator를 설정할 수 있다.
기본적으로 Job은 재시작이 허용되지만 preventRestart 메서드를 사용하면 재시작을 방지할 수 있다.
start와 next 메서드를 사용할 때 넘겨주는 파라미터의 타입에 따라 SimpleJobBuilder, FlowJobBuilder가 각각 다르게 반환된다.
JobLauncher가 배치 작업의 시작점이 되고, JobParameters를 기반으로 새로운 JobInstance를 생성한다.
Job이 실행될 때 마다 JobExecution이 생성되고 Job의 메타데이터는 ExecutionContext에 저장된다.
SimpleJob은 Job을 정의하고 Step을 실행시키는데, Job 실행 전후에 JobListener를 확인해 리스너에 등록된 작업을 처리한다.
StepBuilder의 구조는 JobBuilder과 매우 유사하다.
TaskletStep, SimpleStep, PartitionStep 등 여러 종류가 있고 StepBuilder가 제공하는 메서드를 통해 적절한 하위 StepBuilder 구현체가 생성된다.
가장 기본적인 Step은 TaskletStep으로, Tasklet을 실행시킨다.
Tasklet은 RepeatTemplate 안에서 실행되고 RepeatStatus로 반복 여부를 결정한다.
Step 단위로 트랜잭션이 걸려, 하나의 Step이 완료되면 알아서 커밋되니 Tasklet 내부에서 따로 트랜잭션을 걸어주지 않아도 된다. (StepBuilderHelper 에서 처리하고 기본적으로 REQUIRED 옵션이 설정된다)
스프링 배치가 제공하는 방식인 ItemReader, ItemProcessor, ItemWriter 기반으로 처리하는 경우 chunk 단위로 작업을 처리할 수 있어 chunk 전용 Tasklet인 ChunkOrientedTasklet 구현체가 사용돼 대량 처리에 효과적이고, 단일 작업으로 처리하는 편이 더 효과적일 때는 Tasklet 구현체를 만들어서 사용한다.
@Bean
public Step taskStep() {
return new StepBuilder("taskStep", jobRepository)
.tasklet((contribution, chunkContext) -> {
System.out.println("taskletSTep");
if (count < 5) {
count++;
return RepeatStatus.CONTINUABLE;
} else {
return RepeatStatus.FINISHED;
}
}, platformTransactionManager)
.allowStartIfComplete(true)
.build();
}
@Bean
public Step chunkStep() {
return new StepBuilder("chunkStep", jobRepository)
.chunk(3, platformTransactionManager)
.reader(() -> null)
.processor(item -> "_" + item)
.writer(items -> items.forEach(System.out::println))
.startLimit(5)
.build();
}
chunk 단위로 작업을 처리할 때와 tasklet 구현체를 직접 구현할 때 사용하는 스프링 배치 api가 다르다.
// TaskletStep.java
@Override
public RepeatStatus doInTransaction(TransactionStatus status) {
TransactionSynchronizationManager.registerSynchronization(this);
RepeatStatus result = RepeatStatus.CONTINUABLE;
StepContribution contribution = stepExecution.createStepContribution();
chunkListener.beforeChunk(chunkContext);
// In case we need to push it back to its old value
// after a commit fails...
oldVersion = new StepExecution(stepExecution.getStepName(), stepExecution.getJobExecution());
copy(stepExecution, oldVersion);
try {
try {
try {
result = tasklet.execute(contribution, chunkContext);
if (result == null) {
result = RepeatStatus.FINISHED;
}
}
catch (Exception e) {
if (transactionAttribute.rollbackOn(e)) {
chunkContext.setAttribute(ChunkListener.ROLLBACK_EXCEPTION_KEY, e);
throw e;
}
}
}
finally {
// If the step operations are asynchronous then we need
// to synchronize changes to the step execution (at a
// minimum). Take the lock *before* changing the step
// execution.
try {
semaphore.acquire();
locked = true;
}
catch (InterruptedException e) {
logger.error("Thread interrupted while locking for repository update");
stepExecution.setStatus(BatchStatus.STOPPED);
stepExecution.setTerminateOnly();
Thread.currentThread().interrupt();
}
// Apply the contribution to the step
// even if unsuccessful
if (logger.isDebugEnabled()) {
logger.debug("Applying contribution: " + contribution);
}
stepExecution.apply(contribution);
}
stepExecutionUpdated = true;
stream.update(stepExecution.getExecutionContext());
try {
// Going to attempt a commit. If it fails this flag will
// stay false and we can use that later.
if (stepExecution.getExecutionContext().isDirty()) {
getJobRepository().updateExecutionContext(stepExecution);
}
stepExecution.incrementCommitCount();
if (logger.isDebugEnabled()) {
logger.debug("Saving step execution before commit: " + stepExecution);
}
getJobRepository().update(stepExecution);
}
catch (Exception e) {
// If we get to here there was a problem saving the step
// execution and we have to fail.
String msg = "JobRepository failure forcing rollback";
logger.error(msg, e);
throw new FatalStepExecutionException(msg, e);
}
}
catch (Error e) {
if (logger.isDebugEnabled()) {
logger.debug("Rollback for Error: " + e.getClass().getName() + ": " + e.getMessage());
}
rollback(stepExecution);
throw e;
}
catch (RuntimeException e) {
if (logger.isDebugEnabled()) {
logger.debug("Rollback for RuntimeException: " + e.getClass().getName() + ": " + e.getMessage());
}
rollback(stepExecution);
throw e;
}
catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("Rollback for Exception: " + e.getClass().getName() + ": " + e.getMessage());
}
rollback(stepExecution);
// Allow checked exceptions
throw new UncheckedTransactionException(e);
}
return result;
}
RepeatStatus로 특정 조건이 충족 될 때 까지 Tasklet을 반복하도록 설정할 수 있다.
리턴 값을 설정하지 않으면 RepeatStatus.FINISHED 가 자동으로 설정되고, Step 종료 후에는 리스너의 afterStep api가 실행된다.
Step도 Job처럼 성공한 Step은 재실행하지 않지만 allowStartIfComplete 메서드로 재실행을 허용할 수 있고, startLimit 메서드로 Step이 몇 번 까지 실행될 수 있는지 지정할 수 있다.
api를 사용하려면 스프링배치가 제공하는 메타데이터 테이블을 구축해 둬야 한다.
'Spring > Spring Batch' 카테고리의 다른 글
[Spring Batch] Chunk 아키텍처 (0) | 2024.08.24 |
---|---|
[Spring Batch] Flow 아키텍처 (2) | 2024.07.21 |
[Spring Batch] 배치 도메인 이해 (0) | 2024.06.30 |
[Spring Batch] ItemReader / ItemWriter (1) | 2024.02.27 |
[Spring Batch] 스프링 배치 내부 흐름 (0) | 2024.02.25 |
댓글
이 글 공유하기
다른 글
-
[Spring Batch] Chunk 아키텍처
[Spring Batch] Chunk 아키텍처
2024.08.24 -
[Spring Batch] Flow 아키텍처
[Spring Batch] Flow 아키텍처
2024.07.21 -
[Spring Batch] 배치 도메인 이해
[Spring Batch] 배치 도메인 이해
2024.06.30 -
[Spring Batch] ItemReader / ItemWriter
[Spring Batch] ItemReader / ItemWriter
2024.02.27