import warnings
from collections.abc import Collection
from collections.abc import Generator
from collections.abc import Iterator
from copy import deepcopy
from typing import TYPE_CHECKING
from typing import Any
from typing import Iterable
from typing import List
from typing import Optional
from typing import Union
import easygraph as eg
from easygraph.utils.exception import EasyGraphError
if TYPE_CHECKING:
import dgl
import networkx as nx
import torch_geometric
from easygraph import DiGraph
from easygraph import Graph
# Public API of this module: the names exported by a star-import.
__all__ = [
"from_dict_of_dicts",
"to_easygraph_graph",
"from_edgelist",
"from_dict_of_lists",
"from_networkx",
"from_dgl",
"from_pyg",
"to_networkx",
"to_dgl",
"to_pyg",
"dict_to_hypergraph",
]
[docs]
def to_easygraph_graph(data, create_using=None, multigraph_input=False):
    """Make an EasyGraph graph from a known data structure.

    The preferred way to call this is automatically
    from the class constructor

    >>> d = {0: {1: {"weight": 1}}}  # dict-of-dicts single edge (0,1)
    >>> G = eg.Graph(d)

    instead of the equivalent

    >>> G = eg.from_dict_of_dicts(d)

    The input is matched against the supported types in a fixed order and
    the first matching conversion routine is used; the generic
    "collection / iterator of edges" check is deliberately last.

    Parameters
    ----------
    data : object to be converted
        Current known types are:
            any EasyGraph graph
            dict-of-dicts
            dict-of-lists
            container (e.g. set, list, tuple) of edges
            iterator (e.g. itertools.chain) that produces edges
            generator of edges
            Pandas DataFrame (row per edge)
            numpy matrix
            numpy ndarray
            scipy sparse matrix
            pygraphviz agraph

    create_using : EasyGraph graph constructor, optional (default=eg.Graph)
        Graph type to create. If graph instance, then cleared before populated.

    multigraph_input : bool (default False)
        If True and data is a dict_of_dicts,
        try to create a multigraph assuming dict_of_dict_of_lists.
        If data and create_using are both multigraphs then create
        a multigraph from a multigraph.

    Returns
    -------
    The graph produced by the matching conversion routine.

    Raises
    ------
    EasyGraphError
        If ``data`` matches a supported type but its conversion fails, or
        if it matches none of the supported types.
    TypeError
        If ``data`` is a dict that can be read neither as a dict-of-dicts
        nor as a dict-of-lists (and ``multigraph_input`` is not True).
    """
    # EasyGraph graph type
    if hasattr(data, "adj"):
        try:
            result = from_dict_of_dicts(
                data.adj,
                create_using=create_using,
                multigraph_input=data.is_multigraph(),
            )
            # data.graph should be dict-like
            result.graph.update(data.graph)
            # data.nodes should be dict-like
            # result.add_node_from(data.nodes.items()) possible but
            # for custom node_attr_dict_factory which may be hashable
            # will be unexpected behavior
            for n, dd in data.nodes.items():
                result._node[n].update(dd)
            return result
        except Exception as err:
            raise eg.EasyGraphError("Input is not a correct EasyGraph graph.") from err

    # pygraphviz agraph
    if hasattr(data, "is_strict"):
        try:
            return eg.from_pyGraphviz_agraph(data, create_using=create_using)
        except Exception as err:
            raise eg.EasyGraphError("Input is not a correct pygraphviz graph.") from err

    # dict of dicts/lists
    if isinstance(data, dict):
        try:
            return from_dict_of_dicts(
                data, create_using=create_using, multigraph_input=multigraph_input
            )
        except Exception as dod_err:
            if multigraph_input is True:
                # The caller explicitly asked for multigraph parsing, so do not
                # fall back to dict-of-lists. Chain the original failure like
                # every other handler in this function does.
                raise eg.EasyGraphError(
                    f"converting multigraph_input raised:\n{type(dod_err)}: {dod_err}"
                ) from dod_err
            try:
                return from_dict_of_lists(data, create_using=create_using)
            except Exception as dol_err:
                raise TypeError("Input is not known type.") from dol_err

    # Pandas DataFrame
    try:
        import pandas as pd

        if isinstance(data, pd.DataFrame):
            # A square frame is read as an adjacency matrix, anything else
            # as an edge list with one row per edge.
            if data.shape[0] == data.shape[1]:
                try:
                    return eg.from_pandas_adjacency(data, create_using=create_using)
                except Exception as err:
                    msg = "Input is not a correct Pandas DataFrame adjacency matrix."
                    raise eg.EasyGraphError(msg) from err
            else:
                try:
                    return eg.from_pandas_edgelist(
                        data, edge_attr=True, create_using=create_using
                    )
                except Exception as err:
                    msg = "Input is not a correct Pandas DataFrame adjacency edge-list."
                    raise eg.EasyGraphError(msg) from err
    except ImportError:
        warnings.warn("pandas not found, skipping conversion test.", ImportWarning)

    # numpy matrix or ndarray
    try:
        import numpy as np

        if isinstance(data, np.ndarray):
            try:
                return eg.from_numpy_array(data, create_using=create_using)
            except Exception as err:
                raise eg.EasyGraphError(
                    "Input is not a correct numpy matrix or array."
                ) from err
    except ImportError:
        warnings.warn("numpy not found, skipping conversion test.", ImportWarning)

    # scipy sparse matrix - any format
    try:
        if hasattr(data, "format"):
            try:
                return eg.from_scipy_sparse_matrix(data, create_using=create_using)
            except Exception as err:
                raise eg.EasyGraphError(
                    "Input is not a correct scipy sparse matrix type."
                ) from err
    except ImportError:
        warnings.warn("scipy not found, skipping conversion test.", ImportWarning)

    # Note: most general check - should remain last in order of execution
    # Includes containers (e.g. list, set, dict, etc.), generators, and
    # iterators (e.g. itertools.chain) of edges
    if isinstance(data, (Collection, Generator, Iterator)):
        try:
            return from_edgelist(data, create_using=create_using)
        except Exception as err:
            raise eg.EasyGraphError("Input is not a valid edge list") from err

    raise eg.EasyGraphError("Input is not a known data type for conversion.")
