Skip to content

Commit

Permalink
Addd support for autocommit
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-jvasquezrojas committed Dec 5, 2024
1 parent 695c0a9 commit fdccfcf
Show file tree
Hide file tree
Showing 3 changed files with 223 additions and 1 deletion.
40 changes: 39 additions & 1 deletion src/snowflake/sqlalchemy/snowdialect.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@
import operator
import re
from collections import defaultdict
from enum import Enum
from functools import reduce
from typing import Any
from typing import Any, Optional
from urllib.parse import unquote_plus

import sqlalchemy.types as sqltypes
from sqlalchemy import event as sa_vnt
from sqlalchemy import exc as sa_exc
from sqlalchemy import util as sa_util
from sqlalchemy.engine import URL, default, reflection
from sqlalchemy.engine.interfaces import IsolationLevel
from sqlalchemy.schema import Table
from sqlalchemy.sql import text
from sqlalchemy.sql.elements import quoted_name
Expand Down Expand Up @@ -59,6 +61,11 @@
_ENABLE_SQLALCHEMY_AS_APPLICATION_NAME = True


class SnowflakeIsolationLevel(Enum):
READ_COMMITTED = "READ COMMITTED"
AUTOCOMMIT = "AUTOCOMMIT"


class SnowflakeDialect(default.DefaultDialect):
name = DIALECT_NAME
driver = "snowflake"
Expand Down Expand Up @@ -139,6 +146,16 @@ class SnowflakeDialect(default.DefaultDialect):

supports_identity_columns = True

def __init__(
self,
isolation_level: Optional[
IsolationLevel
] = SnowflakeIsolationLevel.READ_COMMITTED.value,
**kwargs: Any,
):
super().__init__(isolation_level=isolation_level, **kwargs)
self._cache_column_metadata = False

@classmethod
def dbapi(cls):
return cls.import_dbapi()
Expand Down Expand Up @@ -216,6 +233,27 @@ def has_table(self, connection, table_name, schema=None, **kw):
"""
return self._has_object(connection, "TABLE", table_name, schema)

def get_isolation_level_values(self, dbapi_connection):
return [
SnowflakeIsolationLevel.READ_COMMITTED.value,
SnowflakeIsolationLevel.AUTOCOMMIT.value,
]

def do_rollback(self, dbapi_connection):
dbapi_connection.rollback()

def do_commit(self, dbapi_connection):
dbapi_connection.commit()

def get_default_isolation_level(self, dbapi_conn):
return SnowflakeIsolationLevel.READ_COMMITTED.value

def set_isolation_level(self, dbapi_connection, level):
if level == SnowflakeIsolationLevel.AUTOCOMMIT.value:
dbapi_connection.autocommit(True)
else:
dbapi_connection.autocommit(False)

@reflection.cache
def has_sequence(self, connection, sequence_name, schema=None, **kw):
"""
Expand Down
27 changes: 27 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#
from __future__ import annotations

import logging.handlers
import os
import sys
import time
Expand Down Expand Up @@ -194,6 +195,32 @@ def engine_testaccount(request):
yield engine


@pytest.fixture()
def assert_text_in_buf():
buf = logging.handlers.BufferingHandler(100)
for log in [
logging.getLogger("sqlalchemy.engine"),
]:
log.addHandler(buf)

def go(expected, occurrences=1):
assert buf.buffer
buflines = [rec.getMessage() for rec in buf.buffer]

ocurrences_found = buflines.count(expected)
assert occurrences == ocurrences_found, (
f"Expected {occurrences} of {expected}, got {ocurrences_found} "
f"occurrences in {buflines}."
)
buf.flush()

yield go
for log in [
logging.getLogger("sqlalchemy.engine"),
]:
log.removeHandler(buf)


@pytest.fixture()
def engine_testaccount_with_numpy(request):
url = url_factory(numpy=True)
Expand Down
157 changes: 157 additions & 0 deletions tests/test_transactions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
#
# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
#

from sqlalchemy import Column, Integer, MetaData, String, select, text

from snowflake.sqlalchemy import SnowflakeTable

CURRENT_TRANSACTION = text("SELECT CURRENT_TRANSACTION()")


def test_connect_read_commited(engine_testaccount, assert_text_in_buf):
metadata = MetaData()
table_name = "test_connect_read_commited"

