| JSON Control Record | Description |
|---|---|
| { | |
| “type”:”recurring_event”, | The KEY will be <<type>>::<<id>> |
| “id”:1, | |
| “hour”:14, | The hour of the day 0-23, *, *2X, *4X to trigger |
| “min”:54, | The minute in the hour 0-59, *, *2X, *4X to trigger |
| “action”:”doCronActionA”, | JavaScript function to run when the timer fires |
| “active”:true, | Flag to enable or disable this schedule |
| “verbose”: { | [OPTIONAL] logging control |
| “user_func”:2, | Logging level for the action logic : 0=none, etc. etc. |
| “scheduler”:3 | Logging level for the cron logic : 0=none, etc. etc. |
| }, | |
| “dynamic”: { | [DYNAMIC] system control and statistics |
| “state”:”arm”, | “arm”, “rearm” or “pending”; any value other than “pending” starts a schedule |
| “next_sched”: 0, | Number of seconds since epoch to next desired schedule |
| “prev_sched”: 0, | Number of seconds since epoch for previous schedule |
| “prev_etime”: 0, | Number of seconds since epoch for previous schedule actual exec time |
| “prev_delay”: 0, | Number of seconds that the timer was delayed from the schedule |
| “prev_atime”: 0 | Number of seconds taken by the user ‘action’ |
| } | |
| } |
| hour | min | Values can be numbers or strings |
|---|---|---|
| 13 | 32 | Run at 13:32 (or 1:32 pm) |
| * | 15 | Run every hour at 15 minutes past |
| 8 | 12 | Run once a day at 8:12 (or 8:12 am) |
| * | * | Run once a minute |
| *2X | *2X | Run twice a minute – requires both hour and min set to “*2X” |
| *4X | *4X | Run four times a minute – requires both hour and min set to “*4X” |
| Action | N1QL statement |
|---|---|
| Create a schedule | INSERT INTO `crondata` (KEY,VALUE) VALUES ("recurring_event::1", { "type":"recurring_event", "id":1, "hour":"14", "min":"54", "action":"doCronActionA", "active":true } ); |
| Make an index to query data without specifying keys | CREATE primary INDEX on `crondata` ; |
| Show all schedules order by id | SELECT * FROM `crondata` WHERE type="recurring_event" order by id ; |
| Show specific schedule | SELECT * FROM `crondata` WHERE type="recurring_event" AND id=1 ; |
| Arm or set active | UPDATE `crondata` SET active = true WHERE type="recurring_event" AND id=1 ; |
| Disarm or set inactive | UPDATE `crondata` SET active = false WHERE type="recurring_event" AND id=1 ; |
| Adjust time of trigger | UPDATE `crondata` SET hour = 11, min = 30 WHERE type="recurring_event" AND id=1 ; |
| Adjust logging of the "action" | UPDATE `crondata` SET verbose.user_func = 0 WHERE type="recurring_event" AND id=1 ; |
| Adjust logging of the scheduler logic | UPDATE `crondata` SET verbose.scheduler = 0 WHERE type="recurring_event" AND id=1 ; |
| Delete the schedule | DELETE FROM `crondata` WHERE type="recurring_event" AND id=1 ; |
crondata where type=”recurring_event” order by id ;

