Browse Source

feat: dump combined graph as graphml format

master
Dnomd343 1 day ago
parent
commit
1d192b25be
  1. 207
      misc/all-graph/03-convert_graphml.py
  2. 95
      misc/all-graph/05-dump_combined.py
  3. 44
      misc/all-graph/compare.py
  4. 116
      misc/all-graph/graphml.py

207
misc/all-graph/03-convert_graphml.py

@ -1,205 +1,90 @@
#!/usr/bin/env python3
from __future__ import annotations
import io
import os
from typing import override
from abc import ABC, abstractmethod
from dataclasses import dataclass
import numpy as np
import igraph as ig
from lxml import etree
import matplotlib.pyplot as plt
from dataclasses import dataclass
import matplotlib.colors as mcolors
from graphml import INode, IEdge, Config, GraphML
@dataclass(frozen=True)
class Node:
class Node(INode):
id: str
code: str
step: int
code: str
@dataclass(frozen=True)
class Edge:
src: str # node id
dst: str # node id
class Builder(ABC):
def __init__(self, _: list[Node]):
pass
@abstractmethod
def nsmap(self) -> dict[str | None, str]:
pass
@abstractmethod
def build_keys(self) -> list[etree.Element]:
pass
@abstractmethod
def build_node(self, node: Node) -> etree.Element:
pass
@abstractmethod
def build_edge(self, edge: Edge) -> etree.Element:
pass
@abstractmethod
def post_modify(self, content: str) -> str:
pass
class KlskBuilder(Builder):
def __init__(self, _: list[Node]):
pass
@override
def nsmap(self) -> dict[str | None, str]:
return {
None: 'http://graphml.graphdrawing.org/xmlns',
'xsi': 'http://www.w3.org/2001/XMLSchema-instance',
}
@staticmethod
@override
def build_keys(self) -> list[etree.Element]:
k_node = etree.Element('key', attrib={
def add_keys(graphml: etree.Element, cfg: Config) -> None:
etree.SubElement(graphml, 'key', id='code', attrib={
'id': 'code',
'for': 'node',
'attr.name': 'code',
'attr.type': 'string'
})
k_step = etree.Element('key', attrib={
'id': 'step',
etree.SubElement(graphml, 'key', id='step', attrib={
'for': 'node',
'attr.name': 'step',
'attr.type': 'int'
})
return [k_node, k_step]
@override
def build_node(self, node: Node) -> etree.Element:
node_xml = etree.Element('node', id=node.id)
etree.SubElement(node_xml, 'data', key='code').text = node.code
etree.SubElement(node_xml, 'data', key='step').text = str(node.step)
return node_xml
@override
def build_edge(self, edge: Edge) -> etree.Element:
return etree.Element('edge', source=edge.src, target=edge.dst)
if cfg.is_yed:
INode._add_yed_key(graphml)
@override
def post_modify(self, content: str) -> str:
return content
def render(self, cfg: Config) -> etree.Element:
node_xml = etree.Element('node', id=self.id)
etree.SubElement(node_xml, 'data', key='code').text = self.code
etree.SubElement(node_xml, 'data', key='step').text = str(self.step)
if cfg.is_yed:
color = cfg.colors[self.step]
label = f'{self.code}
({self.step})'
node_xml.append(self._yed_render(cfg, color, label))
class yEdBuilder(KlskBuilder):
def __build_colors(self, num: int) -> list[str]:
cmap = mcolors.LinearSegmentedColormap.from_list('custom_bwr', ['#0000ff', '#e8daef', '#ff0000'])
return [mcolors.to_hex(cmap(x)) for x in np.linspace(0, 1, num)]
return node_xml
def __init__(self, nodes: list[Node]):
max_step = max(x.step for x in nodes)
self.__colors = self.__build_colors(max_step + 1)
self.__yed_ns = 'http://www.yworks.com/xml/graphml'
@override
def nsmap(self) -> dict[str | None, str]:
return {
**super().nsmap(),
'y': self.__yed_ns
}
@dataclass(frozen=True)
class Edge(IEdge):
src: str
dst: str
@staticmethod
@override
def build_keys(self) -> list[etree.Element]:
k_info = etree.Element('key', attrib={
'id': 'info',
'for': 'node',
'yfiles.type': 'nodegraphics'
})
return super().build_keys() + [k_info]
def add_keys(graphml: etree.Element, cfg: Config) -> None:
pass
@override
def build_node(self, node: Node) -> etree.Element:
shape = etree.Element(f'{{{self.__yed_ns}}}ShapeNode')
etree.SubElement(shape, f'{{{self.__yed_ns}}}Fill', attrib={'color': self.__colors[node.step]})
etree.SubElement(shape, f'{{{self.__yed_ns}}}Shape', attrib={'type': 'ellipse'})
etree.SubElement(shape, f'{{{self.__yed_ns}}}Geometry', attrib={'height': '50', 'width': '50'})
label = etree.SubElement(shape, f'{{{self.__yed_ns}}}NodeLabel', attrib={'fontSize': '10', 'modelName': 'internal'})
label.text = f'{node.code}
({node.step})'
node_xml = super().build_node(node)
node_info = etree.SubElement(node_xml, 'data', attrib={'key': 'info'})
node_info.append(shape)
return node_xml
def render(self, cfg: Config) -> etree.Element:
return etree.Element('edge', source=self.src, target=self.dst)
@override
def post_modify(self, content: str) -> str:
return content.replace('
', '
')
def load_graph(tag: str, graph: ig.Graph) -> tuple[GraphML, int]:
nodes = []
id_len = len(str(graph.vcount() - 1))
for index in range(graph.vcount()):
info = graph.vs[index]
nodes.append(Node(f'n{index:0{id_len}d}', info['step'], info['code']))
class GraphML:
@dataclass(frozen=True)
class Graph:
name: str
nodes: list[Node]
edges: list[Edge]
edges = []
for n1, n2 in graph.get_edgelist():
node_1 = nodes[n1]
node_2 = nodes[n2]
if node_1.step < node_2.step:
node_1, node_2 = node_2, node_1
edges.append(Edge(node_1.id, node_2.id))
@staticmethod
def __load_graph(name: str, graph: ig.Graph) -> GraphML.Graph:
nodes = []
id_len = len(str(graph.vcount() - 1))
for index in range(graph.vcount()):
info = graph.vs[index]
nodes.append(Node(f'n{index:0{id_len}d}', info['code'], info['step']))
edges = []
for n1, n2 in graph.get_edgelist():
node_1 = nodes[n1]
node_2 = nodes[n2]
if node_1.step < node_2.step:
node_1, node_2 = node_2, node_1
edges.append(Edge(node_1.id, node_2.id))
return GraphML.Graph(name, nodes, edges)
def __init__(self, tag: str, graph: ig.Graph):
self.__graph = self.__load_graph(tag, graph)
def __dump_graph(self, graph: Graph, builder: Builder) -> etree.Element:
graph_xml = etree.Element('graph', id=graph.name, edgedefault='undirected')
for node in graph.nodes:
graph_xml.append(builder.build_node(node))
for edge in graph.edges:
graph_xml.append(builder.build_edge(edge))
return graph_xml
def save_graph(self, output: str, builder_t: type[Builder]) -> None:
builder = builder_t(self.__graph.nodes)
graphml = etree.Element('graphml', nsmap=builder.nsmap())
graphml.set(
'{http://www.w3.org/2001/XMLSchema-instance}schemaLocation',
'http://graphml.graphdrawing.org/xmlns http://graphml.graphdrawing.org/xmlns/1.0/graphml.xsd'
)
[graphml.append(x) for x in builder.build_keys()]
graphml.append(self.__dump_graph(self.__graph, builder))
fake_output = io.BytesIO()
xml_tree = etree.ElementTree(graphml)
xml_tree.write(fake_output, pretty_print=True, xml_declaration=True, encoding='utf-8')
content = fake_output.getvalue().decode('utf-8')
with open(output, 'w') as fp:
fp.write(builder.post_modify(content))
return GraphML(tag, nodes, edges), max(x.step for x in nodes)
def to_graphml(tag: str, input: str, output: str, is_yed: bool) -> None:
print(f'Convert graph {input} into {output}')
gml = GraphML(tag, ig.Graph.Read_Pickle(input))
gml.save_graph(output, yEdBuilder if is_yed else KlskBuilder)
gml, max_step = load_graph(tag, ig.Graph.Read_Pickle(input))
colors = GraphML.build_colors(max_step + 1, ['#0000ff', '#e8daef', '#ff0000'])
cfg = Config(is_yed=is_yed, colors=colors)
gml.save_graphml(output, cfg)
if __name__ == '__main__':

95
misc/all-graph/05-dump_combined.py

@ -0,0 +1,95 @@
#!/usr/bin/env python3
import os
from typing import override
from dataclasses import dataclass
import igraph as ig
from lxml import etree
from graphml import INode, IEdge, Config, GraphML
@dataclass(frozen=True)
class Node(INode):
tag: str
step: int
codes: list[str]
@staticmethod
@override
def add_keys(graphml: etree.Element, cfg: Config) -> None:
etree.SubElement(graphml, 'key', attrib={
'id': 'step',
'for': 'node',
'attr.name': 'step',
'attr.type': 'int'
})
etree.SubElement(graphml, 'key', attrib={
'id': 'codes',
'for': 'node',
'attr.name': 'codes',
'attr.type': 'string'
})
if cfg.is_yed:
INode._add_yed_key(graphml)
@override
def render(self, cfg: Config) -> etree.Element:
node_xml = etree.Element('node', id=self.tag)
etree.SubElement(node_xml, 'data', key='step').text = str(self.step)
etree.SubElement(node_xml, 'data', key='codes').text = '+'.join(self.codes)
if cfg.is_yed:
color = cfg.colors[self.step]
label = f'{self.tag}&#10;({self.step})'
node_xml.append(self._yed_render(cfg, color, label))
return node_xml
@dataclass(frozen=True)
class Edge(IEdge):
src: str
dst: str
@staticmethod
@override
def add_keys(graphml: etree.Element, cfg: Config) -> None:
pass
@override
def render(self, is_yed: bool) -> etree.Element:
return etree.Element('edge', source=self.src, target=self.dst)
def to_graphml(tag: str, file: str, output: str, is_yed: bool) -> None:
print(f'Convert graph {file} into {output}')
g = ig.Graph.Read_Pickle(file)
nodes = []
for v in g.vs:
node = Node(v['tag'], v['step'], v['codes'])
nodes.append(node)
edges = []
for n1, n2 in g.get_edgelist():
node_1 = nodes[n1]
node_2 = nodes[n2]
if node_1.step < node_2.step:
node_1, node_2 = node_2, node_1
edges.append(Edge(node_1.tag, node_2.tag))
gml = GraphML(tag, nodes, edges)
colors = GraphML.build_colors(max(x.step for x in nodes) + 1, ['#0000ff', '#e8daef', '#ff0000'])
cfg = Config(is_yed=is_yed, colors=colors)
gml.save_graphml(output, cfg)
if __name__ == '__main__':
os.makedirs('output-combine-gml', exist_ok=True)
for name in sorted(os.listdir('output-combine')):
name = name.removesuffix('.pkl')
tag = name.split('_')[0] if '_' in name else name
to_graphml(tag, f'output-combine/{name}.pkl', f'output-combine-gml/{name}.graphml', False)
to_graphml(tag, f'output-combine/{name}.pkl', f'output-combine-gml/{name}-yed.graphml', True)

44
misc/all-graph/compare.py

@ -1,44 +0,0 @@
#!/usr/bin/env python3
import os
import igraph as ig
def load_legacy(file: str) -> ig.Graph:
g = ig.Graph.Read_Pickle(file)
for node in g.vs:
node['codes'] = sorted(node['codes'])
return g
def load_modern(file: str) -> ig.Graph:
g = ig.Graph.Read_Pickle(file)
for idx, node in enumerate(g.vs):
assert sorted(node['codes']) == node['codes']
assert int(node['tag'].removeprefix('U')) == idx
del g.vs['tag']
return g
def compare(g1: ig.Graph, g2: ig.Graph) -> None:
assert g1.vcount() == g2.vcount()
assert g1.ecount() == g2.ecount()
assert g1.isomorphic(g2)
assert {len(x.attributes()) for x in g1.es} == {0}
assert {len(x.attributes()) for x in g2.es} == {0}
data_a = {min(x['codes']): x.attributes() for x in g1.vs}
data_b = {min(x['codes']): x.attributes() for x in g2.vs}
assert data_a == data_b
if __name__ == '__main__':
for name in sorted(os.listdir('output-combine')):
if '_' not in name:
continue
g1 = load_legacy(f'combined/{name.split('_')[1]}')
g2 = load_modern(f'output-combine/{name}')
compare(g1, g2)

116
misc/all-graph/graphml.py

@ -0,0 +1,116 @@
#!/usr/bin/env python3
import io
import numpy as np
from lxml import etree
from dataclasses import dataclass
from abc import ABC, abstractmethod
import matplotlib.colors as mcolors
@dataclass
class Config:
is_yed: bool
colors: list[str]
pretty_xml: bool = True
yed_xmlns: str = 'http://www.yworks.com/xml/graphml'
yed_node_type: str = 'ellipse'
yed_node_width: int = 50
yed_node_height: int = 50
yed_node_font_size: int = 10
class INode(ABC):
@staticmethod
def _add_yed_key(graphml: etree.Element) -> None:
etree.SubElement(graphml, 'key', attrib={
'id': 'info',
'for': 'node',
'yfiles.type': 'nodegraphics'
})
@staticmethod
def _yed_render(cfg: Config, color: str, text: str) -> etree.Element:
yed_ns = f'{{{cfg.yed_xmlns}}}'
info = etree.Element('data', attrib={'key': 'info'})
shape = etree.SubElement(info, f'{yed_ns}ShapeNode')
etree.SubElement(shape, f'{yed_ns}Fill', attrib={
'color': color
})
etree.SubElement(shape, f'{yed_ns}Shape', attrib={
'type': cfg.yed_node_type
})
etree.SubElement(shape, f'{yed_ns}Geometry', attrib={
'height': str(cfg.yed_node_width),
'width': str(cfg.yed_node_height),
})
label = etree.SubElement(shape, f'{yed_ns}NodeLabel', attrib={
'fontSize': str(cfg.yed_node_font_size),
'modelName': 'internal'
})
label.text = text
return info
@staticmethod
@abstractmethod
def add_keys(graphml: etree.Element, cfg: Config) -> None:
pass
@abstractmethod
def render(self, cfg: Config) -> etree.Element:
pass
class IEdge(ABC):
@staticmethod
@abstractmethod
def add_keys(graphml: etree.Element, cfg: Config) -> None:
pass
@abstractmethod
def render(self, cfg: Config) -> etree.Element:
pass
class GraphML:
def __init__(self, tag: str, nodes: list[INode], edges: list[IEdge]):
self.__tag = tag
self.__nodes = nodes
self.__edges = edges
assert len(nodes) > 0 and len(edges) > 0
@staticmethod
def __nsmap(cfg: Config) -> dict[str | None, str]:
return {
None: 'http://graphml.graphdrawing.org/xmlns',
'xsi': 'http://www.w3.org/2001/XMLSchema-instance',
**({'y': cfg.yed_xmlns} if cfg.is_yed else {})
}
@staticmethod
def build_colors(num: int, bwr: list[str]) -> list[str]:
cmap = mcolors.LinearSegmentedColormap.from_list('custom_bwr', bwr)
return [mcolors.to_hex(cmap(x)) for x in np.linspace(0, 1, num)]
def __build_graphml(self, cfg: Config) -> etree.Element:
graphml = etree.Element('graphml', nsmap=self.__nsmap(cfg))
graphml.set(
'{http://www.w3.org/2001/XMLSchema-instance}schemaLocation',
'http://graphml.graphdrawing.org/xmlns http://graphml.graphdrawing.org/xmlns/1.0/graphml.xsd'
)
self.__nodes[0].add_keys(graphml, cfg)
self.__edges[0].add_keys(graphml, cfg)
graph = etree.SubElement(graphml, 'graph', id=self.__tag, edgedefault='undirected')
[graph.append(x.render(cfg)) for x in self.__nodes]
[graph.append(x.render(cfg)) for x in self.__edges]
return graphml
def save_graphml(self, file: str, cfg: Config) -> None:
xml_tree = etree.ElementTree(self.__build_graphml(cfg))
fake_output = io.BytesIO()
xml_tree.write(fake_output, pretty_print=cfg.pretty_xml, xml_declaration=True, encoding='utf-8')
content = fake_output.getvalue().decode('utf-8')
with open(file, 'w') as fp:
fp.write(content.replace('&amp;#10;', '&#10;'))
Loading…
Cancel
Save