Node Red Examples - nygma2004/km GitHub Wiki

In this page I want to collect my flows and support documents for my Node-Red related projects that I covered in various videos. Please use the navigation on the right to jump to relevant section.

Simple Scheduler in Telegram

This flow is part of my Telegram flow in Node-Red that now handles my Node-Red communication with Telegram. I do most of my Node-Red messaging via Telegram, I find it easier than using Dashboard and for Telegram I don't need to open up my server to the Internet.

You can watch the full story of this integration here:

Telegram integration

And the details of this program can be found in this video:

Telegram Scheduler

First piece is the Scheduler function node that needs to be tied into all the function node that provide the response in Telegram. The Link Out node links to the Telegram Sender node to send the messages out:

[
    {
        "id": "b94342b5128f4d65",
        "type": "function",
        "z": "661be9dfd2de4740",
        "name": "Scheduler",
        "func": "// Set the required access for this function here\nvar checklevel = 2;\n\nlet now = new Date();\nlet schedulerdevices = flow.get(\"schedulerdevices\");\n\n// Handle the help request\nif (msg.originalMessage.help) {\n    // only add the help command if the user has access to execute it\n    if (msg.originalMessage.from.accesslevel >= checklevel) {\n        msg.payload.content = msg.payload.content + \"\\r\\nscheduler: Schedule lamps, sockets to turn on/off\";  \n    }\n    return [null,null,msg];\n}\n\n// Check if the current message contains this function\nif (msg.originalMessage.command===\"scheduler\") {\n    // check if the user has access to execute this function\n    if (msg.originalMessage.from.accesslevel >= checklevel) {\n        // user has access to run the \n\n        // Check the user session\n        let sessiondata = flow.get(\"sessiondata\");\n        // Check session data for graph\n        if (sessiondata[msg.originalMessage.from.id].scheduler===undefined) {\n            // no session data so far, it needs to be created\n            sessiondata[msg.originalMessage.from.id].scheduler = {\"step\": 1, \"lastseen\": now.getTime()};\n            sessiondata[msg.originalMessage.from.id].context = \"scheduler\";\n            flow.set(\"sessiondata\",sessiondata);\n            // Add the schedule list \n            msg.payload.content = \"📆 \";\n            let schedules = global.get(\"schedules\", \"file\");\n            if (schedules!==undefined) {\n                if (schedules.length>0) {\n                    msg.payload.content += \"Active schedules:\\r\\n\";\n                    for (let i = 0; i < schedules.length; i++) {\n                        msg.payload.content += schedules[i].name + \" @ \" + schedules[i].timetext + \" -> \" + schedules[i].statetext +\"\\r\\n\";\n\n                    }\n                } else {\n                    msg.payload.content += \"No schedules\\r\\n\";\n                }\n                msg.payload.content += \"\\r\\n\";\n            }\n            // Generate a button for each device\n            msg.payload.content += \"Select a device:\\r\\n\";\n            let keyboard = {\n                keyboard: [],\n                'resize_keyboard': true,\n                'one_time_keyboard': true\n            }\n            for (let i=0; i<schedulerdevices.length; i++) {\n                let newarray = []\n                newarray.push(schedulerdevices[i].name);\n                keyboard.keyboard.push(newarray);\n            }\n            let opts = {\n                reply_to_message_id: msg.originalMessage.messageId,\n                reply_markup: JSON.stringify(keyboard)\n            };\n            msg.payload.options = opts;\n            return [msg,null,null];  \n        } else {\n            if (sessiondata[msg.originalMessage.from.id].scheduler.step === 1) {\n\n                sessiondata[msg.originalMessage.from.id].scheduler.step = 2;\n                sessiondata[msg.originalMessage.from.id].scheduler.lastseen = now.getTime();\n                let device = schedulerdevices.find(item => item.name === msg.payload.content);\n                if (device===undefined) {\n                    msg.payload.content = \"Invalid device, please start again.\";\n                    delete sessiondata[msg.originalMessage.from.id].scheduler;\n                    delete sessiondata[msg.originalMessage.from.id].context;\n                    flow.set(\"sessiondata\", sessiondata);\n                    return [msg, null, null];\n                }\n                sessiondata[msg.originalMessage.from.id].scheduler.device = device;\n                sessiondata[msg.originalMessage.from.id].context = \"scheduler\";\n                flow.set(\"sessiondata\", sessiondata);\n                msg.payload.content = \"📆 Ok, specify time (HH:MM format):\";\n                return [msg, null, null];\n\n            }\n            if (sessiondata[msg.originalMessage.from.id].scheduler.step === 2) {\n\n                sessiondata[msg.originalMessage.from.id].scheduler.step = 3;\n                sessiondata[msg.originalMessage.from.id].scheduler.lastseen = now.getTime();\n                // Interpret the time provided by the user\n                let arr = msg.payload.content.split(\":\");\n                let error = false;\n                if (arr.length!==2) {\n                    error = true;\n                } else {\n                    arr[0]=parseInt(arr[0]);                \n                    arr[1] = parseInt(arr[1]);\n                    if (arr[0]>23) { error = true };\n                    if (arr[1]>59) { error = true }\n                }\n                if (error) {\n                    msg.payload.content = \"Invalid time, please start again\";\n                    delete sessiondata[msg.originalMessage.from.id].scheduler;\n                    delete sessiondata[msg.originalMessage.from.id].context;\n                    flow.set(\"sessiondata\", sessiondata);\n                    return [msg, null, null];\n                }\n                sessiondata[msg.originalMessage.from.id].scheduler.timetext = msg.payload.content;\n                sessiondata[msg.originalMessage.from.id].scheduler.time = arr[0]*60+arr[1];\n                sessiondata[msg.originalMessage.from.id].context = \"scheduler\";\n                flow.set(\"sessiondata\", sessiondata);\n                // generate the buttons for the possible states\n                msg.payload.content = \"📆 Select a state:\\r\\n\";\n                let keyboard = {\n                    keyboard: [],\n                    'resize_keyboard': true,\n                    'one_time_keyboard': true\n                }\n                let device = sessiondata[msg.originalMessage.from.id].scheduler.device\n                for (let i = 0; i < device.options.length; i++) {\n                    let newarray = []\n                    newarray.push(device.options[i].text);\n                    keyboard.keyboard.push(newarray);\n                }\n                let opts = {\n                    reply_to_message_id: msg.originalMessage.messageId,\n                    reply_markup: JSON.stringify(keyboard)\n                };\n                msg.payload.options = opts;\n                return [msg, null, null];\n\n            }\n            if (sessiondata[msg.originalMessage.from.id].scheduler.step === 3) {\n\n                let device = sessiondata[msg.originalMessage.from.id].scheduler.device\n                let state = device.options.find(item => item.text === msg.payload.content);\n                if (state === undefined) {\n                    msg.payload.content = \"Invalid state, please start again.\";\n                    delete sessiondata[msg.originalMessage.from.id].scheduler;\n                    delete sessiondata[msg.originalMessage.from.id].context;\n                    flow.set(\"sessiondata\", sessiondata);\n                    return [msg, null, null];\n                }\n\n                // Save schedule\n                let schedules = global.get(\"schedules\", \"file\");\n                if (schedules===undefined) { schedules=[]; }\n                let newschedule = {\n                    \"chatid\": msg.originalMessage.from.id,\n                    \"name\": device.name,\n                    \"id\": device.id,\n                    \"type\": device.type,\n                    \"time\": sessiondata[msg.originalMessage.from.id].scheduler.time,\n                    \"timetext\": sessiondata[msg.originalMessage.from.id].scheduler.timetext,\n                    \"statetext\": state.text,\n                    \"statevalue\": state.value\n                };\n                msg.payload.content = \"📆 Schedule created:\\r\\n\";\n                msg.payload.content += newschedule.name + \" @ \" + newschedule.timetext + \" -> \" + newschedule.statetext;\n                schedules.push(newschedule);\n                global.set(\"schedules\", schedules, \"file\");\n                delete sessiondata[msg.originalMessage.from.id].scheduler;\n                delete sessiondata[msg.originalMessage.from.id].context;\n                flow.set(\"sessiondata\", sessiondata);\n                return [msg, null, null];\n            }\n        }\n    } else {\n        return [null,msg,null];\n    }\n} else {\n    // pass the message to the next node\n    return [null,null,msg];\n}\n\n\n",
        "outputs": "3",
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 360,
        "y": 2900,
        "wires": [
            [
                "bb58bfabd0b79b07"
            ],
            [
                "3cf389de146de164"
            ],
            [
                "852df5045accc6eb"
            ]
        ],
        "outputLabels": [
            "User has access",
            "Insufficient access",
            "Handover to next node"
        ]
    },
    {
        "id": "bb58bfabd0b79b07",
        "type": "link out",
        "z": "661be9dfd2de4740",
        "name": "",
        "mode": "link",
        "links": [
            "82332965f25e893c"
        ],
        "x": 515,
        "y": 2880,
        "wires": []
    }
]

