Skip to content

Commit

Permalink
fix: use Redis streams instead of redis queue (#9428)
Browse files Browse the repository at this point in the history
* fix: use Redis streams instead of redis queue

* fix: increase MAXLEN to 10 000 000

* Add a bash option for non-test

* Update all products to stream changes to redis

* Minor comment change

* Moved the push to redis after the save to db as the subscriber could pick up the message before the change was committed (in theory)

* tests: add integration test to check for successful Redis Stream use

* feat: add Redis to Product Opener in staging

* chore: update REDIS_URL in staging

points to the local Redis instance

* fix: honor $pretend flag

Co-authored-by: Alex Garel <[email protected]>

---------

Co-authored-by: John Gomersall <[email protected]>
Co-authored-by: Alex Garel <[email protected]>
  • Loading branch information
3 people authored Dec 14, 2023
1 parent b59b275 commit b38c4c3
Show file tree
Hide file tree
Showing 16 changed files with 180 additions and 42 deletions.
5 changes: 3 additions & 2 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ ROBOTOFF_URL=http://robotoff.openfoodfacts.localhost:5500 # connect to Robotoff
QUERY_URL=http://query:5510
EVENTS_URL=
FACETS_KP_URL = https://facets-kp.openfoodfacts.org/render-to-html
# use this to push products to openfoodfacts-search
# in dev: searchredis:6379 (with openfoodfacts-search running in docker in same network)
# we push updated products to Redis stream so that every service is notified
# when a product is updated/deleted/created
# use `redis:6379` locally if you want to enable Redis
REDIS_URL=
GOOGLE_CLOUD_VISION_API_KEY=
CROWDIN_PROJECT_IDENTIFIER=
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/container-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
if: matrix.env == 'off-net'
run: |
# direct container access
echo "REDIS_URL=searchredis:6379" >> $GITHUB_ENV
echo "REDIS_URL=redis:6379" >> $GITHUB_ENV
echo "MONGODB_HOST=10.1.0.200" >> $GITHUB_ENV
echo "ROBOTOFF_URL=https://robotoff.openfoodfacts.net" >> $GITHUB_ENV
echo "QUERY_URL=http://10.1.0.200:5511" >> $GITHUB_ENV
Expand Down
21 changes: 14 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ DOCKER_COMPOSE=docker-compose --env-file=${ENV_FILE} ${LOAD_EXTRA_ENV_FILE}
# we also publish mongodb on a separate port to avoid conflicts
# we also enable the possibility to fake services in po_test_runner
DOCKER_COMPOSE_TEST=ROBOTOFF_URL="http://backend:8881/" GOOGLE_CLOUD_VISION_API_URL="http://backend:8881/" COMPOSE_PROJECT_NAME=${COMPOSE_PROJECT_NAME}_test PO_COMMON_PREFIX=test_ MONGO_EXPOSE_PORT=27027 docker-compose --env-file=${ENV_FILE}
# Enable Redis only for integration tests
DOCKER_COMPOSE_INT_TEST=REDIS_URL="redis:6379" ${DOCKER_COMPOSE_TEST}

.DEFAULT_GOAL := usage

Expand Down Expand Up @@ -260,10 +262,10 @@ integration_test: create_folders
# we launch the server and run tests within same container
# we also need dynamicfront for some assets to exists
# this is the place where variables are important
${DOCKER_COMPOSE_TEST} up -d memcached postgres mongodb backend dynamicfront incron minion
${DOCKER_COMPOSE_INT_TEST} up -d memcached postgres mongodb backend dynamicfront incron minion redis
# note: we need the -T option for ci (non tty environment)
${DOCKER_COMPOSE_TEST} exec ${COVER_OPTS} -T backend prove -l -r tests/integration
${DOCKER_COMPOSE_TEST} stop
${DOCKER_COMPOSE_INT_TEST} exec ${COVER_OPTS} -T backend prove -l -r tests/integration
${DOCKER_COMPOSE_INT_TEST} stop
@echo "🥫 integration tests success"

# stop all tests dockers
Expand All @@ -283,10 +285,10 @@ test-unit: guard-test create_folders
# you can add args= to pass options, like args="-d" to debug
test-int: guard-test create_folders
@echo "🥫 Running test: 'tests/integration/${test}' …"
${DOCKER_COMPOSE_TEST} up -d memcached postgres mongodb backend dynamicfront incron minion
${DOCKER_COMPOSE_TEST} exec backend perl ${args} tests/integration/${test}
${DOCKER_COMPOSE_INT_TEST} up -d memcached postgres mongodb backend dynamicfront incron minion redis
${DOCKER_COMPOSE_INT_TEST} exec backend perl ${args} tests/integration/${test}
# better shutdown, for if we do a modification of the code, we need a restart
${DOCKER_COMPOSE_TEST} stop backend
${DOCKER_COMPOSE_INT_TEST} stop backend

# stop all docker tests containers
stop_tests:
Expand All @@ -305,6 +307,10 @@ update_tests_results: build_lang_test
${DOCKER_COMPOSE_TEST} stop

bash:
@echo "🥫 Open a bash shell in the backend container"
${DOCKER_COMPOSE} run --rm -w /opt/product-opener backend bash

bash_test:
@echo "🥫 Open a bash shell in the test container"
${DOCKER_COMPOSE_TEST} run --rm -w /opt/product-opener backend bash

Expand Down Expand Up @@ -394,7 +400,8 @@ create_external_volumes:
docker volume create --driver=local -o type=none -o o=bind -o device=${DOCKER_LOCAL_DATA}/podata ${COMPOSE_PROJECT_NAME}_podata
# note for this one, it should be shared with pro instance in the future
docker volume create --driver=local -o type=none -o o=bind -o device=${DOCKER_LOCAL_DATA}/export_files ${COMPOSE_PROJECT_NAME}_export_files

# Create Redis volume
docker volume create --driver=local -o type=none -o o=bind -o device=${DOCKER_LOCAL_DATA}/redis ${COMPOSE_PROJECT_NAME}_redisdata

create_external_networks:
@echo "🥫 Creating external networks (production only) …"
Expand Down
1 change: 1 addition & 0 deletions cpanfile
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ requires 'XML::XML2JSON';
requires 'Redis';
requires 'Digest::SHA1';
requires 'Data::Difference';
requires 'Data::Compare';

# Mojolicious/Minion
requires 'Mojolicious::Lite';
Expand Down
6 changes: 6 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ services:
- POSTGRES_DB=minion
volumes:
- pgdata:/var/lib/postgresql/data
redis:
image: redis:7.2-alpine
command: redis-server --save 60 1 --loglevel warning
volumes:
- redisdata:/data
backend: *backend-conf
minion:
<<: *backend-conf
Expand Down Expand Up @@ -139,3 +144,4 @@ volumes:
# build_cache does not needs not be external, for wiping it will avoid keeping too much cached data
build_cache:
other_servers:
redisdata:
1 change: 1 addition & 0 deletions docker/dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ services:
- ./html/data:/import
ports:
- "127.0.0.1:${MONGO_EXPOSE_PORT:-27017}:27017"

volumes:
product_images:
html_data:
Expand Down
7 changes: 7 additions & 0 deletions docker/prod.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ services:
restart: always
networks:
- webnet
redis:
restart: always
networks:
- webnet
volumes:
html_data:
external: true
Expand All @@ -43,6 +47,9 @@ volumes:
podata:
external: true
name: ${COMPOSE_PROJECT_NAME}_podata
redisdata:
external: true
name: ${COMPOSE_PROJECT_NAME}_redisdata
export_files:
name: ${COMPOSE_PROJECT_NAME}_export_files

Expand Down
1 change: 1 addition & 0 deletions lib/ProductOpener/APITest.pm
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ BEGIN {
&mail_to_text
&new_client
&normalize_mail_for_comparison
&origin_from_url
&post_form
&tail_log_start
&tail_log_read
Expand Down
3 changes: 3 additions & 0 deletions lib/ProductOpener/Config_obf.pm
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,9 @@ $options{display_tag_ingredients} = [

# allow moving products to other instances of Product Opener on the same server
# e.g. OFF -> OBF

$options{current_server} = "obf";

$options{other_servers} = {
obf => {
name => "Open Beauty Facts",
Expand Down
3 changes: 3 additions & 0 deletions lib/ProductOpener/Config_opf.pm
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,9 @@ HTML

# allow moving products to other instances of Product Opener on the same server
# e.g. OFF -> OBF

$options{current_server} = "opf";

$options{other_servers} = {
obf => {
name => "Open Beauty Facts",
Expand Down
3 changes: 3 additions & 0 deletions lib/ProductOpener/Config_opff.pm
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,9 @@ XML

# allow moving products to other instances of Product Opener on the same server
# e.g. OFF -> OBF

$options{current_server} = "opff";

$options{other_servers} = {
obf => {
name => "Open Beauty Facts",
Expand Down
8 changes: 4 additions & 4 deletions lib/ProductOpener/Products.pm
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ use ProductOpener::Data qw/:all/;
use ProductOpener::MainCountries qw/:all/;
use ProductOpener::Text qw/:all/;
use ProductOpener::Display qw/single_param/;
use ProductOpener::Redis qw/push_to_search_service/;
use ProductOpener::Redis qw/push_to_redis_stream/;

# needed by analyze_and_enrich_product_data()
# may be moved to another module at some point
Expand Down Expand Up @@ -1452,11 +1452,11 @@ sub store_product ($user_id, $product_ref, $comment) {

$log->debug("store_product - done", {code => $code, product_id => $product_id}) if $log->is_debug();

# index for search service
push_to_search_service($product_ref);
my $update_type = $product_ref->{deleted} ? "deleted" : "updated";
# Publish information about update on Redis stream
push_to_redis_stream($user_id, $product_ref, $update_type, $comment, $diffs);

# Notify Robotoff
my $update_type = $product_ref->{deleted} ? "deleted" : "updated";
send_notification_for_product_change($user_id, $product_ref, $update_type, $comment, $diffs);

return 1;
Expand Down
59 changes: 46 additions & 13 deletions lib/ProductOpener/Redis.pm
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@

=head1 NAME
ProductOpener::Redis - functions to push informations to redis
ProductOpener::Redis - functions to push information to redis
=head1 DESCRIPTION
C<ProductOpener::Redis> is handling pushing info to Redis
to communicate updates to openfoodfacts-search instance
to communicate updates to all services, including search-a-licious.
=cut

package ProductOpener::Redis;

use ProductOpener::Config qw/:all/;
use ProductOpener::PerlStandards;
use Exporter qw< import >;
use JSON::PP;

BEGIN {
use vars qw(@ISA @EXPORT_OK %EXPORT_TAGS);
@EXPORT_OK = qw(
&push_to_search_service
&push_to_redis_stream
); # symbols to export on request
%EXPORT_TAGS = (all => [@EXPORT_OK]);
}
Expand Down Expand Up @@ -64,24 +66,36 @@ sub init_redis() {
return;
}

=head2 push_to_search_service ($product_ref)
=head2 push_to_redis_stream ($user_id, $product_ref, $action, $comment, $diffs)
Inform openfoodfacts-search that a product was updated.
It uses Redis to do that.
Add an event to Redis stream to inform that a product was updated.
=head3 Arguments
=head4 String $user_id
The user that updated the product.
=head4 Product Object $product_ref
The product that was updated.
=head4 String $action
The action that was performed on the product (either "updated" or "deleted").
A product creation is also considered an update.
=head4 String $comment
The user comment associated with the update.
=head4 HashRef $diffs
a hashref of the differences between the previous and new revision of the product.
=cut

sub push_to_search_service ($product_ref) {
sub push_to_redis_stream ($user_id, $product_ref, $action, $comment, $diffs) {

if (!$redis_url) {
# off search not activated
# No Redis URL provided, we can't push to Redis
if (!$sent_warning_about_missing_redis_url) {
$log->warn("Redis URL not provided for search indexing") if $log->is_warn();
$log->warn("Redis URL not provided for streaming") if $log->is_warn();
$sent_warning_about_missing_redis_url = 1;
}
return;
Expand All @@ -90,22 +104,41 @@ sub push_to_search_service ($product_ref) {
my $error = "";
if (!defined $redis_client) {
# we were disconnected, try to reconnect
$log->info("Trying to reconnect to redis");
$log->info("Trying to reconnect to Redis");
init_redis();
}
if (defined $redis_client) {
eval {$redis_client->rpush('search_import_queue', $product_ref->{code});};
$log->debug("Pushing product update to Redis", {product_code => $product_ref->{code}}) if $log->is_debug();
eval {
$redis_client->xadd(
# name of the Redis stream
'product_update',
# Cap the stream with an approximate MAXLEN so it cannot grow unbounded
'MAXLEN', '~', '10000000',
# We let Redis generate the id
'*',
# fields
'code', $product_ref->{code},
'flavor', $options{current_server},
'user_id', $user_id, 'action', $action,
'comment', $comment, 'diffs', encode_json($diffs)
);
};
$error = $@;
}
else {
$error = "Can't connect to redis";
$error = "Can't connect to Redis";
}
if (!($error eq "")) {
$log->error("Failed to push to redis", {product_code => $product_ref->{code}, error => $error})
$log->error("Failed to push product update to Redis", {product_code => $product_ref->{code}, error => $error})
if $log->is_warn();
# ask for eventual reconnection for next call
$redis_client = undef;
}
else {
$log->debug("Successfully pushed product update to Redis", {product_code => $product_ref->{code}})
if $log->is_debug();
}

return;
}
Expand Down
Loading

0 comments on commit b38c4c3

Please sign in to comment.