Skip to content

Commit

Permalink
read filter from the config file
Browse files Browse the repository at this point in the history
  • Loading branch information
agatav committed Sep 12, 2024
1 parent 5152884 commit 056cdb8
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class SourceHubspot(AbstractSource):
def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], state: Optional[TState], **kwargs):
self.catalog = catalog
self.state = state
self.config = config

def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
"""Check connection"""
Expand Down Expand Up @@ -147,8 +148,8 @@ def get_common_params(self, config) -> Mapping[str, Any]:
return common_param

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
credentials = config.get("credentials", {})
common_params = self.get_common_params(config=config)
credentials = self.config.get("credentials", {})
common_params = self.get_common_params(config=self.config)
streams = [
Campaigns(**common_params),
Companies(**common_params),
Expand Down Expand Up @@ -186,7 +187,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
Workflows(**common_params),
]

enable_experimental_streams = "enable_experimental_streams" in config and config["enable_experimental_streams"]
enable_experimental_streams = "enable_experimental_streams" in self.config and self.config["enable_experimental_streams"]

if enable_experimental_streams:
streams.extend(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,11 +414,10 @@ def __init__(
self._acceptance_test_config = acceptance_test_config.get(self.name, {})

# Filter for records
if stream_filters is None:
stream_filters = {}
for filter in stream_filters:
if filter["stream_name"] == self.name:
self._stream_filter = filter["filter_value"]
if stream_filters:
for filter in stream_filters:
if filter["stream_name"] == self.name:
self._stream_filter = filter["filter_value"]
if catalog:
self.catalog = catalog

Expand Down Expand Up @@ -1155,17 +1154,16 @@ def _process_search(
"filters": [{"value": int(self._state.timestamp() * 1000), "propertyName": self.last_modified_field, "operator": "GTE"}],
"sorts": [{"propertyName": self.last_modified_field, "direction": "ASCENDING"}],
"properties": properties_list,
"limit": 100,
"limit": 200,
}
if self.state
else {
"filters": [{"value": int(self._start_date.timestamp() * 1000), "propertyName": self.last_modified_field, "operator": "GTE"}],
"sorts": [{"propertyName": self.last_modified_field, "direction": "ASCENDING"}],
"properties": properties_list,
"limit": 100,
"limit": 200,
}
)

if self._stream_filter:
if "propertyName" in self._stream_filter and "operator" in self._stream_filter and "value" in self._stream_filter:
payload["filters"].append({
Expand Down

0 comments on commit 056cdb8

Please sign in to comment.