- Java Batchとは
- Job の種類と実行制御
- Job作成の流れ
- Chunk の実装例
- Batchlet の簡単な例
- バッチステータス
- Job Specification Language
- Job 定義の要素
- Step 要素
- chunk ステップの例
- タスクステップの例
- partition ステップの例
- フロー要素
- スプリット要素
- decision 要素
- JobContext
- Job のスケジュール起動
- listener
- CDI 名による参照
- IDE サポート
Java Batchとは
JSR-352 Batch Applications for the Java Platform として仕様化されたバッチ処理用 API。 Java EE 7 に取り込まれた。通称 jBatch (日本のみ?)。
構造は Spring Batch とほとんど同じ。標準化されるのは良いことだ。
Job の種類と実行制御
Job の処理は2種類ある。
- STEPA のように 入力・処理・出力 をある塊で処理していくもの(Chunk)
- STEPB のように なんらかのタスクとして処理するもの(Batchlet)
実行順序の制御は xml ファイルで定義する。 以下のような実行制御ができる。
- シーケンシャルに実行することもできるし、パラレルで実行することもできる
- チェックポイントを設けて、チェックポイントからリランなどできる
- エラー発生時に、その対象をスキップして処理を継続するなど実行制御できる
Job作成の流れ
javax.batch.api.chunk.ItemReader
javax.batch.api.chunk.ItemProcessor
javax.batch.api.chunk.ItemWriter
というインターフェースの実装クラスをそれぞれ作成し、以下のような Job 定義でMyReader
MyProcessor
MyWriter
などと指定する。
<?xml version="1.0" encoding="UTF-8"?> <job id="simplejob" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0"> <step id="mychunk"> <chunk> <reader ref="MyReader"></reader> <processor ref="MyProcessor"></processor> <writer ref="MyWriter"></writer> </chunk> </step> </job>
Jobの起動は以下のように BatchRuntime
を介して行う。
JobOperator jobOperator = BatchRuntime.getJobOperator(); long execID = jobOperator.start("simplejob", new Properties());
start の第一引数にはxmlで定義した job の id を指定する。
なお、インターフェースの他、 AbstractItemReader
、 AbstractItemProcessor
、 AbstractItemItemWriter
といった抽象クラスも用意されている。
Chunk の実装例
前述の通り、ItemReader、ItemProcessor、ItemWriter を作成する。
ItemReader
ItemReader は対象アイテムの読み込み処理を書く。
public class MyReader implements ItemReader { private BufferedReader breader; @Inject JobContext jobCtx; @Override public void open(Serializable checkpoint) throws Exception { String fileName = jobCtx.getProperties().getProperty("input_file"); breader = new BufferedReader(new FileReader(fileName)); } @Override public void close() throws Exception { breader.close(); } @Override public Object readItem() throws Exception { String line = breader.readLine(); return line; } }
DI される JobContext
から外部定義したプロパティを取得できる。
この例では指定ファイルを読み込むだけ。
ItemProcessor
アイテムに対する処理を書く。
public class MyProcessor implements ItemProcessor { @Override public Object processItem(Object obj) throws Exception { String line = (String) obj; return line.toUpperCase(); } }
ここでは toUpperCase しているのみ。
ItemWriter
処理結果の書き込み。
public class MyWriter implements ItemWriter { private BufferedWriter bwriter; @Inject private JobContext jobCtx; @Override public void open(Serializable checkpoint) throws Exception { String fileName = jobCtx.getProperties().getProperty("output_file"); bwriter = new BufferedWriter(new FileWriter(fileName)); } @Override public void writeItems(List<Object> items) throws Exception { for (int i = 0; i < items.size(); i++) { String line = (String) items.get(i); bwriter.write(line); bwriter.newLine(); } } @Override public Serializable checkpointInfo() throws Exception { return null; }
writeItems()
の引数が List になっている通り、processItem
での結果は塊(Chunk) で扱われる。
デフォルトでは 10 個のアイテムが Chunk として扱われる。
checkpointInfo()
については後述。
Job 定義
定義は以下のように xml で定義する。
<?xml version="1.0" encoding="UTF-8"?> <job id="simplejob" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0"> <properties> <property name="input_file" value="input.txt"/> <property name="output_file" value="output.txt"/> </properties> <step id="mychunk"> <chunk> <reader ref="MyReader"></reader> <processor ref="MyProcessor"></processor> <writer ref="MyWriter"></writer> </chunk> </job>
propertie を合わせて定義している。
MyReader
などはパッケージ含めたクラス名を書く。
Batchlet の簡単な例
タスク指向のバッチステップは Batchlet で扱う。
javax.batch.api.Batchlet
を実装する。
もちろん javax.batch.api.AbstractBatchlet
もある。
public class MyBatchlet implements Batchlet { @Inject private JobContext jobCtx; @Override public String process() throws Exception { String fileName = jobCtx.getProperties().getProperty("output_file"); System.out.println(""+(new File(fileName)).length()); return "COMPLETED"; } }
定義ファイルは以下のようになる。
<?xml version="1.0" encoding="UTF-8"?> <job id="simplejob" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0"> <properties> <property name="input_file" value="input.txt"/> <property name="output_file" value="output.txt"/> </properties> <step id="mytask"> <batchlet ref="MyBatchlet"></batchlet> <end on="COMPLETED"/> </step> </job>
batchlet
要素で作成した MyBatchlet を指定している。
バッチステータス
バッチ のステータスは JobExecution
を介して取得できる。
long execID = jobOperator.start("simplejob", new Properties()); JobExecution jobExec = jobOperator.getJobExecution(execID); String status = jobExec.getBatchStatus().toString();
以下のような enum が用意されている。
public enum BatchStatus { STARTING, STARTED, STOPPING, STOPPED, FAILED, COMPLETED, ABANDONED }
ステータスの意味は、
Value | Description |
---|---|
STARTING | ジョブがバッチランタイムにサブミットされた |
STARTED | ジョブの実行中. |
STOPPING | ジョブ中断のリクエスト受付済み |
STOPPED | ジョブが中断された |
FAILED | エラーによりジョブ実行が終了した |
COMPLETED | ジョブが成功終了した |
ABANDONED | ジョブが破棄としてマークされた |
Job Specification Language
ジョブの実行制御は xml ファイルで指定する。配置場所は以下。
- jar の場合
META-INF/batch-jobs/
- warの場合
WEB-INF/classes/META-INF/batch-jobs/
ファイル名はジョブIDと一致させる必要があり、以下の場合は simplejob.xml
とする。
<?xml version="1.0" encoding="UTF-8"?> <job id="simplejob" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0"> ・・・・ </job>
Job 定義の要素
job
要素がトップレベルとなる。
<job id="jobname" restartable="true"> <listeners> <listener ref="com.xyz.pkg.ListenerBatchArtifact"/> </listeners> <properties> <property name="propertyName1" value="propertyValue1"/> <property name="propertyName2" value="propertyValue2"/> </properties> <step ...> ... </step> <step ...> ... </step> <decision ...> ... </decision> <flow ...> ... </flow> <split ...> ... </split> </job>
実行制御は以下で行う。
- ステップ(Steps)
- フロー(Flows)
- スプリット(Splits)
- 条件分岐(Decision elements)
listeners
でイベントをインタセプトするリスナを定義する。
properties
で Job に渡すプロパティを定義する。
Step 要素
step
要素に next
で次Jobを指定する。
<job id="loganalysis" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0"> <step id="logprocessor" next="cleanup"> <chunk checkpoint-policy="item" item-count="10"> <reader ref="com.xyz.pkg.LogItemReader"></reader> <processor ref="com.xyz.pkg.LogItemProcessor"></processor> <writer ref="com.xyz.pkg.LogItemWriter"></writer> </chunk> </step> <step id="cleanup"> <batchlet ref="com.xyz.pkg.CleanUp"></batchlet> <end on="COMPLETED"/> </step> </job>
上記例では next="cleanup"
として次ステップを定義している。
chunk 要素では item-count
で chunk サイズを指定する。この例では 10 回毎に ItemWriter へ制御が移る。
checkpoint-policy
で item
が指定されているため、chunk 毎にコミットされる。
checkpoint-policy
で custom
を指定すると checkpoint-algorithm
で指定するアルゴリズムでコミットされる。
chunk ステップの例
具体的な chunk ステップの例は以下のような感じになる。
<step id="stepC" next="stepD"> <chunk checkpoint-policy="item" item-count="5" time-limit="180" buffer-items="true" skip-limit="10" retry-limit="3"> <reader ref="pkg.MyItemReaderImpl"></reader> <processor ref="pkg.MyItemProcessorImpl"></processor> <writer ref="pkg.MyItemWriterImpl"></writer> <skippable-exception-classes> <include class="pkg.MyItemException"/> <exclude class="pkg.MyItemSeriousSubException"/> </skippable-exception-classes> <retryable-exception-classes> <include class="pkg.MyResourceTempUnavailable"/> </retryable-exception-classes> </chunk> </step>
chunk ステップの要素
chunk ステップには以下のような要素を指定可能。
<step id="stepA" next="stepB"> <properties> ... </properties> <listeners> <listener ref="MyItemReadListenerImpl"/> ... </listeners> <chunk ...> ... </chunk> <partition> ... </partition> <end on="COMPLETED" exit-status="MY_COMPLETED_EXIT_STATUS"/> <stop on="MY_TEMP_ISSUE_EXIST_STATUS" restart="step0"/> <fail on="MY_ERROR_EXIT_STATUS" exit-status="MY_ERROR_EXIT_STATUS"/> </step>
partition
は並列実行を定義する。
end on
は終了ステータスに割り当てるバッチステータスを定義する。
stop on
はJobをストップする条件を定義する。
fail on
は失敗に割り当てる終了状態を定義する。
タスクステップの例
具体的な chunk ステップの例は以下のような感じになる。
<step id="stepD" next="stepE"> <batchlet ref="pkg.MyBatchletImpl"> <properties> <property name="pname" value="pvalue"/> </properties> </batchlet> </step>
タスクステップの要素
タスクステップには以下のような要素を指定可能。
<step id="stepB" next="stepC"> <batchlet ...> ... </batchlet> <properties> ... </properties> <listener ref="MyStepListenerImpl"/> </step>
partition ステップの例
扱うデータを分割して並列に扱う。
plan 利用
<step id="stepE" next="stepF"> <chunk> <reader ...></reader> <processor ...></processor> <writer ...></writer> </chunk> <partition> <plan partitions="2" threads="2"> <properties partition="0"> <property name="firstItem" value="0"/> <property name="lastItem" value="500"/> </properties> <properties partition="1"> <property name="firstItem" value="501"/> <property name="lastItem" value="999"/> </properties> </plan> </partition> <reducer ref="MyPartitionReducerImpl"/> <collector ref="MyPartitionCollectorImpl"/> <analyzer ref="MyPartitionAnalyzerImpl"/> </step>
mapper 利用
<step id="stepE" next="stepF"> <chunk> <reader ...></reader> <processor ...></processor> <writer ...></writer> </chunk> <partition> <mapper ref="MyPartitionMapperImpl"/> <reducer ref="MyPartitionReducerImpl"/> <collector ref="MyPartitionCollectorImpl"/> <analyzer ref="MyPartitionAnalyzerImpl"/> </partition> </step>
フロー要素
Job のステップをまとめて一処理単位として扱う。
job
flow
split
の子要素となる。
<flow id="flowA" next="stepE"> <step id="flowAstepA" next="flowAstepB">...</step> <step id="flowAstepB" next="flowAflowC">...</step> <flow id="flowAflowC" next="flowAsplitD">...</flow> <split id="flowAsplitD" next="flowAstepE">...</split> <step id="flowAstepE">...</step> </flow>
スプリット要素
フローはそれぞれ別スレッドで実行される。 すべて完了した時に次へ遷移する。 複数の業務処理を並列で実行(partition は扱うデータを並列で処理)。
job
flow
の子要素となる。
<split id="splitA" next="stepB"> <flow id="splitAflowA" next="splitAflowB">...</flow> <flow id="splitAflowB">...</flow> <flow id="splitAflowC">...</flow> </split>
decision 要素
前ステップの完了ステータスにて、次ステップの分岐や続行/継続を判断する。
job
flow
の子要素となる。
<decision id="decisionA" ref="MyDeciderImpl"> <fail on="FAILED" exit-status="FAILED_AT_DECIDER"/> <end on="COMPLETED" exit-status="COMPLETED_AT_DECIDER"/> <stop on="MY_TEMP_ISSUE_EXIST_STATUS" restart="step2"/> </decision>
分岐。
<decision id="decisionA" ref="MyDeciderImpl"> <next on="FAILED" to="fooStep"/> <end on="COMPLETED" to="barStep"/> </decision>
JobContext
バッチ処理の各実装クラスには CDI によるコンテキストのDIが可能。
public class MyItemReaderImpl implements ItemReader { @Inject JobContext jobCtx; public MyItemReaderImpl() {} @Override public void open(Serializable checkpoint) throws Exception { String fileName = jobCtx.getProperties() .getProperty("log_file_name"); ・・・ } ・・・ }
JobContext
は CDI 経由で取得できる。
ただし コンストラクタで JobContext を取得することがはできない。
Job のスケジュール起動
@Schedule にてJobのスケジュール起動ができる。
@Singleton public class BatchJobRunner { @Schedule(deyOfWeek = "Sun") public void scheduleJob() { JobOperator jobOperator = BatchRuntime.getJobOperator(); jobOperator.start("simplejob", new Properties()); }
ManagedScheduledExecutorServicd 経由の例。
@Resource(lookup = "java:comp/DefaultManagedScheduledExecutorServicd") private ManagedScheduledExecutorService executor; public void scheduleJob() { JobOperator jobOperator = BatchRuntime.getJobOperator(); executor.schedule( () -> jobOperator.start("", new Properties()), 7, TimeUnit.DAYS); }
listener
Job 実行の各タイミングで listener が用意されている。
- Job の実行前後 : javax.batch.api.listener.JobListener
- ステップの前後 : javax.batch.api.listener.StepListener
- 各Chunkの実行前後 : javax.batch.api.chunk.listener.ChunkListener
- 各Itemの読み込み前後 : javax.batch.api.chunk.listener.ItemReadListener
- 各Itemの処理前後 : javax.batch.api.chunk.listener.ItemProcessListener
- 各Itemの書き込み前後 : javax.batch.api.chunk.listener.ItemWriteListener
- リトライ時のItem読み込み時 : javax.batch.api.chunk.listener.RetryReadListener
- リトライ時の処理時 : javax.batch.api.chunk.listener.RetryProcessListener
- リトライ時の書き込み時 : javax.batch.api.chunk.listener.RetryWriteListener
- 読み込みクラスからのSkip可能例外発生時 : javax.batch.api.chunk.listener.SkipReadListener
- 処理クラスからのSkip可能例外発生時 : javax.batch.api.chunk.listener.SkipProcessListener
- 書き込みクラスからのSkip可能例外発生時 : javax.batch.api.chunk.listener.SkipWriteListener
リスナの実装クラスを用意しJob定義の listener
要素として定義する。
CDI 名による参照
Named 定義しておくと
@Dependent @Named("myItemReaderImpl") public class MyItemReaderImpl implements ItemReader { ・・・ }
@Dependent
は CDI1.1 からの CDI デフォルト化 でスコープアノテーションがあるものが CDI 対象となるため付与。
beans.xml で bean-discovery-mode="all"
の指定であれば不要。
CDI 名で定義できる
<chunk> <reader ref="myItemReaderImpl"></reader> ... </chunk>
IDE サポート
Netbeans では jBatch Suite が提供されている。
プラグイン入れる。
GUI で組んで Generate すると、Job定義の xml やら ステップの実装の骨組みを生成してくれる。