| active | action | hour | id | min | scheduler | type | user_func |
|---|---|---|---|---|---|---|---|
| true | “doCronActionA” | 14 | 1 | 54 | 1 | “recurring_event” | 2 |
| true | “doCronActionB” | * | 2 | * | 1 | “recurring_event” | 1 |
| true | “doCronActionC” | *2X | 3 | *2X | 4 | “recurring_event” | 4 |
| true | “doCronActionD” | * | 4 | 0 | 0 | “recurring_event” | 1 |
travel-sample WHERE type = ‘airline’ GROUP BY country;function doCronActionA(doc) {
try {
// Check that doc has desired values
if (!doc.type || doc.type !== "recurring_event" || !doc.active || doc.active !== true) return;
if (doc.verbose.user_func >= 1)
log(doc.action + ' user action controlled by ' + doc.type + '::' + doc.id);
// this is a 6.5 N1QL query (feature not available in GA prior to 6.5)
// Create an embedded N1QL iterator by issuing a SELECT statement to get the
// counts of airlines by country. Make a new document and write it out to KV
// We will use the iterator to create a KV document representing the results of a
// HARD lengthy embedded N1QL query and write it back to KV, the idea is to keep
// a calculation up to date once a day such that it that can be read 'quickly'
// by other Eventing Functions, other Couchbase services or SDKs.
// Consider if we had 1 million docs in a minute do we really want to use N1QL
// to recalculate something that is almost static for all 1 million documents, of
// course not, so we make an intermediate value that can be read into Eventing
// and used via a single 'light weight' KV read.
var q_iter = SELECT country,
count( * ) cnt
FROM `travel-sample`
WHERE `type` = 'airline'
GROUP BY country;
// loop through the result set and update the map 'accumulate'
var accumulate = {};
var idx = 0;
for (var val of q_iter) {
if (doc.verbose.user_func >= 2)
log(doc.action + ' N1QL idx ' + idx + ', country ' + val.country + " cnt " + val.cnt);
accumulate[val.country] = val.cnt;
idx++;
}
// close out embedded N1QL iterator
q_iter.close();
// Now let’s make a cached KV document representing a HARD length embedded N1QL
// query and write it back to KV, we need a KEY and a type and id and then we
// upsert it into the `travel-sample` bucket.
var cachedoc = {};
cachedoc.type = "cron_cache";
cachedoc.id = "airlines_by_country";
cachedoc.date = new Date();
cachedoc.data = accumulate;
var ckey = cachedoc.type + '::' + cachedoc.id;
ts_bkt[ckey] = cachedoc;
if (doc.verbose.user_func >= 2) {
log(doc.action + ' upsert to KV with KEY ' + ckey + ' cachedoc ', cachedoc);
}
} catch (e) {
log(doc.action + ' Error exception:', e);
return false;
}
return true;
}function doCronActionB(doc) {
try {
// check that doc has desired values
if (doc.type !== "recurring_event" || doc.active !== true) return;
if (doc.verbose.user_func >= 1)
log(doc.action + ' user action controlled by ' + doc.type + '::' + doc.id);
// YOUR LOGIC HERE
} catch (e) {
log(doc.action + ' Error exception:', e);
return false;
}
return true;
}function doCronActionC(doc) {
try {
// check that doc has desired values
if (doc.type !== "recurring_event" || doc.active !== true) return;
if (doc.verbose.user_func >= 1)
log(doc.action + ' user action controlled by ' + doc.type + '::' + doc.id);
// YOUR LOGIC HERE
} catch (e) {
log(doc.action + ' Error exception:', e);
return false;
}
return true;
}function OnUpdate(doc, meta) {
try {
// Check if further analysis is needed we only trigger on an active recurring_event
if (doc.type !== "recurring_event" || doc.active !== true) return;
var update_doc = false;
if (!doc.dynamic) {
// Add if missing doc.dynamic with defaults
doc.dynamic = {
"state": "arm",
"next_sched": 0,
"prev_sched": 0,
"prev_etime": 0,
"prev_delay": 0,
"prev_atime": 0
};
// we need to update the document once we have the next schedule
update_doc = true;
}
if (!doc.verbose) {
// Add if missing doc.dynamic with defaults
doc.verbose = {
"user_func": 1,
"scheduler": 1
};
// we need to update the document once we have the next schedule
update_doc = true;
}
// Do not process dynamic.state pending
if (!doc.dynamic || !doc.dynamic.state || doc.dynamic.state === "pending") return;
var mid = doc.type + "::" + doc.id; // this is the same as meta.id or the KEY
var hour = doc.hour;
var min = doc.min;
// Do an eval check the JavaScript function exists. The eval occurs in a common
// utility function shared with Callback
if (!verifyFunctionExistsViaEval(doc, mid)) {
// doc.action did not exist, we have already logged the issue
return;
}
// Get the next valid execution time
var date_timer = getNextRecurringDate(hour, min);
var next_sched = Math.round(date_timer.getTime() / 1000);
if (!update_doc && next_sched !== doc.dynamic.next_sched) {
// the next_sched should be the same as the setting from the helper application, however
// if we undeploy/deploy or pause/resume we might haver to reschedule to the next time slot
log('OnUpdate U ' + mid + ' calculated next_sched !== doc.dynamic.next_sched, delta ' +
(next_sched - doc.dynamic.next_sched) + ', reschedule');
update_doc = true;
}
if (update_doc) {
// this mutation is recursive and will be suppressed, we ensure we have a dynamic structure
doc.dynamic.next_sched = next_sched;
try {
cron_bkt[mid] = doc;
} catch (e) {
log('OnUpdate help: F ' + mid + ' FATAL could not update KV cron cycle ' + doc.action);
return;
}
}
// Schedule an Eventing timer
var timer_id = createTimer(Callback, date_timer, null, doc);
if (doc.verbose.scheduler >= 1) {
log('OnUpdate A ' + mid + ' rcv mutation (initial or rearm) schedule timer at ' +
toLocalISOTime(date_timer));
}
if (doc.verbose.scheduler >= 2) {
log('OnUpdate B ' + mid + ' recurring timer was created, timer_id ' + timer_id);
}
} catch (e) {
log('OnUpdate E ' + meta.id + ', Error exception:', e);
}
}| hour | min | Values can be numbers or strings |
|---|---|---|
| 13 | 32 | Run at 13:32 (or 1:32 pm) |
| * | 15 | Run every hour at 15 minutes past |
| 8 | 12 | Run once a day at 8:12 (or 8:12 am) |
| * | * | Run once a minute |
| *2X | *2X | Run twice a minute – requires both hour and min set to “*2X” |
| *4X | *4X | Run four times a minute – requires both hour and min set to “*4X” |
/**
 * Compute the next time a schedule should fire, using a simplistic partial 'crontab' syntax
 * with two non-standard extensions ('*2X' and '*4X').
 *
 * Supported (hour, min) combinations; numeric values may be numbers or numeric strings:
 *   '*4X', '*4X' : every 15 seconds
 *   '*2X', '*2X' : every 30 seconds
 *   '*',   '*'   : every minute
 *   H,     '*'   : every minute while the local hour is H
 *   '*',   M     : every hour at minute M
 *   H,     M     : once a day at H:M
 *
 * All calculations use the local time zone of the runtime and the result is always strictly
 * later than the current time.
 *
 * @param {(number|string)} hour_str hour of the day 0-23, '*', '*2X' or '*4X'
 * @param {(number|string)} min_str minute of the hour 0-59, '*', '*2X' or '*4X'
 * @returns {Date} the next scheduled time
 * @throws {Error} when the combination is unsupported or a numeric value is out of range
 */
function getNextRecurringDate(hour_str, min_str) {
    // Note JavaScript Dates are in milliseconds
    var date_now = new Date();
    var now_ms = date_now.getTime();
    // start from the very same instant so "now" and the candidate cannot drift apart
    var date_ret = new Date(now_ms);
    // parseInt never throws; unparsable input such as '*' simply yields NaN
    var hour = parseInt(hour_str, 10);
    var min = parseInt(min_str, 10);
    var hour_ok = !isNaN(hour) && hour >= 0 && hour <= 23;
    var min_ok = !isNaN(min) && min >= 0 && min <= 59;

    if ((hour_str === '*4X' && min_str === '*4X') || (hour_str === '*2X' && min_str === '*2X')) {
        // four times a minute (every 15 sec.) or twice a minute (every 30 sec.)
        var step_sec = (hour_str === '*4X') ? 15 : 30;
        date_ret.setMilliseconds(0);
        date_ret.setSeconds(step_sec);
        while (date_ret.getTime() <= now_ms) {
            date_ret.setSeconds(date_ret.getSeconds() + step_sec);
        }
        return date_ret;
    }

    // every remaining schedule fires on a whole minute
    date_ret.setMilliseconds(0);
    date_ret.setSeconds(0);

    if (hour_str === '*' && min_str === '*') {
        // once a minute
        date_ret.setMinutes(date_ret.getMinutes() + 1);
        return date_ret;
    }
    if (hour_ok && min_str === '*') {
        // once a minute, but only during the given hour: try the next whole minute first
        date_ret.setMinutes(date_ret.getMinutes() + 1);
        if (date_ret.getHours() !== hour) {
            // outside the window, jump to the start of the window today, or tomorrow when
            // today's window has already begun or passed
            date_ret.setMinutes(0);
            date_ret.setHours(hour);
            if (date_ret.getTime() <= now_ms) {
                date_ret.setDate(date_ret.getDate() + 1);
            }
        }
        return date_ret;
    }
    if (hour_str === '*' && min_ok) {
        // once an hour at a given minute; only move to the next hour when this hour's
        // slot has already passed
        date_ret.setMinutes(min);
        if (date_ret.getTime() <= now_ms) {
            date_ret.setHours(date_ret.getHours() + 1);
        }
        return date_ret;
    }
    if (hour_ok && min_ok) {
        // once a day for a given hour and a given minute
        date_ret.setMinutes(min);
        date_ret.setHours(hour);
        if (date_ret.getTime() <= now_ms) {
            // schedule for tomorrow
            date_ret.setDate(date_ret.getDate() + 1);
        }
        return date_ret;
    }

    log('getNextRecurringDate illegal input hour_str <' + hour_str + '> min_str <' + min_str + '>');
    throw new Error('getNextRecurringDate illegal input hour_str <' + hour_str + '> min_str <' + min_str + '>');
}
/**
 * Check that the 'action' named in a cron control document resolves to a JavaScript function
 * visible from this handler.
 *
 * @param {Object} curDoc control document; reads curDoc.action and curDoc.verbose.scheduler
 * @param {string} id KEY of the control document, used only in the warning message
 * @returns {boolean} true when curDoc.action names an existing function, otherwise false
 */
