"""
ddicdi_model.py
This module defines the DdiCdiModel class, which provides an interface for loading, querying,
and interacting with DDI-CDI specification using rdflib and Pydantic.
It supports loading Turtle files, querying RDF graphs with SPARQL, and retrieving model
metadata, classes, attributes, associations, enumerations, and structured datatypes.
The XML schema version is also loaded for retrieving resource multiplicities / cardinalities,
which are not captured in the ontology.
The class provides utility methods for working with namespaces and prefixed URIs.
Examples:
Load the model:
```python
from dartfx.ddi.ddicdi_model import DdiCdiModel
model = DdiCdiModel(root_dir="/path/to/ddi-cdi")
```
Get all ucmis resources:
```python
classes = model.get_ucmis_classes()
associations = model.get_ucmis_associations()
attributes = model.get_ucmis_attributes()
enums = model.get_ucmis_enumerations()
data_types = model.get_ucmis_structureddatatypes()
```
Get resource information:
```python
superclasses = model.get_resource_superclasses("cdi:InstanceVariable")
subclasses = model.get_resource_subclasses("cdi:InstanceVariable")
attributes = model.get_resource_attributes("cdi:InstanceVariable", inherited=True)
associations = model.get_resource_associations("cdi:InstanceVariable")
```
SPDX-Identifier: MIT
"""
import os
import xml.etree.ElementTree as ET
from typing import Any
from pydantic import BaseModel, computed_field
from rdflib import Graph, Namespace
# Prefix -> namespace IRI table; used to bind prefixes on the graph and to
# convert between full and prefixed URIs.
NAMESPACES = dict(
    rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#",
    rdfs="http://www.w3.org/2000/01/rdf-schema#",
    owl="http://www.w3.org/2002/07/owl#",
    xsd="http://www.w3.org/2001/XMLSchema#",
    dc="http://purl.org/dc/elements/1.1/",
    skos="http://www.w3.org/2004/02/skos/core#",
    cdi="http://ddialliance.org/Specification/DDI-CDI/1.0/RDF/",
    ucmis="tag:ddialliance.org,2024:ucmis:",
)
# Inverse lookup: namespace IRI -> prefix.
REVERSE_NAMESPACES = dict(zip(NAMESPACES.values(), NAMESPACES.keys()))
# Namespaces used when building ElementTree paths against the XML Schema file.
XMLNS = dict(
    xs="http://www.w3.org/2001/XMLSchema",
    xml="http://www.w3.org/XML/1998/namespace",
)
[docs]
class DdiCdiModel(BaseModel):
"""
Pydantic model that wraps a local copy of the official DDI-CDI specification.

The only declared field is ``root_dir``. The RDF graph and the root element of
the XML Schema are held in the private ``_graph`` and ``_xml`` attributes, which
default to ``None``.
"""
# Root directory of the DDI-CDI specification files.
root_dir: str
# In-memory rdflib graph; None until it has been loaded.
_graph: Graph | None = None
# Root element of the parsed XML Schema; None until it has been loaded.
_xml: ET.Element | None = None
@classmethod
def validate(cls, value):
    """
    Check that the root directory named in ``value`` exists.

    The directory is read from the ``root_dir`` key, which matches the model
    field name. The older ``root`` key is still accepted as a fallback so that
    existing callers keep working.

    Returns ``value`` unchanged.
    Raises ``ValueError`` when the path is not an existing directory and
    ``KeyError`` when neither key is present.
    """
    # The original looked up "root" only, which fails for data keyed by the
    # actual field name.
    root = value["root_dir"] if "root_dir" in value else value["root"]
    if not os.path.isdir(root):
        raise ValueError(f"Root directory does not exist: {root}")
    return value
def model_post_init(self, __context):
    """
    Pydantic post-init hook: load the ontology graph and the XML Schema.

    Parses the Turtle files found in ``ontology_dir``, parses ``ddi-cdi.xsd``
    from ``xmlschema_dir``, then binds every prefix of ``NAMESPACES`` on the graph.
    """
    schema_path = os.path.join(self.xmlschema_dir, "ddi-cdi.xsd")
    self._graph = self.load_ontology(self.ontology_dir)
    self._xml = self.load_xml(schema_path)
    for prefix in NAMESPACES:
        self._graph.bind(prefix, Namespace(NAMESPACES[prefix]))