And this flow below sets up the Scheduler config and also provides the actual scheduling (sending Telegram notifications out and sending in my case the MQTT messages out). As above the Link Out node links to Telegram Sender node to send the messages out:

[
    {
        "id": "9cddb30e46e3f9ec",
        "type": "comment",
        "z": "661be9dfd2de4740",
        "name": "Telegram Scheduler",
        "info": "",
        "x": 2220,
        "y": 2080,
        "wires": []
    },
    {
        "id": "802b4b90a99c2de9",
        "type": "inject",
        "z": "661be9dfd2de4740",
        "name": "Config",
        "props": [
            {
                "p": "payload"
            }
        ],
        "repeat": "",
        "crontab": "",
        "once": true,
        "onceDelay": 0.1,
        "topic": "",
        "payload": "[{\"name\":\"Pool heater relay 2\",\"type\":\"mqtt\",\"id\":\"/poolheater/relay2\",\"options\":[{\"text\":\"On\",\"value\":1},{\"text\":\"Off\",\"value\":0}]},{\"name\":\"Pool heater relay 1\",\"type\":\"mqtt\",\"id\":\"/poolheater/relay1\",\"options\":[{\"text\":\"On\",\"value\":1},{\"text\":\"Off\",\"value\":0}]},{\"name\":\"Planter\",\"type\":\"mqtt\",\"id\":\"/planter/gpio/12\",\"options\":[{\"text\":\"On\",\"value\":1},{\"text\":\"Off\",\"value\":0}]}]",
        "payloadType": "json",
        "x": 2250,
        "y": 2140,
        "wires": [
            [
                "a6a8211341dcafe8"
            ]
        ]
    },
    {
        "id": "a6a8211341dcafe8",
        "type": "change",
        "z": "661be9dfd2de4740",
        "name": "Store config",
        "rules": [
            {
                "t": "set",
                "p": "schedulerdevices",
                "pt": "flow",
                "to": "payload",
                "tot": "msg"
            }
        ],
        "action": "",
        "property": "",
        "from": "",
        "to": "",
        "reg": false,
        "x": 2430,
        "y": 2140,
        "wires": [
            []
        ]
    },
    {
        "id": "482aaf1874b5100d",
        "type": "function",
        "z": "661be9dfd2de4740",
        "name": "Monitor Scheduler",
        "func": "let schedules = global.get(\"schedules\", \"file\");\nlet now = new Date();\n\n// Exit if there are not subscriptions in the system\nif (schedules===undefined) {\n    node.status({fill:\"grey\",shape:\"ring\",text:\"No schedules\"});\n    return null;\n}\n\nvar output = [];\n\nvar hour = \"00000\" + now.getHours();\nvar minute = \"00000\" + now.getMinutes();\nhour = hour.substring(hour.length - 2, hour.length);\nminute = minute.substring(minute.length - 2, minute.length);\nnow = now.getHours()*60+now.getMinutes();\n\n\n\nfor (let i = schedules.length-1; i>=0; i--) {\n    if (now===schedules[i].time) {\n        output.push(schedules[i]);\n        msg.payload = schedules[i];\n        schedules.splice(i,1);\n        global.set(\"schedules\", schedules, \"file\");\n        // Send message one-by-one\n        node.send(msg);\n    }\n}\n\nif (output.length === 0) {\n    node.status({fill:\"yellow\",shape:\"ring\",text:now + \" | \" + hour + \":\"+minute +  \" | Nothing to schedule \"});\n    return null;\n} else {\n    node.status({fill:\"green\",shape:\"ring\",text:now + \" | \" + hour + \":\"+minute +  \" | \"+output.length+\" message(s) sent\"});\n    return null;\n    // return [output];\n}\n\n",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 2490,
        "y": 2200,
        "wires": [
            [
                "354de93ad8facfe5",
                "88e54179b0bb369e"
            ]
        ]
    },
    {
        "id": "c26db04a8bda2292",
        "type": "inject",
        "z": "661be9dfd2de4740",
        "name": "",
        "props": [
            {
                "p": "payload"
            },
            {
                "p": "topic",
                "vt": "str"
            }
        ],
        "repeat": "60",
        "crontab": "",
        "once": false,
        "onceDelay": 0.1,
        "topic": "",
        "payloadType": "date",
        "x": 2270,
        "y": 2200,
        "wires": [
            [
                "482aaf1874b5100d"
            ]
        ]
    },
    {
        "id": "354de93ad8facfe5",
        "type": "switch",
        "z": "661be9dfd2de4740",
        "name": "",
        "property": "payload.type",
        "propertyType": "msg",
        "rules": [
            {
                "t": "eq",
                "v": "mqtt",
                "vt": "str"
            }
        ],
        "checkall": "true",
        "repair": false,
        "outputs": 1,
        "x": 2710,
        "y": 2240,
        "wires": [
            [
                "bf91d13f1b287858"
            ]
        ]
    },
    {
        "id": "88e54179b0bb369e",
        "type": "function",
        "z": "661be9dfd2de4740",
        "name": "Telegram message",
        "func": "msg.payload = { \"chatId\": msg.payload.chatid, \"type\": \"message\", \"content\": \"📆 Schedule executed: \"+ msg.payload.name + \" -> \"+ msg.payload.statetext };\nreturn msg;\n",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 2750,
        "y": 2160,
        "wires": [
            [
                "16965b4b7748b185"
            ]
        ]
    },
    {
        "id": "16965b4b7748b185",
        "type": "link out",
        "z": "661be9dfd2de4740",
        "name": "",
        "mode": "link",
        "links": [
            "82332965f25e893c"
        ],
        "x": 2915,
        "y": 2160,
        "wires": []
    },
    {
        "id": "072a1fdff9f259a1",
        "type": "mqtt out",
        "z": "661be9dfd2de4740",
        "name": "MQTT out",
        "topic": "",
        "qos": "0",
        "retain": "false",
        "respTopic": "",
        "contentType": "",
        "userProps": "",
        "correl": "",
        "expiry": "",
        "broker": "4ab53a892fcbe5fa",
        "x": 3080,
        "y": 2240,
        "wires": []
    },
    {
        "id": "bf91d13f1b287858",
        "type": "change",
        "z": "661be9dfd2de4740",
        "name": "MQTT message",
        "rules": [
            {
                "t": "set",
                "p": "topic",
                "pt": "msg",
                "to": "payload.id",
                "tot": "msg"
            },
            {
                "t": "set",
                "p": "payload",
                "pt": "msg",
                "to": "payload.statevalue",
                "tot": "msg"
            }
        ],
        "action": "",
        "property": "",
        "from": "",
        "to": "",
        "reg": false,
        "x": 2900,
        "y": 2240,
        "wires": [
            [
                "072a1fdff9f259a1"
            ]
        ]
    },
    {
        "id": "4ab53a892fcbe5fa",
        "type": "mqtt-broker",
        "name": "Local Broker",
        "broker": "127.0.0.1",
        "port": "1883",
        "clientid": "NodeRed",
        "autoConnect": true,
        "usetls": false,
        "protocolVersion": "4",
        "keepalive": "60",
        "cleansession": true,
        "birthTopic": "",
        "birthQos": "0",
        "birthPayload": "",
        "birthMsg": {},
        "closeTopic": "",
        "closeQos": "0",
        "closePayload": "",
        "closeMsg": {},
        "willTopic": "",
        "willQos": "0",
        "willPayload": "",
        "willMsg": {},
        "sessionExpiry": ""
    }
]

Re-Scheduling in the Telegram Scheduler

I added a rescheduling function to the flow above so once a schedule is executed it can be rescheduled very easily. You can watch the explanation in details here:

Re-scheduling

First is the new "Save message to handle replies" function node that is responsible for storing required messages in the session variable:

[
    {
        "id": "e14446e655896bd1",
        "type": "function",
        "z": "661be9dfd2de4740",
        "name": "Save message to handle replies",
        "func": "if (msg.save) {\n\n    let sessiondata = flow.get(\"sessiondata\");\n\n    // create the object to be saved\n    let data = {};\n    data.content = msg.savecontent;\n    data.context = msg.context;\n    data.savetime = new Date().getTime();\n\n    if (sessiondata[msg.payload.chatId].messages === undefined) {\n        sessiondata[msg.payload.chatId].messages = {};\n    }\n\n    // add the data to the session information\n    sessiondata[msg.payload.chatId].messages[msg.payload.sentMessageId] = data;\n\n    // save the session\n    flow.set(\"sessiondata\", sessiondata);\n\n    node.status({ fill: \"yellow\", shape: \"ring\", text: \"Message \" + msg.payload.sentMessageId+\" saved\"});\n\n}",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 1680,
        "y": 1320,
        "wires": [
            []
        ]
    }
]

I also modified the "Telegram message" function node after the "Monitor Scheduler" node:

