Load bulk observations from CSV
Workflow position: after 01_Authorization.ipynb; it reuses the same editor concept and the common DDT_workshop network.
This notebook creates a Zurich meteorological platform and uploads observations in bulk.
Compared with notebook 02, this notebook uses a more compact creation pattern:
- one Thing is created;
- the Location is nested inside the Thing;
- three Datastreams are nested inside the same request;
- observations are loaded from data/observations.csv and sent through /BulkObservations.
CSV contract
The CSV file is expected at:
data/observations.csv
It must contain:
| Column | Meaning |
|---|---|
| phenomenonTime | Timestamp of the observation |
| temperature | Air temperature values |
| relative_humidity | Relative humidity values |
| precipitation | Precipitation values |
The column property inside each datastream definition is used to map CSV columns to the correct datastream IDs.
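As a quick illustration of the contract above, the following sketch checks a CSV header for the required columns using only the standard library. The function name `missing_columns` and the sample rows are hypothetical and are not part of istsos_utils; the notebook's own helper may validate the file differently.

```python
import csv
import io

# Hypothetical sample rows; the real file is expected at data/observations.csv.
SAMPLE_CSV = """phenomenonTime,temperature,relative_humidity,precipitation
2024-01-01T00:00:00+01:00,1.2,85,0.0
2024-01-01T00:10:00+01:00,1.1,86,0.2
"""

REQUIRED_COLUMNS = [
    "phenomenonTime",
    "temperature",
    "relative_humidity",
    "precipitation",
]


def missing_columns(csv_file, required):
    """Return the required column names that are absent from the CSV header."""
    reader = csv.DictReader(csv_file)
    header = reader.fieldnames or []  # fieldnames is None for an empty file
    return [name for name in required if name not in header]


missing = missing_columns(io.StringIO(SAMPLE_CSV), REQUIRED_COLUMNS)
if missing:
    print(f"Missing columns: {missing}")
else:
    print("All required columns are present")
```

To check the real file, the same function can be called with `open("data/observations.csv", newline="")` in place of the in-memory sample.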
import requests
from istsos_utils import (
    REQUEST_TIMEOUT,
    auth_headers,
    build_bulk_observations,
    build_column_to_datastream_id,
    display_error_response,
    display_json,
    get_datastreams_for_thing,
    get_observations,
    get_or_create_network,
    login,
    post_bulk_observations,
    preview_bulk_observations,
    print_created,
    read_observations_csv,
    result_to_float,
)
IST_SOS_ENDPOINT = "http://api:5000/v4/v1.1"
Login as administrator
The administrator is used only to create or retrieve the shared workshop Network.
All station entities and observations are created with the editor user.
admin_username = input("Enter administrator username: ")
admin_password = input("Enter administrator password: ")
if not admin_username or not admin_password:
    print("Username or password is empty")
else:
    admin_token, login_body = login(
        IST_SOS_ENDPOINT,
        admin_username,
        admin_password,
        timeout=REQUEST_TIMEOUT,
    )
    if admin_token:
        print(f"Logged in as {admin_username}")
Create or retrieve a Network
The helper function get_or_create_network() reuses DDT_workshop if it already exists; otherwise, it creates it.
This step requires the Network extension to be enabled.
network_name = "DDT_workshop"
network_id = get_or_create_network(
    IST_SOS_ENDPOINT,
    admin_token,
    network_name,
    timeout=REQUEST_TIMEOUT,
)
print(f"Network ID: {network_id}")
Login as editor
Use the editor user created in notebook 01.
The editor username becomes a prefix for all station resources created in this notebook, which makes each user's resources easier to identify in a shared workshop instance.
editor_username = input("Enter your username: ")
editor_password = input("Enter your password: ")
if not editor_username or not editor_password:
    print("Username or password is empty")
else:
    editor_token, login_body = login(
        IST_SOS_ENDPOINT,
        editor_username,
        editor_password,
        timeout=REQUEST_TIMEOUT,
    )
    if editor_token:
        prefix = editor_username + "-"
        print(f"Logged in as {editor_username}")
        print("Your station name will be prefixed with: " + prefix)
Create a Thing with nested Location and Datastreams
This request creates the Zurich platform in one compact SensorThings payload.
The payload contains:
- one Thing;
- one nested Location;
- three nested Datastreams;
- for each datastream, its ObservedProperty, Sensor, unit, sampling metadata, and CSV column mapping.
The created thing_id is used later to retrieve the datastreams generated by the API.
body = {
    "name": f"{prefix}ZURICH_PLATFORM",
    "description": "Zurich meteorological platform",
    "properties": {
        "keywords": "temperature,relative_humidity,precipitation,meteo,zurich",
        "description": "Meteorological measurements at Zurich platform"
    },
    "Locations": [
        {
            "name": f"{prefix}ZURICH_PLATFORM_LOCATION",
            "description": "Zurich platform location",
            "encodingType": "application/vnd.geo+json",
            "location": {
                "type": "Point",
                "coordinates": [
                    2682216.85,
                    1247945.20
                ],
                "crs": {
                    "type": "name",
                    "properties": {
                        "name": "EPSG:2056"
                    }
                }
            }
        }
    ],
    "Datastreams": [
        {
            "name": f"{prefix}TEMPERATURE_ZURICH_PLATFORM",
            "description": "Air temperature measured at Zurich platform",
            "observationType": "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement",
            "unitOfMeasurement": {
                "name": "Degree Celsius",
                "symbol": "°C",
                "definition": "http://www.opengis.net/def/uom/UCUM/0/Cel"
            },
            "properties": {
                "samplingFrequency": "PT10M",
                "acquisitionFrequency": "PT10M",
                "column": "temperature"
            },
            "ObservedProperty": {
                "name": "air:temperature",
                "definition": "urn:ogc:def:parameter:x-istsos:1.0:air:temperature",
                "description": "Air temperature"
            },
            "Sensor": {
                "name": f"{prefix}ZURICH_PLATFORM_TEMPERATURE_SENSOR",
                "description": "Temperature sensor at Zurich platform",
                "encodingType": "application/json",
                "metadata": "{}"
            },
            "Network": {
                "@iot.id": network_id
            }
        },
        {
            "name": f"{prefix}RELATIVE_HUMIDITY_ZURICH_PLATFORM",
            "description": "Relative air humidity measured at Zurich platform",
            "observationType": "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement",
            "unitOfMeasurement": {
                "name": "Percent",
                "symbol": "%",
                "definition": "http://www.opengis.net/def/uom/UCUM/0/%25"
            },
            "properties": {
                "samplingFrequency": "PT10M",
                "acquisitionFrequency": "PT10M",
                "column": "relative_humidity"
            },
            "ObservedProperty": {
                "name": "air:relative_humidity",
                "definition": "urn:ogc:def:parameter:x-istsos:1.0:air:relative_humidity",
                "description": "Relative air humidity"
            },
            "Sensor": {
                "name": f"{prefix}ZURICH_PLATFORM_RELATIVE_HUMIDITY_SENSOR",
                "description": "Relative humidity sensor at Zurich platform",
                "encodingType": "application/json",
                "metadata": "{}"
            },
            "Network": {
                "@iot.id": network_id
            }
        },
        {
            "name": f"{prefix}PRECIPITATION_ZURICH_PLATFORM",
            "description": "Precipitation measured at Zurich platform",
            "observationType": "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement",
            "unitOfMeasurement": {
                "name": "Millimetre",
                "symbol": "mm",
                "definition": "http://www.opengis.net/def/uom/UCUM/0/mm"
            },
            "properties": {
                "samplingFrequency": "PT10M",
                "acquisitionFrequency": "PT10M",
                "column": "precipitation"
            },
            "ObservedProperty": {
                "name": "precipitation",
                "definition": "urn:ogc:def:parameter:x-istsos:1.0:precipitation",
                "description": "Precipitation"
            },
            "Sensor": {
                "name": f"{prefix}ZURICH_PLATFORM_PRECIPITATION_SENSOR",
                "description": "Precipitation sensor at Zurich platform",
                "encodingType": "application/json",
                "metadata": "{}"
            },
            "Network": {
                "@iot.id": network_id
            }
        }
    ]
}
response = requests.post(
    IST_SOS_ENDPOINT + "/Things",
    json=body,
    headers=auth_headers(
        editor_token,
        "Create Zurich platform Thing with Location and 3 Datastreams",
    ),
    timeout=REQUEST_TIMEOUT,
)
if response.status_code == 201:
    thing_id = print_created("Thing", response)
else:
    display_error_response(response)
    raise Exception("Thing creation failed")
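Because the bulk upload depends on every nested datastream declaring a distinct CSV column, a small pre-flight check on the payload can catch mapping mistakes before anything is sent. The sketch below is illustrative only: `datastream_columns` and the shortened `example_payload` are hypothetical and are not part of istsos_utils.

```python
def datastream_columns(thing_payload):
    """Return the CSV column declared by each nested datastream, in order.

    Raises ValueError when a datastream has no properties.column value
    or when two datastreams declare the same column.
    """
    columns = []
    for datastream in thing_payload.get("Datastreams", []):
        column = (datastream.get("properties") or {}).get("column")
        if not column:
            raise ValueError(
                f"Datastream {datastream.get('name')!r} has no properties.column"
            )
        if column in columns:
            raise ValueError(
                f"Column {column!r} is mapped by more than one datastream"
            )
        columns.append(column)
    return columns


# Shortened, hypothetical payload with only the fields the check needs.
example_payload = {
    "name": "demo-ZURICH_PLATFORM",
    "Datastreams": [
        {
            "name": "demo-TEMPERATURE_ZURICH_PLATFORM",
            "properties": {"column": "temperature"},
        },
        {
            "name": "demo-PRECIPITATION_ZURICH_PLATFORM",
            "properties": {"column": "precipitation"},
        },
    ],
}

print(datastream_columns(example_payload))
```

In the notebook, the same check could be applied to `body` before the POST request.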
Create observations from CSV
The next cells prepare and send a /BulkObservations request.
The workflow is:
- retrieve the datastreams created inside the Zurich Thing;
- map CSV column names to datastream IDs;
- read and validate the CSV;
- build the SensorThings dataArray payload;
- preview the payload;
- send observations to the API.
import matplotlib.pyplot as plt
from dateutil import parser
from IPython.display import display
csv_path = "data/observations.csv"
time_column = "phenomenonTime"
bulk_observations_url = f"{IST_SOS_ENDPOINT}/BulkObservations"
Retrieve the datastreams created inside the Thing
The API returns the datastreams linked to the Zurich platform. Their @iot.id values are needed for the bulk-observation payload.
created_datastreams = get_datastreams_for_thing(
    IST_SOS_ENDPOINT,
    editor_token,
    thing_id,
    timeout=30,
)
print(f"Retrieved {len(created_datastreams)} Datastreams")
display_json(created_datastreams)
Build the CSV column → Datastream ID mapping
Each datastream contains a properties.column value. This value is matched with the corresponding CSV column.
# Final map: CSV column -> Datastream @iot.id
column_to_datastream_id = build_column_to_datastream_id(
    body,
    created_datastreams,
)
display_json(column_to_datastream_id)
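One way such a mapping could be derived is sketched below. It is not the actual implementation of build_column_to_datastream_id(): the function `map_columns_to_ids` is hypothetical, and it assumes that a returned datastream either carries its own properties.column value or can be matched by name against the request payload.

```python
def map_columns_to_ids(thing_payload, created_datastreams):
    """Map each CSV column name to the @iot.id of its datastream."""
    # Fallback lookup built from the request payload: datastream name -> column.
    column_by_name = {
        ds["name"]: ds["properties"]["column"]
        for ds in thing_payload.get("Datastreams", [])
    }
    mapping = {}
    for ds in created_datastreams:
        # Prefer the column stored on the returned datastream, if any.
        column = (ds.get("properties") or {}).get("column")
        if column is None:
            column = column_by_name.get(ds.get("name"))
        if column is not None:
            mapping[column] = ds["@iot.id"]
    return mapping


# Hypothetical request payload and API response, reduced to the relevant keys.
payload = {
    "Datastreams": [
        {"name": "demo-TEMPERATURE", "properties": {"column": "temperature"}},
        {"name": "demo-PRECIPITATION", "properties": {"column": "precipitation"}},
    ]
}
created = [
    {"@iot.id": 11, "name": "demo-TEMPERATURE", "properties": {"column": "temperature"}},
    {"@iot.id": 12, "name": "demo-PRECIPITATION"},  # no properties returned
]

print(map_columns_to_ids(payload, created))
```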
Read and validate the CSV
The helper function reads the CSV, verifies required columns, and converts the time column to timezone-aware UTC datetimes.
df = read_observations_csv(
    csv_path,
    time_column,
    required_value_columns=column_to_datastream_id.keys(),
)
print("CSV columns:")
print(list(df.columns))
print(f"Rows: {len(df)}")
display(df.head())
display(df.dtypes)
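The timestamp normalization can be illustrated with the standard library alone. The sketch below is not the code of read_observations_csv(), which works on a DataFrame; `to_utc` is a hypothetical function, and treating naive timestamps as UTC is an assumption made here for the example.

```python
from datetime import datetime, timezone


def to_utc(text):
    """Parse an ISO 8601 timestamp and return it as a timezone-aware UTC
    datetime, or None when the value cannot be parsed."""
    try:
        parsed = datetime.fromisoformat(text)
    except (TypeError, ValueError):
        return None
    if parsed.tzinfo is None:
        # Assumption for this sketch: naive timestamps are already in UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


samples = [
    "2024-01-01T00:00:00+01:00",  # offset-aware, shifted to UTC
    "2024-01-01 00:10:00",        # naive, assumed to be UTC
    "not a timestamp",            # invalid, becomes None
]
normalized = [to_utc(value) for value in samples]
invalid_count = sum(1 for value in normalized if value is None)

for raw, value in zip(samples, normalized):
    print(raw, "->", value.isoformat() if value else None)
print(f"Invalid timestamps: {invalid_count}")
```

Returning None for unparseable values plays the same role here as the missing values that the next step counts with isna().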
Check normalized timestamps
Invalid timestamps are counted before building the payload. Rows with invalid timestamps are skipped later.
invalid_timestamps = df[time_column].isna().sum()
print(f"Rows: {len(df)}")
print(f"Invalid timestamps: {invalid_timestamps}")
if invalid_timestamps > 0:
    display(df[df[time_column].isna()].head())
display(df.head())
Build the /BulkObservations payload
The helper function converts each value column into a dataArray associated with the correct datastream.
observations = build_bulk_observations(
    df,
    time_column,
    column_to_datastream_id,
)
print(f"Prepared {len(observations)} observation groups")
for obs in observations:
    print(
        f"Datastream {obs['Datastream']['@iot.id']}: "
        f"{len(obs['dataArray'])} observations"
    )
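For orientation, the sketch below builds one observation group in the layout of the SensorThings dataArray extension, with the Datastream reference, the component names, and one row per observation. Whether /BulkObservations expects exactly these keys is an assumption, and `build_data_array_group` is a hypothetical function, not the notebook's build_bulk_observations().

```python
def build_data_array_group(datastream_id, rows):
    """Build one dataArray group from (phenomenon_time, value) pairs.

    Rows with a missing timestamp or a missing value are skipped.
    """
    data_array = [
        [phenomenon_time, value]
        for phenomenon_time, value in rows
        if phenomenon_time is not None and value is not None
    ]
    return {
        "Datastream": {"@iot.id": datastream_id},
        "components": ["phenomenonTime", "result"],
        "dataArray@iot.count": len(data_array),
        "dataArray": data_array,
    }


# Hypothetical rows: the second has no timestamp, the third has no value.
rows = [
    ("2024-01-01T00:00:00+00:00", 1.2),
    (None, 1.3),
    ("2024-01-01T00:20:00+00:00", None),
    ("2024-01-01T00:30:00+00:00", 0.9),
]
group = build_data_array_group(11, rows)
print(group["dataArray@iot.count"], group["dataArray"])
```

A full payload would be a list with one such group per datastream.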
Preview the payload before sending
Preview only a few rows per datastream to verify the structure without printing the full payload.
preview = preview_bulk_observations(observations, rows=3)
display_json(preview)
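A preview of this kind can be produced by copying each group and truncating only its dataArray, as in the following sketch. `preview_groups` is a hypothetical stand-in for preview_bulk_observations(), whose actual behavior may differ, and the sample group is invented for the example.

```python
def preview_groups(groups, rows=3):
    """Return shallow copies of the groups with dataArray cut to `rows` rows.

    The original groups are left untouched. Other keys, such as the declared
    count, keep their original values so the full size stays visible.
    """
    return [{**group, "dataArray": group["dataArray"][:rows]} for group in groups]


# Hypothetical group with five observations.
groups = [
    {
        "Datastream": {"@iot.id": 11},
        "components": ["phenomenonTime", "result"],
        "dataArray@iot.count": 5,
        "dataArray": [[f"2024-01-01T00:{m:02d}:00+00:00", 1.0 + m] for m in range(0, 50, 10)],
    }
]

preview_sample = preview_groups(groups, rows=3)
print(len(preview_sample[0]["dataArray"]), "of", len(groups[0]["dataArray"]), "rows shown")
```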
Send observations to /BulkObservations
The request sends all prepared observation groups in one API call.
response = post_bulk_observations(
    IST_SOS_ENDPOINT,
    editor_token,
    observations,
    commit_message="Create Zurich platform observations from CSV",
    timeout=60,
)
try:
    display_json(response.json())
except Exception:
    # Fall back to the raw body when the response is not valid JSON
    print(response.text)
Visualize one datastream
After the upload, retrieve recent observations for one selected datastream and plot them.
Change selected_column to inspect another variable:
- temperature
- relative_humidity
- precipitation
# Choose one of:
# - "temperature"
# - "relative_humidity"
# - "precipitation"
selected_column = "temperature"
datastream_id = column_to_datastream_id[selected_column]
print(f"Selected column: {selected_column}")
print(f"Selected Datastream ID: {datastream_id}")
Fetch observations
The helper function get_observations() retrieves recent observations for the selected datastream; result_to_float() then converts each result value to a float. Observations with a missing timestamp, a missing result, or a timestamp that was already seen are skipped.
raw_observations = get_observations(IST_SOS_ENDPOINT, editor_token, datastream_id)
dt = []
values = []
seen_times = set()
for obs in reversed(raw_observations):
    phenomenon_time = obs.get("phenomenonTime")
    result = result_to_float(obs.get("result"))
    if phenomenon_time is None or result is None:
        continue
    if phenomenon_time in seen_times:
        # Skip duplicate timestamps so each point is plotted once
        continue
    dt.append(parser.parse(phenomenon_time))
    values.append(result)
    seen_times.add(phenomenon_time)
print(f"Loaded {len(values)} observations for {selected_column}")
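The conversion step can be pictured with a small defensive function that returns None for anything that is not a usable number. This is a sketch under that assumption, not the code of result_to_float(); the name `to_float_or_none` is hypothetical.

```python
import math


def to_float_or_none(value):
    """Convert a result value to float, or return None when it is missing,
    boolean, non-numeric, or NaN."""
    if value is None or isinstance(value, bool):
        return None
    try:
        number = float(value)
    except (TypeError, ValueError):
        return None
    return None if math.isnan(number) else number


# Hypothetical result values as they might appear in API responses.
raw_results = [3.5, "4.25", None, "n/a", float("nan"), True]
converted = [to_float_or_none(value) for value in raw_results]
print(converted)
```

Returning None instead of raising lets the calling loop skip unusable observations with a single check.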
Plot the selected time series
The plot is a quick visual check that observations were inserted and can be read back from the API.
fig, ax = plt.subplots(figsize=(20, 10))
ax.set_title(f"{selected_column} - Datastream {datastream_id}")
ax.set_xlabel("Time")
ax.set_ylabel(selected_column)
ax.grid(True)
ax.plot(dt, values, label=selected_column)
ax.legend()
plt.show()