diff --git a/src/pixie_cli/pkg/cmd/collect_logs.go b/src/pixie_cli/pkg/cmd/collect_logs.go
index eb733884f77..caf6e0be6de 100644
--- a/src/pixie_cli/pkg/cmd/collect_logs.go
+++ b/src/pixie_cli/pkg/cmd/collect_logs.go
@@ -27,7 +27,7 @@ import (
 	"github.com/spf13/viper"
 
 	"px.dev/pixie/src/pixie_cli/pkg/utils"
-	"px.dev/pixie/src/utils/shared/k8s"
+	"px.dev/pixie/src/pixie_cli/pkg/vizier"
 )
 
 func init() {
@@ -42,7 +42,7 @@ var CollectLogsCmd = &cobra.Command{
 		viper.BindPFlag("namespace", cmd.Flags().Lookup("namespace"))
 	},
 	Run: func(cmd *cobra.Command, args []string) {
-		c := k8s.NewLogCollector()
+		c := vizier.NewLogCollector(mustCreateBundleReader(), viper.GetString("cloud_addr"))
 		fName := fmt.Sprintf("pixie_logs_%s.zip", time.Now().Format("20060102150405"))
 		err := c.CollectPixieLogs(fName)
 		if err != nil {
diff --git a/src/pixie_cli/pkg/cmd/deploy.go b/src/pixie_cli/pkg/cmd/deploy.go
index fe060df72d5..2515b37e41e 100644
--- a/src/pixie_cli/pkg/cmd/deploy.go
+++ b/src/pixie_cli/pkg/cmd/deploy.go
@@ -22,7 +22,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"io"
 	"os"
 	"strings"
 	"time"
@@ -72,6 +71,7 @@ var BlockListedLabels = []string{
 }
 
 func init() {
+	DeployCmd.Flags().StringP("bundle", "b", "", "Path/URL to bundle file")
 	DeployCmd.Flags().StringP("extract_yaml", "e", "", "Directory to extract the Pixie yamls to")
 	DeployCmd.Flags().StringP("vizier_version", "v", "", "Pixie version to deploy")
 	DeployCmd.Flags().BoolP("check", "c", true, "Check whether the cluster can run Pixie")
@@ -106,6 +106,7 @@ var DeployCmd = &cobra.Command{
 	Use:   "deploy",
 	Short: "Deploys Pixie on the current K8s cluster",
 	PreRun: func(cmd *cobra.Command, args []string) {
+		viper.BindPFlag("bundle", cmd.Flags().Lookup("bundle"))
 		viper.BindPFlag("extract_yaml", cmd.Flags().Lookup("extract_yaml"))
 		viper.BindPFlag("vizier_version", cmd.Flags().Lookup("vizier_version"))
 		viper.BindPFlag("check", cmd.Flags().Lookup("check"))
@@ -604,61 +605,6 @@ func deploy(cloudConn *grpc.ClientConn, clientset *kubernetes.Clientset, vzClien
 	return clusterID
 }
 
-func runSimpleHealthCheckScript(cloudAddr string, clusterID uuid.UUID) error {
-	v, err := vizier.ConnectionToVizierByID(cloudAddr, clusterID)
-	br := mustCreateBundleReader()
-	if err != nil {
-		return err
-	}
-	execScript := br.MustGetScript(script.AgentStatusScript)
-
-	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
-	defer cancel()
-
-	resp, err := v.ExecuteScriptStream(ctx, execScript, nil)
-	if err != nil {
-		return err
-	}
-
-	// TODO(zasgar): Make this use the Null output. We can't right now
-	// because of fatal message on vizier failure.
-	errCh := make(chan error)
-	// Eat all responses.
-	go func() {
-		for {
-			select {
-			case <-ctx.Done():
-				if ctx.Err() != nil {
-					errCh <- ctx.Err()
-					return
-				}
-				errCh <- nil
-				return
-			case msg := <-resp:
-				if msg == nil {
-					errCh <- nil
-					return
-				}
-				if msg.Err != nil {
-					if msg.Err == io.EOF {
-						errCh <- nil
-						return
-					}
-					errCh <- msg.Err
-					return
-				}
-				if msg.Resp.Status != nil && msg.Resp.Status.Code != 0 {
-					errCh <- errors.New(msg.Resp.Status.Message)
-				}
-				// Eat messages.
-			}
-		}
-	}()
-
-	err = <-errCh
-	return err
-}
-
 func waitForHealthCheckTaskGenerator(cloudAddr string, clusterID uuid.UUID) func() error {
 	return func() error {
 		timeout := time.NewTimer(5 * time.Minute)
@@ -668,10 +614,15 @@ func waitForHealthCheckTaskGenerator(cloudAddr string, clusterID uuid.UUID) func
 			case <-timeout.C:
 				return errors.New("timeout waiting for healthcheck (it is possible that Pixie stabilized after the healthcheck timeout. To check if Pixie successfully deployed, run `px debug pods`)")
 			default:
-				err := runSimpleHealthCheckScript(cloudAddr, clusterID)
+				_, err := vizier.RunSimpleHealthCheckScript(mustCreateBundleReader(), cloudAddr, clusterID)
 				if err == nil {
 					return nil
 				}
+				// A health check warning means the cluster deployed successfully, but with warnings.
+				// Return the error to stop polling and surface the warnings.
+				if _, ok := err.(*vizier.HealthCheckWarning); ok {
+					return err
+				}
 				time.Sleep(5 * time.Second)
 			}
 		}
@@ -691,13 +642,17 @@ func waitForHealthCheck(cloudAddr string, clusterID uuid.UUID, clientset *kubern
 	hc := utils.NewSerialTaskRunner(healthCheckJobs)
 	err := hc.RunAndMonitor()
 	if err != nil {
-		_ = pxanalytics.Client().Enqueue(&analytics.Track{
-			UserId: pxconfig.Cfg().UniqueClientID,
-			Event:  "Deploy Healthcheck Failed",
-			Properties: analytics.NewProperties().
-				Set("err", err.Error()),
-		})
-		utils.WithError(err).Fatal("Failed Pixie healthcheck")
+		if _, ok := err.(*vizier.HealthCheckWarning); ok {
+			utils.WithError(err).Error("Pixie healthcheck detected the following warnings:")
+		} else {
+			_ = pxanalytics.Client().Enqueue(&analytics.Track{
+				UserId: pxconfig.Cfg().UniqueClientID,
+				Event:  "Deploy Healthcheck Failed",
+				Properties: analytics.NewProperties().
+					Set("err", err.Error()),
+			})
+			utils.WithError(err).Fatal("Failed Pixie healthcheck")
+		}
 	}
 	_ = pxanalytics.Client().Enqueue(&analytics.Track{
 		UserId: pxconfig.Cfg().UniqueClientID,
diff --git a/src/pixie_cli/pkg/cmd/root.go b/src/pixie_cli/pkg/cmd/root.go
index 2320e2229e6..2a2702de61c 100644
--- a/src/pixie_cli/pkg/cmd/root.go
+++ b/src/pixie_cli/pkg/cmd/root.go
@@ -203,7 +203,6 @@ var RootCmd = &cobra.Command{
 
 // Name a variable to store a slice of commands that don't require cloudAddr
 var cmdsCloudAddrNotReqd = []*cobra.Command{
-	CollectLogsCmd,
 	VersionCmd,
 }
 
@@ -245,7 +244,7 @@ func checkAuthForCmd(c *cobra.Command) {
 		os.Exit(1)
 	}
 	switch c {
-	case DeployCmd, UpdateCmd, GetCmd, DeployKeyCmd, APIKeyCmd:
+	case CollectLogsCmd, DeployCmd, UpdateCmd, GetCmd, DeployKeyCmd, APIKeyCmd:
 		utils.Errorf("These commands are unsupported in Direct Vizier mode.")
 		os.Exit(1)
 	default:
@@ -254,7 +253,7 @@ func checkAuthForCmd(c *cobra.Command) {
 	}
 
 	switch c {
-	case DeployCmd, UpdateCmd, RunCmd, LiveCmd, GetCmd, ScriptCmd, DeployKeyCmd, APIKeyCmd:
+	case CollectLogsCmd, DeployCmd, UpdateCmd, RunCmd, LiveCmd, GetCmd, ScriptCmd, DeployKeyCmd, APIKeyCmd:
 		authenticated := auth.IsAuthenticated(viper.GetString("cloud_addr"))
 		if !authenticated {
 			utils.Errorf("Failed to authenticate. Please retry `px auth login`.")
diff --git a/src/pixie_cli/pkg/vizier/BUILD.bazel b/src/pixie_cli/pkg/vizier/BUILD.bazel
index 8ca06df7514..ce4056c7ce4 100644
--- a/src/pixie_cli/pkg/vizier/BUILD.bazel
+++ b/src/pixie_cli/pkg/vizier/BUILD.bazel
@@ -25,6 +25,7 @@ go_library(
         "data_formatter.go",
         "errors.go",
         "lister.go",
+        "logs.go",
         "script.go",
         "stream_adapter.go",
         "utils.go",
diff --git a/src/pixie_cli/pkg/vizier/logs.go b/src/pixie_cli/pkg/vizier/logs.go
new file mode 100644
index 00000000000..88bfef241e3
--- /dev/null
+++ b/src/pixie_cli/pkg/vizier/logs.go
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2018- The Pixie Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package vizier
+
+import (
+	"archive/zip"
+	"context"
+	"errors"
+	"os"
+	"strings"
+
+	log "github.com/sirupsen/logrus"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+
+	"px.dev/pixie/src/utils/script"
+	"px.dev/pixie/src/utils/shared/k8s"
+)
+
+// LogCollector collects logs for Pixie and cluster setup information.
+type LogCollector struct {
+	k8sConfig    *rest.Config
+	k8sClientSet *kubernetes.Clientset
+	cloudAddr    string
+	br           *script.BundleManager
+	k8s.LogCollector
+}
+
+// NewLogCollector creates a new log collector.
+func NewLogCollector(br *script.BundleManager, cloudAddr string) *LogCollector {
+	cfg := k8s.GetConfig()
+	cs := k8s.GetClientset(cfg)
+	return &LogCollector{
+		cfg,
+		cs,
+		cloudAddr,
+		br,
+		*k8s.NewLogCollector(),
+	}
+}
+
+// CollectPixieLogs collects logs for all Pixie pods and writes them to the zip file fName.
+func (c *LogCollector) CollectPixieLogs(fName string) error {
+	if !strings.HasSuffix(fName, ".zip") {
+		return errors.New("fName must have .zip suffix")
+	}
+	f, err := os.Create(fName)
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+
+	zf := zip.NewWriter(f)
+	defer zf.Close()
+
+	vls := k8s.VizierLabelSelector()
+	vizierLabelSelector := metav1.FormatLabelSelector(&vls)
+
+	// We check across all namespaces for the matching pixie pods.
+	vizierPodList, err := c.k8sClientSet.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{LabelSelector: vizierLabelSelector})
+	if err != nil {
+		return err
+	}
+
+	// We also need to get the operator logs.
+	// As the LabelSelectors are ANDed, we need to make a new query and merge
+	// the results.
+	ols := k8s.OperatorLabelSelector()
+	operatorLabelSelector := metav1.FormatLabelSelector(&ols)
+
+	operatorPodList, err := c.k8sClientSet.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{LabelSelector: operatorLabelSelector})
+	if err != nil {
+		return err
+	}
+
+	// Merge the two pod lists.
+	pods := append(vizierPodList.Items, operatorPodList.Items...)
+
+	for _, pod := range pods {
+		for _, containerStatus := range pod.Status.ContainerStatuses {
+			// Ignore prev logs, they might not exist.
+			_ = c.LogPodInfoToZipFile(zf, pod, containerStatus.Name, true)
+
+			err := c.LogPodInfoToZipFile(zf, pod, containerStatus.Name, false)
+			if err != nil {
+				log.WithError(err).Warnf("failed to log pod: %s", pod.Name)
+			}
+		}
+		err = c.WritePodDescription(zf, pod)
+		if err != nil {
+			log.WithError(err).Warn("failed to write pod description")
+		}
+	}
+
+	err = c.LogKubeCmd(zf, "nodes.log", "describe", "node")
+	if err != nil {
+		log.WithError(err).Warn("failed to log node info")
+	}
+
+	err = c.LogKubeCmd(zf, "services.log", "describe", "services", "--all-namespaces", "-l", vizierLabelSelector)
+	if err != nil {
+		log.WithError(err).Warn("failed to log services")
+	}
+
+	// Describe vizier and write it to vizier.log
+	err = c.LogKubeCmd(zf, "vizier.log", "describe", "vizier", "--all-namespaces")
+	if err != nil {
+		log.WithError(err).Warn("failed to log vizier CRD")
+	}
+
+	clusterID, err := GetCurrentVizier(c.cloudAddr)
+	if err != nil {
+		log.WithError(err).Warn("failed to get cluster ID")
+	}
+	outputCh, err := RunSimpleHealthCheckScript(c.br, c.cloudAddr, clusterID)
+
+	if err != nil {
+		entry := log.WithError(err)
+		if _, ok := err.(*HealthCheckWarning); ok {
+			entry.Warn("healthcheck script detected the following warnings:")
+		} else {
+			entry.Warn("failed to run healthcheck script")
+		}
+	}
+	// Guard against a nil output channel: the health check can fail before producing any output.
+	if outputCh == nil {
+		return nil
+	}
+
+	return c.LogOutputToZipFile(zf, "px_agent_diagnostics.txt", <-outputCh)
+}
diff --git a/src/pixie_cli/pkg/vizier/script.go b/src/pixie_cli/pkg/vizier/script.go
index 7d8bc6954b0..fa9651c4577 100644
--- a/src/pixie_cli/pkg/vizier/script.go
+++ b/src/pixie_cli/pkg/vizier/script.go
@@ -19,25 +19,35 @@
 package vizier
 
 import (
+	"bufio"
 	"context"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"io"
+	"math"
 	"strings"
 	"time"
 
+	"github.com/gofrs/uuid"
 	"github.com/segmentio/analytics-go/v3"
 	"golang.org/x/sync/errgroup"
 	"google.golang.org/grpc/codes"
 
 	apiutils "px.dev/pixie/src/api/go/pxapi/utils"
 	"px.dev/pixie/src/api/proto/vizierpb"
+	"px.dev/pixie/src/pixie_cli/pkg/components"
 	"px.dev/pixie/src/pixie_cli/pkg/pxanalytics"
 	"px.dev/pixie/src/pixie_cli/pkg/pxconfig"
 	"px.dev/pixie/src/pixie_cli/pkg/utils"
 	"px.dev/pixie/src/utils/script"
 )
 
+const (
+	equalityThreshold          = 0.01
+	headersInstalledPercColumn = "headers_installed_percent" // must match the column in px/agent_status_diagnostics
+)
+
 type taskWrapper struct {
 	name string
 	run  func() error
@@ -195,7 +205,7 @@ func runScript(ctx context.Context, conns []*Connector, execScript *script.Execu
 	return tw, err
 }
 
-// RunScript runs the script and return the data channel
+// RunScript runs the script and returns the data channel.
 func RunScript(ctx context.Context, conns []*Connector, execScript *script.ExecutableScript, encOpts *vizierpb.ExecuteScriptRequest_EncryptionOptions) (chan *ExecData, error) {
 	// TODO(zasgar): Refactor this when we change to the new API to make analytics cleaner.
 	_ = pxanalytics.Client().Enqueue(&analytics.Track{
@@ -256,3 +266,144 @@ func RunScript(ctx context.Context, conns []*Connector, execScript *script.Execu
 	}()
 	return mergedResponses, nil
 }
+
+// HealthCheckWarning is an error indicating that the health check detected non-fatal problems.
+type HealthCheckWarning struct {
+	message string
+}
+
+func (h *HealthCheckWarning) Error() string {
+	return h.message
+}
+
+func newHealthCheckWarning(message string) error {
+	return &HealthCheckWarning{message}
+}
+
+func evaluateHealthCheckResult(output string) error {
+	jsonData := make(map[string]interface{})
+
+	err := json.Unmarshal([]byte(output), &jsonData)
+	if err != nil {
+		return err
+	}
+	if v, ok := jsonData[headersInstalledPercColumn]; ok {
+		switch t := v.(type) {
+		case float64:
+			if math.Abs(1.0-t) > equalityThreshold {
+				msg := "Detected missing kernel headers on your cluster's nodes. This may cause issues with the Pixie agent. Please install kernel headers on all nodes."
+				return newHealthCheckWarning(msg)
+			}
+		}
+	} else {
+		return newHealthCheckWarning("Unable to detect if the cluster's nodes have the distro kernel headers installed (vizier too old to perform this check). Please ensure that the kernel headers are installed on all nodes.")
+	}
+	return nil
+}
+
+type healthCheckData struct {
+	line string
+	err  error
+}
+
+// runHealthCheckScript runs the health check script on the specified Vizier. The script's output
+// is evaluated with evaluateHealthCheckResult to determine whether the cluster is healthy. Only a
+// single line of output is parsed from the script.
+func runHealthCheckScript(v *Connector, execScript *script.ExecutableScript) (chan string, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+
+	var encOpts, decOpts *vizierpb.ExecuteScriptRequest_EncryptionOptions
+
+	resp, err := RunScript(ctx, []*Connector{v}, execScript, encOpts)
+	if err != nil {
+		return nil, err
+	}
+
+	reader, writer := io.Pipe()
+	defer writer.Close()
+	defer reader.Close()
+	factoryFunc := func(md *vizierpb.ExecuteScriptResponse_MetaData) components.OutputStreamWriter {
+		return components.CreateStreamWriter("json", writer)
+	}
+	tw := NewStreamOutputAdapterWithFactory(ctx, resp, "json", decOpts, factoryFunc)
+
+	bufReader := bufio.NewReader(reader)
+	errCh := make(chan error, 1)
+	streamCh := make(chan healthCheckData, 1)
+	outputCh := make(chan string, 1)
+	go func() {
+		defer close(streamCh)
+		for {
+			line, err := bufReader.ReadString('\n')
+			streamCh <- healthCheckData{line, err}
+			if err != nil {
+				return
+			}
+		}
+	}()
+	// Consumes the first line of output from the stream or the error from the context.
+	// The px/agent_status_diagnostics script only outputs one line, but even in the case
+	// that the fallback (px/agent_status) is used, a single line informs whether the output
+	// can be processed properly.
+	go func() {
+		defer close(errCh)
+		defer close(outputCh)
+		for {
+			select {
+			case <-ctx.Done():
+				errCh <- ctx.Err()
+				return
+			case data := <-streamCh:
+				line := data.line
+				err := data.err
+				if err == nil {
+					err = evaluateHealthCheckResult(line)
+				}
+				outputCh <- line
+				errCh <- err
+				return
+			}
+		}
+	}()
+
+	err = tw.WaitForCompletion()
+
+	if err != nil {
+		return outputCh, err
+	}
+	err = <-errCh
+
+	return outputCh, err
+}
+
+// RunSimpleHealthCheckScript runs a diagnostic PxL script to verify that query serving works.
+// For newer Viziers, it performs additional checks to ensure that the cluster is healthy
+// and that common issues are detected.
+func RunSimpleHealthCheckScript(br *script.BundleManager, cloudAddr string, clusterID uuid.UUID) (chan string, error) {
+	v, err := ConnectionToVizierByID(cloudAddr, clusterID)
+	if err != nil {
+		return nil, err
+	}
+	execScript, err := br.GetScript(script.AgentStatusDiagnosticsScript)
+
+	if err != nil {
+		execScript, err = br.GetScript(script.AgentStatusScript)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	resp, err := runHealthCheckScript(v, execScript)
+	if scriptErr, ok := err.(*ScriptExecutionError); ok {
+		if scriptErr.Code() == CodeCompilerError {
+			// If the script compilation failed, we fall back to the old health check script.
+			execScript, err = br.GetScript(script.AgentStatusScript)
+			if err != nil {
+				return nil, err
+			}
+			return runHealthCheckScript(v, execScript)
+		}
+	}
+	return resp, err
+}
diff --git a/src/utils/script/well_known.go b/src/utils/script/well_known.go
index 333006b34eb..fc6d968a90e 100644
--- a/src/utils/script/well_known.go
+++ b/src/utils/script/well_known.go
@@ -21,6 +21,7 @@ package script
 // This file has a list of well known scripts, that can be referenced
 // from various part of the CLI.
 const (
-	AgentStatusScript  = "px/agent_status"
-	ServiceStatsScript = "px/service_stats"
+	AgentStatusScript            = "px/agent_status"
+	AgentStatusDiagnosticsScript = "px/agent_status_diagnostics"
+	ServiceStatsScript           = "px/service_stats"
 )
diff --git a/src/utils/shared/k8s/logs.go b/src/utils/shared/k8s/logs.go
index 6d5e5af2432..9831d81c753 100644
--- a/src/utils/shared/k8s/logs.go
+++ b/src/utils/shared/k8s/logs.go
@@ -22,16 +22,12 @@ import (
 	"archive/zip"
 	"context"
 	"encoding/json"
-	"errors"
 	"fmt"
 	"io"
-	"os"
 	"os/exec"
-	"strings"
 
 	log "github.com/sirupsen/logrus"
 	v1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
 )
@@ -60,7 +56,7 @@ func fileNameFromParams(ns string, podName string, containerName string, prev bo
 	return fmt.Sprintf("%s__%s__%s.%s", ns, podName, containerName, suffix)
 }
 
-func (c *LogCollector) logPodInfoToZipFile(zf *zip.Writer, pod v1.Pod, containerName string, prev bool) error {
+func (c *LogCollector) LogPodInfoToZipFile(zf *zip.Writer, pod v1.Pod, containerName string, prev bool) error {
 	fName := fileNameFromParams(pod.Namespace, pod.Name, containerName, prev)
 	w, err := zf.Create(fName)
 	if err != nil {
@@ -85,7 +81,7 @@ func (c *LogCollector) logPodInfoToZipFile(zf *zip.Writer, pod v1.Pod, container
 	return nil
 }
 
-func (c *LogCollector) logKubeCmd(zf *zip.Writer, fName string, arg ...string) error {
+func (c *LogCollector) LogKubeCmd(zf *zip.Writer, fName string, arg ...string) error {
 	cmd := exec.Command("kubectl", arg...)
 	w, err := zf.Create(fName)
 	defer zf.Flush()
@@ -112,7 +108,7 @@ func (c *LogCollector) logKubeCmd(zf *zip.Writer, fName string, arg ...string) e
 	return cmd.Wait()
 }
 
-func (c *LogCollector) writePodDescription(zf *zip.Writer, pod v1.Pod) error {
+func (c *LogCollector) WritePodDescription(zf *zip.Writer, pod v1.Pod) error {
 	w, err := zf.Create(fmt.Sprintf("%s__%s__describe.json", pod.Namespace, pod.Name))
 
 	defer zf.Flush()
@@ -124,75 +120,14 @@ func (c *LogCollector) writePodDescription(zf *zip.Writer, pod v1.Pod) error {
 	return enc.Encode(pod)
 }
 
-// CollectPixieLogs collects logs for all Pixie pods and write them to the zip file fName.
-func (c *LogCollector) CollectPixieLogs(fName string) error {
-	if !strings.HasSuffix(fName, ".zip") {
-		return errors.New("fname must have .zip suffix")
-	}
-
-	f, err := os.Create(fName)
-	if err != nil {
-		return err
-	}
-	defer f.Close()
-
-	zf := zip.NewWriter(f)
-	defer zf.Close()
-
-	vls := VizierLabelSelector()
-	vizierLabelSelector := metav1.FormatLabelSelector(&vls)
-
-	// We check across all namespaces for the matching pixie pods.
-	vizierPodList, err := c.k8sClientSet.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{LabelSelector: vizierLabelSelector})
-	if err != nil {
-		return err
-	}
-
-	// We also need to get the logs the operator logs.
-	// As the LabelSelectors are ANDed, we need to make a new query and merge
-	// the results.
-	ols := OperatorLabelSelector()
-	operatorLabelSelector := metav1.FormatLabelSelector(&ols)
-
-	operatorPodList, err := c.k8sClientSet.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{LabelSelector: operatorLabelSelector})
+// LogOutputToZipFile writes output to the file fName inside the zip file.
+func (c *LogCollector) LogOutputToZipFile(zf *zip.Writer, fName string, output string) error {
+	w, err := zf.Create(fName)
 	if err != nil {
 		return err
 	}
+	defer zf.Flush()
 
-	// Merge the two pod lists
-	pods := append(vizierPodList.Items, operatorPodList.Items...)
-
-	for _, pod := range pods {
-		for _, containerStatus := range pod.Status.ContainerStatuses {
-			// Ignore prev logs, they might not exist.
-			_ = c.logPodInfoToZipFile(zf, pod, containerStatus.Name, true)
-
-			err := c.logPodInfoToZipFile(zf, pod, containerStatus.Name, false)
-			if err != nil {
-				log.WithError(err).Warnf("Failed to log pod: %s", pod.Name)
-			}
-		}
-		err = c.writePodDescription(zf, pod)
-		if err != nil {
-			log.WithError(err).Warnf("failed to write pod description")
-		}
-	}
-
-	err = c.logKubeCmd(zf, "nodes.log", "describe", "node")
-	if err != nil {
-		log.WithError(err).Warn("failed to log node info")
-	}
-
-	err = c.logKubeCmd(zf, "services.log", "describe", "services", "--all-namespaces", "-l", vizierLabelSelector)
-	if err != nil {
-		log.WithError(err).Warnf("failed to log services")
-	}
-
-	// Describe vizier and write it to vizier.log
-	err = c.logKubeCmd(zf, "vizier.log", "describe", "vizier", "--all-namespaces")
-	if err != nil {
-		log.WithError(err).Warnf("failed to log vizier crd")
-	}
-
-	return nil
+	_, err = w.Write([]byte(output))
+	return err
 }
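Reviewer note: the warning semantics of evaluateHealthCheckResult above are easy to pin down with a table-driven test. The sketch below is not part of this diff; it assumes in-package access (package vizier) and that px/agent_status_diagnostics emits one JSON record per line containing the headers_installed_percent column, as the constants in script.go indicate. Test and case names, and the no-column payload, are illustrative only.

```go
package vizier

import "testing"

// TestEvaluateHealthCheckResult is a sketch (not in this diff) pinning down
// when health check output is treated as a warning versus healthy.
func TestEvaluateHealthCheckResult(t *testing.T) {
	tests := []struct {
		name        string
		output      string
		wantWarning bool
	}{
		// All nodes report kernel headers installed: healthy.
		{"all headers installed", `{"headers_installed_percent": 1.0}`, false},
		// Within equalityThreshold (0.01) of 1.0: still healthy.
		{"within threshold", `{"headers_installed_percent": 0.995}`, false},
		// Headers missing on half the nodes: warn.
		{"missing headers", `{"headers_installed_percent": 0.5}`, true},
		// Column absent (older Vizier without the diagnostics script): warn.
		// The payload here is an illustrative stand-in for px/agent_status output.
		{"column missing", `{"agent_state": "AGENT_STATE_HEALTHY"}`, true},
	}
	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			err := evaluateHealthCheckResult(tc.output)
			_, gotWarning := err.(*HealthCheckWarning)
			if gotWarning != tc.wantWarning {
				t.Errorf("evaluateHealthCheckResult(%q) = %v, want warning: %v", tc.output, err, tc.wantWarning)
			}
		})
	}
}
```

The 0.995 case documents why equalityThreshold exists: the percentage arrives as a float64 from JSON, so values within 1% of 1.0 are treated as fully installed rather than tripping a spurious warning.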