Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/allocator/mmaprototype/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ go_library(
"memo_helper.go",
"messages.go",
"rebalance_advisor.go",
"store_load_summary.go",
"store_set.go",
"top_k_replicas.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototype",
Expand Down
8 changes: 0 additions & 8 deletions pkg/kv/kvserver/allocator/mmaprototype/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,6 @@ type Allocator interface {
// about the nodes in the cluster is a side effect of this method.
SetStore(store StoreAttributesAndLocality)

// RemoveNodeAndStores tells the allocator to remove the NodeID and all its
// stores.
RemoveNodeAndStores(nodeID roachpb.NodeID) error

// UpdateFailureDetectionSummary tells the allocator about the current
// failure detection state for a node. A node starts in the fdOK state.
UpdateFailureDetectionSummary(nodeID roachpb.NodeID, fd failureDetectionSummary) error

// ProcessStoreLoadMsg frequently provides the state of every store and its
// associated node in the cluster.
ProcessStoreLoadMsg(ctx context.Context, msg *StoreLoadMsg)
Expand Down
35 changes: 6 additions & 29 deletions pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,8 @@ func (a *allocatorState) rebalanceStores(

var changes []PendingRangeChange
var disj [1]constraintsConj
var storesToExclude storeIDPostingList
var storesToExcludeForRange storeIDPostingList
var storesToExclude storeSet
var storesToExcludeForRange storeSet
scratchNodes := map[roachpb.NodeID]*NodeLoad{}
scratchStores := map[roachpb.StoreID]struct{}{}
// The caller has a fixed concurrency limit it can move ranges at, when it
Expand Down Expand Up @@ -461,7 +461,7 @@ func (a *allocatorState) rebalanceStores(
store.StoreID, rangeID))
}
cands, _ := rstate.constraints.candidatesToMoveLease()
var candsPL storeIDPostingList
var candsPL storeSet
for _, cand := range cands {
candsPL.insert(cand.storeID)
}
Expand Down Expand Up @@ -493,10 +493,6 @@ func (a *allocatorState) rebalanceStores(
continue
}
candSls := a.cs.computeLoadSummary(ctx, cand.storeID, &means.storeLoad, &means.nodeLoad)
if sls.fd != fdOK {
log.KvDistribution.VInfof(ctx, 2, "skipping store s%d: failure detection status not OK", cand.storeID)
continue
}
candsSet.candidates = append(candsSet.candidates, candidateInfo{
StoreID: cand.storeID,
storeLoadSummary: candSls,
Expand Down Expand Up @@ -608,7 +604,7 @@ func (a *allocatorState) rebalanceStores(
// If the node is cpu overloaded, exclude the other stores on this node
// from receiving replicas shed by this store.
excludeStoresOnNode := store.nls > overloadSlow || store.fd != fdOK
excludeStoresOnNode := store.nls > overloadSlow
storesToExclude = storesToExclude[:0]
if excludeStoresOnNode {
nodeID := ss.NodeID
Expand Down Expand Up @@ -812,22 +808,6 @@ func (a *allocatorState) SetStore(store StoreAttributesAndLocality) {
a.cs.setStore(store)
}

// RemoveNodeAndStores implements the Allocator interface.
func (a *allocatorState) RemoveNodeAndStores(nodeID roachpb.NodeID) error {
a.mu.Lock()
defer a.mu.Unlock()
panic("unimplemented")
}

// UpdateFailureDetectionSummary implements the Allocator interface.
func (a *allocatorState) UpdateFailureDetectionSummary(
nodeID roachpb.NodeID, fd failureDetectionSummary,
) error {
a.mu.Lock()
defer a.mu.Unlock()
panic("unimplemented")
}

// ProcessStoreLoadMsg implements the Allocator interface.
func (a *allocatorState) ProcessStoreLoadMsg(ctx context.Context, msg *StoreLoadMsg) {
a.mu.Lock()
Expand Down Expand Up @@ -1222,7 +1202,7 @@ func sortTargetCandidateSetAndPick(
// ones that have notMatchedLeasePreferenceIndex.
j = 0
for _, cand := range cands.candidates {
if cand.leasePreferenceIndex == notMatchedLeasePreferencIndex {
if cand.leasePreferenceIndex == notMatchedLeasePreferenceIndex {
break
}
j++
Expand Down Expand Up @@ -1360,7 +1340,7 @@ func (a *allocatorState) ensureAnalyzedConstraints(rstate *rangeState) bool {
func (a *allocatorState) computeCandidatesForRange(
ctx context.Context,
expr constraintsDisj,
storesToExclude storeIDPostingList,
storesToExclude storeSet,
loadSheddingStore roachpb.StoreID,
) (_ candidateSet, sheddingSLS storeLoadSummary) {
means := a.cs.meansMemo.getMeans(expr)
Expand All @@ -1381,9 +1361,6 @@ func (a *allocatorState) computeCandidatesForRange(
}
ss := a.cs.stores[storeID]
csls := a.cs.meansMemo.getStoreLoadSummary(ctx, means, storeID, ss.loadSeqNum)
if csls.fd != fdOK {
continue
}
cset.candidates = append(cset.candidates, candidateInfo{
StoreID: storeID,
storeLoadSummary: csls,
Expand Down
81 changes: 0 additions & 81 deletions pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,37 +597,9 @@ type pendingReplicaChange struct {
enactedAtTime time.Time
}

// TODO(kvoli): This will eventually be used to represent the state of a node's
// membership. This corresponds to non-decommissioning, decommissioning and
// decommissioned. Fill in commentary and use.
type storeMembership int8

const (
storeMembershipMember storeMembership = iota
storeMembershipRemoving
storeMembershipRemoved
)

func (s storeMembership) String() string {
return redact.StringWithoutMarkers(s)
}

// SafeFormat implements the redact.SafeFormatter interface.
func (s storeMembership) SafeFormat(w redact.SafePrinter, _ rune) {
switch s {
case storeMembershipMember:
w.Print("full")
case storeMembershipRemoving:
w.Print("removing")
case storeMembershipRemoved:
w.Print("removed")
}
}

// storeState maintains the complete state about a store as known to the
// allocator.
type storeState struct {
storeMembership
storeLoad
StoreAttributesAndLocality
adjusted struct {
Expand Down Expand Up @@ -829,47 +801,10 @@ func newStoreState() *storeState {
return ss
}

// failureDetectionSummary is provided by an external entity and never
// computed inside the allocator.
type failureDetectionSummary uint8

// All state transitions are permitted by the allocator. For example, fdDead
// => fdOK is allowed since the allocator can simply stop shedding replicas
// and then start adding replicas (if underloaded).
const (
fdOK failureDetectionSummary = iota
// Don't add replicas or leases.
fdSuspect
// Move leases away. Don't add replicas or leases.
fdDrain
// Node is dead, so move leases and replicas away from it.
fdDead
)

func (fds failureDetectionSummary) String() string {
return redact.StringWithoutMarkers(fds)
}

// SafeFormat implements the redact.SafeFormatter interface.
func (fds failureDetectionSummary) SafeFormat(w redact.SafePrinter, _ rune) {
switch fds {
case fdOK:
w.Print("ok")
case fdSuspect:
w.Print("suspect")
case fdDrain:
w.Print("drain")
case fdDead:
w.Print("dead")
}
}

type nodeState struct {
stores []roachpb.StoreID
NodeLoad
adjustedCPU LoadValue

fdSummary failureDetectionSummary
}

func newNodeState(nodeID roachpb.NodeID) *nodeState {
Expand Down Expand Up @@ -1897,21 +1832,6 @@ func (cs *clusterState) setStore(sal StoreAttributesAndLocality) {
}
}

func (cs *clusterState) setStoreMembership(storeID roachpb.StoreID, state storeMembership) {
if ss, ok := cs.stores[storeID]; ok {
ss.storeMembership = state
} else {
panic(fmt.Sprintf("store %d not found in cluster state", storeID))
}
}

func (cs *clusterState) updateFailureDetectionSummary(
nodeID roachpb.NodeID, fd failureDetectionSummary,
) {
ns := cs.nodes[nodeID]
ns.fdSummary = fd
}

//======================================================================
// clusterState accessors:
//
Expand Down Expand Up @@ -2193,7 +2113,6 @@ func computeLoadSummary(
nls: nls,
dimSummary: dimSummary,
highDiskSpaceUtilization: highDiskSpaceUtil,
fd: ns.fdSummary,
maxFractionPendingIncrease: ss.maxFractionPendingIncrease,
maxFractionPendingDecrease: ss.maxFractionPendingDecrease,
loadSeqNum: ss.loadSeqNum,
Expand Down
58 changes: 8 additions & 50 deletions pkg/kv/kvserver/allocator/mmaprototype/cluster_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,8 @@ func printPendingChangesTest(changes []*pendingReplicaChange) string {
return buf.String()
}

func testingGetStoreList(t *testing.T, cs *clusterState) (member, removed storeIDPostingList) {
var clusterStoreList, nodeStoreList storeIDPostingList
func testingGetStoreList(t *testing.T, cs *clusterState) storeSet {
var clusterStoreList, nodeStoreList storeSet
// Ensure that the storeIDs in the cluster store map and the stores listed
// under each node are the same.
for storeID := range cs.stores {
Expand All @@ -222,15 +222,7 @@ func testingGetStoreList(t *testing.T, cs *clusterState) (member, removed storeI
require.True(t, clusterStoreList.isEqual(nodeStoreList),
"expected store lists to be equal %v != %v", clusterStoreList, nodeStoreList)

for storeID, ss := range cs.stores {
switch ss.storeMembership {
case storeMembershipMember, storeMembershipRemoving:
member.insert(storeID)
case storeMembershipRemoved:
removed.insert(storeID)
}
}
return member, removed
return clusterStoreList
}

func testingGetPendingChanges(t *testing.T, cs *clusterState) []*pendingReplicaChange {
Expand Down Expand Up @@ -301,12 +293,12 @@ func TestClusterState(t *testing.T) {
var buf strings.Builder
for _, nodeID := range nodeList {
ns := cs.nodes[roachpb.NodeID(nodeID)]
fmt.Fprintf(&buf, "node-id=%s failure-summary=%s locality-tiers=%s\n",
ns.NodeID, ns.fdSummary, cs.stores[ns.stores[0]].StoreAttributesAndLocality.locality())
fmt.Fprintf(&buf, "node-id=%s locality-tiers=%s\n",
ns.NodeID, cs.stores[ns.stores[0]].StoreAttributesAndLocality.locality())
for _, storeID := range ns.stores {
ss := cs.stores[storeID]
fmt.Fprintf(&buf, " store-id=%v membership=%v attrs=%s locality-code=%s\n",
ss.StoreID, ss.storeMembership, ss.StoreAttrs, ss.localityTiers.str)
fmt.Fprintf(&buf, " store-id=%v attrs=%s locality-code=%s\n",
ss.StoreID, ss.StoreAttrs, ss.localityTiers.str)
}
}
return buf.String()
Expand Down Expand Up @@ -336,7 +328,7 @@ func TestClusterState(t *testing.T) {

case "get-load-info":
var buf strings.Builder
memberStores, _ := testingGetStoreList(t, cs)
memberStores := testingGetStoreList(t, cs)
for _, storeID := range memberStores {
ss := cs.stores[storeID]
ns := cs.nodes[ss.NodeID]
Expand Down Expand Up @@ -365,40 +357,6 @@ func TestClusterState(t *testing.T) {
}
return printNodeListMeta()

case "set-store-membership":
storeID := dd.ScanArg[roachpb.StoreID](t, d, "store-id")
var storeMembershipVal storeMembership
switch str := dd.ScanArg[string](t, d, "membership"); str {
case "member":
storeMembershipVal = storeMembershipMember
case "removing":
storeMembershipVal = storeMembershipRemoving
case "removed":
storeMembershipVal = storeMembershipRemoved
}
cs.setStoreMembership(storeID, storeMembershipVal)

var buf strings.Builder
nonRemovedStores, removedStores := testingGetStoreList(t, cs)
buf.WriteString("member store-ids: ")
printPostingList(&buf, nonRemovedStores)
buf.WriteString("\nremoved store-ids: ")
printPostingList(&buf, removedStores)
return buf.String()

case "update-failure-detection":
nodeID := dd.ScanArg[roachpb.NodeID](t, d, "node-id")
failureDetectionString := dd.ScanArg[string](t, d, "summary")
var fd failureDetectionSummary
for i := fdOK; i < fdDead+1; i++ {
if i.String() == failureDetectionString {
fd = i
break
}
}
cs.updateFailureDetectionSummary(nodeID, fd)
return printNodeListMeta()

case "store-load-msg":
msg := parseStoreLoadMsg(t, d.Input)
cs.processStoreLoadMsg(context.Background(), &msg)
Expand Down
Loading