Skip to content

Commit

Permalink
feat: [MC-681] Implement scheduled_corpus_candidate Snowplow functions
Browse files Browse the repository at this point in the history
chore: move common Snowplow logic to content-common

chore: use shared Snowplow code in prospect-api and curated-corpus-api

fix: Snowplow retries on error

chore: move Snowplow Micro to integration test file

fix: lint error snowplowHttpProtocol unused

feat: emit Snowplow events on error

remove duplicate test coverage for image error

fix snowplow tests

fix ECONNREFUSED to Snowplow Micro in CI tests

Revert "fix ECONNREFUSED to Snowplow Micro in CI tests"

This reverts commit 470df13.

fix: move Snowplow Micro tests to integrations

add test for Snowplow event on success
  • Loading branch information
mmiermans committed Mar 7, 2024
1 parent a615f80 commit 694e8b3
Show file tree
Hide file tree
Showing 30 changed files with 839 additions and 404 deletions.
11 changes: 11 additions & 0 deletions lambdas/corpus-scheduler-lambda/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@ const config = {
userId: 'ML',
groups: ['mozilliansorg_pocket_scheduled_surface_curator_full'],
},
snowplow: {
// TODO: Set dev value
appId: 'corpus-scheduler-lambda',
schemas: {
// published 2024-02-28
scheduled_corpus_candidate:
'iglu:com.pocket/scheduled_corpus_candidate/jsonschema/1-0-2',
// published 2024-02-28
objectUpdate: 'iglu:com.pocket/object_update/jsonschema/1-0-17',
},
},
};

export default config;
56 changes: 56 additions & 0 deletions lambdas/corpus-scheduler-lambda/src/events/snowplow.integration.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import {
resetSnowplowEvents,
waitForSnowplowEvents,
} from 'content-common/events/snowplow/test-helpers';
import { getEmitter, getTracker } from 'content-common/events/snowplow';
import { queueSnowplowEvent } from './snowplow';
import {
SnowplowScheduledCorpusCandidateErrorName,
SnowplowScheduledCorpusCandidate,
} from './types';
import { random } from 'typia';
import config from '../config';

describe('snowplow', () => {
const mockCandidate = {
...random<SnowplowScheduledCorpusCandidate>(),
scheduled_corpus_item_external_id: '05706565-5a9c-4b57-83e5-a426485a4714',
approved_corpus_item_external_id: 'c43a2aa5-28de-4828-a20e-1fdf60cc4a80',
};
const emitter = getEmitter();
const tracker = getTracker(emitter, config.snowplow.appId);

beforeEach(async () => {
await resetSnowplowEvents();
});

it('should accept an event with a scheduled corpus candidate', async () => {
queueSnowplowEvent(tracker, mockCandidate);

const allEvents = await waitForSnowplowEvents();

expect(allEvents.total).toEqual(1);
expect(allEvents.bad).toEqual(0);
});

describe('error events', () => {
Object.values(SnowplowScheduledCorpusCandidateErrorName).forEach(
(errorName) => {
it(`should emit events with ${errorName} error`, async () => {
queueSnowplowEvent(tracker, {
...mockCandidate,
scheduled_corpus_item_external_id: undefined,
approved_corpus_item_external_id: undefined,
error_name: errorName,
error_description: `Oh no! A ${errorName} error occurred.`,
});

const allEvents = await waitForSnowplowEvents();

expect(allEvents.total).toEqual(1);
expect(allEvents.bad).toEqual(0);
});
},
);
});
});
106 changes: 106 additions & 0 deletions lambdas/corpus-scheduler-lambda/src/events/snowplow.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import {
buildSelfDescribingEvent,
Tracker,
SelfDescribingEvent,
SelfDescribingJson,
} from '@snowplow/node-tracker';

import config from '../config';
import {
SnowplowScheduledCorpusCandidateErrorName,
SnowplowScheduledCorpusCandidate,
} from './types';
import { ScheduledCandidate } from '../types';

/**
* creates an object_update Snowplow event for scheduled_corpus_candidate
*
* @returns SelfDescribingEvent
*/
export const generateEvent = (): SelfDescribingEvent => {
return {
event: {
schema: config.snowplow.schemas.objectUpdate,
data: {
trigger: 'scheduled_corpus_candidate_generated',
object: 'scheduled_corpus_candidate',
},
},
};
};

