Implement Parallel Batch Jobs
A batch job lets you process large datasets in parallel by breaking the work into three stages:
- Specify which dataset to process. Your logic defines how to create smaller batches from the original dataset, and then schedules the batches for execution.
- Define what logic to run for each batch. Batches are processed in parallel using the same logic.
- Optionally, implement some logic that runs when all batches are successfully processed.
Data transfer between nodes in the cluster, redundancy, and fault tolerance are managed automatically for you.
Follow the examples below to create a batch job that processes a corpus, such as a set of books or blog posts, and removes all non-alphanumeric characters.
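Before looking at the platform Types, it can help to picture the three stages in plain JavaScript. The sketch below is only a mental model: the function names (`createBatches`, `processOneBatch`, `onAllComplete`, `runJob`) are hypothetical, it uses no platform APIs, and it runs the batches one after another rather than in parallel across a cluster.

```javascript
// Plain-JavaScript model of the three stages; no platform APIs are used.

// Stage 1: split the dataset into batches of at most `batchSize` items.
function createBatches(dataset, batchSize) {
  var batches = [];
  for (var i = 0; i < dataset.length; i += batchSize) {
    batches.push(dataset.slice(i, i + batchSize));
  }
  return batches;
}

// Stage 2: the same logic runs for every batch.
// Here: replace each run of non-alphanumeric characters with a space.
function processOneBatch(batch) {
  return batch.map(function (text) {
    return text.replace(/[^A-Za-z0-9]+/g, ' ');
  });
}

// Stage 3 (optional): runs once, after every batch has succeeded.
function onAllComplete(results) {
  return 'processed ' + results.length + ' batches';
}

function runJob(dataset, batchSize) {
  var batches = createBatches(dataset, batchSize);
  // Sequential in this sketch; a real batch job distributes this step.
  var results = batches.map(processOneBatch);
  return { results: results, summary: onAllComplete(results) };
}

var outcome = runJob(['a-b', 'c!d', 'e.f'], 2);
console.log(outcome.results); // two batches: ['a b', 'c d'] and ['e f']
console.log(outcome.summary); // processed 2 batches
```

The rest of this page maps each stage onto a method of the batch job Type.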
Create a new batch job Type
The BatchJob Type implements the logic for running batch jobs. To create a new batch job, define a new Type that extends BatchJob:
/**
* WordCount.c3typ
*
* Batch job that cleans a text dataset so that it contains only alphanumeric characters
*/
type WordCount extends BatchJob<WordCount, WordCountOptions, WordCountBatch> type key 'WORDCOUNT' {
doStart: ~ js-server
processBatch: ~ js-server
// omitting as no action is required at the conclusion of the job
// allComplete: ~ js-server
}
Notice that BatchJob is a parametric Type, so you need to specify three Type parameters.
| Parameter | Description |
|---|---|
| JT | Job Type: The Type to use for the job. This is usually the same Type you are creating. |
| OT | Options Type: The Type to use when configuring options for the job. |
| BT | Batch Type: The Type to use that represents a batch of work. |
The base BatchJob has three abstract methods corresponding to the three stages of the job.
| Method | Override | Description |
|---|---|---|
| doStart(job, options) | Mandatory | Called when starting the batch job. Contains the logic to create and schedule smaller batches. |
| processBatch(batch, job, options) | Mandatory | Called for every batch. Contains the logic to process a batch. |
| allComplete(job, options) | Optional | Called when all batches have finished processing successfully. |
When defining a batch job, make sure the new Type you are creating overrides the mandatory methods.
Define options and batch Types
After defining the new batch job Type, you can implement the Types used to parametrize BatchJob. Define the Type for configuring the job:
/**
*
* Represents customization options for a WordCount batch job.
*/
type WordCountOptions {
batchSize: int = 10
removeDigits: boolean = true
}
Define the Type that represents a batch of work:
/**
*
* Represents a unit of work (batch) in a WordCount batch job.
*/
type WordCountBatch {
values: [Corpus]
}
In this example, the batch job manipulates a corpus like a set of books or blog posts, so you also need to define that Type:
/*
* Corpus.c3typ
*
* Represents a corpus of text like a book, article, or blog post
*/
entity type Corpus schema name 'CORPUS' {
title: string
content: string
processed: boolean = false
}
Implement the methods
Now that all the Types are defined, you can implement the logic for the new batch job.
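The cleaning step depends on two regular-expression character classes, which are easier to check in isolation. The following is a minimal standalone sketch: `cleanContent` is a hypothetical helper name used only here, and it assumes the same character classes as the implementation without the redundant `i` flag.

```javascript
// Standalone illustration of the character classes used for cleaning.
// `cleanContent` is a hypothetical helper, not part of the batch job.
function cleanContent(content, removeDigits) {
  // \W matches anything that is not a letter, digit, or underscore;
  // adding \d to the class also strips digits.
  var regexp = removeDigits ? /[\W\d]+/g : /[\W]+/g;
  // Each run of matched characters collapses into a single space.
  return content.replace(regexp, ' ');
}

var sample = 'Left Munich at 8:35 P. M., on 1st May';
console.log(cleanContent(sample, true));  // Left Munich at P M on st May
console.log(cleanContent(sample, false)); // Left Munich at 8 35 P M on 1st May
```

Two details follow from how `\W` is defined: underscores are kept, because `_` counts as a word character, and without the `u` flag any non-ASCII letter is treated as a non-word character and replaced. The WordCount.js implementation applies the same replacement to each Corpus record in a batch: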
// WordCount.js
/**
*
* @param {WordCount} job
* @param {WordCountOptions} options
*/
function doStart(job, options) {
var batch = ArrayType.of(Corpus.meta().referenceType()).makeBuilder();
var dataset = Corpus.fetchObjStream({
include: 'id, title, content',
filter: 'processed == false',
limit: -1
});
while(dataset.hasNext()) {
batch.push(dataset.next());
// Break dataset in batches and schedule them for processing
if(batch.size() >= options.batchSize || !dataset.hasNext()){
var batchSpec = WordCountBatch.make({values: batch.build()});
job.scheduleBatch(batchSpec);
batch = ArrayType.of(Corpus.meta().referenceType()).makeBuilder();
}
}
}
/**
*
* @param {WordCountBatch} batch
* @param {WordCount} job
* @param {WordCountOptions} options
*/
function processBatch(batch, job, options) {
// Enable logger for processBatch method for debugging.
var logger = Logger.for("c3.WordCount.processBatch");
var corpusAry = ArrayType.of(Corpus.meta().referenceType()).makeBuilder();
// Each batch contains multiple Corpus records
_.each(batch.values, function (corpus) {
if(corpus.content) {
// Replace non-alphanumeric characters (and digits, if removeDigits is set) with a space
var regexp = options.removeDigits ? /[\W\d]+/gi : /[\W]+/gi;
// Corpus batch is immutable. Create variable to store modified corpus content
// and add to array to be merged.
var newCorpus = corpus.content.replace(regexp, ' ');
corpusAry.push(Corpus.make({id: corpus.id, content: newCorpus, processed: true}));
}
});
logger.info("corpusAry size = " + corpusAry.size());
Corpus.mergeBatch(corpusAry.build());
}
/**
* Implement if needed
*
* @param {WordCount} job
* @param {WordCountOptions} options
*
* function allComplete(job, options) {}
*/
Deploy the batch job
After implementing the batch job, deploy it. The job needs data to process before you start it, so navigate to the C3 AI Console and add some data to the application:
Corpus.createBatch([
{title: 'Metamorphosis', content: 'One morning, when Gregor Samsa woke from troubled dreams, he found himself transformed in his bed into a horrible vermin. He lay on his armour-like back, and if he lifted his head a little he could see his brown belly, slightly domed and divided by arches into stiff sections. The bedding was hardly able to cover it and seemed ready to slide off any moment. His many legs, pitifully thin compared with the size of the rest of him, waved about helplessly as he looked. "Whats happened to me?" he thought. It wasnt a dream. His room, a proper human room although a little too small, lay peacefully between its four familiar walls. A collection of textile samples lay spread out on the table - Samsa was a travelling salesman - and above it there hung a picture that he had recently cut out of an illustrated magazine and housed in a nice, gilded frame. It showed a lady fitted out with a fur hat and fur boa who sat upright, raising a heavy fur muff that covered the whole of her lower arm towards the viewer. Gregor then turned to look out the window at the dull weather. Drops of rain could be heard hitting the pane, which made him feel quite sad. "How about if I sleep a little bit longer and forget all this nonsense", he thought, but that was something he was unable to do because he was used to sleeping on his right, and in his present state couldnt get into that position. However hard he threw himself onto his right, he always rolled back to where he was. He must have tried it a hundred times, shut his eyes so that he wouldnt have to look at the floundering legs, and only stopped when he began to feel a mild, dull pain there that he had never felt before.'},
{title: 'Frankenstein', content: 'You will rejoice to hear that no disaster has accompanied the commencement of an enterprise which you have regarded with such evil forebodings. I arrived here yesterday, and my first task is to assure my dear sister of my welfare and increasing confidence in the success of my undertaking. I am already far north of London, and as I walk in the streets of Petersburgh, I feel a cold northern breeze play upon my cheeks, which braces my nerves and fills me with delight. Do you understand this feeling? This breeze, which has travelled from the regions towards which I am advancing, gives me a foretaste of those icy climes. Inspirited by this wind of promise, my daydreams become more fervent and vivid. I try in vain to be persuaded that the pole is the seat of frost and desolation; it ever presents itself to my imagination as the region of beauty and delight. There, Margaret, the sun is for ever visible, its broad disk just skirting the horizon and diffusing a perpetual splendour. There—for with your leave, my sister, I will put some trust in preceding navigators—there snow and frost are banished; and, sailing over a calm sea, we may be wafted to a land surpassing in wonders and in beauty every region hitherto discovered on the habitable globe. Its productions and features may be without example, as the phenomena of the heavenly bodies undoubtedly are in those undiscovered solitudes. What may not be expected in a country of eternal light? I may there discover the wondrous power which attracts the needle and may regulate a thousand celestial observations that require only this voyage to render their seeming eccentricities consistent for ever.'},
{title: 'Dracula', content: '3 May. Bistritz. Left Munich at 8:35 P. M., on 1st May, arriving at Vienna early next morning; should have arrived at 6:46, but train was an hour late. BudaPesth seems a wonderful place, from the glimpse which I got of it from the train and the little I could walk through the streets. I feared to go very far from the station, as we had arrived late and would start as near the correct time as possible. The impression I had was that we were leaving the West and entering the East; the most western of splendid bridges over the Danube, which is here of noble width and depth, took us among the traditions of Turkish rule. We left in pretty good time, and came after nightfall to Klausenburgh. Here I stopped for the night at the Hotel Royale. I had for dinner, or rather supper, a chicken done up some way with red pepper, which was very good but thirsty. (Mem., get recipe for Mina.) I asked the waiter, and he said it was called "paprika hendl," and that, as it was a national dish, I should be able to get it anywhere along the Carpathians. I found my smattering of German very useful here; indeed, I dont know how I should be able to get on without it. Having had some time at my disposal when in London, I had visited the British Museum, and made search among the books and maps in the library regarding Transylvania; it had struck me that some foreknowledge of the country could hardly fail to have some importance in dealing with a nobleman of that country. I find that the district he named is in the extreme east of the country, just on the borders of three states, Transylvania, Moldavia and Bukovina, in the midst of the Carpathian mountains; one of the wildest and least known portions of Europe.'},
]);
Start the job
With seed data, you can now start the batch job. You can start a batch job either as part of your application logic, or from the C3 AI Console.
// Configure the job options
var jobOptions = WordCountOptions.make({batchSize: 10, removeDigits: true});
// Create the batch job
var job = WordCount.make({id: "WordCount", options: jobOptions}).merge();
// Start the batch job
job.start();
Monitor job
You can check the status of a job in your application or in the C3 AI Console:
var job = WordCount.make({id: "WordCount"}).get();
// The job status
job.status();
// Cancel the job without deleting the error status
job.cancel(false);
In this example, the batch job fetches unprocessed records from Corpus, processes each record, and updates it with the new value. When the batch job has completed successfully, you can check the updated records. In the C3 AI Console, run the following command:
c3Grid(Corpus.fetch())
All batch jobs are added to the BatchQueue for asynchronous processing, so you can also monitor the status of the queue.
Troubleshoot errors
When troubleshooting errors, start by looking at the job status. When the job fails, the job status contains a list of errors that can help you find the cause:
var job = WordCount.make({id: "WordCount"}).get();
// Check if there are any errors
job.status().errors;
When you find the root cause of the problem, fix the logic in your application package and provision the package with the latest changes. You can then retry the failed job with the new application logic. In the C3 AI Console, run the following command:
var job = WordCount.make({id: "WordCount"}).get();
// Retry running failed job
job.recover();
Automatic retries
When one or more batches run into an error during execution, the overall batch job is marked as failed. You can configure a job to automatically reschedule the batches that caused the job to fail.
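The behavior described above can be modeled in plain JavaScript. This is a sketch of the idea, not the platform's implementation: `runWithAutoRecovery` and `flakyProcess` are hypothetical names, the `'completed'` and `'failed'` strings are illustrative, and a batch signals failure by throwing.

```javascript
// Plain-JavaScript model of automatic recovery; not the platform's implementation.
// `processFn` throws to signal that a batch failed.
function runWithAutoRecovery(batches, processFn, numAutoRecoveryAttempts) {
  var pending = batches.slice();
  var attempt = 0;
  while (true) {
    var failed = [];
    pending.forEach(function (batch) {
      try {
        processFn(batch);
      } catch (e) {
        failed.push(batch);
      }
    });
    if (failed.length === 0) {
      return { status: 'completed', attempts: attempt + 1 };
    }
    if (attempt >= numAutoRecoveryAttempts) {
      return { status: 'failed', attempts: attempt + 1, failedBatches: failed };
    }
    // Only the batches that failed are scheduled again.
    pending = failed;
    attempt += 1;
  }
}

// Example: batch 'b' fails on its first run and succeeds on the retry.
var seen = {};
function flakyProcess(batch) {
  seen[batch] = (seen[batch] || 0) + 1;
  if (batch === 'b' && seen[batch] === 1) {
    throw new Error('transient failure');
  }
}

console.log(runWithAutoRecovery(['a', 'b', 'c'], flakyProcess, 1).status); // completed
```

With zero recovery attempts the same run would end as failed, with `'b'` as the only failed batch. On the platform, the number of automatic attempts is set on the job itself: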
// Retry the job once, if it fails. By default a job is not retried.
var job = WordCount.make({options: jobOptions, numAutoRecoveryAttempts: 1}).upsert();
Configure priority
You can also customize the priority of the job so it takes precedence over others:
// Create a job with higher priority, with highest priority being 0
var job = WordCount.make({options: jobOptions, priority: 100}).upsert();
// Increase the priority of an existing job and any batches already scheduled
var job = WordCount.get(/*JobId*/);
job.setPriority(100);