From 5662608cde912954c85f7228555ee7149f4c9b6e Mon Sep 17 00:00:00 2001 From: zzm Date: Sat, 26 Nov 2022 22:16:00 +0800 Subject: [PATCH] [fix #321] add `sending sigstop` test case (#322) * add sending sigstop it Signed-off-by: zeminzhou * fix check Signed-off-by: zeminzhou * add sleep Signed-off-by: zeminzhou * make check Signed-off-by: zeminzhou * adjust sleep time Signed-off-by: zeminzhou * add workload after kill -19 Signed-off-by: zeminzhou * remove UP_ADD Signed-off-by: zeminzhou * reuse check count Signed-off-by: zeminzhou * add comment Signed-off-by: zeminzhou * make check Signed-off-by: zeminzhou * add comment Signed-off-by: zeminzhou * add retry Signed-off-by: zeminzhou * add multi pd addr Signed-off-by: zeminzhou Signed-off-by: zeminzhou --- .../integration_tests/_utils/check_count | 48 ++++++++ .../integration_tests/_utils/check_sync_diff | 2 +- .../integration_tests/cdc_hang_on/run.sh | 1 + .../integration_tests/changefeed_error/run.sh | 11 +- cdc/tests/integration_tests/kill_owner/run.sh | 15 +-- cdc/tests/integration_tests/run_group.sh | 2 +- cdc/tests/integration_tests/sigstop/run.sh | 115 ++++++++++++++++++ cdc/tests/utils/rawkv_data/checksum.go | 8 +- cdc/tests/utils/rawkv_data/gen_data.go | 12 +- cdc/tests/utils/rawkv_data/main.go | 22 ++-- 10 files changed, 198 insertions(+), 38 deletions(-) create mode 100755 cdc/tests/integration_tests/_utils/check_count create mode 100644 cdc/tests/integration_tests/sigstop/run.sh diff --git a/cdc/tests/integration_tests/_utils/check_count b/cdc/tests/integration_tests/_utils/check_count new file mode 100755 index 00000000..667644dd --- /dev/null +++ b/cdc/tests/integration_tests/_utils/check_count @@ -0,0 +1,48 @@ +#!/bin/bash +# parameter 1: expected count +# parameter 2: component name +# parameter 3: pd addr +# parameter 4: max retry +set -eu + +expected=$1 +name=$2 +pd_addr=$3 + +if [ $# -ge 4 ]; then + max_retry=$4 +else + max_retry=30 +fi + +for ((i = 0; i <= $max_retry; i++)); do + case $name in + tikv) + : + count=$(pd-ctl store --pd $pd_addr | grep 'Up' | wc | awk '{print $1}') + ;; + pd) + : + count=$(pd-ctl health --pd $pd_addr | grep '\"health\": true' | wc | awk '{print $1}') + ;; + tikv-cdc) + : + count=$(tikv-cdc cli capture list --pd $pd_addr | jq '.|length') + ;; + *) + exit 1 + ;; + esac + + if [[ "$count" == "$expected" ]]; then + echo "check $name count successfully" + break + fi + + echo "failed to check $name count, expected: $expected, got: $count, retry: $i" + if [ "$i" -eq "$max_retry" ]; then + echo "failed to check $name count, max retires exceed" + exit 1 + fi + sleep 2 +done diff --git a/cdc/tests/integration_tests/_utils/check_sync_diff b/cdc/tests/integration_tests/_utils/check_sync_diff index c0e3063c..2a0ca28f 100755 --- a/cdc/tests/integration_tests/_utils/check_sync_diff +++ b/cdc/tests/integration_tests/_utils/check_sync_diff @@ -13,7 +13,7 @@ DOWN_PD=$3 if [ $# -ge 4 ]; then check_time=$4 else - check_time=30 + check_time=50 fi PWD=$(pwd) diff --git a/cdc/tests/integration_tests/cdc_hang_on/run.sh b/cdc/tests/integration_tests/cdc_hang_on/run.sh index 1451e28d..2f32d9ad 100644 --- a/cdc/tests/integration_tests/cdc_hang_on/run.sh +++ b/cdc/tests/integration_tests/cdc_hang_on/run.sh @@ -10,6 +10,7 @@ SINK_TYPE=$1 UP_PD=http://$UP_PD_HOST_1:$UP_PD_PORT_1,http://$UP_PD_HOST_2:$UP_PD_PORT_2,http://$UP_PD_HOST_3:$UP_PD_PORT_3 DOWN_PD=http://$DOWN_PD_HOST:$DOWN_PD_PORT RETRY_TIME=10 + function restart_cdc() { id=$1 local count=$(ps -aux | grep "tikv-cdc.test" | grep "cdc$id.log" | wc | awk '{print $1}') diff --git a/cdc/tests/integration_tests/changefeed_error/run.sh b/cdc/tests/integration_tests/changefeed_error/run.sh index 5d7263c2..78a7dc63 100755 --- a/cdc/tests/integration_tests/changefeed_error/run.sh +++ b/cdc/tests/integration_tests/changefeed_error/run.sh @@ -91,20 +91,11 @@ function check_no_changefeed() { fi } -function check_no_capture() { - pd=$1 - count=$(tikv-cdc cli capture list --pd=$pd 2>&1 | jq '.|length') - if [[ ! "$count" -eq "0" ]]; then - exit 1 - fi -} - export -f check_changefeed_mark_error export -f check_changefeed_mark_failed_regex export -f check_changefeed_mark_stopped_regex export -f check_changefeed_mark_stopped export -f check_no_changefeed -export -f check_no_capture function run() { rm -rf $WORK_DIR && mkdir -p $WORK_DIR @@ -138,7 +129,7 @@ function run() { export GO_FAILPOINTS='github.com/tikv/migration/cdc/cdc/owner/NewChangefeedRetryError=return(true)' kill $capture_pid - ensure $MAX_RETRIES check_no_capture $UP_PD + check_count 0 "tikv-cdc" $UP_PD $MAX_RETRIES run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY ensure $MAX_RETRIES check_changefeed_mark_error $UP_PD ${changefeedid} "failpoint injected retriable error" diff --git a/cdc/tests/integration_tests/kill_owner/run.sh b/cdc/tests/integration_tests/kill_owner/run.sh index d4b5d8e8..fa2fdc53 100755 --- a/cdc/tests/integration_tests/kill_owner/run.sh +++ b/cdc/tests/integration_tests/kill_owner/run.sh @@ -12,16 +12,6 @@ MAX_RETRIES=10 UP_PD=http://$UP_PD_HOST_1:$UP_PD_PORT_1 DOWN_PD=http://$DOWN_PD_HOST:$DOWN_PD_PORT -function check_capture_count() { - pd=$1 - expected=$2 - count=$(tikv-cdc cli capture list --pd=$pd 2>&1 | jq '.|length') - if [[ ! "$count" -eq "$expected" ]]; then - echo "count: $count expected: $expected" - exit 1 - fi -} - function kill_cdc_and_restart() { pd_addr=$1 work_dir=$2 @@ -31,12 +21,11 @@ function kill_cdc_and_restart() { cdc_pid=$(echo "$status" | jq '.pid') kill $cdc_pid - ensure $MAX_RETRIES check_capture_count $pd_addr 0 + check_count 0 "tikv-cdc" $pd_addr $MAX_RETRIES run_cdc_server --workdir $work_dir --binary $cdc_binary --addr "127.0.0.1:8600" --pd $pd_addr - ensure $MAX_RETRIES check_capture_count $pd_addr 1 + check_count 1 "tikv-cdc" $pd_addr $MAX_RETRIES } -export -f check_capture_count export -f kill_cdc_and_restart function run() { diff --git a/cdc/tests/integration_tests/run_group.sh b/cdc/tests/integration_tests/run_group.sh index aa272b76..780432c2 100755 --- a/cdc/tests/integration_tests/run_group.sh +++ b/cdc/tests/integration_tests/run_group.sh @@ -21,7 +21,7 @@ groups=( ["G07"]='kv_client_stream_reconnect multi_capture' ["G08"]='processor_err_chan processor_panic' ["G09"]='processor_resolved_ts_fallback processor_stop_delay' - ["G10"]='sink_hang' + ["G10"]='sink_hang sigstop' ["G11"]='sorter stop_downstream' ["G12"]='availability' # heavy test case ) diff --git a/cdc/tests/integration_tests/sigstop/run.sh b/cdc/tests/integration_tests/sigstop/run.sh new file mode 100644 index 00000000..c212eeba --- /dev/null +++ b/cdc/tests/integration_tests/sigstop/run.sh @@ -0,0 +1,115 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=tikv-cdc.test +SINK_TYPE=$1 +UP_PD=http://$UP_PD_HOST_1:$UP_PD_PORT_1,http://$UP_PD_HOST_2:$UP_PD_PORT_2,http://$UP_PD_HOST_3:$UP_PD_PORT_3 +DOWN_PD=http://$DOWN_PD_HOST:$DOWN_PD_PORT + +function run_kill_upstream() { + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + start_tidb_cluster --workdir $WORK_DIR --multiple-upstream-pd "true" + cd $WORK_DIR + + start_ts=$(get_start_ts $UP_PD) + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "1" --addr "127.0.0.1:8600" --pd "$UP_PD" + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "2" --addr "127.0.0.1:8601" --pd "$UP_PD" + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "3" --addr "127.0.0.1:8602" --pd "$UP_PD" + + case $SINK_TYPE in + tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;; + *) SINK_URI="" ;; + esac + + tikv-cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" + + rawkv_op $UP_PD put 10000 & + sleep 1 + # send sigstop to tikv + n=$(echo $(($RANDOM % 3 + 1))) + tikv_pid=$(pgrep -f "tikv$n" | head -n1) + kill -19 $tikv_pid + sleep 10 + check_count 2 "tikv" $UP_PD + + kill -18 $tikv_pid + check_count 3 "tikv" $UP_PD + check_sync_diff $WORK_DIR $UP_PD $DOWN_PD + + # Ignore the test on PD, because sending SIGSTOP to PD may cause CDC to exit, + # `cdc_hang_on` has tested sending SIGSTOP to PD leader. + + rawkv_op $UP_PD delete 10000 & + sleep 1 + # send sigstop to tikv-cdc + n=$(echo $(($RANDOM % 2 + 1))) + cdc_pid=$(pgrep -f "tikv-cdc" | sed -n "$n"p) + kill -19 $cdc_pid + sleep 10 + check_count 2 "tikv-cdc" $UP_PD + + kill -18 $cdc_pid + check_count 3 "tikv-cdc" $UP_PD + check_sync_diff $WORK_DIR $UP_PD $DOWN_PD + + cleanup_process $CDC_BINARY +} + +function run_kill_downstream() { + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + start_tidb_cluster --workdir $WORK_DIR --multiple-upstream-pd "true" + cd $WORK_DIR + + # We start 3 tikv and 3 pd in cluster1(usually as upstream), + # 1 tikv and 1 pd in cluster2(usually as downstream). + # Now we treat cluster1 as the downstream cluster and cluster2 as upstream, + # so we can ensure high availability of downstream clusters while sending SIGSTOP. + + start_ts=$(get_start_ts $DOWN_PD) + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "1" --addr "127.0.0.1:8600" --pd "$DOWN_PD" + + case $SINK_TYPE in + tikv) SINK_URI="tikv://${UP_PD_HOST_1}:${UP_PD_PORT_1}" ;; + *) SINK_URI="" ;; + esac + + tikv-cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --pd $DOWN_PD + + rawkv_op $DOWN_PD put 10000 & + sleep 1 + # send sigstop to tikv + n=$(echo $(($RANDOM % 3 + 1))) + tikv_pid=$(pgrep -f "tikv$n" | head -n1) + kill -19 $tikv_pid + sleep 10 + check_count 2 "tikv" $UP_PD + + kill -18 $tikv_pid + check_count 3 "tikv" $UP_PD + check_sync_diff $WORK_DIR $DOWN_PD $UP_PD + + rawkv_op $DOWN_PD delete 10000 & + sleep 1 + # send sigstop to pd + n=$(echo $(($RANDOM % 3 + 1))) + pd_pid=$(pgrep -f "pd-server" | sed -n "$n"p) + kill -19 $pd_pid + sleep 10 + check_count 2 "pd" $UP_PD + + kill -18 $pd_pid + check_count 3 "pd" $UP_PD + check_sync_diff $WORK_DIR $DOWN_PD $UP_PD + + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +run_kill_upstream $* +run_kill_downstream $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/cdc/tests/utils/rawkv_data/checksum.go b/cdc/tests/utils/rawkv_data/checksum.go index fc426808..6f7579ab 100644 --- a/cdc/tests/utils/rawkv_data/checksum.go +++ b/cdc/tests/utils/rawkv_data/checksum.go @@ -43,13 +43,17 @@ func runChecksum(cmd *cobra.Command) error { } ctx := context.Background() - srcCli, err := rawkv.NewClientWithOpts(ctx, []string{cfg.SrcPD}, rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2), rawkv.WithSecurity(cfg.SrcSec)) + srcCli, err := rawkv.NewClientWithOpts(ctx, cfg.SrcPD, + rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2), + rawkv.WithSecurity(cfg.SrcSec)) if err != nil { return err } defer srcCli.Close() - dstCli, err := rawkv.NewClientWithOpts(ctx, []string{cfg.DstPD}, rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2), rawkv.WithSecurity(cfg.DstSec)) + dstCli, err := rawkv.NewClientWithOpts(ctx, cfg.DstPD, + rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2), + rawkv.WithSecurity(cfg.DstSec)) if err != nil { return err } diff --git a/cdc/tests/utils/rawkv_data/gen_data.go b/cdc/tests/utils/rawkv_data/gen_data.go index e418d6f1..786f1aef 100644 --- a/cdc/tests/utils/rawkv_data/gen_data.go +++ b/cdc/tests/utils/rawkv_data/gen_data.go @@ -87,7 +87,9 @@ func runDeleteCmd(cmd *cobra.Command) error { } ctx := context.Background() - cli, err := rawkv.NewClientWithOpts(ctx, []string{cfg.SrcPD}, rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2), rawkv.WithSecurity(cfg.SrcSec)) + cli, err := rawkv.NewClientWithOpts(ctx, cfg.SrcPD, + rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2), + rawkv.WithSecurity(cfg.SrcSec)) if err != nil { return err } @@ -122,12 +124,16 @@ func runPutCmd(cmd *cobra.Command) error { } ctx := context.Background() - cli, err := rawkv.NewClientWithOpts(ctx, []string{cfg.SrcPD}, rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2), rawkv.WithSecurity(cfg.SrcSec)) + cli, err := rawkv.NewClientWithOpts(ctx, cfg.SrcPD, + rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2), + rawkv.WithSecurity(cfg.SrcSec)) if err != nil { return err } defer cli.Close() - atomicCli, err := rawkv.NewClientWithOpts(ctx, []string{cfg.SrcPD}, rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2), rawkv.WithSecurity(cfg.SrcSec)) + atomicCli, err := rawkv.NewClientWithOpts(ctx, cfg.SrcPD, + rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2), + rawkv.WithSecurity(cfg.SrcSec)) if err != nil { return err } diff --git a/cdc/tests/utils/rawkv_data/main.go b/cdc/tests/utils/rawkv_data/main.go index 6f67b2ee..84276915 100644 --- a/cdc/tests/utils/rawkv_data/main.go +++ b/cdc/tests/utils/rawkv_data/main.go @@ -38,8 +38,8 @@ const ( ) type Config struct { - SrcPD string `json:"src-pd"` - DstPD string `json:"dst-pd"` + SrcPD []string `json:"src-pd"` + DstPD []string `json:"dst-pd"` StartIndex int `json:"start-index"` Count int `json:"count"` CAPath string `json:"ca-path"` @@ -61,12 +61,18 @@ func AddFlags(cmd *cobra.Command) { func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet, requireDstPD bool) error { var err error - if cfg.SrcPD, err = flags.GetString(flagSrcPD); err != nil { + srcPD, err := flags.GetString(flagSrcPD) + if err != nil { return err } - if cfg.DstPD, err = flags.GetString(flagDstPD); err != nil { + cfg.SrcPD = strings.Split(srcPD, ",") + + dstPD, err := flags.GetString(flagDstPD) + if err != nil { return err } + cfg.DstPD = strings.Split(dstPD, ",") + if cfg.StartIndex, err = flags.GetInt(flagStartIndex); err != nil { return err } @@ -83,10 +89,10 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet, requireDstPD bool) error return err } - if cfg.SrcPD == "" { + if len(cfg.SrcPD) == 0 { return fmt.Errorf("Upstream cluster PD is not set") } - if strings.HasPrefix(cfg.SrcPD, "https://") { + if strings.HasPrefix(cfg.SrcPD[0], "https://") { if cfg.CAPath == "" || cfg.CertPath == "" || cfg.KeyPath == "" { return fmt.Errorf("CAPath/CertPath/KeyPath is not set") } @@ -96,10 +102,10 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet, requireDstPD bool) error } if requireDstPD { - if cfg.DstPD == "" { + if len(cfg.DstPD) == 0 { return fmt.Errorf("Downstream cluster PD is not set") } - if strings.HasPrefix(cfg.DstPD, "https://") { + if strings.HasPrefix(cfg.DstPD[0], "https://") { if cfg.CAPath == "" || cfg.CertPath == "" || cfg.KeyPath == "" { return fmt.Errorf("CAPath/CertPath/KeyPath is not set") }