wis2-data-processing-chain

Demonstration project outlining the processing chain implemented at Ifremer for publishing data on the WMO Information System (WIS 2.0)

https://github.com/ifremer/wis2-data-processing-chain

Science Score: 44.0%

This score indicates how likely this project is to be science-related based on various indicators:

  • CITATION.cff file
    Found CITATION.cff file
  • codemeta.json file
    Found codemeta.json file
  • .zenodo.json file
    Found .zenodo.json file
  • DOI references
  • Academic publication links
  • Academic email domains
  • Institutional organization owner
  • JOSS paper metadata
  • Scientific vocabulary similarity
    Low similarity (8.5%) to scientific vocabulary

Repository


Basic Info
  • Host: GitHub
  • Owner: ifremer
  • License: lgpl-3.0
  • Language: Python
  • Default Branch: main
  • Size: 875 KB
Statistics
  • Stars: 0
  • Watchers: 5
  • Forks: 0
  • Open Issues: 0
  • Releases: 0
Created about 1 year ago · Last pushed about 1 year ago
Metadata Files
Readme Citation Codemeta

README.md

Summary

This project demonstrates a processing chain for creating, validating, and publishing a message on the WMO Information System (WIS 2.0).

The process is triggered when a data file is stored in a file system (this part is out of scope for this project). The corresponding event is transmitted via an MQTT broker (Mosquitto) and captured by a scheduler (Airflow), which runs the processing chain to generate, validate, and send a notification to the WMO Information System.

For demonstration purposes, the Argo use case is implemented. However, this workflow is designed to be adaptable to any data source, provided that an event is emitted when a new data file is created.

Tools / Technologies

List of tools and technologies used:

  • Mosquitto: MQTT message broker used to transmit events and notifications.
  • Airflow: Scheduler that subscribes to the MQTT broker and orchestrates processing based on received events.
  • Python: Processing tasks dedicated to WIS2 are written in Python and use the following libraries:
    • Paho: Python MQTT client for publishing and subscribing to topics.
    • pywis_pubsub: Validates data notification messages.
    • pywcmp: Validates metadata records against the WMO Core Metadata Profile (WCMP).

Architecture

```mermaid
flowchart TB;
  %% Improved styles
  classDef filesystem fill:#FFD700,stroke:#B8860B,stroke-width:2px,color:#000,font-weight:bold;
  classDef process fill:#0A89B0,stroke:#08637F,stroke-width:2px,color:#fff,font-weight:bold;
  classDef broker fill:#007acc,stroke:#005f99,stroke-width:2px,color:#fff,font-weight:bold;
  classDef listener fill:#34D399,stroke:#0F9D58,stroke-width:2px,color:#fff,font-weight:bold;
  classDef airflow fill:#888f8a,stroke:#ffffff,stroke-width:2px,color:#000,font-weight:bold;

  %% Internal network - Ifremer WIS2 Node
  subgraph internalnetwork["🔗 Ifremer WIS2 Node"]
    brokerifremer["📨 Ifremer Broker"]:::broker

    %% Secure web server
    subgraph httpsserversubgraph["🌐 HTTPS Server"]
      datafilesystem["🗂️ Argo Data Files"]:::filesystem
      diffusionprocess["⚙️ Argo Event Processor"]:::process
    end

    %% Task scheduling - Airflow
    subgraph airflowsubgraph["🗓️ Airflow Scheduler"]
      mqttlistener["🎧 Event Listener"]:::listener
      notificationmessageprocess["📦 WIS2 Message Generator"]:::process
    end

    %% Internal connections
    datafilesystem -->|new file| diffusionprocess
    diffusionprocess -->|CloudEvent/STAC message| brokerifremer
    brokerifremer -->|CloudEvent/STAC| mqttlistener
    mqttlistener -->|Trigger Notification| notificationmessageprocess
    notificationmessageprocess -->|WIS2 Notification| brokerifremer
  end

  %% External nodes
  brokerwis2["📨 WIS2 Global Broker"]:::broker

  %% External connections
  brokerwis2 -->|origin/a/wis2/fr-ifremer-argo/...| brokerifremer
```
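The 📦 WIS2 Message Generator step in the diagram converts an incoming CloudEvent/STAC message into a WIS2 notification message. Below is a minimal Python sketch of that mapping. It is not the project's DAG code. It assumes, hypothetically, that the CloudEvent `data` attribute carries a STAC-like item with an `id`, a `geometry`, a `properties.datetime` and an asset named `data` that holds the public file URL and media type. The output follows the general shape of a WIS2 Notification Message (a GeoJSON Feature with a canonical link). Before publication, the result should still be validated with pywis_pubsub.

```python
import json
import uuid
from datetime import datetime, timezone

# Conformance class declared by WIS2 Notification Message (WNM) 1.0 messages.
WNM_CORE = "http://wis.wmo.int/spec/wnm/1/conf/core"


def utc_now() -> str:
    """Current UTC time in RFC 3339 form, e.g. 2025-04-07T12:00:00Z."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def build_wis2_notification(cloud_event: dict, centre_id: str) -> dict:
    """Map a CloudEvent carrying a STAC-like item to a WIS2 notification message.

    Hypothetical input layout: cloud_event["data"] has "id", "geometry",
    "properties"["datetime"] and assets["data"] with "href" and "type".
    """
    item = cloud_event["data"]
    asset = item["assets"]["data"]
    return {
        "id": str(uuid.uuid4()),  # every notification gets a fresh UUID
        "conformsTo": [WNM_CORE],
        "type": "Feature",  # a WNM is a GeoJSON Feature
        "geometry": item.get("geometry"),  # None (JSON null) when unknown
        "properties": {
            # Hypothetical data_id scheme: centre id + item id.
            "data_id": f"{centre_id}/{item['id']}",
            "datetime": item["properties"]["datetime"],
            "pubtime": utc_now(),
        },
        "links": [
            # The canonical link points at the publicly accessible data file.
            {"href": asset["href"], "rel": "canonical", "type": asset["type"]},
        ],
    }


if __name__ == "__main__":
    example_event = {
        "specversion": "1.0",
        "id": "example-event-id",
        "source": "example-source",
        "type": "example.file.created",
        "data": {
            "id": "example-item",
            "geometry": None,
            "properties": {"datetime": "2025-04-07T00:00:00Z"},
            "assets": {"data": {"href": "https://example.org/file.bin",
                                "type": "application/bufr"}},
        },
    }
    print(json.dumps(build_wis2_notification(example_event, "fr-ifremer-argo"), indent=2))
```

Keeping the mapping a pure function, with no broker I/O, means it can be unit-tested and validated on its own before a separate task publishes the result.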

Organization

The project is structured as follows:

  • broker/: Directory containing MQTT broker data and configuration.
  • scheduler/: Directory containing Airflow scheduler data and configuration.
  • data/: Directory containing test data.
  • compose.yml: Docker Compose configuration file defining the services required to execute the full notification message publication process.

Configuration

  • broker/config: Contains the Mosquitto configuration file.
  • broker/data: Contains user configurations and topic permissions for Mosquitto.

  • scheduler/config: Airflow configuration.
  • scheduler/dags: Airflow DAGs.
  • scheduler/logs: Airflow logs.
  • scheduler/plugins: Airflow plugins.
  • scheduler/data: Externalised Airflow DAG data, used to store the events and notifications handled by the DAGs so that they can be recovered in case of error.

Services

Metadata

Metadata management is not included in this demonstration because it only needs to be performed once. It does, however, follow the same principle. An example of a JSON Core Metadata Profile record is stored in the test data directory. This record must be hosted on a web server at a publicly accessible URL. A WIS2 notification message pointing to it must then be created and published on the dedicated metadata topic, from which it reaches the WIS2 Global Broker. For example, for Argo:

```bash
mqttx pub -h localhost --debug -p 8081 -l ws -u wis2-argo-rw -P "wis2-argo-rw" --path / \
  -t origin/a/wis2/fr-ifremer-argo/metadata \
  -m "$(cat ./data/notification-message/core-metadata-msg-notification.json)"
```
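The mqttx command above publishes a prepared JSON notification on the metadata topic. The Python sketch below reproduces only the preparation step. It derives the topic from the centre identifier, runs a light pre-flight check (the message is a GeoJSON Feature with a canonical link), and returns the topic together with the payload bytes. The helper names and checks are illustrative assumptions and complement, rather than replace, validation with pywis_pubsub.

```python
import json


def metadata_topic(centre_id: str) -> str:
    """Topic used above for metadata notifications: origin/a/wis2/<centre-id>/metadata."""
    return f"origin/a/wis2/{centre_id}/metadata"


def prepare_metadata_publication(message: dict, centre_id: str) -> tuple[str, bytes]:
    """Return (topic, payload) for a metadata notification message.

    Hypothetical pre-flight checks only; full validation belongs to pywis_pubsub.
    """
    if message.get("type") != "Feature":
        raise ValueError("a WIS2 notification message must be a GeoJSON Feature")
    links = message.get("links", [])
    if not any(link.get("rel") == "canonical" for link in links):
        raise ValueError("expected a 'canonical' link to the hosted metadata record")
    # UTF-8 JSON, equivalent to publishing the file content with mqttx -m.
    payload = json.dumps(message, ensure_ascii=False).encode("utf-8")
    return metadata_topic(centre_id), payload
```

The returned pair can be passed to any MQTT client, for example Paho's `Client.publish(topic, payload)`.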

Data

Microservices described in the compose.yml files:

  1. broker/compose.yml: A Mosquitto microservice implementing the MQTT protocol to transmit events (data file creation) and notifications (WIS2 notification message).
  2. scheduler/compose.yml: 9 microservices enabling local execution of the Airflow scheduler (official documentation). Airflow triggers the processing chain (creation of a WIS2 notification message) each time an event is received (new data file creation).
  3. argo-event-diffusion: A microservice simulating the creation of an Argo file diffusion event.
  4. wis2-argo-subscription: A microservice simulating the global WIS2 broker, receiving notification messages from Argo.

When the solution is running properly, you can simulate another Argo diffusion event from a message stored on your file system with the mqttx client:

```bash
mqttx pub -h localhost --debug -p 8081 -l ws -u prod-files-rw -P "prod-files-rw" --path / \
  -t diffusion/files/coriolis/argo/bufr \
  -m "$(cat ./data/event-message/bufr-creation-cloudevent.json)"
```
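To replay the chain with a different file, you first need a CloudEvent to publish. The Python sketch below builds one. The CloudEvents 1.0 specification requires only `specversion`, `id`, `source` and `type`. The `source` and `type` values and the STAC-like `data` layout shown here are hypothetical and may differ from the repository's bufr-creation-cloudevent.json.

```python
import json
import uuid
from datetime import datetime, timezone


def build_file_created_event(file_url: str, media_type: str, item_id: str) -> dict:
    """Build a CloudEvents 1.0 envelope announcing a newly created data file.

    CloudEvents only requires specversion, id, source and type. The "source" and
    "type" values and the STAC-like "data" layout below are hypothetical.
    """
    now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return {
        "specversion": "1.0",
        "id": str(uuid.uuid4()),  # must be unique per source
        "source": "/diffusion/files/coriolis/argo/bufr",  # hypothetical source URI
        "type": "example.file.created",  # hypothetical event type
        "time": now,
        "datacontenttype": "application/json",
        "data": {  # STAC-like item describing the new file (assumed layout)
            "type": "Feature",
            "stac_version": "1.0.0",
            "id": item_id,
            "geometry": None,
            "properties": {"datetime": now},
            "links": [],
            "assets": {"data": {"href": file_url, "type": media_type}},
        },
    }


if __name__ == "__main__":
    event = build_file_created_event("https://example.org/file.bin", "application/bufr", "example-item")
    # Redirect to a file, then publish it with the mqttx command above.
    print(json.dumps(event, indent=2))
```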

Get Started

To simulate the publication of a notification message on a WIS2 broker upon receiving an Argo data file creation event, follow these steps:

  • Start the Mosquitto and Airflow microservices and simulate an Argo file diffusion using Docker:

```bash
docker compose up
```

  • Simulate another Argo file diffusion using Docker:

```bash
docker compose run argo-event-diffusion
```

  • Once the process is complete, stop and remove the containers:

```bash
docker compose down
```

  • Clean up: stop and delete the containers, and delete the volumes holding database data as well as the downloaded images:

```bash
docker compose down --volumes --rmi all
```

Classic Use Case

Once the application is running locally, access the Airflow web interface at http://localhost:8080 using the default credentials (airflow / airflow).

Filter the view to show only active DAGs. You will see the two DAGs related to this project:

(Screenshot: Airflow active DAGs)

  • 📂 WIS2 - Listen file diffusion event: This DAG subscribes to file diffusion topics on the MQTT broker. When a file diffusion event is received, it triggers the publication DAG.
  • 🔔 WIS2 - Publish notification message: This DAG is triggered by a diffusion event. It processes, validates, and publishes a WIS2 notification message.
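The listener DAG subscribes to file diffusion topics such as diffusion/files/coriolis/argo/bufr. The exact topic filter it uses is not shown here. The Python sketch below implements standard MQTT topic-filter matching to show how a single filter can cover several diffusion topics: `+` matches exactly one level, and `#` matches the parent level and everything below it. The filter `diffusion/files/+/argo/#` is a hypothetical example. Paho also provides a ready-made helper, `paho.mqtt.client.topic_matches_sub`.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a subscription filter.

    '+' matches exactly one level, '#' (last level only) matches the parent
    level and any number of sub-levels. Malformed filters never match.
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    # Per MQTT, wildcards in the first level never match topics starting with '$'.
    if topic.startswith("$") and filter_levels[0] in ("+", "#"):
        return False
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Valid only as the last level of the filter.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False  # topic is shorter than the filter
        if level != "+" and level != topic_levels[i]:
            return False  # literal level mismatch
    # Without '#', both must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


if __name__ == "__main__":
    print(topic_matches("diffusion/files/+/argo/#", "diffusion/files/coriolis/argo/bufr"))
```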

As a demonstration, a first diffusion event is automatically sent when the system becomes ready.

Click on each DAG to inspect the status of individual tasks:

  • 📂 WIS2 - Listen file diffusion event contains a single task that listens on the MQTT broker. If you click on the task, you can access the logs and view the received events.

(Screenshot: Listener DAG status)

  • 🔔 WIS2 - Publish notification message includes several tasks that handle the generation, validation, and publication of the notification message. You can inspect the logs of each task, for example to confirm that the message was successfully published.

To simulate another file diffusion event, run the following Docker command:

```bash
docker compose run argo-event-diffusion
```

Alternatively, you can send a CloudEvent message manually using the mqttx client:

```bash
mqttx pub -h localhost --debug -p 8081 -l ws -u prod-files-rw -P "prod-files-rw" --path / \
  -t diffusion/files/coriolis/argo/bufr \
  -m "$(cat /path-to-data/cloudevents-message.json)"
```


What to Do in Case of an Error?

If an error occurs during the 🔔 WIS2 - Publish notification message process, the original event can be recovered because a copy is saved as soon as it is received. You can find and edit the stored message at: ./scheduler/data/{{dag_id}}/{{run_id}}/{{task_id}}.json
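The DAG code that stores this copy is not reproduced here. The Python sketch below illustrates only the storage convention. Its helpers (hypothetical names) write an event under `<base>/{dag_id}/{run_id}/{task_id}.json` and read it back. The base directory is assumed to be ./scheduler/data, as seen from the host.

```python
import json
import tempfile
from pathlib import Path


def event_path(base_dir: str, dag_id: str, run_id: str, task_id: str) -> Path:
    """Path layout described above: <base>/<dag_id>/<run_id>/<task_id>.json."""
    return Path(base_dir) / dag_id / run_id / f"{task_id}.json"


def save_event(base_dir: str, dag_id: str, run_id: str, task_id: str, event: dict) -> Path:
    """Persist the received event before processing so that it can be replayed later."""
    path = event_path(base_dir, dag_id, run_id, task_id)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(event, indent=2), encoding="utf-8")
    return path


def load_event(base_dir: str, dag_id: str, run_id: str, task_id: str) -> dict:
    """Read back a (possibly hand-edited) event, e.g. to pass it as a DAG run conf."""
    return json.loads(event_path(base_dir, dag_id, run_id, task_id).read_text(encoding="utf-8"))


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        saved = save_event(tmp, "example_dag", "manual__example", "example_task", {"id": "e1"})
        print(saved, load_event(tmp, "example_dag", "manual__example", "example_task"))
```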

| Method | Recommended? | Advantages | Drawbacks |
| --------------------------- | -------------- | --------------------------- | ----------------------------------- |
| ✅ Re-trigger with conf | ✅ YES | Clean, traceable, resilient | Creates a new DAG run |
| 🔁 Clearing tasks | ⚠️ Sometimes | Quick | May re-run already successful tasks |
| 🔧 Modifying existing conf | ❌ NO | Not possible (by design) | Unsafe and unsupported |

✅ To fix and retrigger

This solution is the cleanest because actions are traceable and resilient.

Option 1: Using the Airflow REST API

💡 Tip: Use the following command to list DAG runs and retrieve the run_id and execution_date:

```bash
curl -X GET "http://localhost:8080/api/v1/dags/{{dag_id}}/dagRuns" --user "airflow:airflow"
```

  • Edit the saved event message

```bash
# Edit the CloudEvent message
vim ./scheduler/data/{{dag_id}}/{{run_id}}/{{task_id}}.json
```

  • Trigger a new DAG run with the updated configuration

```bash
curl -X POST "http://localhost:8080/api/v1/dags/{{dag_id}}/dagRuns" \
  -H "Content-Type: application/json" \
  --user "airflow:airflow" \
  -d '{ "conf": '"$(cat ./scheduler/data/{{dag_id}}/{{run_id}}/{{task_id}}.json)"' }'
```

  • Clean up stored message after successful reprocessing

```bash
rm -Rf ./scheduler/data/{{dag_id}}/{{run_id}}
```

  • Document the recovery on the failed DAG run (update its status and add a comment)
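The re-trigger step can also be scripted. The Python sketch below builds the same request as the curl command, using only the standard library: a POST to `/api/v1/dags/{dag_id}/dagRuns` with a `{"conf": ...}` body and HTTP basic authentication. Like the curl command, it assumes that the API's basic-auth backend is enabled with the default airflow/airflow credentials. The identifiers in the `__main__` part are hypothetical placeholders.

```python
import base64
import json
import urllib.request
from pathlib import Path


def build_trigger_request(base_url: str, dag_id: str, conf: dict,
                          user: str, password: str) -> urllib.request.Request:
    """Prepare POST <base_url>/api/v1/dags/<dag_id>/dagRuns with {"conf": conf}."""
    body = json.dumps({"conf": conf}).encode("utf-8")
    credentials = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url=f"{base_url}/api/v1/dags/{dag_id}/dagRuns",
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {credentials}",  # same as curl --user
        },
    )


if __name__ == "__main__":
    # Hypothetical identifiers: replace them with the values of the failed run.
    dag_id, run_id, task_id = "example_dag", "example_run", "example_task"
    conf = json.loads(Path(f"./scheduler/data/{dag_id}/{run_id}/{task_id}.json").read_text())
    request = build_trigger_request("http://localhost:8080", dag_id, conf, "airflow", "airflow")
    with urllib.request.urlopen(request) as response:
        print(response.status, json.loads(response.read()))
```

Building the request separately from sending it makes it easy to inspect the payload before a new DAG run is actually created.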

Option 2: Using the command line

⚠️ Warning: you need to be logged in to the server where Airflow is running.

💡 Tip: Use the following command to list DAG runs and retrieve the run_id and execution_date:

```bash
docker exec -it wis2-mqtt-broker-airflow-worker-1 airflow dags list-runs -d {{dag_id}}
```

  • Edit the saved event message

```bash
# Edit the CloudEvent message
vim ./scheduler/data/{{dag_id}}/{{run_id}}/{{task_id}}.json
```

  • Trigger a new DAG run with the updated configuration

```bash
docker exec -it wis2-mqtt-broker-airflow-worker-1 airflow dags trigger \
  -c "$(cat ./scheduler/data/{{dag_id}}/{{run_id}}/{{task_id}}.json)" \
  {{dag_id}}
```

  • Clean up stored message after successful reprocessing

```bash
rm -Rf ./scheduler/data/{{dag_id}}/{{run_id}}
```

  • Document the recovery on the failed DAG run (update its status and add a comment)
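The same command-line trigger can be run from a script. The Python sketch below builds the argument list for `airflow dags trigger -c <conf> <dag_id>` inside the worker container and runs it with `subprocess`. It omits `-it`, because a non-interactive script has no terminal to attach. Passing the arguments as a list also avoids shell quoting issues with the JSON conf. The container name is the one used in the commands above; the run identifiers are hypothetical.

```python
import json
import subprocess
from pathlib import Path

# Container name used in the commands above; adjust it to your deployment.
WORKER = "wis2-mqtt-broker-airflow-worker-1"


def trigger_command(dag_id: str, conf: dict, container: str = WORKER) -> list[str]:
    """argv for `airflow dags trigger -c <conf> <dag_id>` inside the worker container.

    No -it here: a script has no interactive terminal to attach.
    """
    return ["docker", "exec", container, "airflow", "dags", "trigger",
            "-c", json.dumps(conf), dag_id]


if __name__ == "__main__":
    dag_id, run_id, task_id = "example_dag", "example_run", "example_task"  # hypothetical
    conf = json.loads(Path(f"./scheduler/data/{dag_id}/{run_id}/{task_id}.json").read_text())
    subprocess.run(trigger_command(dag_id, conf), check=True)  # raises on non-zero exit
```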

🔁 To fix and retry by clearing tasks

This solution is quicker, but because it is not traceable, it is not the best option.

Option 1 – Using the command line

```bash
# Edit the CloudEvent message
vim ./scheduler/data/{{dag_id}}/{{run_id}}/{{task_id}}.json

# Clear and rerun the DAG after editing
docker exec -it wis2-mqtt-broker-airflow-worker-1 airflow tasks clear wis2-publish-message-notification \
  -s "2025-04-15T15:00:47" \
  -e "2025-04-15T15:00:48" \
  --yes
```

💡 Tip: To retrieve the exact dag_run execution date:

```bash
docker exec -it wis2-mqtt-broker-airflow-worker-1 airflow dags list-runs -d wis2-publish-message-notification
```
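The `-s`/`-e` bounds in the clear command are a one-second window starting at the run's execution date. The Python sketch below derives that window from the execution date printed by `list-runs` and assembles the matching `airflow tasks clear` arguments. It assumes that Airflow's default timezone is UTC, so aware timestamps are converted to UTC and naive ones are taken as already UTC.

```python
from datetime import datetime, timedelta, timezone

# Container name used in the commands above; adjust it to your deployment.
WORKER = "wis2-mqtt-broker-airflow-worker-1"


def clear_window(execution_date: str) -> tuple[str, str]:
    """One-second [start, end] window around a run's execution date, in UTC.

    Assumes Airflow's default timezone is UTC, so naive timestamps are read as UTC.
    """
    dt = datetime.fromisoformat(execution_date)
    if dt.tzinfo is not None:
        dt = dt.astimezone(timezone.utc)
    start = dt.replace(microsecond=0, tzinfo=None)  # truncate to the second
    end = start + timedelta(seconds=1)
    fmt = "%Y-%m-%dT%H:%M:%S"
    return start.strftime(fmt), end.strftime(fmt)


def clear_command(dag_id: str, execution_date: str, container: str = WORKER) -> list[str]:
    """argv equivalent of the `airflow tasks clear ... -s ... -e ... --yes` call above."""
    start, end = clear_window(execution_date)
    return ["docker", "exec", container, "airflow", "tasks", "clear", dag_id,
            "-s", start, "-e", end, "--yes"]
```

A one-second window only selects the intended run if no other run of the same DAG has an execution date within that second. When runs are triggered in quick succession, check the `list-runs` output first.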

Option 2 - From the Airflow web interface

  1. Fix the stored message according to the error:
    ./scheduler/data/{{dag_id}}/{{run_id}}/{{task_id}}.json
  2. Open the Airflow UI at http://localhost:8080
  3. Click on the DAG and locate the failed run:
    (Screenshot: Select DAG Failed)
  4. Click the "Clear" button and choose "Clear existing tasks":
    (Screenshot: Clear existing tasks)

Owner

  • Name: ifremer
  • Login: ifremer
  • Kind: organization

French marine institute

Citation (CITATION.cff)

cff-version: 1.2.0
title: Ifremer WIS2 Data Processing Chain
message: >-
  If you use this software, please cite it using the
  metadata from this file.
type: software
authors:
  - given-names: Leo
    family-names: Bruvry-Lagadec
    email: leo.bruvry.lagadec@ifremer.fr
    affiliation: Ifremer
    orcid: 'https://orcid.org/0000-0002-7768-1671'
  - name: Ifremer
    address: 1625 route de Sainte-Anne
    city: Plouzané
    country: FR
    post-code: '29200'
    region: Bretagne
repository-code: >-
  https://github.com/ifremer/wis2-data-processing-chain/
url: >-
  https://github.com/ifremer/wis2-data-processing-chain/
repository-artifact: >-
  https://github.com/ifremer/wis2-data-processing-chain/
abstract: >-
  The WIS2 Data Processing Chain project by Ifremer demonstrates an efficient 
  workflow for creating, validating, and publishing notifications to the World 
  Meteorological Organization's Information System (WIS 2.0). Designed with 
  adaptability in mind, this processing chain can accommodate various data sources, 
  initiating its sequence upon the detection of a new data file.
keywords:
  - WIS 2.0
  - Data Processing Chain
  - MQTT Broker
  - Mosquitto
  - Airflow Scheduler
  - CloudEvents
  - STAC
  - Argo
  - Argo Data
  - Environmental Data Sharing
  - Open Standards
  - Open Source
  - Data Notification System
  - Real-time Data Processing
  - Scalable Architecture
  - Metadata Management
  - Geospatial Data
  - Event-Driven Architecture
  - Data Dissemination
  - Organisation météorologique mondiale
  - WMO Information System
  - Data Integration
  - Cloud-Ready Solution
  - Ifremer
license: LGPL-3.0
commit: Last release
version: 0.0.1
date-released: '2025-04-07'

CodeMeta (codemeta.json)

{
  "@context": [
    "https://w3id.org/codemeta/3.0",
    "https://schema.org"
  ],
  "type": "SoftwareSourceCode",
  "applicationCategory": "Data processing",
  "author": [
    {
      "id": "https://orcid.org/0000-0002-7768-1671",
      "type": "Person",
      "affiliation": {
        "type": "Organization",
        "name": "Ifremer"
      },
      "email": "leo.bruvry.lagadec@ifremer.fr",
      "familyName": "Bruvry-Lagadec",
      "givenName": "Leo"
    }
  ],
  "codeRepository": "https://github.com/ifremer/wis2-data-processing-chain/",
  "dateCreated": "2025-04-01",
  "dateModified": "2025-04-07",
  "datePublished": "2025-04-07",
  "description": "The WIS2 Data Processing Chain project by Ifremer demonstrates an efficient workflow for creating, validating, and publishing notifications to the World Meteorological Organization's Information System (WIS 2.0). Designed with adaptability in mind, this processing chain can accommodate various data sources, initiating its sequence upon the detection of a new data file.",
  "downloadUrl": "https://github.com/ifremer/wis2-data-processing-chain/",
  "identifier": "https://github.com/ifremer/wis2-data-processing-chain/",
  "isPartOf": "https://community.wmo.int/en/activity-areas/wis/wis2-implementation",
  "keywords": [
    "WIS 2.0",
    "Data Processing Chain",
    "MQTT Broker",
    "Mosquitto",
    "Airflow Scheduler",
    "CloudEvents",
    "STAC",
    "Argo Data",
    "Environmental Data Sharing",
    "Open Standards",
    "Open Source",
    "Data Notification System",
    "Real-time Data Processing",
    "Scalable Architecture",
    "Metadata Management",
    "Geospatial Data",
    "Event-Driven Architecture",
    "Data Dissemination",
    "Organisation météorologique mondiale",
    "WMO Information System",
    "Data Integration",
    "Cloud-Ready Solution"
  ],
  "license": "https://spdx.org/licenses/LGPL-3.0",
  "name": "WIS2 node data processing chain",
  "operatingSystem": "Linux",
  "programmingLanguage": [
    "Python"
  ],
  "relatedLink": [
    "https://community.wmo.int/en/activity-areas/wis/wis2-implementation",
    "https://wmo-im.github.io/wis2-guide/",
    "https://github.com/wmo-im/wis2-notification-message"
  ],
  "releaseNotes": "## 0.0.1 (2025-04-07)\n\n### added (1 changes)\n\n- Prototype of Argo WIS2 node\n\n",
  "version": "0.0.1",
  "developmentStatus": "active",
  "funding": {
    "type": "Organization",
    "name": "Ifremer",
    "identifier": "https://ror.org/044jxhp58"
  },
  "issueTracker": "https://github.com/ifremer/wis2-data-processing-chain/issues"
}

GitHub Events

Total
  • Push event: 10
  • Create event: 3
Last Year
  • Push event: 10
  • Create event: 3