Edge Data Collection

Overview

Edge data collection uses the IndustryOS IoT Gateway to gather, process, and transmit data from distributed locations. The gateway aggregates, filters, and buffers data locally before sending it to the cloud, which reduces bandwidth use and lets collection continue while the network is down.

Edge Computing Benefits

Reduced Latency

  • Local data processing
  • Immediate response to events
  • Real-time decision making
  • Faster alarm triggers

Bandwidth Optimisation

  • Data aggregation at edge
  • Filter unnecessary data
  • Compress before transmission
  • Send only changes

Reliability

  • Operate during network outages
  • Local data buffering
  • Automatic retry and resend
  • Delivery of buffered data after reconnection

Cost Reduction

  • Lower cloud ingestion costs
  • Reduced bandwidth usage
  • Less cloud storage needed
  • Optimised data transmission

Architecture Patterns

Pattern 1: Simple Edge Collection

Sensors → Gateway → Cloud
         (Collect)   (Store/Analyse)

Use Case: Basic monitoring

Configuration:

{
  "mqtt": {
    "broker": "localhost",
    "port": 1883,
    "mapping": [
      {
        "topicFilter": "sensors/+/data",
        "converter": {
          "deviceNameExpression": "${topic[1]}",
          "deviceTypeExpression": "sensor"
        }
      }
    ]
  }
}
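
The mapping above can be read as two steps: match the incoming topic against topicFilter, then build the device name from the topic levels. The sketch below illustrates this in Python. It assumes that ${topic[N]} refers to the zero-based topic level N, which is consistent with the filters on this page but is not confirmed by it. The function names match_topic and expand_expression are hypothetical and not part of the gateway.

```python
import re


def match_topic(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a filter using + and # wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            return True  # multi-level wildcard matches everything that remains
        if i >= len(topic_levels):
            return False  # topic is shorter than the filter
        if level != "+" and level != topic_levels[i]:
            return False  # literal level does not match
    # No '#' seen: the topic must have exactly as many levels as the filter
    return len(filter_levels) == len(topic_levels)


def expand_expression(expression: str, topic: str) -> str:
    """Replace each ${topic[N]} with topic level N (assumed zero-based)."""
    levels = topic.split("/")
    return re.sub(
        r"\$\{topic\[(\d+)\]\}",
        lambda m: levels[int(m.group(1))],
        expression,
    )
```

Under this assumption, a message on sensors/pump-7/data matches the filter and yields the device name pump-7.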

Pattern 2: Aggregation at Edge

Multiple Sensors → Gateway → Cloud
                   (Aggregate)
                   (Filter)

Use Case: High-frequency sensors

Report Strategy:

{
  "reportStrategy": {
    "type": "onReportPeriod",
    "reportPeriod": 60000,
    "aggregation": "average"
  }
}
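
To make the effect of this strategy concrete, here is a minimal Python sketch of period-based averaging. It is an illustration under stated assumptions, not the gateway's implementation: the class name PeriodicAverager is hypothetical, timestamps are passed in explicitly, and a window is assumed to close on the first reading that arrives at or after the end of the report period.

```python
class PeriodicAverager:
    """Collects readings and emits one average per report period."""

    def __init__(self, report_period_ms: int):
        self._period = report_period_ms
        self._window_start = None  # timestamp of the first reading in the window
        self._values = []

    def add(self, ts_ms: int, value: float):
        """Add a reading; return the window average once the period has elapsed."""
        if self._window_start is None:
            self._window_start = ts_ms
        result = None
        if ts_ms - self._window_start >= self._period and self._values:
            average = sum(self._values) / len(self._values)
            result = {"ts": ts_ms, "values": {"average": average}}
            # Start the next window with the current reading
            self._values = []
            self._window_start = ts_ms
        self._values.append(value)
        return result
```

With a 60000 ms period, readings of 10, 20 and 30 in the first minute produce a single message with an average of 20 when the next reading arrives.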

Pattern 3: Store and Forward

Devices → Gateway → Cloud
          (Buffer)    (When connected)
          (Store)

Use Case: Intermittent connectivity

Configuration:

{
  "storage": {
    "type": "file",
    "maxSize": 1000000,
    "dataFolderPath": "/var/lib/gateway/data"
  },
  "reportStrategy": {
    "type": "onReportPeriod",
    "reportPeriod": 300000
  }
}

Pattern 4: Multi-site Collection

Site 1 Gateway ──┬───────────────┐
Site 2 Gateway ──┤  Central      ├───────────→ Cloud
Site 3 Gateway ──┤  Gateway      │           Platform
Site N Gateway ──┴───────────────┘

Use Case: Distributed operations
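
One thing a central gateway has to handle is that device names may repeat across sites. The Python sketch below shows one possible approach: qualify each device name with a site identifier before forwarding. The message shape follows the converter output used elsewhere on this page. The functions tag_with_site and merge_sites, and the "site/device" naming scheme, are assumptions made for illustration.

```python
def tag_with_site(site_id: str, message: dict) -> dict:
    """Return a copy of a site message with a site-qualified device name."""
    tagged = dict(message)
    tagged["deviceName"] = f"{site_id}/{message['deviceName']}"
    return tagged


def merge_sites(site_batches: dict) -> list:
    """Flatten {site_id: [messages]} into one list ordered by first timestamp."""
    merged = [
        tag_with_site(site_id, message)
        for site_id, batch in site_batches.items()
        for message in batch
    ]
    # Forward in time order so the platform sees a consistent sequence
    merged.sort(key=lambda m: m["telemetry"][0]["ts"])
    return merged
```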

Implementation Examples

Example 1: Manufacturing Floor

Scenario

Collect data from production line sensors.

Equipment

  • 50 temperature sensors (MQTT)
  • 20 pressure sensors (Modbus)
  • 10 vibration sensors (OPC-UA)
  • 5 PLCs (Modbus TCP)

Gateway Configuration

MQTT Sensors:

{
  "mqtt": {
    "broker": "localhost",
    "port": 1883,
    "reportStrategy": {
      "type": "onChange",
      "threshold": 0.5
    },
    "mapping": [
      {
        "topicFilter": "production/temp/+",
        "converter": {
          "deviceNameExpression": "Temp-Sensor-${topic[2]}",
          "telemetry": [
            {"key": "temperature", "value": "${temperature}"}
          ]
        }
      }
    ]
  }
}

Modbus Devices:

{
  "modbus": {
    "type": "tcp",
    "host": "192.168.1.100",
    "devices": [
      {
        "deviceName": "Pressure-Sensor-${unitId}",
        "unitId": 1,
        "pollPeriod": 5000,
        "telemetry": [
          {
            "tag": "pressure",
            "address": 100,
            "type": "16int",
            "multiplier": 0.01
          }
        ]
      }
    ]
  }
}
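
The type and multiplier fields describe how a raw register becomes an engineering value. As a rough Python illustration, assuming "16int" means a signed 16-bit integer and that the multiplier is applied after decoding (neither is confirmed by this page), the conversion could look like this. The function name decode_16int is hypothetical.

```python
import struct


def decode_16int(register: int, multiplier: float = 1.0) -> float:
    """Interpret an unsigned 16-bit register word as a signed integer and scale it."""
    # Repack the word so that values above 0x7FFF become negative
    (signed,) = struct.unpack(">h", struct.pack(">H", register))
    return signed * multiplier
```

With the multiplier of 0.01 from the configuration, a raw register value of 2534 scales to approximately 25.34.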

Data Flow

  1. Gateway polls/subscribes to devices
  2. Polls Modbus devices every 5 seconds (pollPeriod: 5000)
  3. Aggregates over 1 minute
  4. Sends average to platform
  5. Buffers during network issues

Example 2: Remote Oil & Gas

Scenario

Monitor remote wellhead with satellite connectivity.

Constraints

  • Limited bandwidth (satellite)
  • High latency
  • Expensive data transmission
  • Intermittent connectivity

Strategy

Local Buffering:

{
  "storage": {
    "type": "file",
    "maxSize": 10000000,
    "dataFolderPath": "/data/gateway"
  }
}

Aggressive Filtering:

{
  "reportStrategy": {
    "type": "onChange",
    "threshold": 2.0
  }
}
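
The effect of the threshold can be checked on a short series of readings. The Python sketch below assumes the threshold is an absolute difference from the last value that was sent, which matches the FilterConverter logic on this page but may differ from the gateway's behaviour. The function on_change and the sample readings are made up for illustration.

```python
def on_change(readings, threshold):
    """Return the readings that differ from the last sent value by at least threshold."""
    sent = []
    last = None
    for value in readings:
        if last is None or abs(value - last) >= threshold:
            sent.append(value)
            last = value  # compare later readings against what was actually sent
    return sent


# Hypothetical pressure readings
readings = [100.0, 100.4, 101.1, 101.3, 103.5, 103.6, 99.0]
```

For this series, a threshold of 0.5 sends 4 of the 7 readings and a threshold of 2.0 sends 3. The saving depends entirely on how noisy the signal is.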

Scheduled Transmission (hourly; reportPeriod is in milliseconds):

{
  "reportStrategy": {
    "type": "onReportPeriod",
    "reportPeriod": 3600000
  }
}

Results

  • 95% reduction in data volume
  • Operate offline for 24+ hours
  • Critical alerts sent immediately
  • Batch historical data hourly
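
Sending alerts immediately while batching everything else implies two paths through the gateway. A minimal Python sketch of that split follows. The Dispatcher class and its send callable are hypothetical, and the caller is assumed to invoke flush() once per report period.

```python
class Dispatcher:
    """Sends alerts at once and holds routine telemetry for the next batch."""

    def __init__(self, send):
        self._send = send  # callable that transmits a list of messages
        self._pending = []

    def submit(self, message: dict, is_alert: bool = False):
        if is_alert:
            self._send([message])  # bypass the batch
        else:
            self._pending.append(message)

    def flush(self):
        """Transmit all held messages as one batch, if there are any."""
        if self._pending:
            batch, self._pending = self._pending, []
            self._send(batch)
```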

Example 3: Smart Building

Scenario

Collect data from building automation systems.

Devices

  • HVAC units (BACnet)
  • Lighting (KNX)
  • Energy meters (Modbus)
  • Occupancy sensors (MQTT)

Multi-Protocol Configuration

BACnet:

{
  "bacnet": {
    "devices": [
      {
        "deviceId": 1001,
        "deviceName": "HVAC-Floor-${floor}",
        "objectsList": [
          {"objectType": "analogInput", "objectId": 0, "key": "temperature"},
          {"objectType": "analogInput", "objectId": 1, "key": "setpoint"}
        ]
      }
    ]
  }
}

KNX:

{
  "knx": {
    "gatewayIP": "192.168.1.50",
    "devices": [
      {
        "deviceName": "Lighting-Zone-${zone}",
        "groupAddresses": [
          {"address": "1/2/3", "key": "state", "dpt": "DPT-1"},
          {"address": "1/2/4", "key": "brightness", "dpt": "DPT-5"}
        ]
      }
    ]
  }
}
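
For readers new to KNX, the Python sketch below shows how a three-level group address such as 1/2/3 maps to its 16-bit form, and how a one-byte brightness value scales to a percentage. It assumes the "DPT-5" entry uses DPT 5.001 (percentage) scaling. The function names are hypothetical and unrelated to the gateway's connector code.

```python
def group_address_to_int(address: str) -> int:
    """Encode a 3-level KNX group address 'main/middle/sub' as a 16-bit integer."""
    main, middle, sub = (int(part) for part in address.split("/"))
    if not (0 <= main <= 31 and 0 <= middle <= 7 and 0 <= sub <= 255):
        raise ValueError(f"group address out of range: {address}")
    # 5 bits main group, 3 bits middle group, 8 bits sub group
    return (main << 11) | (middle << 8) | sub


def dpt5_to_percent(raw: int) -> float:
    """Scale a DPT 5.001 byte (0..255) to a percentage (0..100)."""
    if not 0 <= raw <= 255:
        raise ValueError(f"DPT 5 value out of range: {raw}")
    return raw * 100 / 255
```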

Data Processing at Edge

Filtering

Remove unnecessary data:

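class FilterConverter:
    def __init__(self, config):
        self._threshold = config.get("threshold", 0.5)  # assumed config key
        self._last_value = None  # nothing sent yet

    def convert(self, data, metadata):
        value = data["value"]
        # Only send if the value changed significantly since the last send
        if (self._last_value is not None
                and abs(value - self._last_value) < self._threshold):
            return None  # Don't send

        self._last_value = value
        # _format_data is expected to be defined by the concrete converter
        return self._format_data(data)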

Aggregation

Combine multiple readings:

import time

class AggregationConverter:
    def __init__(self, config):
        self._buffer = []
        self._buffer_size = config.get("bufferSize", 10)
        
    def convert(self, data, metadata):
        self._buffer.append(data["value"])
        
        if len(self._buffer) >= self._buffer_size:
            avg = sum(self._buffer) / len(self._buffer)
            self._buffer.clear()
            
            return {
                "deviceName": data["device"],
                "telemetry": [
                    {"ts": int(time.time() * 1000),
                     "values": {"average": avg}}
                ]
            }
        return None

Alerting

Local threshold detection:

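import time

class AlertingConverter:
    def __init__(self, config):
        # "threshold" is an assumed config key
        self._threshold = config["threshold"]

    def convert(self, data, metadata):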
        value = data["temperature"]
        
        if value > self._threshold:
            # Send alert immediately
            return {
                "deviceName": data["device"],
                "attributes": [
                    {"key": "alert", "value": "High temperature"}
                ],
                "telemetry": [
                    {"ts": int(time.time() * 1000),
                     "values": {"temperature": value}}
                ]
            }
        
        # Normal data follows report strategy
        return None

Offline Operation

Data Buffering

Configuration:

{
  "storage": {
    "type": "file",
    "maxFileCount": 100,
    "maxRecordsPerFile": 1000,
    "dataFolderPath": "/var/lib/gateway/buffer"
  }
}
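
These two limits bound how long the gateway can stay offline before data is lost. The Python sketch below works through the arithmetic. It assumes every poll of every device stores exactly one record, which may not match how the gateway packs records. The function names and the example of 20 devices polled every 5 seconds are illustrative.

```python
def buffer_capacity_records(max_file_count: int, max_records_per_file: int) -> int:
    """Total number of records the file buffer can hold."""
    return max_file_count * max_records_per_file


def offline_hours(capacity_records: int, devices: int, poll_period_s: float) -> float:
    """Hours of outage the buffer can absorb at one record per device per poll."""
    records_per_hour = devices * 3600 / poll_period_s
    return capacity_records / records_per_hour


# Values from the configuration above: 100 files of 1000 records each
capacity = buffer_capacity_records(100, 1000)
```

Under these assumptions the configuration holds 100000 records, which 20 devices polled every 5 seconds would fill in roughly 6.9 hours.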

Behaviour:

  1. Network disconnects
  2. Gateway buffers data locally
  3. Continues collecting from devices
  4. Network reconnects
  5. Gateway transmits buffered data
  6. Resumes normal operation
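
The behaviour above can be sketched as a queue that is drained in order whenever sending succeeds. This Python example keeps the queue in memory for brevity, whereas the gateway configuration on this page uses file storage. The StoreAndForward class and its send callable are hypothetical.

```python
from collections import deque


class StoreAndForward:
    """Queues messages and forwards them oldest first while the link is up."""

    def __init__(self, send):
        self._send = send  # callable returning True when a message was delivered
        self._queue = deque()

    def publish(self, message):
        """Queue the message, then try to drain the queue in order."""
        self._queue.append(message)
        self.drain()

    def drain(self) -> bool:
        """Send queued messages oldest first; stop at the first failure."""
        while self._queue:
            if not self._send(self._queue[0]):
                return False  # still offline, keep the message for a later retry
            self._queue.popleft()  # remove only after a confirmed send
        return True

    def pending(self) -> int:
        return len(self._queue)
```

Because a message is removed only after send returns True, a failed attempt leaves the backlog intact and in order.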

Storage Management

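class StorageManager:
    def __init__(self, config):
        self._max_size = config.get("maxSize", 1000000)
        self._policy = config.get("policy", "dropOldest")

    def store(self, data):
        # _get_size, _remove_oldest and _write_to_disk are left to the implementation
        if self._get_size() >= self._max_size:
            if self._policy == "dropOldest":
                # Remove old records until there is room again
                while self._get_size() >= self._max_size:
                    self._remove_oldest()
            elif self._policy == "dropNewest":
                return  # Discard the incoming record

        self._write_to_disk(data)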
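
The two policies are easier to compare with a runnable example. The Python sketch below is an in-memory stand-in that counts records rather than bytes. The BoundedBuffer class is hypothetical, and only the policy names are taken from the code above.

```python
from collections import deque


class BoundedBuffer:
    """Fixed-capacity record buffer with a configurable overflow policy."""

    def __init__(self, max_records: int, policy: str = "dropOldest"):
        if max_records < 1:
            raise ValueError("max_records must be at least 1")
        if policy not in ("dropOldest", "dropNewest"):
            raise ValueError(f"unknown policy: {policy}")
        self._max = max_records
        self._policy = policy
        self._records = deque()

    def store(self, record) -> bool:
        """Store a record; return False if the record was discarded."""
        if len(self._records) >= self._max:
            if self._policy == "dropNewest":
                return False  # keep history, lose the incoming record
            self._records.popleft()  # dropOldest: make room for the new record
        self._records.append(record)
        return True

    def records(self) -> list:
        return list(self._records)
```

After storing five records in a buffer of three, dropOldest retains the last three and dropNewest retains the first three. Which is preferable depends on whether recent state or complete history matters more after an outage.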

Performance Considerations

Resource Usage

  • CPU: Data processing, protocol conversion
  • Memory: Data buffering, caching
  • Disk: Offline storage
  • Network: Data transmission

Optimisation Techniques

Batch Processing:

def process_batch(self, data_batch):
    # Process multiple items at once, keeping each parsed result
    batch_result = [self._parse(data) for data in data_batch]

    # Send as single message
    self._gateway.send_to_storage(self.get_name(), batch_result)

Compression:

{
  "thingsboard": {
    "compression": "gzip"
  }
}
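
To give a feel for what gzip does to a telemetry batch, the Python sketch below compresses a JSON payload with the standard gzip module and restores it. How the gateway applies the compression setting internally is not described on this page, so this is only an illustration. The function names are hypothetical.

```python
import gzip
import json


def compress_batch(messages: list) -> bytes:
    """Serialise a batch to compact JSON and gzip the result."""
    raw = json.dumps(messages, separators=(",", ":")).encode("utf-8")
    return gzip.compress(raw)


def decompress_batch(payload: bytes) -> list:
    """Reverse compress_batch."""
    return json.loads(gzip.decompress(payload).decode("utf-8"))
```

Telemetry batches tend to compress well because keys and timestamp prefixes repeat in every record. Very small payloads can grow slightly because of the gzip header, which is one more reason to combine compression with batching.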

Best Practices

Design Principles

  • Collect only necessary data
  • Process at edge when possible
  • Buffer for reliability
  • Monitor gateway health
  • Plan for offline operation

Configuration

  • Set appropriate poll intervals
  • Configure adequate buffer size
  • Use change-based reporting
  • Enable data compression
  • Implement local alerting

Monitoring

  • Gateway resource usage
  • Buffer fill levels
  • Network connectivity
  • Data transmission rates
  • Device availability

Next Steps

  • Legacy System Integration
  • Protocol Conversion
  • Industrial Automation
  • Gateway Features