@computed_field  # type: ignore
@property
def build_dir(self) -> str:
    """
    Path of the ``build`` directory located directly under ``root_dir``.
    """
    parent = self.root_dir
    return os.path.join(parent, "build")
@computed_field  # type: ignore
@property
def encoding_dir(self) -> str:
    """
    Path of the ``encoding`` directory located under ``build_dir``.
    """
    parent = self.build_dir
    return os.path.join(parent, "encoding")
@property
def graph(self) -> Graph:
    """
    The in-memory RDF graph, which can be queried with SPARQL.

    Raises ``ValueError`` while the graph is still ``None``.
    """
    loaded = self._graph
    if loaded is not None:
        return loaded
    raise ValueError("Graph has not been initialized. Call model_post_init first.")
@computed_field  # type: ignore
@property
def jsonld_dir(self) -> str:
    """
    Path of the ``jsonld`` directory located under ``encoding_dir``.
    """
    parent = self.encoding_dir
    return os.path.join(parent, "jsonld")
@computed_field  # type: ignore
@property
def ontology_dir(self) -> str:
    """
    Path of the ``ontology`` directory located under ``encoding_dir``.
    """
    parent = self.encoding_dir
    return os.path.join(parent, "ontology")
@computed_field  # type: ignore
@property
def source_dir(self) -> str:
    """
    Path of the ``source`` directory located directly under ``root_dir``.
    """
    parent = self.root_dir
    return os.path.join(parent, "source")
@property
def xml(self) -> ET.Element:
    """
    The root element of the loaded XML Schema document.

    Raises ``ValueError`` while the element is still ``None``.
    """
    loaded = self._xml
    if loaded is not None:
        return loaded
    raise ValueError("XML has not been initialized. Call model_post_init first.")
@computed_field  # type: ignore
@property
def xmlschema_dir(self) -> str:
    """
    Path of the ``xml-schema`` directory located under ``encoding_dir``.
    """
    parent = self.encoding_dir
    return os.path.join(parent, "xml-schema")
def load_ontology(self, directory_path) -> Graph:
    """
    Parse every ``.ttl`` file of ``directory_path`` into a new rdflib graph.

    Files are parsed as Turtle in the order ``os.listdir`` reports them.
    Returns the populated graph.
    """
    ontology = Graph()
    turtle_names = [name for name in os.listdir(directory_path) if name.endswith(".ttl")]
    for name in turtle_names:
        ontology.parse(os.path.join(directory_path, name), format="turtle")
    return ontology
def load_xml(self, file_path: str) -> ET.Element:
    """
    Parse the XML document at ``file_path`` and return its root element.

    Raises ``FileNotFoundError`` when the path does not exist.
    """
    if os.path.exists(file_path):
        return ET.parse(file_path).getroot()
    raise FileNotFoundError(f"XML file not found: {file_path}")