[
    {
        "id": "88e54179b0bb369e",
        "type": "function",
        "z": "661be9dfd2de4740",
        "name": "Telegram message",
        "func": "// START: save message data to support reply\nmsg.save = true;\nmsg.savecontent = msg.payload;\nmsg.savecontent.step = \"executeschedule\";\nmsg.context = \"scheduler\";\n// END: save message data to support reply\n\nmsg.payload = { \"chatId\": msg.payload.chatid, \"type\": \"message\", \"content\": \"📆 Schedule executed: \"+ msg.payload.name + \" -> \"+ msg.payload.statetext };\n\nreturn msg;\n",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 2750,
        "y": 2160,
        "wires": [
            [
                "16965b4b7748b185"
            ]
        ]
    }
]

Also made changes to the "Language Processing" function node: handle session timeout, purge old saved message and handle the replies

[
    {
        "id": "55ddebfa94304c40",
        "type": "function",
        "z": "661be9dfd2de4740",
        "name": "Language processing",
        "func": "// This function processes the incoming text and tries to determine if it is a \n// specific \"single word\" command like the ones below in the list or\n// other free English commands like \"turn on the garden light\"\n\n// Timeout of a session\nlet timeout = 60*1000; // 1 minute\n\n// Purge all stored messages that are old\n// Expiry period for stored messages\nlet expiryperiod = 1*7*24*60*60*1000; // 1 week\n// first loop through all users\nlet sessiondata = flow.get(\"sessiondata\");\nlet now = new Date().getTime();\nfor (let user in sessiondata) {\n    if (typeof user === \"object\") {\n        // the proprty is an object, most probably user session data\n        if (user.messages !== undefined) {\n            // it hass messages property which are the stored messages\n            for (let message in user.messages) {\n                if (typeof message === \"object\") {\n                    if (message.savetime !== undefined) {\n                        if (now - message.savetime > expiryperiod) {\n                            // This message is old, let's purge it\n                            delete sessiondata[user].messages[message];\n                            flow.set(\"sessiondata\", sessiondata);\n                        }\n                    }\n                }\n            }\n        }\n    }\n}\n\n// Handle replies to stored messages\nif (msg.originalMessage.hasOwnProperty(\"reply_to_message\")) {\n    // This is a reply message\n    if (sessiondata[msg.originalMessage.from.id].messages.hasOwnProperty(msg.originalMessage.reply_to_message.message_id)) {\n        // The original message is stored\n        // update the context \n        msg.originalMessage.command = sessiondata[msg.originalMessage.from.id].messages[msg.originalMessage.reply_to_message.message_id].context;\n        sessiondata[msg.originalMessage.from.id][sessiondata[msg.originalMessage.from.id].messages[msg.originalMessage.reply_to_message.message_id].context] = {\n            \"step\": 99,\n            \"content\": sessiondata[msg.originalMessage.from.id].messages[msg.originalMessage.reply_to_message.message_id].content\n        };\n        flow.set(\"sessiondata\", sessiondata);\n        return msg;\n    }\n}\n\n\n//Check if there is an active session at the moment\n//let sessiondata = flow.get(\"sessiondata\");\nif (sessiondata[msg.originalMessage.from.id] !== undefined) {\n    if (sessiondata[msg.originalMessage.from.id].context !== undefined) {\n        // Checking if this context timed out or not\n        if (sessiondata[msg.originalMessage.from.id][sessiondata[msg.originalMessage.from.id].context].lastseen !== undefined) {\n            let now = new Date();\n            if (now.getTime() - sessiondata[msg.originalMessage.from.id][sessiondata[msg.originalMessage.from.id].context].lastseen>timeout) {\n                // the active context is older than the timeout value, deleting the context\n                node.warn(\"Context \" + sessiondata[msg.originalMessage.from.id].context+\" timed out\");\n                delete sessiondata[msg.originalMessage.from.id][sessiondata[msg.originalMessage.from.id].context];\n                delete sessiondata[msg.originalMessage.from.id].context;\n                flow.set(\"sessiondata\", sessiondata);\n            } else {\n                // Context not timed out, continue\n                msg.originalMessage.command = sessiondata[msg.originalMessage.from.id].context;\n                return msg;\n            }\n        } else {\n            // Last seen value is not stored\n            msg.originalMessage.command = sessiondata[msg.originalMessage.from.id].context;\n            return msg;\n        }\n    }\n}\n\n\n\n\n    // These are the commands (\"reserved words\") that execute a function without parameters\n    var commands = [\"help\",\"camera\",\"me\",\"userlist\",\"loglevel\",\"logmute\",\"weather\",\"youtube\",\"solar\",\"miflora\",\"systeminfo\",\"rain\",\"google\",\"sensor\",\"graph\",\"scheduler\"];\n\n    var temp = msg.payload.content.toLowerCase();\n    var arr = temp.split(\" \");\n\n    // Check if the content contains any of the commands\n    for (var i=0; i<commands.length; i++) {\n        for (var j=0; j<arr.length; j++) {\n            if (arr[j]===commands[i]) {\n                msg.originalMessage.command=commands[i];\n                return msg;\n            }\n        }\n    }\n\n    // removing rubbish\n    temp = temp.replace(\"a \",\"\");\n    temp = temp.replace(\"the \",\"\");\n    temp = temp.replace(\"what \",\"\");\n    temp = temp.replace(\"is \",\"\");\n    temp = temp.replace(\"to \",\"\");\n    temp = temp.replace(\"lights\",\"light\");\n    temp = temp.replace(\"light \",\"\");\n    temp = temp.replace(\"light\",\"\");\n\n\n\n    // check if the command is to turn something on/off\n    if (temp.indexOf(\"turn\")>-1) {\n        temp = temp.replace(\"turn \",\"\");\n        msg.originalMessage.command = \"switch\";\n    }\n    if (temp.indexOf(\"switch\")>-1) {\n        temp = temp.replace(\"switch \",\"\");\n        msg.originalMessage.command = \"switch\";\n    }\n\n    // let's try finding the thing and state\n    // the turn/switch commands are expected as 'turn <state> the <thing>'\n    var lastIndex= temp.indexOf(\" \");\n    var voice_state = temp.substring(0, lastIndex).trim();\n    var voice_thing = temp.substring(lastIndex+1,temp.length).trim();\n\n\n    // evaulate the state\n    if (voice_state===\"on\") {\n        msg.originalMessage.state = \"1\";\n    }\n    if (voice_state===\"off\") {\n        msg.originalMessage.state = \"0\";\n    }\n\n    // process the thing\n    msg.originalMessage.thing = voice_thing;\n\n\n\nreturn msg;",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 380,
        "y": 1340,
        "wires": [
            [
                "03fdac1d78b7a9bc"
            ]
        ]
    }
]

And finally changes to the "Scheduler" node to handle all the replies (rescheduling, responses, etc.)