test_table_1 = SnowflakeTable(
table_name,
metadata,
Column("id", Integer, primary_key=True),
Column("name", String),
cluster_by=["id", text("id > 5")],
)

metadata.create_all(engine_testaccount)
try:
with engine_testaccount.connect().execution_options(
isolation_level="READ COMMITTED"
) as connection:
result = connection.execute(CURRENT_TRANSACTION).fetchall()
assert result[0] == (None,), result
ins = test_table_1.insert().values(id=1, name="test")
connection.execute(ins)
result = connection.execute(CURRENT_TRANSACTION).fetchall()
assert result[0] != (
None,
), "AUTOCOMMIT DISABLED, transaction should be started"

with engine_testaccount.connect() as conn:
s = select(test_table_1)
results = conn.execute(s).fetchall()
assert len(results) == 0, results # No insert commited
assert_text_in_buf("ROLLBACK", occurrences=1)
finally:
metadata.drop_all(engine_testaccount)


def test_begin_read_commited(engine_testaccount, assert_text_in_buf):
metadata = MetaData()
table_name = "test_begin_read_commited"

test_table_1 = SnowflakeTable(
table_name,
metadata,
Column("id", Integer, primary_key=True),
Column("name", String),
cluster_by=["id", text("id > 5")],
)

metadata.create_all(engine_testaccount)
try:
with engine_testaccount.connect().execution_options(
isolation_level="READ COMMITTED"
) as connection, connection.begin():
result = connection.execute(CURRENT_TRANSACTION).fetchall()
assert result[0] == (None,), result
ins = test_table_1.insert().values(id=1, name="test")
connection.execute(ins)
result = connection.execute(CURRENT_TRANSACTION).fetchall()
assert result[0] != (
None,
), "AUTOCOMMIT DISABLED, transaction should be started"

with engine_testaccount.connect() as conn:
s = select(test_table_1)
results = conn.execute(s).fetchall()
assert len(results) == 1, results # Insert commited
assert_text_in_buf("COMMIT", occurrences=2)
finally:
metadata.drop_all(engine_testaccount)


def test_connect_autocommit(engine_testaccount, assert_text_in_buf):
metadata = MetaData()
table_name = "test_connect_autocommit"

test_table_1 = SnowflakeTable(
table_name,
metadata,
Column("id", Integer, primary_key=True),
Column("name", String),
cluster_by=["id", text("id > 5")],
)

metadata.create_all(engine_testaccount)
try:
with engine_testaccount.connect().execution_options(
isolation_level="AUTOCOMMIT"
) as connection:
result = connection.execute(CURRENT_TRANSACTION).fetchall()
assert result[0] == (None,), result
ins = test_table_1.insert().values(id=1, name="test")
connection.execute(ins)
result = connection.execute(CURRENT_TRANSACTION).fetchall()
assert result[0] == (
None,
), "Autocommit enabled, transaction should not be started"

with engine_testaccount.connect() as conn:
s = select(test_table_1)
results = conn.execute(s).fetchall()
assert len(results) == 1, results
assert_text_in_buf(
"ROLLBACK using DBAPI connection.rollback(), DBAPI should ignore due to autocommit mode",
occurrences=1,
)

finally:
metadata.drop_all(engine_testaccount)


def test_begin_autocommit(engine_testaccount, assert_text_in_buf):
metadata = MetaData()
table_name = "test_begin_autocommit"

test_table_1 = SnowflakeTable(
table_name,
metadata,
Column("id", Integer, primary_key=True),
Column("name", String),
cluster_by=["id", text("id > 5")],
)

metadata.create_all(engine_testaccount)
try:
with engine_testaccount.connect().execution_options(
isolation_level="AUTOCOMMIT"
) as connection, connection.begin():
result = connection.execute(CURRENT_TRANSACTION).fetchall()
assert result[0] == (None,), result
ins = test_table_1.insert().values(id=1, name="test")
connection.execute(ins)
result = connection.execute(CURRENT_TRANSACTION).fetchall()
assert result[0] == (
None,
), "Autocommit enabled, transaction should not be started"

with engine_testaccount.connect() as conn:
s = select(test_table_1)
results = conn.execute(s).fetchall()
assert len(results) == 1, results
assert_text_in_buf(
"COMMIT using DBAPI connection.commit(), DBAPI should ignore due to autocommit mode",
occurrences=1,
)

finally:
metadata.drop_all(engine_testaccount)

0 comments on commit fdccfcf

Please sign in to comment.