
Telemetry Synchronisation

Overview

Telemetry synchronisation between IndustryOS Edge and Cloud Platform enables flexible data management. You control what data stays local, what syncs to the cloud, and when synchronisation occurs.

Telemetry Flow

Basic Flow

Device
  │
  │ MQTT/HTTP/CoAP
  ▼
Edge Transport Layer
  │
  ▼
Edge Rule Engine
  ├─→ Save to Local PostgreSQL (always)
  ├─→ Update Dashboard (websocket)
  ├─→ Check Alarm Conditions
  └─→ Push to Cloud (conditional)
        │
        │ gRPC (port 7070)
        ▼
     Cloud Platform

Edge Root Rule Chain

The default Edge Root Rule Chain controls telemetry flow:

Message Type Switch
  │
  ├──[Post telemetry]─→ Save Timeseries
  │                         │
  │                         ▼
  │                    Push to Cloud
  │
  ├──[Post attributes]─→ Save Server Attributes
  │                         │
  │                         ▼
  │                    Push to Cloud
  │
  └──[RPC Request]────→ Handle RPC locally

Synchronisation Modes

Mode 1: Full Sync

All telemetry syncs to cloud:

// No filtering - all messages pushed
Save Timeseries → Push to Cloud

Use Cases:

  • Cloud-based analytics on full dataset
  • Compliance requirements (all data in cloud)
  • Edge used primarily for local dashboards

Bandwidth Impact:

  • High (all data transmitted)
  • Example: 100 devices × 1 msg/sec × 100 bytes = 10 KB/sec = 864 MB/day
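As a quick sanity check, the arithmetic above can be wrapped in a small helper (the function name is illustrative, not an edge API):

```python
def daily_bandwidth_bytes(devices, msgs_per_sec, bytes_per_msg):
    """Raw daily telemetry volume before filtering or compression."""
    return devices * msgs_per_sec * bytes_per_msg * 86_400  # seconds/day

# 100 devices × 1 msg/sec × 100 bytes = 864,000,000 bytes ≈ 864 MB/day
print(daily_bandwidth_bytes(100, 1, 100))  # 864000000
```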

Mode 2: Filtered Sync

Only specific conditions sync:

// Script filter node
var temp = msg.temperature;
var threshold = metadata.ss_tempThreshold || 30;

if (temp > threshold) {
    return {msg: msg, metadata: metadata, msgType: "Push to Cloud"};
}
return {msg: msg, metadata: metadata, msgType: "Local Only"};

Use Cases:

  • Exception monitoring (only anomalies)
  • Bandwidth-constrained environments
  • Privacy-sensitive data (most stays local)

Bandwidth Impact:

  • Low (1-5% of full sync typical)
  • Example: 100 devices, 5% anomaly rate = 43.2 MB/day vs. 864 MB/day

Mode 3: Aggregated Sync

Periodic summaries sync:

Telemetry → Aggregate Node (1 hour window)
            │
            ├─→ Calculate: AVG, MIN, MAX, COUNT
            │
            ▼
         Push to Cloud (1 msg/hour instead of 3600)

Configuration:

{
  "interval": 3600,
  "intervalTimeUnit": "SECONDS",
  "aggregateKeys": ["temperature", "humidity"],
  "aggregateFunctions": ["AVG", "MIN", "MAX", "COUNT"]
}

Use Cases:

  • Trend analysis (hourly/daily averages)
  • Long-term historical data
  • Bandwidth optimisation

Bandwidth Impact:

  • Minimal (99.97% reduction)
  • Example: 100 devices, hourly avg = 240 KB/day vs. 864 MB/day
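A minimal sketch of what an aggregate node computes per window, assuming the functions configured above (AVG, MIN, MAX, COUNT):

```python
from statistics import mean

def aggregate_window(samples):
    """Collapse one window of raw samples into the summary stats
    the aggregate node forwards instead of every raw message."""
    return {"AVG": mean(samples), "MIN": min(samples),
            "MAX": max(samples), "COUNT": len(samples)}

# One hour of readings reduces to a single summary message
print(aggregate_window([20.0, 25.0, 30.0]))
# {'AVG': 25.0, 'MIN': 20.0, 'MAX': 30.0, 'COUNT': 3}
```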

Mode 4: On-Demand Sync

Manually triggered sync:

// Only push on explicit command
if (metadata.pushCommand === "true") {
    metadata.pushCommand = "false"; // Reset
    return {msg: msg, metadata: metadata, msgType: "Push to Cloud"};
}
return {msg: msg, metadata: metadata, msgType: "Local Only"};

Trigger Methods:

  • Dashboard button
  • Scheduled task
  • External API call
  • Alarm condition

Use Cases:

  • Offline-first deployments
  • Periodic batch uploads (e.g., nightly)
  • Data sovereignty requirements

Sync Configuration

Rule Chain Configuration

1. Navigate to Rule Chains:

  • Edge UI → Rule Chains → Edge Root Rule Chain

2. Add Filter Node:

  • Drag Script node
  • Add after “Save Timeseries”
  • Configure filter logic

3. Connect to Push Node:

  • Script output → Push to Cloud node

Example Filter Scripts:

Threshold Filter:

var temp = msg.temperature;
return temp > 30 || temp < 10; // Only extremes

Delta Filter (change detection):

var current = msg.temperature;
var last = metadata.lastTemp || current;
var delta = Math.abs(current - last);

if (delta > 0.5) {
    metadata.lastTemp = current;
    return true; // Push to cloud
}
return false; // Local only

Time-Based Filter:

// Only sync during business hours
var hour = new Date().getHours();
return hour >= 8 && hour < 18;

Queue Configuration

File: /etc/industryos-edge/conf/industryos-edge.conf

# Queue settings
export EDGE_STORAGE_MAX_READ_RECORDS_COUNT="1000"  # Batch size
export CLOUD_RPC_TIMEOUT="60000"                   # Timeout (ms)
export CLOUD_RPC_KEEP_ALIVE_TIME="10"              # Keepalive (sec)

Queue Behaviour:

  • Messages stored in PostgreSQL
  • Survives edge restart
  • Automatic drain when cloud available
  • Oldest messages sent first (FIFO)

Bandwidth Management

Compression:

# Enable gRPC compression
export CLOUD_RPC_COMPRESSION="true"

Typical Compression Ratios:

  • JSON telemetry: 60-80% reduction
  • Example: 864 MB/day → 173 MB/day
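As an illustration of why repetitive JSON telemetry compresses so well (gzip is used here for demonstration; actual gRPC compression ratios vary by payload):

```python
import gzip
import json

# Typical telemetry: many near-identical records differing only in timestamp
payload = json.dumps(
    [{"ts": 1710504000000 + i, "temperature": 25.5} for i in range(100)]
).encode()
compressed = gzip.compress(payload)

# Repetitive structure compresses heavily
print(f"{1 - len(compressed) / len(payload):.0%} reduction")
```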

Offline Behaviour

During Offline Period

Edge Continues:

  1. Accept device telemetry
  2. Save to local PostgreSQL
  3. Process rule chains
  4. Update dashboards
  5. Create alarms
  6. Queue messages for cloud

Queue Growth:

Time Offline: 24 hours
Devices: 100
Message Rate: 1/sec/device
Queue Growth: 100 × 1 × 86,400 = 8,640,000 messages
Storage: ~860 MB (100 bytes/message)

Queue Limits:

# Maximum queue size
max_queue_size: 100000  # messages

# Behaviour when full:
# - Drop oldest messages (default)
# - Stop accepting new telemetry (optional)
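The default drop-oldest policy can be sketched with a bounded FIFO (a toy model, not the edge's actual queue implementation):

```python
from collections import deque

# A deque with maxlen silently drops the oldest entry when full,
# mirroring the default "drop oldest messages" policy above.
queue = deque(maxlen=3)
for msg in ["m1", "m2", "m3", "m4"]:
    queue.append(msg)

print(list(queue))  # ['m2', 'm3', 'm4'] — m1 was dropped
```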

Reconnection Process

Step-by-Step:

  1. Detect Cloud Availability:
    • Periodic connection attempts (30 sec interval)
    • Exponential backoff on failures
  2. Re-establish gRPC:
    • Authenticate with edge key/secret
    • Verify cloud accepts connection
  3. Sync Metadata:
    • Entity updates (new devices/assets)
    • Attribute changes
    • Alarm states
  4. Drain Queue:
    • Batch size: 1000 messages
    • Interval: 100ms between batches
    • Priority: Alarms > Entities > Telemetry
  5. Resume Normal Operation:
    • Real-time telemetry sync
    • Bidirectional communication

Queue Drain Rate:

Batch Size: 1000 messages
Interval: 100ms
Drain Rate: 10,000 messages/sec
Time to drain 100k messages: ~10 seconds
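The drain-time figures above follow directly from batch size and interval; a small helper (illustrative names) makes the relationship explicit:

```python
import math

def drain_seconds(queued, batch_size=1000, interval_ms=100):
    """Time to flush a backlog when one batch is sent per interval."""
    return math.ceil(queued / batch_size) * interval_ms / 1000

# 100 batches × 100 ms = 10 seconds for a 100k backlog
print(drain_seconds(100_000))  # 10.0
```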

Telemetry Storage

Local Storage (Edge)

PostgreSQL Tables:

ts_kv (time-series):

CREATE TABLE ts_kv (
  entity_id UUID,
  key VARCHAR,
  ts BIGINT,  -- Timestamp (milliseconds)
  bool_v BOOLEAN,
  str_v VARCHAR,
  long_v BIGINT,
  dbl_v DOUBLE PRECISION,
  json_v JSONB
) PARTITION BY RANGE (ts);

-- Monthly partitions
CREATE TABLE ts_kv_2024_03 PARTITION OF ts_kv
  FOR VALUES FROM (1709251200000) TO (1711929600000);

Retention Policy:

-- Drop partitions older than 90 days
DROP TABLE ts_kv_2023_12;

Indexes:

CREATE INDEX idx_ts_kv_entity_ts ON ts_kv(entity_id, ts DESC);
CREATE INDEX idx_ts_kv_key ON ts_kv(key);

Cloud Storage

Cloud Receives:

  • Filtered telemetry (based on edge rules)
  • Entity metadata (device names, types)
  • Timestamps (from edge, preserving original time)

Cloud Benefits:

  • Unlimited retention (no partition cleanup)
  • Cross-edge analytics
  • Historical reporting
  • Compliance archives

Data Consistency

Timestamp Handling

Device Timestamp:

// Device sends with timestamp
{
  "ts": 1710504000000,
  "values": {
    "temperature": 25.5
  }
}

Server Timestamp:

// Device sends without timestamp (edge adds)
{
  "temperature": 25.5
}
// Edge adds: ts = current_time()

Best Practice:

  • Use device timestamps for time-sensitive data
  • Use server timestamps for simplicity
  • Ensure device time sync (NTP)
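The two timestamp modes can be sketched together; `with_timestamp` is a hypothetical helper, not an edge API:

```python
import time

def with_timestamp(payload, now_ms=None):
    """Use the device timestamp if present; otherwise stamp the
    message with server time (the "edge adds ts" case above)."""
    if "ts" in payload:
        return payload  # device timestamp preserved as-is
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return {"ts": now_ms, "values": payload}

# Device omitted "ts" → edge wraps it with server time
print(with_timestamp({"temperature": 25.5}, now_ms=1710504000000))
```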

Conflict Resolution

Scenario: Same telemetry arrives via edge and directly to cloud

Resolution:

  • Deduplication: Cloud checks timestamp + device ID
  • Last Write Wins: Most recent timestamp kept
  • Merge: Combine non-overlapping keys

Example:

Edge sends:  {ts: 1000, temp: 25, humidity: 60}
Cloud has:   {ts: 1000, temp: 25}
Result:      {ts: 1000, temp: 25, humidity: 60}  // Merged
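The merge rule can be sketched as follows (`merge_samples` is illustrative; overlapping keys resolve last-write-wins, non-overlapping keys are combined):

```python
def merge_samples(cloud, edge):
    """Combine two records sharing a timestamp: union of keys,
    with the later (edge) value winning on any overlap."""
    if cloud["ts"] != edge["ts"]:
        raise ValueError("merge only applies to records with the same timestamp")
    return {**cloud, **edge}

print(merge_samples({"ts": 1000, "temp": 25},
                    {"ts": 1000, "temp": 25, "humidity": 60}))
# {'ts': 1000, 'temp': 25, 'humidity': 60}
```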

Performance Optimisation

Batch Processing

Configuration:

# Batch settings
ts_kv_batch_size: 1000
ts_kv_batch_max_delay: 100  # milliseconds

Benefits:

  • Reduce database I/O
  • Improve throughput (10x+)
  • Lower CPU usage

Tradeoff:

  • Slight latency increase (< 100ms)
  • Acceptable for most use cases
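Batching itself can be sketched generically (an illustrative helper, not the edge's internal code):

```python
def batches(messages, batch_size=1000):
    """Yield fixed-size groups so each database write covers many rows
    instead of one row per I/O operation."""
    for i in range(0, len(messages), batch_size):
        yield messages[i:i + batch_size]

# 2500 messages → two full batches plus one partial
print([len(b) for b in batches(list(range(2500)))])  # [1000, 1000, 500]
```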

Partition Management

Automatic Partition Creation:

-- Edge creates partitions automatically
-- Future partitions: +2 months
-- Example: Current = March, creates April, May

Partition Cleanup:

# Cron job (monthly)
0 0 1 * * psql -c "DROP TABLE ts_kv_$(date -d '4 months ago' +\%Y_\%m);"

Query Optimisation

Efficient Queries:

-- Good: Uses index
SELECT * FROM ts_kv
WHERE entity_id = '<UUID>'
  AND ts >= 1710504000000
  AND ts < 1710590400000
ORDER BY ts DESC;

-- Bad: Full table scan
SELECT * FROM ts_kv
WHERE key = 'temperature'
  AND dbl_v > 30;

Monitoring Sync Status

Edge Status Page

Navigate: Edge UI → System → Edge Status

Metrics:

  • Cloud connection: CONNECTED / DISCONNECTED
  • Last sync time: 2024-03-15 10:30:00
  • Queue size: 42 messages
  • Bytes sent: 1.2 MB
  • Bytes received: 600 KB
  • Sync errors: 0

Cloud Events Page

Navigate: Edge UI → System → Cloud Events

Event Types:

  • ENTITY_ASSIGNED: Dashboard/rule chain assigned from cloud
  • ENTITY_DELETED: Entity deleted on cloud
  • ATTRIBUTE_UPDATED: Attribute changed on cloud
  • RELATION_UPDATED: Relation added/removed

Health Check API

curl http://localhost:8080/api/edge/health

Response:

{
  "status": "UP",
  "cloudConnection": "CONNECTED",
  "queueSize": 42,
  "lastSyncTime": "2024-03-15T10:30:00Z",
  "syncErrors": 0,
  "totalSyncedMessages": 1500000,
  "totalSyncedBytes": 157286400
}
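A monitoring script can parse the health payload and flag problems; field names follow the sample response above, and the queue threshold is illustrative:

```python
import json

# Parsed from the /api/edge/health response (fields as in the sample above)
health = json.loads(
    '{"status": "UP", "cloudConnection": "CONNECTED",'
    ' "queueSize": 42, "syncErrors": 0}'
)

def needs_attention(h, queue_alert=50_000):
    """Flag a disconnected cloud link, a large backlog, or sync errors."""
    return (h["cloudConnection"] != "CONNECTED"
            or h["queueSize"] > queue_alert
            or h["syncErrors"] > 0)

print(needs_attention(health))  # False
```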

Troubleshooting

Issue: Telemetry Not Syncing

Check 1: Cloud Connection

curl http://localhost:8080/api/edge/health | grep cloudConnection
# Expected: "cloudConnection": "CONNECTED"

Check 2: Rule Chain

  • Verify “Push to Cloud” node exists
  • Check node connections
  • Review filter scripts

Check 3: Queue

curl http://localhost:8080/api/edge/health | grep queueSize
# If growing: Cloud connection issue
# If zero: Rule chain issue

Issue: High Queue Size

Causes:

  • Slow/intermittent cloud connection
  • Excessive telemetry rate
  • Insufficient bandwidth

Solutions:

  1. Improve Connectivity:
    • Check network stability
    • Increase bandwidth
    • Configure proxy (if needed)
  2. Reduce Sync Rate:
    • Add filtering logic
    • Increase aggregation window
    • Reduce device message rate
  3. Increase Batch Size:
    
    export EDGE_STORAGE_MAX_READ_RECORDS_COUNT="5000"
    

Issue: Duplicate Telemetry on Cloud

Cause:

  • Edge and device both pushing to cloud

Solution:

  • Remove “Push to Cloud” from edge rule chain
  • OR disable direct cloud connection on devices

Best Practices

1. Filter Early

Apply filters before “Push to Cloud” node:

Save Timeseries → Filter Script → Push to Cloud
                       │
                       └───→ [Filtered Out] → End

2. Keep Raw Data Local, Send Aggregates

Raw data locally, aggregates to cloud:

Local: 1 msg/sec (full resolution)
Cloud: 1 msg/hour (averages)
Reduction: 99.97%

3. Monitor Queue Size

Set alerts:

// Alert rule
if (metadata.queueSize > 50000) {
    sendNotification("High edge queue size");
}

4. Configure Retention

Match to use case:

  • Real-time dashboards: 7 days
  • Historical analysis: 90 days
  • Compliance: Use cloud (unlimited)

5. Test Offline Scenarios

Periodically test:

  1. Disconnect cloud
  2. Generate telemetry
  3. Verify local storage
  4. Reconnect cloud
  5. Verify queue drain
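The checklist above can be exercised against a toy model of the edge's offline behaviour (a simulation only, not the real edge):

```python
from collections import deque

class EdgeSim:
    """Toy model: every message is stored locally; while disconnected
    it is also queued, and the queue drains FIFO on reconnect."""
    def __init__(self):
        self.local, self.queue, self.cloud = [], deque(), []
        self.connected = True

    def ingest(self, msg):
        self.local.append(msg)
        (self.cloud if self.connected else self.queue).append(msg)

    def reconnect(self):
        self.connected = True
        while self.queue:
            self.cloud.append(self.queue.popleft())

edge = EdgeSim()
edge.connected = False          # 1. Disconnect cloud
for i in range(5):
    edge.ingest(i)              # 2. Generate telemetry
assert len(edge.local) == 5     # 3. Verify local storage
edge.reconnect()                # 4. Reconnect cloud
print(edge.cloud)               # 5. Verify queue drain: [0, 1, 2, 3, 4]
```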

Next Steps