Run MapReduce Jobs
The MapReduce technique allows you to process large datasets in parallel by:
- Breaking the dataset into smaller subsets that can be processed in parallel by multiple nodes in the cluster.
- Processing each of the subsets independently.
- Aggregating the processed subsets into a final result.
Data transfer between nodes in the cluster, redundancy, and fault tolerance are all managed automatically for you.
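The three steps above can be sketched in plain JavaScript. This is only an in-memory illustration, not the platform API: the helper names splitIntoBatches, processBatch, and aggregate are hypothetical, and everything runs sequentially in one process rather than across cluster nodes.

```javascript
// Plain JavaScript illustration; splitIntoBatches, processBatch, and
// aggregate are hypothetical helper names, not platform functions.

// Step 1: break the dataset into subsets of at most `batchSize` items.
function splitIntoBatches(items, batchSize) {
  var batches = [];
  for (var i = 0; i < items.length; i += batchSize) {
    batches.push(items.slice(i, i + batchSize));
  }
  return batches;
}

// Step 2: process one subset independently of the others (here: a partial sum).
function processBatch(batch) {
  return batch.reduce(function (sum, n) { return sum + n; }, 0);
}

// Step 3: aggregate the partial results into a final result.
function aggregate(partials) {
  return partials.reduce(function (sum, n) { return sum + n; }, 0);
}

var dataset = [1, 2, 3, 4, 5, 6, 7];
var batches = splitIntoBatches(dataset, 3);  // [[1, 2, 3], [4, 5, 6], [7]]
var partials = batches.map(processBatch);    // [6, 15, 7]
var total = aggregate(partials);             // 28
```

Because processBatch only looks at its own batch, the calls could run in any order or on different machines without changing the total.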
The two stages of MapReduce
When creating a MapReduce job, you select which data to process by specifying:
- An entity Type to fetch from
- A filter to select only specific instances of that Type
- Which fields to fetch from the Type
Then, the data is processed in two stages: map and reduce.
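As a rough mental model, the selection step behaves like a filter followed by a field projection. The sketch below uses a plain in-memory array; selectInput and the language field are hypothetical and exist only for this illustration.

```javascript
// Hypothetical in-memory stand-in for the instances of one entity Type.
// The `language` field is invented for this illustration.
var instances = [
  { id: "a", title: "Metamorphosis", language: "en", content: "One morning" },
  { id: "b", title: "Die Verwandlung", language: "de", content: "Als Gregor Samsa" },
  { id: "c", title: "Dracula", language: "en", content: "3 May. Bistritz." }
];

// selectInput is a hypothetical helper: keep the instances that match
// `filter`, then keep only the listed `fields` of each one.
function selectInput(objs, filter, fields) {
  return objs.filter(filter).map(function (obj) {
    var projected = {};
    fields.forEach(function (field) {
      projected[field] = obj[field];
    });
    return projected;
  });
}

var input = selectInput(
  instances,
  function (obj) { return obj.language === "en"; },
  ["title", "content"]
);
// input holds two objects, each with only `title` and `content`.
```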
Map
During the map stage, a map() method is called on subsets of the data. This method runs in parallel for different subsets and executes whatever logic you define:
- In most cases, the method should return a map that maps unique keys to values.
- If the job is not intended to do any summarization, then it should return an empty map.
The resulting key-value pairs are shuffled, sorted, and then sent to the reduce stage.
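The shuffle-and-sort step can be pictured as grouping the values from all map() outputs by key. The following is a minimal plain JavaScript sketch; shuffle is a hypothetical helper, and the platform performs this step for you.

```javascript
// shuffle is a hypothetical helper, not a platform function.
// It takes the maps returned by several map() calls and groups the
// values by key, returning the groups in sorted key order.
function shuffle(mapOutputs) {
  var grouped = {};
  mapOutputs.forEach(function (output) {
    Object.keys(output).forEach(function (key) {
      if (!Object.prototype.hasOwnProperty.call(grouped, key)) {
        grouped[key] = [];
      }
      grouped[key].push(output[key]);
    });
  });
  return Object.keys(grouped).sort().map(function (key) {
    return { key: key, values: grouped[key] };
  });
}

var shuffled = shuffle([
  { the: 2, bed: 1 },   // output of map() on one subset
  { the: 3, rain: 1 },  // output of map() on another subset
  {}                    // a map() call that emitted nothing
]);
// shuffled:
// [ { key: "bed", values: [1] },
//   { key: "rain", values: [1] },
//   { key: "the", values: [2, 3] } ]
```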
Reduce
During the reduce stage, the reduce() method is called once for each unique key generated during the map stage, together with all the intermediate values associated with that key. Each call to reduce() creates an array of final output values for that key.
The final result of a MapReduce job is a map, mapping each unique key to the array of values generated during the reduce stage.
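To make the shape of the final result concrete, here is a small plain JavaScript sketch of a reduce driver. runReduceStage and maxReduce are hypothetical names used only for illustration; on the platform, the calls to reduce() are orchestrated for you.

```javascript
// runReduceStage is a hypothetical driver written for illustration.
// `grouped` maps each unique key to its intermediate values.
function runReduceStage(grouped, reduce) {
  var result = {};
  Object.keys(grouped).forEach(function (key) {
    // One reduce() call per unique key, with all values for that key.
    result[key] = reduce(key, grouped[key]);
  });
  return result;
}

// Example reduce(): returns a single-element array holding the largest value.
function maxReduce(outKey, interValues) {
  return [Math.max.apply(null, interValues)];
}

var finalResult = runReduceStage({ bed: [1], the: [2, 3] }, maxReduce);
// finalResult: { bed: [1], the: [3] }
```

Each key maps to an array even when reduce() produces a single value, which matches the description of the final result above.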
Example
The MapReduce technique is a good fit for counting how many times each word appears in a large corpus.
The first step is to create a Type to represent the corpus you want to analyze.
Model the input dataset
Start by defining a new Type to represent a large text corpus like a book or blog post.
/**
 * Corpus.c3typ
 *
 * Represents a corpus of text like a book, article, or blog post
 */
entity type Corpus schema name 'CORPUS' {
  title: string
  @db(clob=true)
  content: string
}
Create a MapReduce job
Next, create a new Type that extends MapReduce.
/**
* WordCount.c3typ
*
* Map-reduce job that counts the number of occurrences of each word in
* a given input.
*/
type WordCount extends MapReduce<Corpus, string, int, int> type key 'WORDCOUNT' {
  map: ~ js-server
  reduce: ~ js-server
}
This example overrides two methods, map and reduce. Both are inherited from the MapReduce Type and should be implemented for each MapReduce job.
Implement the logic
Now implement the logic for the map and reduce stages:
/** WordCount.js */
/**
 *
 * @param {int} batch The batch number being processed
 * @param {Corpus[]} objs Object instances being processed
 * @param {MapReduce} job The job instance
 * @param {int} subBatch Subbatch number within the batch, starting at 1, being processed
 * @returns {Map} A map with unique word to count of occurrences
 */
function map(batch, objs, job, subBatch) {
  var wordCount = {};
  objs.each(function (corpus) {
    if (corpus.content) {
      // Strip punctuation (anything that is not a word character or whitespace),
      // then replace each whitespace character, including new lines, with a space
      var str = corpus.content.replace(/[^\w\s]/gi, '').replace(/\s/gi, ' ');
      // Split the cleaned string, not the raw content
      var ary = str.split(" ");
      _.each(ary, function (word) {
        // Consecutive spaces produce empty strings; skip them
        if (!word) {
          return;
        }
        var count = wordCount[word] ? wordCount[word] : 0;
        wordCount[word] = count + 1;
      });
    }
  });
  return wordCount;
}
/**
 *
 * @param {string} outKey Output key this function is being called for
 * @param {int[]} interValues Intermediate counts for the same word
 * @param {MapReduce} job The job instance
 * @returns {int[]} A single value array with the number of times a word occurred
 */
function reduce(outKey, interValues, job) {
  var count = 0;
  interValues.each(function (value) {
    count += value;
  });
  return [count];
}
Start job
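Before relying on the deployed job, it can help to exercise the word-count logic outside the platform. The sketch below is a plain JavaScript approximation, not the deployed code: mapWords and reduceCounts are hypothetical stand-ins for map() and reduce() that use standard array methods instead of the platform collection helpers, and the grouping loop only imitates the shuffle step.

```javascript
// Plain JavaScript stand-ins for map() and reduce(); the names mapWords
// and reduceCounts are hypothetical and exist only for local experiments.
function mapWords(objs) {
  // Object.create(null) avoids clashes with inherited keys such as "constructor".
  var wordCount = Object.create(null);
  objs.forEach(function (corpus) {
    if (!corpus.content) {
      return;
    }
    var str = corpus.content.replace(/[^\w\s]/g, "").replace(/\s/g, " ");
    str.split(" ").forEach(function (word) {
      if (!word) {
        return; // skip empty strings left by consecutive spaces
      }
      wordCount[word] = (wordCount[word] || 0) + 1;
    });
  });
  return wordCount;
}

function reduceCounts(outKey, interValues) {
  var count = 0;
  interValues.forEach(function (value) {
    count += value;
  });
  return [count];
}

// Two simulated batches, each handled by its own mapWords() call.
var mapOutputs = [
  mapWords([{ content: "The rain. The bed!" }]),
  mapWords([{ content: "the rain\nfell" }, { content: null }])
];

// Imitate the shuffle step: group the intermediate counts by word.
var grouped = Object.create(null);
mapOutputs.forEach(function (output) {
  Object.keys(output).forEach(function (word) {
    if (!grouped[word]) {
      grouped[word] = [];
    }
    grouped[word].push(output[word]);
  });
});

// One reduceCounts() call per unique word.
var result = Object.create(null);
Object.keys(grouped).forEach(function (word) {
  result[word] = reduceCounts(word, grouped[word]);
});
// result: { The: [2], rain: [2], bed: [1], the: [1], fell: [1] }
```

Note that "The" and "the" are counted separately: like the map() implementation above, this sketch does not normalize case.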
After deployment, the MapReduce job needs some data to process. Navigate to C3 AI Console and add data to the application:
Corpus.createBatch([
{title: 'Metamorphosis', content: 'One morning, when Gregor Samsa woke from troubled dreams, he found himself transformed in his bed into a horrible vermin. He lay on his armour-like back, and if he lifted his head a little he could see his brown belly, slightly domed and divided by arches into stiff sections. The bedding was hardly able to cover it and seemed ready to slide off any moment. His many legs, pitifully thin compared with the size of the rest of him, waved about helplessly as he looked. "Whats happened to me?" he thought. It wasnt a dream. His room, a proper human room although a little too small, lay peacefully between its four familiar walls. A collection of textile samples lay spread out on the table - Samsa was a travelling salesman - and above it there hung a picture that he had recently cut out of an illustrated magazine and housed in a nice, gilded frame. It showed a lady fitted out with a fur hat and fur boa who sat upright, raising a heavy fur muff that covered the whole of her lower arm towards the viewer. Gregor then turned to look out the window at the dull weather. Drops of rain could be heard hitting the pane, which made him feel quite sad. "How about if I sleep a little bit longer and forget all this nonsense", he thought, but that was something he was unable to do because he was used to sleeping on his right, and in his present state couldnt get into that position. However hard he threw himself onto his right, he always rolled back to where he was. He must have tried it a hundred times, shut his eyes so that he wouldnt have to look at the floundering legs, and only stopped when he began to feel a mild, dull pain there that he had never felt before.'},
{title: 'Frankenstein', content: 'You will rejoice to hear that no disaster has accompanied the commencement of an enterprise which you have regarded with such evil forebodings. I arrived here yesterday, and my first task is to assure my dear sister of my welfare and increasing confidence in the success of my undertaking. I am already far north of London, and as I walk in the streets of Petersburgh, I feel a cold northern breeze play upon my cheeks, which braces my nerves and fills me with delight. Do you understand this feeling? This breeze, which has travelled from the regions towards which I am advancing, gives me a foretaste of those icy climes. Inspirited by this wind of promise, my daydreams become more fervent and vivid. I try in vain to be persuaded that the pole is the seat of frost and desolation; it ever presents itself to my imagination as the region of beauty and delight. There, Margaret, the sun is for ever visible, its broad disk just skirting the horizon and diffusing a perpetual splendour. There—for with your leave, my sister, I will put some trust in preceding navigators—there snow and frost are banished; and, sailing over a calm sea, we may be wafted to a land surpassing in wonders and in beauty every region hitherto discovered on the habitable globe. Its productions and features may be without example, as the phenomena of the heavenly bodies undoubtedly are in those undiscovered solitudes. What may not be expected in a country of eternal light? I may there discover the wondrous power which attracts the needle and may regulate a thousand celestial observations that require only this voyage to render their seeming eccentricities consistent for ever.'},
{title: 'Dracula', content: '3 May. Bistritz. Left Munich at 8:35 P. M., on 1st May, arriving at Vienna early next morning; should have arrived at 6:46, but train was an hour late. BudaPesth seems a wonderful place, from the glimpse which I got of it from the train and the little I could walk through the streets. I feared to go very far from the station, as we had arrived late and would start as near the correct time as possible. The impression I had was that we were leaving the West and entering the East; the most western of splendid bridges over the Danube, which is here of noble width and depth, took us among the traditions of Turkish rule. We left in pretty good time, and came after nightfall to Klausenburgh. Here I stopped for the night at the Hotel Royale. I had for dinner, or rather supper, a chicken done up some way with red pepper, which was very good but thirsty. (Mem., get recipe for Mina.) I asked the waiter, and he said it was called "paprika hendl," and that, as it was a national dish, I should be able to get it anywhere along the Carpathians. I found my smattering of German very useful here; indeed, I dont know how I should be able to get on without it. Having had some time at my disposal when in London, I had visited the British Museum, and made search among the books and maps in the library regarding Transylvania; it had struck me that some foreknowledge of the country could hardly fail to have some importance in dealing with a nobleman of that country. I find that the district he named is in the extreme east of the country, just on the borders of three states, Transylvania, Moldavia and Bukovina, in the midst of the Carpathian mountains; one of the wildest and least known portions of Europe.'},
]);
You can now start the WordCount MapReduce job, either as part of your application logic or from C3 AI Console.
// Create a new job to process 10 books per batch of work
var job = WordCount.make({
  targetType: Corpus,
  include: 'title, content',
  batchSize: 10
}).upsert();
// Start the job
job.start();
Monitor the job and get the result
You can check the status of a job in your application or in C3 AI Console:
var job = WordCount.get(/*JobId*/);
// The job status
job.status();
// Cancel the job without deleting the results of the job
job.cancel(false);
When the job succeeds, you can get its result by running the following code snippet:
var job = WordCount.get(/*JobId*/);
// Get the first 100 keys of the resulting map
job.results(100);
MapReduce jobs are added to the MapReduceQueue for asynchronous processing. If a job fails, you can monitor the MapReduceQueue to find the cause.
See Monitor and Manage Queues for more information.
Troubleshoot errors
When troubleshooting errors, start by looking at the job status. When the job fails, the job status contains a list of errors that can be helpful to identify issues.
var job = WordCount.get(/*JobId*/);
// Check if there are any errors
job.status().errors;
When you find the root cause of the problem, fix the logic in your application package and deploy the updated package or MapReduce implementation to your development environment. You can then retry the failed job with the new application logic. In C3 AI Console, run:
var job = WordCount.get(/*JobId*/);
// Retry the failed job
job.recover();
Automatic retries
When one or more batches fail during execution, the overall MapReduce job is marked as failed. You can configure a job to automatically reschedule the failed batches by setting numAutoRecoveryAttempts when you create it:
// Retry the job once if it fails. By default, a job is not retried.
var job = WordCount.make({options: jobOptions, numAutoRecoveryAttempts: 1}).create();
Configure priority
You can also customize the priority of the job so it takes precedence over others. See the following example code snippets for creating a job with a higher priority or increasing the priority of an existing job.
// Create a job with higher priority, with highest priority being 0
var job = WordCount.make({options: jobOptions, priority: 100}).create();
// Increase the priority of an existing job and any batches already scheduled
var job = WordCount.get(/*JobId*/);
job.setPriority(100);