Load bulk observations from CSV

Workflow position: run this notebook after 01_Authorization.ipynb. It reuses the editor user created there and the shared DDT_workshop network.
This notebook creates a Zurich meteorological platform and uploads observations in bulk.

Compared with notebook 02, this notebook uses a more compact creation pattern:

  • one Thing is created;
  • the Location is nested inside the Thing;
  • three Datastreams are nested inside the same request;
  • observations are loaded from data/observations.csv and sent through /BulkObservations.

CSV contract

The CSV file is expected at:

data/observations.csv

It must contain:

Column              Meaning
phenomenonTime      Timestamp of the observation
temperature         Air temperature values
relative_humidity   Relative humidity values
precipitation       Precipitation values

The column property inside each datastream definition is used to map CSV columns to the correct datastream IDs.
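As a quick illustration of the contract above, the header row can be checked before any upload is attempted. This is a minimal sketch using only the standard library; `missing_columns` and the sample rows are hypothetical and are not part of `istsos_utils`.

```python
import csv
import io

REQUIRED_COLUMNS = [
    "phenomenonTime",
    "temperature",
    "relative_humidity",
    "precipitation",
]


def missing_columns(csv_text, required=REQUIRED_COLUMNS):
    """Return the required column names that are absent from the CSV header row."""
    reader = csv.reader(io.StringIO(csv_text))
    header = next(reader, [])
    present = {name.strip() for name in header}
    return [name for name in required if name not in present]


# Hypothetical sample content; the real file is expected at data/observations.csv.
sample_csv = (
    "phenomenonTime,temperature,relative_humidity,precipitation\n"
    "2024-01-01T00:00:00Z,1.2,85.0,0.0\n"
    "2024-01-01T00:10:00Z,1.1,86.0,0.2\n"
)

print(missing_columns(sample_csv))
print(missing_columns("phenomenonTime,temperature\n"))
```

An empty list means the header satisfies the contract; any returned names are the columns that still need to be added to the file.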

import requests
from istsos_utils import (
    REQUEST_TIMEOUT,
    auth_headers,
    build_bulk_observations,
    build_column_to_datastream_id,
    display_error_response,
    display_json,
    get_datastreams_for_thing,
    get_observations,
    get_or_create_network,
    login,
    post_bulk_observations,
    preview_bulk_observations,
    print_created,
    read_observations_csv,
    result_to_float,
)

IST_SOS_ENDPOINT = "http://api:5000/v4/v1.1"

Login as administrator

The administrator is used only to create or retrieve the shared workshop Network.

All station entities and observations are created with the editor user.

from getpass import getpass

admin_username = input("Enter administrator username: ")
# getpass() keeps the password out of the notebook output.
admin_password = getpass("Enter administrator password: ")

if not admin_username or not admin_password:
    print("Username or password is empty")
else:
    admin_token, login_body = login(
        IST_SOS_ENDPOINT,
        admin_username,
        admin_password,
        timeout=REQUEST_TIMEOUT,
    )

    if admin_token:
        print(f"Logged in as {admin_username}")

Create or retrieve a Network

The helper function get_or_create_network() reuses DDT_workshop if it already exists and creates it otherwise.

This step requires the Network extension to be enabled.

network_name = "DDT_workshop"

network_id = get_or_create_network(
    IST_SOS_ENDPOINT,
    admin_token,
    network_name,
    timeout=REQUEST_TIMEOUT,
)

print(f"Network ID: {network_id}")

Login as editor

Use the editor user created in notebook 01.

The editor username becomes a prefix for all station resources created in this notebook, which makes them easier to identify in a shared workshop instance.

from getpass import getpass

editor_username = input("Enter your username: ")
# getpass() keeps the password out of the notebook output.
editor_password = getpass("Enter your password: ")

if not editor_username or not editor_password:
    print("Username or password is empty")

else:
    editor_token, login_body = login(
        IST_SOS_ENDPOINT,
        editor_username,
        editor_password,
        timeout=REQUEST_TIMEOUT,
    )

    if editor_token:
        prefix = editor_username + "-"
        print(f"Logged in as {editor_username}")
        print("Your station name will be prefixed with: " + prefix)

Create a Thing with nested Location and Datastreams

This request creates the Zurich platform in one compact SensorThings payload.

The payload contains:

  • one Thing;
  • one nested Location;
  • three nested Datastreams;
  • for each datastream, its ObservedProperty, Sensor, unit, sampling metadata, and CSV column mapping.

The created thing_id is used later to retrieve the datastreams generated by the API.

body = {
    "name": f"{prefix}ZURICH_PLATFORM",
    "description": "Zurich meteorological platform",
    "properties": {
        "keywords": "temperature,relative_humidity,precipitation,meteo,zurich",
        "description": "Meteorological measurements at Zurich platform"
    },
    "Locations": [
        {
            "name": f"{prefix}ZURICH_PLATFORM_LOCATION",
            "description": "Zurich platform location",
            "encodingType": "application/vnd.geo+json",
            "location": {
                "type": "Point",
                "coordinates": [
                    2682216.85,
                    1247945.20
                ],
                "crs": {
                    "type": "name",
                    "properties": {
                        "name": "EPSG:2056"
                    }
                }
            }
        }
    ],
    "Datastreams": [
        {
            "name": f"{prefix}TEMPERATURE_ZURICH_PLATFORM",
            "description": "Air temperature measured at Zurich platform",
            "observationType": "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement",
            "unitOfMeasurement": {
                "name": "Degree Celsius",
                "symbol": "°C",
                "definition": "http://www.opengis.net/def/uom/UCUM/0/Cel"
            },
            "properties": {
                "samplingFrequency": "PT10M",
                "acquisitionFrequency": "PT10M",
                "column": "temperature"
            },
            "ObservedProperty": {
                "name": "air:temperature",
                "definition": "urn:ogc:def:parameter:x-istsos:1.0:air:temperature",
                "description": "Air temperature"
            },
            "Sensor": {
                "name": f"{prefix}ZURICH_PLATFORM_TEMPERATURE_SENSOR",
                "description": "Temperature sensor at Zurich platform",
                "encodingType": "application/json",
                "metadata": "{}"
            },
            "Network": {
                "@iot.id": network_id
            }
        },
        {
            "name": f"{prefix}RELATIVE_HUMIDITY_ZURICH_PLATFORM",
            "description": "Relative air humidity measured at Zurich platform",
            "observationType": "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement",
            "unitOfMeasurement": {
                "name": "Percent",
                "symbol": "%",
                "definition": "http://www.opengis.net/def/uom/UCUM/0/%25"
            },
            "properties": {
                "samplingFrequency": "PT10M",
                "acquisitionFrequency": "PT10M",
                "column": "relative_humidity"
            },
            "ObservedProperty": {
                "name": "air:relative_humidity",
                "definition": "urn:ogc:def:parameter:x-istsos:1.0:air:relative_humidity",
                "description": "Relative air humidity"
            },
            "Sensor": {
                "name": f"{prefix}ZURICH_PLATFORM_RELATIVE_HUMIDITY_SENSOR",
                "description": "Relative humidity sensor at Zurich platform",
                "encodingType": "application/json",
                "metadata": "{}"
            },
            "Network": {
                "@iot.id": network_id
            }
        },
        {
            "name": f"{prefix}PRECIPITATION_ZURICH_PLATFORM",
            "description": "Precipitation measured at Zurich platform",
            "observationType": "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement",
            "unitOfMeasurement": {
                "name": "Millimetre",
                "symbol": "mm",
                "definition": "http://www.opengis.net/def/uom/UCUM/0/mm"
            },
            "properties": {
                "samplingFrequency": "PT10M",
                "acquisitionFrequency": "PT10M",
                "column": "precipitation"
            },
            "ObservedProperty": {
                "name": "precipitation",
                "definition": "urn:ogc:def:parameter:x-istsos:1.0:precipitation",
                "description": "Precipitation"
            },
            "Sensor": {
                "name": f"{prefix}ZURICH_PLATFORM_PRECIPITATION_SENSOR",
                "description": "Precipitation sensor at Zurich platform",
                "encodingType": "application/json",
                "metadata": "{}"
            },
            "Network": {
                "@iot.id": network_id
            }
        }
    ]
}

response = requests.post(
    IST_SOS_ENDPOINT + "/Things",
    json=body,
    headers=auth_headers(
        editor_token,
        "Create Zurich platform Thing with Location and 3 Datastreams",
    ),
    timeout=REQUEST_TIMEOUT,
)

if response.status_code == 201:
    thing_id = print_created("Thing", response)
else:
    display_error_response(response)
    # Stop here: the following cells need thing_id.
    raise RuntimeError(f"Thing creation failed with HTTP {response.status_code}")

Create observations from CSV

The next cells prepare and send a /BulkObservations request.

The workflow is:

  1. retrieve the datastreams created inside the Zurich Thing;
  2. map CSV column names to datastream IDs;
  3. read and validate the CSV;
  4. build the SensorThings dataArray payload;
  5. preview the payload;
  6. send observations to the API.

import matplotlib.pyplot as plt
from dateutil import parser
from IPython.display import display

csv_path = "data/observations.csv"
time_column = "phenomenonTime"

bulk_observations_url = f"{IST_SOS_ENDPOINT}/BulkObservations"

Retrieve the datastreams created inside the Thing

The API returns the datastreams linked to the Zurich platform. Their @iot.id values are needed for the bulk-observation payload.
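For orientation, SensorThings exposes the Datastreams of a Thing through a navigation link, and collection responses wrap the entities in a `value` array. The sketch below shows that shape without making a network call; `datastreams_url`, `datastream_ids`, and the sample body are hypothetical, and the internals of `get_datastreams_for_thing()` may differ.

```python
def datastreams_url(endpoint, thing_id):
    """Build the SensorThings navigation URL for the Datastreams of one Thing."""
    return f"{endpoint.rstrip('/')}/Things({thing_id})/Datastreams"


def datastream_ids(collection_body):
    """Map each Datastream name to its @iot.id from a collection response body."""
    return {
        item["name"]: item["@iot.id"]
        for item in collection_body.get("value", [])
    }


# Hypothetical response body, shaped like a SensorThings entity collection.
sample_body = {
    "value": [
        {"@iot.id": 11, "name": "alice-TEMPERATURE_ZURICH_PLATFORM"},
        {"@iot.id": 12, "name": "alice-PRECIPITATION_ZURICH_PLATFORM"},
    ]
}

print(datastreams_url("http://api:5000/v4/v1.1", 7))
print(datastream_ids(sample_body))
```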

created_datastreams = get_datastreams_for_thing(
    IST_SOS_ENDPOINT,
    editor_token,
    thing_id,
    timeout=30,
)

print(f"Retrieved {len(created_datastreams)} Datastreams")
display_json(created_datastreams)

Build the CSV column → Datastream ID mapping

Each datastream contains a properties.column value. This value is matched with the corresponding CSV column.
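One plausible way to build this mapping is to pair each datastream definition in the request payload with the created datastream of the same name. The sketch below assumes name-based matching; `map_columns_to_ids` and the sample data are hypothetical, and `build_column_to_datastream_id()` may work differently.

```python
def map_columns_to_ids(thing_payload, created_datastreams):
    """Return {CSV column: Datastream @iot.id}, matching definitions to results by name."""
    id_by_name = {ds["name"]: ds["@iot.id"] for ds in created_datastreams}
    mapping = {}
    for definition in thing_payload.get("Datastreams", []):
        column = definition.get("properties", {}).get("column")
        if column is None:
            continue  # this datastream is not fed from the CSV
        name = definition["name"]
        if name not in id_by_name:
            raise KeyError(f"No created Datastream named {name!r}")
        mapping[column] = id_by_name[name]
    return mapping


# Hypothetical payload excerpt and API result; the order of the result does not matter.
payload = {
    "Datastreams": [
        {"name": "alice-TEMPERATURE_ZURICH_PLATFORM", "properties": {"column": "temperature"}},
        {"name": "alice-PRECIPITATION_ZURICH_PLATFORM", "properties": {"column": "precipitation"}},
    ]
}
created = [
    {"@iot.id": 12, "name": "alice-PRECIPITATION_ZURICH_PLATFORM"},
    {"@iot.id": 11, "name": "alice-TEMPERATURE_ZURICH_PLATFORM"},
]

print(map_columns_to_ids(payload, created))
```

Raising on a missing name makes a mismatch between the payload and the API result visible before any observation is sent.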

# Final map: CSV column -> Datastream @iot.id
column_to_datastream_id = build_column_to_datastream_id(
    body,
    created_datastreams,
)

display_json(column_to_datastream_id)

Read and validate the CSV

The helper function reads the CSV, verifies required columns, and converts the time column to timezone-aware UTC datetimes.
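The timestamp normalization can be pictured with the standard library alone. This is a minimal sketch, not the helper's implementation: `to_utc` is a hypothetical name, it returns `None` for unparseable values, and it assumes that timestamps without an offset are already in UTC.

```python
from datetime import datetime, timezone


def to_utc(text):
    """Parse an ISO 8601 timestamp into an aware UTC datetime, or None if invalid."""
    try:
        # Older Python versions do not accept a trailing "Z" in fromisoformat().
        parsed = datetime.fromisoformat(text.strip().replace("Z", "+00:00"))
    except (AttributeError, ValueError):
        return None
    if parsed.tzinfo is None:
        # Assumption: timestamps without an offset are already in UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


# Hypothetical sample values covering an offset, a "Z" suffix, and a bad entry.
samples = ["2024-01-01T01:00:00+01:00", "2024-01-01T00:10:00Z", "not a date"]
normalized = [to_utc(value) for value in samples]
invalid = sum(value is None for value in normalized)

print(normalized)
print(f"Invalid timestamps: {invalid}")
```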

df = read_observations_csv(
    csv_path,
    time_column,
    required_value_columns=column_to_datastream_id.keys(),
)

print("CSV columns:")
print(list(df.columns))

print(f"Rows: {len(df)}")
display(df.head())
display(df.dtypes)

Check normalized timestamps

Invalid timestamps are counted before building the payload. Rows with invalid timestamps are skipped later.

invalid_timestamps = df[time_column].isna().sum()

print(f"Rows: {len(df)}")
print(f"Invalid timestamps: {invalid_timestamps}")

if invalid_timestamps > 0:
    display(df[df[time_column].isna()].head())

display(df.head())

Build the /BulkObservations payload

The helper function converts each value column into a dataArray associated with the correct datastream.
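To make the target structure concrete, the sketch below builds one group per value column from plain dictionaries. It assumes the OGC SensorThings dataArray layout (`Datastream`, `components`, `dataArray@iot.count`, `dataArray`); `build_data_arrays` and the sample rows are hypothetical, and the exact fields produced by `build_bulk_observations()` may differ.

```python
def build_data_arrays(rows, time_column, column_to_datastream_id):
    """Group rows into one dataArray per datastream, skipping incomplete entries."""
    groups = []
    for column, datastream_id in column_to_datastream_id.items():
        data = [
            [row[time_column], row[column]]
            for row in rows
            if row.get(time_column) is not None and row.get(column) is not None
        ]
        groups.append(
            {
                "Datastream": {"@iot.id": datastream_id},
                "components": ["phenomenonTime", "result"],
                "dataArray@iot.count": len(data),
                "dataArray": data,
            }
        )
    return groups


# Hypothetical rows: one missing value and one missing timestamp.
rows = [
    {"phenomenonTime": "2024-01-01T00:00:00Z", "temperature": 1.2, "precipitation": 0.0},
    {"phenomenonTime": "2024-01-01T00:10:00Z", "temperature": None, "precipitation": 0.2},
    {"phenomenonTime": None, "temperature": 1.0, "precipitation": 0.1},
]

groups = build_data_arrays(rows, "phenomenonTime", {"temperature": 11, "precipitation": 12})
for group in groups:
    print(group["Datastream"]["@iot.id"], group["dataArray"])
```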

observations = build_bulk_observations(
    df,
    time_column,
    column_to_datastream_id,
)

print(f"Prepared {len(observations)} observation groups")

for obs in observations:
    print(
        f"Datastream {obs['Datastream']['@iot.id']}: "
        f"{len(obs['dataArray'])} observations"
    )

Preview the payload before sending

Preview only a few rows per datastream to verify the structure without printing the full payload.
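A preview of this kind can be produced by copying each group and truncating its dataArray, which leaves the payload to be sent untouched. This is a minimal sketch; `preview_groups` and the sample group are hypothetical, and `preview_bulk_observations()` may format its output differently.

```python
def preview_groups(groups, rows=3):
    """Return shallow copies of the groups with each dataArray cut to the first rows."""
    return [{**group, "dataArray": group["dataArray"][:rows]} for group in groups]


# Hypothetical group with five observations at 10-minute intervals.
sample_groups = [
    {
        "Datastream": {"@iot.id": 11},
        "dataArray": [[f"2024-01-01T00:{minute:02d}:00Z", 1.0] for minute in range(0, 50, 10)],
    }
]

sample_preview = preview_groups(sample_groups, rows=3)
print(len(sample_preview[0]["dataArray"]), "of", len(sample_groups[0]["dataArray"]), "rows shown")
```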

preview = preview_bulk_observations(observations, rows=3)
display_json(preview)

Send observations to /BulkObservations

The request sends all prepared observation groups in one API call.

response = post_bulk_observations(
    IST_SOS_ENDPOINT,
    editor_token,
    observations,
    commit_message="Create Zurich platform observations from CSV",
    timeout=60,
)

try:
    display_json(response.json())
except ValueError:
    # The body is not valid JSON: fall back to the raw response text.
    print(response.text)

Visualize one datastream

After the upload, retrieve recent observations for one selected datastream and plot them.

Change selected_column to inspect another variable:

  • temperature
  • relative_humidity
  • precipitation

# Choose one of:
# - "temperature"
# - "relative_humidity"
# - "precipitation"
selected_column = "temperature"

datastream_id = column_to_datastream_id[selected_column]

print(f"Selected column: {selected_column}")
print(f"Selected Datastream ID: {datastream_id}")

Fetch observations

get_observations() retrieves recent observations for the selected datastream, and result_to_float() converts each result to a float. Observations without a timestamp or a numeric result are skipped.
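The conversion step can be sketched as a function that returns `None` whenever a result cannot be plotted. `to_float_or_none` is a hypothetical stand-in; the actual behavior of `result_to_float()` is an assumption here, in particular the handling of booleans and non-finite numbers.

```python
import math


def to_float_or_none(value):
    """Convert a result to a finite float, or return None when that is not possible."""
    if value is None or isinstance(value, bool):
        return None
    try:
        number = float(value)
    except (TypeError, ValueError):
        return None
    return number if math.isfinite(number) else None


# Hypothetical results as they might appear in a response body.
for raw in [3.5, "4.25", None, "n/a", float("nan")]:
    print(repr(raw), "->", to_float_or_none(raw))
```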

raw_observations = get_observations(IST_SOS_ENDPOINT, editor_token, datastream_id)

dt = []
values = []
seen_times = set()

for obs in reversed(raw_observations):
    phenomenon_time = obs.get("phenomenonTime")
    result = result_to_float(obs.get("result"))

    # Skip incomplete observations and timestamps that were already collected.
    if phenomenon_time is None or result is None or phenomenon_time in seen_times:
        continue

    dt.append(parser.parse(phenomenon_time))
    values.append(result)
    seen_times.add(phenomenon_time)

print(f"Loaded {len(values)} observations for {selected_column}")

Plot the selected time series

The plot is a quick visual check that observations were inserted and can be read back from the API.

fig, ax = plt.subplots(figsize=(20, 10))

ax.set_title(f"{selected_column} - Datastream {datastream_id}")
ax.set_xlabel("Time")
ax.set_ylabel(selected_column)
ax.grid(True)

ax.plot(dt, values, label=selected_column)
ax.legend()

plt.show()