[docs]
def from_dict_of_lists(d, create_using=None):
    """Return a graph built from a dictionary of neighbor lists.

    Parameters
    ----------
    d : dict
        Maps each node to an iterable of its neighbors. Every key is added
        as a node, and every (key, neighbor) pair becomes an edge.
    create_using : EasyGraph graph constructor, optional (default=eg.Graph)
        Passed to ``eg.empty_graph`` to create the graph that is populated.

    Returns
    -------
    The populated graph.
    """
    graph = eg.empty_graph(0, create_using)
    graph.add_nodes_from(d)

    undirected_multigraph = graph.is_multigraph() and not graph.is_directed()
    if not undirected_multigraph:
        graph.add_edges_from(
            ((source, target) for source, targets in d.items() for target in targets)
        )
        return graph

    # A dict of lists cannot express multiedges, but in an undirected graph
    # each edge is listed from both endpoints. Skip neighbors whose own list
    # was already processed so the reverse listing is not added as a
    # parallel edge.
    processed = set()
    for source, targets in d.items():
        for target in targets:
            if target in processed:
                continue
            graph.add_edge(source, target)
        processed.add(source)
    return graph
[docs]
def from_dict_of_dicts(d, create_using=None, multigraph_input=False):
    """Return a graph built from a dictionary of dictionaries.

    Parameters
    ----------
    d : dict
        Adjacency mapping ``{u: {v: data}}``. With ``multigraph_input`` set,
        each ``data`` is itself read as a mapping ``{key: edge_data}``.
    create_using : EasyGraph graph constructor, optional (default=eg.Graph)
        Passed to ``eg.empty_graph`` to create the graph that is populated.
    multigraph_input : bool (default False)
        Whether ``d`` is keyed one level deeper by edge key (see ``d``).

    Returns
    -------
    The populated graph.
    """
    graph = eg.empty_graph(0, create_using)
    graph.add_nodes_from(d)

    if not multigraph_input:
        # Not a multigraph-to-multigraph transfer.
        if graph.is_multigraph() and not graph.is_directed():
            # d may list an edge as both u-v and v-u; add only one of them.
            # Digraphs want both directions, and Graph() drops the duplicate
            # implicitly because it does not allow parallel edges.
            mirrored = set()
            for src, neighbors in d.items():
                for dst, attrs in neighbors.items():
                    if (src, dst) in mirrored:
                        continue
                    graph.add_edge(src, dst, key=0)
                    graph[src][dst][0].update(attrs)
                    mirrored.add((dst, src))
        else:
            graph.add_edges_from(
                (
                    (src, dst, attrs)
                    for src, neighbors in d.items()
                    for dst, attrs in neighbors.items()
                )
            )
        return graph

    # d represents a MultiGraph or MultiDiGraph: {u: {v: {key: data}}}.
    directed = graph.is_directed()
    keeps_keys = graph.is_multigraph()

    if directed:
        if keeps_keys:
            graph.add_edges_from(
                (src, dst, edge_key, attrs)
                for src, neighbors in d.items()
                for dst, keyed in neighbors.items()
                for edge_key, attrs in keyed.items()
            )
        else:
            graph.add_edges_from(
                (src, dst, attrs)
                for src, neighbors in d.items()
                for dst, keyed in neighbors.items()
                for _edge_key, attrs in keyed.items()
            )
        return graph

    # Undirected: do not add both directions of the same edge.
    mirrored = set()
    for src, neighbors in d.items():
        for dst, keyed in neighbors.items():
            if (src, dst) in mirrored:
                continue
            if keeps_keys:
                graph.add_edges_from(
                    (src, dst, edge_key, attrs) for edge_key, attrs in keyed.items()
                )
            else:
                graph.add_edges_from(
                    (src, dst, attrs) for _edge_key, attrs in keyed.items()
                )
            mirrored.add((dst, src))
    return graph
