mirror of https://github.com/kubernetes/kubernetes.git synced 2025-02-06 11:01:14 +00:00

Compare commits


7 Commits

Author SHA1 Message Date
Vinayak Goyal
d0789ca04e
Merge a44ad31b1a81805e50b177bfc0cba4df9225d52b into 9a03243789677637762eb0f907e1b4e45a0136c1 2025-02-05 21:30:08 -08:00
Kubernetes Prow Robot
9a03243789
Merge pull request #129929 from serathius/deprecate-separate-rpc
Flip SeparateCacheWatchRPC feature gate to false and deprecate it
2025-02-05 17:18:16 -08:00
Kubernetes Prow Robot
22f25efc2c
Merge pull request #128991 from Henrywu573/cm-statuz
Add statusz endpoint for kube-controller-manager
2025-02-05 15:54:15 -08:00
Henry(Qishan) Wu
8bd4e1bab2 Update test/integration/serving/serving_test.go
Co-authored-by: Antonio Ojea <antonio.ojea.garcia@gmail.com>
2025-02-05 09:48:08 -08:00
Vinayak Goyal
a44ad31b1a Remove unused service account creation from node_authn.go 2025-01-31 22:28:08 +00:00
Marek Siarkowicz
4a5bbc4c15 Flip SeparateCacheWatchRPC feature gate to false and deprecate it.
Watch requests to etcd are mapped to a single stream that has a limited throughput.
By opening a lot of concurrent watch requests to single resource, users
could starve other watches from getting any events.

Separating the RPC was meant to protect the watch opened by cache.
However, as we are no longer planning to allow users to open watch directly to etcd,
the flag is not needed.
2025-01-31 14:08:15 +01:00
Henry Wu
5a8d77a2ae Add statusz endpoint for kube-controller-manager 2024-12-05 23:48:51 +00:00
6 changed files with 164 additions and 17 deletions

View File

@@ -64,6 +64,8 @@ import (
"k8s.io/component-base/term"
utilversion "k8s.io/component-base/version"
"k8s.io/component-base/version/verflag"
zpagesfeatures "k8s.io/component-base/zpages/features"
"k8s.io/component-base/zpages/statusz"
genericcontrollermanager "k8s.io/controller-manager/app"
"k8s.io/controller-manager/controller"
"k8s.io/controller-manager/pkg/clientbuilder"
@@ -91,6 +93,8 @@ const (
ControllerStartJitter = 1.0
// ConfigzName is the name used for register kube-controller manager /configz, same with GroupName.
ConfigzName = "kubecontrollermanager.config.k8s.io"
// kubeControllerManager defines variable used internally when referring to the kube-controller-manager component
kubeControllerManager = "kube-controller-manager"
)
// NewControllerManagerCommand creates a *cobra.Command object with default parameters
@@ -105,7 +109,7 @@ func NewControllerManagerCommand() *cobra.Command {
}
cmd := &cobra.Command{
Use: "kube-controller-manager",
Use: kubeControllerManager,
Long: `The Kubernetes controller manager is a daemon that embeds
the core control loops shipped with Kubernetes. In applications of robotics and
automation, a control loop is a non-terminating loop that regulates the state of
@@ -213,6 +217,10 @@ func Run(ctx context.Context, c *config.CompletedConfig) error {
unsecuredMux = genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, healthzHandler)
slis.SLIMetricsWithReset{}.Install(unsecuredMux)
if utilfeature.DefaultFeatureGate.Enabled(zpagesfeatures.ComponentStatusz) {
statusz.Install(unsecuredMux, kubeControllerManager, statusz.NewRegistry())
}
handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication)
// TODO: handle stoppedCh and listenerStoppedCh returned by c.SecureServing.Serve
if _, _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
@@ -267,7 +275,7 @@ func Run(ctx context.Context, c *config.CompletedConfig) error {
logger.Info("starting leader migration")
leaderMigrator = leadermigration.NewLeaderMigrator(&c.ComponentConfig.Generic.LeaderMigration,
"kube-controller-manager")
kubeControllerManager)
// startSATokenControllerInit is the original InitFunc.
startSATokenControllerInit := saTokenControllerDescriptor.GetInitFunc()
@@ -295,7 +303,7 @@ func Run(ctx context.Context, c *config.CompletedConfig) error {
c.Client,
"kube-system",
id,
"kube-controller-manager",
kubeControllerManager,
binaryVersion.FinalizeVersion(),
emulationVersion.FinalizeVersion(),
coordinationv1.OldestEmulationVersion,
@@ -634,7 +642,7 @@ func CreateControllerContext(ctx context.Context, s *config.CompletedConfig, roo
RESTMapper: restMapper,
InformersStarted: make(chan struct{}),
ResyncPeriod: ResyncPeriod(s),
ControllerManagerMetrics: controllersmetrics.NewControllerManagerMetrics("kube-controller-manager"),
ControllerManagerMetrics: controllersmetrics.NewControllerManagerMetrics(kubeControllerManager),
}
if controllerContext.ComponentConfig.GarbageCollectorController.EnableGarbageCollector &&
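The hunk above wires a /statusz handler into the controller manager's base handler mux whenever the ComponentStatusz feature gate is on. As a rough sketch of what querying the new endpoint from a client might look like (assumptions: 10257 as the default kube-controller-manager secure port, and a bearer token whose identity may reach /statusz or an --authorization-always-allow-paths entry as in the integration test further down; this client code is not part of the change itself):

package main

import (
	"crypto/tls"
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Assumptions: 10257 is the kube-controller-manager secure port and the
	// token maps to an identity allowed to reach /statusz (or /statusz is
	// listed in --authorization-always-allow-paths, as in the test below).
	const url = "https://127.0.0.1:10257/statusz"
	token := "<bearer-token>" // placeholder, not a real credential

	// Skipping cert verification keeps the sketch short; the integration test
	// instead builds an x509.CertPool from the component's self-signed cert.
	client := &http.Client{Transport: &http.Transport{
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
	}}

	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Accept", "text/plain")
	req.Header.Set("Authorization", "Bearer "+token)

	resp, err := client.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status)
	fmt.Println(string(body))
}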

View File

@@ -325,6 +325,7 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
genericfeatures.SeparateCacheWatchRPC: {
{Version: version.MustParse("1.28"), Default: true, PreRelease: featuregate.Beta},
{Version: version.MustParse("1.33"), Default: false, PreRelease: featuregate.Deprecated},
},
genericfeatures.StorageVersionAPI: {

View File

@@ -364,6 +364,7 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
SeparateCacheWatchRPC: {
{Version: version.MustParse("1.28"), Default: true, PreRelease: featuregate.Beta},
{Version: version.MustParse("1.33"), Default: false, PreRelease: featuregate.Deprecated},
},
StorageVersionAPI: {
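
This hunk and the previous one record the same versioned specification in two feature gate tables: SeparateCacheWatchRPC stays Beta and on by default at 1.28, and becomes Deprecated and off by default at 1.33, in line with the commit message about etcd watch streams sharing limited throughput. A minimal sketch, assuming only the public feature gate API already shown in this diff, of how a program would observe the flipped default (the real consumers live in the apiserver storage layer and are not shown here):

package main

import (
	"fmt"

	genericfeatures "k8s.io/apiserver/pkg/features"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
)

func main() {
	// With this change the gate defaults to false from 1.33 on, so unless an
	// operator re-enables it, the cacher's watch shares the etcd RPC stream
	// with other watches instead of getting a dedicated one.
	if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.SeparateCacheWatchRPC) {
		fmt.Println("SeparateCacheWatchRPC enabled: cache watch uses its own etcd RPC stream")
	} else {
		fmt.Println("SeparateCacheWatchRPC disabled (new default): the watch RPC stream is shared")
	}
}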

View File

@@ -23,7 +23,6 @@ import (
"strconv"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/cluster/ports"
"k8s.io/kubernetes/test/e2e/feature"
"k8s.io/kubernetes/test/e2e/framework"
@@ -68,18 +67,6 @@ var _ = SIGDescribe(feature.NodeAuthenticator, func() {
})
ginkgo.It("The kubelet can delegate ServiceAccount tokens to the API server", func(ctx context.Context) {
ginkgo.By("create a new ServiceAccount for authentication")
trueValue := true
newSA := &v1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Namespace: ns,
Name: "node-auth-newsa",
},
AutomountServiceAccountToken: &trueValue,
}
_, err := f.ClientSet.CoreV1().ServiceAccounts(ns).Create(ctx, newSA, metav1.CreateOptions{})
framework.ExpectNoError(err, "failed to create service account (%s:%s)", ns, newSA.Name)
pod := createNodeAuthTestPod(ctx, f)
for _, nodeIP := range nodeIPs {

View File

@@ -1150,6 +1150,10 @@
lockToDefault: false
preRelease: Beta
version: "1.28"
- default: false
lockToDefault: false
preRelease: Deprecated
version: "1.33"
- name: SeparateTaintEvictionController
versionedSpecs:
- default: true

View File

@@ -22,6 +22,7 @@ import (
"crypto/x509"
"fmt"
"io"
"net"
"net/http"
"os"
"path"
@@ -30,9 +31,12 @@ import (
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/options"
utilfeature "k8s.io/apiserver/pkg/util/feature"
cloudprovider "k8s.io/cloud-provider"
cloudctrlmgrtesting "k8s.io/cloud-provider/app/testing"
"k8s.io/cloud-provider/fake"
featuregatetesting "k8s.io/component-base/featuregate/testing"
zpagesfeatures "k8s.io/component-base/zpages/features"
"k8s.io/klog/v2/ktesting"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
kubectrlmgrtesting "k8s.io/kubernetes/cmd/kube-controller-manager/app/testing"
@@ -289,3 +293,145 @@ func fakeCloudProviderFactory(io.Reader) (cloudprovider.Interface, error) {
DisableRoutes: true, // disable routes for server tests, otherwise --cluster-cidr is required
}, nil
}
func TestKubeControllerManagerServingStatusz(t *testing.T) {
// authenticate to apiserver via bearer token
token := "flwqkenfjasasdfmwerasd" // Fake token for testing.
tokenFile, err := os.CreateTemp("", "kubeconfig")
if err != nil {
t.Fatal(err)
}
if _, err = tokenFile.WriteString(fmt.Sprintf(`
%s,system:kube-controller-manager,system:kube-controller-manager,""
`, token)); err != nil {
t.Fatal(err)
}
if err = tokenFile.Close(); err != nil {
t.Fatal(err)
}
// start apiserver
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{
"--token-auth-file", tokenFile.Name(),
"--authorization-mode", "RBAC",
}, framework.SharedEtcd())
defer server.TearDownFn()
// create kubeconfig for the apiserver
apiserverConfig, err := os.CreateTemp("", "kubeconfig")
if err != nil {
t.Fatal(err)
}
if _, err = apiserverConfig.WriteString(fmt.Sprintf(`
apiVersion: v1
kind: Config
clusters:
- cluster:
server: %s
certificate-authority: %s
name: integration
contexts:
- context:
cluster: integration
user: controller-manager
name: default-context
current-context: default-context
users:
- name: controller-manager
user:
token: %s
`, server.ClientConfig.Host, server.ServerOpts.SecureServing.ServerCert.CertKey.CertFile, token)); err != nil {
t.Fatal(err)
}
if err = apiserverConfig.Close(); err != nil {
t.Fatal(err)
}
tests := []struct {
name string
flags []string
path string
anonymous bool // to use the token or not
wantErr bool
wantSecureCode *int
}{
{"serving /statusz", []string{
"--authentication-skip-lookup", // to survive inaccessible extensions-apiserver-authentication configmap
"--authentication-kubeconfig", apiserverConfig.Name(),
"--authorization-kubeconfig", apiserverConfig.Name(),
"--authorization-always-allow-paths", "/statusz",
"--kubeconfig", apiserverConfig.Name(),
"--leader-elect=false",
}, "/statusz", false, false, intPtr(http.StatusOK)},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, zpagesfeatures.ComponentStatusz, true)
_, ctx := ktesting.NewTestContext(t)
secureOptions, secureInfo, tearDownFn, err := kubeControllerManagerTester{}.StartTestServer(ctx, append(append([]string{}, tt.flags...), []string{}...))
if tearDownFn != nil {
defer tearDownFn()
}
if (err != nil) != tt.wantErr {
t.Fatalf("StartTestServer() error = %v, wantErr %v", err, tt.wantErr)
}
if err != nil {
return
}
if want, got := tt.wantSecureCode != nil, secureInfo != nil; want != got {
t.Errorf("SecureServing enabled: expected=%v got=%v", want, got)
} else if want {
// only interested in the port, because we always use localhost
_, port, err := net.SplitHostPort(secureInfo.Listener.Addr().String())
if err != nil {
t.Fatalf("could not get host and port from %s : %v", secureInfo.Listener.Addr().String(), err)
}
// use IPv4 because the self-signed cert does not support [::]
url := fmt.Sprintf("https://127.0.0.1:%s%s", port, tt.path)
// read the self-signed server cert from disk
pool := x509.NewCertPool()
serverCertPath := path.Join(secureOptions.ServerCert.CertDirectory, secureOptions.ServerCert.PairName+".crt")
serverCert, err := os.ReadFile(serverCertPath)
if err != nil {
t.Fatalf("Failed to read component server cert %q: %v", serverCertPath, err)
}
pool.AppendCertsFromPEM(serverCert)
tr := &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: pool,
},
}
client := &http.Client{Transport: tr}
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
t.Fatal(err)
}
req.Header.Set("Accept", "text/plain")
if !tt.anonymous {
req.Header.Add("Authorization", fmt.Sprintf("Token %s", token))
}
r, err := client.Do(req)
if err != nil {
t.Fatalf("failed to GET %s from component: %v", tt.path, err)
}
defer func() {
if err := r.Body.Close(); err != nil {
t.Fatalf("Error closing response body: %v", err)
}
}()
if _, err = io.ReadAll(r.Body); err != nil {
t.Fatalf("failed to read response body: %v", err)
}
if got, expected := r.StatusCode, *tt.wantSecureCode; got != expected {
t.Fatalf("expected http %d at %s of component, got: %d", expected, tt.path, got)
}
}
})
}
}
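
The expectation table above passes intPtr(http.StatusOK) for wantSecureCode. intPtr is not part of this hunk, so it presumably already exists elsewhere in serving_test.go; a minimal version of such a helper, shown here only for reference, would be:

// intPtr returns a pointer to x, which lets a table-driven test leave the
// expected status code nil when no secure response is expected.
// (Illustrative only; the real helper is defined outside this hunk.)
func intPtr(x int) *int {
	return &x
}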