[
    {
        "id": "b94342b5128f4d65",
        "type": "function",
        "z": "661be9dfd2de4740",
        "name": "Scheduler",
        "func": "function generateUUID() { // Public Domain/MIT\n    var d = new Date().getTime();//Timestamp\n    var d2 = 0;//Time in microseconds since page-load or 0 if unsupported\n    return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function (c) {\n        var r = Math.random() * 16;//random number between 0 and 16\n        if (d > 0) {//Use timestamp until depleted\n            r = (d + r) % 16 | 0;\n            d = Math.floor(d / 16);\n        } else {//Use microseconds since page-load if supported\n            r = (d2 + r) % 16 | 0;\n            d2 = Math.floor(d2 / 16);\n        }\n        return (c === 'x' ? r : (r & 0x3 | 0x8)).toString(16);\n    });\n}\n\n\n// Set the required access for this function here\nvar checklevel = 2;\n\nlet now = new Date();\nlet schedulerdevices = flow.get(\"schedulerdevices\");\n\n// Handle the help request\nif (msg.originalMessage.help) {\n    // only add the help command if the user has access to execute it\n    if (msg.originalMessage.from.accesslevel >= checklevel) {\n        msg.payload.content = msg.payload.content + \"\\r\\nscheduler: Schedule lamps, sockets to turn on/off\";  \n    }\n    return [null,null,msg];\n}\n\n// Check if the current message contains this function\nif (msg.originalMessage.command===\"scheduler\") {\n    // check if the user has access to execute this function\n    if (msg.originalMessage.from.accesslevel >= checklevel) {\n        // user has access to run the \n\n        // Check the user session\n        let sessiondata = flow.get(\"sessiondata\");\n        // Check session data for scheduler\n        if (sessiondata[msg.originalMessage.from.id].scheduler===undefined) {\n            // no session data so far, it needs to be created\n            sessiondata[msg.originalMessage.from.id].scheduler = {\"step\": 1, \"lastseen\": now.getTime()};\n            sessiondata[msg.originalMessage.from.id].context = \"scheduler\";\n            flow.set(\"sessiondata\",sessiondata);\n            // Add the schedule list \n            msg.payload.content = \"📆 \";\n            let schedules = global.get(\"schedules\", \"file\");\n            if (schedules!==undefined) {\n                if (schedules.length>0) {\n                    msg.payload.content += \"Active schedules:\\r\\n\";\n                    for (let i = 0; i < schedules.length; i++) {\n                        msg.payload.content += schedules[i].name + \" @ \" + schedules[i].timetext + \" -> \" + schedules[i].statetext +\"\\r\\n\";\n\n                    }\n                } else {\n                    msg.payload.content += \"No schedules\\r\\n\";\n                }\n                msg.payload.content += \"\\r\\n\";\n            }\n            // Generate a button for each device\n            msg.payload.content += \"Select a device:\\r\\n\";\n            let keyboard = {\n                keyboard: [],\n                'resize_keyboard': true,\n                'one_time_keyboard': true\n            }\n            for (let i=0; i<schedulerdevices.length; i++) {\n                let newarray = []\n                newarray.push(schedulerdevices[i].name);\n                keyboard.keyboard.push(newarray);\n            }\n            let opts = {\n                reply_to_message_id: msg.originalMessage.messageId,\n                reply_markup: JSON.stringify(keyboard)\n            };\n            msg.payload.options = opts;\n            return [msg,null,null];  \n        } else {\n            if (sessiondata[msg.originalMessage.from.id].scheduler.step === 1) {\n\n                sessiondata[msg.originalMessage.from.id].scheduler.step = 2;\n                sessiondata[msg.originalMessage.from.id].scheduler.lastseen = now.getTime();\n                let device = schedulerdevices.find(item => item.name === msg.payload.content);\n                if (device===undefined) {\n                    msg.payload.content = \"Invalid device, please start again.\";\n                    delete sessiondata[msg.originalMessage.from.id].scheduler;\n                    delete sessiondata[msg.originalMessage.from.id].context;\n                    flow.set(\"sessiondata\", sessiondata);\n                    return [msg, null, null];\n                }\n                sessiondata[msg.originalMessage.from.id].scheduler.device = device;\n                sessiondata[msg.originalMessage.from.id].context = \"scheduler\";\n                flow.set(\"sessiondata\", sessiondata);\n                msg.payload.content = \"📆 Ok, specify time (HH:MM format):\";\n                return [msg, null, null];\n\n            }\n            if (sessiondata[msg.originalMessage.from.id].scheduler.step === 2) {\n\n                sessiondata[msg.originalMessage.from.id].scheduler.step = 3;\n                sessiondata[msg.originalMessage.from.id].scheduler.lastseen = now.getTime();\n                // Interpret the time provided by the user\n                let arr = msg.payload.content.split(\":\");\n                let error = false;\n                if (arr.length!==2) {\n                    error = true;\n                } else {\n                    arr[0]=parseInt(arr[0]);                \n                    arr[1] = parseInt(arr[1]);\n                    if (arr[0]>23) { error = true };\n                    if (arr[1]>59) { error = true }\n                }\n                if (error) {\n                    msg.payload.content = \"Invalid time, please start again\";\n                    delete sessiondata[msg.originalMessage.from.id].scheduler;\n                    delete sessiondata[msg.originalMessage.from.id].context;\n                    flow.set(\"sessiondata\", sessiondata);\n                    return [msg, null, null];\n                }\n                sessiondata[msg.originalMessage.from.id].scheduler.timetext = msg.payload.content;\n                sessiondata[msg.originalMessage.from.id].scheduler.time = arr[0]*60+arr[1];\n                sessiondata[msg.originalMessage.from.id].context = \"scheduler\";\n                flow.set(\"sessiondata\", sessiondata);\n                // generate the buttons for the possible states\n                msg.payload.content = \"📆 Select a state:\\r\\n\";\n                let keyboard = {\n                    keyboard: [],\n                    'resize_keyboard': true,\n                    'one_time_keyboard': true\n                }\n                let device = sessiondata[msg.originalMessage.from.id].scheduler.device\n                for (let i = 0; i < device.options.length; i++) {\n                    let newarray = []\n                    newarray.push(device.options[i].text);\n                    keyboard.keyboard.push(newarray);\n                }\n                let opts = {\n                    reply_to_message_id: msg.originalMessage.messageId,\n                    reply_markup: JSON.stringify(keyboard)\n                };\n                msg.payload.options = opts;\n                return [msg, null, null];\n\n            }\n            if (sessiondata[msg.originalMessage.from.id].scheduler.step === 3) {\n\n                let device = sessiondata[msg.originalMessage.from.id].scheduler.device\n                let state = device.options.find(item => item.text === msg.payload.content);\n                if (state === undefined) {\n                    msg.payload.content = \"Invalid state, please start again.\";\n                    delete sessiondata[msg.originalMessage.from.id].scheduler;\n                    delete sessiondata[msg.originalMessage.from.id].context;\n                    flow.set(\"sessiondata\", sessiondata);\n                    return [msg, null, null];\n                }\n\n                // Save schedule\n                let schedules = global.get(\"schedules\", \"file\");\n                if (schedules===undefined) { schedules=[]; }\n                let newschedule = {\n                    \"uuid\": generateUUID(),\n                    \"chatid\": msg.originalMessage.from.id,\n                    \"name\": device.name,\n                    \"id\": device.id,\n                    \"type\": device.type,\n                    \"time\": sessiondata[msg.originalMessage.from.id].scheduler.time,\n                    \"timetext\": sessiondata[msg.originalMessage.from.id].scheduler.timetext,\n                    \"statetext\": state.text,\n                    \"statevalue\": state.value\n                };\n                // START: save message data to support reply\n                msg.save = true;\n                msg.savecontent = newschedule;\n                msg.savecontent.step = \"newschedule\";\n                msg.context = \"scheduler\";\n                // END: save message data to support reply\n                msg.payload.content = \"📆 Schedule created:\\r\\n\";\n                msg.payload.content += newschedule.name + \" @ \" + newschedule.timetext + \" -> \" + newschedule.statetext;\n                schedules.push(newschedule);\n                global.set(\"schedules\", schedules, \"file\");\n                delete sessiondata[msg.originalMessage.from.id].scheduler;\n                delete sessiondata[msg.originalMessage.from.id].context;\n                flow.set(\"sessiondata\", sessiondata);\n                return [msg, null, null];\n            }\n            if (sessiondata[msg.originalMessage.from.id].scheduler.step === 99) {\n                // This is a reply to an earlier scheduler message\n                let content = sessiondata[msg.originalMessage.from.id].scheduler.content;\n\n                if (content.step === \"executeschedule\") {\n                    // This is a reply to an earlier \"schedule executed\" message. Need to reschdule it\n                    \n                    // You can check what the reply text is, but here I accept anything\n                    // if (msg.payload.content.toLowerCase() = \"reschedule\") {\n                    \n                    // Save schedule\n                    let schedules = global.get(\"schedules\", \"file\");\n                    if (schedules === undefined) { schedules = []; }\n                    let newschedule = content;\n                    newschedule.uuid = generateUUID(); // Regenerate the UUID to be safe\n                    delete newschedule.step; // this is not needed any more\n\n                    // START: save message data to support reply\n                    msg.save = true;\n                    msg.savecontent = newschedule;\n                    msg.savecontent.step = \"newschedule\";\n                    msg.context = \"scheduler\";\n                    // END: save message data to support reply\n                    msg.payload.content = \"📆 (Re)schedule created:\\r\\n\";\n                    msg.payload.content += newschedule.name + \" @ \" + newschedule.timetext + \" -> \" + newschedule.statetext;\n                    schedules.push(newschedule);\n                    global.set(\"schedules\", schedules, \"file\");\n                    delete sessiondata[msg.originalMessage.from.id].scheduler;\n                    delete sessiondata[msg.originalMessage.from.id].context;\n                    flow.set(\"sessiondata\", sessiondata);\n                    return [msg, null, null];\n                }\n\n\n                // This is a fallback step for schedule replies that are not supported yet\n                msg.payload.content = \"Function not supported yet\";\n                delete sessiondata[msg.originalMessage.from.id].scheduler;\n                delete sessiondata[msg.originalMessage.from.id].context;\n                flow.set(\"sessiondata\", sessiondata);\n                return [msg, null, null];\n            }\n        }\n    } else {\n        return [null,msg,null];\n    }\n} else {\n    // pass the message to the next node\n    return [null,null,msg];\n}\n\n\n",
        "outputs": "3",
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 360,
        "y": 2900,
        "wires": [
            [
                "bb58bfabd0b79b07"
            ],
            [
                "3cf389de146de164"
            ],
            [
                "852df5045accc6eb"
            ]
        ],
        "outputLabels": [
            "User has access",
            "Insufficient access",
            "Handover to next node"
        ]
    }
]

Modbus Communication in Node-Red done right

