1
0
mirror of https://github.com/kubernetes/kubernetes.git synced 2025-02-06 11:01:14 +00:00

Compare commits

...

7 Commits

Author SHA1 Message Date
Jeffrey Ying
87a30324df
Merge eca3dfde716ef852c940d68cfba4484699ab9d19 into 9a03243789677637762eb0f907e1b4e45a0136c1 2025-02-05 21:30:08 -08:00
Kubernetes Prow Robot
9a03243789
Merge pull request #129929 from serathius/deprecate-separate-rpc
Flip SeparateCacheWatchRPC feature gate to false and deprecate it
2025-02-05 17:18:16 -08:00
Kubernetes Prow Robot
22f25efc2c
Merge pull request #128991 from Henrywu573/cm-statuz
Add statusz endpoint for kube-controller-manager
2025-02-05 15:54:15 -08:00
Henry(Qishan) Wu
8bd4e1bab2 Update test/integration/serving/serving_test.go
Co-authored-by: Antonio Ojea <antonio.ojea.garcia@gmail.com>
2025-02-05 09:48:08 -08:00
Marek Siarkowicz
4a5bbc4c15 Flip SeparateCacheWatchRPC feature gate to false and deprecate it.
Watch requests to etcd are mapped to a single stream that has a limited throughput.
By opening a lot of concurrent watch requests to single resource, users
could starve other watches from getting any events.

Separating the RPC was meant to protect the watch opened by cache.
However, as we are no longer planning to allow users to open watch directly to etcd,
the flag is not needed.
2025-01-31 14:08:15 +01:00
Jefftree
eca3dfde71 test 2025-01-28 16:13:24 +00:00
Henry Wu
5a8d77a2ae Add statusz endpoint for kube-controller-manager 2024-12-05 23:48:51 +00:00
6 changed files with 165 additions and 5 deletions

View File