function verifyFunctionExistsViaEval(curDoc, id) {
    var result = false;
    try {
        var action = curDoc.action;
        // SECURITY: curDoc.action comes from a KV document and is handed to eval(). Only a
        // plain (optionally dotted) identifier is accepted so that a crafted document cannot
        // smuggle arbitrary JavaScript into the eval below.
        var is_identifier = typeof action === 'string' &&
            /^[A-Za-z_$][A-Za-z0-9_$]*(\.[A-Za-z_$][A-Za-z0-9_$]*)*$/.test(action);
        if (is_identifier) {
            result = eval("typeof " + action + " === 'function';");
        }
        if (result === false) {
            // missing (or rejected) action: warn unless scheduler logging is switched off
            if (curDoc.verbose.scheduler >= 1)
                log("Warn/Disable (No Action and No Re-Arm), because required 'action' of " +
                    curDoc.action + "(doc) does not exist, id is", id);
            return result;
        }
    } catch (e) {
        log('verifyFunctionExistsViaEval Error exception:', e);
    }
    return result;
}

/**
 * Round a number to a fixed number of decimal places and return it as a number.
 *
 * @param {number} number value to round
 * @param {number} precision number of decimal places to keep
 * @returns {number} the rounded value
 */
function toNumericFixed(number, precision) {
    var multi = Math.pow(10, precision);
    // toFixed() with one extra digit returns a string; Math.round() converts it back
    return Math.round((number * multi).toFixed(precision + 1)) / multi;
}

/**
 * Format a Date as an ISO-8601 style string in local time (no trailing 'Z').
 *
 * @param {Date} d the instant to format
 * @returns {string} for example "2024-01-15T10:05:30.000"
 */
function toLocalISOTime(d) {
    // use the UTC offset in effect at 'd' itself rather than at the current time, so a
    // timestamp on the other side of a daylight-saving change is not shifted by an hour
    var tzoffset = d.getTimezoneOffset() * 60000; // offset in milliseconds
    return (new Date(d.getTime() - tzoffset)).toISOString().slice(0, -1);
}
/**
 * Timer callback: runs the user action named in the control document and prepares the
 * next cycle.
 *
 * Steps, in order:
 *   1. Re-read the control document from KV ('cron_bkt') and stop silently-or-logged when it is
 *      missing, inactive, in a different dynamic.state, or its crc64 differs from the timer's
 *      context (i.e. the document changed after this timer was created).
 *   2. Verify the action function exists, then eval it with the KV copy as its argument.
 *   3. Record timing statistics, set dynamic.state to "pending" and dynamic.next_sched to the
 *      next slot, and write the document back to KV. No timer is created here; per the
 *      comments below the resulting mutation is expected to drive the re-arm.
 *
 * @param {Object} doc the timer context: the control document as it was when the timer was
 *        created
 */
function Callback(doc) {
try {
// capture the fire time first so the delay statistic excludes our own processing
var fired_at = new Date();
// Check if further analysis is needed we only trigger on a recurring_event that is active
if (doc.type !== "recurring_event") return;
// doc must have 'action', 'dynamic {}', verbose {}, dynamic.state
if (!doc.action || !doc.dynamic || !doc.verbose || !doc.dynamic.state) return;
// process any doc.dynamic.state BUT pending
if (doc.dynamic.state === "pending") return;
// ==================
// Check if still active
// We make sure that in KV the 'doc' still exists and that it is still active if not just
// return thus skipping the action and not Re-arming the timer. The control documents are
// read through the bucket alias 'cron_bkt'.
var mid = doc.type + '::' + doc.id; // make our KEY
var curDoc = null;
try {
// read the current version of doc from KV, e.g. curDoc
curDoc = cron_bkt[mid];
} catch (e) {} // needed for pre 6.5, note pure 6.5+ deployment returns null sans exception
// 'reason' stays null only when the KV copy still matches the timer's context
var reason = null;
if (!curDoc || curDoc === null) {
reason = "cron document is missing";
} else
if (!curDoc.active) {
reason = "cron document has active = false";
} else
if (!curDoc.dynamic.state || curDoc.dynamic.state !== doc.dynamic.state) {
reason = "cron document wrong dynamic.state expected " + doc.dynamic.state;
} else
if (crc64(doc) !== crc64(curDoc)) {
reason = "cron document changed";
}
if (reason !== null) {
// a missing document is always logged; otherwise logging follows the KV copy's verbosity
if (!curDoc || curDoc === null || curDoc.verbose.scheduler >= 1) {
log('Callback X ' + mid + " ignore/stop this timer's schedule because " + reason);
}
if (!curDoc || curDoc === null || curDoc.verbose.scheduler >= 4) {
log('Callback Y ' + mid + ' timer doc', doc);
log('Callback Z ' + mid + ' KV curDoc', curDoc);
}
// stop here: no action is run and nothing is written, so this schedule ends
return;
}
// ==================
// Verify user routine exists and if so eval it
// Assume curDoc.action contains something like "doCronActionA" and we have a function in
// this handler like "doCronActionA(doc)". Below we use curDoc as the end user should be
// able to alter the eval'd JavaScript function. We will execute two (2) evals.
// First eval check the JavaScript function exists. The eval occurs in a common
// utility function shared with OnUpdate
if (!verifyFunctionExistsViaEval(curDoc, mid)) {
// curDoc.action did not exist, we have already logged the issue
return;
}
// Second eval execute and process the user function we execute the defined function
// with an argument of curDoc
// NOTE(review): curDoc.action is a string taken from a KV document and is concatenated into
// eval(); confirm verifyFunctionExistsViaEval restricts it to a plain function name.
var beg_act = new Date();
var result = null;
eval("result = " + curDoc.action + "(curDoc);");
var end_act = new Date();
// wall-clock duration of the user action in milliseconds
var atime_ms = end_act.getTime() - beg_act.getTime();
if (curDoc.verbose.scheduler >= 2)
log('Callback R ' + mid + ' action took ' + toNumericFixed((atime_ms / 1000), 3) +
' sec., returned ' + result);
// ==================
// Calculate next time and mutate the control document for our helper function
// which will create another mutation such that OnUpdate of this function will pick
// it up and generate the timer (avoids the MB-38554 issue).
var hour = curDoc.hour;
var min = curDoc.min;
var date_timer = getNextRecurringDate(hour, min);
// all dynamic.* statistics are in seconds; prev_delay is computed before next_sched is
// overwritten below
curDoc.dynamic.prev_delay =
toNumericFixed(((fired_at.getTime() / 1000) - curDoc.dynamic.next_sched), 3);
curDoc.dynamic.prev_sched = curDoc.dynamic.next_sched;
curDoc.dynamic.prev_etime = Math.round(fired_at.getTime() / 1000);
curDoc.dynamic.prev_atime = toNumericFixed((atime_ms / 1000), 3);
curDoc.dynamic.state = "pending";
curDoc.dynamic.next_sched = Math.round(date_timer.getTime() / 1000);
try {
cron_bkt[mid] = curDoc;
} catch (e) {
log('Callback help: F ' + mid + ' FATAL could not update KV cron cycle ' + curDoc.action);
return;
}
if (curDoc.verbose.scheduler >= 1) {
log('Callback A ' + mid + ' gen mutation #1 to doc to force schedule rearm at ' +
toLocalISOTime(date_timer));
}
if (curDoc.verbose.scheduler >= 2) {
log('Callback B ' + mid + ' sched ' + curDoc.dynamic.prev_sched +
', actual ' + curDoc.dynamic.prev_etime +
', delay ' + curDoc.dynamic.prev_delay +
', took ' + curDoc.dynamic.prev_atime);
}
if (curDoc.verbose.scheduler >= 3) {
log('Callback C ' + mid + ' curDoc', curDoc);
}
} catch (e) {
// rebuild the KEY here because the exception may have been raised before 'mid' was set
var mid = doc.type + '::' + doc.id; // make our KEY
log('Callback E ' + mid + ' Error exception:', e);
}
}
/**
 * Mutation handler that flips a control document from dynamic.state "pending" to "rearm".
 * Judging by its log prefix ("OnUpdate help:") this is the handler of the companion helper
 * function mentioned in the file header.
 *
 * The document is only re-armed when the copy currently in KV ('cron_bkt') still exists, is
 * active, is still "pending" and has the same crc64 as the mutation being processed. In every
 * other case the schedule is stopped, whether or not logging is enabled.
 *
 * @param {Object} doc the mutated document
 * @param {Object} meta document metadata; only meta.id is read, for error logging
 */
