[Spring Batch] 배치 도메인 이해
@SpringBootApplication
@EnableBatchProcessing(dataSourceRef = "batchDataSource", transactionManagerRef = "batchTransactionManager")
public class SpringBatchMasterApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBatchMasterApplication.class, args);
}
}
스프링 배치 애플리케이션을 활성화하려면 @EnableBatchProcessing 애너테이션을 메인 클래스 위에 붙여줘야 한다.
애너테이션은 4개의 설정 클래스를 실행시켜 스프링 배치 초기 설정과 초기화를 실행한다.
BatchAutoConfiguration
스프링 배치가 초기화 될 때 생성되는 설정 클래스로 JobLauncherApplicationRunner 타입의 빈을 생성한다.
SimpleBatchConfiguration
JobBuilderFactory와 StepBuilderFactory, 스프링 배치의 쥐요 구성 요소를 프록시 객체로 생성한다.
BatchConfigurerConfiguration
SimpleBatchConfiguration에서 생성한 프록시 객체의 실제 대상 객체를 설정한다.
@EnableBatchProcessing이 붙은 메인 메서드가 실행되면 SimpleBatchConfiguration -> BatchConfigurerConfiguration -> BatchAutoConfiguration 순서로 설정 클래스가 호출된다.
배치 버전 5 부터는 애너테이션에 dataSource와 transactionManager를 속성값으로 추가해 JobRepository에서 사용할 수 있도록 설정할 수 있다.
@Configuration
class MyJobConfiguration extends DefaultBatchConfiguration {
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("myJob", jobRepository)
//define job flow as needed
.build();
}
}
@EnableBatchProcessing 애너테이션 대신 기본 설정 빈을 구성해주는 DefaultBatchConfiguration 클래스를 상속받아 사용할 수 있다.
JobRepository 빈은 DefaultBatchConfiguration 클래스에 정의되어있어 스프링배치가 기본적으로 제공하는 설정을 그대로 사용할 수 있으니.. 상황에 따라 몇 가지 설정만 바꿔서 사용하자.
배치 5 버전부터는 @EnableBatchProcessing 애너테이션의 대안으로 기본 설정 빈이 정의된 DefaultBatchConfiguration 클래스가 도입돼 이 클래스를 상속해서 사용할 것을 권장하지만..
@AutoConfiguration(after = { HibernateJpaAutoConfiguration.class, TransactionAutoConfiguration.class })
@ConditionalOnClass({ JobLauncher.class, DataSource.class, DatabasePopulator.class })
@ConditionalOnBean({ DataSource.class, PlatformTransactionManager.class })
@ConditionalOnMissingBean(value = DefaultBatchConfiguration.class, annotation = EnableBatchProcessing.class)
@EnableConfigurationProperties(BatchProperties.class)
@Import(DatabaseInitializationDependencyConfigurer.class)
public class BatchAutoConfiguration {..}
BatchAutoConfiguration 클래스에서 @ConditionalOnMissingBean의 값으로 DefaultBatchConfiguration 클래스와 EnableBatchProcessing 애너테이션이 정의되어있어 몇 가지 빈이 등록되지 않는다. (메타테이블 포함)
스프링 배치는 배치 작업의 실행과 상태를 관리하기 위해 메타테이블을 제공하고, 실무에서도 이 메타테이블을 그대로 사용하는 경우가 많이 있다.
BATCH_JOB_INSTANCE
Job이 실행될 때 JobInstance 정보가 저장돼 job_name과 job_key 쌍의 데이터를 저장한다.
JOB_INSTANCE_ID : PK
VERSION : 업데이트 시 +1
JOB_NAME : Job 구성 시 설정되는 이름
JOB_KEY : 이름과 파라미터를 합쳐서 해시한 후 저장
BATCH_JOB_EXECUTION
Job의 실행정보가 저장된다.
시작 및 종료시간, 실행 상태, 메세지를 관리한다.
JOB_INSTANCE_ID : 인스턴스 키 저장
END_TIME : 에러로 Job이 종료된 경우 저장되지 않을 수 있음
BATCH_JOB_EXECUTION_PARAMS
Job과 함께 실행되는 파라미터 정보를 다룬다.
IDENTIFYING : 식별여부로, BATCH_JOB_PARAM이 JOB_PARAM을 식별할지 여부를 결정
BATCH_JOB_EXECUTION_CONTEXT
Job이 실행될 때 여러 데이터를 직렬화해서 저장해 STEP간 공유한다.
SHORT_CONTEXT : Job의 실행정보를 문자열로 저장
SERIALIZED_CONTEXT : 직렬화된 전체 context
BATCH_STEP_EXECUTION
Step의 실행정보를 저장한다.
시작 및 종료시간, 실행상태, 메세지를 관리한다.
JOB_EXECUTION_ID : 부모 아이디. JobExecution과는 일대다 관계
...COUNT : chunk 기반 프로세싱에서 다루는 내용
BATCH_STEP_EXECUTION_CONTEXT
Step이 실행될 때의 데이터를 직렬화해서 저장하지만 Step별로 공유되지 않는다.
Job
배치 계층의 최상위에 있는 개념으로 하나의 배치 작업을 의미한다.
한 개 이상의 Step을 포함하고 있는 컨테이너로 스프링 배치는 Job 인터페이스의 여러 구현체를 제공한다.
SimpleJob : 순차적으로 Step을 실행한다.
FlowJob :조건과 흐름에 따라 Step을 구성해서 실행한다.
JobLauncher는 Job을 실행시키고
JobInstance
Job이 실행될 때 생성되는 Job의 논리적 실행 단위로 식별 가능한 작업 실행을 나타낸다.
Job을 클래스로, JobInstance를 클래스로 만든 인스턴스라고 생각하면 된다.
JobLauncher는 Job과 JobParameter를 사용해 배치 작업을 수행한다.
JobRepository는 DB로부터 현재 실행 중인 작업이 이전에 실행된 적이 있는지 확인하고, 이미 실행된 적이 있는 작업인 경우 기존에 실행된 JobInstance를 반환한다.
JobParameter
Job을 실행할 떄 함께 포함돼 사용되는 객체이다.
비즈니스 로직에 사용하거나 여러 JobInstance를 구분하기 위해 사용되고, 하나의 JobInstance에는 JobParameter가 하나만 대응된다.
JobParameters는 JobParameter를 감싸고 JobParameter는 여러 가지 파라미터를 가진다.
JobParameters jobParameters = new JobParametersBuilder()
.addString("param1", "value1")
.addLong("param2", 123L)
.addDate("param3", new Date())
.addDouble("param4", 123.45)
.toJobParameters();
JobExecution
JobInstance와는 1 : N 관계로 Job 실행 중 발생한 정보를 저장한다.
Job의 실행 결과를 통해 한 번 완료된 JobInstance는 반복해서 실행되지 않도록 하고, 실패했던 JobInstance는 재실행 할 수 있도록 한다.
Job과 JobParameter가 같아도 실패한 경우 재시도 할 수 있다.
Step
Batch Job을 구성하는 독립적인 단위로 Job은 여러 Step으로 구성된다.
실제 비즈니스 로직을 Step에 정의한다.
스프링 배치는 Step의 여러 기본 구현체를 제공한다.
TaskletStep : 가장 기본인 클래스이다.
PartitionStep : 멀티쓰레딩으로 여러 Step을 분리해서 실행한다.
JobStep : Step 내부에서 Job을 구성한다.
FlowStep : Step 내부에서 Flow를 실행한다.
StepExecution
Step 실행 중 발생된 정보를 저장한다.
Job이 재시작하더라도 성공한 Step은 재실행하지 않고, 모든 StepExecution이 정상적으로 완료돼야 JobExecution이 성공했다고 할 수 있다.
StepContribution
Step이 처리하는 항목들의 결과를 저장해 StepExecution의 상태를 업데이트한다.
@Bean
public Step step1() {
return new StepBuilder("step1", jobRepository)
.tasklet((contribution, chunkContext) -> {
throw new RuntimeException("step1 fail");
// return RepeatStatus.FINISHED;
}, platformTransactionManager)
.build();
}
@FunctionalInterface
public interface Tasklet {
@Nullable
RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception;
}
// TaskletStep.class
finally {
// If the step operations are asynchronous then we need
// to synchronize changes to the step execution (at a
// minimum). Take the lock *before* changing the step
// execution.
try {
semaphore.acquire();
locked = true;
}
catch (InterruptedException e) {
logger.error("Thread interrupted while locking for repository update");
stepExecution.setStatus(BatchStatus.STOPPED);
stepExecution.setTerminateOnly();
Thread.currentThread().interrupt();
}
// Apply the contribution to the step
// even if unsuccessful
if (logger.isDebugEnabled()) {
logger.debug("Applying contribution: " + contribution);
}
stepExecution.apply(contribution);
}
StepExecution이 완료되는 시점에 apply 메서드를 호출해 StepContribution의 상태를 기반으로 StepExecution의 상태를 업데이트한다.
Tasklet에서 contribution 객체를 통해 StepExecution을 참조할 수 있다.
ExecutionContext
스프링 배치 프레임워크에서 관리하는 키-값으로 구성된 자료구조로 StepExecution이나 JobExecution의 상태를 저장하는 공유 객체이다.
데이터베이스에는 JSON으로 직렬화돼 저장된다.
JobExecution에 대한 ExecutionContext는 해당 Job 하위에 있는 Step들과 서로 공유되지만, StepExecution에 대한 ExecutionContext는 Step간 공유되지 않는다.
public class ExecutionContextTasklet implements Tasklet{
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
ExecutionContext jobExecutionContext = contribution.getStepExecution().getJobExecution().getExecutionContext();
ExecutionContext stepExecutionContext = contribution.getStepExecution().getExecutionContext();
String jobName = chunkContext.getStepContext().getStepExecution().getJobExecution().getJobInstance().getJobName();
String stepName = chunkContext.getStepContext().getStepExecution().getStepName();
if (jobExecutionContext.get("jobName") == null) jobExecutionContext.put("jobName", jobName);
if (stepExecutionContext.get("stepName") == null) stepExecutionContext.put("stepName", stepName);
return RepeatStatus.FINISHED;
}
}
등록해 둔 jobName과 stepName은 메타데이터 테이블에 저장되고, jobName은 다른 Step에서도 가져와서 사용할 수 있다.
JobRepository
Job이 언제 수행됐고, 언제 끝났고, 몇 번 실행된지 등 모든 메타데이터를 저장하는 저장소이다.
JobLauncher, Job, Step은 JobRepository를 통해 CRUD 작업을 수행한다.
public class SimpleJobRepository implements JobRepository {
private static final Log logger = LogFactory.getLog(SimpleJobRepository.class);
private JobInstanceDao jobInstanceDao;
private JobExecutionDao jobExecutionDao;
private StepExecutionDao stepExecutionDao;
private ExecutionContextDao ecDao;
public SimpleJobRepository(JobInstanceDao jobInstanceDao, JobExecutionDao jobExecutionDao,
StepExecutionDao stepExecutionDao, ExecutionContextDao ecDao) {
super();
this.jobInstanceDao = jobInstanceDao;
this.jobExecutionDao = jobExecutionDao;
this.stepExecutionDao = stepExecutionDao;
this.ecDao = ecDao;
}
}
기본 구현체로 SimpleJobRepository 가 제공되고, 여러 Dao를 통해 생성된 메타테이블을 다룬다.
@Configuration
public class BatchConfiguration {
@Bean
public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager platformTransactionManager) throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource);
factory.setTransactionManager(platformTransactionManager);
factory.setIsolationLevelForCreate("ISOLATION_READ_COMMITTED");
factory.setTablePrefix("BATCH_");
return factory.getObject();
}
}
public class JobListener implements JobExecutionListener {
@Autowired
private JobRepository jobRepository;
@Override
public void afterJob(JobExecution jobExecution) {
}
@Override
public void beforeJob(JobExecution jobExecution) {
String jobName = jobExecution.getJobInstance().getJobName();
JobExecution lastJobExecution = jobRepository.getLastJobExecution(jobName, new JobParametersBuilder().addString("requestDate", "20210102").toJobParameters());
if (lastJobExecution != null) {
for(StepExecution execution : lastJobExecution.getStepExecutions()) {
BatchStatus status = execution.getStatus();
log.info(status);
//..
}
}
}
}
이전까지는 BasicBatchConfigurer 클래스를 상속하는 방식으로 설정했지만..
버전이 올라가면서 BatchConfigurer, DefaultBatchConfigurer, BasicBatchConfigurer 클래스와 인터페이스가 삭제됐다.
스프링배치 5버전부터는 스프링의 기본적인 데이터베이스 설정 방식을 사용해 JobRepository를 설정할 수 있다.
JobListener로 배치 작업 시작 전 JobRepository로 메타테이블을 조회해 상태별 로그를 기록한다.
JobLauncher
이름처럼 배치 Job을 실행시키는 역할을 수행한다.
Job과 JobParameters를 인자로 받아 JobExecution을 반환한다.
JobLauncher가 Job을 실행할 때는 동기 / 비동기 두 가지 옵션을 설정할 수 있다.
동기적 실행은 배치 작업이 완료될 때 까지 클라이언트에게 JobExecution을 반환하지 않아 스케쥴러를 통해 실행할 때 적합하고,
비동기적 실행은 배치 작업 시작 후 바로 JobExecution을 반환하고 내부적인 비즈니스 로직은 비동기적으로 처리해 웹 애플리케이션에 적합하다.
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}
@Bean
public JobLauncher jobLauncher(JobRepository jobRepository, TaskExecutor taskExecutor) {
TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(taskExecutor);
return jobLauncher;
}
public void launch() throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException {
JobParameters jobParameters = new JobParametersBuilder()
.addString("id", "user")
.toJobParameters();
jobLauncher.run(job, jobParameters);
}
JobLauncher를 비동기로 실행시키려면 TaskExecutor와 TaskExecutorJobLauncher를 사용한다.
스프링 배치 4버전까지는 프레임워크가 지정해 주는 방식으로 설정하는게 강제됐다면, 5버전부터는 좀 더 "스프링"스러운 방식으로 설정 할 수 있도록 바뀌었다.
'Spring > Spring Batch' 카테고리의 다른 글
[Spring Batch] Chunk 아키텍처 (0) | 2024.08.24 |
---|---|
[Spring Batch] Flow 아키텍처 (2) | 2024.07.21 |
[Spring Batch] Job / Step 아키텍처 (0) | 2024.07.07 |
[Spring Batch] ItemReader / ItemWriter (1) | 2024.02.27 |
[Spring Batch] 스프링 배치 내부 흐름 (0) | 2024.02.25 |
댓글
이 글 공유하기
다른 글
-
[Spring Batch] Flow 아키텍처
[Spring Batch] Flow 아키텍처
2024.07.21 -
[Spring Batch] Job / Step 아키텍처
[Spring Batch] Job / Step 아키텍처
2024.07.07 -
[Spring Batch] ItemReader / ItemWriter
[Spring Batch] ItemReader / ItemWriter
2024.02.27 -
[Spring Batch] 스프링 배치 내부 흐름
[Spring Batch] 스프링 배치 내부 흐름
2024.02.25