kafka-dicom-fhir-pipeline

A simple project that uses Kafka to connect DICOM and FHIR.

https://github.com/rtdicomexplorer/kafka-dicom-fhir-pipeline

Science Score: 44.0%

This score indicates how likely this project is to be science-related based on various indicators:

  • CITATION.cff file
    Found CITATION.cff file
  • codemeta.json file
    Found codemeta.json file
  • .zenodo.json file
    Found .zenodo.json file
  • DOI references
  • Academic publication links
  • Academic email domains
  • Institutional organization owner
  • JOSS paper metadata
  • Scientific vocabulary similarity
    Low similarity (5.9%) to scientific vocabulary
Last synced: 8 months ago

Repository

A simple project that uses Kafka to connect DICOM and FHIR.

Basic Info
  • Host: GitHub
  • Owner: rtdicomexplorer
  • Language: Python
  • Default Branch: main
  • Size: 79.1 KB
Statistics
  • Stars: 0
  • Watchers: 0
  • Forks: 0
  • Open Issues: 0
  • Releases: 0
Created 9 months ago · Last pushed 9 months ago
Metadata Files
Readme Citation

readme.md

Project Overview: DICOM to FHIR Pipeline with Kafka

This system ingests DICOM files, extracts metadata, groups them into studies, and converts them into FHIR ImagingStudy bundles to send to a FHIR server. It uses Kafka to orchestrate decoupled stages and ensures resilience with retry + dead-letter queue logic.
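The retry + dead-letter behaviour can be sketched as a small helper. This is illustrative only: `send_with_retry` and `flaky_send` are hypothetical names, and in the real pipeline the send is an HTTP POST to the FHIR server while exhausted messages land on the `imaging.failed` topic.

```python
def send_with_retry(send, message, max_retries=3):
    """Try to deliver `message` up to `max_retries` times; on exhaustion,
    route it to the dead-letter topic (imaging.failed in this pipeline)."""
    for _ in range(max_retries):
        try:
            send(message)
            return "delivered"
        except Exception:
            continue  # transient failure: retry
    return "imaging.failed"  # give up: hand off to the DLQ

# Hypothetical send that fails twice, then succeeds on the third attempt:
calls = {"n": 0}
def flaky_send(message):
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("FHIR server unavailable")

print(send_with_retry(flaky_send, {"study_uid": "1.2.3"}))      # delivered
print(send_with_retry(lambda m: 1 / 0, {"study_uid": "4.5.6"}))  # imaging.failed
```

Decoupling the retry policy from the transport this way is what lets the same logic protect both the Kafka and FHIR stages.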

Architecture Overview

Pipeline Architecture

                 +------------------------+
                 | batch_send_by_study.py |
                 +-----------+------------+
                             |
                             v
                 +-----------+------------+
                 |    dicom_sender.py     |
                 +-----------+------------+
                             |  (DICOM C-STORE)
                             v
                 +-----------+------------+
                 |   dicom_receiver.py    |
                 +-----------+------------+
                             |  (Kafka: imaging.raw)
                             v
                 +-------------------------------------+
                 | consumer_grouped_study_processor.py |
                 +---------+-----------------+---------+
                           |                 |
              +------------+                 +----------+
              |                                         |
              v                                         v
+---------------------------+              +-------------------------+
| consumer_fhir_uploader.py |              | consumer_dlq_handler.py |
+-------------+-------------+              +------------+------------+
              |                                         |
              v                                         v
   +---------------------+                    +------------------+
   |     FHIR Server     |                    |   DLQ (Kafka)    |
   |    (e.g., HAPI)     |                    +------------------+
   +---------------------+

          ▲             ▲             ▲             ▲
          |             |             |             |
    +-----+-------------+-------------+-------------+------+
    |          run_pipeline.py (Flask Dashboard)           |
    |  - Monitors services                                 |
    |  - Provides /status, logs, stop/start controls       |
    +------------------------------------------------------+
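The dashboard's monitoring endpoint can be sketched with Flask. This is a minimal sketch: only the `/status` route comes from the README; the `SERVICES` in-memory registry and its entries are assumptions standing in for the real subprocess tracking.

```python
from flask import Flask, jsonify

app = Flask(__name__)

# Hypothetical in-memory registry; the real dashboard tracks the
# pipeline's subprocesses (receiver, processor, uploader, DLQ handler).
SERVICES = {"dicom_receiver": "running", "consumer_fhir_uploader": "running"}

@app.route("/status")
def status():
    # Report each managed service and its current state as JSON.
    return jsonify(SERVICES)

# Exercise the endpoint without starting a server (the README serves
# the real dashboard at http://localhost:5000):
print(app.test_client().get("/status").get_json())
```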

How to use it:

  1. docker compose up -d
    • starts the Kafka broker in Docker
  2. pip install -r requirements.txt
    • installs the Python dependencies
  3. python run_pipeline.py
    • starts the services and the dashboard endpoint at http://localhost:5000
  4. python batch_send_by_study.py
    • starts the DICOM send process

Full Workflow

  1. #### batch_send_by_study.py
    • Parses a given folder, groups the DICOM files by StudyInstanceUID, and calls dicom_sender for each file.
  2. #### dicom_sender.py
    • Reads .dcm files from a folder.
    • Sends them via DICOM C-STORE to a receiver (dicom_receiver).
    • Supports multiple patients, studies, series, and subfolders.
  3. #### dicom_receiver.py
    • Listens for incoming DICOM files (acts as a DICOM SCP).
    • Saves received files to ./received_dicoms/Studies.
    • Extracts metadata: PatientName, PatientID, PatientSex, PatientBirthDate, AccessionNumber, StudyInstanceUID, SeriesInstanceUID, SOPInstanceUID.
    • Publishes the metadata to the Kafka topic imaging.raw.
  4. #### consumer_grouped_study_processor.py
    • Listens on imaging.raw.
    • Groups incoming files by StudyInstanceUID into study batches using a TTL cache.
    • Once a study is considered "complete" (10 seconds of inactivity), emits study_uid, patient_id, accession_number, modality, series, instances, and patient info.
    • Sends the result to the Kafka topic imaging.study.ready.
  5. #### consumer_fhir_uploader.py
    • Listens on imaging.study.ready.
    • Builds a FHIR transaction bundle with:
      • a Patient resource (from DICOM info: name, sex, accession number)
      • an ImagingStudy resource (with series + instances)
    • Sends the bundle to the FHIR server (http://localhost:8080/fhir).
    • Also saves a local copy under ./bundles/.
    • Retry logic: retries failed FHIR sends up to 3 times.
  6. #### consumer_dlq_handler.py
    • Reads failed messages from the Kafka topic imaging.failed.
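The study-grouping step above can be sketched with cachetools.TTLCache. This is illustrative logic, not the repository's code: `StudyGrouper` is a hypothetical name, a list stands in for the `imaging.study.ready` topic, and a short TTL replaces the README's 10-second inactivity window so the demo runs quickly.

```python
import time
from cachetools import TTLCache

class StudyGrouper:
    """Group per-instance metadata messages by StudyInstanceUID.

    Writing a key into a TTLCache refreshes its time-to-live, so a study
    stays cached while instances keep arriving and drops out once it has
    been idle for `ttl` seconds -- the "complete" signal.
    """

    def __init__(self, ttl=0.2):  # README uses 10 s; shortened for the demo
        self.cache = TTLCache(maxsize=1024, ttl=ttl)
        self.ready = []  # stand-in for the imaging.study.ready topic

    def add(self, msg):
        uid = msg["StudyInstanceUID"]
        batch = self.cache.get(uid, {"study_uid": uid, "instances": []})
        batch["instances"].append(msg["SOPInstanceUID"])
        self.cache[uid] = batch  # re-insert to refresh the TTL

    def flush_expired(self):
        # expire() evicts idle studies and (cachetools >= 5.3) returns them
        # as (key, value) pairs, ready to emit downstream.
        for _, batch in self.cache.expire():
            self.ready.append(batch)

grouper = StudyGrouper()
grouper.add({"StudyInstanceUID": "1.2.3", "SOPInstanceUID": "1.2.3.1"})
grouper.add({"StudyInstanceUID": "1.2.3", "SOPInstanceUID": "1.2.3.2"})
time.sleep(0.3)           # exceed the inactivity window
grouper.flush_expired()
print(grouper.ready)      # one completed study carrying both instances
```

A production consumer would call `flush_expired()` periodically (or on every poll) rather than after a fixed sleep.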

Message flow summary

| Step | Source                              | Target            | Kafka Topic         | Description                                   |
| ---- | ----------------------------------- | ----------------- | ------------------- | --------------------------------------------- |
| 1    | dicom_sender.py                     | dicom_receiver.py | —                   | Sends DICOM via C-STORE                       |
| 2    | dicom_receiver.py                   | Kafka Broker      | imaging.raw         | Emits a metadata message per DICOM file       |
| 3    | consumer_grouped_study_processor.py | Kafka Broker      | imaging.study.ready | Groups by study and emits study-level message |
| 4    | consumer_fhir_uploader.py           | FHIR Server       | —                   | Creates + sends FHIR ImagingStudy bundle      |
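Step 4's transaction bundle might be assembled roughly like this. A sketch only: `build_transaction_bundle` and the exact field mapping are assumptions rather than the repository's code, while the Patient/ImagingStudy resource types and transaction-bundle shape follow FHIR R4.

```python
import json

def build_transaction_bundle(study):
    """Assemble a FHIR R4 transaction bundle carrying a Patient and an
    ImagingStudy resource (field mapping is illustrative)."""
    patient = {
        "resourceType": "Patient",
        "id": "patient-1",
        "identifier": [{"value": study["patient_id"]}],
        "name": [{"text": study["patient_name"]}],
        # DICOM sex codes (M/F/O) must be mapped to FHIR gender codes:
        "gender": {"M": "male", "F": "female"}.get(study["patient_sex"], "unknown"),
    }
    imaging_study = {
        "resourceType": "ImagingStudy",
        "status": "available",
        "identifier": [{"system": "urn:dicom:uid",
                        "value": f"urn:oid:{study['study_uid']}"}],
        "subject": {"reference": "Patient/patient-1"},
        "series": [{"uid": s["series_uid"],
                    "modality": {"system": "http://dicom.nema.org/resources/ontology/DCM",
                                 "code": study["modality"]},
                    "instance": [{"uid": uid} for uid in s["instances"]]}
                   for s in study["series"]],
    }
    return {
        "resourceType": "Bundle",
        "type": "transaction",
        "entry": [
            {"resource": patient,
             "request": {"method": "PUT", "url": "Patient/patient-1"}},
            {"resource": imaging_study,
             "request": {"method": "POST", "url": "ImagingStudy"}},
        ],
    }

bundle = build_transaction_bundle({
    "patient_id": "PID-001", "patient_name": "DOE^JANE", "patient_sex": "F",
    "study_uid": "1.2.3", "modality": "CT",
    "series": [{"series_uid": "1.2.3.4", "instances": ["1.2.3.4.1"]}],
})
print(json.dumps(bundle)[:60])
# The uploader would then POST this, e.g.
# requests.post("http://localhost:8080/fhir", json=bundle)
```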

Tech Stack

| Component      | Library/Tool                   |
| -------------- | ------------------------------ |
| DICOM I/O      | pydicom, pynetdicom            |
| Messaging      | Kafka, kafka-python            |
| Metadata cache | cachetools.TTLCache            |
| FHIR Bundle    | JSON, requests                 |
| Resilience     | Retry logic, DLQ, TTL grouping |

📖 Citation

If you use this code or part of it in your research, please cite:

Michele Bufano, Kafka DICOM to FHIR pipeline, GitHub Repository, https://github.com/rtdicomexplorer/kafka-dicom-fhir-pipeline

BibTeX:

```bibtex
@misc{kafka-dicom-fhir-pipeline_2025,
  author       = {Michele Bufano},
  title        = {Kafka DICOM FHIR pipeline},
  year         = 2025,
  publisher    = {GitHub},
  journal      = {GitHub repository},
  howpublished = {\url{https://github.com/rtdicomexplorer/kafka-dicom-fhir-pipeline}},
}
```

Owner

  • Name: Michele Bufano
  • Login: rtdicomexplorer
  • Kind: user

Citation (CITATION.cff)

cff-version: 1.2.0
message: "If you use this software, please cite it as below."
authors:
- family-names: "Bufano"
  given-names: "Michele"
  orcid: "https://orcid.org/0009-0000-5067-9814"
title: "kafka-dicom-fhir-pipeline"
version: 1.0.0
date-released: 2025-07-25
url: "https://github.com/rtdicomexplorer/kafka-dicom-fhir-pipeline"

GitHub Events

Total
  • Push event: 1
  • Create event: 1
Last Year
  • Push event: 1
  • Create event: 1

Dependencies

docker-compose.yml docker
  • confluentinc/cp-kafka 7.6.0
  • confluentinc/cp-zookeeper 7.6.0
requirements.txt pypi
  • cachetools ==6.1.0
  • certifi ==2025.7.14
  • charset-normalizer ==3.4.2
  • flask *
  • idna ==3.10
  • kafka-python ==2.2.15
  • pydicom ==3.0.1
  • pynetdicom ==3.0.3
  • python-dotenv *
  • requests ==2.32.4
  • urllib3 ==2.5.0