function OnUpdate(doc, meta) {
    try {
        // Check that doc has desired values
        if (doc.type !== "recurring_event" || doc.active !== true) return;
        // doc must have 'action', 'dynamic {}', verbose {}, dynamic.state
        if (!doc.action || !doc.dynamic || !doc.verbose || !doc.dynamic.state) return;
        // Only process state pending this will only exist for a 'brief' time
        if (doc.dynamic.state !== "pending") return;
        var mid = doc.type + '::' + doc.id; // make our KEY
        var newdoc = null;
        try {
            // read the current version of doc from KV
            newdoc = cron_bkt[mid];
        } catch (e) {} // needed for pre 6.5, note pure 6.5+ deployment returns null sans exception
        var reason = null;
        if (!newdoc) {
            reason = "cron document is missing";
        } else if (!newdoc.active) {
            reason = "cron document has active = false";
        } else if (!newdoc.dynamic || !newdoc.dynamic.state || newdoc.dynamic.state !== doc.dynamic.state) {
            reason = "cron document wrong dynamic.state expected " + doc.dynamic.state;
        } else if (crc64(doc) !== crc64(newdoc)) {
            reason = "cron document changed";
        }
        if (reason !== null) {
            if (!newdoc || !newdoc.verbose || newdoc.verbose.scheduler >= 1) {
                log('OnUpdate help: X stopping schedule because ' + reason + ',', newdoc);
            }
            // Stop regardless of the logging level: the verbosity setting must only decide
            // whether we log, never whether a rejected document gets re-armed.
            return;
        }
        newdoc.dynamic.state = "rearm";
        try {
            cron_bkt[mid] = newdoc;
        } catch (e) {
            log('OnUpdate help: F ' + mid + ' FATAL could not update KV cron cycle ' + newdoc.action);
            return;
        }
        if (newdoc.verbose.scheduler >= 1) {
            log('OnUpdate help: A ' + mid + ' mutation #2 to doc to force schedule rearm');
        }
        if (newdoc.verbose.scheduler >= 3) {
            log('OnUpdate help: B ' + mid + ',', newdoc);
        }
    } catch (e) {
        log('OnUpdate help: E ' + meta.id + ', Error exception:', e);
    }
}
/*
Function "cron_impl_2func_651" also requires "cron_impl_2func_651_help"
Create a basic cron system using Eventing allows a recurring function to execute activity
at a specified time every day, hour, min, 30 sec., and 15 sec. We use a bucket called
'crondata' aliased to 'cron_bkt' which can hold one or more scheduler control documents of
type = "recurring_event".
The following uses of timers do not work reliably in Couchbase versions 6.5 and 6.5.1
a) scheduling an Eventing timer within a timer's callback
b) overwriting an existing timer by id
In addition the ability to cancel a timer does not exist in Couchbase version 6.5 and
also version 6.5.1
For this example, we supply one real user function that builds a recurring 'static' cache
document from bucket `travel-sample` via an N1QL query and save the result back to
`travel-sample` via the alias 'ts_bkt'. This JavaScript function is doCronActionA(), we
also provide two placeholders doCronActionB() and doCronActionC() for some additional
experimentation.
Test Doc:
{
"type":"recurring_event", // The KEY will be <<type>>::<<id>>
"id":1, //
"hour":14, // The hour of the day 0-23, *, *2X, *4X to trigger
"min":54, // The minute in the hour 0-59, *, *2X, *4X to trigger
"action":"doCronActionA", // What function to run on the trigger
"active":false, // Flag to arm or disable this schedule
"verbose" : {
"user_func":2, // Logging level for the action logic : 0=none, etc. etc.
"scheduler":3 // Logging level for the cron logic : 0=none, etc. etc.
},
"dynamic" : {
"state":"arm", // States "arm"|"rearm"|"pending" if any value but "pending" start a schedule
"next_sched": 0, // Number of seconds since epoch to next desired schedule
"prev_sched": 0, // Number of seconds since epoch for previous schedule
"prev_etime": 0, // Number of seconds since epoch for previous schedule actual exec time
"prev_delay": 0, // Number of seconds that the timer was delayed from the schedule
"prev_atime": 0 // Number of seconds taken by the user 'action'
}
}
INSERT INTO `crondata` (KEY,VALUE) VALUES ("recurring_event::1",
{
"type":"recurring_event",
"id":1,
"hour":14,
"min":54,
"action":"doCronActionA",
"verbose" : {
"user_func":2,
"scheduler":3
},
"active":false,
"dynamic" : {
"state": "arm",
"next_sched": 0,
"prev_sched": 0,
"prev_etime": 0,
"prev_delay": 0,
"prev_atime": 0
}
}
);
Note, you can omit verbose{} and dynamic{} as they will be auto-created by this main Eventing
Function "cron_impl_2func_651". If verbose{} is missing the logging levels will default to
"verbose" : { "user_func":1, "scheduler":1 }
INSERT INTO `crondata` (KEY,VALUE) VALUES ("recurring_event::1",
{
"type":"recurring_event",
"id":1,
"hour":14,
"min":54,
"action":"doCronActionA",
"active":false
}
);
N1QL : Make an index to query data without specifying keys
CREATE primary INDEX on `crondata` ;
N1QL : Verify or inspect settings in schedule
SELECT * FROM `crondata` WHERE type="recurring_event";
N1QL : Arm or set active
UPDATE `crondata` SET active = true WHERE type="recurring_event" AND id=1 ;
N1QL : Disarm or set inactive
UPDATE `crondata` SET active = false WHERE type="recurring_event" AND id=1 ;
N1QL : Adjust time of trigger
UPDATE `crondata` SET hour = 11, min = 30 WHERE type="recurring_event" AND id=1 ;
N1QL : Adjust logging
UPDATE `crondata` SET verbose.user_func = 1, verbose.scheduler = 0 WHERE type="recurring_event" AND id=1 ;
N1QL : Delete the schedule
DELETE FROM `crondata` WHERE type="recurring_event" AND id=1 ;
The action field is important it 'should' exist in this Eventing Function note it could be any
JavaScript name e.g. MyFunc and you must implement like the example doCronActionA(doc) where
doc will be the currently active item of type = 'recurring_event' read from the alias bucket
‘cron_bkt’ when the timer is fired. The action JavaScript function should return either true
or false used for logging purposes. If the action does not exist it is an error and a warning
is logged and the timer is disabled.
In Couchbase version 6.5+ to add a new cron like daily function just pause the active handler
insert your new function doCronActionB(doc) {...} then Resume the eventing handler. The nice
thing is if a timer was to be fired while the function was paused it will NOT be lost, when you
resume the function it will be processed at the next available time slot.
Any change to a control structure will create a new recurring schedule or timer and cancel the
current previous schedule this includes changing the verbosity level. The previous timer will
continue to run however when executed it will do a Checksum on the current control structure
from KV against its passed context and if different the Callback will ignore the old schedule.
This logic could be altered to process immediately if the schedule has expired search for the
string "OnUpdate U" in the code below.
*/
// ==================
/* BEG USER FUNCTIONS TO RUN ONCE A DAY, HOUR, OR MINUTE - ANYTHING YOU WANT BELOW */
function doCronActionA(doc) {
try {
// Check that doc has desired values
if (!doc.type || doc.type !== "recurring_event" || !doc.active || doc.active !== true) return;
if (doc.verbose.user_func >= 1)
log(doc.action + ' user action controlled by ' + doc.type + '::' + doc.id);
// this is a 6.5 N1QL query (feature not available in GA prior to 6.5)
// Create an embedded N1QL iterator by issuing a SELECT statement to get the
// counts of airlines by country. Make a new document and write it out to KV
// We will use the iterator to create a KV document representing the results of a
// HARD lengthy embedded N1QL query and write it back to KV, the idea is to keep
// a calculation up to date once a day such that it that can be read 'quickly'
// by other Eventing Functions, other Couchbase services or SDKs.
// Consider if we had 1 million docs in a minute do we really want to use N1QL
// to recalculate something that is almost static for all 1 million documents, of
// course not, so we make an intermediate value that can be read into Eventing
// and used via a single 'light weight' KV read.
var q_iter = SELECT country,
count( * ) cnt
FROM `travel-sample`
WHERE `type` = 'airline'
GROUP BY country;
// loop through the result set and update the map 'accumulate'
var accumulate = {};
var idx = 0;
for (var val of q_iter) {
if (doc.verbose.user_func >= 2)
log(doc.action + ' N1QL idx ' + idx + ', country ' + val.country + " cnt " + val.cnt);
accumulate[val.country] = val.cnt;
idx++;
}
// close out embedded N1QL iterator
q_iter.close();
// Now let’s make a cached KV document representing a HARD length embedded N1QL
// query and write it back to KV, we need a KEY and a type and id and then we
// upsert it into the `travel-sample` bucket.
var cachedoc = {};
cachedoc.type = "cron_cache";
cachedoc.id = "airlines_by_country";
cachedoc.date = new Date();
cachedoc.data = accumulate;
var ckey = cachedoc.type + '::' + cachedoc.id;
ts_bkt[ckey] = cachedoc;
if (doc.verbose.user_func >= 2) {
log(doc.action + ' upsert to KV with KEY ' + ckey + ' cachedoc ', cachedoc);
}
} catch (e) {
log(doc.action + ' Error exception:', e);
return false;
}
return true;
}
/**
 * Placeholder user action B; add your own logic where marked.
 *
 * @param {Object} doc the cron control document driving this run
 * @returns {(boolean|undefined)} true on success, false when an exception was caught,
 *          undefined when doc is not an active recurring_event
 */
