您需要在阅读器中返回一个检查点值,该值 将 在重新启动时传递到阅读器的方法中。阅读器和批处理容器通过这种方式协调以在重新启动时提供检查点。
因此,您可能会遇到类似(查找 注释)的情况:
public class DBItemReader implements ItemReader {
// ...
// CHECKPOINT field defined
private String checkpoint = null;
@Override
public void open(Serializable checkpoint) throws NamingException, sqlException {
// CHECKPOINT-based positioning through query value.
// Initial position = whereclauseFrom, on restart set to checkpoint
String queryVal = (String)(checkpoint == null ? whereclauseFrom : checkpoint);
if(Integer.parseInt(whereclauseFrom) == 5){
sql = "SELECT * FROM " + tableName + " WHERE CAST(REC AS INTEGER) <= "+ queryVal;
}else if(Integer.parseInt(whereclauseFrom) == 6){
sql = "SELECT * FROM " + tableName + " WHERE CAST(REC AS INTEGER) >= "+ queryVal;
}
// ..
}
@Override
public Object readItem() throws sqlException {
if (listRecObj.size() == 0) {
return null;
} else {
RecObj rec =null;
Iterator<RecObj> iter =listRecObj.iterator();
while (iter.hasNext()) {
rec = iter.next();
// CHECKPOINT updated
checkpoint = rec.getRec();
if (Integer.parseInt(rec.getRec()) == 7) {
throw new IllegalStateException("Thrown Error");
}
}
}
// ...
}
@Override
public Serializable checkpointInfo() {
// CHECKPOINT returned at end of chunk
return checkpoint;
}
}