[docs]
def from_edgelist(edgelist, create_using=None):
    """Build a graph from an iterable of edges.

    Parameters
    ----------
    edgelist : list or iterator
        Edge tuples, handed unchanged to the graph's ``add_edges_from``.
    create_using : EasyGraph graph constructor, optional (default=eg.Graph)
        Graph type to create. If graph instance, then cleared before populated.

    Returns
    -------
    The graph containing the given edges.

    Examples
    --------
    >>> edgelist = [(0, 1)]  # single edge (0,1)
    >>> G = eg.from_edgelist(edgelist)

    or

    >>> G = eg.Graph(edgelist)  # use Graph constructor
    """
    graph = eg.empty_graph(0, create_using)
    graph.add_edges_from(edgelist)
    return graph
[docs]
def to_networkx(g: "Union[Graph, DiGraph]") -> "Union[nx.Graph, nx.DiGraph]":
    """Convert an EasyGraph graph to a NetworkX graph.

    Graph-level attributes are deep-copied. Nodes are added in the iteration
    order of ``g.nodes`` and edges in the order of ``g.edges``; node and edge
    attribute dictionaries are shallow-copied onto the NetworkX graph.

    Args:
        g (Union[Graph, DiGraph]): An EasyGraph graph

    Raises:
        ImportError: If NetworkX is not installed.

    Returns:
        Union[nx.Graph, nx.DiGraph]: Converted NetworkX graph; directed
        exactly when ``g.is_directed()`` is true.
    """
    try:
        import networkx as nx
    except ImportError as err:
        raise ImportError("NetworkX not found. Please install it.") from err

    G = nx.DiGraph() if g.is_directed() else nx.Graph()

    # copy graph-level attributes
    G.graph = deepcopy(g.graph)

    # Add every node first, in the source graph's own order. Adding edges
    # first and isolated nodes afterwards (from an unordered set difference)
    # scrambles node order, which matters to consumers that renumber nodes by
    # position.
    # NOTE(review): assumes ``g.nodes`` maps node -> attribute dict and that
    # the third item of each ``g.edges`` entry is the edge attribute dict.
    G.add_nodes_from((node, dict(attrs)) for node, attrs in g.nodes.items())
    G.add_edges_from((u, v, dict(attrs)) for u, v, attrs in g.edges)
    return G
[docs]
def from_networkx(g: "Union[nx.Graph, nx.DiGraph]") -> "Union[Graph, DiGraph]":
    """Convert a NetworkX graph to an EasyGraph graph.

    Graph-level attributes are deep-copied. Nodes are added in the iteration
    order of ``g`` and carry a shallow copy of their attribute dictionaries,
    as do edges. NetworkX multigraphs are accepted: edge keys are dropped,
    and because the result is a plain ``Graph``/``DiGraph`` parallel edges
    end up on a single edge.

    Args:
        g (Union[nx.Graph, nx.DiGraph]): A NetworkX graph

    Returns:
        Union[Graph, DiGraph]: Converted EasyGraph graph; directed exactly
        when ``g.is_directed()`` is true.
    """
    G = eg.DiGraph() if g.is_directed() else eg.Graph()

    # copy graph-level attributes
    G.graph = deepcopy(g.graph)

    # Nodes first, so the node order of ``g`` is preserved and isolated
    # nodes are kept.
    for node, attrs in g.nodes(data=True):
        G.add_node(node)
        G._node[node].update(attrs)

    # ``edges(data=True)`` yields (u, v, data) for simple graphs and
    # multigraphs alike. Iterating the bare ``g.edges`` view of a multigraph
    # yields (u, v, key) triples instead, which breaks a 2-tuple unpack.
    G.add_edges_from((u, v, dict(attrs)) for u, v, attrs in g.edges(data=True))
    return G
