- Overview
- Edge Computing Benefits
- Architecture Patterns
- Implementation Examples
- Data Processing at Edge
- Offline Operation
- Performance Considerations
- Best Practices
- Next Steps
Overview
Edge data collection uses the IndustryOS IoT Gateway to gather, process, and transmit data from distributed locations. The gateway aggregates, filters, and buffers data locally before sending it to the cloud, which reduces bandwidth use and allows operation during network outages.
Edge Computing Benefits
Reduced Latency
- Local data processing
- Immediate response to events
- Real-time decision making
- Faster alarm triggers
Bandwidth Optimisation
- Data aggregation at edge
- Filter unnecessary data
- Compress before transmission
- Send only changes
Reliability
- Operate during network outages
- Local data buffering
- Automatic retry and resend
- Delivery of buffered data after reconnection, within buffer limits
Cost Reduction
- Lower cloud ingestion costs
- Reduced bandwidth usage
- Less cloud storage needed
- Optimised data transmission
Architecture Patterns
Pattern 1: Simple Edge Collection
Sensors → Gateway → Cloud
         (Collect) (Store/Analyse)
Use Case: Basic monitoring
Configuration:
{
  "mqtt": {
    "broker": "localhost",
    "port": 1883,
    "mapping": [
      {
        "topicFilter": "sensors/+/data",
        "converter": {
          "deviceNameExpression": "${topic[1]}",
          "deviceTypeExpression": "sensor"
        }
      }
    ]
  }
}
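The `${topic[1]}` expression appears to select one segment of the MQTT topic by position. The following is a minimal sketch of that idea, assuming zero-based indexing over `/`-separated segments (which is consistent with both mappings in this guide). The function names are hypothetical and are not part of the gateway API.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a filter using + and # wildcards."""
    filter_parts = topic_filter.split("/")
    topic_parts = topic.split("/")
    for i, part in enumerate(filter_parts):
        if part == "#":
            return True  # multi-level wildcard matches the remainder
        if i >= len(topic_parts):
            return False
        if part != "+" and part != topic_parts[i]:
            return False
    return len(filter_parts) == len(topic_parts)


def device_name_from_topic(topic: str, index: int) -> str:
    """Pick one '/'-separated topic segment (zero-based index is an assumption)."""
    return topic.split("/")[index]
```

With the filter `sensors/+/data`, a message on `sensors/pump-7/data` would match, and index 1 would give the device name `pump-7`.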
Pattern 2: Aggregation at Edge
Multiple Sensors → Gateway → Cloud
                  (Aggregate)
                  (Filter)
Use Case: High-frequency sensors
Report Strategy:
{
  "reportStrategy": {
    "type": "onReportPeriod",
    "reportPeriod": 60000,
    "aggregation": "average"
  }
}
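To illustrate what period-based averaging involves, here is a minimal sketch that accumulates readings per key and emits one average each time the report period elapses. It assumes the period is in milliseconds and that averaging is done per telemetry key; the class and method names are hypothetical, and the gateway's internal behaviour may differ.

```python
class PeriodAverager:
    """Average readings per key and emit once per report period."""

    def __init__(self, report_period_ms: int):
        self._period = report_period_ms
        self._window_start = None  # timestamp of the first reading in the window
        self._sums = {}
        self._counts = {}

    def add(self, ts_ms: int, key: str, value: float):
        """Add a reading; return {key: average} when a period has elapsed, else None."""
        if self._window_start is None:
            self._window_start = ts_ms
        result = None
        if ts_ms - self._window_start >= self._period:
            # Close the current window before accepting the new reading
            result = {k: self._sums[k] / self._counts[k] for k in self._sums}
            self._sums.clear()
            self._counts.clear()
            self._window_start = ts_ms
        self._sums[key] = self._sums.get(key, 0.0) + value
        self._counts[key] = self._counts.get(key, 0) + 1
        return result
```

Timestamps are passed in explicitly so the behaviour is easy to test; a reading that arrives after the period has elapsed closes the old window and becomes the first reading of the next one.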
Pattern 3: Store and Forward
Devices → Gateway → Cloud
          (Buffer)  (When connected)
          (Store)
Use Case: Intermittent connectivity
Configuration:
{
  "storage": {
    "type": "file",
    "maxSize": 1000000,
    "dataFolderPath": "/var/lib/gateway/data"
  },
  "reportStrategy": {
    "type": "onReportPeriod",
    "reportPeriod": 300000
  }
}
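The store-and-forward idea can be sketched independently of the gateway: records are appended to a file while the link is down and replayed in order once it returns. This is an illustrative sketch only; the `FileBuffer` class, its methods, and the JSON-lines file layout are assumptions, not the gateway's actual storage format.

```python
import json
import os


class FileBuffer:
    """Append records to a file while offline and replay them in order on reconnect."""

    def __init__(self, path: str):
        self._path = path

    def store(self, record: dict) -> None:
        # One JSON document per line keeps appends cheap and replay simple
        with open(self._path, "a", encoding="utf-8") as f:
            f.write(json.dumps(record) + "\n")

    def flush(self, send) -> int:
        """Send every buffered record via send(record); return how many were sent.

        The file is removed only after all sends succeed, so a failure midway
        leaves the data on disk (records may then be sent twice on retry).
        """
        if not os.path.exists(self._path):
            return 0
        with open(self._path, encoding="utf-8") as f:
            records = [json.loads(line) for line in f if line.strip()]
        for record in records:
            send(record)
        os.remove(self._path)
        return len(records)
```

Deleting the file only after a complete replay favours at-least-once delivery over avoiding duplicates, which is usually the safer trade-off for telemetry.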
Pattern 4: Multi-site Collection
Site 1 Gateway ──┬───────────┐
Site 2 Gateway ──┤  Central  ├───────────→ Cloud
Site 3 Gateway ──┤  Gateway  │             Platform
Site N Gateway ──┴───────────┘
Use Case: Distributed operations
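One practical concern with a central gateway is that different sites may reuse the same device names. The sketch below shows one possible way to merge per-site payloads while keeping names unique by prefixing the site identifier. The function name, the payload shape, and the `site/device` naming scheme are all assumptions made for illustration.

```python
def merge_site_payloads(site_payloads: dict) -> list:
    """Combine per-site payload lists, prefixing device names with the site id."""
    merged = []
    for site_id in sorted(site_payloads):  # sorted for a deterministic order
        for payload in site_payloads[site_id]:
            qualified = f"{site_id}/{payload['deviceName']}"
            merged.append({**payload, "deviceName": qualified})
    return merged
```

Two sites that both report a device called `pump` would then appear as `site-1/pump` and `site-2/pump` on the platform.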
Implementation Examples
Example 1: Manufacturing Floor
Scenario
Collect data from production line sensors.
Equipment
- 50 temperature sensors (MQTT)
- 20 pressure sensors (Modbus)
- 10 vibration sensors (OPC-UA)
- 5 PLCs (Modbus TCP)
Gateway Configuration
MQTT Sensors:
{
  "mqtt": {
    "broker": "localhost",
    "port": 1883,
    "reportStrategy": {
      "type": "onChange",
      "threshold": 0.5
    },
    "mapping": [
      {
        "topicFilter": "production/temp/+",
        "converter": {
          "deviceNameExpression": "Temp-Sensor-${topic[2]}",
          "telemetry": [
            {"key": "temperature", "value": "${temperature}"}
          ]
        }
      }
    ]
  }
}
Modbus Devices:
{
  "modbus": {
    "type": "tcp",
    "host": "192.168.1.100",
    "devices": [
      {
        "deviceName": "Pressure-Sensor-${unitId}",
        "unitId": 1,
        "pollPeriod": 5000,
        "telemetry": [
          {
            "tag": "pressure",
            "address": 100,
            "type": "16int",
            "multiplier": 0.01
          }
        ]
      }
    ]
  }
}
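The `type` and `multiplier` fields describe how a raw register becomes an engineering value. A minimal sketch of that conversion follows, assuming `16int` means a signed 16-bit integer and that the multiplier is applied after decoding; the function name is hypothetical.

```python
import struct


def decode_int16(register: int, multiplier: float = 1.0, signed: bool = True) -> float:
    """Convert one raw 16-bit Modbus register to an engineering value."""
    if not 0 <= register <= 0xFFFF:
        raise ValueError("register must fit in 16 bits")
    if signed:
        # Reinterpret the unsigned 16-bit word as a two's-complement integer
        (raw,) = struct.unpack(">h", struct.pack(">H", register))
    else:
        raw = register
    return raw * multiplier
```

With a multiplier of 0.01, a raw register value of 1013 would be reported as a pressure of about 10.13 in whatever unit the sensor uses.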
Data Flow
- Gateway polls/subscribes to devices
- Collects data every 5 seconds
- Aggregates over 1 minute
- Sends average to platform
- Buffers during network issues
Example 2: Remote Oil & Gas
Scenario
Monitor remote wellhead with satellite connectivity.
Constraints
- Limited bandwidth (satellite)
- High latency
- Expensive data transmission
- Intermittent connectivity
Strategy
Local Buffering:
{
  "storage": {
    "type": "file",
    "maxSize": 10000000,
    "dataFolderPath": "/data/gateway"
  }
}
Aggressive Filtering:
{
  "reportStrategy": {
    "type": "onChange",
    "threshold": 2.0
  }
}
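How much an `onChange` threshold saves depends on how noisy the signal is. The sketch below applies a deadband to a recorded series, assuming each reading is compared with the last value that was actually sent; the function name is hypothetical.

```python
def deadband_filter(values: list, threshold: float) -> list:
    """Keep the first value, then only values that moved at least `threshold`
    away from the last value that was kept."""
    kept = []
    for value in values:
        if not kept or abs(value - kept[-1]) >= threshold:
            kept.append(value)
    return kept
```

Running a sample of real sensor data through a function like this is a quick way to choose a threshold before deploying it to a bandwidth-constrained site.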
Scheduled Transmission (a report period of 3,600,000 ms is 1 hour):
{
  "reportStrategy": {
    "type": "onReportPeriod",
    "reportPeriod": 3600000
  }
}
Results
- 95% reduction in data volume
- Operate offline for 24+ hours
- Critical alerts sent immediately
- Batch historical data hourly
Example 3: Smart Building
Scenario
Collect data from building automation systems.
Devices
- HVAC units (BACnet)
- Lighting (KNX)
- Energy meters (Modbus)
- Occupancy sensors (MQTT)
Multi-Protocol Configuration
BACnet:
{
  "bacnet": {
    "devices": [
      {
        "deviceId": 1001,
        "deviceName": "HVAC-Floor-${floor}",
        "objectsList": [
          {"objectType": "analogInput", "objectId": 0, "key": "temperature"},
          {"objectType": "analogInput", "objectId": 1, "key": "setpoint"}
        ]
      }
    ]
  }
}
KNX:
{
  "knx": {
    "gatewayIP": "192.168.1.50",
    "devices": [
      {
        "deviceName": "Lighting-Zone-${zone}",
        "groupAddresses": [
          {"address": "1/2/3", "key": "state", "dpt": "DPT-1"},
          {"address": "1/2/4", "key": "brightness", "dpt": "DPT-5"}
        ]
      }
    ]
  }
}
Data Processing at Edge
Filtering
Remove unnecessary data:
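class FilterConverter:
    def __init__(self, config):
        # "threshold" is an assumed config key; the default is illustrative
        self._threshold = config.get("threshold", 0.5)
        self._last_value = None  # no reading sent yet

    def convert(self, data, metadata):
        value = data["value"]
        # Always send the first reading; afterwards send only significant changes
        if self._last_value is not None and abs(value - self._last_value) < self._threshold:
            return None  # Don't send
        self._last_value = value
        # _format_data (not shown) builds the payload sent to the platform
        return self._format_data(data)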
Aggregation
Combine multiple readings:
import time

class AggregationConverter:
    def __init__(self, config):
        self._buffer = []
        self._buffer_size = config.get("bufferSize", 10)

    def convert(self, data, metadata):
        self._buffer.append(data["value"])
        # Emit one averaged reading each time the buffer fills
        if len(self._buffer) >= self._buffer_size:
            avg = sum(self._buffer) / len(self._buffer)
            self._buffer.clear()
            return {
                "deviceName": data["device"],
                "telemetry": [
                    {"ts": int(time.time() * 1000),
                     "values": {"average": avg}}
                ]
            }
        return None
Alerting
Local threshold detection:
import time

class AlertingConverter:
    def __init__(self, config):
        # "threshold" is an assumed config key
        self._threshold = config["threshold"]

    def convert(self, data, metadata):
        value = data["temperature"]
        if value > self._threshold:
            # Send alert immediately
            return {
                "deviceName": data["device"],
                "attributes": [
                    {"key": "alert", "value": "High temperature"}
                ],
                "telemetry": [
                    {"ts": int(time.time() * 1000),
                     "values": {"temperature": value}}
                ]
            }
        # Normal data follows report strategy
        return None
Offline Operation
Data Buffering
Configuration:
{
  "storage": {
    "type": "file",
    "maxFileCount": 100,
    "maxRecordsPerFile": 1000,
    "dataFolderPath": "/var/lib/gateway/buffer"
  }
}
Behaviour:
- Network disconnects
- Gateway buffers data locally
- Continues collecting from devices
- Network reconnects
- Gateway transmits buffered data
- Resumes normal operation
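Buffer limits determine how long an outage the gateway can ride out. The following back-of-the-envelope sketch estimates that duration, assuming each device produces exactly one buffered record per poll and that total capacity is `maxFileCount` multiplied by `maxRecordsPerFile`. The function name and the device figures used in the note below are hypothetical.

```python
def offline_coverage_hours(max_file_count: int, max_records_per_file: int,
                           devices: int, poll_period_s: float) -> float:
    """Hours of data the buffer can hold before it is full."""
    capacity = max_file_count * max_records_per_file
    records_per_hour = devices * 3600 / poll_period_s
    return capacity / records_per_hour
```

Under these assumptions, 100 files of 1000 records hold 100,000 records; 20 devices polled every 5 seconds produce 14,400 records per hour, so the buffer would fill in a little under 7 hours. Change-based reporting or aggregation lowers the record rate and extends that window.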
Storage Management
class StorageManager:
    def __init__(self, config):
        self._max_size = config.get("maxSize", 1000000)
        self._policy = config.get("policy", "dropOldest")

    def store(self, data):
        # _get_size, _remove_oldest and _write_to_disk are storage helpers (not shown)
        if self._get_size() >= self._max_size:
            if self._policy == "dropOldest":
                self._remove_oldest()  # make room for the new record
            elif self._policy == "dropNewest":
                return  # Don't store
        self._write_to_disk(data)
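The two drop policies are easier to compare in a version that runs without disk helpers. This is an in-memory sketch that counts records rather than bytes; the `BoundedBuffer` class is hypothetical and only mirrors the policy names used above.

```python
from collections import deque


class BoundedBuffer:
    """In-memory buffer that applies a drop policy once max_records is reached."""

    def __init__(self, max_records: int, policy: str = "dropOldest"):
        if max_records < 1:
            raise ValueError("max_records must be at least 1")
        if policy not in ("dropOldest", "dropNewest"):
            raise ValueError(f"unknown policy: {policy}")
        self._records = deque()
        self._max_records = max_records
        self._policy = policy

    def store(self, record) -> bool:
        """Store a record; return False if it was dropped."""
        if len(self._records) >= self._max_records:
            if self._policy == "dropNewest":
                return False
            self._records.popleft()  # dropOldest: evict the oldest record
        self._records.append(record)
        return True

    def snapshot(self) -> list:
        """Return the buffered records, oldest first."""
        return list(self._records)
```

`dropOldest` keeps the most recent picture of the process, which suits monitoring; `dropNewest` preserves the start of an outage, which can matter more when diagnosing what caused it.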
Performance Considerations
Resource Usage
- CPU: Data processing, protocol conversion
- Memory: Data buffering, caching
- Disk: Offline storage
- Network: Data transmission
Optimisation Techniques
Batch Processing:
def process_batch(self, data_batch):
    # Process multiple items at once (assumes _parse returns the converted item)
    batch_result = [self._parse(data) for data in data_batch]
    # Send as single message
    self._gateway.send_to_storage(self.get_name(), batch_result)
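Batches usually need an upper bound so that a long backlog does not turn into one oversized message. A small sketch of splitting a list of readings into fixed-size batches follows; the helper name and the idea of a configurable batch size are assumptions, not gateway settings.

```python
def chunked(items: list, batch_size: int):
    """Yield consecutive slices of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]
```

Each yielded slice could then be handed to a routine such as `process_batch` above.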
Compression:
{
  "thingsboard": {
    "compression": "gzip"
  }
}
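To get a feel for what gzip can save on telemetry, the sketch below compresses a batch of JSON records with Python's standard `gzip` module and restores it again. It only illustrates the technique; how the gateway applies compression on the wire is not shown here, and the function names are hypothetical.

```python
import gzip
import json


def compress_batch(records: list) -> bytes:
    """Serialise records as compact JSON and gzip the result."""
    raw = json.dumps(records, separators=(",", ":")).encode("utf-8")
    return gzip.compress(raw)


def decompress_batch(blob: bytes) -> list:
    """Reverse compress_batch."""
    return json.loads(gzip.decompress(blob).decode("utf-8"))
```

Telemetry batches tend to compress well because keys and similar values repeat in every record, whereas single small messages gain little, so compression pairs naturally with batching.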
Best Practices
Design Principles
- Collect only necessary data
- Process at edge when possible
- Buffer for reliability
- Monitor gateway health
- Plan for offline operation
Configuration
- Set appropriate poll intervals
- Configure adequate buffer size
- Use change-based reporting
- Enable data compression
- Implement local alerting
Monitoring
- Gateway resource usage
- Buffer fill levels
- Network connectivity
- Data transmission rates
- Device availability
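Buffer fill level is one of the easier items on this list to measure. A minimal sketch, assuming the buffer is a flat folder of files and that a byte budget is known; the function name and the `max_bytes` parameter are hypothetical.

```python
import os


def buffer_fill_ratio(folder: str, max_bytes: int) -> float:
    """Fraction of the byte budget used by regular files directly inside folder."""
    used = sum(entry.stat().st_size for entry in os.scandir(folder) if entry.is_file())
    return used / max_bytes
```

A ratio that keeps climbing while the network is reported as connected is a useful early sign that transmission is not keeping up with collection.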
Next Steps
- Legacy System Integration
- Protocol Conversion
- Industrial Automation
- Gateway Features