Class ResourcePayloadReader
- java.lang.Object
-
- javax.batch.api.chunk.AbstractItemReader
-
- com.ibm.fhir.jbatch.bulkdata.export.fast.ResourcePayloadReader
-
- All Implemented Interfaces:
javax.batch.api.chunk.ItemReader
@Dependent public class ResourcePayloadReader extends javax.batch.api.chunk.AbstractItemReader
A high-performance version of the bulk-export job. It does not support typeFilter, so it can use a more efficient mechanism to page through the set of resources filtered by _lastModified.
The Java Batch framework separates the reader from the writer. On the face of it, this separation of concerns seems like a good idea. However, because the persistence layer controls the fetching of the data, and because we may need to end one multi-part upload and continue feeding a new one, the separation makes things a lot harder and less readable: there is a lack of cohesion. To address this, this class handles both reading and writing, leaving the ItemWriter as a minimal stub.
The framework is still leveraged for checkpointing. The readItem() call simply returns an Object if there is more data to process. The ResourceExportCheckpointAlgorithm will always say we need a checkpoint, so a checkpoint occurs immediately after each read/write. The export is complete when readItem() returns null.
The export assumes that the persistence layer can efficiently scan forward based on the last_updated time of a resource. Multiple resources may share the same last_updated time, so this class has to track which resources have already been processed for a given timestamp. This is fairly easy, because the persistence layer must provide the data in order of last_updated. Because the number of resources for a given timestamp is probably very small, tracking them here is more efficient than asking the persistence layer (e.g. JDBC) to sort them. Such a sort can cause performance issues, because it negates the benefit of traversing an index in order.
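The timestamp tracking described above can be sketched as follows. This is a minimal illustration and not the class's actual implementation: the class name `TimestampDedupSketch`, the `accept` method, and the use of a numeric resource id are all assumptions made for the example. It relies only on the stated guarantee that rows arrive in order of last_updated.

```java
import java.time.Instant;
import java.util.HashSet;
import java.util.Set;

/** Illustrative only: none of these names come from ResourcePayloadReader. */
final class TimestampDedupSketch {
    // Newest last_updated value seen so far (null before the first row).
    private Instant lastTimestamp;
    // Ids already handled at lastTimestamp; expected to stay very small.
    private final Set<Long> seenAtLastTimestamp = new HashSet<>();

    /**
     * Returns true if the resource should be exported, false if it was already
     * processed. Rows are assumed to arrive in non-decreasing last_updated order.
     */
    boolean accept(long resourceId, Instant lastUpdated) {
        if (lastTimestamp == null || lastUpdated.isAfter(lastTimestamp)) {
            // Moved on to a later timestamp, so ids from the old one cannot reappear.
            lastTimestamp = lastUpdated;
            seenAtLastTimestamp.clear();
        } else if (lastUpdated.isBefore(lastTimestamp)) {
            // Assumption: anything older than the tracked timestamp was handled earlier.
            return false;
        }
        // Set.add returns false when the id is already present.
        return seenAtLastTimestamp.add(resourceId);
    }
}
```

Because the set is cleared whenever the timestamp advances, memory use is bounded by the number of resources sharing a single last_updated value, which is why no extra sort by the persistence layer is needed.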
-
-
Field Summary
Fields
Modifier and Type | Field | Description
(package private) String | cosApiKeyProperty | The IBM COS API key or S3 access key.
(package private) String | cosBucketFileMaxResources |
(package private) String | cosBucketName | The COS bucket name.
(package private) String | cosBucketPathPrefix | The COS bucket path prefix.
(package private) String | cosCredentialIbm | Whether to use IBM credentials or Amazon secret keys.
(package private) String | cosEndpointUrl | The COS endpoint URL.
(package private) String | cosLocation | The COS endpoint location.
(package private) String | cosSrvinstId | The IBM COS service instance ID or S3 secret key.
(package private) String | fhirDatastoreId | FHIR datastore ID.
protected String | fhirExportFormat | FHIR export format.
(package private) FHIRPersistence | fhirPersistence |
(package private) String | fhirResourceType | FHIR resource type to process.
(package private) String | fhirSearchFromDate | FHIR search from date.
(package private) String | fhirSearchPageSize | FHIR search page size.
(package private) String | fhirSearchToDate | FHIR search to date.
(package private) String | fhirTenant | FHIR tenant ID.
(package private) String | fhirTypeFilters | FHIR export type filters.
(package private) String | incomingUrl |
(package private) javax.batch.runtime.context.JobContext | jobContext |
(package private) int | resourcesPerObject |
(package private) Class<? extends Resource> | resourceType |
(package private) javax.batch.runtime.context.StepContext | stepCtx |
-
Constructor Summary
Constructors
Constructor | Description
ResourcePayloadReader() | Public constructor.
-
Method Summary
Modifier and Type | Method | Description
Serializable | checkpointInfo() |
void | close() |
protected boolean | isTxTimeExpired() | Check to see if the current clock time exceeds the marker time we laid down to stop processing, to make sure we commit before the Liberty transaction timeout limit.
void | open(Serializable checkpoint) |
Boolean | processPayload(ResourcePayload t) | Process this payload result.
Object | readItem() |
-
-
-
Field Detail
-
fhirPersistence
FHIRPersistence fhirPersistence
-
fhirTenant
@Inject String fhirTenant
FHIR tenant ID.
-
fhirDatastoreId
@Inject String fhirDatastoreId
FHIR datastore ID.
-
fhirResourceType
@Inject String fhirResourceType
FHIR resource type to process.
-
cosApiKeyProperty
@Inject String cosApiKeyProperty
The IBM COS API key or S3 access key.
-
cosSrvinstId
@Inject String cosSrvinstId
The IBM COS service instance ID or S3 secret key.
-
cosEndpointUrl
@Inject String cosEndpointUrl
The COS endpoint URL.
-
cosLocation
@Inject String cosLocation
The COS endpoint location.
-
cosBucketName
@Inject String cosBucketName
The COS bucket name.
-
cosBucketPathPrefix
@Inject String cosBucketPathPrefix
The COS bucket path prefix.
-
cosCredentialIbm
@Inject String cosCredentialIbm
Whether to use IBM credentials or Amazon secret keys.
-
fhirExportFormat
@Inject protected String fhirExportFormat
FHIR export format.
-
fhirSearchFromDate
@Inject String fhirSearchFromDate
FHIR search from date.
-
fhirSearchToDate
@Inject String fhirSearchToDate
FHIR search to date.
-
fhirTypeFilters
@Inject String fhirTypeFilters
FHIR export type filters. Ignored for the basic system export.
-
fhirSearchPageSize
@Inject String fhirSearchPageSize
FHIR search page size. Not used.
-
incomingUrl
@Inject String incomingUrl
-
cosBucketFileMaxResources
@Inject String cosBucketFileMaxResources
-
resourcesPerObject
int resourcesPerObject
-
stepCtx
@Inject javax.batch.runtime.context.StepContext stepCtx
-
jobContext
@Inject javax.batch.runtime.context.JobContext jobContext
-
-
Method Detail
-
open
public void open(Serializable checkpoint) throws Exception
- Specified by:
open
in interface javax.batch.api.chunk.ItemReader
- Overrides:
open
in class javax.batch.api.chunk.AbstractItemReader
- Throws:
Exception
-
readItem
public Object readItem() throws Exception
- Specified by:
readItem
in interface javax.batch.api.chunk.ItemReader
- Specified by:
readItem
in class javax.batch.api.chunk.AbstractItemReader
- Throws:
Exception
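The contract described in the class overview (readItem() performs both the read and the write, returns an Object while more data remains, returns null when the export is complete, and is followed by a checkpoint every time) can be sketched as below. This is a simplified stand-in and not the real class: `ReadWriteInOneStepSketch`, the in-memory `source` and `sink` lists, the integer checkpoint, and the `drive` loop that imitates the batch framework are all assumptions made for illustration.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: a reader that also writes, driven by a simulated framework loop. */
final class ReadWriteInOneStepSketch {
    private final List<String> source;                    // stand-in for the persistence layer
    private final List<String> sink = new ArrayList<>();  // stand-in for the COS upload
    private final int batchSize;
    private int position;                                 // restart position saved at each checkpoint

    ReadWriteInOneStepSketch(List<String> source, int batchSize, Serializable checkpoint) {
        this.source = source;
        this.batchSize = batchSize;
        // A null checkpoint means a fresh start; otherwise resume where we left off.
        this.position = checkpoint == null ? 0 : (Integer) checkpoint;
    }

    /** Reads and writes one batch; returns null once nothing is left. */
    Object readItem() {
        if (position >= source.size()) {
            return null;
        }
        int end = Math.min(position + batchSize, source.size());
        sink.addAll(source.subList(position, end));
        position = end;
        return Boolean.TRUE; // any non-null object signals "more work was done"
    }

    Serializable checkpointInfo() {
        return Integer.valueOf(position);
    }

    List<String> written() {
        return sink;
    }

    /** Imitates the framework: checkpoint after every successful readItem(). */
    static int drive(ReadWriteInOneStepSketch reader) {
        int checkpoints = 0;
        while (reader.readItem() != null) {
            reader.checkpointInfo();
            checkpoints++;
        }
        return checkpoints;
    }
}
```

With five source rows and a batch size of two, `drive` takes three checkpoints on a fresh run; constructing the reader with a saved checkpoint of 4 resumes at the last row only.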
-
processPayload
public Boolean processPayload(ResourcePayload t)
Process this payload result. Called as a lambda (callback) from the persistence layer. This method collects the payloads into a buffer and will trigger a write of that buffer to COS if we hit a certain threshold.
- Parameters:
t -
- Returns:
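The buffer-then-flush behavior described for processPayload might look roughly like the sketch below. It is not the actual implementation: the class `BufferedPayloadSketch`, the byte-array payload, the newline separator, the byte-count threshold, and the meaning of the returned Boolean (here simply "keep sending payloads") are all assumptions, and the "upload" is just an in-memory list standing in for a COS multi-part upload.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: collects payloads and flushes once a size threshold is reached. */
final class BufferedPayloadSketch {
    private final int flushThresholdBytes;
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final List<byte[]> uploadedParts = new ArrayList<>(); // stand-in for COS parts

    BufferedPayloadSketch(int flushThresholdBytes) {
        this.flushThresholdBytes = flushThresholdBytes;
    }

    /** Callback invoked once per payload; assumed to return TRUE to continue. */
    Boolean processPayload(byte[] payload) {
        buffer.write(payload, 0, payload.length);
        buffer.write('\n'); // one payload per line, as in NDJSON output
        if (buffer.size() >= flushThresholdBytes) {
            flush();
        }
        return Boolean.TRUE;
    }

    /** Hands the buffered bytes to the "upload" and empties the buffer. */
    void flush() {
        if (buffer.size() > 0) {
            uploadedParts.add(buffer.toByteArray());
            buffer.reset();
        }
    }

    int partCount() {
        return uploadedParts.size();
    }

    int bufferedBytes() {
        return buffer.size();
    }
}
```

A caller would typically invoke `flush()` once more after the last payload so that a partially filled buffer is not lost.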
-
isTxTimeExpired
protected boolean isTxTimeExpired()
Check to see if the current clock time exceeds the marker time we laid down to stop processing, to make sure we commit before the Liberty transaction timeout limit.
- Returns:
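The deadline check can be illustrated with a small sketch. The names here (`TxDeadlineSketch`, `txTimeoutMillis`, `safetyMarginMillis`) and the idea of deriving the marker from a timeout minus a safety margin are assumptions for the example; the source only states that a marker time is laid down and compared against the current clock time. The clock is injected so the behavior can be tested without waiting.

```java
import java.util.function.LongSupplier;

/** Illustrative only: stop processing early enough to commit before a timeout. */
final class TxDeadlineSketch {
    private final LongSupplier clockMillis; // e.g. System::currentTimeMillis
    private final long markerMillis;        // the time after which we should stop and commit

    TxDeadlineSketch(LongSupplier clockMillis, long txTimeoutMillis, long safetyMarginMillis) {
        this.clockMillis = clockMillis;
        // Lay down the marker when processing starts, leaving a margin for the commit.
        this.markerMillis = clockMillis.getAsLong() + txTimeoutMillis - safetyMarginMillis;
    }

    /** True once the current time exceeds the marker. */
    boolean isTxTimeExpired() {
        return clockMillis.getAsLong() > markerMillis;
    }
}
```

In production code the supplier would usually be `System::currentTimeMillis`; a processing loop would check `isTxTimeExpired()` between payloads and return early so that the surrounding transaction can commit.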
-
checkpointInfo
public Serializable checkpointInfo() throws Exception
- Specified by:
checkpointInfo
in interface javax.batch.api.chunk.ItemReader
- Overrides:
checkpointInfo
in class javax.batch.api.chunk.AbstractItemReader
- Throws:
Exception
-
-