function doCronActionB(doc) {
    try {
        // only an active recurring_event may drive this action
        var is_armed = doc.type === "recurring_event" && doc.active === true;
        if (!is_armed) {
            return;
        }
        var wants_log = doc.verbose.user_func >= 1;
        if (wants_log) {
            log(doc.action + ' user action controlled by ' + doc.type + '::' + doc.id);
        }
        // YOUR LOGIC HERE
        return true;
    } catch (err) {
        log(doc.action + ' Error exception:', err);
        return false;
    }
}
/**
 * Placeholder user action C; add your own logic where marked.
 *
 * @param {Object} doc the cron control document driving this run
 * @returns {(boolean|undefined)} true on success, false when an exception was caught,
 *          undefined when doc is not an active recurring_event
 */
function doCronActionC(doc) {
    try {
        // only an active recurring_event may drive this action
        var is_armed = doc.type === "recurring_event" && doc.active === true;
        if (!is_armed) {
            return;
        }
        var wants_log = doc.verbose.user_func >= 1;
        if (wants_log) {
            log(doc.action + ' user action controlled by ' + doc.type + '::' + doc.id);
        }
        // YOUR LOGIC HERE
        return true;
    } catch (err) {
        log(doc.action + ' Error exception:', err);
        return false;
    }
}
/* END USER FUNCTIONS TO RUN ONCE A DAY, HOUR, OR MINUTE - ANYTHING YOU WANT ABOVE */
// ==================
/**
 * Mutation handler of the main cron function. For an active "recurring_event" document whose
 * dynamic.state is not "pending" it fills in missing defaults, checks that the action function
 * exists, computes the next schedule, writes the document back when it changed, and creates a
 * timer that invokes Callback with the document as context.
 *
 * @param {Object} doc the mutated document
 * @param {Object} meta document metadata; only meta.id is read, for error logging
 */
