[Spring Batch] Chunk 아키텍처
단일 Task 기반 처리로도 배치 작업을 수행할 수 있지만, Chunk 단위로 묶어서 배치 작업을 처리하면 여러 측면에서 이점을 얻을 수 있다.
ItemReader가 설정된 Chunk 사이즈만큼 데이터를 읽고, ItemProcessor가 처리 작업을 수행한 후 ItemWriter가 기록하는 구조이다.
ItemReader는 한 번에 하나의 아이템을 읽고 읽은 아이템은 Chunk의 일부로 저장된다.
지정된 사이즈에 도달하면 ItemProcessor를 통해 비즈니스 로직을 적용하고, ItemWriter에게 넘겨 한 번에 기록한다.
트랜잭션은 Chunk 단위로 묶어서 처리돼 실패한 Chunk 단위로 재시도 로직을 적용할 수 있다.
@Bean
@JobScope
public Step step1(@Value("#{jobParameters['message']}") String message, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
System.out.println(message);
return new StepBuilder("STEP1", jobRepository)
.<String, String>chunk(5, transactionManager)
.reader(new ListItemReader<>(Arrays.asList("item1", "item2", "item3", "item4", "item5")))
.processor(new ItemProcessor<String, String>() {
@Override
public String process(String item) throws Exception {
Thread.sleep(300);
return "my" + item;
}
})
.writer(new ItemWriter<String>() {
@Override
public void write(Chunk<? extends String> items) throws Exception {
System.out.println(items);
}
})
.build();
}
chunk api를 사용하면 내부적으로 SimpleStepBuilder가 사용돼 chunk 기반 배치 단계 설정을 수행한다.
chunk api를 사용해 ItemReader, ItemProcessor, ItemWriter를 정의하면 내부적으로 스프링 배치가 ChunkProvider와 ChunkProcessor를 사용해서 배치 작업을 처리한다.
ChunkProvider는 ItemReader를 사용해서 데이터를 읽고 설정된 chunk의 크기만큼 데이터를 모아 chunk를 생성하는 작업을 수행한다.
외부에서 ChunkProvider가 호출될 때 마다 새로운 chunk가 생성돼 데이터를 읽어들이고 이 때 RepeatTemplate을 사용해 데이터를 반복적으로 읽어들이는 작업을 제어한다.
ChunkProvider가 chunk 단위로 데이터를 읽는 작업을 마무리한 경우, 완료된 결과가 ItemProcessor와 ItemProvider로 전달된다.
이 때 ItemProcessor와 ItemWriter를 결합해서 chunk 단위로 데이터를 처리하기 위해 ChunkProcessor가 사용되고, 역시 호출될 때 마다 새로운 chunk가 생성된다.
직접 ChunkProvider, ChunkProcessor 객체를 다룰 일은 거의 없지만.. ItemReader ItemProcessor ItemWriter를 선언하고 사용할 때, 디버깅모드로 애플리케이션을 실행하면 해당 객체들이 내부적으로 사용됨을 확인할 수 있다.
(디버깅으로 어떤 구조로 작동하는지 확인하면 오픈소스가 어떻게 구성되어있는지 알 수 있어 추후 특정 부분을 커스텀해서 사용할 때 유용할 것 같다. 좋은 코드를 볼 수 있어 코드 보는 인사이트도 넓어지고..)
ItemReader
csv, txt, xml, json, DB, MQ 등 다양한 입력으로부터 데이터를 하나씩 읽는 인터페이스로, 더 읽을 데이터가 없으면 null을 반환한다.
데이터를 읽어오는 source, 읽어오는 방식에 따라 인터페이스의 구현체들이 있으니 필요에 따라 사용하면 된다.
ItemWriter
데이터 하나가 아닌 chunk 단위로 데이터를 받아 일괄 출력 작업을 처리하는 인터페이스이다.
ItemReader 구현체와 1:1 대응 관계인 구현체들로 구성되어있다.
ItemProcessor
데이터를 ItemWriter에게 보내기 전에 비즈니스 로직을 적용하는 역할을 수행한다.
chunk 기반 배치 작업에서 필수 작업은 아니니지만 비즈니스 로직을 추가할 때 보통 ItemProcessor를 사용한다.
ItemReader와 ItemWriter는 스프링 배치가 제공하는 구현체를 직접 사용하는 경우가 많지만, ItemProcessor는 웬만하면 커스텀해서 구현하는 편이다.
ItemStream
ItemReader와 ItemWriter 처리 과정 중 상태를 저장하고 오류 발생 시 상태를 통해 실패한 곳 부터 재시작하도록 지원하는 인터페이스이고, ItemReader와 ItemWriter는 ItemStream을 구현해야 한다.
public class CustomItemStreamReader implements ItemStreamReader<String> {
private final List<String> items;
private int index = -1;
private boolean restart = false;
public CustomItemStreamReader(List<String> items) {
this.items = items;
this.index = 0;
}
@Override
public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
String item = null;
if(this.index < this.items.size()) {
item = this.items.get(index)
index++;
};
if (this.index == 6 && !restart) throw new RuntimeException("Restart required.");
return item;
}
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
if(executionContext.containsKey("index")) {
index = executionContext.getInt("index");
this.restart = true;
}
else {
index = 0;
executionContext.put("index", index);
}
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
executionContext.put("index", index);
}
@Override
public void close() throws ItemStreamException {
System.out.println("Resource closed.");
}
}
ExecutionContext를 매개변수로 받아 상태를 업데이트하는데, 이 때 업데이트 주기는 chunk size 기반으로 수행된다.
FlatFileItemReader로 csv, tsv 등 파일을 특정 단위로 읽어 자바 객체로 변환할 수 있고
StaxEventItemReader로 xml 파일을 읽어 자바 객체로 변환할 수 있고
JsonItemReader로 json 파일을 읽어 자바 객체로 변화할 수 있으며
Jdbc / Jpa / Cursor / Paging ItemReader로 데이터베이스에서 값을 읽어 자바 객체로 변환할 수 있다.
Writer는 Reader에서 읽어서 변환한 자바 객체를 저장하는 역할을 수행한다.
FlatFileItemWriter로 자바 객체를 파일에 저장할 수 있고
StaxEventItemWriter로 자바 객체를 XML 문서로 작성할 수 있고
JsonFileItemWriter로 자바 객체를 json 파일로 저장할 수 있으며
JdbcBatchItemWriter로 자바 객체를 데이터베이스에 저장할 수 있다.
JPA를 사용하지 않는 경우 MyBatis로 쿼리를 작성하는데... 스프링 배치가 제공하는 Reader, Writer 클래스도 MyBatis와 연동하는 기능을 제공하니 배치 작업에 복잡한 쿼리를 사용하는 경우 MyBatis를, 간단한 쿼리만 사용할 때는 Jdbc 방식을 사용하자.
미리 작성해 둔 Repository, Service 레이어를 스프링배치 Job 내부의 Reader, Writer에서 사용할 때는 스프링배치가 제공하는 Adapter 클래스를 사용한다.
반복을 제어할 때는 RepeatStatus를 사용한다.
RepeatStatus는 배치 처리 작업이 끝났는지 확인하기 위한 enum으로 CONTINUABLE과 FINISHED로 구성된다.
CompletionPolicy로 RepeatTemplate을 조작해 반복여부를 결정하고, 비정상적인 종료는 ExceptionHandler가 처리한다.
해당 인터페이스의 구현체를 제공하니 스프링 배치가 제공하는 구현체를 골라서 사용하면 된다.
청크 크기 기반 반복은 SimpleCompletionPolicy
시간 설정은 TimeoutTerminationPolicy
단순 예외 로깅은 DefaultExceptionHandler
지정한 횟수만큼의 예외를 허용하려면 SimpleLimitExceptionHandler
여러 요소를 조합해서 구현하는 경우 CompositeExceptionHandler / CompositeCompletionPolicy
.processor(new ItemProcessor<String ,String>() {
@Override
public String process(String item) throws Exception {
RepeatTemplate repeatTemplate = new RepeatTemplate();
repeatTemplate.setCompletionPolicy(new SimpleCompletionPolicy(3));
repeatTemplate.setCompletionPolicy(new TimeoutTerminationPolicy(3000));
CompositeCompletionPolicy completionPolicy = new CompositeCompletionPolicy();
CompletionPolicy[] completionPolicies = new CompletionPolicy[]{
new SimpleCompletionPolicy(3),
new TimeoutTerminationPolicy(3000)
};
completionPolicy.setPolicies(completionPolicies);
repeatTemplate.setCompletionPolicy(completionPolicy);
repeatTemplate.setExceptionHandler(new SimpleLimitExceptionHandler(3));
repeatTemplate.iterate(new RepeatCallback() {
@Override
public RepeatStatus doInIteration(RepeatContext context) throws Exception {
System.out.println("testing...");
return RepeatStatus.CONTINUABLE;
}
});
return item;
}
})
iterate 메서드에서는 주어진 반복 작업을 실행하고 CompletionPolicy가 만족될 때 까지 반복을 수행한다.
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) throws Exception {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(5, transactionManager)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.faultTolerant()
.skip(null)
.skipLimit(4)
.skipPolicy(null)
.retry(null)
.retryLimit(2)
.retryPolicy(null)
.build();
}
스프링배치는 Job 실행 중 오류가 발생한 경우 FaultTolerant 구조로 오류를 처리한다.
skip으로 특정 chunk나 item에서 오류가 발생한 경우 해당 오류를 무시하고 다음 항목을 처리하도록 설정할 수 있고
retry로 오류가 발생한 경우 정해진 횟수만큼 로직을 재시도하도록 설정할 수 있다.
faultTolerant() api를 사용해야 skip과 retry를 정의할 수 있다.
각 api는 처리할 예외와 최대 횟수를 지정할 수 있고 Policy를 직접 설정해 복잡한 로직에 대한 예외 처리를 작성할 수 있다.
꼭 데이터베이스 관련 작업이 아니더라도..
배치 프로세스에는 파일 복사, 디렉토리 생성 등 여러 가지 작업이 포함될 수 있는데 이런 경우에도 skip 아키텍처를 활용해 견고한 배치 애플리케이션을 구축할 수 있다.
'Spring > Spring Batch' 카테고리의 다른 글
[Spring Batch] Flow 아키텍처 (2) | 2024.07.21 |
---|---|
[Spring Batch] Job / Step 아키텍처 (0) | 2024.07.07 |
[Spring Batch] 배치 도메인 이해 (0) | 2024.06.30 |
[Spring Batch] ItemReader / ItemWriter (1) | 2024.02.27 |
[Spring Batch] 스프링 배치 내부 흐름 (0) | 2024.02.25 |
댓글
이 글 공유하기
다른 글
-
[Spring Batch] Flow 아키텍처
[Spring Batch] Flow 아키텍처
2024.07.21 -
[Spring Batch] Job / Step 아키텍처
[Spring Batch] Job / Step 아키텍처
2024.07.07 -
[Spring Batch] 배치 도메인 이해
[Spring Batch] 배치 도메인 이해
2024.06.30 -
[Spring Batch] ItemReader / ItemWriter
[Spring Batch] ItemReader / ItemWriter
2024.02.27