Getting Started with Pipeline

Introduction

The Pipeline in IndustryOS is the core data processing mechanism responsible for receiving, transforming, routing, and reacting to events and telemetry coming from devices and related assets.

The Pipeline is built around three main components:

  • Message - Any incoming event. It can be incoming data from devices, a device lifecycle event, a REST API event, an RPC request, etc.
  • Rule Node - A function that is executed on an incoming message. There are many different node types that can filter, transform, or execute actions on incoming messages.
  • Rule Chain - A set of rule nodes connected to each other through relations, so that the outbound message from one rule node is sent to the next connected rule nodes.

Key Characteristics

Stream Processing

Incoming data (telemetry, attributes, RPC, events) is immediately processed by the Pipeline, where it can be filtered, enriched, or transformed in real-time.

Rule Chain as a Workflow

Data processing is organised into Rule Chains — workflows composed of individual Rule Nodes. Each node performs a specific action such as filtering, saving data to a database, sending a notification, or calling an external API.

Flexibility and Extensibility

The Pipeline supports both built-in nodes and custom logic via scripts (JavaScript or TBEL). It also allows integrations with external systems (HTTP, Kafka, MQTT).

Integrations and Reactivity

The Pipeline can trigger business logic in response to events — for example, sending an email or SMS when a temperature threshold is exceeded, or integrating with external services and platforms.

Typical Use Cases

Here are some common scenarios that can be configured using IndustryOS Pipeline:

  • Data Validation and Transformation – Validate and modify incoming telemetry or attributes before persisting them in the database.
  • Telemetry Aggregation – Copy telemetry or attributes from devices to related assets to enable aggregation. For example, data from multiple devices can be combined into a related Asset for summary analytics.
  • Alarm Management – Create, update, or clear alarms based on defined conditions.
  • Device Lifecycle Monitoring – Trigger actions when device state changes. For example, generate alerts when a device goes online or offline.
  • Data Enrichment – Load additional context required for processing. For example, load temperature threshold values for a device that are defined in the device’s Customer or Tenant attributes.
  • External System Integration – Trigger REST API calls to external applications and services.
  • Notifications – Send email alerts when complex events occur, with the ability to include attributes from related entities in the email template.
  • User Personalisation – Take into account user preferences during event processing.
  • Remote Control – Execute RPC calls to devices based on defined conditions.
  • Big Data / Cloud Integration – Connect to external pipelines and platforms such as Kafka, Spark, or AWS services.

Rule Engine Message

A Rule Engine Message is a serialisable, immutable data structure that represents various messages in the system. For example:

  • Incoming telemetry, attribute updates, or RPC calls from devices
  • Entity lifecycle events: created, updated, deleted, assigned, unassigned, attributes updated
  • Device status events: connected, disconnected, active, inactive
  • Other system events

Rule Engine Messages contain the following information:

  • Message ID - Time-based, universally unique identifier
  • Originator - Device, Asset, or other Entity identifier
  • Message Type - The kind of message, for example “Post telemetry” or “Inactivity Event”
  • Payload - JSON body carrying the actual message data
  • Metadata - List of key-value pairs with additional data about the message
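As a rough illustration, a message with the fields listed above could be modelled as a frozen JavaScript object. The property names (id, originator, type, data, metadata), the deepFreeze and withData helpers, and all values are assumptions made for this sketch, not the platform's actual internal format.

```javascript
// Hypothetical shape of a Rule Engine Message; names and values are illustrative.
function deepFreeze(value) {
  // Recursively freeze nested objects so the whole message is immutable.
  if (value !== null && typeof value === "object") {
    Object.values(value).forEach(deepFreeze);
    Object.freeze(value);
  }
  return value;
}

const message = deepFreeze({
  id: "example-message-id", // placeholder; real IDs are time-based and unique
  originator: { entityType: "DEVICE", id: "example-device-id" },
  type: "Post telemetry",
  data: { temperature: 24 },
  metadata: { deviceName: "DHT22 sensor" },
});

// Because messages are immutable, a "change" produces a new message
// instead of modifying the existing one.
function withData(msg, extraData) {
  return deepFreeze({ ...msg, data: { ...msg.data, ...extraData } });
}

const enriched = withData(message, { unit: "C" });
console.log(message.data);  // original payload is untouched
console.log(enriched.data); // new payload includes the added field
```

Treating the message as immutable means each node hands a fresh copy to the next one, so a node further down the chain never sees a payload that changed underneath it.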

Rule Node

A Rule Node is the basic logical unit of the Pipeline. It processes a single incoming message at a time and produces one or more outgoing messages. A Rule Node can filter, enrich, or transform incoming messages, perform actions, or communicate with external systems.

Rule Node Categories

All available rule nodes are grouped by their function:

  • Filter Nodes - Used for message filtering and routing
  • Enrichment Nodes - Used to enrich messages with information stored in the database
  • Transformation Nodes - Used for changing message fields such as originator, type, data, and metadata
  • Action Nodes - Used to execute various actions based on the message
  • External Nodes - Used to interact with external systems
  • Flow Nodes - Used to control message flow between rule chains and interact with queues
  • Analytics Nodes - Used to aggregate data

Rule Node Connection

Rule Nodes may be connected to other rule nodes. Each relation has a relation type — a label used to identify the logical meaning of the relation. When a rule node produces an outgoing message, it always specifies the relation type which is used to route the message to the next nodes.

Typical rule node relations are “Success” and “Failure”. Rule nodes that represent logical operations may use “True” or “False”. Some specific rule nodes may use completely different relation types, for example: “Post Telemetry”, “Attributes Updated”, “Entity Created”, etc.
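The routing behaviour described above can be sketched as a lookup from a node and a relation type to the next nodes. This is a simplified in-memory model for illustration only: the relations table and the nextNodes function are hypothetical, and the node names are borrowed from the temperature validation example later on this page.

```javascript
// Hypothetical model: source node -> relation type -> names of the next nodes.
const relations = {
  "Message Type Switch": { "Post Telemetry": ["Script Filter"] },
  "Script Filter": { "True": ["Save Telemetry"], "False": ["Log Other"] },
};

// Return the nodes that receive a message emitted by `fromNode`
// with the given relation type (empty if nothing is connected).
function nextNodes(fromNode, relationType) {
  const outgoing = relations[fromNode] || {};
  return outgoing[relationType] || [];
}

console.log(nextNodes("Script Filter", "True"));
console.log(nextNodes("Script Filter", "False"));
```

A message emitted with a relation type that has no connected node simply has nowhere to go in this model, which is why the lookup falls back to an empty list.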

Rule Chain

A Rule Chain is a logical group of rule nodes and their relations. For example, a rule chain can:

  • Save all telemetry messages to the database
  • Raise “High Temperature Alarm” if the temperature field in the message is higher than 50 degrees
  • Raise “Low Temperature Alarm” if the temperature field in the message is lower than -40 degrees
  • Log a failure to the console if a temperature check script cannot be executed because of a logical or syntax error

Tenant administrators can define one Root Rule Chain and optionally multiple other rule chains. The Root Rule Chain handles all incoming messages and may forward them to other rule chains for additional processing.
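The example rule chain above can be approximated in plain JavaScript to make its decision logic explicit. This is only a sketch of the conditions, not how the platform executes a chain; the evaluateTemperature function and the returned action labels are assumptions for illustration.

```javascript
// Hypothetical sketch: list the actions the example chain would take for a message.
function evaluateTemperature(msg) {
  const actions = ["Save Telemetry"]; // every telemetry message is saved
  if (msg.temperature > 50) {
    actions.push("High Temperature Alarm");
  }
  if (msg.temperature < -40) {
    actions.push("Low Temperature Alarm");
  }
  return actions;
}

console.log(evaluateTemperature({ temperature: 24 }));
console.log(evaluateTemperature({ temperature: 60 }));
console.log(evaluateTemperature({ temperature: -45 }));
```

In a real rule chain each condition would be a separate filter node connected to an alarm node through a True relation, rather than a single function.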

Example: Temperature Validation Rule Chain

Let’s create a simple rule chain that validates temperature readings from a DHT22 sensor. The sensor can measure temperature from -40°C to +80°C. We’ll configure the Pipeline to store all temperatures within this range and log invalid readings.

Step 1: Add Temperature Validation Node

Navigate to the Rule Chains page and open the Root Rule Chain.

Drag and drop a Script Filter rule node to the chain. The node configuration window will open.

Use this script for data validation:

return msg.temperature == null || 
       (msg.temperature >= -40 && msg.temperature <= 80);

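If the temperature property is not defined, or if the temperature is within the valid range, the script returns True; otherwise it returns False. Messages for which the script returns True are routed to the nodes connected with the True relation, and all other messages are routed to the nodes connected with the False relation.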
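To check the filter logic before pasting it into the node, the same expression can be wrapped in an ordinary function and run locally, for example with Node.js. The isValidTemperature wrapper and the sample messages are assumptions added for this sketch; inside the rule node only the return statement is needed.

```javascript
// Same expression as the Script Filter, wrapped in a function for local testing.
function isValidTemperature(msg) {
  return msg.temperature == null ||
         (msg.temperature >= -40 && msg.temperature <= 80);
}

// Hypothetical sample payloads covering valid, invalid, boundary, and missing values.
const samples = [
  { temperature: 24 },
  { temperature: 99 },
  { temperature: -40 },
  { humidity: 55 },
];

for (const sample of samples) {
  const relation = isValidTemperature(sample) ? "True" : "False";
  console.log(JSON.stringify(sample), "->", relation);
}
```

Note that the loose comparison with null also matches undefined, which is why a message without a temperature property passes the filter.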

Step 2: Connect the Nodes

Remove the existing Post Telemetry relation between the Message Type Switch node and Save Telemetry node.

Connect the Message Type Switch node with the Script Filter node using the Post Telemetry relation.

Connect the Script Filter node with the Save Telemetry node using the True relation so all valid telemetry is saved.

Connect the Script Filter node with a Log Other node using the False relation so invalid telemetry is logged in the system log.

Press Save to apply changes.

Step 3: Validate Results

To test the rule chain, create a device and submit telemetry to IndustryOS. Navigate to the Devices section and create a new device.

Telemetry is posted through the REST API. Copy the access token of the new device and use it in place of $ACCESS_TOKEN in the commands below.

Send a message with invalid temperature reading (99°C):

curl -v -X POST -d '{"temperature":99}' \
  http://localhost/api/v1/$ACCESS_TOKEN/telemetry \
  --header "Content-Type:application/json"

You’ll see that this telemetry was not added to the Device Latest Telemetry section.

Now send a message with a valid temperature reading (24°C):

curl -v -X POST -d '{"temperature":24}' \
  http://localhost/api/v1/$ACCESS_TOKEN/telemetry \
  --header "Content-Type:application/json"

You’ll see that this telemetry was saved successfully.
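The same requests can also be sent from a script. The following is a minimal sketch assuming Node.js 18 or later, where fetch is available globally; the buildTelemetryRequest and postTelemetry helpers are hypothetical names, and the URL layout is taken from the curl commands above.

```javascript
// Build the URL and fetch options for a telemetry POST (hypothetical helper).
function buildTelemetryRequest(baseUrl, accessToken, telemetry) {
  return {
    url: `${baseUrl}/api/v1/${encodeURIComponent(accessToken)}/telemetry`,
    options: {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(telemetry),
    },
  };
}

// Send the telemetry and resolve with the HTTP status code.
async function postTelemetry(baseUrl, accessToken, telemetry) {
  const { url, options } = buildTelemetryRequest(baseUrl, accessToken, telemetry);
  const response = await fetch(url, options);
  return response.status;
}

// Example usage (requires a running server and a real device token):
// postTelemetry("http://localhost", process.env.ACCESS_TOKEN, { temperature: 24 })
//   .then((status) => console.log("HTTP status:", status));
```

Keeping request construction separate from the network call makes it possible to inspect the URL and body without contacting the server.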

Debugging

IndustryOS lets you review the incoming and outgoing messages of each Rule Node. To enable debug mode, make sure the “Debug mode” checkbox is selected in the rule chain configuration window.

Once debug is enabled, you can see incoming and outgoing message information as well as corresponding relation types. This helps troubleshoot rule chain logic and identify where messages are being filtered or transformed.

Next Steps