function OnUpdate(doc, meta) {
try {
// Check if further analysis is needed we only trigger on an active recurring_event
if (doc.type !== "recurring_event" || doc.active !== true) return;
// set when the document must be written back to KV before the timer is created
var update_doc = false;
if (!doc.dynamic) {
// Add doc.dynamic with defaults if missing
doc.dynamic = {
"state": "arm",
"next_sched": 0,
"prev_sched": 0,
"prev_etime": 0,
"prev_delay": 0,
"prev_atime": 0
};
// we need to update the document once we have the next schedule
update_doc = true;
}
if (!doc.verbose) {
// Add doc.verbose with defaults if missing
doc.verbose = {
"user_func": 1,
"scheduler": 1
};
// we need to update the document once we have the next schedule
update_doc = true;
}
// Do not process dynamic.state pending
if (!doc.dynamic || !doc.dynamic.state || doc.dynamic.state === "pending") return;
var mid = doc.type + "::" + doc.id; // this is the same as meta.id or the KEY
var hour = doc.hour;
var min = doc.min;
// Do an eval check the JavaScript function exists. The eval occurs in a common
// utility function shared with Callback
if (!verifyFunctionExistsViaEval(doc, mid)) {
// doc.action did not exist, we have already logged the issue
return;
}
// Get the next valid execution time
var date_timer = getNextRecurringDate(hour, min);
// next_sched is kept in whole seconds since the epoch
var next_sched = Math.round(date_timer.getTime() / 1000);
if (!update_doc && next_sched !== doc.dynamic.next_sched) {
// the next_sched should be the same as the setting from the helper application, however
// if we undeploy/deploy or pause/resume we might have to reschedule to the next time slot
log('OnUpdate U ' + mid + ' calculated next_sched !== doc.dynamic.next_sched, delta ' +
(next_sched - doc.dynamic.next_sched) + ', reschedule');
update_doc = true;
}
if (update_doc) {
// this mutation is recursive and will be suppressed, we ensure we have a dynamic structure
doc.dynamic.next_sched = next_sched;
try {
cron_bkt[mid] = doc;
} catch (e) {
// NOTE(review): the 'OnUpdate help:' prefix looks copied from the helper function
log('OnUpdate help: F ' + mid + ' FATAL could not update KV cron cycle ' + doc.action);
return;
}
}
// Schedule an Eventing timer; the document itself is passed as the timer context
var timer_id = createTimer(Callback, date_timer, null, doc);
if (doc.verbose.scheduler >= 1) {
log('OnUpdate A ' + mid + ' rcv mutation (initial or rearm) schedule timer at ' +
toLocalISOTime(date_timer));
}
if (doc.verbose.scheduler >= 2) {
log('OnUpdate B ' + mid + ' recurring timer was created, timer_id ' + timer_id);
}
} catch (e) {
log('OnUpdate E ' + meta.id + ', Error exception:', e);
}
}
/**
 * Compute the next time a schedule should fire, using a simplistic partial 'crontab' syntax
 * with two non-standard extensions ('*2X' and '*4X').
 *
 * Supported (hour, min) combinations; numeric values may be numbers or numeric strings:
 *   '*4X', '*4X' : every 15 seconds
 *   '*2X', '*2X' : every 30 seconds
 *   '*',   '*'   : every minute
 *   H,     '*'   : every minute while the local hour is H
 *   '*',   M     : every hour at minute M
 *   H,     M     : once a day at H:M
 *
 * All calculations use the local time zone of the runtime and the result is always strictly
 * later than the current time.
 *
 * @param {(number|string)} hour_str hour of the day 0-23, '*', '*2X' or '*4X'
 * @param {(number|string)} min_str minute of the hour 0-59, '*', '*2X' or '*4X'
 * @returns {Date} the next scheduled time
 * @throws {Error} when the combination is unsupported or a numeric value is out of range
 */
function getNextRecurringDate(hour_str, min_str) {
    // Note JavaScript Dates are in milliseconds
    var date_now = new Date();
    var now_ms = date_now.getTime();
    // start from the very same instant so "now" and the candidate cannot drift apart
    var date_ret = new Date(now_ms);
    // parseInt never throws; unparsable input such as '*' simply yields NaN
    var hour = parseInt(hour_str, 10);
    var min = parseInt(min_str, 10);
    var hour_ok = !isNaN(hour) && hour >= 0 && hour <= 23;
    var min_ok = !isNaN(min) && min >= 0 && min <= 59;

    if ((hour_str === '*4X' && min_str === '*4X') || (hour_str === '*2X' && min_str === '*2X')) {
        // four times a minute (every 15 sec.) or twice a minute (every 30 sec.)
        var step_sec = (hour_str === '*4X') ? 15 : 30;
        date_ret.setMilliseconds(0);
        date_ret.setSeconds(step_sec);
        while (date_ret.getTime() <= now_ms) {
            date_ret.setSeconds(date_ret.getSeconds() + step_sec);
        }
        return date_ret;
    }

    // every remaining schedule fires on a whole minute
    date_ret.setMilliseconds(0);
    date_ret.setSeconds(0);

    if (hour_str === '*' && min_str === '*') {
        // once a minute
        date_ret.setMinutes(date_ret.getMinutes() + 1);
        return date_ret;
    }
    if (hour_ok && min_str === '*') {
        // once a minute, but only during the given hour: try the next whole minute first
        date_ret.setMinutes(date_ret.getMinutes() + 1);
        if (date_ret.getHours() !== hour) {
            // outside the window, jump to the start of the window today, or tomorrow when
            // today's window has already begun or passed
            date_ret.setMinutes(0);
            date_ret.setHours(hour);
            if (date_ret.getTime() <= now_ms) {
                date_ret.setDate(date_ret.getDate() + 1);
            }
        }
        return date_ret;
    }
    if (hour_str === '*' && min_ok) {
        // once an hour at a given minute; only move to the next hour when this hour's
        // slot has already passed
        date_ret.setMinutes(min);
        if (date_ret.getTime() <= now_ms) {
            date_ret.setHours(date_ret.getHours() + 1);
        }
        return date_ret;
    }
    if (hour_ok && min_ok) {
        // once a day for a given hour and a given minute
        date_ret.setMinutes(min);
        date_ret.setHours(hour);
        if (date_ret.getTime() <= now_ms) {
            // schedule for tomorrow
            date_ret.setDate(date_ret.getDate() + 1);
        }
        return date_ret;
    }

    log('getNextRecurringDate illegal input hour_str <' + hour_str + '> min_str <' + min_str + '>');
    throw new Error('getNextRecurringDate illegal input hour_str <' + hour_str + '> min_str <' + min_str + '>');
}
/**
 * Check that the 'action' named in a cron control document resolves to a JavaScript function
 * visible from this handler.
 *
 * @param {Object} curDoc control document; reads curDoc.action and curDoc.verbose.scheduler
 * @param {string} id KEY of the control document, used only in the warning message
 * @returns {boolean} true when curDoc.action names an existing function, otherwise false
 */
function verifyFunctionExistsViaEval(curDoc, id) {
    var result = false;
    try {
        var action = curDoc.action;
        // SECURITY: curDoc.action comes from a KV document and is handed to eval(). Only a
        // plain (optionally dotted) identifier is accepted so that a crafted document cannot
        // smuggle arbitrary JavaScript into the eval below.
        var is_identifier = typeof action === 'string' &&
            /^[A-Za-z_$][A-Za-z0-9_$]*(\.[A-Za-z_$][A-Za-z0-9_$]*)*$/.test(action);
        if (is_identifier) {
            result = eval("typeof " + action + " === 'function';");
        }
        if (result === false) {
            // missing (or rejected) action: warn unless scheduler logging is switched off
            if (curDoc.verbose.scheduler >= 1)
                log("Warn/Disable (No Action and No Re-Arm), because required 'action' of " +
                    curDoc.action + "(doc) does not exist, id is", id);
            return result;
        }
    } catch (e) {
        log('verifyFunctionExistsViaEval Error exception:', e);
    }
    return result;
}
/**
 * Round a number to a fixed number of decimal places and return it as a number.
 *
 * @param {number} number value to round
 * @param {number} precision number of decimal places to keep
 * @returns {number} the rounded value
 */
