From b38c4c33b45a37eacb8919730cfe4d5c661d93f3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rapha=C3=ABl=20Bournhonesque?=
Date: Thu, 14 Dec 2023 12:07:43 +0100
Subject: [PATCH] fix: use Redis streams instead of redis queue (#9428)

* fix: use Redis streams instead of redis queue

* fix: increase MAXLEN to 10 000 000

* Add a bash option for non-test

* Update all products to stream changes to redis

* Minor comment change

* Moved the push to redis after the save to db as the subscriber could pick up the message before the change was committed (in theory)

* tests: add integration test to check for successful Redis Stream use

* feat: add Redis to Product Opener in staging

* chore: update REDIS_URL in staging points to the local Redis instance

* fix: honor $pretend flag

Co-authored-by: Alex Garel

---------

Co-authored-by: John Gomersall
Co-authored-by: Alex Garel
---
 .env                                    |  5 ++-
 .github/workflows/container-deploy.yml  |  2 +-
 Makefile                                | 21 ++++++---
 cpanfile                                |  1 +
 docker-compose.yml                      |  6 +++
 docker/dev.yml                          |  1 +
 docker/prod.yml                         |  7 +++
 lib/ProductOpener/APITest.pm            |  1 +
 lib/ProductOpener/Config_obf.pm         |  3 ++
 lib/ProductOpener/Config_opf.pm         |  3 ++
 lib/ProductOpener/Config_opff.pm        |  3 ++
 lib/ProductOpener/Products.pm           |  8 ++--
 lib/ProductOpener/Redis.pm              | 59 +++++++++++++++++++------
 scripts/update_all_products.pl          | 50 ++++++++++++++-------
 stop_words.txt                          |  3 ++
 tests/integration/add_update_to_redis.t | 49 ++++++++++++++++++++
 16 files changed, 180 insertions(+), 42 deletions(-)
 create mode 100644 tests/integration/add_update_to_redis.t

diff --git a/.env b/.env
index 9666015ae3ec2..f633ccb60c301 100644
--- a/.env
+++ b/.env
@@ -50,8 +50,9 @@ ROBOTOFF_URL=http://robotoff.openfoodfacts.localhost:5500 # connect to Robotoff
 QUERY_URL=http://query:5510
 EVENTS_URL=
 FACETS_KP_URL = https://facets-kp.openfoodfacts.org/render-to-html
-# use this to push products to openfoodfacts-search
-# in dev: searchredis:6379 (with openfoodfacts-search running in docker in same network)
+# we push updated products to a Redis stream so that every service is notified
+# when a product is updated/deleted/created
+# use `redis:6379` locally if you want to enable Redis
 REDIS_URL=
 GOOGLE_CLOUD_VISION_API_KEY=
 CROWDIN_PROJECT_IDENTIFIER=
diff --git a/.github/workflows/container-deploy.yml b/.github/workflows/container-deploy.yml
index b752e1e7831cb..337e773886685 100644
--- a/.github/workflows/container-deploy.yml
+++ b/.github/workflows/container-deploy.yml
@@ -26,7 +26,7 @@ jobs:
         if: matrix.env == 'off-net'
         run: |
           # direct container access
-          echo "REDIS_URL=searchredis:6379" >> $GITHUB_ENV
+          echo "REDIS_URL=redis:6379" >> $GITHUB_ENV
           echo "MONGODB_HOST=10.1.0.200" >> $GITHUB_ENV
           echo "ROBOTOFF_URL=https://robotoff.openfoodfacts.net" >> $GITHUB_ENV
           echo "QUERY_URL=http://10.1.0.200:5511" >> $GITHUB_ENV
diff --git a/Makefile b/Makefile
index bfdbd6e419ccf..8d61c60ebe31d 100644
--- a/Makefile
+++ b/Makefile
@@ -52,6 +52,8 @@ DOCKER_COMPOSE=docker-compose --env-file=${ENV_FILE} ${LOAD_EXTRA_ENV_FILE}
 # we also publish mongodb on a separate port to avoid conflicts
 # we also enable the possibility to fake services in po_test_runner
 DOCKER_COMPOSE_TEST=ROBOTOFF_URL="http://backend:8881/" GOOGLE_CLOUD_VISION_API_URL="http://backend:8881/" COMPOSE_PROJECT_NAME=${COMPOSE_PROJECT_NAME}_test PO_COMMON_PREFIX=test_ MONGO_EXPOSE_PORT=27027 docker-compose --env-file=${ENV_FILE}
+# Enable Redis only for integration tests
+DOCKER_COMPOSE_INT_TEST=REDIS_URL="redis:6379" ${DOCKER_COMPOSE_TEST}
 
 .DEFAULT_GOAL := usage
 
@@ -260,10 +262,10 @@ integration_test: create_folders
 	# we launch the server and run tests within same container
 	# we also need dynamicfront for some assets to exists
 	# this is the place where variables are important
-	${DOCKER_COMPOSE_TEST} up -d memcached postgres mongodb backend dynamicfront incron minion
+	${DOCKER_COMPOSE_INT_TEST} up -d memcached postgres mongodb backend dynamicfront incron minion redis
 	# note: we need the -T option for ci (non tty environment)
-	${DOCKER_COMPOSE_TEST} exec ${COVER_OPTS} -T backend prove -l -r tests/integration
-	${DOCKER_COMPOSE_TEST} stop
+	${DOCKER_COMPOSE_INT_TEST} exec ${COVER_OPTS} -T backend prove -l -r tests/integration
+	${DOCKER_COMPOSE_INT_TEST} stop
 	@echo "🥫 integration tests success"
 
 # stop all tests dockers
@@ -283,10 +285,10 @@ test-unit: guard-test create_folders
 # you can add args= to pass options, like args="-d" to debug
 test-int: guard-test create_folders
 	@echo "🥫 Running test: 'tests/integration/${test}' …"
-	${DOCKER_COMPOSE_TEST} up -d memcached postgres mongodb backend dynamicfront incron minion
-	${DOCKER_COMPOSE_TEST} exec backend perl ${args} tests/integration/${test}
+	${DOCKER_COMPOSE_INT_TEST} up -d memcached postgres mongodb backend dynamicfront incron minion redis
+	${DOCKER_COMPOSE_INT_TEST} exec backend perl ${args} tests/integration/${test}
 	# better shutdown, for if we do a modification of the code, we need a restart
-	${DOCKER_COMPOSE_TEST} stop backend
+	${DOCKER_COMPOSE_INT_TEST} stop backend
 
 # stop all docker tests containers
 stop_tests:
@@ -305,6 +307,10 @@ update_tests_results: build_lang_test
 	${DOCKER_COMPOSE_TEST} stop
 
 bash:
+	@echo "🥫 Open a bash shell in the backend container"
+	${DOCKER_COMPOSE} run --rm -w /opt/product-opener backend bash
+
+bash_test:
 	@echo "🥫 Open a bash shell in the test container"
 	${DOCKER_COMPOSE_TEST} run --rm -w /opt/product-opener backend bash
 
@@ -394,7 +400,8 @@ create_external_volumes:
 	docker volume create --driver=local -o type=none -o o=bind -o device=${DOCKER_LOCAL_DATA}/podata ${COMPOSE_PROJECT_NAME}_podata
 	# note for this one, it should be shared with pro instance in the future
 	docker volume create --driver=local -o type=none -o o=bind -o device=${DOCKER_LOCAL_DATA}/export_files ${COMPOSE_PROJECT_NAME}_export_files
-
+# Create Redis volume
+	docker volume create --driver=local -o type=none -o o=bind -o device=${DOCKER_LOCAL_DATA}/redis ${COMPOSE_PROJECT_NAME}_redisdata
 
 create_external_networks:
 	@echo "🥫 Creating external networks (production only) …"
diff --git a/cpanfile b/cpanfile
index 61ff5072f4ac1..b08ee5884b7ca 100644
--- a/cpanfile
+++ b/cpanfile
@@ -70,6 +70,7 @@ requires 'XML::XML2JSON';
 requires 'Redis';
 requires 'Digest::SHA1';
 requires 'Data::Difference';
+requires 'Data::Compare';
 
 # Mojolicious/Minion
 requires 'Mojolicious::Lite';
diff --git a/docker-compose.yml b/docker-compose.yml
index 35358a4128147..8f06d067510f6 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -80,6 +80,11 @@ services:
       - POSTGRES_DB=minion
     volumes:
       - pgdata:/var/lib/postgresql/data
+  redis:
+    image: redis:7.2-alpine
+    command: redis-server --save 60 1 --loglevel warning
+    volumes:
+      - redisdata:/data
   backend: *backend-conf
   minion:
     <<: *backend-conf
@@ -139,3 +144,4 @@ volumes:
   # build_cache does not needs not be external, for wiping it will avoid keeping too much cached data
   build_cache:
   other_servers:
+  redisdata:
diff --git a/docker/dev.yml b/docker/dev.yml
index 4aacaf88d1c6d..9ee76dc1f5e53 100644
--- a/docker/dev.yml
+++ b/docker/dev.yml
@@ -99,6 +99,7 @@ services:
       - ./html/data:/import
     ports:
       - "127.0.0.1:${MONGO_EXPOSE_PORT:-27017}:27017"
+
 volumes:
   product_images:
   html_data:
diff --git a/docker/prod.yml b/docker/prod.yml
index f6be0883cb547..ba0de70b65c5a 100644
--- a/docker/prod.yml
+++ b/docker/prod.yml
@@ -24,6 +24,10 @@ services:
     restart: always
     networks:
       - webnet
+  redis:
+    restart: always
+    networks:
+      - webnet
 volumes:
   html_data:
     external: true
@@ -43,6 +47,9 @@ volumes:
   podata:
     external: true
     name: ${COMPOSE_PROJECT_NAME}_podata
+  redisdata:
+    external: true
+    name: ${COMPOSE_PROJECT_NAME}_redisdata
   export_files:
     name: ${COMPOSE_PROJECT_NAME}_export_files
diff --git a/lib/ProductOpener/APITest.pm b/lib/ProductOpener/APITest.pm
index a9d8a9f5cf84f..c104a92ef3a3b 100644
--- a/lib/ProductOpener/APITest.pm
+++ b/lib/ProductOpener/APITest.pm
@@ -45,6 +45,7 @@ BEGIN {
 		&mail_to_text
 		&new_client
 		&normalize_mail_for_comparison
+		&origin_from_url
 		&post_form
 		&tail_log_start
 		&tail_log_read
diff --git a/lib/ProductOpener/Config_obf.pm b/lib/ProductOpener/Config_obf.pm
index 165d22b745a62..9d9ac51676cef 100644
--- a/lib/ProductOpener/Config_obf.pm
+++ b/lib/ProductOpener/Config_obf.pm
@@ -380,6 +380,9 @@ $options{display_tag_ingredients} = [
 
 # allow moving products to other instances of Product Opener on the same server
 # e.g. OFF -> OBF
+
+$options{current_server} = "obf";
+
 $options{other_servers} = {
 	obf => {
 		name => "Open Beauty Facts",
diff --git a/lib/ProductOpener/Config_opf.pm b/lib/ProductOpener/Config_opf.pm
index cc93ca8061ac2..4d1ef7410ee74 100644
--- a/lib/ProductOpener/Config_opf.pm
+++ b/lib/ProductOpener/Config_opf.pm
@@ -361,6 +361,9 @@ HTML
 
 # allow moving products to other instances of Product Opener on the same server
 # e.g. OFF -> OBF
+
+$options{current_server} = "opf";
+
 $options{other_servers} = {
 	obf => {
 		name => "Open Beauty Facts",
diff --git a/lib/ProductOpener/Config_opff.pm b/lib/ProductOpener/Config_opff.pm
index 4268a3a21d0c9..2e647c66a23a6 100644
--- a/lib/ProductOpener/Config_opff.pm
+++ b/lib/ProductOpener/Config_opff.pm
@@ -402,6 +402,9 @@ XML
 
 # allow moving products to other instances of Product Opener on the same server
 # e.g. OFF -> OBF
+
+$options{current_server} = "opff";
+
 $options{other_servers} = {
 	obf => {
 		name => "Open Beauty Facts",
diff --git a/lib/ProductOpener/Products.pm b/lib/ProductOpener/Products.pm
index 129aed5723fd0..8233977fa0968 100644
--- a/lib/ProductOpener/Products.pm
+++ b/lib/ProductOpener/Products.pm
@@ -141,7 +141,7 @@ use ProductOpener::Data qw/:all/;
 use ProductOpener::MainCountries qw/:all/;
 use ProductOpener::Text qw/:all/;
 use ProductOpener::Display qw/single_param/;
-use ProductOpener::Redis qw/push_to_search_service/;
+use ProductOpener::Redis qw/push_to_redis_stream/;
 
 # needed by analyze_and_enrich_product_data()
 # may be moved to another module at some point
@@ -1452,11 +1452,11 @@ sub store_product ($user_id, $product_ref, $comment) {
 	$log->debug("store_product - done", {code => $code, product_id => $product_id}) if $log->is_debug();
 
-	# index for search service
-	push_to_search_service($product_ref);
+	my $update_type = $product_ref->{deleted} ? "deleted" : "updated";
+	# Publish information about update on Redis stream
+	push_to_redis_stream($user_id, $product_ref, $update_type, $comment, $diffs);
 
 	# Notify Robotoff
-	my $update_type = $product_ref->{deleted} ? "deleted" : "updated";
 	send_notification_for_product_change($user_id, $product_ref, $update_type, $comment, $diffs);
 
 	return 1;
 }
diff --git a/lib/ProductOpener/Redis.pm b/lib/ProductOpener/Redis.pm
index a5c9673e894c1..41cf06d3e46b4 100644
--- a/lib/ProductOpener/Redis.pm
+++ b/lib/ProductOpener/Redis.pm
@@ -1,24 +1,26 @@
 =head1 NAME
 
-ProductOpener::Redis - functions to push informations to redis
+ProductOpener::Redis - functions to push information to redis
 
 =head1 DESCRIPTION
 
 C<ProductOpener::Redis> is handling pushing info to Redis
-to communicate updates to openfoodfacts-search instance
+to communicate updates to all services, including search-a-licious.
 
 =cut
 
 package ProductOpener::Redis;
 
+use ProductOpener::Config qw/:all/;
 use ProductOpener::PerlStandards;
 use Exporter qw< import >;
+use JSON::PP;
 
 BEGIN {
 	use vars qw(@ISA @EXPORT_OK %EXPORT_TAGS);
 	@EXPORT_OK = qw(
-		&push_to_search_service
+		&push_to_redis_stream
 	);    # symbols to export on request
 	%EXPORT_TAGS = (all => [@EXPORT_OK]);
 }
@@ -64,24 +66,36 @@ sub init_redis() {
 	return;
 }
 
-=head2 push_to_search_service ($product_ref)
+=head2 push_to_redis_stream ($user_id, $product_ref, $action, $comment, $diffs)
 
-Inform openfoodfacts-search that a product was updated.
-It uses Redis to do that.
+Add an event to the Redis stream to inform that a product was updated.
 
 =head3 Arguments
 
+=head4 String $user_id
+The user that updated the product.
+
 =head4 Product Object $product_ref
 
 The product that was updated.
 
+=head4 String $action
+The action that was performed on the product (either "updated" or "deleted").
+A product creation is considered as an update.
+
+=head4 String $comment
+The user comment associated with the update.
+
+=head4 HashRef $diffs
+a hashref of the differences between the previous and new revision of the product.
+
 =cut
 
-sub push_to_search_service ($product_ref) {
+sub push_to_redis_stream ($user_id, $product_ref, $action, $comment, $diffs) {
 
 	if (!$redis_url) {
-		# off search not activated
+		# No Redis URL provided, we can't push to Redis
 		if (!$sent_warning_about_missing_redis_url) {
-			$log->warn("Redis URL not provided for search indexing") if $log->is_warn();
+			$log->warn("Redis URL not provided for streaming") if $log->is_warn();
 			$sent_warning_about_missing_redis_url = 1;
 		}
 		return;
 	}
@@ -90,22 +104,41 @@ sub push_to_search_service ($product_ref) {
 
 	my $error = "";
 	if (!defined $redis_client) {
 		# we where deconnected, try again
-		$log->info("Trying to reconnect to redis");
+		$log->info("Trying to reconnect to Redis");
 		init_redis();
 	}
 	if (defined $redis_client) {
-		eval {$redis_client->rpush('search_import_queue', $product_ref->{code});};
+		$log->debug("Pushing product update to Redis", {product_code => $product_ref->{code}}) if $log->is_debug();
+		eval {
+			$redis_client->xadd(
+				# name of the Redis stream
+				'product_update',
+				# Cap the stream at ~10 million entries (approximate trimming)
+				'MAXLEN', '~', '10000000',
+				# We let Redis generate the id
+				'*',
+				# fields
+				'code', $product_ref->{code},
+				'flavor', $options{current_server},
+				'user_id', $user_id, 'action', $action,
+				'comment', $comment, 'diffs', encode_json($diffs)
+			);
+		};
 		$error = $@;
 	}
 	else {
-		$error = "Can't connect to redis";
+		$error = "Can't connect to Redis";
 	}
 	if (!($error eq "")) {
-		$log->error("Failed to push to redis", {product_code => $product_ref->{code}, error => $error})
+		$log->error("Failed to push product update to Redis", {product_code => $product_ref->{code}, error => $error})
 			if $log->is_warn();
 		# ask for eventual reconnection for next call
 		$redis_client = undef;
 	}
+	else {
+		$log->debug("Successfully pushed product update to Redis", {product_code => $product_ref->{code}})
+			if $log->is_debug();
+	}
 
 	return;
 }
diff --git a/scripts/update_all_products.pl b/scripts/update_all_products.pl
index c41be3ef65117..2a503ecad92e0 100755
--- a/scripts/update_all_products.pl
+++ b/scripts/update_all_products.pl
@@ -78,6 +78,7 @@ use ProductOpener::PackagerCodes qw/:all/;
 use ProductOpener::API qw/:all/;
 use ProductOpener::LoadData qw/:all/;
+use ProductOpener::Redis qw/push_to_redis_stream/;
 
 use CGI qw/:cgi :form escapeHTML/;
 use URI::Escape::XS;
@@ -85,6 +86,7 @@ use Encode;
 use JSON::PP;
 
 use Data::DeepAccess qw(deep_get deep_exists deep_set);
+use Data::Compare;
 
 use Log::Any::Adapter 'TAP';
 
@@ -396,6 +398,7 @@
 }
 
 $cursor->immortal(1);
 
+my $l = 0;    # number of products tested
 my $n = 0;    # number of products updated
 my $m = 0;    # number of products with a new version created
 
@@ -424,6 +427,7 @@
 }
 
 while (my $product_ref = $cursor->next) {
+	$l++;
 
 	# Response structure to keep track of warnings and errors
 	# Note: currently some warnings and errors are added,
@@ -440,13 +444,10 @@
 	}
 
 	if (not defined $code) {
-		print STDERR "code field undefined for product id: "
-			. $product_ref->{id}
-			. " _id: "
-			. $product_ref->{_id} . "\n";
+		print STDERR "\ncode field undefined for product id: " . $product_ref->{id} . " _id: " . $product_ref->{_id};
 	}
 	else {
-		print STDERR "updating product code: $code $owner_info ($n / $products_count)\n";
+		print STDERR "\nupdating product code: $code $owner_info ($l / $products_count)";
 	}
 	next if $just_print_codes;
 
@@ -457,6 +458,7 @@
 	}
 
 	if ((defined $product_ref) and ($productid ne '')) {
+		my $original_product = dclone($product_ref);
 
 		$lc = $product_ref->{lc};
 
@@ -1347,7 +1349,18 @@
 			assign_ciqual_codes($product_ref);
 		}
 
+		my $any_change = $product_values_changed;
 		if (not $pretend) {
+			if (!$any_change) {
+				# Deep compare with original (if we don't already know that a change has been made)
+				$any_change = !Compare($product_ref, $original_product);
+			}
+			if (!$any_change) {
+				print STDERR ". Skipped";
+			}
+		}
+
+		if ($any_change and (!$pretend)) {
 			$product_ref->{update_key} = $key;
 
 			# Create a new version of the product and create a new .sto file
@@ -1359,21 +1372,24 @@
 			# Otherwise, we silently update the .sto file of the last version
 			else {
 
-				# make sure nutrient values are numbers
 				ProductOpener::Products::make_sure_numbers_are_stored_as_numbers($product_ref);
 
+				# Make sure product _id and code are saved as string and not a number
+				# see bug #1077 - https://github.com/openfoodfacts/openfoodfacts-server/issues/1077
+				# make sure that code is saved as a string, otherwise mongodb saves it as number, and leading 0s are removed
+				$product_ref->{_id} .= '';
+				$product_ref->{code} .= '';
+
+				# Set last modified time
+				$product_ref->{last_modified_t} = time() + 0;
+
 				if (!$mongodb_to_mongodb) {
 					# Store data to .sto file
 					store("$BASE_DIRS{PRODUCTS}/$path/product.sto", $product_ref);
 				}
 
 				# Store data to mongodb
-				# Make sure product _id and code are saved as string and not a number
-				# see bug #1077 - https://github.com/openfoodfacts/openfoodfacts-server/issues/1077
-				# make sure that code is saved as a string, otherwise mongodb saves it as number, and leading 0s are removed
-				$product_ref->{_id} .= '';
-				$product_ref->{code} .= '';
 				my $collection = "current";
 				if ($product_ref->{obsolete}) {
 					$collection = "obsolete";
 				}
@@ -1393,17 +1409,21 @@
 					$products_collection->delete_one({"_id" => $product_ref->{_id}});
 					$fix_obsolete_fixed++;
 				}
+
+				# Send to redis
+				push_to_redis_stream('update_all_products', $product_ref, "updated", $comment, {});
 			}
-		}
 
-		$n++;
+			$n++;
+		}
 	}
 	else {
-		print STDERR "Unable to load product file for product code $code\n";
+		print STDERR ". Unable to load product file for product code $code";
 	}
-
 }
 
+print STDERR "\n";
+
 if ($prefix_packaging_tags_with_language) {
 	print "Results of --prefixy-packaging-tags-with-language:\n\n";
diff --git a/stop_words.txt b/stop_words.txt
index 64015d94a4c84..d2e4d0f99f357 100644
--- a/stop_words.txt
+++ b/stop_words.txt
@@ -258,3 +258,6 @@ webpage
 webpages
 bing
 txt
+redis
+init
+licious
\ No newline at end of file
diff --git a/tests/integration/add_update_to_redis.t b/tests/integration/add_update_to_redis.t
new file mode 100644
index 0000000000000..3dba53186125c
--- /dev/null
+++ b/tests/integration/add_update_to_redis.t
@@ -0,0 +1,49 @@
+#!/usr/bin/perl -w
+
+use ProductOpener::PerlStandards;
+
+use Test::More;
+use ProductOpener::APITest qw/:all/;
+use ProductOpener::Test qw/:all/;
+use ProductOpener::TestDefaults qw/:all/;
+
+remove_all_users();
+
+remove_all_products();
+
+wait_application_ready();
+
+my $test_ua = new_client();
+
+my %create_user_args = (%default_user_form, (email => 'bob@gmail.com'));
+create_user($test_ua, \%create_user_args);
+
+my %product_form = (
+	cc => "be",
+	lc => "fr",
+	code => "1234567890001",
+	product_name => "Product name",
+	categories => "Cookies",
+	quantity => "250 g",
+	serving_size => '20 g',
+	ingredients_text_fr => "Farine de blé, eau, sel, sucre",
+	labels => "Bio, Max Havelaar",
+	nutriment_salt => '50.2',
+	nutriment_salt_unit => 'mg',
+	nutriment_sugars => '12.5'
+);
+
+my $url = construct_test_url("/cgi/product_jqm_multilingual.pl");
+my $headers_in = {"Origin" => origin_from_url($url)};
+# We use the logging mechanism to check that the product update is pushed to Redis
+my $tail = tail_log_start();
+my $response = $test_ua->post($url, Content => \%product_form, %$headers_in);
+# Stop logging
+my $logs = tail_log_read($tail);
+
+# Check that the push_to_redis_stream function was called and that Redis connection was successful
+ok($logs =~ /Pushing product update to Redis/, "pushing product update to Redis");
+# Check that the Redis call didn't trigger an error
+ok($logs =~ /Successfully pushed product update to Redis/, "successfully pushed product update to Redis");
+ok($logs !~ /Failed to push product update to Redis/, "no failure when pushing product update to Redis");
+done_testing();
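
Note on consuming the stream: the patch above only covers the producer side (the xadd call in ProductOpener::Redis). A minimal sketch of what a downstream consumer of the product_update stream could look like is given below. It is illustrative only and not part of this patch: it assumes the same Redis CPAN client used by ProductOpener::Redis (XREAD is passed through the client's generic command interface) and the field layout written by the xadd call above; the reply-parsing details are not taken from the repository.

#!/usr/bin/perl
# Illustrative consumer for the product_update stream (a sketch, not part of this patch)
use strict;
use warnings;
use Redis;

# Same REDIS_URL setting as Product Opener, e.g. "redis:6379"
my $redis = Redis->new(server => $ENV{REDIS_URL} || 'redis:6379');
my $last_id = '$';    # only read entries added after we connect

while (1) {
	# XREAD BLOCK 5000 STREAMS product_update <last_id>
	my @reply = $redis->xread('BLOCK', 5000, 'STREAMS', 'product_update', $last_id);
	next if !@reply or !defined $reply[0];    # timed out, nothing new
	for my $stream (@reply) {
		my ($stream_name, $entries) = @{$stream};
		for my $entry (@{$entries}) {
			my ($id, $fields) = @{$entry};
			# fields are a flat key/value list: code, flavor, user_id, action, comment, diffs (JSON)
			my %event = @{$fields};
			print "[$event{flavor}] product $event{code} $event{action} by $event{user_id}\n";
			$last_id = $id;
		}
	}
}

Tailing with XREAD and a remembered last id is the simplest approach; a service that needs at-least-once delivery across restarts would use a consumer group (XGROUP CREATE, XREADGROUP, XACK) instead. Either way, entries trimmed away by the MAXLEN cap added in this patch are no longer replayable.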