Paste P58402

etcd-mirror stress tests

Authored by Scott_French on Mar 4 2024, 8:04 PM.
etcd-mirror stress tests - swfrench@ 2024-03-01
# Setup
* Two etcd v3.5.12 single-node clusters with etcd-mirror replicating from one to the other, all running on the same machine (laptop).
** This measures ideal / peak replication throughput, as (1) there is no inter-DC network delay and (2) no delay from consensus rounds among etcd nodes (disk I/O is still a factor).
* Two workloads:
** stress: issues puts to the primary at a piecewise-constant rate (with unbounded, or in practice weakly bounded, concurrency).
** probe: issues puts to the primary at a fixed interval of 500ms, waiting for each resulting put to apply at the replica.
* The probe measures end-to-end replication time: elapsed time from immediately prior to the set being issued to just after the paired wait completes (i.e., HTTP long-poll returns).
* The general idea here is to probe while varying the “antagonist” stress rate.
** In this case, 3x reps of 2m (~ 240 probes) at a stress rate of 1, 2, 10, 20, 100, 200, 500, and 666 put/s.
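** For reference, the corresponding stress -interval values are 1s, 500ms, 100ms, 50ms, 10ms, 5ms, 2ms, and 1500us, respectively (i.e., interval = 1/rate).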
## Why top out at 666 put/s?
This corresponds to a put interval of 1500us and is just a bit below the point where replication fails, which happens at ~ 700 put/s in this test setup. Specifically, replication delay becomes wildly variable, up to the crossover where the delay reaches 1000.0 / {rate} seconds, i.e., the time it takes {rate} put/s to generate the 1000 events retained in etcd's EventHistory, at which point the watch index falls off the window.
# Results
Distribution of replication delay (ms) observed by probes (executed every 500ms) at a range of “stress” workload request rates (note the log scale): https://people.wikimedia.org/~swfrench/T358636/replication_latency.png
General observations (see delay distribution summary linked above):
* The mean replication delay in the low-stress case is at or below ~ 4ms.
* As the stress rate increases, the distribution of delays broadens, including toward lower delays.
** The latter is a bit surprising, and is perhaps an artifact of etcd behavior (e.g., "hitching a ride" on commit batching driven by the background write rate).
* Overall, replication appears well behaved (predictably low tail latency) well past a background stress rate of 100 put/s, which is ~ 50x the typical day-to-day peak already seen in production (~ 1.8 put/s).
** Notably, these peaks are not attributable to spicerack locks, and instead seem to correlate with Scap operations of various sorts per SAL (e.g., rapid depool/pool of appservers?).
* Peak activity clearly identifiable as spicerack locks (i.e., the characteristic POST/DELETE + PUT) tops out at below 1 write/s.
# Conclusion
Taken together, I believe we’re safe to expand replication to include spicerack lock state, with the already stated caveats that certain parts of lock state are not translatable across clusters (but should TTL out).
# Code
> cat cmd/probe/main.go
// Replication delay probe for etcd-mirror.
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"io"
	"log"
	"net/http"
	"strconv"
	"time"
)

var (
	interval = flag.Duration("interval", time.Second, "Interval between probe puts.")
	duration = flag.Duration("duration", time.Minute, "Total run duration.")
	primary  = flag.String("primary_endpoint", "", "Address of the primary endpoint.")
	replica  = flag.String("replica_endpoint", "", "Address of the replica endpoint.")
	key      = flag.String("key", "/probe", "Key used for probing.")
)

// stats accumulates simple latency summary statistics.
type stats struct {
	n   int64
	sum int64
	min int64
	max int64
}

func (s *stats) record(d time.Duration) {
	micros := d.Microseconds()
	fmt.Printf("Latency: %d us\n", micros)
	if s.n == 0 {
		s.sum = micros
		s.min = micros
		s.max = micros
	} else {
		s.sum += micros
		s.min = min(s.min, micros)
		s.max = max(s.max, micros)
	}
	s.n++
}

func (s *stats) String() string {
	return fmt.Sprintf("%d samples: %dus avg, %dus min, %dus max", s.n, s.sum/s.n, s.min, s.max)
}

// etcdNode mirrors the node object in etcd v2 API responses.
type etcdNode struct {
	Key           string     `json:"key"`
	Value         string     `json:"value"`
	ModifiedIndex int        `json:"modifiedIndex"`
	CreatedIndex  int        `json:"createdIndex"`
	Dir           bool       `json:"dir"`
	Nodes         []etcdNode `json:"nodes"`
}

type etcdResponse struct {
	// Note: this is an incomplete representation of the response payload,
	// e.g., it does not capture errors.
	Action string    `json:"action"`
	Node   *etcdNode `json:"node"`
}

// watcher tracks the next modifiedIndex to wait on at the replica.
type watcher struct {
	waitIndex int
}

// mustStartWait issues a long-poll watch (GET ?wait=true) for the probe key on
// the replica, then reads the response body in a background goroutine, checks
// the observed sequence number, and delivers the receipt time on receipt. The
// receipt channel is closed if the context is canceled.
func (w *watcher) mustStartWait(ctx context.Context, seq int64, receipt chan time.Time) {
	var url string
	if w.waitIndex == 0 {
		url = fmt.Sprintf("%s/v2/keys%s?wait=true", *replica, *key)
	} else {
		url = fmt.Sprintf("%s/v2/keys%s?wait=true&waitIndex=%d", *replica, *key, w.waitIndex)
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		log.Fatalf("Failed to prepare request: %v", err)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		if errors.Is(err, context.Canceled) {
			close(receipt)
			return
		}
		log.Fatalf("Watch failed: %v", err)
	}
	go func() {
		defer resp.Body.Close()
		body, err := io.ReadAll(resp.Body)
		t := time.Now() // Need to wait until the entire body has been received.
		if err != nil {
			if errors.Is(err, context.Canceled) {
				close(receipt)
				return
			}
			log.Fatalf("Failed to read watch response body: %v", err)
		}
		msg := &etcdResponse{}
		if err := json.Unmarshal(body, msg); err != nil {
			log.Fatalf("Failed to unmarshal watch response body: %v", err)
		}
		if msg.Node == nil {
			log.Fatalf("Received etcd response with no node: %q", string(body))
		}
		vseq, err := strconv.ParseInt(msg.Node.Value, 10, 64)
		if err != nil {
			log.Fatalf("Failed to parse probe sequence number: %v", err)
		}
		if vseq != seq {
			log.Fatalf("Sequence number violation: got %d want %d", vseq, seq)
		}
		w.waitIndex = msg.Node.ModifiedIndex + 1
		receipt <- t
	}()
}

// mustSet writes the sequence number to the probe key on the primary and
// returns the time captured immediately before issuing the request.
func mustSet(seq int64) time.Time {
	url := fmt.Sprintf("%s/v2/keys%s", *primary, *key)
	body := bytes.NewBufferString(fmt.Sprintf("value=%d", seq))
	req, err := http.NewRequest(http.MethodPut, url, body)
	if err != nil {
		log.Fatalf("Failed to prepare request: %v", err)
	}
	req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
	t := time.Now()
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatalf("Put failed: %v", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
		log.Fatalf("Put returned non-ok status: %d", resp.StatusCode)
	}
	return t
}

func main() {
	flag.Parse()
	w := new(watcher)
	s := new(stats)
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan any)
	receipt := make(chan time.Time)
	go func() {
		defer func() { close(done) }()
		tick := time.NewTicker(*interval)
		defer tick.Stop()
		var seq int64
		for {
			select {
			case <-ctx.Done():
				return
			case <-tick.C:
				// Start the watch on the replica before issuing the put to the
				// primary, so the resulting event is not missed.
				w.mustStartWait(ctx, seq, receipt)
				tsnd := mustSet(seq)
				trcv, ok := <-receipt
				if !ok {
					return
				}
				s.record(trcv.Sub(tsnd))
				seq++
			}
		}
	}()
	time.Sleep(*duration)
	cancel()
	<-done
	fmt.Printf("Summary: %s\n", s.String())
}
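For reference, a probe run matching the setup above would be invoked roughly as follows (assuming the usual Go module layout for the cmd/ directories; the endpoint URLs are placeholders). It prints one "Latency: ... us" line per put and a "Summary: ..." line on exit.
> go run ./cmd/probe -primary_endpoint="$PRIMARY" -replica_endpoint="$REPLICA" -interval=500ms -duration=2m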
> cat cmd/stress/main.go
// Stress workload for etcd-mirror.
package main

import (
	"bytes"
	"flag"
	"fmt"
	"log"
	"net/http"
	"sync/atomic"
	"time"
)

var (
	interval    = flag.Duration("interval", 100*time.Millisecond, "Interval between background puts.")
	primary     = flag.String("primary_endpoint", "", "Address of the primary endpoint.")
	key         = flag.String("key", "/stress", "Key used for stressing the system.")
	concurrency = flag.Int("concurrency", 8, "Number of persistent sender goroutines.")
)

func mustSet(c *http.Client) {
	url := fmt.Sprintf("%s/v2/keys%s", *primary, *key)
	req, err := http.NewRequest(http.MethodPut, url, bytes.NewBufferString("nonsense"))
	if err != nil {
		log.Fatalf("Failed to prepare request: %v", err)
	}
	req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
	resp, err := c.Do(req)
	if err != nil {
		log.Fatalf("Put failed: %v", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
		log.Fatalf("Put returned non-ok status: %d", resp.StatusCode)
	}
}

func main() {
	flag.Parse()
	n := new(atomic.Int64)
	// All sender goroutines share this one ticker, so the aggregate put rate is
	// bounded by 1/interval; concurrency bounds the number of in-flight puts.
	tset := time.NewTicker(*interval)
	defer tset.Stop()
	for i := 0; i < *concurrency; i++ {
		go func() {
			c := &http.Client{}
			for range tset.C {
				mustSet(c)
				n.Add(1)
			}
		}()
	}
	// Report the achieved put rate every 10s.
	tprint := time.NewTicker(10 * time.Second)
	defer tprint.Stop()
	lastTick := time.Now()
	var lastValue int64
	for range tprint.C {
		tick := time.Now()
		value := n.Load()
		d := tick.Sub(lastTick)
		rate := float64(value-lastValue) / d.Seconds()
		log.Printf("Last %s: avg %f sets/s", d, rate)
		lastTick = tick
		lastValue = value
	}
}
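Similarly, the stress workload at the 666 put/s ceiling discussed above corresponds roughly to the following (again, the endpoint URL is a placeholder). It logs the achieved put rate every 10s.
> go run ./cmd/stress -primary_endpoint="$PRIMARY" -interval=1500us -concurrency=8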