function toNumericFixed(number, precision) {
    // shift the digits we want to keep to the left of the decimal point
    var scale = Math.pow(10, precision);
    // format with one extra digit first, then turn the text back into a number
    var scaled_text = (number * scale).toFixed(precision + 1);
    var rounded = Math.round(Number(scaled_text));
    return rounded / scale;
}
/**
 * Format a Date as an ISO-8601 style string in local time (no trailing 'Z').
 *
 * @param {Date} d the instant to format
 * @returns {string} for example "2024-01-15T10:05:30.000"
 */
function toLocalISOTime(d) {
    // use the UTC offset in effect at 'd' itself rather than at the current time, so a
    // timestamp on the other side of a daylight-saving change is not shifted by an hour
    var tzoffset = d.getTimezoneOffset() * 60000; // offset in milliseconds
    return (new Date(d.getTime() - tzoffset)).toISOString().slice(0, -1);
}
/**
 * Runs one cycle of a recurring_event schedule for the control document 'doc'
 * (per the log messages below, this is the routine invoked when a timer fires).
 *
 * Steps:
 *   1. Ignore anything that is not an armed recurring_event.
 *   2. Re-read the control document from KV (cron_bkt) and stop silently
 *      (no action, no re-arm) if it is missing, inactive, in a different
 *      state, or has changed since the timer was created.
 *   3. Verify and run the user function named by curDoc.action.
 *   4. Compute the next schedule, record statistics in curDoc.dynamic, set
 *      state "pending" and write the document back to KV.
 *
 * All exceptions are caught and logged; nothing is returned.
 *
 * @param {Object} doc - the control document snapshot carried by the timer
 */
function Callback(doc) {
    try {
        var fired_at = new Date();
        // Check if further analysis is needed: we only trigger on a recurring_event
        if (doc.type !== "recurring_event") return;
        // doc must have 'action', 'dynamic {}', verbose {}, dynamic.state
        if (!doc.action || !doc.dynamic || !doc.verbose || !doc.dynamic.state) return;
        // process any doc.dynamic.state BUT pending
        if (doc.dynamic.state === "pending") return;

        // ==================
        // Check if still active
        // Make sure that in KV the 'doc' still exists and that it is still active; if not,
        // return, thus skipping the action and not re-arming the timer. Note `travel-sample`
        // is aliased to the map 'cron_bkt'.
        var mid = doc.type + '::' + doc.id; // make our KEY
        var curDoc = null;
        try {
            // read the current version of doc from KV, e.g. curDoc
            curDoc = cron_bkt[mid];
        } catch (e) {} // needed for pre 6.5, note pure 6.5+ deployment returns null sans exception

        var reason = null;
        if (!curDoc) {
            reason = "cron document is missing";
        } else if (!curDoc.active) {
            reason = "cron document has active = false";
        } else if (!curDoc.dynamic || !curDoc.dynamic.state ||
            curDoc.dynamic.state !== doc.dynamic.state) {
            // '!curDoc.dynamic' mirrors the guard in OnUpdate: a KV copy without dynamic{}
            // is a state mismatch, not an exception.
            reason = "cron document wrong dynamic.state expected " + doc.dynamic.state;
        } else if (crc64(doc) !== crc64(curDoc)) {
            reason = "cron document changed";
        }
        if (reason !== null) {
            // The KV copy may differ from 'doc' here, so verbose{} is not guaranteed;
            // fall back to level 1 when it is absent.
            var stopLevel = (curDoc && curDoc.verbose) ? curDoc.verbose.scheduler : 1;
            if (!curDoc || stopLevel >= 1) {
                log('Callback X ' + mid + " ignore/stop this timer's schedule because " + reason);
            }
            if (!curDoc || stopLevel >= 4) {
                log('Callback Y ' + mid + ' timer doc', doc);
                log('Callback Z ' + mid + ' KV curDoc', curDoc);
            }
            return;
        }

        // ==================
        // Verify user routine exists and if so eval it
        // Assume curDoc.action contains something like "doCronActionA" and we have a function
        // in this handler like "doCronActionA(doc)". Below we use curDoc as the end user should
        // be able to alter the eval'd JavaScript function. We will execute two (2) evals.
        // First eval: check the JavaScript function exists (done in a utility function).
        if (!verifyFunctionExistsViaEval(curDoc, mid)) {
            // curDoc.action did not exist, we have already logged the issue
            return;
        }
        // Second eval: execute the user function with an argument of curDoc.
        // NOTE(review): curDoc.action is document data passed to eval(); it is only as
        // trustworthy as whoever can write control documents.
        var beg_act = new Date();
        var result = null;
        eval("result = " + curDoc.action + "(curDoc);");
        var end_act = new Date();
        var atime_ms = end_act.getTime() - beg_act.getTime();
        if (curDoc.verbose.scheduler >= 2)
            log('Callback R ' + mid + ' action took ' + toNumericFixed((atime_ms / 1000), 3) +
                ' sec., returned ' + result);

        // ==================
        // Calculate next time and mutate the control document for our helper function,
        // which will create another mutation such that OnUpdate of this function will pick
        // it up and generate the timer (avoids the MB-38554 issue).
        var hour = curDoc.hour;
        var min = curDoc.min;
        var date_timer = getNextRecurringDate(hour, min);
        curDoc.dynamic.prev_delay =
            toNumericFixed(((fired_at.getTime() / 1000) - curDoc.dynamic.next_sched), 3);
        curDoc.dynamic.prev_sched = curDoc.dynamic.next_sched;
        curDoc.dynamic.prev_etime = Math.round(fired_at.getTime() / 1000);
        curDoc.dynamic.prev_atime = toNumericFixed((atime_ms / 1000), 3);
        curDoc.dynamic.state = "pending";
        curDoc.dynamic.next_sched = Math.round(date_timer.getTime() / 1000);
        try {
            cron_bkt[mid] = curDoc;
        } catch (e) {
            log('Callback help: F ' + mid + ' FATAL could not update KV cron cycle ' + curDoc.action);
            return;
        }
        if (curDoc.verbose.scheduler >= 1) {
            log('Callback A ' + mid + ' gen mutation #1 to doc to force schedule rearm at ' +
                toLocalISOTime(date_timer));
        }
        if (curDoc.verbose.scheduler >= 2) {
            log('Callback B ' + mid + ' sched ' + curDoc.dynamic.prev_sched +
                ', actual ' + curDoc.dynamic.prev_etime +
                ', delay ' + curDoc.dynamic.prev_delay +
                ', took ' + curDoc.dynamic.prev_atime);
        }
        if (curDoc.verbose.scheduler >= 3) {
            log('Callback C ' + mid + ' curDoc', curDoc);
        }
    } catch (e) {
        // Rebuild the key here: the exception may have been raised before 'mid' was set
        log('Callback E ' + doc.type + '::' + doc.id + ' Error exception:', e);
    }
}
/*
Function "cron_impl_2func_651_help" also requires "cron_impl_2func_651"
Test Doc:
{
"type":"recurring_event", // The KEY will be <<type>>::<<id>>
"id":1, //
"hour":14, // The hour of the day 0-23, *, *2X, *4X to trigger
"min":54, // The minute in the hour 0-59, *, *2X, *4X to trigger
"action":"doCronActionA", // What function to run on the trigger
"active":false, // Flag to arm or disable this schedule
"verbose" : {
"user_func":2, // Logging level for the action logic : 0=none, etc. etc.
"scheduler":3 // Logging level for the cron logic : 0=none, etc. etc.
},
"dynamic" : {
"state":"arm", // States "arm"|"rearm"|"pending" if any value but "pending" start a schedule
"next_sched": 0, // Number of seconds since epoch to next desired schedule
"prev_sched": 0, // Number of seconds since epoch for previous schedule
"prev_etime": 0, // Number of seconds since epoch for previous schedule actual exec time
"prev_delay": 0, // Number of seconds that the timer was delayed from the schedule
"prev_atime": 0 // Number of seconds taken by the user 'action'
}
}
Note, you can omit verbose{} and dynamic{} as they will be autocreated by the main Eventing
Function "cron_impl_2func_651". If verbose{} is missing the logging levels will default to
"verbose" : { "user_func":1, "scheduler":1 }
*/
/**
 * Helper handler: turns a control document in state "pending" into state
 * "rearm" by writing it back to KV (cron_bkt), producing the second mutation
 * of a schedule cycle.
 *
 * The document is re-read from KV first; if it is missing, inactive, in a
 * different state, or differs from the mutation being processed, the cycle is
 * stopped and nothing is written.
 *
 * All exceptions are caught and logged; nothing is returned.
 *
 * @param {Object} doc  - the mutated document
 * @param {Object} meta - mutation metadata; only meta.id is read (for logging)
 */
