-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Improve security migration resilience by handling version conflicts #137558
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
77ac083
1e5cb67
a1287e6
17d9ebb
f371a09
49dc7e9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| pr: 137558 | ||
| summary: Improve security migration resilience by handling version conflicts | ||
| area: Security | ||
| type: enhancement | ||
| issues: [] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -241,8 +241,6 @@ public void testMigrationFallbackNamePreCondition() throws Exception { | |
| waitForMigrationCompletion(SecurityMigrations.CLEANUP_ROLE_MAPPING_DUPLICATES_MIGRATION_VERSION); | ||
| // First migration is on a new index, so should skip all migrations. If we reset, it should re-trigger and run all migrations | ||
| resetMigration(); | ||
| // Wait for the first migration to finish | ||
| waitForMigrationCompletion(SecurityMigrations.CLEANUP_ROLE_MAPPING_DUPLICATES_MIGRATION_VERSION - 1); | ||
|
|
||
| // Make sure migration didn't run yet (blocked by the fallback name) | ||
| assertMigrationLessThan(SecurityMigrations.CLEANUP_ROLE_MAPPING_DUPLICATES_MIGRATION_VERSION); | ||
|
|
@@ -315,10 +313,7 @@ public void testNewIndexSkipMigration() { | |
| ensureGreen(); | ||
| deleteSecurityIndex(); // hack to force a new security index to be created | ||
| ensureGreen(); | ||
| CountDownLatch awaitMigrations = awaitMigrationVersionUpdates( | ||
| masterNode, | ||
| SecurityMigrations.CLEANUP_ROLE_MAPPING_DUPLICATES_MIGRATION_VERSION | ||
| ); | ||
| CountDownLatch awaitMigrations = awaitMigrationVersionUpdates(masterNode, SecurityMigrations.MIGRATIONS_BY_VERSION.lastKey()); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The order of migrations changed, so this should have been |
||
| // Create a native role mapping to create security index and trigger migration | ||
| createNativeRoleMapping("everyone_kibana_alone"); | ||
| // Make sure no migration ran (set to current version without applying prior migrations) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,191 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0; you may not use this file except in compliance with the Elastic License | ||
| * 2.0. | ||
| */ | ||
|
|
||
| package org.elasticsearch.xpack.security.support; | ||
|
|
||
| import org.elasticsearch.action.ActionListener; | ||
| import org.elasticsearch.action.search.SearchRequest; | ||
| import org.elasticsearch.action.search.SearchResponse; | ||
| import org.elasticsearch.action.support.WriteRequest; | ||
| import org.elasticsearch.cluster.metadata.IndexMetadata; | ||
| import org.elasticsearch.cluster.service.ClusterService; | ||
| import org.elasticsearch.core.TimeValue; | ||
| import org.elasticsearch.index.query.QueryBuilders; | ||
| import org.elasticsearch.painless.PainlessPlugin; | ||
| import org.elasticsearch.plugins.Plugin; | ||
| import org.elasticsearch.search.SearchHit; | ||
| import org.elasticsearch.test.ESIntegTestCase; | ||
| import org.elasticsearch.test.ESTestCase; | ||
| import org.elasticsearch.test.SecurityIntegTestCase; | ||
| import org.elasticsearch.xcontent.ToXContent; | ||
| import org.elasticsearch.xcontent.XContentBuilder; | ||
| import org.elasticsearch.xpack.core.security.action.UpdateIndexMigrationVersionAction; | ||
| import org.elasticsearch.xpack.core.security.authz.RoleDescriptor; | ||
| import org.elasticsearch.xpack.security.authz.store.NativeRolesStore; | ||
| import org.junit.Before; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.Collection; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
| import java.util.stream.Collectors; | ||
| import java.util.stream.Stream; | ||
|
|
||
| import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; | ||
| import static org.elasticsearch.xpack.core.security.action.UpdateIndexMigrationVersionAction.MIGRATION_VERSION_CUSTOM_DATA_KEY; | ||
| import static org.elasticsearch.xpack.core.security.action.UpdateIndexMigrationVersionAction.MIGRATION_VERSION_CUSTOM_KEY; | ||
| import static org.elasticsearch.xpack.core.security.authz.RoleDescriptor.ROLE_TYPE; | ||
| import static org.elasticsearch.xpack.core.security.test.TestRestrictedIndices.INTERNAL_SECURITY_MAIN_INDEX_7; | ||
| import static org.elasticsearch.xpack.security.support.SecurityMigrations.ROLE_METADATA_FLATTENED_MIGRATION_VERSION; | ||
| import static org.hamcrest.Matchers.greaterThanOrEqualTo; | ||
|
|
||
| @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false) | ||
| public class MetadataFlattenedMigrationIntegTests extends SecurityIntegTestCase { | ||
|
|
||
| private final AtomicLong versionCounter = new AtomicLong(1); | ||
|
|
||
| /** | ||
| * Resets {@code versionCounter} to 1 before each test method runs. | ||
| * NOTE(review): nothing else visible in this class reads {@code versionCounter} - confirm it is still needed. | ||
| */ | ||
| @Before | ||
| public void resetVersion() { | ||
| versionCounter.set(1); | ||
| } | ||
|
|
||
| /** | ||
| * Starts a single node, indexes roles directly into the security index, then resets the migration version | ||
| * while a background thread keeps rewriting half of the roles through the native roles store. The test | ||
| * passes when the migration version reaches the target again and every role has flattened metadata. | ||
| */ | ||
| public void testMigrationWithConcurrentUpdates() throws Exception { | ||
| internalCluster().setBootstrapMasterNodeIndex(0); | ||
| internalCluster().startNode(); | ||
| ensureGreen(); | ||
|
|
||
| waitForMigrationCompletion(); | ||
| var roles = createRoles(); | ||
| final var nativeRoleStore = internalCluster().getInstance(NativeRolesStore.class); | ||
|
|
||
| try (ExecutorService executor = Executors.newSingleThreadExecutor()) { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can be made a lot more intense by adding more concurrency and more roles, resulting in more retries, but I've reduced it as much as possible to work well in a unit test. |
||
| final AtomicBoolean runUpdateRolesBackground = new AtomicBoolean(true); | ||
| executor.submit(() -> { | ||
| while (runUpdateRolesBackground.get()) { | ||
| // Only update half the list so the other half can be verified as migrated | ||
| RoleDescriptor roleToUpdate = randomFrom(roles.subList(0, roles.size() / 2)); | ||
|
|
||
| RoleDescriptor updatedRole = new RoleDescriptor( | ||
| roleToUpdate.getName(), | ||
| new String[] { "monitor" }, | ||
| null, | ||
| null, | ||
| null, | ||
| null, | ||
| Map.of("test", "value", "timestamp", System.currentTimeMillis(), "random", randomAlphaOfLength(10)), | ||
| null | ||
| ); | ||
| nativeRoleStore.putRole( | ||
| WriteRequest.RefreshPolicy.IMMEDIATE, | ||
| updatedRole, | ||
| ActionListener.wrap(resp -> {}, ESTestCase::fail) | ||
| ); | ||
| try { | ||
| // Brief pause (10 milliseconds) between updates | ||
| Thread.sleep(10); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for my own understanding: what's the purpose of 10s sleep here? |
||
| } catch (InterruptedException e) { | ||
| // Restore the interrupt status before aborting the update loop so it is not silently lost | ||
| Thread.currentThread().interrupt(); | ||
| throw new RuntimeException(e); | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| resetMigration(); | ||
| try { | ||
| waitForMigrationCompletion(); | ||
| } finally { | ||
| // Stop the updater loop before the executor is closed by try-with-resources | ||
| runUpdateRolesBackground.set(false); | ||
| executor.shutdown(); | ||
| } | ||
| } | ||
| assertAllRolesHaveMetadataFlattened(); | ||
| } | ||
|
|
||
| private void resetMigration() { | ||
| client().execute( | ||
| UpdateIndexMigrationVersionAction.INSTANCE, | ||
| new UpdateIndexMigrationVersionAction.Request( | ||
| TimeValue.MAX_VALUE, | ||
| ROLE_METADATA_FLATTENED_MIGRATION_VERSION - 1, | ||
| INTERNAL_SECURITY_MAIN_INDEX_7 | ||
| ) | ||
| ).actionGet(); | ||
| } | ||
|
|
||
| private List<RoleDescriptor> createRoles() throws IOException { | ||
| var roles = randomList( | ||
| 25, | ||
| 50, | ||
| () -> new RoleDescriptor( | ||
| randomAlphaOfLength(20), | ||
| null, | ||
| null, | ||
| null, | ||
| null, | ||
| null, | ||
| Map.of("test", "value", "timestamp", System.currentTimeMillis(), "random", randomAlphaOfLength(10)), | ||
| Map.of() | ||
| ) | ||
| ); | ||
| for (RoleDescriptor role : roles) { | ||
| indexRoleDirectly(role); | ||
| } | ||
| indicesAdmin().prepareRefresh(INTERNAL_SECURITY_MAIN_INDEX_7).get(); | ||
| return roles; | ||
| } | ||
|
|
||
| private void indexRoleDirectly(RoleDescriptor role) throws IOException { | ||
| XContentBuilder builder = buildRoleDocument(role); | ||
| prepareIndex(INTERNAL_SECURITY_MAIN_INDEX_7).setId(ROLE_TYPE + "-" + role.getName()) | ||
| .setSource(builder) | ||
| .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) | ||
| .get(); | ||
| } | ||
|
|
||
| private XContentBuilder buildRoleDocument(RoleDescriptor role) throws IOException { | ||
| XContentBuilder builder = jsonBuilder().startObject(); | ||
| // metadata_flattened is populated by the native role store, so write directly to index to simulate pre-migration state | ||
| role.innerToXContent(builder, ToXContent.EMPTY_PARAMS, true); | ||
| builder.endObject(); | ||
| return builder; | ||
| } | ||
|
|
||
| private int getCurrentMigrationVersion() { | ||
| ClusterService clusterService = internalCluster().getInstance(ClusterService.class); | ||
| IndexMetadata indexMetadata = clusterService.state().metadata().getProject().index(INTERNAL_SECURITY_MAIN_INDEX_7); | ||
| if (indexMetadata == null || indexMetadata.getCustomData(MIGRATION_VERSION_CUSTOM_KEY) == null) { | ||
| return 0; | ||
| } | ||
| return Integer.parseInt(indexMetadata.getCustomData(MIGRATION_VERSION_CUSTOM_KEY).get(MIGRATION_VERSION_CUSTOM_DATA_KEY)); | ||
| } | ||
|
|
||
| private void waitForMigrationCompletion() throws Exception { | ||
| assertBusy(() -> assertThat(getCurrentMigrationVersion(), greaterThanOrEqualTo(ROLE_METADATA_FLATTENED_MIGRATION_VERSION))); | ||
| } | ||
|
|
||
| /** | ||
| * Searches the security main index for documents of type {@code role} (up to 1000 hits) and asserts that | ||
| * every hit carries a {@code metadata_flattened} object and that, for non-reserved roles, its {@code test} | ||
| * entry equals {@code "value"}. | ||
| */ | ||
| private void assertAllRolesHaveMetadataFlattened() { | ||
| SearchRequest searchRequest = new SearchRequest(INTERNAL_SECURITY_MAIN_INDEX_7); | ||
| searchRequest.source().query(QueryBuilders.termQuery("type", "role")).size(1000); | ||
| SearchResponse response = client().search(searchRequest).actionGet(); | ||
| try { | ||
| for (SearchHit hit : response.getHits().getHits()) { | ||
| @SuppressWarnings("unchecked") | ||
| Map<String, Object> metadata = (Map<String, Object>) hit.getSourceAsMap().get("metadata_flattened"); | ||
| // Report a missing field as an assertion failure naming the document, not as a NullPointerException | ||
| assertNotNull("role document [" + hit.getId() + "] has no metadata_flattened field", metadata); | ||
| // Only check non-reserved roles | ||
| if (metadata.get("_reserved") == null) { | ||
| assertEquals("value", metadata.get("test")); | ||
| } | ||
| } | ||
| } finally { | ||
| // Release the response even when one of the assertions above fails | ||
| response.decRef(); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| protected Collection<Class<? extends Plugin>> nodePlugins() { | ||
| return Stream.concat(super.nodePlugins().stream(), Stream.of(PainlessPlugin.class)).collect(Collectors.toList()); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -50,10 +50,12 @@ | |
| import org.elasticsearch.index.IndexVersion; | ||
| import org.elasticsearch.indices.IndexClosedException; | ||
| import org.elasticsearch.indices.SystemIndexDescriptor; | ||
| import org.elasticsearch.persistent.PersistentTasksCustomMetadata; | ||
| import org.elasticsearch.rest.RestStatus; | ||
| import org.elasticsearch.threadpool.Scheduler; | ||
| import org.elasticsearch.xcontent.XContentType; | ||
| import org.elasticsearch.xpack.core.security.authz.RoleMappingMetadata; | ||
| import org.elasticsearch.xpack.core.security.support.SecurityMigrationTaskParams; | ||
| import org.elasticsearch.xpack.security.SecurityFeatures; | ||
| import org.elasticsearch.xpack.security.action.rolemapping.ReservedRoleMappingAction; | ||
|
|
||
|
|
@@ -158,6 +160,7 @@ private IndexState unavailableState(ProjectId projectId, ProjectStatus status) { | |
| false, | ||
| false, | ||
| null, | ||
| false, | ||
| null, | ||
| null, | ||
| null, | ||
|
|
@@ -180,6 +183,7 @@ public class IndexState { | |
| public final boolean mappingUpToDate; | ||
| public final boolean createdOnLatestVersion; | ||
| public final RoleMappingsCleanupMigrationStatus roleMappingsCleanupMigrationStatus; | ||
| public final boolean securityMigrationRunning; | ||
| public final Integer migrationsVersion; | ||
| // Min mapping version supported by the descriptors in the cluster | ||
| public final SystemIndexDescriptor.MappingsVersion minClusterMappingVersion; | ||
|
|
@@ -201,6 +205,7 @@ public IndexState( | |
| boolean mappingUpToDate, | ||
| boolean createdOnLatestVersion, | ||
| RoleMappingsCleanupMigrationStatus roleMappingsCleanupMigrationStatus, | ||
| boolean securityMigrationRunning, | ||
| Integer migrationsVersion, | ||
| SystemIndexDescriptor.MappingsVersion minClusterMappingVersion, | ||
| Integer indexMappingVersion, | ||
|
|
@@ -220,6 +225,7 @@ public IndexState( | |
| this.migrationsVersion = migrationsVersion; | ||
| this.createdOnLatestVersion = createdOnLatestVersion; | ||
| this.roleMappingsCleanupMigrationStatus = roleMappingsCleanupMigrationStatus; | ||
| this.securityMigrationRunning = securityMigrationRunning; | ||
| this.minClusterMappingVersion = minClusterMappingVersion; | ||
| this.indexMappingVersion = indexMappingVersion; | ||
| this.concreteIndexName = concreteIndexName; | ||
|
|
@@ -247,6 +253,7 @@ public boolean equals(Object o) { | |
| && mappingUpToDate == other.mappingUpToDate | ||
| && createdOnLatestVersion == other.createdOnLatestVersion | ||
| && roleMappingsCleanupMigrationStatus == other.roleMappingsCleanupMigrationStatus | ||
| && securityMigrationRunning == other.securityMigrationRunning | ||
| && Objects.equals(indexMappingVersion, other.indexMappingVersion) | ||
| && Objects.equals(migrationsVersion, other.migrationsVersion) | ||
| && Objects.equals(minClusterMappingVersion, other.minClusterMappingVersion) | ||
|
|
@@ -268,6 +275,7 @@ public int hashCode() { | |
| mappingUpToDate, | ||
| createdOnLatestVersion, | ||
| roleMappingsCleanupMigrationStatus, | ||
| securityMigrationRunning, | ||
| migrationsVersion, | ||
| minClusterMappingVersion, | ||
| indexMappingVersion, | ||
|
|
@@ -370,6 +378,8 @@ public String toString() { | |
| + createdOnLatestVersion | ||
| + ", roleMappingsCleanupMigrationStatus=" | ||
| + roleMappingsCleanupMigrationStatus | ||
| + ", securityMigrationRunning=" | ||
| + securityMigrationRunning | ||
| + ", migrationsVersion=" | ||
| + migrationsVersion | ||
| + ", minClusterMappingVersion=" | ||
|
|
@@ -821,6 +831,9 @@ private IndexState updateProjectState(ProjectState project) { | |
| project, | ||
| migrationsVersion | ||
| ); | ||
| var persistentTaskCustomMetadata = PersistentTasksCustomMetadata.get(project.metadata()); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. When a migration is running, its persistent task is present in the cluster state; when no migration is running, the task is absent. When a persistent task completes (whether it fails or succeeds), it is removed from the cluster state. We want an index state change to be triggered when a persistent task fails, so that the migration is retried immediately; that is why this state is needed here. |
||
| final boolean securityMigrationRunning = persistentTaskCustomMetadata != null | ||
| && persistentTaskCustomMetadata.getTask(SecurityMigrationTaskParams.TASK_NAME) != null; | ||
| final boolean mappingIsUpToDate = indexMetadata == null || checkIndexMappingUpToDate(project); | ||
| final SystemIndexDescriptor.MappingsVersion minClusterMappingVersion = getMinSecurityIndexMappingVersion(project); | ||
| final int indexMappingVersion = loadIndexMappingVersion(systemIndexDescriptor.getAliasName(), project.metadata()); | ||
|
|
@@ -853,6 +866,7 @@ private IndexState updateProjectState(ProjectState project) { | |
| mappingIsUpToDate, | ||
| createdOnLatestVersion, | ||
| roleMappingsCleanupMigrationStatus, | ||
| securityMigrationRunning, | ||
| migrationsVersion, | ||
| minClusterMappingVersion, | ||
| indexMappingVersion, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -54,7 +54,7 @@ public SecurityMigrationExecutor( | |
| @Override | ||
| protected void nodeOperation(AllocatedPersistentTask task, SecurityMigrationTaskParams params, PersistentTaskState state) { | ||
| ActionListener<Void> listener = ActionListener.wrap((res) -> task.markAsCompleted(), (exception) -> { | ||
| logger.warn("Security migration failed: " + exception); | ||
| logger.warn("Security migration failed", exception); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nice catch 👍 |
||
| task.markAsFailed(exception); | ||
| }); | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is now the first migration so we don't need this line anymore.