# --- Compute Logic --- # If temp > 30C, find the parent Zone and mark it as "overheated" if new_temp > 30.0: # Query to find parent Zone query = f"SELECT zone FROM digitaltwins zone JOIN sensor RELATED zone.hasSensor WHERE sensor.$dtId = 'sensor_id'" zones = service_client.query_twins(query) for zone in zones: # Add a computed property zone_patch = ["op": "add", "path": "/alert", "value": "Overheating"] service_client.update_digital_twin(zone['$dtId'], zone_patch) logging.info(f"Alert set on zone zone['$dtId']")
# Update the twin property patch = ["op": "replace", "path": "/temperature", "value": new_temp] service_client.update_digital_twin(sensor_id, patch) hands-on azure digital twins read online
"@id": "dtmi:handsOn:Sensor;1", "@type": "Interface", "displayName": "Sensor", "contents": [ "@type": "Telemetry", "name": "value", "schema": "double" , "@type": "Property", "name": "sensorType", "schema": "@type": "Enum", "valueSchema": "string", "enumValues": [ "name": "Temperature", "enumValue": "temp" , "name": "Humidity", "enumValue": "humi" , "name": "Motion", "enumValue": "motion" ] ] # --- Compute Logic --- # If temp
"@id": "dtmi:handsOn:Zone;1", "@type": "Interface", "displayName": "Zone", "contents": [ "@type": "Property", "name": "temperature", "schema": "double", "writable": true , "@type": "Relationship", "name": "contains", "target": "dtmi:handsOn:Shelf;1" , "@type": "Relationship", "name": "hasSensor", "target": "dtmi:handsOn:Sensor;1" ] patch) "@id": "dtmi:handsOn:Sensor
We’ll simulate a temperature spike and compute a "high temperature" alert using an Azure Function. Architecture Pattern: IoT Device → IoT Hub → Azure Function → ADT (patch property) The Function Logic (Python example): import json import logging import azure.functions as func from azure.digitaltwins.core import DigitalTwinsClient from azure.identity import DefaultAzureCredential def main(event: func.EventHubEvent): # Parse telemetry from IoT device telemetry = json.loads(event.get_body().decode('utf-8')) sensor_id = telemetry['sensorId'] new_temp = telemetry['temperature']