[docs]
def to_dgl(g: "Union[Graph, DiGraph]"):
    """Convert an EasyGraph graph to a DGL graph.

    The graph is first converted with ``to_networkx`` and the result is
    handed to ``dgl.from_networkx``.

    Args:
        g (Union[Graph, DiGraph]): An EasyGraph graph

    Raises:
        ImportError: If DGL is not installed.

    Returns:
        DGLGraph: Converted DGL graph
    """
    try:
        import dgl
    except ImportError:
        raise ImportError("DGL not found. Please install it.")

    return dgl.from_networkx(to_networkx(g))
[docs]
def from_dgl(g) -> "Union[Graph, DiGraph]":
    """Convert a DGL graph to an EasyGraph graph.

    The graph is first converted with ``dgl.to_networkx`` and the result is
    handed to ``from_networkx``.

    Args:
        g (DGLGraph): A DGL graph

    Raises:
        ImportError: If DGL is not installed.

    Returns:
        Union[Graph, DiGraph]: Converted EasyGraph graph
    """
    try:
        import dgl
    except ImportError:
        raise ImportError("DGL not found. Please install it.")

    return from_networkx(dgl.to_networkx(g))
[docs]
def to_pyg(
    G: Any,
    group_node_attrs: Optional[Union[List[str], all]] = None,  # type: ignore
    group_edge_attrs: Optional[Union[List[str], all]] = None,  # type: ignore
) -> "torch_geometric.data.Data":  # type: ignore
    r"""Converts a :obj:`easygraph.Graph` or :obj:`easygraph.DiGraph` to a
    :class:`torch_geometric.data.Data` instance.

    The graph is first converted with :func:`to_networkx`; the result is then
    passed to :obj:`torch_geometric.utils.convert.from_networkx`.

    Args:
        G (easygraph.Graph or easygraph.DiGraph): A easygraph graph.
        group_node_attrs (List[str] or all, optional): The node attributes to
            be concatenated and added to :obj:`data.x`. (default: :obj:`None`)
        group_edge_attrs (List[str] or all, optional): The edge attributes to
            be concatenated and added to :obj:`data.edge_attr`.
            (default: :obj:`None`)

    Raises:
        ImportError: If pytorch_geometric is not installed.

    .. note::

        All :attr:`group_node_attrs` and :attr:`group_edge_attrs` values must
        be numeric.

    Examples:
        >>> import easygraph as eg
        >>> G = eg.Graph()
        >>> G.add_edges_from([(0, 1), (1, 2), (2, 3)])
        >>> # A `Data` object is returned
        >>> data = to_pyg(G)
    """
    try:
        import torch_geometric as pyg

        networkx_to_pyg = pyg.utils.convert.from_networkx  # type: ignore
    except ImportError as err:
        raise ImportError("pytorch_geometric not found. Please install it.") from err

    return networkx_to_pyg(
        to_networkx(G),
        group_node_attrs=group_node_attrs,
        group_edge_attrs=group_edge_attrs,
    )