/**
* generates a scheduled_corpus_candidate entity being sent to snowplow
*
* @param scheduledCorpusCandidate PocketAnalyticsArticle
* @returns SelfDescribingJson
*/
export const generateContext = (
scheduledCorpusCandidate: SnowplowScheduledCorpusCandidate,
): SelfDescribingJson => {
return {
schema: config.snowplow.schemas.scheduled_corpus_candidate,
data: scheduledCorpusCandidate,
};
};

/**
*
* @param candidate ML candidate
* @param errorName Snowplow structured error
* @param errorDescription Longer human-readable description of the error
*/
export const generateSnowplowErrorEntity = (
candidate: ScheduledCandidate,
errorName: SnowplowScheduledCorpusCandidateErrorName,
errorDescription: string,
): SnowplowScheduledCorpusCandidate => {
return {
scheduled_corpus_candidate_id: candidate.scheduled_corpus_candidate_id,
candidate_url: candidate.scheduled_corpus_item.url,
features: candidate.features,
run_details: candidate.run_details,
error_name: errorName,
error_description: errorDescription,
};
};

/**
*
* @param candidate ML candidate
* @param approvedCorpusItemId Identifier the item added to the corpus
*/
export const generateSnowplowSuccessEntity = (
candidate: ScheduledCandidate,
approvedCorpusItemId: string,
): SnowplowScheduledCorpusCandidate => {
return {
scheduled_corpus_candidate_id: candidate.scheduled_corpus_candidate_id,
candidate_url: candidate.scheduled_corpus_item.url,
approved_corpus_item_external_id: approvedCorpusItemId,
features: candidate.features,
run_details: candidate.run_details,
// TODO: set scheduled_corpus_item_external_id
};
};

/**
* main entry point to snowplow. queues up an event to send.
*
* (elsewhere, we tell snowplow to send all queued events.)
*
* @param tracker TrackerInterface
* @param entity Entity representing the result of trying to schedule a candidate.
*/
export const queueSnowplowEvent = (
tracker: Tracker,
entity: SnowplowScheduledCorpusCandidate,
) => {
const event = generateEvent();
const contexts: SelfDescribingJson[] = [generateContext(entity)];

// reminder - this method is not async and does not directly initiate
// any http request. it sends the event to a queue internal to the
// snowplow module, which has its own logic on when to flush the queue.
tracker.track(buildSelfDescribingEvent(event), contexts);
};
16 changes: 16 additions & 0 deletions lambdas/corpus-scheduler-lambda/src/events/testHelpers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { SnowplowScheduledCorpusCandidate } from './types';
import {
getGoodSnowplowEvents,
parseSnowplowData,
} from 'content-common/events/snowplow/test-helpers';

/**
* @return scheduled_corpus_candidate entity from the last good event sent to Snowplow Micro.
*/
export async function extractScheduledCandidateEntity(): Promise<SnowplowScheduledCorpusCandidate> {
const goodEvents = await getGoodSnowplowEvents();
const snowplowContext = parseSnowplowData(
goodEvents[0].rawEvent.parameters.cx,
);
return snowplowContext.data[0].data as SnowplowScheduledCorpusCandidate;
}
27 changes: 27 additions & 0 deletions lambdas/corpus-scheduler-lambda/src/events/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import {
ScheduledCorpusCandidateFeatures,
ScheduledCorpusCandidateRunDetails,
} from '../types';

// scheduled_corpus_candidate entity
export type SnowplowScheduledCorpusCandidate = {
scheduled_corpus_candidate_id: string;
scheduled_corpus_item_external_id?: string;
approved_corpus_item_external_id?: string;
candidate_url: string;
error_name?: SnowplowScheduledCorpusCandidateErrorName;
error_description?: string;
features: ScheduledCorpusCandidateFeatures;
run_details: ScheduledCorpusCandidateRunDetails;
};

