From d3e40d1efc915fc619027b9c35fca39d87e05ae3 Mon Sep 17 00:00:00 2001 From: Jorge Vasquez Rojas Date: Mon, 16 Dec 2024 15:45:50 -0600 Subject: [PATCH] SNOW-1846847 Add support for autocommit (#555) * Add support for autocommit --- DESCRIPTION.md | 1 + src/snowflake/sqlalchemy/snowdialect.py | 34 +++++ tests/test_transactions.py | 157 ++++++++++++++++++++++++ 3 files changed, 192 insertions(+) create mode 100644 tests/test_transactions.py diff --git a/DESCRIPTION.md b/DESCRIPTION.md index 0615bd15..0a50408e 100644 --- a/DESCRIPTION.md +++ b/DESCRIPTION.md @@ -19,6 +19,7 @@ Source code is also available at: - v1.7.1(December 02, 2024) - Add support for partition by to copy into - Fix BOOLEAN type not found in snowdialect + - Add support for autocommit Isolation Level - v1.7.0(November 21, 2024) - Add support for dynamic tables and required options diff --git a/src/snowflake/sqlalchemy/snowdialect.py b/src/snowflake/sqlalchemy/snowdialect.py index dd5e4375..96bcac71 100644 --- a/src/snowflake/sqlalchemy/snowdialect.py +++ b/src/snowflake/sqlalchemy/snowdialect.py @@ -4,6 +4,7 @@ import operator import re from collections import defaultdict +from enum import Enum from functools import reduce from typing import Any, Collection, Optional from urllib.parse import unquote_plus @@ -59,6 +60,11 @@ _ENABLE_SQLALCHEMY_AS_APPLICATION_NAME = True +class SnowflakeIsolationLevel(Enum): + READ_COMMITTED = "READ COMMITTED" + AUTOCOMMIT = "AUTOCOMMIT" + + class SnowflakeDialect(default.DefaultDialect): name = DIALECT_NAME driver = "snowflake" @@ -139,6 +145,13 @@ class SnowflakeDialect(default.DefaultDialect): supports_identity_columns = True + def __init__( + self, + isolation_level: Optional[str] = SnowflakeIsolationLevel.READ_COMMITTED.value, + **kwargs: Any, + ): + super().__init__(isolation_level=isolation_level, **kwargs) + @classmethod def dbapi(cls): return cls.import_dbapi() @@ -216,6 +229,27 @@ def has_table(self, connection, table_name, schema=None, **kw): """ return self._has_object(connection, "TABLE", table_name, schema) + def get_isolation_level_values(self, dbapi_connection): + return [ + SnowflakeIsolationLevel.READ_COMMITTED.value, + SnowflakeIsolationLevel.AUTOCOMMIT.value, + ] + + def do_rollback(self, dbapi_connection): + dbapi_connection.rollback() + + def do_commit(self, dbapi_connection): + dbapi_connection.commit() + + def get_default_isolation_level(self, dbapi_conn): + return SnowflakeIsolationLevel.READ_COMMITTED.value + + def set_isolation_level(self, dbapi_connection, level): + if level == SnowflakeIsolationLevel.AUTOCOMMIT.value: + dbapi_connection.autocommit(True) + else: + dbapi_connection.autocommit(False) + @reflection.cache def has_sequence(self, connection, sequence_name, schema=None, **kw): """ diff --git a/tests/test_transactions.py b/tests/test_transactions.py new file mode 100644 index 00000000..c163c2b7 --- /dev/null +++ b/tests/test_transactions.py @@ -0,0 +1,157 @@ +# +# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved. +# + +from sqlalchemy import Column, Integer, MetaData, String, select, text + +from snowflake.sqlalchemy import SnowflakeTable + +CURRENT_TRANSACTION = text("SELECT CURRENT_TRANSACTION()") + + +def test_connect_read_commited(engine_testaccount, assert_text_in_buf): + metadata = MetaData() + table_name = "test_connect_read_commited" + + test_table_1 = SnowflakeTable( + table_name, + metadata, + Column("id", Integer, primary_key=True), + Column("name", String), + cluster_by=["id", text("id > 5")], + ) + + metadata.create_all(engine_testaccount) + try: + with engine_testaccount.connect().execution_options( + isolation_level="READ COMMITTED" + ) as connection: + result = connection.execute(CURRENT_TRANSACTION).fetchall() + assert result[0] == (None,), result + ins = test_table_1.insert().values(id=1, name="test") + connection.execute(ins) + result = connection.execute(CURRENT_TRANSACTION).fetchall() + assert result[0] != ( + None, + ), "AUTOCOMMIT DISABLED, transaction should be started" + + with engine_testaccount.connect() as conn: + s = select(test_table_1) + results = conn.execute(s).fetchall() + assert len(results) == 0, results # No insert commited + assert_text_in_buf("ROLLBACK", occurrences=1) + finally: + metadata.drop_all(engine_testaccount) + + +def test_begin_read_commited(engine_testaccount, assert_text_in_buf): + metadata = MetaData() + table_name = "test_begin_read_commited" + + test_table_1 = SnowflakeTable( + table_name, + metadata, + Column("id", Integer, primary_key=True), + Column("name", String), + cluster_by=["id", text("id > 5")], + ) + + metadata.create_all(engine_testaccount) + try: + with engine_testaccount.connect().execution_options( + isolation_level="READ COMMITTED" + ) as connection, connection.begin(): + result = connection.execute(CURRENT_TRANSACTION).fetchall() + assert result[0] == (None,), result + ins = test_table_1.insert().values(id=1, name="test") + connection.execute(ins) + result = connection.execute(CURRENT_TRANSACTION).fetchall() + assert result[0] != ( + None, + ), "AUTOCOMMIT DISABLED, transaction should be started" + + with engine_testaccount.connect() as conn: + s = select(test_table_1) + results = conn.execute(s).fetchall() + assert len(results) == 1, results # Insert commited + assert_text_in_buf("COMMIT", occurrences=2) + finally: + metadata.drop_all(engine_testaccount) + + +def test_connect_autocommit(engine_testaccount, assert_text_in_buf): + metadata = MetaData() + table_name = "test_connect_autocommit" + + test_table_1 = SnowflakeTable( + table_name, + metadata, + Column("id", Integer, primary_key=True), + Column("name", String), + cluster_by=["id", text("id > 5")], + ) + + metadata.create_all(engine_testaccount) + try: + with engine_testaccount.connect().execution_options( + isolation_level="AUTOCOMMIT" + ) as connection: + result = connection.execute(CURRENT_TRANSACTION).fetchall() + assert result[0] == (None,), result + ins = test_table_1.insert().values(id=1, name="test") + connection.execute(ins) + result = connection.execute(CURRENT_TRANSACTION).fetchall() + assert result[0] == ( + None, + ), "Autocommit enabled, transaction should not be started" + + with engine_testaccount.connect() as conn: + s = select(test_table_1) + results = conn.execute(s).fetchall() + assert len(results) == 1, results + assert_text_in_buf( + "ROLLBACK using DBAPI connection.rollback(), DBAPI should ignore due to autocommit mode", + occurrences=1, + ) + + finally: + metadata.drop_all(engine_testaccount) + + +def test_begin_autocommit(engine_testaccount, assert_text_in_buf): + metadata = MetaData() + table_name = "test_begin_autocommit" + + test_table_1 = SnowflakeTable( + table_name, + metadata, + Column("id", Integer, primary_key=True), + Column("name", String), + cluster_by=["id", text("id > 5")], + ) + + metadata.create_all(engine_testaccount) + try: + with engine_testaccount.connect().execution_options( + isolation_level="AUTOCOMMIT" + ) as connection, connection.begin(): + result = connection.execute(CURRENT_TRANSACTION).fetchall() + assert result[0] == (None,), result + ins = test_table_1.insert().values(id=1, name="test") + connection.execute(ins) + result = connection.execute(CURRENT_TRANSACTION).fetchall() + assert result[0] == ( + None, + ), "Autocommit enabled, transaction should not be started" + + with engine_testaccount.connect() as conn: + s = select(test_table_1) + results = conn.execute(s).fetchall() + assert len(results) == 1, results + assert_text_in_buf( + "COMMIT using DBAPI connection.commit(), DBAPI should ignore due to autocommit mode", + occurrences=1, + ) + + finally: + metadata.drop_all(engine_testaccount)