function OnUpdate(doc, meta) {
    try {
        // Check that doc has desired values
        if (!doc.type || doc.type !== "recurring_event" || !doc.active || doc.active != true) return;
        // doc must have 'action', 'dynamic {}', verbose {}, dynamic.state
        if (!doc.action || !doc.dynamic || !doc.verbose || !doc.dynamic.state) return;
        // Only process state pending; this will only exist for a 'brief' time
        if (doc.dynamic.state !== "pending") return;

        var mid = doc.type + '::' + doc.id; // make our KEY
        var newdoc = null;
        try {
            // read the current version of doc from KV, e.g. newdoc
            newdoc = cron_bkt[mid];
        } catch (e) {} // needed for pre 6.5, note pure 6.5+ deployment returns null sans exception

        var reason = null;
        if (!newdoc) {
            reason = "cron document is missing";
        } else if (!newdoc.active) {
            reason = "cron document has active = false";
        } else if (!newdoc.dynamic || !newdoc.dynamic.state ||
            newdoc.dynamic.state !== doc.dynamic.state) {
            reason = "cron document wrong dynamic.state expected " + doc.dynamic.state;
        } else if (crc64(doc) !== crc64(newdoc)) {
            reason = "cron document changed";
        }
        if (reason !== null) {
            // The KV copy may differ from 'doc' here, so verbose{} is not guaranteed;
            // fall back to level 1 when it is absent.
            var stopLevel = (newdoc && newdoc.verbose) ? newdoc.verbose.scheduler : 1;
            if (!newdoc || stopLevel >= 1) {
                log('OnUpdate help: X stopping schedule because ' + reason + ',', newdoc);
            }
            // Always stop when there is a reason. Logging verbosity must not decide
            // whether the schedule gets re-armed.
            return;
        }

        newdoc.dynamic.state = "rearm";
        try {
            cron_bkt[mid] = newdoc;
        } catch (e) {
            log('OnUpdate help: F ' + mid + ' FATAL could not update KV cron cycle ' + newdoc.action);
            return;
        }
        if (newdoc.verbose.scheduler >= 1) {
            log('OnUpdate help: A ' + mid + ' mutation #2 to doc to force schedule rearm');
        }
        if (newdoc.verbose.scheduler >= 3) {
            log('OnUpdate help: B ' + mid + ',', newdoc);
        }
    } catch (e) {
        log('OnUpdate help: E ' + meta.id + ', Error exception:', e);
    }
}
INSERT INTO `crondata` (KEY,VALUE) VALUES (
"recurring_event::1",
{
"type": "recurring_event",
"id":1,
"hour":"*",
"min":"0",
"action": "doCronActionA",
"verbose": {
"user_func": 2,
"scheduler": 3
},
"active": false
}
);
UPDATE `crondata` SET active=TRUE, hour="*4X", min="*4X" WHERE type="recurring_event" AND id=1 ;
UPDATE `crondata`
SET verbose.scheduler = 0, verbose.user_func = 1,
active=true, hour="*", min="*"
WHERE type="recurring_event" AND id=1 ;
SELECT data FROM `travel-sample` WHERE `type` = 'cron_cache' AND id== 'airlines_by_country';
[
{
"data": {
"France": 21,
"United Kingdom": 39,
"United States": 127
}
}
]
DELETE FROM `travel-sample` WHERE `type` = 'airline' AND callsign LIKE 'U%'
{
"results": []
}
SELECT data FROM `travel-sample` WHERE `type` = 'cron_cache' AND id== 'airlines_by_country';
[
{
"data": {
"France": 21,
"United Kingdom": 39,
"United States": 123
}
}
]
INSERT INTO `crondata` (KEY,VALUE) VALUES (
"recurring_event::2",
{
"type":"recurring_event",
"id":2,
"hour":"*2X",
"min":"*2X",
"action":"doCronActionB",
"verbose": {
"user_func": 1,
"scheduler": 0
},
"active": true
}
);
/**
 * Sample user action run for a schedule whose "action" is "doCronActionB".
 *
 * @param {Object} doc - the recurring_event control document
 * @returns {boolean|undefined} true when the logic ran, false when an
 *          exception was caught and logged, undefined when doc is not an
 *          active recurring_event
 */
function doCronActionB(doc) {
    try {
        // Act only on an active recurring_event control document
        var isActiveSchedule = doc.type === "recurring_event" && doc.active === true;
        if (!isActiveSchedule) {
            return;
        }
        if (doc.verbose.user_func >= 1) {
            log(doc.action + ' user action controlled by ' + doc.type + '::' + doc.id);
        }
        // YOUR LOGIC HERE
        var sum = 1 + 7;
        log('this is my logic, a = 1 +7 = ' + sum);
        return true;
    } catch (err) {
        log(doc.action + ' Error exception:', err);
        return false;
    }
}