mirror of https://github.com/dnomd343/klotski.git
Compare commits
7 Commits
431c8949fe
...
878cb9a191
Author | SHA1 | Date |
---|---|---|
|
878cb9a191 | 1 week ago |
|
1d192b25be | 1 week ago |
|
a65696d76c | 2 weeks ago |
|
77c3922897 | 2 weeks ago |
|
29f66f18fe | 2 weeks ago |
|
8e5022c134 | 2 weeks ago |
|
5fcc3a4fe1 | 2 weeks ago |
8 changed files with 484 additions and 101 deletions
@ -0,0 +1,111 @@ |
|||
#!/usr/bin/env python3 |
|||
|
|||
import os |
|||
import igraph as ig |
|||
import multiprocessing |
|||
|
|||
type Union = set[int] |
|||
|
|||
|
|||
def split_adjacent_layers(graph: ig.Graph, step: int) -> tuple[list[Union], list[Union]]: |
|||
layouts = graph.vs.select(step_in=[step, step + 1]) |
|||
mapping = {x['code']: x.index for x in layouts} |
|||
spawn_union = lambda iter: {mapping[x['code']] for x in iter} |
|||
|
|||
layer_curr, layer_next = [], [] |
|||
g_focus = graph.subgraph(layouts) |
|||
if isolated := g_focus.vs.select(_degree=0): |
|||
assert set(isolated['step']) == {step} |
|||
layer_curr = [spawn_union(isolated)] |
|||
g_focus.delete_vertices(isolated) |
|||
|
|||
for comp in map(g_focus.vs.select, g_focus.connected_components()): |
|||
layer_curr.append(spawn_union(comp.select(step=step))) |
|||
layer_next.append(spawn_union(comp.select(step=step+1))) |
|||
return layer_curr, layer_next |
|||
|
|||
|
|||
def apply_layer_unions(unions_a: list[Union], unions_b: list[Union]) -> list[Union]: |
|||
layouts = {x for u in unions_a for x in u} |
|||
assert layouts == {x for u in unions_b for x in u} |
|||
|
|||
unions = [] |
|||
for curr_union in unions_a: |
|||
for other_union in unions_b: |
|||
if union := curr_union.intersection(other_union): |
|||
unions.append(union) |
|||
curr_union -= union |
|||
other_union -= union |
|||
assert len(curr_union) == 0 |
|||
|
|||
assert set(len(x) for x in unions_a) == {0} |
|||
assert set(len(x) for x in unions_b) == {0} |
|||
assert layouts == {x for u in unions for x in u} |
|||
return unions |
|||
|
|||
|
|||
def build_all_unions(graph: ig.Graph) -> list[Union]: |
|||
max_step = max(graph.vs['step']) |
|||
layer_unions = [[set(graph.vs.select(step=0).indices)]] |
|||
for step in range(0, max_step): |
|||
layer_unions.extend(list(split_adjacent_layers(graph, step))) |
|||
layer_unions.append([set(graph.vs.select(step=max_step).indices)]) |
|||
assert len(layer_unions) == (max_step + 1) * 2 |
|||
|
|||
all_unions = [] |
|||
for idx in range(0, len(layer_unions), 2): |
|||
all_unions.extend(apply_layer_unions(*layer_unions[idx:idx + 2])) |
|||
for unions in all_unions: |
|||
assert len(unions) > 0 |
|||
assert len(set(graph.vs[x]['step'] for x in unions)) == 1 |
|||
return sorted(all_unions, key=lambda u: min(graph.vs[x]['code'] for x in u)) |
|||
|
|||
|
|||
def combine_graph(graph: ig.Graph) -> ig.Graph: |
|||
unions = build_all_unions(graph) |
|||
union_idx = sorted((x, idx) for idx, u in enumerate(unions) for x in u) |
|||
|
|||
combine_idx = [x for _, x in union_idx] |
|||
assert len(combine_idx) == graph.vcount() |
|||
assert set(combine_idx) == set(range(len(unions))) |
|||
|
|||
tag_len = len(str(len(unions) - 1)) |
|||
graph.vs['tag'] = [f'U{x:0{tag_len}}' for x in combine_idx] |
|||
|
|||
graph.contract_vertices(combine_idx, combine_attrs={'tag': 'first', 'step': 'first', 'code': list}) |
|||
assert [int(x.removeprefix('U')) for x in graph.vs['tag']] == list(range(len(unions))) |
|||
assert not any(x.is_loop() for x in graph.es) |
|||
graph.simplify(multiple=True) |
|||
return graph |
|||
|
|||
|
|||
def do_combine(input: str, output: str) -> None: |
|||
print(f'Start combining: {input}') |
|||
|
|||
g_raw = (graph := combine_graph(ig.Graph.Read_Pickle(input))).copy() |
|||
graph.vs['codes'] = graph.vs['code'] |
|||
del graph.vs['code'] |
|||
graph.write_pickle(output) # save combined graph |
|||
|
|||
g_raw.vs['code'] = g_raw.vs['tag'] # modify as origin format |
|||
g_mod = combine_graph(g_raw.copy()) |
|||
|
|||
assert g_raw.vcount() == g_mod.vcount() |
|||
assert g_raw.ecount() == g_mod.ecount() |
|||
assert all(x['code'] == [x['tag']] for x in g_mod.vs) |
|||
assert g_raw.vs['step'] == g_mod.vs['step'] |
|||
assert g_raw.vs['code'] == g_mod.vs['tag'] |
|||
assert g_raw.isomorphic(g_mod) |
|||
|
|||
|
|||
def combine_all(ig_dir: str, output_dir: str) -> None: |
|||
pool = multiprocessing.Pool() |
|||
for name in sorted(os.listdir(ig_dir)): |
|||
pool.apply_async(do_combine, args=(f'{ig_dir}/{name}', f'{output_dir}/{name}')) |
|||
pool.close() |
|||
pool.join() |
|||
|
|||
|
|||
if __name__ == '__main__': |
|||
os.makedirs('output-combine', exist_ok=True) |
|||
combine_all('output-ig', 'output-combine') |
@ -0,0 +1,95 @@ |
|||
#!/usr/bin/env python3 |
|||
|
|||
import os |
|||
from typing import override |
|||
from dataclasses import dataclass |
|||
|
|||
import igraph as ig |
|||
from lxml import etree |
|||
from graphml import INode, IEdge, Config, GraphML |
|||
|
|||
|
|||
@dataclass(frozen=True) |
|||
class Node(INode): |
|||
tag: str |
|||
step: int |
|||
codes: list[str] |
|||
|
|||
@staticmethod |
|||
@override |
|||
def add_keys(graphml: etree.Element, cfg: Config) -> None: |
|||
etree.SubElement(graphml, 'key', attrib={ |
|||
'id': 'step', |
|||
'for': 'node', |
|||
'attr.name': 'step', |
|||
'attr.type': 'int' |
|||
}) |
|||
etree.SubElement(graphml, 'key', attrib={ |
|||
'id': 'codes', |
|||
'for': 'node', |
|||
'attr.name': 'codes', |
|||
'attr.type': 'string' |
|||
}) |
|||
if cfg.is_yed: |
|||
INode._add_yed_key(graphml) |
|||
|
|||
@override |
|||
def render(self, cfg: Config) -> etree.Element: |
|||
node_xml = etree.Element('node', id=self.tag) |
|||
etree.SubElement(node_xml, 'data', key='step').text = str(self.step) |
|||
etree.SubElement(node_xml, 'data', key='codes').text = '+'.join(self.codes) |
|||
|
|||
if cfg.is_yed: |
|||
color = cfg.colors[self.step] |
|||
label = f'{self.tag} ({self.step})' |
|||
node_xml.append(self._yed_render(cfg, color, label)) |
|||
|
|||
return node_xml |
|||
|
|||
|
|||
@dataclass(frozen=True) |
|||
class Edge(IEdge): |
|||
src: str |
|||
dst: str |
|||
|
|||
@staticmethod |
|||
@override |
|||
def add_keys(graphml: etree.Element, cfg: Config) -> None: |
|||
pass |
|||
|
|||
@override |
|||
def render(self, is_yed: bool) -> etree.Element: |
|||
return etree.Element('edge', source=self.src, target=self.dst) |
|||
|
|||
|
|||
def to_graphml(tag: str, file: str, output: str, is_yed: bool) -> None: |
|||
print(f'Convert graph {file} into {output}') |
|||
|
|||
g = ig.Graph.Read_Pickle(file) |
|||
|
|||
nodes = [] |
|||
for v in g.vs: |
|||
node = Node(v['tag'], v['step'], v['codes']) |
|||
nodes.append(node) |
|||
|
|||
edges = [] |
|||
for n1, n2 in g.get_edgelist(): |
|||
node_1 = nodes[n1] |
|||
node_2 = nodes[n2] |
|||
if node_1.step < node_2.step: |
|||
node_1, node_2 = node_2, node_1 |
|||
edges.append(Edge(node_1.tag, node_2.tag)) |
|||
|
|||
gml = GraphML(tag, nodes, edges) |
|||
colors = GraphML.build_colors(max(x.step for x in nodes) + 1, ['#0000ff', '#e8daef', '#ff0000']) |
|||
cfg = Config(is_yed=is_yed, colors=colors) |
|||
gml.save_graphml(output, cfg) |
|||
|
|||
|
|||
if __name__ == '__main__': |
|||
os.makedirs('output-combine-gml', exist_ok=True) |
|||
for name in sorted(os.listdir('output-combine')): |
|||
name = name.removesuffix('.pkl') |
|||
tag = name.split('_')[0] if '_' in name else name |
|||
to_graphml(tag, f'output-combine/{name}.pkl', f'output-combine-gml/{name}.graphml', False) |
|||
to_graphml(tag, f'output-combine/{name}.pkl', f'output-combine-gml/{name}-yed.graphml', True) |
@ -0,0 +1,62 @@ |
|||
#!/usr/bin/env python3 |
|||
|
|||
import numpy as np |
|||
import igraph as ig |
|||
import plotly.graph_objs as go |
|||
import matplotlib.colors as mcolors |
|||
|
|||
|
|||
def build_colors(max_step: int) -> list[str]: |
|||
bwr = ['#0000ff', '#e8daef', '#ff0000'] |
|||
cmap = mcolors.LinearSegmentedColormap.from_list('custom_bwr', bwr) |
|||
return [mcolors.to_hex(cmap(x)) for x in np.linspace(0, 1, max_step + 1)] |
|||
|
|||
|
|||
def build_3d_graph(graph: ig.Graph, colors: list[str], output: str) -> None: |
|||
layout = graph.layout('kk', dim=3) |
|||
|
|||
Xn = [x[0] for x in layout] |
|||
Yn = [x[1] for x in layout] |
|||
Zn = [x[2] for x in layout] |
|||
|
|||
Xe, Ye, Ze = [], [], [] |
|||
for x, y in graph.get_edgelist(): |
|||
Xe += [layout[x][0], layout[y][0], None] |
|||
Ye += [layout[x][1], layout[y][1], None] |
|||
Ze += [layout[x][2], layout[y][2], None] |
|||
|
|||
edge_trace = go.Scatter3d( |
|||
x=Xe, y=Ye, z=Ze, |
|||
mode='lines', |
|||
line=dict(color='gray', width=2), |
|||
hoverinfo='none' |
|||
) |
|||
|
|||
node_trace = go.Scatter3d( |
|||
x=Xn, y=Yn, z=Zn, |
|||
mode='markers', |
|||
marker=dict( |
|||
symbol='circle', |
|||
size=6, |
|||
color=[colors[x['step']] for x in graph.vs] |
|||
), |
|||
hoverinfo='text', |
|||
text=[f'{x['tag']} ({x['step']}) [{'/'.join(x['codes'])}]' for x in graph.vs] |
|||
) |
|||
|
|||
fig = go.Figure(data=[edge_trace, node_trace]) |
|||
fig.update_layout( |
|||
showlegend=False, |
|||
margin=dict(l=0, r=0, b=0, t=0), |
|||
scene=dict( |
|||
xaxis=dict(showbackground=False), |
|||
yaxis=dict(showbackground=False), |
|||
zaxis=dict(showbackground=False), |
|||
) |
|||
) |
|||
fig.write_html(output) |
|||
|
|||
|
|||
if __name__ == '__main__': |
|||
g = ig.Graph.Read_Pickle('1-00M-000X_DAAF4CC-core.pkl') |
|||
build_3d_graph(g, build_colors(93), '1-00M-000X_3d-core.html') |
@ -0,0 +1,116 @@ |
|||
#!/usr/bin/env python3 |
|||
|
|||
import io |
|||
import numpy as np |
|||
from lxml import etree |
|||
from dataclasses import dataclass |
|||
from abc import ABC, abstractmethod |
|||
import matplotlib.colors as mcolors |
|||
|
|||
|
|||
@dataclass |
|||
class Config: |
|||
is_yed: bool |
|||
colors: list[str] |
|||
pretty_xml: bool = True |
|||
|
|||
yed_xmlns: str = 'http://www.yworks.com/xml/graphml' |
|||
yed_node_type: str = 'ellipse' |
|||
yed_node_width: int = 50 |
|||
yed_node_height: int = 50 |
|||
yed_node_font_size: int = 10 |
|||
|
|||
|
|||
class INode(ABC): |
|||
@staticmethod |
|||
def _add_yed_key(graphml: etree.Element) -> None: |
|||
etree.SubElement(graphml, 'key', attrib={ |
|||
'id': 'info', |
|||
'for': 'node', |
|||
'yfiles.type': 'nodegraphics' |
|||
}) |
|||
|
|||
@staticmethod |
|||
def _yed_render(cfg: Config, color: str, text: str) -> etree.Element: |
|||
yed_ns = f'{{{cfg.yed_xmlns}}}' |
|||
info = etree.Element('data', attrib={'key': 'info'}) |
|||
shape = etree.SubElement(info, f'{yed_ns}ShapeNode') |
|||
etree.SubElement(shape, f'{yed_ns}Fill', attrib={ |
|||
'color': color |
|||
}) |
|||
etree.SubElement(shape, f'{yed_ns}Shape', attrib={ |
|||
'type': cfg.yed_node_type |
|||
}) |
|||
etree.SubElement(shape, f'{yed_ns}Geometry', attrib={ |
|||
'height': str(cfg.yed_node_width), |
|||
'width': str(cfg.yed_node_height), |
|||
}) |
|||
label = etree.SubElement(shape, f'{yed_ns}NodeLabel', attrib={ |
|||
'fontSize': str(cfg.yed_node_font_size), |
|||
'modelName': 'internal' |
|||
}) |
|||
label.text = text |
|||
return info |
|||
|
|||
@staticmethod |
|||
@abstractmethod |
|||
def add_keys(graphml: etree.Element, cfg: Config) -> None: |
|||
pass |
|||
|
|||
@abstractmethod |
|||
def render(self, cfg: Config) -> etree.Element: |
|||
pass |
|||
|
|||
|
|||
class IEdge(ABC): |
|||
@staticmethod |
|||
@abstractmethod |
|||
def add_keys(graphml: etree.Element, cfg: Config) -> None: |
|||
pass |
|||
|
|||
@abstractmethod |
|||
def render(self, cfg: Config) -> etree.Element: |
|||
pass |
|||
|
|||
|
|||
class GraphML: |
|||
def __init__(self, tag: str, nodes: list[INode], edges: list[IEdge]): |
|||
self.__tag = tag |
|||
self.__nodes = nodes |
|||
self.__edges = edges |
|||
assert len(nodes) > 0 and len(edges) > 0 |
|||
|
|||
@staticmethod |
|||
def __nsmap(cfg: Config) -> dict[str | None, str]: |
|||
return { |
|||
None: 'http://graphml.graphdrawing.org/xmlns', |
|||
'xsi': 'http://www.w3.org/2001/XMLSchema-instance', |
|||
**({'y': cfg.yed_xmlns} if cfg.is_yed else {}) |
|||
} |
|||
|
|||
@staticmethod |
|||
def build_colors(num: int, bwr: list[str]) -> list[str]: |
|||
cmap = mcolors.LinearSegmentedColormap.from_list('custom_bwr', bwr) |
|||
return [mcolors.to_hex(cmap(x)) for x in np.linspace(0, 1, num)] |
|||
|
|||
def __build_graphml(self, cfg: Config) -> etree.Element: |
|||
graphml = etree.Element('graphml', nsmap=self.__nsmap(cfg)) |
|||
graphml.set( |
|||
'{http://www.w3.org/2001/XMLSchema-instance}schemaLocation', |
|||
'http://graphml.graphdrawing.org/xmlns http://graphml.graphdrawing.org/xmlns/1.0/graphml.xsd' |
|||
) |
|||
self.__nodes[0].add_keys(graphml, cfg) |
|||
self.__edges[0].add_keys(graphml, cfg) |
|||
graph = etree.SubElement(graphml, 'graph', id=self.__tag, edgedefault='undirected') |
|||
[graph.append(x.render(cfg)) for x in self.__nodes] |
|||
[graph.append(x.render(cfg)) for x in self.__edges] |
|||
return graphml |
|||
|
|||
def save_graphml(self, file: str, cfg: Config) -> None: |
|||
xml_tree = etree.ElementTree(self.__build_graphml(cfg)) |
|||
|
|||
fake_output = io.BytesIO() |
|||
xml_tree.write(fake_output, pretty_print=cfg.pretty_xml, xml_declaration=True, encoding='utf-8') |
|||
content = fake_output.getvalue().decode('utf-8') |
|||
with open(file, 'w') as fp: |
|||
fp.write(content.replace('&#10;', ' ')) |
Loading…
Reference in new issue