def get_association_cardinalities(self, association_uri: str) -> dict[str, dict[str, Any]]:
    """
    Retrieves the 'from' and 'to' cardinalities of an association from the XML schema.

    The local name of ``association_uri`` (the text after the last ``:``) is the
    ``xml:id`` of the association element; its first ``_``-separated component is
    the source class, whose complexType carries the ``xml:id`` ``<Class>XsdType``.
    The 'to' side is the nested element whose ``xml:id`` is
    ``<association>-validType``; its ``xs:enumeration`` values, when present, are
    reported under ``types``.

    Returns a dictionary with up to two entries, ``from`` and ``to``. It is empty
    when the association element is not found, and has no ``to`` entry when the
    ``-validType`` element is not found.

    Example:
        {
            "from": {
                "minOccurs": "0",
                "maxOccurs": "unbounded",
                "display": "0..*",
                "type": "InstanceVariable"
            },
            "to": {
                "minOccurs": "0",
                "maxOccurs": "unbounded",
                "display": "0..*",
                "types": [
                    "PhysicalSegmentLayout",
                    "UnitSegmentLayout"
                ]
            }
        }
    """

    def describe(element: ET.Element) -> dict[str, Any]:
        # Raw occurrence bounds plus a compact "min..max" form ("*" for unbounded).
        min_occurs = element.get("minOccurs")
        max_occurs = element.get("maxOccurs")
        upper = "*" if max_occurs == "unbounded" else max_occurs
        return {
            "minOccurs": min_occurs,
            "maxOccurs": max_occurs,
            "display": f"{min_occurs}..{upper}",
        }

    xs = f"{{{XMLNS['xs']}}}"
    xml_id = f"@{{{XMLNS['xml']}}}id"

    association_name = association_uri.split(":")[-1]
    association_from = association_name.split("_")[0]

    cardinalities: dict[str, dict[str, Any]] = {}

    # e.g. ./xs:complexType[@xml:id='InstanceVariableXsdType']
    #        //xs:element[@xml:id='InstanceVariable_has_PhysicalSegmentLayout']
    from_path = (
        f"./{xs}complexType[{xml_id}='{association_from}XsdType']"
        f"//{xs}element[{xml_id}='{association_name}']"
    )
    from_element = self.xml.find(from_path)
    # Compare with None explicitly: an Element without children is falsy, so a
    # plain truth test would discard elements that were actually found.
    if from_element is None:
        return cardinalities

    cardinality_from = describe(from_element)
    cardinality_from["type"] = association_from
    cardinalities["from"] = cardinality_from

    # e.g. .//xs:element[@xml:id='InstanceVariable_has_PhysicalSegmentLayout-validType']
    to_path = f".//{xs}element[{xml_id}='{association_name}-validType']"
    to_element = from_element.find(to_path)
    if to_element is None:
        return cardinalities

    cardinality_to = describe(to_element)
    target_types = to_element.findall(f".//{xs}enumeration")
    if target_types:
        cardinality_to["types"] = [item.get("value") for item in target_types]
    cardinalities["to"] = cardinality_to
    return cardinalities
def get_class_frequency(self, class_uri) -> int:
    """
    Count the resources in the RDF graph that are instances of ``class_uri``.

    ``class_uri`` is inserted verbatim into the SPARQL query. The first column
    of the first result row is converted to ``int`` and returned.
    """
    sparql = (
        "\n"
        "SELECT (COUNT(?instance) AS ?count)\n"
        "WHERE {\n"
        f"?instance a {class_uri} .\n"
        "}\n"
    )
    first_row = next(iter(self.graph.query(sparql)))
    return int(first_row[0])  # type: ignore
def get_classes(self) -> list[str]:
    """
    List every distinct class that has at least one instance in the RDF graph.

    Results are ordered by the query and returned as prefixed URIs where a
    known namespace matches.
    """
    sparql = (
        "\n"
        "SELECT DISTINCT ?c\n"
        "WHERE {\n"
        "{[] a ?c .}\n"
        "}\n"
        "order by ?c "
    )
    classes = []
    for row in self.graph.query(sparql):
        classes.append(self.prefixed_uri(str(row[0])))  # type: ignore
    return classes
def get_enumeration(self, enumeration_uri: str) -> dict[str, Any]:
    """
    Describe an enumeration and its members.

    Returns a dictionary with the keys ``uri``, ``label``, ``description`` and
    ``members``. ``members`` maps each member label to a dictionary holding the
    member's ``uri``, ``label`` and ``description``; member descriptions are
    flattened onto a single line.
    """
    own_properties = self.get_resource_properties(enumeration_uri)
    sparql = (
        "\n"
        "SELECT ?s\n"
        "WHERE {\n"
        f"?s a {enumeration_uri} ;\n"
        "}\n"
    )
    members = {}
    for row in self.graph.query(sparql):
        member_uri = self.prefixed_uri(row[0])  # type: ignore
        member_properties = self.get_resource_properties(member_uri)
        label = member_properties.get("rdfs:label")
        text = member_properties.get("rdfs:comment")
        # A comment may come back as several values; merge them first.
        if isinstance(text, list):
            text = "\n".join(text)
        if text:
            text = text.replace("\n", " ").strip()
        members[label] = {"uri": member_uri, "label": label, "description": text}
    return {
        "uri": enumeration_uri,
        "label": own_properties.get("rdfs:label"),
        "description": own_properties.get("rdfs:comment"),
        "members": members,
    }
