Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: salesforce custom stream filtering #2

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,119 +2,68 @@
"streams": [
{
"stream": {
"name": "Account",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"name": "Lead",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"additionalProperties": true,
"properties": {
"Country": {
"type": [
"string",
"null"
]
},
"country_iso_code__c": {
"type": [
"string",
"null"
]
},
"Email": {
"type": [
"string",
"null"
]
},
"Id": {
"type": [
"string",
"null"
]
},
"LastModifiedDate": {
"format": "date-time",
"type": [
"string",
"null"
]
},
"SystemModstamp": {
"format": "date-time",
"type": [
"string",
"null"
]
}
},
"type": "object"
},
"supported_sync_modes": [
"full_refresh",
"incremental"
],
"source_defined_cursor": true,
"default_cursor_field": ["SystemModstamp"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "ActiveFeatureLicenseMetric",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["SystemModstamp"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "ActivePermSetLicenseMetric",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["SystemModstamp"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "ActiveProfileMetric",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["SystemModstamp"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "AppDefinition",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "Asset",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["SystemModstamp"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "FormulaFunctionAllowedType",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "ObjectPermissions",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["SystemModstamp"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "PermissionSetTabSetting",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["SystemModstamp"],
"source_defined_primary_key": [["Id"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "LeadHistory",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["CreatedDate"],
"source_defined_primary_key": [["Id"]]
"default_cursor_field": [
"CreatedDate"
],
"source_defined_primary_key": [
[
"Id"
]
]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "append"
}
]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ def prepare_stream(cls, stream_name: str, json_schema, sobject_options, sf_objec
"authenticator": authenticator,
"start_date": config.get("start_date"),
}

stream_filters = config.get("stream_filters")
if stream_filters is not None:
for filter in stream_filters:
if filter["stream_name"] == stream_name:
stream_kwargs["stream_filter"] = filter["filter_value"]

api_type = cls._get_api_type(stream_name, json_schema, config.get("force_use_bulk_api", False))
full_refresh, incremental = cls._get_stream_type(stream_name, api_type)
Expand Down Expand Up @@ -174,6 +180,11 @@ def generate_streams(
for stream_name, sobject_options in stream_objects.items():
json_schema = schemas.get(stream_name, {})

if self.catalog:
for catalog_stream in self.catalog.streams:
if stream_name == catalog_stream.stream.name and catalog_stream.stream.json_schema.get("properties", {}):
json_schema['properties'] = catalog_stream.stream.json_schema.get("properties", {})

stream_class, kwargs = self.prepare_stream(stream_name, json_schema, sobject_options, *default_args)

parent_name = PARENT_SALESFORCE_OBJECTS.get(stream_name, {}).get("parent_name")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,21 @@ connectionSpecification:
order: 2
title: Filter Salesforce Objects
description: Add filters to select only required stream based on `SObject` name. Use this field to filter which tables are displayed by this connector. This is useful if your Salesforce account has a large number of tables (>1000), in which case you may find it easier to navigate the UI and speed up the connector's performance if you restrict the tables displayed by this connector.
# Per-stream record filters. Each item pairs a stream (SObject) name with a
# filter expression; the connector appends the expression to the list of
# WHERE conditions it builds for the matching stream's query.
stream_filters:
  title: Stream Filters
  description: Add filters to sync only required records based on `SObject` name and available `SObject` attributes.
  type: array
  order: 9
  items:
    type: object
    # The connector reads both keys by direct indexing, so an item that omits
    # either one cannot be applied; reject it at validation time instead.
    required:
      - stream_name
      - filter_value
    properties:
      stream_name:
        type: string
        title: Stream Name
        description: "Name of the stream (`SObject`) the filter applies to, for example `Lead`. It is compared to the stream name with an exact match."
        order: 1
      filter_value:
        type: string
        title: Filter expression
        description: "Condition added to the WHERE clause of the stream's query, for example `Country = 'US'`. Do not include the `WHERE` keyword itself."
        order: 2
advanced_auth:
auth_flow_type: oauth2.0
predicate_key:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(
sobject_options: Mapping[str, Any] = None,
schema: dict = None,
start_date=None,
stream_filter: str = "",
**kwargs,
):
super().__init__(**kwargs)
Expand All @@ -78,6 +79,7 @@ def __init__(
session=self._session, # no need to specify api_budget and authenticator as HttpStream sets them in self._session
error_handler=SalesforceErrorHandler(stream_name=self.stream_name, sobject_options=self.sobject_options),
)
self.stream_filter = stream_filter

@staticmethod
def format_start_date(start_date: Optional[str]) -> Optional[str]:
Expand Down Expand Up @@ -190,12 +192,19 @@ def request_params(

property_chunk = property_chunk or {}
query = f"SELECT {','.join(property_chunk.keys())} FROM {self.name} "
where_conditions = []

if self.name in PARENT_SALESFORCE_OBJECTS:
# add where clause: " WHERE ContentDocumentId IN ('06905000000NMXXXXX', ...)"
parent_field = PARENT_SALESFORCE_OBJECTS[self.name]["field"]
parent_ids = [f"'{parent_record[parent_field]}'" for parent_record in stream_slice["parents"]]
query += f" WHERE ContentDocumentId IN ({','.join(parent_ids)})"
where_conditions.append(f"ContentDocumentId IN ({','.join(parent_ids)})")

if self.stream_filter:
where_conditions.append(f"{self.stream_filter}")

if where_conditions:
query += f" WHERE {' AND '.join(where_conditions)}"

if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS:
query += f"ORDER BY {self.primary_key} ASC"
Expand All @@ -204,7 +213,6 @@ def request_params(

def chunk_properties(self) -> Iterable[Mapping[str, Any]]:
selected_properties = self.get_json_schema().get("properties", {})

def empty_props_with_pk_if_present():
return {self.primary_key: selected_properties[self.primary_key]} if self.primary_key else {}

Expand Down Expand Up @@ -589,13 +597,21 @@ def request_params(
query = f"SELECT {select_fields} FROM {self.name}"
if next_page_token:
query += next_page_token["next_token"]

where_conditions = []

if self.name in PARENT_SALESFORCE_OBJECTS:
# add where clause: " WHERE ContentDocumentId IN ('06905000000NMXXXXX', '06905000000Mxp7XXX', ...)"
parent_field = PARENT_SALESFORCE_OBJECTS[self.name]["field"]
parent_ids = [f"'{parent_record[parent_field]}'" for parent_record in stream_slice["parents"]]
query += f" WHERE ContentDocumentId IN ({','.join(parent_ids)})"

where_conditions.append(f"ContentDocumentId IN ({','.join(parent_ids)})")

if self.stream_filter:
where_conditions.append(f"{self.stream_filter}")

if where_conditions:
query += f" WHERE {' AND '.join(where_conditions)}"

return {"q": query}

def read_records(
Expand Down Expand Up @@ -696,12 +712,14 @@ class IncrementalRestSalesforceStream(RestSalesforceStream, CheckpointMixin, ABC
state_checkpoint_interval = 500
_slice = None

def __init__(self, replication_key: str, stream_slice_step: str = "P30D", stream_filter: str = "", **kwargs):
    """Initialise the incremental-sync attributes after the parent initialiser has run.

    :param replication_key: stored as ``self.replication_key``.
    :param stream_slice_step: stored as ``self._stream_slice_step``; defaults to ``"P30D"``.
    :param stream_filter: optional extra query condition, stored as ``self.stream_filter``.
        Surrounding whitespace is stripped, and ``None`` or a blank string means "no filter".
    :param kwargs: forwarded unchanged to the parent ``__init__``.
    """
    super().__init__(**kwargs)
    self.replication_key = replication_key
    self._stream_slice_step = stream_slice_step
    self._stream_slicer_cursor = None
    self._state = {}
    # A whitespace-only filter is truthy, so code that checks `if self.stream_filter`
    # before appending it to the WHERE conditions would join in an empty condition
    # ("... AND   ") and produce an invalid query. Normalise it once here.
    self.stream_filter = (stream_filter or "").strip()


def set_cursor(self, cursor: Cursor) -> None:
self._stream_slicer_cursor = cursor
Expand Down Expand Up @@ -752,6 +770,8 @@ def request_params(
where_conditions.append(f"{self.cursor_field} >= {start_date}")
if end_date:
where_conditions.append(f"{self.cursor_field} < {end_date}")
if self.stream_filter:
where_conditions.append(f"{self.stream_filter}")

where_clause = f"WHERE {' AND '.join(where_conditions)}"
query = f"SELECT {select_fields} FROM {table_name} {where_clause}"
Expand Down Expand Up @@ -798,8 +818,12 @@ def request_params(
table_name = self.name
where_conditions = [f"{self.cursor_field} >= {start_date}", f"{self.cursor_field} < {end_date}"]

if self.stream_filter:
where_conditions.append(f"{self.stream_filter}")

where_clause = f"WHERE {' AND '.join(where_conditions)}"
query = f"SELECT {select_fields} FROM {table_name} {where_clause}"

return {"q": query}


Expand Down
Loading