I have been using modbus in Node-Red for a long time, and always had small connection and reliability issues. Until I found out that I was doing it the wrong way. In this video I talk about using flex-getter and flex-write nodes to queue messages correctly. And for those complex cases, I built a bespoke queue function node.

Modbus queueing

Example flow that I explained in the flow:

[
    {
        "id": "dcef3ac075628620",
        "type": "comment",
        "z": "151def52ea276427",
        "name": "Modbus queue management",
        "info": "What the explainer video:\nhttps://youtu.be/FVI1hgdqcms",
        "x": 160,
        "y": 960,
        "wires": []
    },
    {
        "id": "e35a4b7af5461031",
        "type": "debug",
        "z": "151def52ea276427",
        "name": "Flex Getter",
        "active": true,
        "tosidebar": true,
        "console": false,
        "tostatus": false,
        "complete": "payload",
        "targetType": "msg",
        "statusVal": "",
        "statusType": "auto",
        "x": 610,
        "y": 1080,
        "wires": []
    },
    {
        "id": "1d0acf47af0d0c5c",
        "type": "debug",
        "z": "151def52ea276427",
        "name": "Flex Write",
        "active": true,
        "tosidebar": true,
        "console": false,
        "tostatus": false,
        "complete": "payload",
        "targetType": "msg",
        "statusVal": "",
        "statusType": "auto",
        "x": 600,
        "y": 1120,
        "wires": []
    },
    {
        "id": "74f24cef68a52071",
        "type": "debug",
        "z": "151def52ea276427",
        "name": "Status Messages",
        "active": false,
        "tosidebar": true,
        "console": false,
        "tostatus": false,
        "complete": "payload",
        "targetType": "msg",
        "statusVal": "",
        "statusType": "auto",
        "x": 630,
        "y": 1160,
        "wires": []
    },
    {
        "id": "cff6d83b78cf4600",
        "type": "inject",
        "z": "151def52ea276427",
        "name": "Read 1000",
        "props": [
            {
                "p": "payload"
            },
            {
                "p": "topic",
                "vt": "str"
            }
        ],
        "repeat": "",
        "crontab": "",
        "once": false,
        "onceDelay": 0.1,
        "topic": "read_test",
        "payload": "{\"value\":0,\"fc\":3,\"unitid\":1,\"address\":1000,\"quantity\":20}",
        "payloadType": "json",
        "x": 120,
        "y": 1080,
        "wires": [
            [
                "87cfa28fd7e74c52"
            ]
        ]
    },
    {
        "id": "d8a0df47ce40ca3f",
        "type": "inject",
        "z": "151def52ea276427",
        "name": "update",
        "props": [
            {
                "p": "payload"
            },
            {
                "p": "topic",
                "vt": "str"
            }
        ],
        "repeat": "1",
        "crontab": "",
        "once": false,
        "onceDelay": 0.1,
        "topic": "update",
        "payload": "",
        "payloadType": "date",
        "x": 110,
        "y": 1020,
        "wires": [
            [
                "87cfa28fd7e74c52"
            ]
        ]
    },
    {
        "id": "4940d44cbe7cfd1d",
        "type": "inject",
        "z": "151def52ea276427",
        "name": "next",
        "props": [
            {
                "p": "payload"
            },
            {
                "p": "topic",
                "vt": "str"
            }
        ],
        "repeat": "",
        "crontab": "",
        "once": false,
        "onceDelay": 0.1,
        "topic": "next",
        "payload": "",
        "payloadType": "date",
        "x": 130,
        "y": 1280,
        "wires": [
            [
                "87cfa28fd7e74c52"
            ]
        ]
    },
    {
        "id": "2622679c0fb7ceaf",
        "type": "inject",
        "z": "151def52ea276427",
        "name": "Read 2000",
        "props": [
            {
                "p": "payload"
            },
            {
                "p": "topic",
                "vt": "str"
            }
        ],
        "repeat": "",
        "crontab": "",
        "once": false,
        "onceDelay": 0.1,
        "topic": "read_test_2",
        "payload": "{\"value\":0,\"fc\":3,\"unitid\":1,\"address\":2000,\"quantity\":20}",
        "payloadType": "json",
        "x": 120,
        "y": 1140,
        "wires": [
            [
                "87cfa28fd7e74c52"
            ]
        ]
    },
    {
        "id": "240289ea44994d54",
        "type": "inject",
        "z": "151def52ea276427",
        "name": "Write",
        "props": [
            {
                "p": "payload"
            },
            {
                "p": "topic",
                "vt": "str"
            }
        ],
        "repeat": "",
        "crontab": "",
        "once": false,
        "onceDelay": 0.1,
        "topic": "write",
        "payload": "{\"value\":100,\"fc\":6,\"unitid\":1,\"address\":1520,\"quantity\":20}",
        "payloadType": "json",
        "x": 110,
        "y": 1200,
        "wires": [
            [
                "87cfa28fd7e74c52"
            ]
        ]
    },
    {
        "id": "ccd42b930184550c",
        "type": "delay",
        "z": "151def52ea276427",
        "name": "",
        "pauseType": "delay",
        "timeout": "5",
        "timeoutUnits": "seconds",
        "rate": "1",
        "nbRateUnits": "1",
        "rateUnits": "second",
        "randomFirst": "1",
        "randomLast": "5",
        "randomUnits": "seconds",
        "drop": false,
        "allowrate": false,
        "outputs": 1,
        "x": 510,
        "y": 1020,
        "wires": [
            [
                "d2d85179a058bb66"
            ]
        ]
    },
    {
        "id": "d2d85179a058bb66",
        "type": "change",
        "z": "151def52ea276427",
        "name": "Advance queue",
        "rules": [
            {
                "t": "set",
                "p": "topic",
                "pt": "msg",
                "to": "next",
                "tot": "str"
            }
        ],
        "action": "",
        "property": "",
        "from": "",
        "to": "",
        "reg": false,
        "x": 680,
        "y": 1020,
        "wires": [
            [
                "87cfa28fd7e74c52"
            ]
        ]
    },
    {
        "id": "87cfa28fd7e74c52",
        "type": "function",
        "z": "151def52ea276427",
        "name": "Modbus Queue",
        "func": "let resendifnoresposne = true; // resend the last message is no response is received\nlet resendinterval = 10; // resend last e message every x seconds\nlet online_threshold = 10; // Seconds between updates under which the device is considered online\nlet offline_threshold = 300; // Seconds between updates above which the device is considered offline\n\nlet notifmsg = null;\n\n// Check and make sure the incoming message has a topic\nif ((msg.topic===\"\")||(msg.topic===null)||(msg.topic===undefined)) {\n    node.status({fill:\"red\",shape:\"dot\",text:\"Topic missing\"});\n    return;\n}\n\nlet lastupdate = context.get(\"lastupdate\");\nlet state = context.get(\"state\") | 0;\nlet queue = context.get(\"queue\");\nlet queuecount = 0;\nif (queue === undefined) {\n    queue = [];\n} else {\n    if (Array.isArray(queue)) {\n        queuecount = queue.length;\n    } else {\n        queue = [];\n    }\n}\nlet current = new Date().getTime();\nlet send = false;\n\nswitch (msg.topic.toLowerCase()) {\n    case \"update\":\n        // Update the timer and statistics\n\n        if (lastupdate !== undefined) {\n            notifmsg = { \"topic\": \"Information\", \"payload\": {} };\n            current = current - lastupdate;\n            current = Math.floor(current / 1000);\n            notifmsg.payload.secondsincelastupdate = current;\n            var minute = Math.floor(current / 60);\n            var hour = Math.floor(minute / 60);\n            var day = Math.floor(hour / 24);\n            if (current > 24 * 60 * 60) {\n                notifmsg.payload.updatetext = \"Last update \" + day + \" days, \" + hour % 24 + \" hours, \" + minute % 60 + \" minutes, \" + current % 60 + \" seconds ago\";\n            } else if (current > 60 * 60) {\n                notifmsg.payload.updatetext = \"Last update \" + hour % 24 + \" hours, \" + minute % 60 + \" minutes, \" + current % 60 + \" seconds ago\";\n            } else if (current > 60) {\n                notifmsg.payload.updatetext = \"Last update \" + minute % 60 + \" minutes, \" + current % 60 + \" seconds ago\";\n            } else {\n                notifmsg.payload.updatetext = \"Last update \" + current % 60 + \" seconds ago\";\n            }\n\n            // Resend last message if there is no response from the server\n            if (resendifnoresposne) {\n                if ((current>0) && (current % resendinterval===0)) {\n                    let lastmsg = context.get(\"lastmsg\");\n                    if ((lastmsg!==undefined) && (context.get(\"sent\"))) {\n                        notifmsg.payload.resend = true;\n                        if ((lastmsg.payload.fc === 1) || (lastmsg.payload.fc === 2) || (lastmsg.payload.fc === 3) || (lastmsg.payload.fc === 4)) {\n                            // this is a modbus read request\n                            node.status({ fill: \"green\", shape: \"dot\", text: \"Read re-sent!\" });\n                            return [lastmsg, null, notifmsg];\n                        } else {\n                            // this is a modbus write request\n                            node.status({ fill: \"green\", shape: \"dot\", text: \"Write re-sent!\" });\n                            return [null, lastmsg, notifmsg];\n                        }\n                    }\n                }\n            }\n\n            // Check for online state\n            if (state !== 1) {\n                if (current < online_threshold) {\n                    notifmsg.topic = \"Warning\";\n                    notifmsg.payload.text = \"Device is now online\";\n                    notifmsg.payload.statuschange = true;\n                    state = 1;\n                    context.set(\"state\", state);\n                }\n            } else {\n                if (current > offline_threshold) {\n                    notifmsg.topic = \"Error\";\n                    notifmsg.payload.text = \"Device is not transmitting\";\n                    notifmsg.payload.statuschange = true;\n                    state = 99;\n                    context.set(\"state\", state);\n                }\n            }\n            notifmsg.payload.state = state;\n            if (state===1) {\n                node.status({ fill: \"blue\", shape: \"ring\", text: queuecount+ \" | \"+notifmsg.payload.updatetext });\n            } else {\n                node.status({ fill: \"red\", shape: \"ring\", text: queuecount + \" | \" + notifmsg.payload.updatetext });\n            }\n            return [null, null, notifmsg];\n\n        } else {\n            node.status({ fill: \"grey\", shape: \"ring\", text: \"No data\" });\n        }\n        break;\n    case \"next\":\n        // Update the lastupdate counter\n        context.set(\"lastupdate\", current);\n        context.set(\"sent\", false);\n        send = true;\n        break;\n    case \"reset\":\n        context.set(\"queue\",[]);\n        context.set(\"sent\", false);\n        context.set(\"lastmsg\", undefined);\n        break;\n    default:\n        // The incoming message is a modbus request\n\n        // delete the last msg to prevent from being resent\n        // context.set(\"lastmsg\", undefined);\n\n        // Check if there is already a message in the queue with the same topic. \n        // If there is it needs to be deleted, because we only keep the last message.\n        for (let i=queue.length-1; i>=0; i--) {\n            if (queue[i].topic === msg.topic) {\n                queue.splice(i,1);\n            }\n        }\n\n        // Add the message to the end of the queue\n        queue.push(msg);\n        context.set(\"queue\",queue);\n\n        if (!context.get(\"sent\")) {\n            send = true;\n        }\n        node.status({ fill: \"green\", shape: \"dot\", text: queue.length });\n\n}\n\n// We need to send out a new message\nif (send) {\n\n    if (queue.length>0) {\n        // Get the older message from the array\n        let newmsg = queue[0];\n        // remove this message\n        queue.splice(0,1);\n        context.set(\"queue\",queue);\n        context.set(\"sent\", true);\n        context.set(\"lastmsg\", newmsg);\n\n        if ((newmsg.payload.fc === 1) || (newmsg.payload.fc === 2) || (newmsg.payload.fc === 3) || (newmsg.payload.fc === 4)) {\n            // this is a modbus read request\n            node.status({ fill: \"green\", shape: \"dot\", text: \"Read sent!\" });\n            return[newmsg,null,null];\n        } else {\n            // this is a modbus write request\n            node.status({ fill: \"green\", shape: \"dot\", text: \"Write sent!\" });\n            return [null, newmsg, null];\n        }\n        \n    }\n}\n",
        "outputs": 3,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 320,
        "y": 1120,
        "wires": [
            [
                "e35a4b7af5461031",
                "ccd42b930184550c"
            ],
            [
                "1d0acf47af0d0c5c",
                "ccd42b930184550c"
            ],
            [
                "74f24cef68a52071"
            ]
        ],
        "info": "# Modbus Queue\r\n\r\nThis node queueing read and write messages for modbus. Use this node if you are reading and writing the same device with many different requests. E.g. reading different coil/register intervals continously and also writing to the device at the same time.\r\n\r\nIt does a few things:\r\n- queues all messages arrive on the input port\r\n- based on the msg.topic, older messages of the same topci is ignored\r\n- sends out the oldest message and waits for the \r\n- monitors the time since last message and send out report on the output\r\n- handles online/offline status\r\n- resend the last message is response is not received in time\r\n\r\n## Input Data\r\n\r\n### payload\r\n\r\nThe payload should contain the data that gets sent to the flex-getter or flex-write node.\r\nTypical modbus read payload:\r\n`{\"value\":0,\"fc\":3,\"unitid\":1,\"address\":1000,\"quantity\":20}`\r\nTypical modbus write payload:\r\n`{\"value\":false,\"fc\":5,\"unitid\":1,\"address\":0,\"quantity\":1}`\r\n\r\n### topic\r\n\r\nEach message must contain a topic (any text), and this topic is used to identify the different read/write requests and delete any earlier request with the same topic if it still in the queueing\r\n\r\nThere are a few reserved topic for special function (for these payload is ignored):\r\n- reset: resets the queue and deleted any data collected so far\r\n- next: this is the message fed back from the flex getter/write node to indicate to this node that a new message can be sent out\r\n- update: this should be coming from a 1 second time to display the current queue count, time since the last update and online/offline status\r\n\r\n## Output ports\r\n\r\n### Port 1: flex getter\r\n\r\nThis output should be connected to a modbos-flex-getter and all the read requests will be sent out through this port\r\n\r\n### Port 2: flex write\r\n\r\nThis output should be connected to a modbos-flex-write and all the write requests will be sent out through this port\r\n\r\n### Port 3: status messages\r\n\r\nThis port outputs a status message for every update message (msg.topic=\"update\").\r\n\r\n- topic: \"Information\" for regular updates, \"Warning\": offline device is now back online, \"Error\": device is offline\r\n- payload.text: message like when the device gone offline, or back online\r\n- payload.updatetext: time passed since the last update (human readable format)\r\n- payload.secondsincelastupdate: number of seconds since the last update from the device\r\n- payload.statuschange: true if status is changed (gone offline, back online)\r\n- payload.state: 0: initial state, no data yet, 1: device online, 99: device offline\r\n\r\n## Node Settings\r\n\r\nChange the settings in the first 4 lines of the code to influence the behaviour. Explanation is in the code as comment."
    }
]