def get_resource_attribute_cardinality(self, _resource_uri: str, attribute_uri) -> dict[str, Any]:
    """
    Look up the cardinality of an attribute in the XML schema.

    The attribute's local name (the text after the last ``:``) is the ``xml:id``
    of the schema element. The part of that name before the first ``-`` names
    the owning resource, whose complexType has the ``xml:id`` ``<Resource>XsdType``.
    ``_resource_uri`` is not used.

    Path example for InstanceVariable-physicalDataType:
        ./{xs}complexType[@{xml}id='InstanceVariableXsdType']//{xs}element[@{xml}id='InstanceVariable-physicalDataType']

    Returns an empty dictionary when no element matches, otherwise for example:
        {
            "minOccurs": "0",
            "maxOccurs": "unbounded",
            "display": "0..*"
        }
    """
    attribute_name = attribute_uri.split(":")[-1]
    owner_name = attribute_name.split("-")[0]

    xs = f"{{{XMLNS['xs']}}}"
    xml_id = f"@{{{XMLNS['xml']}}}id"
    path = f"./{xs}complexType[{xml_id}='{owner_name}XsdType']//{xs}element[{xml_id}='{attribute_name}']"

    # Only the first matching element is considered.
    match = self.xml.find(path)
    if match is None:
        return {}

    lower = match.get("minOccurs")
    upper = match.get("maxOccurs")
    shown_upper = "*" if upper == "unbounded" else upper
    return {"minOccurs": lower, "maxOccurs": upper, "display": f"{lower}..{shown_upper}"}
def get_resource_domain_attributes(
    self, resource_uri: str, description: bool = False, inherited: bool = False
) -> dict[str, dict[str, Any]]:
    """
    Retrieves the attributes that have ``resource_uri`` as their rdfs:domain.

    Args:
        resource_uri: Prefixed URI of the resource.
        description: When True, add each attribute's rdfs:comment under ``description``.
        inherited: When True, also collect the attributes of every superclass.
            Each entry then carries an ``inherited`` flag, and inherited entries
            also name their origin under ``inherited_from``.

    Returns:
        A dictionary keyed by attribute URI. Each value holds ``uri``, ``label``,
        ``range`` (prefixed; a list when there are several ranges, None when
        absent) and ``cardinality``, plus the optional keys described above.
    """
    attributes: dict[str, dict[str, Any]] = {}
    for attribute_uri in self.get_resource_ucmis_domain_attributes(resource_uri):
        properties = self.get_resource_properties(attribute_uri)
        raw_range = properties.get("rdfs:range")
        if isinstance(raw_range, list):
            attribute_range = [self.prefixed_uri(item) for item in raw_range]
        else:
            attribute_range = self.prefixed_uri(raw_range) if raw_range else None
        attribute = {
            "uri": attribute_uri,
            "label": properties.get("rdfs:label"),
            "range": attribute_range,
            "cardinality": self.get_resource_attribute_cardinality(resource_uri, attribute_uri),
        }
        if inherited:
            attribute["inherited"] = False
        if description:
            attribute["description"] = properties.get("rdfs:comment")
        attributes[attribute_uri] = attribute

    if inherited:
        for superclass_uri in self.get_resource_superclasses(resource_uri):
            # Forward `description` so inherited attributes are described the
            # same way as the resource's own attributes.
            inherited_attributes = self.get_resource_domain_attributes(
                superclass_uri, description=description, inherited=False
            )
            for inherited_attribute in inherited_attributes.values():
                inherited_attribute["inherited"] = True
                inherited_attribute["inherited_from"] = superclass_uri
                attributes[inherited_attribute["uri"]] = inherited_attribute
    return attributes