[docs]
def from_pyg(
    data: "torch_geometric.data.Data",  # type: ignore
    node_attrs: Optional[Iterable[str]] = None,
    edge_attrs: Optional[Iterable[str]] = None,
    graph_attrs: Optional[Iterable[str]] = None,
    to_undirected: Optional[Union[bool, str]] = False,
    remove_self_loops: bool = False,
) -> Any:
    r"""Converts a :class:`torch_geometric.data.Data` instance to a
    :obj:`easygraph.Graph` if :attr:`to_undirected` is set to :obj:`True`, or
    a directed :obj:`easygraph.DiGraph` otherwise.

    The data object is first converted with
    :obj:`torch_geometric.utils.convert.to_networkx`; the result is then
    passed to :func:`from_networkx`.

    Args:
        data (torch_geometric.data.Data): The data object.
        node_attrs (iterable of str, optional): The node attributes to be
            copied. (default: :obj:`None`)
        edge_attrs (iterable of str, optional): The edge attributes to be
            copied. (default: :obj:`None`)
        graph_attrs (iterable of str, optional): The graph attributes to be
            copied. (default: :obj:`None`)
        to_undirected (bool or str, optional): If set to :obj:`True` or
            "upper", will return a :obj:`easygraph.Graph` instead of a
            :obj:`easygraph.DiGraph`. The undirected graph will correspond to
            the upper triangle of the corresponding adjacency matrix.
            Similarly, if set to "lower", the undirected graph will correspond
            to the lower triangle of the adjacency matrix. (default:
            :obj:`False`)
        remove_self_loops (bool, optional): If set to :obj:`True`, will not
            include self loops in the resulting graph. (default: :obj:`False`)

    Raises:
        ImportError: If pytorch_geometric is not installed.

    Examples:
        >>> import torch
        >>> import torch_geometric as pyg
        >>> Data = pyg.data.Data  # type: ignore
        >>> edge_index = torch.tensor([
        ...     [0, 1, 1, 2, 2, 3],
        ...     [1, 0, 2, 1, 3, 2],
        ... ])
        >>> data = Data(edge_index=edge_index, num_nodes=4)
        >>> from_pyg(data)
        <easygraph.classes.digraph.DiGraph at 0x2713fdb40d0>
    """
    try:
        import torch_geometric as pyg

        pyg_to_networkx = pyg.utils.convert.to_networkx  # type: ignore
    except ImportError as err:
        raise ImportError("pytorch_geometric not found. Please install it.") from err

    # Pass everything by keyword: the positional order of PyG's
    # ``to_networkx`` is not stable across releases (a ``to_multi`` parameter
    # sits between ``to_undirected`` and ``remove_self_loops`` in newer ones),
    # so a positional ``remove_self_loops`` can land in the wrong slot.
    g_nx = pyg_to_networkx(
        data,
        node_attrs=node_attrs,
        edge_attrs=edge_attrs,
        graph_attrs=graph_attrs,
        to_undirected=to_undirected,
        remove_self_loops=remove_self_loops,
    )
    return from_networkx(g_nx)
[docs]
def dict_to_hypergraph(data, max_order=None, is_dynamic=False):
    """
    Build a hypergraph from a dictionary in the standardized hypergraph
    JSON format.

    Node and edge-member identifiers in ``data`` are converted with ``int``
    and shifted down by one before use.

    Parameters
    ----------
    data: dict
        A dictionary in the hypergraph JSON format. The keys ``"node-data"``,
        ``"edge-data"`` and ``"edge-dict"`` are read.
    max_order: int, optional
        Maximum order of edges to add to the hypergraph. Edges with more
        than ``max_order + 1`` members are skipped. ``None`` (the default)
        disables the filter.
    is_dynamic: bool, optional
        If True, each hyperedge is added with its ``"timestamp"`` property as
        ``group_name`` and the timestamps are collected in the returned list.

    Returns
    -------
    tuple
        ``(G, timestamp_lst)``: the loaded Hypergraph object and the list of
        edge timestamps (empty unless ``is_dynamic`` is True).

    Raises
    ------
    EasyGraphError
        If assigning the node attributes raises a ``KeyError``.
    KeyError
        If ``data`` lacks one of the required top-level keys, or an edge has
        no entry in ``"edge-data"``.
    TypeError
        If an edge ID or an edge member cannot be converted to ``int``.

    See Also
    --------
    read_json
    """
    timestamp_lst = []
    node_data = data["node-data"]
    G = eg.Hypergraph(num_v=len(node_data))
    try:
        for index, dd in node_data.items():
            G.v_property[int(index) - 1] = dd
    except KeyError as err:
        raise EasyGraphError("Failed to import node attributes.") from err

    rows = []
    cols = []
    seen_edges = set()
    e_property_dict = data["edge-data"]
    edge_id = 0
    for index, edge in data["edge-dict"].items():
        # Compare against None so that max_order=0 filters too, instead of
        # being read as "no limit".
        if max_order is not None and len(edge) > max_order + 1:
            continue

        try:
            edge_key = int(index)
        except ValueError as e:
            # Report the key that failed, not a value left over from an
            # earlier iteration.
            raise TypeError(
                f"Failed to convert the edge with ID {index} to type int."
            ) from e

        try:
            edge = [int(n) - 1 for n in edge]
        except ValueError as e:
            raise TypeError("Failed to convert nodes to type int.") from e

        # Record each distinct member sequence only once in rows/cols.
        members = tuple(edge)
        if members not in seen_edges:
            seen_edges.add(members)
            rows.extend(edge)
            cols.extend(len(edge) * [edge_id])
            edge_id += 1

        e_property = e_property_dict[str(edge_key)]
        if is_dynamic:
            G.add_hyperedges(
                e_list=edge,
                e_property=e_property,
                group_name=e_property["timestamp"],
            )
            timestamp_lst.append(e_property["timestamp"])
        else:
            G.add_hyperedges(e_list=edge, e_property=e_property)

    G._rows = rows
    G._cols = cols
    return G, timestamp_lst