@ -64,6 +64,8 @@ import (
"k8s.io/component-base/term"
utilversion "k8s.io/component-base/version"
"k8s.io/component-base/version/verflag"
zpagesfeatures "k8s.io/component-base/zpages/features"
"k8s.io/component-base/zpages/statusz"
genericcontrollermanager "k8s.io/controller-manager/app"
"k8s.io/controller-manager/controller"
"k8s.io/controller-manager/pkg/clientbuilder"
@ -91,6 +93,8 @@ const (
ControllerStartJitter = 1.0
// ConfigzName is the name used for register kube-controller manager /configz, same with GroupName.
ConfigzName = "kubecontrollermanager.config.k8s.io"
// kubeControllerManager defines variable used internally when referring to cloud-controller-manager component
kubeControllerManager = "kube-controller-manager"
)
// NewControllerManagerCommand creates a *cobra.Command object with default parameters
@ -105,7 +109,7 @@ func NewControllerManagerCommand() *cobra.Command {
}
cmd := &cobra.Command{
Use: "kube-controller-manager",
Use: kubeControllerManager,
Long: `The Kubernetes controller manager is a daemon that embeds
the core control loops shipped with Kubernetes. In applications of robotics and
automation, a control loop is a non-terminating loop that regulates the state of
@ -213,6 +217,10 @@ func Run(ctx context.Context, c *config.CompletedConfig) error {
unsecuredMux = genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, healthzHandler)
slis.SLIMetricsWithReset{}.Install(unsecuredMux)
if utilfeature.DefaultFeatureGate.Enabled(zpagesfeatures.ComponentStatusz) {
statusz.Install(unsecuredMux, kubeControllerManager, statusz.NewRegistry())
}
handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication)
// TODO: handle stoppedCh and listenerStoppedCh returned by c.SecureServing.Serve
if _, _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
@ -267,7 +275,7 @@ func Run(ctx context.Context, c *config.CompletedConfig) error {
logger.Info("starting leader migration")
leaderMigrator = leadermigration.NewLeaderMigrator(&c.ComponentConfig.Generic.LeaderMigration,
"kube-controller-manager")
kubeControllerManager)
// startSATokenControllerInit is the original InitFunc.
startSATokenControllerInit := saTokenControllerDescriptor.GetInitFunc()
@ -295,7 +303,7 @@ func Run(ctx context.Context, c *config.CompletedConfig) error {
c.Client,
"kube-system",
id,
"kube-controller-manager",
kubeControllerManager,
binaryVersion.FinalizeVersion(),
emulationVersion.FinalizeVersion(),
coordinationv1.OldestEmulationVersion,
@ -634,7 +642,7 @@ func CreateControllerContext(ctx context.Context, s *config.CompletedConfig, roo
RESTMapper: restMapper,
InformersStarted: make(chan struct{}),
ResyncPeriod: ResyncPeriod(s),
ControllerManagerMetrics: controllersmetrics.NewControllerManagerMetrics("kube-controller-manager"),
ControllerManagerMetrics: controllersmetrics.NewControllerManagerMetrics(kubeControllerManager),
}
if controllerContext.ComponentConfig.GarbageCollectorController.EnableGarbageCollector &&

View File

@ -93,7 +93,7 @@ kube::version::get_version_vars() {
# so use our idea of "dirty" from git status instead.
KUBE_GIT_VERSION+="-dirty"
fi
KUBE_GIT_VERSION=$(echo "${KUBE_GIT_VERSION}" | sed 's/alpha/beta/g')
# Try to match the "git describe" output to a regex to try to extract
# the "major" and "minor" versions and whether this is the exact tagged

View File

@ -325,6 +325,7 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
genericfeatures.SeparateCacheWatchRPC: {
{Version: version.MustParse("1.28"), Default: true, PreRelease: featuregate.Beta},
{Version: version.MustParse("1.33"), Default: false, PreRelease: featuregate.Deprecated},
},
genericfeatures.StorageVersionAPI: {

View File

@ -364,6 +364,7 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
SeparateCacheWatchRPC: {
{Version: version.MustParse("1.28"), Default: true, PreRelease: featuregate.Beta},
{Version: version.MustParse("1.33"), Default: false, PreRelease: featuregate.Deprecated},
},
StorageVersionAPI: {

View File

@ -1150,6 +1150,10 @@
lockToDefault: false
preRelease: Beta
version: "1.28"
- default: false
lockToDefault: false
preRelease: Deprecated
version: "1.33"
- name: SeparateTaintEvictionController
versionedSpecs:
- default: true

View File

@ -22,6 +22,7 @@ import (
"crypto/x509"
"fmt"
"io"
"net"
"net/http"
"os"
"path"
@ -30,9 +31,12 @@ import (
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/options"
utilfeature "k8s.io/apiserver/pkg/util/feature"
cloudprovider "k8s.io/cloud-provider"
cloudctrlmgrtesting "k8s.io/cloud-provider/app/testing"
"k8s.io/cloud-provider/fake"
featuregatetesting "k8s.io/component-base/featuregate/testing"
zpagesfeatures "k8s.io/component-base/zpages/features"
"k8s.io/klog/v2/ktesting"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
kubectrlmgrtesting "k8s.io/kubernetes/cmd/kube-controller-manager/app/testing"
@ -289,3 +293,145 @@ func fakeCloudProviderFactory(io.Reader) (cloudprovider.Interface, error) {
DisableRoutes: true, // disable routes for server tests, otherwise --cluster-cidr is required
}, nil
}
func TestKubeControllerManagerServingStatusz(t *testing.T) {
// authenticate to apiserver via bearer token
token := "flwqkenfjasasdfmwerasd" // Fake token for testing.
tokenFile, err := os.CreateTemp("", "kubeconfig")
if err != nil {
t.Fatal(err)
}
if _, err = tokenFile.WriteString(fmt.Sprintf(`
%s,system:kube-controller-manager,system:kube-controller-manager,""
`, token)); err != nil {
t.Fatal(err)
}
if err = tokenFile.Close(); err != nil {
t.Fatal(err)
}
// start apiserver
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{
"--token-auth-file", tokenFile.Name(),
"--authorization-mode", "RBAC",
}, framework.SharedEtcd())
defer server.TearDownFn()
// create kubeconfig for the apiserver
apiserverConfig, err := os.CreateTemp("", "kubeconfig")
if err != nil {
t.Fatal(err)
}
if _, err = apiserverConfig.WriteString(fmt.Sprintf(`
apiVersion: v1
kind: Config
clusters:
- cluster:
server: %s
certificate-authority: %s
name: integration
contexts:
- context:
cluster: integration
user: controller-manager
name: default-context
current-context: default-context
users:
- name: controller-manager
user:
token: %s
`, server.ClientConfig.Host, server.ServerOpts.SecureServing.ServerCert.CertKey.CertFile, token)); err != nil {
t.Fatal(err)
}
if err = apiserverConfig.Close(); err != nil {
t.Fatal(err)
}
tests := []struct {
name string
flags []string
path string
anonymous bool // to use the token or not
wantErr bool
wantSecureCode *int
}{
{"serving /statusz", []string{
"--authentication-skip-lookup", // to survive inaccessible extensions-apiserver-authentication configmap
"--authentication-kubeconfig", apiserverConfig.Name(),
"--authorization-kubeconfig", apiserverConfig.Name(),
"--authorization-always-allow-paths", "/statusz",
"--kubeconfig", apiserverConfig.Name(),
"--leader-elect=false",
}, "/statusz", false, false, intPtr(http.StatusOK)},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, zpagesfeatures.ComponentStatusz, true)
_, ctx := ktesting.NewTestContext(t)
secureOptions, secureInfo, tearDownFn, err := kubeControllerManagerTester{}.StartTestServer(ctx, append(append([]string{}, tt.flags...), []string{}...))
if tearDownFn != nil {
defer tearDownFn()
}
if (err != nil) != tt.wantErr {
t.Fatalf("StartTestServer() error = %v, wantErr %v", err, tt.wantErr)
}
if err != nil {
return
}
if want, got := tt.wantSecureCode != nil, secureInfo != nil; want != got {
t.Errorf("SecureServing enabled: expected=%v got=%v", want, got)
} else if want {
// only interested on the port, because we are using always localhost
_, port, err := net.SplitHostPort(secureInfo.Listener.Addr().String())
if err != nil {
t.Fatalf("could not get host and port from %s : %v", secureInfo.Listener.Addr().String(), err)
}
// use IPv4 because the self-signed cert does not support [::]
url := fmt.Sprintf("https://127.0.0.1:%s%s", port, tt.path)
// read self-signed server cert disk
pool := x509.NewCertPool()
serverCertPath := path.Join(secureOptions.ServerCert.CertDirectory, secureOptions.ServerCert.PairName+".crt")
serverCert, err := os.ReadFile(serverCertPath)
if err != nil {
t.Fatalf("Failed to read component server cert %q: %v", serverCertPath, err)
}
pool.AppendCertsFromPEM(serverCert)
tr := &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: pool,
},
}
client := &http.Client{Transport: tr}
req, err := http.NewRequest(http.MethodGet, url, nil)
req.Header.Set("Accept", "text/plain")
if err != nil {
t.Fatal(err)
}
if !tt.anonymous {
req.Header.Add("Authorization", fmt.Sprintf("Token %s", token))
}
r, err := client.Do(req)
if err != nil {
t.Fatalf("failed to GET %s from component: %v", tt.path, err)
}
if _, err = io.ReadAll(r.Body); err != nil {
t.Fatalf("failed to read response body: %v", err)
}
defer func() {
if err := r.Body.Close(); err != nil {
t.Fatalf("Error closing response body: %v", err)
}
}()
if got, expected := r.StatusCode, *tt.wantSecureCode; got != expected {
t.Fatalf("expected http %d at %s of component, got: %d", expected, tt.path, got)
}
}
})
}
}