def get_resource_range_attributes(
    self, resource_uri: str, description: bool = False, inherited: bool = False
) -> dict[str, dict[str, Any]]:
    """
    Retrieves the attributes that have ``resource_uri`` as their rdfs:range.

    Args:
        resource_uri: Prefixed URI of the resource.
        description: When True, add each attribute's rdfs:comment under ``description``.
        inherited: When True, also collect the attributes of every superclass.
            Each entry then carries an ``inherited`` flag, and inherited entries
            also name their origin under ``inherited_from``.

    Returns:
        A dictionary keyed by attribute URI. Each value holds ``uri``, ``label``,
        ``range`` (prefixed; a list when there are several ranges, None when
        absent) and ``cardinality``, plus the optional keys described above.
    """
    attributes: dict[str, dict[str, Any]] = {}
    for attribute_uri in self.get_resource_ucmis_range_attributes(resource_uri):
        properties = self.get_resource_properties(attribute_uri)
        raw_range = properties.get("rdfs:range")
        if isinstance(raw_range, list):
            attribute_range = [self.prefixed_uri(item) for item in raw_range]
        else:
            attribute_range = self.prefixed_uri(raw_range) if raw_range else None
        attribute = {
            "uri": attribute_uri,
            "label": properties.get("rdfs:label"),
            "range": attribute_range,
            "cardinality": self.get_resource_attribute_cardinality(resource_uri, attribute_uri),
        }
        if inherited:
            attribute["inherited"] = False
        if description:
            attribute["description"] = properties.get("rdfs:comment")
        attributes[attribute_uri] = attribute

    if inherited:
        for superclass_uri in self.get_resource_superclasses(resource_uri):
            # Forward `description` so inherited attributes are described the
            # same way as the resource's own attributes.
            inherited_attributes = self.get_resource_range_attributes(
                superclass_uri, description=description, inherited=False
            )
            for inherited_attribute in inherited_attributes.values():
                inherited_attribute["inherited"] = True
                inherited_attribute["inherited_from"] = superclass_uri
                attributes[inherited_attribute["uri"]] = inherited_attribute
    return attributes
def get_resource_associations(
    self,
    resource_uri: str,
    include_from: bool = True,
    include_to: bool = True,
    inherited: bool = False,
    cardinalities: bool = False,
) -> dict[str, dict[str, Any]]:
    """
    Retrieves the associations that start from and/or point to a resource.

    Args:
        resource_uri: Prefixed URI of the resource.
        include_from: Include associations whose rdfs:domain is the resource.
        include_to: Include associations whose rdfs:range is the resource.
        inherited: When True, also collect the associations of every superclass,
            using the same direction filters. Each entry then carries an
            ``inherited`` flag, and inherited entries also name their origin
            under ``inherited_from``.
        cardinalities: When True, merge the result of
            ``get_association_cardinalities`` into each entry.

    Returns:
        A dictionary keyed by association URI. 'from' entries hold ``uri``,
        ``direction``, ``label``, ``altLabel``, ``description``, ``domain`` and
        ``range``; 'to' entries hold ``uri`` and ``direction`` only.
    """
    associations: dict[str, dict[str, Any]] = {}

    if include_from:
        for association_uri in self.get_resource_ucmis_associations_from(resource_uri):
            association: dict[str, Any] = {"uri": association_uri, "direction": "from"}
            properties = self.get_resource_properties(association_uri)
            raw_range = properties.get("rdfs:range")
            if isinstance(raw_range, list):
                association_range = [self.prefixed_uri(item) for item in raw_range]
            else:
                association_range = self.prefixed_uri(raw_range) if raw_range else None
            association["label"] = properties.get("rdfs:label")
            association["altLabel"] = properties.get("skos:altLabel")
            association["description"] = properties.get("rdfs:comment")
            association["domain"] = properties.get("rdfs:domain")
            association["range"] = association_range
            if inherited:
                association["inherited"] = False
            if cardinalities:
                association.update(self.get_association_cardinalities(association_uri))
            associations[association_uri] = association

    if include_to:
        for association_uri in self.get_resource_ucmis_associations_to(resource_uri):
            association = {"uri": association_uri, "direction": "to"}
            if inherited:
                association["inherited"] = False
            if cardinalities:
                association.update(self.get_association_cardinalities(association_uri))
            associations[association_uri] = association

    if inherited:
        for superclass_uri in self.get_resource_superclasses(resource_uri):
            # Forward the direction filters; otherwise a from-only or to-only
            # request would pick up both directions from the superclasses.
            superclass_associations = self.get_resource_associations(
                superclass_uri,
                include_from=include_from,
                include_to=include_to,
                inherited=False,
                cardinalities=cardinalities,
            )
            for superclass_association in superclass_associations.values():
                superclass_association["inherited"] = True
                superclass_association["inherited_from"] = superclass_uri
                associations[superclass_association["uri"]] = superclass_association
    return associations
