[Spring Batch] Flow 아키텍처
FlowJob은 FlowJobBuilder가 생성하고, Step을 특정 상태에 따라 흐름을 전환하도록 구성할 때 사용한다.
(Step이 실패하더라도 Job은 성공하도록 설정하거나, Step이 성공한 후 다음 Step을 구분해서 실행하는 경우)
기존 SimpleJob은 순차적으로 Step을 수행했다면 FlowJob은 동적으로 Step을 제어한다.
@Bean
public Job job() {
return new JobBuilder("job", jobRepository)
.start(step1())
.on("COMPLETED").to(step3())
.from(step1())
.on("FAILED").to(step2())
.end()
.build();
}
step1이 성공하면 step3을 실행하고, step1이 실패하면 step2를 실행한다.
on 메서드는 TransitionBuilder를 반환해 특정 상태에서 다음으로 이동할 Step을 정의한다.
여기서 상태는 각 Step이 완료된 후 반환하는 ExitStatus로 COMPLETED와 FAILED를 기본 상태로 사용한다.
ExitStatus는 Step이나 Job의 종료 상태를 나타내며 마지막 Flow의 FlowExecutionStatus가 최종 Job의 ExitStatus로 저장된다.
@Bean
public Job batchJob() {
return new JobBuilder("job", jobRepository)
.start(step1())
.on("FAILED")
.to(step2())
.on("FAILED")
.stop()
.from(step1())
.on("*")
.to(step3())
.next(step4())
.from(step2())
.on("*")
.to(step5())
.end()
.build();
}
step1이 FAILED 상태로 종료되면 다음 트랜지션으로 이동해 step2가 실행되고, step2가 FAILED 상태로 종료되면 Job이 중지된다.
step1 이 FAILED 외 다른 상태로 종료되면 step3가 실행되고, step3가 완료된 후에는 step4가 실행된다.
step2가 종료되면 step5가 실행되고 이후 Job을 종료한다.
@Bean
public Job job() {
return new JobBuilder("job", jobRepository)
.start(step())
.next(decider)
.from(decider()).on("FIRST").to(firstStep())
.from(decider()).on("SECOND").(secndStep())
.end()
.build();
}
public JobExecutionDecider decider() {
return new CustomDecider();
}
public class CustomDecider implements JobExecutionDecider {
private int cnt = 0;
@Override
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
cnt++;
if (cnt % 2 ==0) {
return new FlowExecutionStatus("FIRST");
}
else {
return new FlowExecutionStatus("SECOND");
}
}
}
on 메서드를 사용하지 않고 JobExecutionDecider를 사용해서 배치 흐름을 동적으로 제어할 수 있다.
decider의 결과에 따라 어떤 Step이 실행될 지 결정하는데, 이 때 ExitStatus 대신 FlowExecutionStatus를 사용한다.
JobFlow는 ExitStatus가 FlowExecutiojnStatus로 반영되고, 이 값이 JobFlow에 반영되는 방식으로 작동한다.
decider를 다룰 때는 FlowExecutionStatus 값이 바로 데이터베이스에 반영되니 SimpleJob과 헷갈리지 말자.
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("helloJob", jobRepository)
.start(step1(jobRepository, platformTransactionManager))
.on("COMPLETED").to(step2(jobRepository, platformTransactionManager))
.from(step1(jobRepository, platformTransactionManager))
.on("FAILED").to(flow())
.end()
.build();
}
@Bean
public Flow flow() {
FlowBuilder<Flow> builder = new FlowBuilder<>("flow");
builder.start(step2(jobRepository, platformTransactionManager))
.on("*")
.to(step3(jobRepository, platformTransactionManager))
.end();
return builder.build();
}
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("helloStep", jobRepository)
.tasklet((contribution, chunkContext) -> {
return RepeatStatus.FINISHED;
}, transactionManager)
.build();
}
@Bean
public Step step2(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("helloStep", jobRepository)
.tasklet((contribution, chunkContext) -> {
return RepeatStatus.FINISHED;
}, transactionManager)
.build();
}
@Bean
public Step step3(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("helloStep", jobRepository)
.tasklet((contribution, chunkContext) -> {
return RepeatStatus.FINISHED;
}, transactionManager)
.build();
}
@Bean
public Step step4(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("helloStep", jobRepository)
.tasklet((contribution, chunkContext) -> {
return RepeatStatus.FINISHED;
}, transactionManager)
.build();
}
Job은 FlowJob을 정의하고, FlowJob 내부에는 SimpleFlow가 구성되어있다.
FlowBuilder의 start, next, from 메서드에는 Step / Flow / Decider를 설정할 수 있는데, 여기서 설정하는 객체에 따라 각각 다른 State가 설정된다. (StepState, FlowState, DecisionState)
State는 배치 작업의 실행 흐름을 제어하는 구성요소로, SimpleFlow 내부에는 여러 종류의 State가 포함될 수 있다.
SimpleFlow는 state.handle 메서드를 통해 State를 반환받는데, 이 때 전략 패턴처럼 어떤 종류의 State가 반환되는지에 상관없이 최종 결과물인 FlowExecutionStatus만 반환받는다.
Flow가 완료될 때 까지 FlowExecutionStatus에 따라 TransitionMap을 참조해 다음 실행할 State를 결정한다.
TransitionMap은 SimpleFlow 내부에서 상태 전이를 관리해 다음 상태 전환을 전담한다.
@Bean
@JobScope
public Step step1(@Value("#{jobParameters['message']}") String message, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
System.out.println(message);
return new StepBuilder("STEP1", jobRepository)
.tasklet(tasklet1(null), transactionManager)
.build();
}
@Bean
@StepScope
public Tasklet tasklet1(@Value("#{jobExecutionContext['name']}") String name) {
System.out.println("name" + name);
return (StepContribution, chunkContext) -> {
System.out.println();
return RepeatStatus.FINISHED;
};
}
@JobScope와 @StepScope 애너테이션으로 Job, Step의 빈 생명주기를 유연하게 관리해 동적으로 JobParameter를 주입받아 접근할 수 있다.
내부적으로는 프록시 객체가 생성돼 필요한 시점에 실제 빈을 생성한다. (AOP)
스프링이 시작될 때 applicationContext가 로드될텐데.. 이 때 JobScope 애너테이션이 붙은 빈 객체는 프록시 객체로만 생성되고, Job이 실행될 때 실행 컨텍스트를 초기화하고 실제 빈을 생성해 JobParameter를 주입하는 방식으로 동작한다.
@JobScope @StepScope 애너테이션으로 정의된 빈은 각각 JobContext StepContext에 바인딩돼 배치 작업 실행 중 특정 데이터와 상태를 관리하고 접근하는데, 특정 실행 범위 내에서 유효한 데이터를 처리한다고 생각하면 된다.
@Value 애너테이션으로 JobParameter나 StepParameter에 접근하려면 스프링 컨테이너가 해당 빈의 생명주기를 Job이나 Step의 실행 범위 내에서 관리하도록 설정해야 하니 @JobScope @StepScope 애너테이션을 꼭 붙여 줘야 한다.
'Spring > Spring Batch' 카테고리의 다른 글
[Spring Batch] Chunk 아키텍처 (0) | 2024.08.24 |
---|---|
[Spring Batch] Job / Step 아키텍처 (0) | 2024.07.07 |
[Spring Batch] 배치 도메인 이해 (0) | 2024.06.30 |
[Spring Batch] ItemReader / ItemWriter (1) | 2024.02.27 |
[Spring Batch] 스프링 배치 내부 흐름 (0) | 2024.02.25 |
댓글
이 글 공유하기
다른 글
-
[Spring Batch] Chunk 아키텍처
[Spring Batch] Chunk 아키텍처
2024.08.24 -
[Spring Batch] Job / Step 아키텍처
[Spring Batch] Job / Step 아키텍처
2024.07.07 -
[Spring Batch] 배치 도메인 이해
[Spring Batch] 배치 도메인 이해
2024.06.30 -
[Spring Batch] ItemReader / ItemWriter
[Spring Batch] ItemReader / ItemWriter
2024.02.27