Generate graphs dynamically from Telegram

I build a simple "conversation engine" in Telegram which asks you a couple of questions on what data to select from InfluxDB and generates a graph at the end.

On-Demand graphs in Telegram

This is the main flow as explained in the video above. Also please scroll up two sections above where I posted the changes to the "Language Processing" node to include the changes about the session handling.

And now I made a second video because I added a CSV output besides the graph as well: On-Demand graphs now comes with a CSV output as well

[
    {
        "id": "fce8e0dea57fd66b",
        "type": "function",
        "z": "661be9dfd2de4740",
        "name": "Graph",
        "func": "// Set the required access for this function here\nvar checklevel = 2;\n\nlet now = new Date();\n\n// Handle the help request\nif (msg.originalMessage.help) {\n    // only add the help command if the user has access to execute it\n    if (msg.originalMessage.from.accesslevel >= checklevel) {\n        msg.payload.content = msg.payload.content + \"\\r\\ngraph: generate graph from sensor data stored in InfluxDB\";  \n    }\n    return [null,null,msg];\n}\n\n// Check if the current message contains this function\nif (msg.originalMessage.command===\"graph\") {\n    // check if the user has access to execute this function\n    if (msg.originalMessage.from.accesslevel >= checklevel) {\n        // user has access to run the \n\n        // Check the user session\n        let sessiondata = flow.get(\"sessiondata\");\n        // Check session data for graph\n        if (sessiondata[msg.originalMessage.from.id].graph===undefined) {\n            // no session data so far, it needs to be created\n            sessiondata[msg.originalMessage.from.id].graph = {\"step\": 1, \"lastseen\": now.getTime()};\n            sessiondata[msg.originalMessage.from.id].context = \"graph\";\n            flow.set(\"sessiondata\",sessiondata);\n            msg.payload = sessiondata[msg.originalMessage.from.id].graph;\n            return [msg,null,null];  \n        } else {\n            if (sessiondata[msg.originalMessage.from.id].graph.step === 1) {\n\n                sessiondata[msg.originalMessage.from.id].graph.step = 2;\n                sessiondata[msg.originalMessage.from.id].graph.lastseen = now.getTime();\n                sessiondata[msg.originalMessage.from.id].graph.device = msg.payload.content;\n                sessiondata[msg.originalMessage.from.id].context = \"graph\";\n                flow.set(\"sessiondata\", sessiondata);\n                msg.query = \"from(bucket: \\\"nodered\\\")  |> range(start: -24h)  |> filter(fn: (r) => r._measurement == \\\"sensors\\\")  |> filter(fn: (r) => r.device == \\\"\" + msg.payload.content + \"\\\")  |> keep(columns: [\\\"_field\\\"])  |> distinct(column: \\\"_field\\\")  |> keep(columns: [\\\"_value\\\"])\";\n                msg.payload = sessiondata[msg.originalMessage.from.id].graph;\n                return [msg, null, null];\n\n            }\n            /*\n            if (sessiondata[msg.originalMessage.from.id].graph.step === 2) {\n\n                sessiondata[msg.originalMessage.from.id].graph.step = 3;\n                sessiondata[msg.originalMessage.from.id].graph.lastseen = now.getTime();\n                sessiondata[msg.originalMessage.from.id].graph.field = msg.payload.content;\n                sessiondata[msg.originalMessage.from.id].context = \"graph\";\n                flow.set(\"sessiondata\", sessiondata);\n                msg.payload = sessiondata[msg.originalMessage.from.id].graph;\n                return [msg, null, null];\n            }\n            */\n            if (sessiondata[msg.originalMessage.from.id].graph.step === 2) {\n\n                sessiondata[msg.originalMessage.from.id].graph.step = 4;\n                sessiondata[msg.originalMessage.from.id].graph.lastseen = now.getTime();\n                sessiondata[msg.originalMessage.from.id].graph.field = msg.payload.content;\n                sessiondata[msg.originalMessage.from.id].context = \"graph\";\n                msg.query = \"from(bucket: \\\"nodered\\\")   |> range(start: -24h)  |> filter(fn: (r) => r[\\\"_measurement\\\"] == \\\"sensors\\\")   |> filter(fn: (r) => r[\\\"device\\\"] == \\\"\" + sessiondata[msg.originalMessage.from.id].graph.device + \"\\\")   |> filter(fn: (r) => r[\\\"_field\\\"] == \\\"\" + msg.payload.content + \"\\\")\";\n                msg.payload = { \"step\": 4 };\n                msg.graph = sessiondata[msg.originalMessage.from.id].graph;\n                // Last step, delete the session\n                delete sessiondata[msg.originalMessage.from.id].graph;\n                delete sessiondata[msg.originalMessage.from.id].context;\n                flow.set(\"sessiondata\", sessiondata);\n                return [msg, null, null];\n\n            }\n        }\n    } else {\n        return [null,msg,null];\n    }\n} else {\n    // pass the message to the next node\n    return [null,null,msg];\n}\n\n\n",
        "outputs": "3",
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 350,
        "y": 2800,
        "wires": [
            [
                "0e42ce67ee1625e1"
            ],
            [
                "3cf389de146de164"
            ],
            [
                "b94342b5128f4d65"
            ]
        ],
        "outputLabels": [
            "User has access",
            "Insufficient access",
            "Handover to next node"
        ]
    },
    {
        "id": "a252c52bd0242411",
        "type": "function",
        "z": "661be9dfd2de4740",
        "name": "Create response",
        "func": "\nlet keyboard = {\n    keyboard: [],\n    'resize_keyboard': true,\n    'one_time_keyboard': true\n}\n\nfor (let i = 0; i < msg.payload.length; i++) {\n    let newarray = []\n    newarray.push(msg.payload[i]._value);\n    keyboard.keyboard.push(newarray);\n}\n\nlet opts = {\n    reply_to_message_id: msg.originalMessage.messageId,\n    reply_markup: JSON.stringify(keyboard)\n};\n\n\nmsg.payload = { \"chatId\": msg.originalMessage.from.id, \"type\": \"message\", content: \"Please select a device:\" };\n\nmsg.payload.options = opts;\n\nreturn msg;",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 960,
        "y": 2760,
        "wires": [
            [
                "26e833ca88a010b1"
            ]
        ]
    },
    {
        "id": "0e42ce67ee1625e1",
        "type": "switch",
        "z": "661be9dfd2de4740",
        "name": "Step",
        "property": "payload.step",
        "propertyType": "msg",
        "rules": [
            {
                "t": "eq",
                "v": "1",
                "vt": "num"
            },
            {
                "t": "eq",
                "v": "2",
                "vt": "num"
            },
            {
                "t": "eq",
                "v": "3",
                "vt": "num"
            },
            {
                "t": "eq",
                "v": "4",
                "vt": "num"
            }
        ],
        "checkall": "true",
        "repair": false,
        "outputs": 4,
        "x": 550,
        "y": 2780,
        "wires": [
            [
                "c3f78134cb2a1162"
            ],
            [
                "546b17082c8028ef"
            ],
            [
                "583f7d270c344703"
            ],
            [
                "bd2fcddb3e042bdb"
            ]
        ]
    },
    {
        "id": "c3f78134cb2a1162",
        "type": "influxdb in",
        "z": "661be9dfd2de4740",
        "influxdb": "2dce016f48a38240",
        "name": "Device list",
        "query": "from(bucket: \"nodered\")\n  |> range(start: -1y)\n  |> filter(fn: (r) => r._measurement == \"sensors\")\n  |> keep(columns: [\"device\"])\n  |> distinct(column: \"device\")\n  |> keep(columns: [\"_value\"])",
        "rawOutput": false,
        "precision": "",
        "retentionPolicy": "",
        "org": "nygmatech",
        "x": 730,
        "y": 2760,
        "wires": [
            [
                "a252c52bd0242411"
            ]
        ]
    },
    {
        "id": "4903da082024747a",
        "type": "inject",
        "z": "661be9dfd2de4740",
        "name": "",
        "props": [
            {
                "p": "payload"
            },
            {
                "p": "topic",
                "vt": "str"
            }
        ],
        "repeat": "",
        "crontab": "",
        "once": false,
        "onceDelay": 0.1,
        "topic": "",
        "payload": "",
        "payloadType": "date",
        "x": 540,
        "y": 2840,
        "wires": [
            [
                "c3f78134cb2a1162"
            ]
        ]
    },
    {
        "id": "947cd078fa4c7315",
        "type": "function",
        "z": "661be9dfd2de4740",
        "name": "Create response",
        "func": "\nlet keyboard = {\n    keyboard: [],\n    'resize_keyboard': true,\n    'one_time_keyboard': true\n}\n\nfor (let i = 0; i < msg.payload.length; i++) {\n    let newarray = []\n    newarray.push(msg.payload[i]._value);\n    keyboard.keyboard.push(newarray);\n}\n\nlet opts = {\n    reply_to_message_id: msg.originalMessage.messageId,\n    reply_markup: JSON.stringify(keyboard)\n};\n\n\nmsg.payload = { \"chatId\": msg.originalMessage.from.id, \"type\": \"message\", content: \"Please select a field:\" };\n\nmsg.payload.options = opts;\n\nreturn msg;",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 960,
        "y": 2800,
        "wires": [
            [
                "a5d4e7c345cb44e6"
            ]
        ]
    },
    {
        "id": "546b17082c8028ef",
        "type": "influxdb in",
        "z": "661be9dfd2de4740",
        "influxdb": "2dce016f48a38240",
        "name": "Field list",
        "query": "",
        "rawOutput": false,
        "precision": "",
        "retentionPolicy": "",
        "org": "nygmatech",
        "x": 720,
        "y": 2800,
        "wires": [
            [
                "947cd078fa4c7315",
                "acc7893c01e3e8fb"
            ]
        ]
    },
    {
        "id": "583f7d270c344703",
        "type": "function",
        "z": "661be9dfd2de4740",
        "name": "Create response",
        "func": "// Send the time options back\nvar opts = {\n    reply_to_message_id: msg.originalMessage.messageId,\n    reply_markup: JSON.stringify({\n        keyboard: [\n            ['today'],\n            ['yesterday'],\n            ['last 5 days'],\n            ['last hour'],\n            ['last 6 hours']],\n        'resize_keyboard': true,\n        'one_time_keyboard': true\n    })\n};\n\n\nmsg.payload = { \"chatId\": msg.originalMessage.from.id, \"type\": \"message\", content: \"Please select a device:\" };\n\nmsg.payload.options = opts;\n\nreturn msg;",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 960,
        "y": 2840,
        "wires": [
            [
                "d8f201d6ba44acdc"
            ]
        ]
    },
    {
        "id": "bd2fcddb3e042bdb",
        "type": "influxdb in",
        "z": "661be9dfd2de4740",
        "influxdb": "2dce016f48a38240",
        "name": "Graph data",
        "query": "",
        "rawOutput": false,
        "precision": "",
        "retentionPolicy": "",
        "org": "nygmatech",
        "x": 730,
        "y": 2840,
        "wires": [
            [
                "acc7893c01e3e8fb",
                "305208111941492f",
                "5a1cddb272823639"
            ]
        ]
    },
    {
        "id": "305208111941492f",
        "type": "function",
        "z": "661be9dfd2de4740",
        "name": "Line Chart",
        "func": "function getGetOrdinal(n) {\n    var s=[\"th\",\"st\",\"nd\",\"rd\"],\n    v=n%100;\n    return n+(s[(v-20)%10]||s[v]||s[0]);\n }\n\nvar dL = [\"Sunday\", \"Monday\", \"Tuesday\", \"Wednesday\", \"Thursday\", \"Friday\", \"Saturday\"];\nvar dS = [\"Sun\", \"Mon\", \"Tue\", \"Wed\", \"Thu\", \"Fri\", \"Sat\"];\nvar mL = ['January', 'February', 'March', 'April', 'May', 'June', 'July', 'August', 'September', 'October', 'November', 'December'];\nvar mS = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'June', 'July', 'Aug', 'Sept', 'Oct', 'Nov', 'Dec'];\n\nmsg.save = msg.payload;\nlet now = new Date();\n\n// Create the chart object\nlet m = {\n    type: 'line',\n    options: {\n        title: {\n            display:true,\n            text:msg.graph.device +\"/\"+msg.graph.field+\" in the last 24hrs\"\n        },\n        legend: {\n            display:false\n        },\n        chartArea: {\n            backgroundColor: 'white'\n        },\n        plugins: {\n            datalabels: {\n                display:false,\n                backgroundColor:'whitesmoke',\n                borderRadius:1,\n                padding:1,\n                align: 'right',\n                anchor: function(context) {\n                    //node.send({debug:{dataindex:context.dataIndex}});\n                    if (context.dataIndex == context.dataset.data.length - 1) {\n                        return 'center';\n                    } else {\n                        return 'end';\n                    }\n                },\n                offset:8,\n                formatter: function(value, context) {\n                    if ((context.dataIndex>0)&&(context.dataIndex<context.dataset.data.length - 1)) {\n                    // check local maximum\n                        if ((context.dataset.data[context.dataIndex]>context.dataset.data[context.dataIndex+1])&&(context.dataset.data[context.dataIndex]>context.dataset.data[context.dataIndex-1])) {\n                            return value.toLocaleString();\n                        } else {\n                           return \"\";  \n                        }\n                    } else {\n                        return \"\";\n                    }\n                }\n            }\n        }\n    },\n    data: {\n        labels:[],\n        datasets: [\n            {\n                label:\"data\",\n                fill:false,\n                pointRadius: 0,\n                data:[]\n            }\n        ]\n    }\n}\n\n\n\nfor (var i=0;i<msg.payload.length;i++) {\n    var d = new Date(msg.payload[i]._time);\n    let hh = d.getHours() < 10 ? \"0\" + d.getHours() : d.getHours();\n    let mmm  = d.getMinutes() < 10 ? \"0\" + d.getMinutes() : d.getMinutes();    \n    \n    m.data.datasets[0].data.push(msg.payload[i]._value);\n    m.data.labels.push(hh+\":\"+mmm);\n}\n\nmsg.payload = m;\n\nreturn msg;\n\n\n\n",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 950,
        "y": 2880,
        "wires": [
            [
                "ef1c2347373e0feb"
            ]
        ]
    },
    {
        "id": "ef1c2347373e0feb",
        "type": "chart-image",
        "z": "661be9dfd2de4740",
        "name": "",
        "width": 500,
        "height": "500",
        "x": 1170,
        "y": 2880,
        "wires": [
            [
                "41908db04314e5b4"
            ]
        ]
    },
    {
        "id": "41908db04314e5b4",
        "type": "file",
        "z": "661be9dfd2de4740",
        "name": "Image dump",
        "filename": "/home/nygma/charts/telegramgraph.png",
        "filenameType": "str",
        "appendNewline": true,
        "createDir": false,
        "overwriteFile": "true",
        "encoding": "none",
        "x": 1350,
        "y": 2880,
        "wires": [
            [
                "7d85325f7f3edba0"
            ]
        ]
    },
    {
        "id": "7d85325f7f3edba0",
        "type": "function",
        "z": "661be9dfd2de4740",
        "name": "Telegram message",
        "func": "msg.payload = {};\nmsg.payload.chatId = msg.originalMessage.from.id;\nmsg.payload.type = \"photo\";\nmsg.payload.content = \"/home/nygma/charts/telegramgraph.png\";\nmsg.payload.caption = \"Generated chart\";\nreturn msg;\n",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 1550,
        "y": 2880,
        "wires": [
            [
                "ff778937ee444b11"
            ]
        ]
    },
    {
        "id": "acc7893c01e3e8fb",
        "type": "debug",
        "z": "661be9dfd2de4740",
        "name": "",
        "active": false,
        "tosidebar": true,
        "console": false,
        "tostatus": false,
        "complete": "true",
        "targetType": "full",
        "statusVal": "",
        "statusType": "auto",
        "x": 1350,
        "y": 2760,
        "wires": []
    },
    {
        "id": "26e833ca88a010b1",
        "type": "link out",
        "z": "661be9dfd2de4740",
        "name": "",
        "mode": "link",
        "links": [
            "82332965f25e893c"
        ],
        "x": 1115,
        "y": 2760,
        "wires": []
    },
    {
        "id": "a5d4e7c345cb44e6",
        "type": "link out",
        "z": "661be9dfd2de4740",
        "name": "",
        "mode": "link",
        "links": [
            "82332965f25e893c"
        ],
        "x": 1115,
        "y": 2800,
        "wires": []
    },
    {
        "id": "d8f201d6ba44acdc",
        "type": "link out",
        "z": "661be9dfd2de4740",
        "name": "",
        "mode": "link",
        "links": [
            "82332965f25e893c"
        ],
        "x": 1115,
        "y": 2840,
        "wires": []
    },
    {
        "id": "ff778937ee444b11",
        "type": "link out",
        "z": "661be9dfd2de4740",
        "name": "",
        "mode": "link",
        "links": [
            "82332965f25e893c"
        ],
        "x": 1675,
        "y": 2920,
        "wires": []
    },
    {
        "id": "5a1cddb272823639",
        "type": "function",
        "z": "661be9dfd2de4740",
        "name": "CSV Prep",
        "func": "function TimestampToDateTime(ts) {\n    let yyyy = ts.getFullYear();\n    let mm = ts.getMonth() < 9 ? \"0\" + (ts.getMonth() + 1) : (ts.getMonth() + 1); // getMonth() is zero-based\n    let dd = ts.getDate() < 10 ? \"0\" + ts.getDate() : ts.getDate();\n    let hh = ts.getHours() < 10 ? \"0\" + ts.getHours() : ts.getHours();\n    let mmm = ts.getMinutes() < 10 ? \"0\" + ts.getMinutes() : ts.getMinutes();\n    let ss = ts.getSeconds() < 10 ? \"0\" + ts.getSeconds() : ts.getSeconds();\n\n    return dd + \".\" + mm + \".\" + yyyy + \" \" + hh + \":\" + mmm + \":\" + ss;\n }\n\n\nlet records = \"timestamp,date,\"+msg.graph.device+\"_\"+msg.graph.field+\"\\n\";\n\nfor (var i=0;i<msg.payload.length;i++) {\n    let timestamp = new Date(msg.payload[i]._time);\n    let formatteddate = TimestampToDateTime(timestamp);\n    let value = msg.payload[i]._value;\n    records += \"\"+timestamp.getTime()+\",\"+formatteddate+\",\"+value+\"\\n\";\n}\n\nmsg.payload = records;\n\nreturn msg;\n\n\n\n",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 940,
        "y": 2920,
        "wires": [
            [
                "1287ca6a48d6faf0"
            ]
        ]
    },
    {
        "id": "1287ca6a48d6faf0",
        "type": "file",
        "z": "661be9dfd2de4740",
        "name": "CSV dump",
        "filename": "/home/nygma/charts/telegramgraph.csv",
        "filenameType": "str",
        "appendNewline": true,
        "createDir": false,
        "overwriteFile": "true",
        "encoding": "none",
        "x": 1190,
        "y": 2920,
        "wires": [
            [
                "0b3acfb4aab83e25"
            ]
        ]
    },
    {
        "id": "0b3acfb4aab83e25",
        "type": "function",
        "z": "661be9dfd2de4740",
        "name": "Telegram message",
        "func": "msg.payload = {};\nmsg.payload.chatId = msg.originalMessage.from.id;\nmsg.payload.type = \"document\";\nmsg.payload.content = \"/home/nygma/charts/telegramgraph.csv\";\nmsg.payload.caption = \"CSV output\";\nreturn msg;\n",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 1450,
        "y": 2920,
        "wires": [
            [
                "ff778937ee444b11"
            ]
        ]
    },
    {
        "id": "2dce016f48a38240",
        "type": "influxdb",
        "hostname": "127.0.0.1",
        "port": "8086",
        "protocol": "http",
        "database": "database",
        "name": "InfluxDB",
        "usetls": false,
        "tls": "",
        "influxdbVersion": "2.0",
        "url": "http://localhost:8086",
        "rejectUnauthorized": false
    }
]
⚠️ **GitHub.com Fallback** ⚠️