def get_resource_associations_from(
    self, resource_uri: str, inherited: bool = False, cardinalities: bool = False
) -> dict[str, dict[str, Any]]:
    """
    Retrieves only the FROM associations of a resource, i.e. those that have
    ``resource_uri`` as their rdfs:domain.

    Delegates to ``get_resource_associations`` with the 'to' direction disabled
    and returns its result: a dictionary keyed by association URI.
    """
    options = {
        "include_from": True,
        "include_to": False,
        "inherited": inherited,
        "cardinalities": cardinalities,
    }
    return self.get_resource_associations(resource_uri, **options)
def get_resource_associations_to(
    self, resource_uri: str, inherited: bool = False, cardinalities: bool = False
) -> dict[str, dict[str, Any]]:
    """
    Retrieves only the TO associations of a resource, i.e. those that have
    ``resource_uri`` as their rdfs:range.

    Delegates to ``get_resource_associations`` with the 'from' direction disabled
    and returns its result: a dictionary keyed by association URI.
    """
    options = {
        "include_from": False,
        "include_to": True,
        "inherited": inherited,
        "cardinalities": cardinalities,
    }
    return self.get_resource_associations(resource_uri, **options)
def get_resource_properties(self, resource_uri: str) -> dict[str, Any]:
    """
    Collect every property/value pair stated for ``resource_uri`` in the graph.

    Keys are property URIs (prefixed where a known namespace matches). A
    property with one value maps to that value as a string; a property with
    several values maps to the list of those strings.
    """
    sparql = (
        "\n"
        "SELECT ?property ?value\n"
        "WHERE {\n"
        f"{resource_uri} ?property ?value .\n"
        "}\n"
    )
    grouped: dict[str, Any] = {}
    for row in self.graph.query(sparql):
        key = self.prefixed_uri(str(row[0]))  # type: ignore
        grouped.setdefault(key, []).append(str(row[1]))  # type: ignore
    # Unwrap single-valued properties.
    return {key: (values[0] if len(values) == 1 else values) for key, values in grouped.items()}
def get_resource_ucmis_associations_from(self, resource_uri: str) -> list[str]:
    """
    List the ucmis:Association resources whose rdfs:domain is ``resource_uri``.

    Returns association URIs, prefixed where a known namespace matches.
    """
    sparql = (
        "\n"
        "SELECT ?association\n"
        "WHERE {\n"
        "?association a ucmis:Association ;\n"
        f"rdfs:domain {resource_uri} .\n"
        "}\n"
    )
    found = []
    for row in self.graph.query(sparql):
        found.append(self.prefixed_uri(str(row[0])))  # type: ignore
    return found
def get_resource_ucmis_associations_to(self, resource_uri: str) -> list[str]:
    """
    List the ucmis:Association resources whose rdfs:range is ``resource_uri``.

    Returns association URIs, prefixed where a known namespace matches.
    """
    sparql = (
        "\n"
        "SELECT ?association\n"
        "WHERE {\n"
        "?association a ucmis:Association ;\n"
        f"rdfs:range {resource_uri} .\n"
        "}\n"
    )
    found = []
    for row in self.graph.query(sparql):
        found.append(self.prefixed_uri(str(row[0])))  # type: ignore
    return found
