Time travel, commits, and data lineage
Workflow position: after
03_STA_Observations.ipynb.
This notebook updates an existingObservationand then queries its current and historical states.
The istSOS4 time-travel extension supports reproducibility and data lineage. It makes it possible to ask:
- what does this resource look like now?
- what did this resource look like at a previous time?
- which commit explains the change?
Time-travel parameters
| Parameter | Type | Purpose |
|---|---|---|
$as_of |
ISO 8601 date-time | Return the resource as it was at a specific instant |
$from_to |
ISO 8601 interval | Return historical states over a time interval |
Commit entity
A Commit records metadata about a change.
| Property | Type | Meaning |
|---|---|---|
author |
string | User or authority responsible for the change |
encodingType |
string | Encoding type of the commit message |
message |
string | Explanation of the transaction |
date |
ISO 8601 date-time | Time when the commit was created |
Persistent reference pattern
A reproducible request can be identified by combining:
<service-url>/<version>/<request-path>?<query>&$as_of=<date-time>
This notebook demonstrates that pattern on an Observation.
In a shared workshop instance, run this notebook immediately after notebook
03or manually setobservation_idto an observation you created.
Setup
Import the required libraries, define the API endpoint, and define a small helper for UTC timestamps.
When the notebook runs inside the Jupyter Docker container, the API is reachable through:
IST_SOS_ENDPOINT = "http://api:5000/v4/v1.1"
from datetime import datetime, timedelta, timezone
import requests
from istsos_utils import (
REQUEST_TIMEOUT,
auth_headers,
display_error_response,
display_json,
login,
)
IST_SOS_ENDPOINT = "http://api:5000/v4/v1.1"
def utc_now_iso(offset_seconds=0):
"""Return the current UTC time as an ISO 8601 string accepted by the API."""
value = datetime.now(timezone.utc) + timedelta(seconds=offset_seconds)
value = value.replace(microsecond=0)
return value.isoformat().replace("+00:00", "Z")
Login as editor
Use the same editor account used in notebooks 02 and 03.
The editor is needed because this notebook modifies an existing observation before querying its history.
editor_username = input("Enter your username: ")
editor_password = input("Enter your password: ")
if not editor_username or not editor_password:
print("Username or password is empty")
else:
editor_token, login_body = login(
IST_SOS_ENDPOINT,
editor_username,
editor_password,
timeout=REQUEST_TIMEOUT,
)
if editor_token:
prefix = editor_username + "-"
print(f"Logged in as {editor_username}")
print("Your station name is prefixed with: " + prefix)
Retrieve the latest Observation
To avoid hard-coding an observation ID, the notebook retrieves the most recent observation from the server.
The returned @iot.id is stored in observation_id and reused in the following requests.
In a multi-user workshop, this may return another participant's latest observation. Replace
observation_idmanually if needed.
response = requests.get(
f"{IST_SOS_ENDPOINT}/Observations?$orderby=phenomenonTime desc&$top=1",
headers=auth_headers(editor_token, accept="application/json"),
timeout=REQUEST_TIMEOUT,
)
if response.status_code == 200:
observations = response.json().get("value", [])
if not observations:
raise Exception(
"No Observations found. Run notebook 03_STA_Observation first."
)
latest_observation = observations[0]
observation_id = latest_observation["@iot.id"]
display_json(latest_observation)
else:
display_error_response(response)
Update the Observation
Before applying the update, the notebook stores a timestamp just before the change.
That timestamp is later used with $as_of to retrieve the previous state.
The update changes:
result;resultQuality.
The Commit-message header explains why the resource changed.
datetime_before_update = utc_now_iso(offset_seconds=-1)
body = {
"result": 10,
"resultQuality": "100",
}
response = requests.patch(
f"{IST_SOS_ENDPOINT}/Observations({observation_id})",
json=body,
headers=auth_headers(
editor_token,
commit_message="Corrected result and result quality",
content_type="application/json",
),
timeout=REQUEST_TIMEOUT,
)
if response.status_code in (200, 204):
print("Observation updated successfully")
print(f"Timestamp before update: {datetime_before_update}")
else:
display_error_response(response)
Retrieve the current state
This request retrieves the current version of the updated observation.
The $expand=Commit option includes the commit metadata associated with the current state.
response = requests.get(
f"{IST_SOS_ENDPOINT}/Observations({observation_id})",
headers=auth_headers(editor_token, accept="application/json"),
params={
"$expand": "Commit",
},
timeout=REQUEST_TIMEOUT,
)
if response.status_code == 200:
display_json(response.json())
else:
display_error_response(response)
Retrieve the previous state with $as_of
The $as_of query parameter evaluates the request as if it had been executed at a specific time.
Here, the timestamp captured before the update should return the observation state before the correction.
response = requests.get(
f"{IST_SOS_ENDPOINT}/Observations({observation_id})",
headers=auth_headers(editor_token, accept="application/json"),
params={
"$expand": "Commit",
"$as_of": datetime_before_update,
},
timeout=REQUEST_TIMEOUT,
)
if response.status_code == 200:
display_json(response.json())
else:
display_error_response(response)
Retrieve historical states with $from_to
The $from_to query parameter retrieves all historical states over a time interval.
The interval starts just before the update and ends just after it, so the response should include the state transition caused by the correction.
datetime_after_update = utc_now_iso(offset_seconds=1)
time_interval = f"{datetime_before_update}/{datetime_after_update}"
print(f"Requested interval: {time_interval}")
response = requests.get(
f"{IST_SOS_ENDPOINT}/Observations({observation_id})",
headers=auth_headers(editor_token, accept="application/json"),
params={
"$expand": "Commit",
"$from_to": time_interval,
},
timeout=REQUEST_TIMEOUT,
)
if response.status_code == 200:
display_json(response.json())
else:
display_error_response(response)