From 694e8b395f1fb5cd62889069cca132f9f709e763 Mon Sep 17 00:00:00 2001 From: Mathijs Miermans Date: Tue, 27 Feb 2024 20:06:07 -0800 Subject: [PATCH] feat: [MC-681] Implement scheduled_corpus_candidate Snowplow functions chore: move common Snowplow logic to content-common chore: use shared Snowplow code in prospect-api and curated-corpus-api fix: Snowplow retries on error chore: move Snowplow Micro to integration test file fix: lint error snowplowHttpProtocol unused feat: emit Snowplow events on error remove duplicate test coverage for image error fix snowplow tests fix ECONNREFUSED to Snowplow Micro in CI tests Revert "fix ECONNREFUSED to Snowplow Micro in CI tests" This reverts commit 470df1355fc914d1cdbbb9e3cd3f271221892a66. fix: move Snowplow Micro tests to integrations add test for Snowplow event on success --- lambdas/corpus-scheduler-lambda/src/config.ts | 11 ++ .../src/events/snowplow.integration.ts | 56 +++++++++ .../src/events/snowplow.ts | 106 ++++++++++++++++++ .../src/events/testHelpers.ts | 16 +++ .../src/events/types.ts | 27 +++++ .../{index.spec.ts => index.integration.ts} | 75 ++++++++----- lambdas/corpus-scheduler-lambda/src/index.ts | 3 - .../src/testHelpers.ts | 29 ++--- lambdas/corpus-scheduler-lambda/src/types.ts | 23 +++- .../src/utils.integration.ts | 97 ++++++++++++++++ .../corpus-scheduler-lambda/src/utils.spec.ts | 54 ++++----- lambdas/corpus-scheduler-lambda/src/utils.ts | 73 ++++++++++-- .../src/validation.spec.ts | 52 +++------ .../content-common/events/snowplow/config.ts | 23 ++++ .../events/snowplow/index.integration.ts | 32 ++++++ .../events/snowplow/index.spec.ts | 54 +++++++++ .../content-common/events/snowplow/index.ts | 57 ++++++++++ .../events/snowplow/test-helpers.ts | 98 ++++++++++++++++ packages/content-common/package.json | 11 +- pnpm-lock.yaml | 31 ++--- .../curated-corpus-api/src/config/index.ts | 9 -- ...ReviewedItemSnowplowHandler.integration.ts | 26 ++--- ...cheduledItemSnowplowHandler.integration.ts | 16 +-- 
.../src/events/snowplow/tracker.ts | 23 +--- .../src/test/helpers/snowplow.ts | 37 +----- servers/prospect-api/package.json | 1 - .../src/events/snowplow-test-helpers.ts | 62 ---------- .../src/events/snowplow.integration.ts | 72 ++++++------ servers/prospect-api/src/events/snowplow.ts | 61 +--------- servers/prospect-api/src/resolvers.ts | 8 +- 30 files changed, 839 insertions(+), 404 deletions(-) create mode 100644 lambdas/corpus-scheduler-lambda/src/events/snowplow.integration.ts create mode 100644 lambdas/corpus-scheduler-lambda/src/events/snowplow.ts create mode 100644 lambdas/corpus-scheduler-lambda/src/events/testHelpers.ts create mode 100644 lambdas/corpus-scheduler-lambda/src/events/types.ts rename lambdas/corpus-scheduler-lambda/src/{index.spec.ts => index.integration.ts} (65%) create mode 100644 lambdas/corpus-scheduler-lambda/src/utils.integration.ts create mode 100644 packages/content-common/events/snowplow/config.ts create mode 100644 packages/content-common/events/snowplow/index.integration.ts create mode 100644 packages/content-common/events/snowplow/index.spec.ts create mode 100644 packages/content-common/events/snowplow/index.ts create mode 100644 packages/content-common/events/snowplow/test-helpers.ts delete mode 100644 servers/prospect-api/src/events/snowplow-test-helpers.ts diff --git a/lambdas/corpus-scheduler-lambda/src/config.ts b/lambdas/corpus-scheduler-lambda/src/config.ts index 78afad53..f7244a46 100644 --- a/lambdas/corpus-scheduler-lambda/src/config.ts +++ b/lambdas/corpus-scheduler-lambda/src/config.ts @@ -26,6 +26,17 @@ const config = { userId: 'ML', groups: ['mozilliansorg_pocket_scheduled_surface_curator_full'], }, + snowplow: { + // TODO: Set dev value + appId: 'corpus-scheduler-lambda', + schemas: { + // published 2024-02-28 + scheduled_corpus_candidate: + 'iglu:com.pocket/scheduled_corpus_candidate/jsonschema/1-0-2', + // published 2024-02-28 + objectUpdate: 'iglu:com.pocket/object_update/jsonschema/1-0-17', + }, + }, }; export 
default config; diff --git a/lambdas/corpus-scheduler-lambda/src/events/snowplow.integration.ts b/lambdas/corpus-scheduler-lambda/src/events/snowplow.integration.ts new file mode 100644 index 00000000..daf9fcd4 --- /dev/null +++ b/lambdas/corpus-scheduler-lambda/src/events/snowplow.integration.ts @@ -0,0 +1,56 @@ +import { + resetSnowplowEvents, + waitForSnowplowEvents, +} from 'content-common/events/snowplow/test-helpers'; +import { getEmitter, getTracker } from 'content-common/events/snowplow'; +import { queueSnowplowEvent } from './snowplow'; +import { + SnowplowScheduledCorpusCandidateErrorName, + SnowplowScheduledCorpusCandidate, +} from './types'; +import { random } from 'typia'; +import config from '../config'; + +describe('snowplow', () => { + const mockCandidate = { + ...random(), + scheduled_corpus_item_external_id: '05706565-5a9c-4b57-83e5-a426485a4714', + approved_corpus_item_external_id: 'c43a2aa5-28de-4828-a20e-1fdf60cc4a80', + }; + const emitter = getEmitter(); + const tracker = getTracker(emitter, config.snowplow.appId); + + beforeEach(async () => { + await resetSnowplowEvents(); + }); + + it('should accept an event with a scheduled corpus candidate', async () => { + queueSnowplowEvent(tracker, mockCandidate); + + const allEvents = await waitForSnowplowEvents(); + + expect(allEvents.total).toEqual(1); + expect(allEvents.bad).toEqual(0); + }); + + describe('error events', () => { + Object.values(SnowplowScheduledCorpusCandidateErrorName).forEach( + (errorName) => { + it(`should emit events with ${errorName} error`, async () => { + queueSnowplowEvent(tracker, { + ...mockCandidate, + scheduled_corpus_item_external_id: undefined, + approved_corpus_item_external_id: undefined, + error_name: errorName, + error_description: `Oh no! 
A ${errorName} error occurred.`, + }); + + const allEvents = await waitForSnowplowEvents(); + + expect(allEvents.total).toEqual(1); + expect(allEvents.bad).toEqual(0); + }); + }, + ); + }); +}); diff --git a/lambdas/corpus-scheduler-lambda/src/events/snowplow.ts b/lambdas/corpus-scheduler-lambda/src/events/snowplow.ts new file mode 100644 index 00000000..904e609b --- /dev/null +++ b/lambdas/corpus-scheduler-lambda/src/events/snowplow.ts @@ -0,0 +1,106 @@ +import { + buildSelfDescribingEvent, + Tracker, + SelfDescribingEvent, + SelfDescribingJson, +} from '@snowplow/node-tracker'; + +import config from '../config'; +import { + SnowplowScheduledCorpusCandidateErrorName, + SnowplowScheduledCorpusCandidate, +} from './types'; +import { ScheduledCandidate } from '../types'; + +/** + * creates an object_update Snowplow event for scheduled_corpus_candidate + * + * @returns SelfDescribingEvent + */ +export const generateEvent = (): SelfDescribingEvent => { + return { + event: { + schema: config.snowplow.schemas.objectUpdate, + data: { + trigger: 'scheduled_corpus_candidate_generated', + object: 'scheduled_corpus_candidate', + }, + }, + }; +}; + +/** + * generates a scheduled_corpus_candidate entity being sent to snowplow + * + * @param scheduledCorpusCandidate PocketAnalyticsArticle + * @returns SelfDescribingJson + */ +export const generateContext = ( + scheduledCorpusCandidate: SnowplowScheduledCorpusCandidate, +): SelfDescribingJson => { + return { + schema: config.snowplow.schemas.scheduled_corpus_candidate, + data: scheduledCorpusCandidate, + }; +}; + +/** + * + * @param candidate ML candidate + * @param errorName Snowplow structured error + * @param errorDescription Longer human-readable description of the error + */ +export const generateSnowplowErrorEntity = ( + candidate: ScheduledCandidate, + errorName: SnowplowScheduledCorpusCandidateErrorName, + errorDescription: string, +): SnowplowScheduledCorpusCandidate => { + return { + scheduled_corpus_candidate_id: 
candidate.scheduled_corpus_candidate_id, + candidate_url: candidate.scheduled_corpus_item.url, + features: candidate.features, + run_details: candidate.run_details, + error_name: errorName, + error_description: errorDescription, + }; +}; + +/** + * + * @param candidate ML candidate + * @param approvedCorpusItemId Identifier of the item added to the corpus + */ +export const generateSnowplowSuccessEntity = ( + candidate: ScheduledCandidate, + approvedCorpusItemId: string, +): SnowplowScheduledCorpusCandidate => { + return { + scheduled_corpus_candidate_id: candidate.scheduled_corpus_candidate_id, + candidate_url: candidate.scheduled_corpus_item.url, + approved_corpus_item_external_id: approvedCorpusItemId, + features: candidate.features, + run_details: candidate.run_details, + // TODO: set scheduled_corpus_item_external_id + }; +}; + +/** + * main entry point to snowplow. queues up an event to send. + * + * (elsewhere, we tell snowplow to send all queued events.) + * + * @param tracker TrackerInterface + * @param entity Entity representing the result of trying to schedule a candidate. + */ +export const queueSnowplowEvent = ( + tracker: Tracker, + entity: SnowplowScheduledCorpusCandidate, +) => { + const event = generateEvent(); + const contexts: SelfDescribingJson[] = [generateContext(entity)]; + + // reminder - this method is not async and does not directly initiate + // any http request. it sends the event to a queue internal to the + // snowplow module, which has its own logic on when to flush the queue. 
+ tracker.track(buildSelfDescribingEvent(event), contexts); +}; diff --git a/lambdas/corpus-scheduler-lambda/src/events/testHelpers.ts b/lambdas/corpus-scheduler-lambda/src/events/testHelpers.ts new file mode 100644 index 00000000..3b4f00fe --- /dev/null +++ b/lambdas/corpus-scheduler-lambda/src/events/testHelpers.ts @@ -0,0 +1,16 @@ +import { SnowplowScheduledCorpusCandidate } from './types'; +import { + getGoodSnowplowEvents, + parseSnowplowData, +} from 'content-common/events/snowplow/test-helpers'; + +/** + * @return scheduled_corpus_candidate entity from the last good event sent to Snowplow Micro. + */ +export async function extractScheduledCandidateEntity(): Promise { + const goodEvents = await getGoodSnowplowEvents(); + const snowplowContext = parseSnowplowData( + goodEvents[0].rawEvent.parameters.cx, + ); + return snowplowContext.data[0].data as SnowplowScheduledCorpusCandidate; +} diff --git a/lambdas/corpus-scheduler-lambda/src/events/types.ts b/lambdas/corpus-scheduler-lambda/src/events/types.ts new file mode 100644 index 00000000..643c170c --- /dev/null +++ b/lambdas/corpus-scheduler-lambda/src/events/types.ts @@ -0,0 +1,27 @@ +import { + ScheduledCorpusCandidateFeatures, + ScheduledCorpusCandidateRunDetails, +} from '../types'; + +// scheduled_corpus_candidate entity +export type SnowplowScheduledCorpusCandidate = { + scheduled_corpus_candidate_id: string; + scheduled_corpus_item_external_id?: string; + approved_corpus_item_external_id?: string; + candidate_url: string; + error_name?: SnowplowScheduledCorpusCandidateErrorName; + error_description?: string; + features: ScheduledCorpusCandidateFeatures; + run_details: ScheduledCorpusCandidateRunDetails; +}; + +export enum SnowplowScheduledCorpusCandidateErrorName { + ALREADY_SCHEDULED = 'ALREADY_SCHEDULED', + /** TODO: [MC-737] Add validation on scheduled date. 
*/ + INSUFFICIENT_TIME_BEFORE_SCHEDULED_DATE = 'INSUFFICIENT_TIME_BEFORE_SCHEDULED_DATE', + /** TODO: [MC-666] Add safeguard to prevent scheduling from unverified domains. */ + DOMAIN_NOT_ALLOWED_FOR_AUTO_SCHEDULING = 'DOMAIN_NOT_ALLOWED_FOR_AUTO_SCHEDULING', + MISSING_EXCERPT = 'MISSING_EXCERPT', + MISSING_TITLE = 'MISSING_TITLE', + MISSING_IMAGE = 'MISSING_IMAGE', +} diff --git a/lambdas/corpus-scheduler-lambda/src/index.spec.ts b/lambdas/corpus-scheduler-lambda/src/index.integration.ts similarity index 65% rename from lambdas/corpus-scheduler-lambda/src/index.spec.ts rename to lambdas/corpus-scheduler-lambda/src/index.integration.ts index f45ec094..77c37406 100644 --- a/lambdas/corpus-scheduler-lambda/src/index.spec.ts +++ b/lambdas/corpus-scheduler-lambda/src/index.integration.ts @@ -13,20 +13,30 @@ import { CuratedStatus, Topics, } from 'content-common'; +import { + resetSnowplowEvents, + waitForSnowplowEvents, +} from 'content-common/events/snowplow/test-helpers'; +import { extractScheduledCandidateEntity } from './events/testHelpers'; describe('corpus scheduler lambda', () => { const server = setupServer(); - const scheduledCandidate = createScheduledCandidate( - 'Fake title', - 'fake excerpt', - 'https://fake-image-url.com', - CorpusLanguage.EN, - ['Fake Author'], - 'https://fake-url.com', - ); + const scheduledCandidate = createScheduledCandidate({ + title: 'Fake title', + excerpt: 'fake excerpt', + image_url: 'https://fake-image-url.com', + language: CorpusLanguage.EN, + authors: ['Fake Author'], + url: 'https://fake-url.com', + }); const record = createScheduledCandidates([scheduledCandidate]); + const fakeEvent = { + Records: [{ messageId: '1', body: JSON.stringify(record) }], + } as unknown as SQSEvent; + const sqsContext = null as unknown as Context; + const sqsCallback = null as unknown as Callback; const getUrlMetadataBody = { data: { @@ -87,7 +97,7 @@ describe('corpus scheduler lambda', () => { ); }; - beforeAll(() => server.listen()); + 
beforeAll(() => server.listen({ onUnhandledRequest: 'bypass' })); afterEach(() => server.resetHandlers()); afterAll(() => server.close()); @@ -102,6 +112,10 @@ describe('corpus scheduler lambda', () => { jest.clearAllMocks(); }); + beforeEach(async () => { + await resetSnowplowEvents(); + }); + it('returns batch item failure if curated-corpus-api has error, with partial success', async () => { mockGetUrlMetadata(); // Note: msw uses handlers in reverse order, so 2nd request will error, and 1st will succeed. @@ -128,15 +142,7 @@ describe('corpus scheduler lambda', () => { mockGetUrlMetadata(); mockCreateApprovedCorpusItemOnce({ data: null }); - const fakeEvent = { - Records: [{ messageId: '1', body: JSON.stringify(record) }], - } as unknown as SQSEvent; - - const actual = await processor( - fakeEvent, - null as unknown as Context, - null as unknown as Callback, - ); + const actual = await processor(fakeEvent, sqsContext, sqsCallback); expect(actual).toEqual({ batchItemFailures: [{ itemIdentifier: '1' }] }); }, 7000); @@ -145,16 +151,31 @@ describe('corpus scheduler lambda', () => { mockGetUrlMetadata(); mockCreateApprovedCorpusItemOnce(); - const fakeEvent = { - Records: [{ messageId: '1', body: JSON.stringify(record) }], - } as unknown as SQSEvent; - - const actual = await processor( - fakeEvent, - null as unknown as Context, - null as unknown as Callback, - ); + const actual = await processor(fakeEvent, sqsContext, sqsCallback); expect(actual).toEqual({ batchItemFailures: [] }); }); + + it('emits a Snowplow event if candidate is successfully processed', async () => { + mockGetUrlMetadata(); + mockCreateApprovedCorpusItemOnce(createApprovedCorpusItemBody); + + await processor(fakeEvent, sqsContext, sqsCallback); + + // Exactly one Snowplow event should be emitted. 
+ const allEvents = await waitForSnowplowEvents(); + expect(allEvents.bad).toEqual(0); + expect(allEvents.good).toEqual(record.candidates.length); + + // Check that the right Snowplow entity was included with the event. + const snowplowEntity = await extractScheduledCandidateEntity(); + expect(snowplowEntity.approved_corpus_item_external_id).toEqual( + createApprovedCorpusItemBody.data.createApprovedCorpusItem.externalId, + ); + expect(snowplowEntity.scheduled_corpus_candidate_id).toEqual( + record.candidates[0].scheduled_corpus_candidate_id, + ); + expect(snowplowEntity.error_name).toBeUndefined(); + expect(snowplowEntity.error_description).toBeUndefined(); + }); }); diff --git a/lambdas/corpus-scheduler-lambda/src/index.ts b/lambdas/corpus-scheduler-lambda/src/index.ts index 28ebc5a7..8126fe86 100644 --- a/lambdas/corpus-scheduler-lambda/src/index.ts +++ b/lambdas/corpus-scheduler-lambda/src/index.ts @@ -15,9 +15,6 @@ Sentry.AWSLambda.init({ serverName: config.app.name, }); -// temp log statements -console.log('corpus scheduler lambda'); - /** * @param event data from an SQS message - should be an array of items to create / schedule in corpus * @returns SQSBatchResponse (all failed records) diff --git a/lambdas/corpus-scheduler-lambda/src/testHelpers.ts b/lambdas/corpus-scheduler-lambda/src/testHelpers.ts index d36715d7..4529c1f4 100644 --- a/lambdas/corpus-scheduler-lambda/src/testHelpers.ts +++ b/lambdas/corpus-scheduler-lambda/src/testHelpers.ts @@ -1,4 +1,4 @@ -import { ScheduledCandidate, ScheduledCandidates } from './types'; +import { ScheduledCandidate, ScheduledCandidates, ScheduledCorpusItem } from './types'; import { CorpusItemSource, CorpusLanguage, @@ -17,30 +17,25 @@ export const createScheduledCandidates = ( }; }; export const createScheduledCandidate = ( - title?: string, - excerpt?: string, - imageUrl?: string, - language?: CorpusLanguage, - authors?: string[], - url?: string, - source?: CorpusItemSource.ML, + scheduledCorpusItemOverrides: 
Partial = {}, ): ScheduledCandidate => { return { scheduled_corpus_candidate_id: 'a4b5d99c-4c1b-4d35-bccf-6455c8df07b0', scheduled_corpus_item: { - url: - url || - 'https://www.politico.com/news/magazine/2024/02/26/former-boeing-employee-speaks-out-00142948', + url: 'https://www.politico.com/news/magazine/2024/02/26/former-boeing-employee-speaks-out-00142948', status: CuratedStatus.RECOMMENDATION, - source: source || CorpusItemSource.ML, + source: CorpusItemSource.ML, topic: Topics.SELF_IMPROVEMENT, scheduled_date: '2024-02-22', scheduled_surface_guid: 'NEW_TAB_EN_US', - title: title, - excerpt: excerpt, - language: language, - image_url: imageUrl, - authors: authors, + title: + 'Romantic norms are in flux. No wonder everyone’s obsessed with polyamory.', + excerpt: + 'In the conversation about open marriages and polyamory, America’s sexual anxieties are on full display.', + language: CorpusLanguage.EN, + image_url: 'https://fake-image-url.com', + authors: ['Rebecca Jennings'], + ...scheduledCorpusItemOverrides, }, features: { domain_prob: 0.7829, diff --git a/lambdas/corpus-scheduler-lambda/src/types.ts b/lambdas/corpus-scheduler-lambda/src/types.ts index a2840a2e..17678694 100644 --- a/lambdas/corpus-scheduler-lambda/src/types.ts +++ b/lambdas/corpus-scheduler-lambda/src/types.ts @@ -1,9 +1,10 @@ import { - CuratedStatus, CorpusItemSource, CorpusLanguage, + CuratedStatus, Topics, } from 'content-common'; +import { tags } from 'typia'; export interface ScheduledCandidates { candidates: ScheduledCandidate[]; @@ -11,11 +12,11 @@ export interface ScheduledCandidates { export interface ScheduledCandidate { scheduled_corpus_candidate_id: string; scheduled_corpus_item: ScheduledCorpusItem; - features: { [key: string]: string | number }; // ML controls which features are sent - run_details: { [key: string]: string | number }; // ML controls which run debug info is sent + features: ScheduledCorpusCandidateFeatures; + run_details: ScheduledCorpusCandidateRunDetails; } 
-interface ScheduledCorpusItem { +export interface ScheduledCorpusItem { url: string; status: CuratedStatus; // TODO: set source to CorpusItemSource.ML once ML source is added @@ -32,3 +33,17 @@ interface ScheduledCorpusItem { // TODO: add allowed surfaces here to schedule to production export const allowedScheduledSurfaces: string[] = []; + +export type ScheduledCorpusCandidateFeatures = { + rank: number & tags.Type<'int64'>; // rank is integer in Snowplow schema + score: number; + data_source: string; + ml_version: string; + [key: string]: string | number; // ML controls which additional features are sent +}; + +export type ScheduledCorpusCandidateRunDetails = { + flow_name: string; + run_id: string; + [key: string]: any; // ML controls which additional run debug info is sent +}; diff --git a/lambdas/corpus-scheduler-lambda/src/utils.integration.ts b/lambdas/corpus-scheduler-lambda/src/utils.integration.ts new file mode 100644 index 00000000..9045195f --- /dev/null +++ b/lambdas/corpus-scheduler-lambda/src/utils.integration.ts @@ -0,0 +1,97 @@ +import { mapScheduledCandidateInputToCreateApprovedItemInput } from './utils'; +import { UrlMetadata } from 'content-common/types'; +import { createScheduledCandidate, parserItem } from './testHelpers'; +import { SnowplowScheduledCorpusCandidateErrorName } from './events/types'; +import { + resetSnowplowEvents, + waitForSnowplowEvents, +} from 'content-common/events/snowplow/test-helpers'; +import { extractScheduledCandidateEntity } from './events/testHelpers'; + +describe('utils integrations', function () { + beforeEach(async () => { + await resetSnowplowEvents(); + }); + + describe('processAndScheduleCandidate', () => { + describe('happy path', () => { + it(`should emit a Snowplow event when a candidate is scheduled`, async () => { + // Create a ScheduledCandidate with + const candidate: any = createScheduledCandidate(); + + await mapScheduledCandidateInputToCreateApprovedItemInput( + candidate, + parserItem, + ); + + 
const allEvents = await waitForSnowplowEvents(); + expect(allEvents).toEqual({ total: 1, good: 1, bad: 0 }); + + // Check that no error was emitted. + const snowplowEntity = await extractScheduledCandidateEntity(); + expect(snowplowEntity.error_description).toBeUndefined(); + }); + }); + }); + + describe('mapScheduledCandidateInputToCreateApprovedItemInput', () => { + describe('error handling', () => { + interface MetadataErrorTestCase { + candidateKey: string; + parserKey: string; + expectedSnowplowError: SnowplowScheduledCorpusCandidateErrorName; + } + + const metadataErrorTestCases: MetadataErrorTestCase[] = [ + { + candidateKey: 'title', + parserKey: 'title', + expectedSnowplowError: + SnowplowScheduledCorpusCandidateErrorName.MISSING_TITLE, + }, + { + candidateKey: 'excerpt', + parserKey: 'excerpt', + expectedSnowplowError: + SnowplowScheduledCorpusCandidateErrorName.MISSING_EXCERPT, + }, + { + candidateKey: 'image_url', + parserKey: 'imageUrl', + expectedSnowplowError: + SnowplowScheduledCorpusCandidateErrorName.MISSING_IMAGE, + }, + ]; + + metadataErrorTestCases.forEach( + ({ candidateKey, parserKey, expectedSnowplowError }) => { + it(`should emit a Snowplow event when ${candidateKey} is missing with error_name=${expectedSnowplowError}`, async () => { + // Create a ScheduledCandidate with the field under test missing + const incompleteCandidate: any = createScheduledCandidate(); + incompleteCandidate.scheduled_corpus_item[candidateKey] = undefined; + + const incompleteParserItem: UrlMetadata = { + ...parserItem, + [parserKey]: undefined, + }; + + await expect( + mapScheduledCandidateInputToCreateApprovedItemInput( + incompleteCandidate, + incompleteParserItem, + ), + ).rejects.toThrow(Error); + + const allEvents = await waitForSnowplowEvents(); + expect(allEvents).toEqual({ total: 1, good: 1, bad: 0 }); + + // Check that the right error was emitted. 
+ const snowplowEntity = await extractScheduledCandidateEntity(); + expect(snowplowEntity.error_name).toEqual(expectedSnowplowError); + expect(snowplowEntity.error_description).toBeTruthy(); + }); + }, + ); + }); + }); +}); diff --git a/lambdas/corpus-scheduler-lambda/src/utils.spec.ts b/lambdas/corpus-scheduler-lambda/src/utils.spec.ts index b1bf8004..40b748a6 100644 --- a/lambdas/corpus-scheduler-lambda/src/utils.spec.ts +++ b/lambdas/corpus-scheduler-lambda/src/utils.spec.ts @@ -10,7 +10,7 @@ import { GetSecretValueCommand, SecretsManagerClient, } from '@aws-sdk/client-secrets-manager'; -import { ApprovedItemAuthor, CorpusLanguage } from 'content-common'; +import { ApprovedItemAuthor } from 'content-common'; import { createScheduledCandidate, expectedOutput, @@ -118,13 +118,7 @@ describe('utils', function () { }); describe('mapScheduledCandidateInputToCreateApprovedItemInput', () => { it('should map correctly a ScheduledCandidate to CreateApprovedItemInput', async () => { - const scheduledCandidate = createScheduledCandidate( - 'Romantic norms are in flux. 
No wonder everyone’s obsessed with polyamory.', - 'In the conversation about open marriages and polyamory, America’s sexual anxieties are on full display.', - 'https://fake-image-url.com', - CorpusLanguage.EN, - ['Rebecca Jennings'], - ); + const scheduledCandidate = createScheduledCandidate(); const output = await mapScheduledCandidateInputToCreateApprovedItemInput( scheduledCandidate, parserItem, @@ -133,14 +127,14 @@ describe('utils', function () { }); it('should map correctly a ScheduledCandidate to CreateApprovedItemInput & fallback on Parser fields for undefined optional ScheduledCandidate fields', async () => { // all optional fields are undefined and should be taken from the Parser - const scheduledCandidate = createScheduledCandidate(); - expect(scheduledCandidate.scheduled_corpus_item.title).toBeUndefined(); - expect(scheduledCandidate.scheduled_corpus_item.excerpt).toBeUndefined(); - expect(scheduledCandidate.scheduled_corpus_item.language).toBeUndefined(); - expect(scheduledCandidate.scheduled_corpus_item.authors).toBeUndefined(); - expect( - scheduledCandidate.scheduled_corpus_item.image_url, - ).toBeUndefined(); + const scheduledCandidate = createScheduledCandidate({ + title: undefined, + excerpt: undefined, + language: undefined, + authors: undefined, + image_url: undefined, + }); + const output = await mapScheduledCandidateInputToCreateApprovedItemInput( scheduledCandidate, parserItem, @@ -148,29 +142,23 @@ describe('utils', function () { expect(output).toEqual(expectedOutput); }); it('should throw Error on CreateApprovedItemInput if field types are wrong', async () => { - const scheduledCandidate = createScheduledCandidate( - 'Romantic norms are in flux. 
No wonder everyone’s obsessed with polyamory.', - 'In the conversation about open marriages and polyamory, America’s sexual anxieties are on full display.', - 'https://fake-image-url.com', - CorpusLanguage.EN, - ['Rebecca Jennings'], - ); - parserItem.publisher = 1 as unknown as string; // force publisher to be the wrong type to trigger an Error + const scheduledCandidate = createScheduledCandidate(); + + const invalidParserItem: any = { + ...parserItem, + publisher: 1, + }; await expect( mapScheduledCandidateInputToCreateApprovedItemInput( scheduledCandidate, - parserItem, - ), - ).rejects.toThrow(Error); - await expect( - mapScheduledCandidateInputToCreateApprovedItemInput( - scheduledCandidate, - parserItem, + invalidParserItem, ), ).rejects.toThrow( - `failed to map a4b5d99c-4c1b-4d35-bccf-6455c8df07b0 to CreateApprovedItemInput. ` + - `Reason: Error: Error on typia.assert(): invalid type on $input.publisher, expect to be string`, + new Error( + `failed to map a4b5d99c-4c1b-4d35-bccf-6455c8df07b0 to CreateApprovedItemInput. 
` + + `Reason: Error: Error on typia.assert(): invalid type on $input.publisher, expect to be string`, + ), ); }); }); diff --git a/lambdas/corpus-scheduler-lambda/src/utils.ts b/lambdas/corpus-scheduler-lambda/src/utils.ts index 4e80b0a3..e03d5dea 100644 --- a/lambdas/corpus-scheduler-lambda/src/utils.ts +++ b/lambdas/corpus-scheduler-lambda/src/utils.ts @@ -2,27 +2,33 @@ import { GetSecretValueCommand, SecretsManagerClient, } from '@aws-sdk/client-secrets-manager'; -// eslint-disable-next-line @typescript-eslint/no-var-requires -const jwt = require('jsonwebtoken'); -// eslint-disable-next-line @typescript-eslint/no-var-requires -const jwkToPem = require('jwk-to-pem'); import config from './config'; import { validateCandidate } from './validation'; import { ApprovedItemAuthor, - CreateApprovedItemInput, CorpusLanguage, - UrlMetadata, + CreateApprovedItemInput, ScheduledItemSource, + UrlMetadata, } from 'content-common'; import { allowedScheduledSurfaces, ScheduledCandidate, ScheduledCandidates, } from './types'; -import { assert } from 'typia'; +import { assert, TypeGuardError } from 'typia'; import { SQSRecord } from 'aws-lambda'; import { createApprovedCorpusItem, fetchUrlMetadata } from './graphQlApiCalls'; +import { + generateSnowplowErrorEntity, generateSnowplowSuccessEntity, + queueSnowplowEvent, +} from './events/snowplow'; +import { getEmitter, getTracker } from 'content-common/events/snowplow'; +import { SnowplowScheduledCorpusCandidateErrorName } from './events/types'; +// eslint-disable-next-line @typescript-eslint/no-var-requires +const jwt = require('jsonwebtoken'); +// eslint-disable-next-line @typescript-eslint/no-var-requires +const jwkToPem = require('jwk-to-pem'); // Secrets Manager Client const smClient = new SecretsManagerClient({ region: config.aws.region }); @@ -100,6 +106,43 @@ export const mapAuthorToApprovedItemAuthor = ( }); }; +/** + * @param e Error raised by Typia assert + * @return Snowplow error corresponding to e if one exists, 
otherwise undefined. + */ +function mapApprovedItemInputTypiaErrorToSnowplowError( + e: TypeGuardError, +): SnowplowScheduledCorpusCandidateErrorName | undefined { + switch (e.path) { + case '$input.imageUrl': + return SnowplowScheduledCorpusCandidateErrorName.MISSING_IMAGE; + case '$input.title': + return SnowplowScheduledCorpusCandidateErrorName.MISSING_TITLE; + case '$input.excerpt': + return SnowplowScheduledCorpusCandidateErrorName.MISSING_EXCERPT; + } +} + +/** + * + * @param e Error raised by Typia assert + * @param candidate + */ +function handleApprovedItemInputTypiaError( + e: TypeGuardError, + candidate: ScheduledCandidate, +) { + const snowplowError = mapApprovedItemInputTypiaErrorToSnowplowError(e); + if (snowplowError) { + const emitter = getEmitter(); + const tracker = getTracker(emitter, config.snowplow.appId); + queueSnowplowEvent( + tracker, + generateSnowplowErrorEntity(candidate, snowplowError, e.message), + ); + } +} + /** * Creates a scheduled item to send to createApprovedCorpusItem mutation * @param candidate ScheduledCandidate received from Metaflow @@ -168,6 +211,10 @@ export const mapScheduledCandidateInputToCreateApprovedItemInput = async ( assert(itemToSchedule); return itemToSchedule; } catch (e) { + if (e instanceof TypeGuardError) { + handleApprovedItemInputTypiaError(e, candidate); + } + throw new Error( `failed to map ${candidate.scheduled_corpus_candidate_id} to CreateApprovedItemInput. 
Reason: ${e}`, ); @@ -214,8 +261,18 @@ export const processAndScheduleCandidate = async ( const createdItem = await createApprovedCorpusItem( createApprovedItemInput, ); + const approvedCorpusItemId = + createdItem.data.createApprovedCorpusItem.externalId; + + const emitter = getEmitter(); + const tracker = getTracker(emitter, config.snowplow.appId); + queueSnowplowEvent( + tracker, + generateSnowplowSuccessEntity(candidate, approvedCorpusItemId), + ); + console.log( - `CreateApprovedCorpusItem MUTATION OUTPUT: externalId: ${createdItem.data.createApprovedCorpusItem.externalId}, url: ${createdItem.data.createApprovedCorpusItem.url}, title: ${createdItem.data.createApprovedCorpusItem.title}`, + `CreateApprovedCorpusItem MUTATION OUTPUT: externalId: ${approvedCorpusItemId}, url: ${createdItem.data.createApprovedCorpusItem.url}, title: ${createdItem.data.createApprovedCorpusItem.title}`, ); } else { console.log( diff --git a/lambdas/corpus-scheduler-lambda/src/validation.spec.ts b/lambdas/corpus-scheduler-lambda/src/validation.spec.ts index 7d9e22b7..0b52bc2b 100644 --- a/lambdas/corpus-scheduler-lambda/src/validation.spec.ts +++ b/lambdas/corpus-scheduler-lambda/src/validation.spec.ts @@ -21,57 +21,33 @@ describe('validation', function () { }); describe('validateCandidate', () => { it('should throw Error on ScheduleCandidate if source is not ML', async () => { - const badScheduledCandidate = createScheduledCandidate( - 'Romantic norms are in flux. 
No wonder everyone’s obsessed with polyamory.', - 'In the conversation about open marriages and polyamory, America’s sexual anxieties are on full display.', - 'https://fake-image-url.com', - CorpusLanguage.EN, - ['Rebecca Jennings'], - undefined, - CorpusItemSource.MANUAL as CorpusItemSource.ML, - ); + const badScheduledCandidate = createScheduledCandidate(); + badScheduledCandidate.scheduled_corpus_item.source = + CorpusItemSource.MANUAL as CorpusItemSource.ML; await expect(validateCandidate(badScheduledCandidate)).rejects.toThrow( - Error, - ); - await expect(validateCandidate(badScheduledCandidate)).rejects.toThrow( - 'Error on typia.assert(): invalid type on $input.scheduled_corpus_item.source, expect to be "ML"', + new Error( + 'Error on typia.assert(): invalid type on $input.scheduled_corpus_item.source, expect to be "ML"', + ), ); }); it('should throw Error on ScheduleCandidate if types are wrong (language)', async () => { - const badScheduledCandidate = createScheduledCandidate( - 'Romantic norms are in flux. 
No wonder everyone’s obsessed with polyamory.', - 'In the conversation about open marriages and polyamory, America’s sexual anxieties are on full display.', - 'https://fake-image-url.com', - 'en' as CorpusLanguage, - ['Rebecca Jennings'], - undefined, - CorpusItemSource.ML, - ); + const badScheduledCandidate = createScheduledCandidate(); + badScheduledCandidate.scheduled_corpus_item.language = + 'en' as CorpusLanguage; // should throw error await expect(validateCandidate(badScheduledCandidate)).rejects.toThrow( - Error, - ); - await expect(validateCandidate(badScheduledCandidate)).rejects.toThrow( - 'Error on typia.assert(): invalid type on $input.scheduled_corpus_item.language, expect to be ("DE" | "EN" | "ES" | "FR" | "IT" | undefined)', + new Error( + 'Error on typia.assert(): invalid type on $input.scheduled_corpus_item.language, expect to be ("DE" | "EN" | "ES" | "FR" | "IT" | undefined)', + ), ); }); it('should not throw Error on ScheduleCandidate if it validates', async () => { - const scheduledCandidate = createScheduledCandidate( - 'Romantic norms are in flux. 
No wonder everyone’s obsessed with polyamory.', - 'In the conversation about open marriages and polyamory, America’s sexual anxieties are on full display.', - 'https://fake-image-url.com', - CorpusLanguage.EN, - ['Rebecca Jennings'], - undefined, - CorpusItemSource.ML, - ); + const scheduledCandidate = createScheduledCandidate(); // should not throw error - await expect( - validateCandidate(scheduledCandidate), - ).resolves.not.toThrowError(); + await validateCandidate(scheduledCandidate); }); }); }); diff --git a/packages/content-common/events/snowplow/config.ts b/packages/content-common/events/snowplow/config.ts new file mode 100644 index 00000000..2e9f24f4 --- /dev/null +++ b/packages/content-common/events/snowplow/config.ts @@ -0,0 +1,23 @@ +import { RequiredRetryOptions } from 'got'; + +const environment = process.env.ENVIRONMENT || 'development'; +const snowplowEndpoint = process.env.SNOWPLOW_ENDPOINT || 'localhost:9090'; + +// Snowplow uses Got. By default, Got does not retry POST, so we need to enable this explicitly. +// https://github.com/sindresorhus/got/blob/main/documentation/7-retry.md#methods +const retries: Partial<RequiredRetryOptions> = { + limit: 3, + methods: ['GET', 'POST'], +}; + +const config = { + snowplow: { + endpoint: snowplowEndpoint, + httpProtocol: environment === 'production' ? 
'https' : 'http', + bufferSize: 1, + retries, + namespace: 'content-engineering', + }, +}; + +export default config; diff --git a/packages/content-common/events/snowplow/index.integration.ts b/packages/content-common/events/snowplow/index.integration.ts new file mode 100644 index 00000000..32a35d2e --- /dev/null +++ b/packages/content-common/events/snowplow/index.integration.ts @@ -0,0 +1,32 @@ +import { getEmitter, getTracker } from './index'; +import { + generateObjectUpdateEvent, + resetSnowplowEvents, + waitForSnowplowEvents, +} from './test-helpers'; + +describe('Snowplow Tracker integration', () => { + const emitter = getEmitter(); + const tracker = getTracker(emitter, 'test-app-id'); + + beforeEach(async () => { + await resetSnowplowEvents(); + }); + + it('should accept a valid object_update event', async () => { + tracker.track(generateObjectUpdateEvent()); + + const allEvents = await waitForSnowplowEvents(); + + expect(allEvents).toEqual({ total: 1, bad: 0, good: 1 }); + }); + + it('should not accept a invalid object_update event', async () => { + // 'object' is a required property for the object_update event. 
+ tracker.track(generateObjectUpdateEvent({ object: undefined })); + + const allEvents = await waitForSnowplowEvents(); + + expect(allEvents).toEqual({ total: 1, bad: 1, good: 0 }); + }); +}); diff --git a/packages/content-common/events/snowplow/index.spec.ts b/packages/content-common/events/snowplow/index.spec.ts new file mode 100644 index 00000000..15952226 --- /dev/null +++ b/packages/content-common/events/snowplow/index.spec.ts @@ -0,0 +1,54 @@ +import * as Sentry from '@sentry/node'; +import { http, HttpResponse } from 'msw'; +import { setupServer } from 'msw/node'; +import { getEmitter, getTracker } from './index'; +import config from './config'; +import { generateObjectUpdateEvent } from './test-helpers'; + +describe('Snowplow Tracker', () => { + const server = setupServer(); + const emitter = getEmitter(); + const tracker = getTracker(emitter, 'test-app-id'); + + beforeAll(() => server.listen()); + afterEach(() => server.resetHandlers()); + afterAll(() => server.close()); + + it('retries and sends a message to Sentry when emitter fails', async () => { + let snowplowRequestCount = 0; + + const captureMessageSpy = jest + .spyOn(Sentry, 'captureMessage') + .mockImplementation(); + + // Intercept HTTP requests to Snowplow's endpoint and return an error + server.use( + http.post( + new RegExp( + // Actual url: http://localhost:9090/com.snowplowanalytics.snowplow/tp2 + `${config.snowplow.httpProtocol}://${config.snowplow.endpoint}.*`, + ), + () => { + snowplowRequestCount += 1; + return HttpResponse.text('Simulated timeout', { status: 504 }); + }, + ), + ); + + // Send a dummy event to Snowplow, that will return a 504 from the above handler. + tracker.track(generateObjectUpdateEvent()); + + // Wait for requests to Snowplow. By default, backoff is about 2^0 + 2^1 + 2^2 = 7 seconds. + await new Promise((resolve) => setTimeout(resolve, 8000)); + + // Assert that Snowplow performs the expected number of retries. 
+ // It should be 1 higher than retries.limit, because the first request is not a retry. + expect(snowplowRequestCount).toEqual(config.snowplow.retries.limit + 1); + + // Assert that Sentry's captureMessage was called. + expect(captureMessageSpy).toHaveBeenCalled(); + + // Clean up mocks + captureMessageSpy.mockRestore(); + }, 10000); +}); diff --git a/packages/content-common/events/snowplow/index.ts b/packages/content-common/events/snowplow/index.ts new file mode 100644 index 00000000..5c700411 --- /dev/null +++ b/packages/content-common/events/snowplow/index.ts @@ -0,0 +1,57 @@ +import * as Sentry from '@sentry/node'; +import { + gotEmitter, + HttpMethod, + HttpProtocol, + tracker as snowPlowTracker, + Emitter, + Tracker, +} from '@snowplow/node-tracker'; +import { Response, RequestError } from 'got'; +import config from './config'; + +let emitter: Emitter; +let tracker: Tracker; + +/** + * lazy instantiation of a snowplow emitter + * + * @returns Emitter + */ +export function getEmitter(): Emitter { + if (!emitter) { + emitter = gotEmitter( + config.snowplow.endpoint, + config.snowplow.httpProtocol as HttpProtocol, + undefined, + HttpMethod.POST, + config.snowplow.bufferSize, + config.snowplow.retries, + undefined, + // this is the callback function invoked after snowplow flushes their + // internal cache. 
+ (error?: RequestError, response?: Response) => { + if (error) { + Sentry.addBreadcrumb({ message: 'Emitter Data', data: error }); + Sentry.captureMessage(`Emitter Error`); + } + }, + ); + } + + return emitter; +} + +/** + * lazy instantiation of a snowplow tracker + * @param emitter Emitter - a snowplow emitter + * @param appId Identifies the app to Snowplow that's sending the events + * @returns Tracker + */ +export const getTracker = (emitter: Emitter, appId: string): Tracker => { + if (!tracker) { + tracker = snowPlowTracker(emitter, config.snowplow.namespace, appId, true); + } + + return tracker; +}; diff --git a/packages/content-common/events/snowplow/test-helpers.ts b/packages/content-common/events/snowplow/test-helpers.ts new file mode 100644 index 00000000..3e2f6b1d --- /dev/null +++ b/packages/content-common/events/snowplow/test-helpers.ts @@ -0,0 +1,98 @@ +import fetch from 'node-fetch'; +import config from './config'; +import { + buildSelfDescribingEvent, + PayloadBuilder, +} from '@snowplow/node-tracker'; + +export interface SnowplowMicroEventCounts { + /** Total number of Snowplow events received. */ + total: number; + /** Number of valid Snowplow events received. */ + good: number; + /** Number of invalid Snowplow events received. */ + bad: number; +} + +export async function snowplowRequest( + path: string, + post = false, +): Promise<any> { + const response = await fetch( + `${config.snowplow.httpProtocol}://${config.snowplow.endpoint}${path}`, + { + method: post ? 'POST' : 'GET', + }, + ); + return await response.json(); +} + +/** + * Resets the event counts in Snowplow Micro. 
+ */ +export async function resetSnowplowEvents(): Promise<void> { + await snowplowRequest('/micro/reset', true); +} + +export async function getAllSnowplowEvents(): Promise<SnowplowMicroEventCounts> { + return snowplowRequest('/micro/all'); +} + +export async function getGoodSnowplowEvents(): Promise<{ [key: string]: any }> { + return snowplowRequest('/micro/good'); +} + +export async function getBadSnowplowEvents(): Promise<{ [key: string]: any }> { + return snowplowRequest('/micro/bad'); +} + +export function parseSnowplowData(data: string): { [key: string]: any } { + return JSON.parse(Buffer.from(data, 'base64').toString()); +} + +/** + * Waits until Snowplow events are received and returns counts. + * @param maxWaitTime Maximum time to wait. By default, this is 4 seconds, which is less than the default Jest test + * timeout of 5 seconds. In practice Snowplow Micro (running locally) receives events in a few milliseconds. + * @param expectedEventCount Waits until this number of events (good or bad) are received. + * @return Counts for the number of Snowplow events received (total, good, bad). + */ +export async function waitForSnowplowEvents( + maxWaitTime: number = 4000, + expectedEventCount: number = 1, +): Promise<SnowplowMicroEventCounts> { + let totalWaitTime = 0; + // Snowplow tests take about 20ms. waitPeriod is set to half of that to minimize waiting. + const waitPeriod = 10; + + while (totalWaitTime < maxWaitTime) { + const eventCounts = await getAllSnowplowEvents(); + if (eventCounts.total >= expectedEventCount) { + return eventCounts; + } else { + await new Promise((resolve) => setTimeout(resolve, waitPeriod)); + totalWaitTime += waitPeriod; + } + } + + return await getAllSnowplowEvents(); +} + +/** + * Generates an object_update Snowplow event + * @param overrideData Optionally, allows the event data to be changed. 
+ */ +export const generateObjectUpdateEvent = ( + overrideData = {}, +): PayloadBuilder => { + return buildSelfDescribingEvent({ + event: { + schema: 'iglu:com.pocket/object_update/jsonschema/1-0-17', + data: { + trigger: 'scheduled_corpus_candidate_generated', + object: 'scheduled_corpus_candidate', + ...overrideData, + }, + }, + }); +}; diff --git a/packages/content-common/package.json b/packages/content-common/package.json index d4483578..b31c8a4a 100644 --- a/packages/content-common/package.json +++ b/packages/content-common/package.json @@ -12,12 +12,15 @@ "author": "pocket", "license": "ISC", "private": true, - "dependencies": {}, + "dependencies": { + "@snowplow/node-tracker": "3.5.0" + }, "devDependencies": { - "eslint-config-custom": "workspace:*", - "tsconfig": "workspace:*", "@types/jest": "29.5.12", + "eslint-config-custom": "workspace:*", "jest": "29.7.0", - "ts-jest": "29.1.2" + "msw": "2.1.2", + "ts-jest": "29.1.2", + "tsconfig": "workspace:*" } } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 11a6a450..dda14031 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -102,7 +102,7 @@ importers: version: 2.1.2(typescript@5.3.3) ts-jest: specifier: 29.1.2 - version: 29.1.2(@babel/core@7.23.9)(jest@29.7.0)(typescript@5.3.3) + version: 29.1.2(@babel/core@7.24.0)(jest@29.7.0)(typescript@5.3.3) tsconfig: specifier: workspace:* version: link:../../packages/tsconfig @@ -197,6 +197,10 @@ importers: version: link:../../packages/tsconfig packages/content-common: + dependencies: + '@snowplow/node-tracker': + specifier: 3.5.0 + version: 3.5.0 devDependencies: '@types/jest': specifier: 29.5.12 @@ -207,9 +211,12 @@ importers: jest: specifier: 29.7.0 version: 29.7.0(@types/node@20.11.13)(ts-node@10.9.2) + msw: + specifier: 2.1.2 + version: 2.1.2(typescript@5.3.3) ts-jest: specifier: 29.1.2 - version: 29.1.2(@babel/core@7.24.0)(jest@29.7.0)(typescript@5.3.3) + version: 29.1.2(@babel/core@7.23.9)(jest@29.7.0)(typescript@5.3.3) tsconfig: specifier: workspace:* 
version: link:../tsconfig @@ -437,9 +444,6 @@ importers: '@pocket-tools/ts-logger': specifier: ^1.2.4 version: 1.3.0(@babel/core@7.24.0)(@types/node@20.11.22)(typescript@5.3.3) - '@snowplow/node-tracker': - specifier: ^3.5.0 - version: 3.19.0 content-common: specifier: workspace:* version: link:../../packages/content-common @@ -5423,14 +5427,6 @@ packages: tslib: 2.6.2 dev: false - /@snowplow/node-tracker@3.19.0: - resolution: {integrity: sha512-fsMI4SRY/E8KkDZ/hqw0I/g8gGwy+7GU1N14taxyC3OtpuiPhfGp3OFuwLdsfc4Av9UpVnv7zvhfuAB7+51Uhg==} - dependencies: - '@snowplow/tracker-core': 3.19.0 - got: 11.8.6 - tslib: 2.6.2 - dev: false - /@snowplow/node-tracker@3.5.0: resolution: {integrity: sha512-GQjfrSyir/Img3BC8TEkw4J0H225bhAVCXI+KE+4+9WINaudapa5hy1rLZxBT0g3u4oCulPgg/gQvoWmqn0boQ==} dependencies: @@ -5439,13 +5435,6 @@ packages: tslib: 2.6.2 dev: false - /@snowplow/tracker-core@3.19.0: - resolution: {integrity: sha512-DYxSm22QeeHg56kG8qziE/0EJIqkyOO8DM6rAjwUo5Etm2KQuKdxpPd8FyXVwOF1RtVzlq1uEkcBG8GGGjT4Aw==} - dependencies: - tslib: 2.6.2 - uuid: 3.4.0 - dev: false - /@snowplow/tracker-core@3.5.0: resolution: {integrity: sha512-5fMFNSNygrPvwhpM6dKIa79HMyhJ1WN77es06GduYyoyETsaEVW683QPANlXN6Iy8A2NlE3cy0Rlq1WGGIId6w==} dependencies: @@ -13871,7 +13860,7 @@ packages: '@babel/core': 7.24.0 bs-logger: 0.2.6 fast-json-stable-stringify: 2.1.0 - jest: 29.7.0(@types/node@20.11.22)(ts-node@10.9.2) + jest: 29.7.0(@types/node@20.11.13)(ts-node@10.9.2) jest-util: 29.7.0 json5: 2.2.3 lodash.memoize: 4.1.2 diff --git a/servers/curated-corpus-api/src/config/index.ts b/servers/curated-corpus-api/src/config/index.ts index e8390416..a105752b 100644 --- a/servers/curated-corpus-api/src/config/index.ts +++ b/servers/curated-corpus-api/src/config/index.ts @@ -17,10 +17,6 @@ if (!awsEnvironments.includes(process.env.NODE_ENV ?? '')) { s3path = `https://${bucket}.s3.amazonaws.com/`; } -// Work out the Snowplow HTTP protocol. -const snowplowHttpProtocol = - process.env.NODE_ENV === 'production' ? 
'https' : 'http'; - // Environment variables below are set in .aws/src/main.ts export default { app: { @@ -69,11 +65,6 @@ export default { includeLocalVariables: true, }, snowplow: { - endpoint: process.env.SNOWPLOW_ENDPOINT || 'localhost:9090', - httpProtocol: snowplowHttpProtocol, - bufferSize: 1, - retries: 3, - namespace: 'pocket-backend', appId: 'pocket-backend-curated-corpus-api', corpusItemEvents: ReviewedCorpusItemEventType, corpusScheduleEvents: ScheduledCorpusItemEventType, diff --git a/servers/curated-corpus-api/src/events/snowplow/ReviewedItemSnowplowHandler.integration.ts b/servers/curated-corpus-api/src/events/snowplow/ReviewedItemSnowplowHandler.integration.ts index 3a6e182a..e4bb0aee 100644 --- a/servers/curated-corpus-api/src/events/snowplow/ReviewedItemSnowplowHandler.integration.ts +++ b/servers/curated-corpus-api/src/events/snowplow/ReviewedItemSnowplowHandler.integration.ts @@ -1,11 +1,11 @@ import { CuratedStatus, RejectedCuratedCorpusItem } from '@prisma/client'; import { - assertValidSnowplowObjectUpdateEvents, - getAllSnowplowEvents, getGoodSnowplowEvents, parseSnowplowData, resetSnowplowEvents, -} from '../../test/helpers/snowplow'; + waitForSnowplowEvents, +} from 'content-common/events/snowplow/test-helpers'; +import { assertValidSnowplowObjectUpdateEvents } from '../../test/helpers/snowplow'; import config from '../../config'; import { ApprovedCorpusItemPayload, @@ -174,11 +174,8 @@ describe('ReviewedItemSnowplowHandler', () => { eventType: ReviewedCorpusItemEventType.REJECT_ITEM, }); - // wait a sec * 3 - await new Promise((resolve) => setTimeout(resolve, 3000)); - // make sure we only have good events - const allEvents = await getAllSnowplowEvents(); + const allEvents = await waitForSnowplowEvents(undefined, 4); expect(allEvents.total).toEqual(4); expect(allEvents.good).toEqual(4); expect(allEvents.bad).toEqual(0); @@ -213,11 +210,8 @@ describe('ReviewedItemSnowplowHandler', () => { eventType: ReviewedCorpusItemEventType.ADD_ITEM, }); - // wait a 
sec * 3 - await new Promise((resolve) => setTimeout(resolve, 3000)); - // make sure we only have good events - const allEvents = await getAllSnowplowEvents(); + const allEvents = await waitForSnowplowEvents(); expect(allEvents.total).toEqual(1); expect(allEvents.good).toEqual(1); expect(allEvents.bad).toEqual(0); @@ -252,11 +246,8 @@ describe('ReviewedItemSnowplowHandler', () => { eventType: ReviewedCorpusItemEventType.ADD_ITEM, }); - // wait a sec * 3 - await new Promise((resolve) => setTimeout(resolve, 3000)); - // make sure we only have good events - const allEvents = await getAllSnowplowEvents(); + const allEvents = await waitForSnowplowEvents(); expect(allEvents.total).toEqual(1); expect(allEvents.good).toEqual(1); expect(allEvents.bad).toEqual(0); @@ -290,11 +281,8 @@ describe('ReviewedItemSnowplowHandler', () => { eventType: ReviewedCorpusItemEventType.ADD_ITEM, }); - // wait a sec * 3 - await new Promise((resolve) => setTimeout(resolve, 3000)); - // make sure we only have good events - const allEvents = await getAllSnowplowEvents(); + const allEvents = await waitForSnowplowEvents(); expect(allEvents.total).toEqual(1); expect(allEvents.good).toEqual(1); expect(allEvents.bad).toEqual(0); diff --git a/servers/curated-corpus-api/src/events/snowplow/ScheduledItemSnowplowHandler.integration.ts b/servers/curated-corpus-api/src/events/snowplow/ScheduledItemSnowplowHandler.integration.ts index 8557688f..a56a01ab 100644 --- a/servers/curated-corpus-api/src/events/snowplow/ScheduledItemSnowplowHandler.integration.ts +++ b/servers/curated-corpus-api/src/events/snowplow/ScheduledItemSnowplowHandler.integration.ts @@ -1,11 +1,11 @@ import { CuratedStatus } from '@prisma/client'; import { - assertValidSnowplowObjectUpdateEvents, - getAllSnowplowEvents, getGoodSnowplowEvents, parseSnowplowData, resetSnowplowEvents, -} from '../../test/helpers/snowplow'; + waitForSnowplowEvents, +} from 'content-common/events/snowplow/test-helpers'; +import { 
assertValidSnowplowObjectUpdateEvents } from '../../test/helpers/snowplow'; import config from '../../config'; import { ScheduledCorpusItemEventType, @@ -134,11 +134,8 @@ describe('ScheduledItemSnowplowHandler', () => { eventType: ScheduledCorpusItemEventType.REMOVE_SCHEDULE, }); - // wait a sec * 3 - await new Promise((resolve) => setTimeout(resolve, 3000)); - // make sure we only have good events - const allEvents = await getAllSnowplowEvents(); + const allEvents = await waitForSnowplowEvents(undefined, 2); expect(allEvents.total).toEqual(2); expect(allEvents.good).toEqual(2); expect(allEvents.bad).toEqual(0); @@ -177,11 +174,8 @@ describe('ScheduledItemSnowplowHandler', () => { eventType: ScheduledCorpusItemEventType.ADD_SCHEDULE, }); - // wait a sec * 3 - await new Promise((resolve) => setTimeout(resolve, 3000)); - // make sure we only have good events - const allEvents = await getAllSnowplowEvents(); + const allEvents = await waitForSnowplowEvents(); expect(allEvents.total).toEqual(1); expect(allEvents.good).toEqual(1); expect(allEvents.bad).toEqual(0); diff --git a/servers/curated-corpus-api/src/events/snowplow/tracker.ts b/servers/curated-corpus-api/src/events/snowplow/tracker.ts index e91844a5..5c84fb3b 100644 --- a/servers/curated-corpus-api/src/events/snowplow/tracker.ts +++ b/servers/curated-corpus-api/src/events/snowplow/tracker.ts @@ -1,23 +1,6 @@ -import { - gotEmitter, - HttpMethod, - HttpProtocol, - tracker as snowPlowTracker, -} from '@snowplow/node-tracker'; +import { getEmitter, getTracker } from 'content-common/events/snowplow'; import config from '../../config'; -const emitter = gotEmitter( - config.snowplow.endpoint, - config.snowplow.httpProtocol as HttpProtocol, - undefined, - HttpMethod.POST, - config.snowplow.bufferSize, - config.snowplow.retries -); +const emitter = getEmitter(); -export const tracker = snowPlowTracker( - emitter, - config.snowplow.namespace, - config.snowplow.appId, - true -); +export const tracker = getTracker(emitter, 
config.snowplow.appId); diff --git a/servers/curated-corpus-api/src/test/helpers/snowplow.ts b/servers/curated-corpus-api/src/test/helpers/snowplow.ts index ff53cfbe..a7e4d852 100644 --- a/servers/curated-corpus-api/src/test/helpers/snowplow.ts +++ b/servers/curated-corpus-api/src/test/helpers/snowplow.ts @@ -1,48 +1,19 @@ -import fetch from 'node-fetch'; import config from '../../config'; import { CuratedCorpusItemUpdate } from '../../events/snowplow/schema'; - -export async function snowplowRequest( - path: string, - post = false -): Promise { - const response = await fetch( - `${config.snowplow.httpProtocol}://${config.snowplow.endpoint}${path}`, - { - method: post ? 'POST' : 'GET', - } - ); - return await response.json(); -} - -export async function resetSnowplowEvents(): Promise { - await snowplowRequest('/micro/reset', true); -} - -export async function getAllSnowplowEvents(): Promise<{ [key: string]: any }> { - return snowplowRequest('/micro/all'); -} - -export async function getGoodSnowplowEvents(): Promise<{ [key: string]: any }> { - return snowplowRequest('/micro/good'); -} - -export function parseSnowplowData(data: string): { [key: string]: any } { - return JSON.parse(Buffer.from(data, 'base64').toString()); -} +import { parseSnowplowData } from 'content-common/events/snowplow/test-helpers'; export function assertValidSnowplowObjectUpdateEvents( events, triggers: CuratedCorpusItemUpdate['trigger'][], - object: CuratedCorpusItemUpdate['object'] + object: CuratedCorpusItemUpdate['object'], ) { const parsedEvents = events .map(parseSnowplowData) .map((parsedEvent) => parsedEvent.data); const actualEvents = triggers.map((trigger) => ({ - schema: config.snowplow.schemas.objectUpdate, - data: { trigger, object }, + schema: config.snowplow.schemas.objectUpdate, + data: { trigger, object }, })); expect(parsedEvents.map(a => a.data.trigger).sort()).toEqual(actualEvents.map(a => a.data.trigger).sort()); expect(parsedEvents.map(a => 
a.data.object).sort()).toEqual(actualEvents.map(a => a.data.object).sort()); diff --git a/servers/prospect-api/package.json b/servers/prospect-api/package.json index 019e6f0a..ae7325ed 100644 --- a/servers/prospect-api/package.json +++ b/servers/prospect-api/package.json @@ -23,7 +23,6 @@ "@aws-sdk/lib-dynamodb": "3.504.0", "@pocket-tools/apollo-utils": "3.0.0", "@pocket-tools/ts-logger": "^1.2.4", - "@snowplow/node-tracker": "^3.5.0", "content-common": "workspace:*", "prospectapi-common": "workspace:*", "supertest": "^6.3.3" diff --git a/servers/prospect-api/src/events/snowplow-test-helpers.ts b/servers/prospect-api/src/events/snowplow-test-helpers.ts deleted file mode 100644 index c7bd6db0..00000000 --- a/servers/prospect-api/src/events/snowplow-test-helpers.ts +++ /dev/null @@ -1,62 +0,0 @@ -import fetch from 'node-fetch'; -import config from '../config'; -import { ProspectReviewStatus, SnowplowProspect } from './types'; - -export const prospect: SnowplowProspect = { - object_version: 'new', - prospect_id: 'c586eff4-f69a-5e5b-8c4d-a4039bb5b497', - url: 'https://www.nytimes.com/2022/11/03/t-magazine/spain-islamic-history.html', - title: 'In Search of a Lost Spain', - excerpt: - 'ON A MORNING of haunting heat in Seville, I sought out the tomb of Ferdinand III. There, in the Gothic cool, older Spaniards came and went, dropping to one knee and crossing themselves before the sepulcher of the Castilian monarch.', - image_url: - 'https://static01.nyt.com/images/2022/11/03/t-magazine/03tmag-spain-slide-9VKO-copy/03tmag-spain-slide-9VKO-facebookJumbo.jpg', - language: 'en', - topic: 'EDUCATION', - is_collection: false, - is_syndicated: false, - authors: ['RICHARD MOSSE', 'AATISH TASEER'], - publisher: 'The New York Times', - domain: 'nytimes.com', - prospect_source: 'COUNTS_LOGISTIC_APPROVAL', - scheduled_surface_id: 'NEW_TAB_EN_US', - created_at: 1668100357, - prospect_review_status: ProspectReviewStatus.Dismissed, - // The Unix timestamp in seconds. 
- reviewed_at: 1668100358, - // The LDAP string of the curator who reviewed this prospect - for now, only dismissing prospect. - reviewed_by: 'sso-user', -}; - -export async function snowplowRequest( - path: string, - post = false -): Promise { - const response = await fetch( - `${config.snowplow.httpProtocol}://${config.snowplow.endpoint}${path}`, - { - method: post ? 'POST' : 'GET', - } - ); - return await response.json(); -} - -export async function resetSnowplowEvents(): Promise { - await snowplowRequest('/micro/reset', true); -} - -export async function getAllSnowplowEvents(): Promise<{ [key: string]: any }> { - return snowplowRequest('/micro/all'); -} - -export async function getGoodSnowplowEvents(): Promise<{ [key: string]: any }> { - return snowplowRequest('/micro/good'); -} - -export async function getBadSnowplowEvents(): Promise<{ [key: string]: any }> { - return snowplowRequest('/micro/bad'); -} - -export function parseSnowplowData(data: string): { [key: string]: any } { - return JSON.parse(Buffer.from(data, 'base64').toString()); -} diff --git a/servers/prospect-api/src/events/snowplow.integration.ts b/servers/prospect-api/src/events/snowplow.integration.ts index 50882224..7352ec32 100644 --- a/servers/prospect-api/src/events/snowplow.integration.ts +++ b/servers/prospect-api/src/events/snowplow.integration.ts @@ -1,35 +1,56 @@ import { - getAllSnowplowEvents, + waitForSnowplowEvents, resetSnowplowEvents, - prospect, -} from './snowplow-test-helpers'; -import { getEmitter, getTracker, queueSnowplowEvent } from './snowplow'; -import { SnowplowProspect } from './types'; +} from 'content-common/events/snowplow/test-helpers'; +import { getEmitter, getTracker } from 'content-common/events/snowplow'; +import { queueSnowplowEvent } from './snowplow'; +import { ProspectReviewStatus, SnowplowProspect } from './types'; +import config from '../config'; + +export const prospect: SnowplowProspect = { + object_version: 'new', + prospect_id: 
'c586eff4-f69a-5e5b-8c4d-a4039bb5b497', + url: 'https://www.nytimes.com/2022/11/03/t-magazine/spain-islamic-history.html', + title: 'In Search of a Lost Spain', + excerpt: + 'ON A MORNING of haunting heat in Seville, I sought out the tomb of Ferdinand III. There, in the Gothic cool, older Spaniards came and went, dropping to one knee and crossing themselves before the sepulcher of the Castilian monarch.', + image_url: + 'https://static01.nyt.com/images/2022/11/03/t-magazine/03tmag-spain-slide-9VKO-copy/03tmag-spain-slide-9VKO-facebookJumbo.jpg', + language: 'en', + topic: 'EDUCATION', + is_collection: false, + is_syndicated: false, + authors: ['RICHARD MOSSE', 'AATISH TASEER'], + publisher: 'The New York Times', + domain: 'nytimes.com', + prospect_source: 'COUNTS_LOGISTIC_APPROVAL', + scheduled_surface_id: 'NEW_TAB_EN_US', + created_at: 1668100357, + prospect_review_status: ProspectReviewStatus.Dismissed, + // The Unix timestamp in seconds. + reviewed_at: 1668100358, + // The LDAP string of the curator who reviewed this prospect - for now, only dismissing prospect. 
+ reviewed_by: 'sso-user', +}; describe('snowplow', () => { + const emitter = getEmitter(); + const tracker = getTracker(emitter, config.snowplow.appId); + beforeEach(async () => { await resetSnowplowEvents(); }); it('should accept an event with a prospect', async () => { - const emitter = getEmitter(); - const tracker = getTracker(emitter); - queueSnowplowEvent(tracker, 'prospect_reviewed', prospect); - // wait a sec * 3 because snowplow does internal queueing - await new Promise((resolve) => setTimeout(resolve, 3000)); - - const allEvents = await getAllSnowplowEvents(); + const allEvents = await waitForSnowplowEvents(); expect(allEvents.total).toEqual(1); expect(allEvents.bad).toEqual(0); }); it('should accept an event with a prospect with status reasons and no comment', async () => { - const emitter = getEmitter(); - const tracker = getTracker(emitter); - const prospectWithRemovalReasons: SnowplowProspect = { ...prospect, status_reasons: ['PUBLISHER', 'OUTDATED'], @@ -41,19 +62,13 @@ describe('snowplow', () => { prospectWithRemovalReasons, ); - // wait a sec * 3 - await new Promise((resolve) => setTimeout(resolve, 3000)); - - const allEvents = await getAllSnowplowEvents(); + const allEvents = await waitForSnowplowEvents(); expect(allEvents.total).toEqual(1); expect(allEvents.bad).toEqual(0); }); it('should accept an event with a prospect with no status reasons and a comment', async () => { - const emitter = getEmitter(); - const tracker = getTracker(emitter); - const prospectWithRemovalReasons: SnowplowProspect = { ...prospect, status_reason_comment: 'do read these comments', @@ -65,19 +80,13 @@ describe('snowplow', () => { prospectWithRemovalReasons, ); - // wait a sec * 3 - await new Promise((resolve) => setTimeout(resolve, 3000)); - - const allEvents = await getAllSnowplowEvents(); + const allEvents = await waitForSnowplowEvents(); expect(allEvents.total).toEqual(1); expect(allEvents.bad).toEqual(0); }); it('should accept an event with a prospect with 
status reasons and comment', async () => { - const emitter = getEmitter(); - const tracker = getTracker(emitter); - const prospectWithRemovalReasons: SnowplowProspect = { ...prospect, status_reasons: ['PUBLISHER', 'OUTDATED'], @@ -90,10 +99,7 @@ describe('snowplow', () => { prospectWithRemovalReasons, ); - // wait a sec * 3 - await new Promise((resolve) => setTimeout(resolve, 3000)); - - const allEvents = await getAllSnowplowEvents(); + const allEvents = await waitForSnowplowEvents(); expect(allEvents.total).toEqual(1); expect(allEvents.bad).toEqual(0); diff --git a/servers/prospect-api/src/events/snowplow.ts b/servers/prospect-api/src/events/snowplow.ts index de6315c7..0d72fc49 100644 --- a/servers/prospect-api/src/events/snowplow.ts +++ b/servers/prospect-api/src/events/snowplow.ts @@ -1,72 +1,15 @@ import * as Sentry from '@sentry/node'; import { - gotEmitter, - HttpMethod, - HttpProtocol, - tracker as snowPlowTracker, buildSelfDescribingEvent, - Emitter, Tracker, SelfDescribingEvent, SelfDescribingJson, } from '@snowplow/node-tracker'; -import { Response, RequestError } from 'got'; import config from '../config'; import { SnowplowProspect } from './types'; import { serverLogger } from '../express'; -let emitter: Emitter; -let tracker: Tracker; - -/** - * lazy instantiation of a snowplow emitter - * - * @returns Emitter - */ -export function getEmitter(): Emitter { - if (!emitter) { - emitter = gotEmitter( - config.snowplow.endpoint, - config.snowplow.httpProtocol as HttpProtocol, - undefined, - HttpMethod.POST, - config.snowplow.bufferSize, - config.snowplow.retries, - undefined, - // this is the callback function invoked after snowplow flushes their - // internal cache. 
- (error?: RequestError, response?: Response) => { - if (error) { - serverLogger.error('emitter error', { error }); - Sentry.addBreadcrumb({ message: 'Emitter Data', data: error }); - Sentry.captureMessage(`Emitter Error`); - } - } - ); - } - - return emitter; -} - -/** - * lazy instantiation of a snowplow tracker - * @param emitter Emitter - a snowplow emitter - * @returns Tracker - */ -export const getTracker = (emitter: Emitter): Tracker => { - if (!tracker) { - tracker = snowPlowTracker( - emitter, - config.snowplow.namespace, - config.snowplow.appId, - true - ); - } - - return tracker; -}; - /** * creates a snowplow event object * @@ -92,7 +35,7 @@ export const generateEvent = (eventName: string): SelfDescribingEvent => { * @returns SelfDescribingJson */ export const generateContext = ( - prospect: SnowplowProspect + prospect: SnowplowProspect, ): SelfDescribingJson => { return { schema: config.snowplow.schemas.prospect, @@ -113,7 +56,7 @@ export const generateContext = ( export const queueSnowplowEvent = ( tracker: Tracker, eventName: string, - prospect: SnowplowProspect + prospect: SnowplowProspect, ): void => { const event = generateEvent(eventName); const contexts: SelfDescribingJson[] = [generateContext(prospect)]; diff --git a/servers/prospect-api/src/resolvers.ts b/servers/prospect-api/src/resolvers.ts index 3fc28416..827ac116 100644 --- a/servers/prospect-api/src/resolvers.ts +++ b/servers/prospect-api/src/resolvers.ts @@ -35,7 +35,8 @@ import { import { GetProspectsFilters, Context } from './types'; //import { sendEventBridgeEvent } from './events/events'; -import { getEmitter, getTracker, queueSnowplowEvent } from './events/snowplow'; +import { getEmitter, getTracker } from 'content-common/events/snowplow'; +import { queueSnowplowEvent } from './events/snowplow'; /** * Return an object conforming to the Item graphql definition. 
@@ -162,7 +163,10 @@ export const resolvers = { // in the mean time, send the dismiss event directly to snowplow // initialize snowplow tracker const snowplowEmitter = getEmitter(); - const snowplowTracker = getTracker(snowplowEmitter); + const snowplowTracker = getTracker( + snowplowEmitter, + config.snowplow.appId, + ); queueSnowplowEvent( snowplowTracker, 'prospect_reviewed',