export enum SnowplowScheduledCorpusCandidateErrorName {
ALREADY_SCHEDULED = 'ALREADY_SCHEDULED',
/** TODO: [MC-737] Add validation on scheduled date. */
INSUFFICIENT_TIME_BEFORE_SCHEDULED_DATE = 'INSUFFICIENT_TIME_BEFORE_SCHEDULED_DATE',
/** TODO: [MC-666] Add safeguard to prevent scheduling from unverified domains. */
DOMAIN_NOT_ALLOWED_FOR_AUTO_SCHEDULING = 'DOMAIN_NOT_ALLOWED_FOR_AUTO_SCHEDULING',
MISSING_EXCERPT = 'MISSING_EXCERPT',
MISSING_TITLE = 'MISSING_TITLE',
MISSING_IMAGE = 'MISSING_IMAGE',
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,30 @@ import {
CuratedStatus,
Topics,
} from 'content-common';
import {
resetSnowplowEvents,
waitForSnowplowEvents,
} from 'content-common/events/snowplow/test-helpers';
import { extractScheduledCandidateEntity } from './events/testHelpers';

describe('corpus scheduler lambda', () => {
const server = setupServer();

const scheduledCandidate = createScheduledCandidate(
'Fake title',
'fake excerpt',
'https://fake-image-url.com',
CorpusLanguage.EN,
['Fake Author'],
'https://fake-url.com',
);
const scheduledCandidate = createScheduledCandidate({
title: 'Fake title',
excerpt: 'fake excerpt',
image_url: 'https://fake-image-url.com',
language: CorpusLanguage.EN,
authors: ['Fake Author'],
url: 'https://fake-url.com',
});

const record = createScheduledCandidates([scheduledCandidate]);
const fakeEvent = {
Records: [{ messageId: '1', body: JSON.stringify(record) }],
} as unknown as SQSEvent;
const sqsContext = null as unknown as Context;
const sqsCallback = null as unknown as Callback;

const getUrlMetadataBody = {
data: {
Expand Down Expand Up @@ -87,7 +97,7 @@ describe('corpus scheduler lambda', () => {
);
};

beforeAll(() => server.listen());
beforeAll(() => server.listen({ onUnhandledRequest: 'bypass' }));
afterEach(() => server.resetHandlers());
afterAll(() => server.close());

Expand All @@ -102,6 +112,10 @@ describe('corpus scheduler lambda', () => {
jest.clearAllMocks();
});

beforeEach(async () => {
await resetSnowplowEvents();
});

it('returns batch item failure if curated-corpus-api has error, with partial success', async () => {
mockGetUrlMetadata();
// Note: msw uses handlers in reverse order, so 2nd request will error, and 1st will succeed.
Expand All @@ -128,15 +142,7 @@ describe('corpus scheduler lambda', () => {
mockGetUrlMetadata();
mockCreateApprovedCorpusItemOnce({ data: null });

const fakeEvent = {
Records: [{ messageId: '1', body: JSON.stringify(record) }],
} as unknown as SQSEvent;

const actual = await processor(
fakeEvent,
null as unknown as Context,
null as unknown as Callback,
);
const actual = await processor(fakeEvent, sqsContext, sqsCallback);

expect(actual).toEqual({ batchItemFailures: [{ itemIdentifier: '1' }] });
}, 7000);
Expand All @@ -145,16 +151,31 @@ describe('corpus scheduler lambda', () => {
mockGetUrlMetadata();
mockCreateApprovedCorpusItemOnce();

const fakeEvent = {
Records: [{ messageId: '1', body: JSON.stringify(record) }],
} as unknown as SQSEvent;

const actual = await processor(
fakeEvent,
null as unknown as Context,
null as unknown as Callback,
);
const actual = await processor(fakeEvent, sqsContext, sqsCallback);

expect(actual).toEqual({ batchItemFailures: [] });
});

it('emits a Snowplow event if candidate is successfully processed', async () => {
mockGetUrlMetadata();
mockCreateApprovedCorpusItemOnce(createApprovedCorpusItemBody);

await processor(fakeEvent, sqsContext, sqsCallback);

// Exactly one Snowplow event should be emitted.
const allEvents = await waitForSnowplowEvents();
expect(allEvents.bad).toEqual(0);
expect(allEvents.good).toEqual(record.candidates.length);

// Check that the right Snowplow entity that was included with the event.
const snowplowEntity = await extractScheduledCandidateEntity();
expect(snowplowEntity.approved_corpus_item_external_id).toEqual(
createApprovedCorpusItemBody.data.createApprovedCorpusItem.externalId,
);
expect(snowplowEntity.scheduled_corpus_candidate_id).toEqual(
record.candidates[0].scheduled_corpus_candidate_id,
);
expect(snowplowEntity.error_name).toBeUndefined();
expect(snowplowEntity.error_description).toBeUndefined();
});
});
3 changes: 0 additions & 3 deletions lambdas/corpus-scheduler-lambda/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ Sentry.AWSLambda.init({
serverName: config.app.name,
});

// temp log statements
console.log('corpus scheduler lambda');

/**
* @param event data from an SQS message - should be an array of items to create / schedule in corpus
* @returns SQSBatchResponse (all failed records)
Expand Down
Loading

0 comments on commit 694e8b3

Please sign in to comment.