def get_resource_ucmis_domain_attributes(self, resource_uri: str) -> list[str]:
    """
    List the ucmis:Attribute resources whose rdfs:domain is ``resource_uri``.

    Returns attribute URIs, prefixed where a known namespace matches.
    """
    sparql = (
        "\n"
        "SELECT ?attribute\n"
        "WHERE {\n"
        "?attribute a ucmis:Attribute ;\n"
        f"rdfs:domain {resource_uri} .\n"
        "}\n"
    )
    found = []
    for row in self.graph.query(sparql):
        found.append(self.prefixed_uri(str(row[0])))  # type: ignore
    return found
def get_resource_ucmis_range_attributes(self, resource_uri: str) -> list[str]:
    """
    List the ucmis:Attribute resources whose rdfs:range is ``resource_uri``.

    Returns attribute URIs, prefixed where a known namespace matches.
    """
    sparql = (
        "\n"
        "SELECT ?attribute\n"
        "WHERE {\n"
        "?attribute a ucmis:Attribute ;\n"
        f"rdfs:range {resource_uri} .\n"
        "}\n"
    )
    found = []
    for row in self.graph.query(sparql):
        found.append(self.prefixed_uri(str(row[0])))  # type: ignore
    return found
def get_resource_superclasses(self, resource_uri: str) -> list[str]:
    """
    List the direct and indirect superclasses of ``resource_uri``.

    Follows rdfs:subClassOf transitively and leaves out the resource itself.
    Returns a list of URIs, prefixed where a known namespace matches; the list
    is empty when there are no results.
    """
    sparql = (
        "\n"
        "SELECT DISTINCT ?superclass\n"
        "WHERE {\n"
        f"{resource_uri} rdfs:subClassOf* ?superclass .\n"
        f"FILTER(?superclass != {resource_uri})\n"
        "}\n"
    )
    rows = self.graph.query(sparql)
    if not rows:
        return []
    return [self.prefixed_uri(str(row[0])) for row in rows]  # type: ignore
def get_resource_subclasses(self, resource_uri: str) -> list[str]:
    """
    List the direct and indirect subclasses of ``resource_uri``.

    Follows rdfs:subClassOf transitively and leaves out the resource itself.
    Returns a list of URIs, prefixed where a known namespace matches; the list
    is empty when there are no results.
    """
    sparql = (
        "\n"
        "SELECT DISTINCT ?subclass\n"
        "WHERE {\n"
        f"?subclass rdfs:subClassOf* {resource_uri} .\n"
        f"FILTER(?subclass != {resource_uri})\n"
        "}\n"
    )
    rows = self.graph.query(sparql)
    if not rows:
        return []
    return [self.prefixed_uri(str(row[0])) for row in rows]  # type: ignore
def get_ucmis_attributes(self) -> list[str]:
    """
    List every resource typed ucmis:Attribute, ordered by the query.

    Returns attribute URIs, prefixed where a known namespace matches.
    """
    sparql = (
        "\n"
        "SELECT ?attribute\n"
        "WHERE {\n"
        "?attribute a ucmis:Attribute .\n"
        "}\n"
        "order by ?attribute\n"
    )
    found = []
    for row in self.graph.query(sparql):
        found.append(self.prefixed_uri(str(row[0])))  # type: ignore
    return found
def get_ucmis_associations(self) -> list[str]:
    """
    List every resource typed ucmis:Association, ordered by the query.

    Returns association URIs, prefixed where a known namespace matches.
    """
    sparql = (
        "\n"
        "SELECT ?association\n"
        "WHERE {\n"
        "?association a ucmis:Association .\n"
        "}\n"
        "order by ?association\n"
    )
    found = []
    for row in self.graph.query(sparql):
        found.append(self.prefixed_uri(str(row[0])))  # type: ignore
    return found
