Skip to content

Commit

Permalink
Merge pull request #223 from memgraph/develop
Browse files Browse the repository at this point in the history
[main < develop] Release GQLAlchemy 1.4
  • Loading branch information
brunos252 authored Mar 10, 2023
2 parents 8dd7428 + 2a0f679 commit d0f5e4d
Show file tree
Hide file tree
Showing 47 changed files with 5,302 additions and 1,506 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ jobs:
run: |
python -m pip install -U pip
sudo -H pip install networkx numpy scipy
sudo -H pip install poethepoet==0.18.1
- name: Setup poetry
uses: abatilo/[email protected]
with:
Expand All @@ -49,6 +50,7 @@ jobs:
- name: Test project
run: |
poetry install
poe install-pyg-cpu
poetry run pytest -vvv -m "not slow and not ubuntu and not docker"
- name: Use the Upload Artifact GitHub Action
uses: actions/upload-artifact@v3
Expand All @@ -75,6 +77,7 @@ jobs:
run: |
python -m pip install -U pip
python -m pip install networkx numpy scipy
python -m pip install poethepoet==0.18.1
- uses: Vampire/setup-wsl@v1
with:
distribution: Ubuntu-20.04
Expand Down Expand Up @@ -112,6 +115,7 @@ jobs:
- name: Test project
run: |
poetry install
poe install-pyg-cpu
poetry run pytest -vvv -m "not slow and not ubuntu and not docker"
- name: Save Memgraph Logs
uses: actions/upload-artifact@v3
Expand Down
20 changes: 10 additions & 10 deletions gqlalchemy/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ def __init__(
self.client_name = client_name

@abstractmethod
def execute(self, query: str) -> None:
def execute(self, query: str, parameters: Dict[str, Any] = {}) -> None:
"""Executes Cypher query without returning any results."""
pass

@abstractmethod
def execute_and_fetch(self, query: str) -> Iterator[Dict[str, Any]]:
def execute_and_fetch(self, query: str, parameters: Dict[str, Any] = {}) -> Iterator[Dict[str, Any]]:
"""Executes Cypher query and returns iterator of results."""
pass

Expand Down Expand Up @@ -78,17 +78,17 @@ def __init__(
self._connection = self._create_connection()

@database_error_handler
def execute(self, query: str) -> None:
def execute(self, query: str, parameters: Dict[str, Any] = {}) -> None:
"""Executes Cypher query without returning any results."""
cursor = self._connection.cursor()
cursor.execute(query)
cursor.execute(query, parameters)
cursor.fetchall()

@database_error_handler
def execute_and_fetch(self, query: str) -> Iterator[Dict[str, Any]]:
def execute_and_fetch(self, query: str, parameters: Dict[str, Any] = {}) -> Iterator[Dict[str, Any]]:
"""Executes Cypher query and returns iterator of results."""
cursor = self._connection.cursor()
cursor.execute(query)
cursor.execute(query, parameters)
while True:
row = cursor.fetchone()
if row is None:
Expand Down Expand Up @@ -166,15 +166,15 @@ def __init__(
self.lazy = lazy
self._connection = self._create_connection()

def execute(self, query: str) -> None:
def execute(self, query: str, parameters: Dict[str, Any] = {}) -> None:
"""Executes Cypher query without returning any results."""
with self._connection.session() as session:
session.run(query)
session.run(query, parameters)

def execute_and_fetch(self, query: str) -> Iterator[Dict[str, Any]]:
def execute_and_fetch(self, query: str, parameters: Dict[str, Any] = {}) -> Iterator[Dict[str, Any]]:
"""Executes Cypher query and returns iterator of results."""
with self._connection.session() as session:
results = session.run(query)
results = session.run(query, parameters)
columns = results.keys()
for result in results:
yield {column: _convert_neo4j_value(result[column]) for column in columns}
Expand Down
77 changes: 63 additions & 14 deletions gqlalchemy/graph_algorithms/integrated_algorithms.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
BFS_EXPANSION = " *BFS"
DFS_EXPANSION = " *"
WSHORTEST_EXPANSION = " *WSHORTEST"
ALLSHORTEST_EXPANSION = " *ALLSHORTEST"

DEFAULT_TOTAL_WEIGHT = "total_weight"
DEFAULT_WEIGHT_PROPERTY = "r.weight"
Expand Down Expand Up @@ -72,10 +73,10 @@ def __init__(
) -> None:
"""
Args:
lower_bound: Lower bound for path depth. Defaults to `None`.
upper_bound: Upper bound for path depth. Defaults to `None`.
lower_bound: Lower bound for path depth.
upper_bound: Upper bound for path depth.
condition: Filter through nodes and relationships that pass this
condition. Defaults to `None`.
condition.
"""
super().__init__()
self.lower_bound = str(lower_bound) if lower_bound is not None else ""
Expand Down Expand Up @@ -123,10 +124,10 @@ def __init__(
) -> None:
"""
Args:
lower_bound: Lower bound for path depth. Defaults to None.
upper_bound: Upper bound for path depth. Defaults to None.
lower_bound: Lower bound for path depth.
upper_bound: Upper bound for path depth.
condition: Filter through nodes and relationships that pass this
condition. Defaults to None.
condition.
"""
super().__init__()
self.lower_bound = str(lower_bound) if lower_bound is not None else ""
Expand Down Expand Up @@ -156,7 +157,8 @@ def to_cypher_bounds(self) -> str:


class WeightedShortestPath(IntegratedAlgorithm):
"""Build a Djikstra shortest path call for a Cypher query
"""Build a Djikstra shortest path call for a Cypher query.
The weighted shortest path algorithm can be called in Memgraph with Cypher
queries such as:
" MATCH (a {id: 723})-[r *WSHORTEST 10 (r, n | r.weight) weight_sum
Expand All @@ -175,19 +177,18 @@ def __init__(
) -> None:
"""
Args:
upper_bound: Upper bound for path depth. Defaults to None.
upper_bound: Upper bound for path depth.
condition: Filter through nodes and relationships that pass this
condition. Defaults to None.
condition.
total_weight_var: Variable defined as the sum of all weights on
path being returned. Defaults to "total_weight".
weight_property: property being used as weight. Defaults to
"r.weight".
path being returned.
weight_property: property being used as weight.
"""
super().__init__()
self.weight_property = f"r.{weight_property}" if "." not in weight_property else weight_property
self.weight_property = weight_property if "." in weight_property else f"r.{weight_property}"
self.total_weight_var = total_weight_var
self.condition = condition
self.upper_bound = str(upper_bound) if upper_bound is not None else ""
self.upper_bound = "" if upper_bound is None else str(upper_bound)

def __str__(self) -> str:
algo_str = WSHORTEST_EXPANSION
Expand All @@ -201,3 +202,51 @@ def __str__(self) -> str:
algo_str = f"{algo_str} {filter_lambda}"

return algo_str


class AllShortestPath(IntegratedAlgorithm):
"""Build a Djikstra shortest path call for a Cypher query.
The weighted shortest path algorithm can be called in Memgraph with Cypher
queries such as:
" MATCH (a {id: 723})-[r *ALLSHORTEST 10 (r, n | r.weight) total_weight
(r, n | r.x > 12 AND r.y < 3)]-(b {id: 882}) RETURN * "
It is called inside the relationship clause, "*ALLSHORTEST" naming the
algorithm, "10" specifying search depth bounds, and "(r, n | <expression>)"
is a filter lambda, used to filter which relationships and nodes to use.
"""

def __init__(
self,
upper_bound: int = None,
condition: str = None,
total_weight_var: str = DEFAULT_TOTAL_WEIGHT,
weight_property: str = DEFAULT_WEIGHT_PROPERTY,
) -> None:
"""
Args:
upper_bound: Upper bound for path depth.
condition: Filter through nodes and relationships that pass this
condition.
total_weight_var: Variable defined as the sum of all weights on
path being returned.
weight_property: Property being used as weight.
"""
super().__init__()
self.weight_property = weight_property if "." in weight_property else f"r.{weight_property}"
self.total_weight_var = total_weight_var
self.condition = condition
self.upper_bound = "" if upper_bound is None else str(upper_bound)

def __str__(self) -> str:
algo_str = ALLSHORTEST_EXPANSION
if self.upper_bound != "":
algo_str = f"{algo_str} {self.upper_bound}"

algo_str = f"{algo_str} {super().to_cypher_lambda(self.weight_property)} {self.total_weight_var}"

filter_lambda = super().to_cypher_lambda(self.condition)
if filter_lambda != "":
algo_str = f"{algo_str} {filter_lambda}"

return algo_str
9 changes: 9 additions & 0 deletions gqlalchemy/memgraph_constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import os

MG_HOST = os.getenv("MG_HOST", "127.0.0.1")
MG_PORT = int(os.getenv("MG_PORT", "7687"))
MG_USERNAME = os.getenv("MG_USERNAME", "")
MG_PASSWORD = os.getenv("MG_PASSWORD", "")
MG_ENCRYPTED = os.getenv("MG_ENCRYPT", "false").lower() == "true"
MG_CLIENT_NAME = os.getenv("MG_CLIENT_NAME", "GQLAlchemy")
MG_LAZY = os.getenv("MG_LAZY", "false").lower() == "true"
2 changes: 1 addition & 1 deletion gqlalchemy/query_builders/declarative_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ def call(
Call procedure with arguments:
Python: `call('json_util.load_from_url', 'https://some-url.com').yield_('objects').return_(results='objects').execute()
Python: `call('json_util.load_from_url', "'https://some-url.com'").yield_('objects').return_(results='objects').execute()
Cypher: `CALL json_util.load_from_url(https://some-url.com) YIELD objects RETURN objects;`
"""
self._query.append(CallPartialQuery(procedure, arguments))
Expand Down
144 changes: 143 additions & 1 deletion gqlalchemy/query_builders/memgraph_query_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Optional
from typing import Optional, Union, Tuple, List
from string import ascii_lowercase

from gqlalchemy.query_builders.declarative_base import ( # noqa F401
Call,
Expand All @@ -31,6 +32,7 @@
)
from gqlalchemy.vendors.database_client import DatabaseClient
from gqlalchemy.vendors.memgraph import Memgraph
from gqlalchemy.utilities import CypherVariable, CypherNode, CypherRelationship, RelationshipDirection


class MemgraphQueryBuilderTypes(DeclarativeBaseTypes):
Expand Down Expand Up @@ -79,8 +81,148 @@ def load_csv(self, path: str, header: bool, row: str) -> "DeclarativeBase":

return self

def _construct_subgraph_path(
self,
relationship_types: Optional[Union[str, List[List[str]]]] = None,
relationship_directions: Optional[
Union[RelationshipDirection, List[RelationshipDirection]]
] = RelationshipDirection.RIGHT,
) -> str:
"""Constructs a MATCH query defining a subgraph using node labels and relationship types.
Args:
relationship_types: A string or list of lists of types of relationships used in the subgraph.
relationship_directions: Enums representing directions.
Returns:
a string representing a MATCH query for a path with given node labels and relationship types.
"""

query = f"{CypherNode(variable=ascii_lowercase[0])}"
for i in range(len(relationship_types)):
query += f"{CypherRelationship(relationship_types[i], relationship_directions[i])}{CypherNode()}"

return query

def call(
self,
procedure: str,
arguments: Optional[Union[str, Tuple[Union[str, int, float]]]] = None,
node_labels: Optional[Union[str, List[List[str]]]] = None,
relationship_types: Optional[Union[str, List[List[str]]]] = None,
relationship_directions: Optional[
Union[RelationshipDirection, List[RelationshipDirection]]
] = RelationshipDirection.RIGHT,
subgraph_path: str = None,
) -> "DeclarativeBase":
"""Override of base class method to support Memgraph's subgraph functionality.
Method can be called with node labels and relationship types, both being optional, which are used to construct
a subgraph, or if neither is provided, a subgraph query is used, which can be passed as a string representing a
Cypher query defining the MATCH clause which selects the nodes and relationships to use.
Args:
procedure: A string representing the name of the procedure in the
format `query_module.procedure`.
arguments: A string representing the arguments of the procedure in
text format.
node_labels: Either a string, which is then used as the label for all nodes, or
a list of lists defining all labels for every node
relationship_types: Types of relationships to be used in the subgraph. Either a
single type or a list of lists defining all types for every relationship
relationship_directions: Directions of the relationships.
subgraph_path: Optional way to define the subgraph via a Cypher MATCH clause.
Returns:
A `DeclarativeBase` instance for constructing queries.
Examples:
Python: `call('export_util.json', '/home/user', "LABEL", ["TYPE1", "TYPE2"]).execute()
Cypher: `MATCH p=(a)-[:TYPE1 | :TYPE2]->(b) WHERE (a:LABEL) AND (b:LABEL)
WITH project(p) AS graph CALL export_util.json(graph, '/home/user')`
or
Python: `call('export_util.json', '/home/user', subgraph_path="(:LABEL)-[:TYPE]->(:LABEL)").execute()
Cypher: `MATCH p=(:LABEL)-[:TYPE1]->(:LABEL) WITH project(p) AS graph
CALL export_util.json(graph, '/home/user')`
"""

if not (node_labels is None and relationship_types is None):
if isinstance(relationship_types, str):
relationship_types = [[relationship_types]] * (
len(node_labels) - 1 if isinstance(node_labels, list) else 1
)

if isinstance(node_labels, str):
node_labels = [[node_labels]] * (len(relationship_types) + 1 if relationship_types else 2)

if isinstance(relationship_directions, RelationshipDirection):
relationship_directions = [relationship_directions] * (
len(relationship_types) if relationship_types else 1
)

if (
node_labels
and relationship_types
and (
len(node_labels) != len(relationship_types) + 1
or len(relationship_types) != len(relationship_directions)
)
):
raise ValueError(
"number of items in node_labels should be one more than in relationship_types and relationship_directions"
)

subgraph_path = f"{CypherNode(variable=ascii_lowercase[0])}"
for i in range(len(relationship_directions)):
rel_types = relationship_types[i] if relationship_types else None
subgraph_path += f"{CypherRelationship(rel_types, relationship_directions[i])}{CypherNode(variable=ascii_lowercase[i + 1])}"

if subgraph_path is not None:
self._query.append(ProjectPartialQuery(subgraph_path=subgraph_path, node_labels=node_labels))

if isinstance(arguments, str):
arguments = (CypherVariable(name="graph"), arguments)
elif isinstance(arguments, tuple):
arguments = (CypherVariable(name="graph"), *arguments)
else:
arguments = CypherVariable(name="graph")

super().call(procedure=procedure, arguments=arguments)

return self


class LoadCsv(DeclarativeBase):
def __init__(self, path: str, header: bool, row: str, connection: Optional[DatabaseClient] = None):
super().__init__(connection)
self._query.append(LoadCsvPartialQuery(path, header, row))


class ProjectPartialQuery(PartialQuery):
def __init__(self, subgraph_path: str, node_labels: Optional[List[List[str]]] = None):
super().__init__(DeclarativeBaseTypes.MATCH)
self._subgraph_path = subgraph_path
self._node_labels = node_labels

@property
def subgraph_path(self) -> str:
return self._subgraph_path

@property
def node_labels(self) -> Optional[List[List[str]]]:
return self._node_labels

def construct_query(self) -> str:
"""Constructs a Project partial querty.
Given path part of a query (e.g. (:LABEL)-[:TYPE]->(:LABEL2)),
adds MATCH, a path identifier and appends the WITH clause."""
query = f" MATCH p={self.subgraph_path}"
if self.node_labels:
query += f" WHERE ({ascii_lowercase[0]}:" + f" or {ascii_lowercase[0]}:".join(self.node_labels[0]) + ")"
for i in range(1, len(self.node_labels)):
query += f" AND ({ascii_lowercase[i]}:" + f" or {ascii_lowercase[i]}:".join(self.node_labels[i]) + ")"

return query + " WITH project(p) AS graph "
Loading

0 comments on commit d0f5e4d

Please sign in to comment.