Skip to content

Commit

Permalink
SQLFrame: Add basic and all-types examples
Browse files Browse the repository at this point in the history
SQLFrame [1] implements the PySpark [2] DataFrame API in order to enable
running transformation pipelines directly on database engines - no Spark
clusters or dependencies required.

[1] https://pypi.org/project/sqlframe/
[2] https://spark.apache.org/docs/latest/api/python/
  • Loading branch information
amotl committed Dec 7, 2024
1 parent 4efcd1c commit 851377f
Show file tree
Hide file tree
Showing 11 changed files with 543 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ updates:
schedule:
interval: "daily"

- directory: "/by-dataframe/sqlframe"
package-ecosystem: "pip"
schedule:
interval: "daily"

- directory: "/by-language/csharp-npgsql"
package-ecosystem: "nuget"
schedule:
Expand Down
74 changes: 74 additions & 0 deletions .github/workflows/dataframe-sqlframe.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
name: SQLFrame

on:
pull_request:
branches: ~
paths:
- 'dataframe-sqlframe.yml'
- 'by-dataframe/sqlframe/**'
- '/requirements.txt'
push:
branches: [ main ]
paths:
- 'dataframe-sqlframe.yml'
- 'by-dataframe/sqlframe/**'
- '/requirements.txt'

# Allow job to be triggered manually.
workflow_dispatch:

# Run job each night after CrateDB nightly has been published.
schedule:
- cron: '0 3 * * *'

# Cancel in-progress jobs when pushing to the same branch.
concurrency:
cancel-in-progress: true
group: ${{ github.workflow }}-${{ github.ref }}

jobs:
test:
name: "
Python: ${{ matrix.python-version }}
CrateDB: ${{ matrix.cratedb-version }}
on ${{ matrix.os }}"
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [ 'ubuntu-latest' ]
python-version: [ '3.9', '3.13' ]
cratedb-version: [ 'nightly' ]

services:
cratedb:
image: crate/crate:${{ matrix.cratedb-version }}
ports:
- 4200:4200
- 5432:5432
env:
CRATE_HEAP_SIZE: 4g

steps:

- name: Acquire sources
uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
architecture: x64
cache: 'pip'
cache-dependency-path: |
requirements.txt
by-dataframe/sqlframe/requirements.txt
by-dataframe/sqlframe/requirements-test.txt
- name: Install utilities
run: |
pip install -r requirements.txt
- name: Validate by-dataframe/sqlframe
run: |
ngr test --accept-no-venv by-dataframe/sqlframe
1 change: 1 addition & 0 deletions by-dataframe/sqlframe/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.csv
77 changes: 77 additions & 0 deletions by-dataframe/sqlframe/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Verify the `sqlframe` library with CrateDB

Turning PySpark Into a Universal DataFrame API

## About

This folder includes software integration tests for verifying
that the [SQLFrame] Python library works well together with [CrateDB].

SQLFrame implements the [PySpark] DataFrame API in order to enable running
transformation pipelines directly on database engines - no Spark clusters
or dependencies required.

## What's Inside

- `example_basic.py`: A few examples that read CrateDB's `sys.summits` table.
An example inquiring existing tables.

- `example_types.py`: An example that exercises all data types supported by
CrateDB.

## Synopsis

```shell
pip install --upgrade sqlframe
```
```python
from psycopg2 import connect
from sqlframe import activate
from sqlframe.base.functions import col

# Define database connection parameters, suitable for CrateDB on localhost.
# For CrateDB Cloud, use `crate://<username>:<password>@<host>`.
conn = connect(
dbname="crate",
user="crate",
password="",
host="localhost",
port="5432",
)
# Activate SQLFrame to run directly on CrateDB.
activate("postgres", conn=conn)

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Invoke query.
df = spark.sql(
spark.table("sys.summits")
.where(col("region").ilike("ortler%"))
.sort(col("height").desc())
.limit(3)
)
print(df.sql())
df.show()
```

## Tests

Set up sandbox and install packages.
```bash
pip install uv
uv venv .venv
source .venv/bin/activate
uv pip install -r requirements.txt -r requirements-test.txt
```

Run integration tests.
```bash
pytest
```


[CrateDB]: https://cratedb.com/database
[PySpark]: https://spark.apache.org/docs/latest/api/python/
[SQLFrame]: https://pypi.org/project/sqlframe/
98 changes: 98 additions & 0 deletions by-dataframe/sqlframe/example_basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""
Using `sqlframe` with CrateDB: Basic usage.
pip install --upgrade sqlframe
A few basic operations using the `sqlframe` library with CrateDB.
- https://pypi.org/project/sqlframe/
"""

from psycopg2 import connect
from sqlframe import activate
from sqlframe.base.functions import col

from patch import monkeypatch


def connect_spark():
# Connect to database.
conn = connect(
dbname="crate",
user="crate",
password="",
host="localhost",
port="5432",
)
# Activate SQLFrame to run directly on CrateDB.
activate("postgres", conn=conn)

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
return spark


def sqlframe_select_sys_summits():
"""
Query CrateDB's built-in `sys.summits` table.
:return:
"""
spark = connect_spark()
df = spark.sql(
spark.table("sys.summits")
.where(col("region").ilike("ortler%"))
.sort(col("height").desc())
.limit(3)
)
print(df.sql())
df.show()
return df


def sqlframe_export_sys_summits_pandas():
"""
Query CrateDB's built-in `sys.summits` table, returning a pandas dataframe.
"""
spark = connect_spark()
df = spark.sql(
spark.table("sys.summits")
.where(col("region").ilike("ortler%"))
.sort(col("height").desc())
.limit(3)
).toPandas()
return df


def sqlframe_export_sys_summits_csv():
"""
Query CrateDB's built-in `sys.summits` table, saving the output to CSV.
"""
spark = connect_spark()
df = spark.sql(
spark.table("sys.summits")
.where(col("region").ilike("ortler%"))
.sort(col("height").desc())
.limit(3)
)
df.write.csv("summits.csv", mode="overwrite")
return df


def sqlframe_get_table_names():
"""
Inquire table names of the system schema `sys`.
"""
spark = connect_spark()
tables = spark.catalog.listTables(dbName="sys")
return tables


monkeypatch()


if __name__ == "__main__":
print(sqlframe_select_sys_summits())
print(sqlframe_export_sys_summits_pandas())
print(sqlframe_export_sys_summits_csv())
print(sqlframe_get_table_names())
Loading

0 comments on commit 851377f

Please sign in to comment.