def get_ucmis_classes(self) -> list[str]:
    """
    List every resource typed ucmis:Class, ordered by the query.

    Returns class URIs, prefixed where a known namespace matches.
    """
    sparql = (
        "\n"
        "SELECT ?class\n"
        "WHERE {\n"
        "?class a ucmis:Class .\n"
        "}\n"
        "order by ?class\n"
    )
    found = []
    for row in self.graph.query(sparql):
        found.append(self.prefixed_uri(str(row[0])))  # type: ignore
    return found
def get_ucmis_enumerations(self) -> list[str]:
    """
    List every resource typed ucmis:Enumeration, ordered by the query.

    Returns enumeration URIs, prefixed where a known namespace matches.
    """
    sparql = (
        "\n"
        "SELECT ?enumeration\n"
        "WHERE {\n"
        "?enumeration a ucmis:Enumeration .\n"
        "}\n"
        "order by ?enumeration\n"
    )
    found = []
    for row in self.graph.query(sparql):
        found.append(self.prefixed_uri(str(row[0])))  # type: ignore
    return found
def get_ucmis_structureddatatypes(self) -> list[str]:
    """
    List every resource typed ucmis:StructuredDataType, ordered by the query.

    Returns datatype URIs, prefixed where a known namespace matches.
    """
    sparql = (
        "\n"
        "SELECT ?datatype\n"
        "WHERE {\n"
        "?datatype a ucmis:StructuredDataType .\n"
        "}\n"
        "order by ?datatype\n"
    )
    found = []
    for row in self.graph.query(sparql):
        found.append(self.prefixed_uri(str(row[0])))  # type: ignore
    return found
def get_subclassof(self) -> dict[str, str]:
    """
    Map each child class to its parent class according to rdfs:subClassOf.

    Keys and values are prefixed URIs where a known namespace matches. Since
    the result is a plain dictionary, a child that appears in several rows
    keeps only the parent of its last row.
    """
    sparql = (
        "\n"
        "SELECT ?child ?parent\n"
        "WHERE {\n"
        "?child rdfs:subClassOf ?parent.\n"
        "}\n"
        "order by ?child\n"
    )
    return {
        self.prefixed_uri(str(row[0])): self.prefixed_uri(str(row[1]))  # type: ignore
        for row in self.graph.query(sparql)
    }
def prefixed_uri(self, uri: str) -> str:
    """
    Shorten a full URI to ``prefix:local`` form using the ``NAMESPACES`` table.

    A URI that starts with none of the known namespaces is returned unchanged.
    """
    shortened = uri
    for prefix, namespace in NAMESPACES.items():
        if not shortened.startswith(namespace):
            continue
        local_name = shortened[len(namespace) :]
        shortened = f"{prefix}:{local_name}"
    return shortened
def full_uri(self, uri: str) -> str:
    """
    Expand a ``prefix:local`` URI to its full form using the ``NAMESPACES`` table.

    A value that starts with none of the known prefixes is returned unchanged.
    """
    expanded = uri
    for prefix, namespace in NAMESPACES.items():
        marker = f"{prefix}:"
        if expanded.startswith(marker):
            local_name = expanded[len(marker) :]
            expanded = f"{namespace}{local_name}"
    return expanded
def search_classes(self, class_name: str) -> list[str]:
    """
    Searches the ucmis:Class resources whose URI matches ``class_name``.

    ``class_name`` is used as a case-insensitive regular expression against the
    string form of each class URI.

    Returns the matching class URIs, prefixed where a known namespace matches.
    """
    # Escape the characters that would end or corrupt the SPARQL string literal,
    # so a quote, backslash or line break in the search text cannot change the
    # shape of the query.
    escapes = {"\\": "\\\\", '"': '\\"', "\n": "\\n", "\r": "\\r"}
    pattern = "".join(escapes.get(char, char) for char in class_name)
    query = f"""
    PREFIX ucmis: <{NAMESPACES["ucmis"]}>
    SELECT ?class
    WHERE {{
        ?class a ucmis:Class .
        FILTER regex(str(?class), "{pattern}", "i")
    }}
    """
    results = self.graph.query(query)
    return [self.prefixed_uri(str(row[0])) for row in results]  # type: ignore