P7054: elastic_cluster_latency.py
Authored by EBernhardson on Apr 30 2018, 4:55 PM (2018-04-30 16:55:29 UTC).
Tags: Elasticsearch, Discovery-Search

from collections import OrderedDict
import copy
import json
import logging
from pprint import pprint
import random
import re
import requests
import sys
import time


# Raw string so the \d escape stays valid under python 3 as well
ARCHIVE_INDEX_RE = re.compile(r'^.*_archive_\d+$')
LOG = logging.getLogger('dupe_cluster')

NUKE_CLUSTER = False
# Load cluster state from saved json and create
# indices found inside
CREATE_FROM_STATE = False
# Create archive indices matching all general indices.
# When false 10 duplicates of existing indices are created
# to use with mutation tests
CREATE_ARCHIVE = False
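

# Usage (see the __main__ block at the bottom of this script):
#   python elastic_cluster_latency.py <es_url> <report_path> <max_indices>
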
def clean_settings(settings, exclude=set(['provided_name', 'creation_date', 'uuid', 'version'])):
    settings = dict(settings)
    settings['index'] = {k: v for k, v in settings['index'].items()
                         if k not in exclude}
    try:
        del settings['index']['routing']['allocation']['total_shards_per_node']
    except KeyError:
        pass
    return settings
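

# Destructive reset of the target cluster; guarded so it can only ever
# run against a local test instance.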
def nuke_cluster():
    if 'localhost:9200' not in ES_URL:
        raise Exception('must be localhost:9200')
    requests.delete(ES_URL + '_all')


def wait_for_green():
    url = ES_URL + '_cluster/health'
    while True:
        res = requests.get(url).json()
        if res['status'] == 'green':
            return
        time.sleep(0.1)


def wait_for_recovery(index):
    url = ES_URL + index + '/_recovery'
    done = False
    while not done:
        res = requests.get(url).json()
        done = True
        for shard in res[index]['shards']:
            if shard['stage'] != 'DONE':
                done = False
                break
        if not done:
            time.sleep(0.1)
def dupe_cluster(remote_state, report, max_indices=None):
    LOG.info('Loading indices from state into cluster')
    total = len(remote_state['metadata']['indices'])
    if max_indices is not None:
        total = min(total, max_indices)
    processed = 0
    created = 0
    for index_name, config in remote_state['metadata']['indices'].items():
        processed += 1
        if config['state'] != 'open':
            continue
        index_url = ES_URL + index_name
        if requests.head(index_url).status_code == 200:
            LOG.info('[%5d/%5d] Already exists' % (processed, total))
            continue
        start = time.time()
        res = requests.put(index_url, json={
            "settings": clean_settings(config['settings']),
            "mappings": config['mappings'],
            # {k: v for k, v in config['mappings'].items() if k != "archive"},
        }).json()
        took = time.time() - start
        report('dupe', took)
        LOG.info('[%5d/%5d] [%d ms] Processed %s' % (processed, total, 1000 * took, index_name))
        if 'acknowledged' in res and res['acknowledged']:
            created += 1
            if config['aliases']:
                res = requests.post(ES_URL + '_aliases', json={
                    "actions": [
                        {"add": {"index": index_name, "alias": alias}}
                        for alias in config['aliases']
                    ]
                }).json()
                if 'acknowledged' not in res or not res['acknowledged']:
                    pprint(res)
            if max_indices is not None and created >= max_indices:
                break
        else:
            pprint(res)
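

# For every index that already has an 'archive' mapping, create a matching
# <wiki>_archive_<timestamp> index carrying only that mapping.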
def create_archive(state, report):
    LOG.info('Creating archive indices')
    wikis = {}
    for index_name, config in state['metadata']['indices'].items():
        if 'archive' not in config['mappings']:
            if 'general' in index_name:
                LOG.info(config['mappings'].keys())
            continue
        wiki = '_'.join(index_name.split('_')[:-2])
        wikis[wiki] = config
    total = len(wikis)
    processed = 0
    for wiki, config in wikis.items():
        processed += 1
        index_name = '%s_archive_%d' % (wiki, int(time.time()))
        index_url = ES_URL + index_name
        if requests.head(index_url).status_code == 200:
            continue
        settings = clean_settings(config['settings'])
        start = time.time()
        res = requests.put(index_url, json={
            "settings": settings,
            "mappings": {
                "archive": config['mappings']['archive']
            }
        })
        took = time.time() - start
        LOG.info('[%5d/%5d] [%d ms] %s' % (processed, total, took * 1000, wiki))
        report('create_archive', took)


def wrap_timer(f):
    def inner(*args, **kwargs):
        start = time.time()
        res = f(*args, **kwargs)
        return res, time.time() - start
    inner.__name__ = f.__name__
    return inner
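

# Indices matching *_archive_<digits> are the ones this script creates, so
# they are the only ones the mutation actions are allowed to touch.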
def get_mutable_indices(cluster_state):
    available = []
    for index_name in cluster_state['routing_table']['indices'].keys():
        if ARCHIVE_INDEX_RE.match(index_name):
            available.append(index_name)
    return available
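

# Mutation: relocate one shard of a random mutable index to a node that does
# not already hold a copy, and time the _cluster/reroute request.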
def move_shard(cluster_state):
    # Fetch fresh cluster state
    cluster_state, _ = fetch_cluster_state()
    # Ensure we only mutate "our" archive indices instead of accidentally
    # moving a 40G production shard.
    mutable_indices = get_mutable_indices(cluster_state)
    if not mutable_indices:
        return cluster_state, float('nan')
    for _ in range(10):
        index = random.choice(mutable_indices)
        routing = cluster_state['routing_table']['indices'][index]
        index_config = cluster_state['metadata']['indices'][index]['settings']['index']
        if int(index_config['number_of_replicas']) > 2:
            continue
        # list() so random.choice works on the dict view under python 3 too
        shard_id, shard = random.choice(list(routing['shards'].items()))
        nodes = set(cluster_state['nodes'].keys())
        for instance in shard:
            nodes.remove(instance['node'])
        if not nodes:
            LOG.warning('No free nodes!')
            continue
        from_node = random.choice(shard)['node']
        to_node = nodes.pop()
        start = time.time()
        res = requests.post(ES_URL + '_cluster/reroute', json={
            "commands": [
                {"move": {
                    "index": index,
                    "shard": shard_id,
                    "from_node": from_node,
                    "to_node": to_node,
                }}
            ]
        })
        took = time.time() - start
        parsed = res.json()
        if 'acknowledged' in parsed and parsed['acknowledged']:
            LOG.info('Moved shard %s of %s from %s to %s',
                     shard_id, index, from_node, to_node)
            new_cluster_state = dict(cluster_state, **parsed['state'])
            wait_for_recovery(index)
        else:
            # Verbose and not too helpful
            # LOG.warning(parsed)
            # TODO: Some failure
            continue
        return new_cluster_state, took
    raise Exception('Could not find a shard to move')
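

# Wrapped with wrap_timer, so callers get back (state_json, seconds_taken).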
@wrap_timer
def fetch_cluster_state(cluster_state=None):
    res = requests.get(ES_URL + '_cluster/state')
    return res.json()
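

# Mutation (currently commented out of the actions list in measure_latency):
# add a throwaway 'zomg' text field to the 'page' mapping of a random
# mutable index.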
def change_index_mappings(cluster_state):
    mutable_indices = get_mutable_indices(cluster_state)
    if not mutable_indices:
        return cluster_state, float('nan')
    for _ in range(10):
        index_name = random.choice(mutable_indices)
        index_config = cluster_state['metadata']['indices'][index_name]
        if 'page' not in index_config['mappings']:
            continue
        fields = set()
        for type_config in index_config['mappings'].values():
            fields = fields.union(type_config['properties'].keys())
        # Alternatively, create field with random name?
        if 'zomg' in fields:
            continue
        LOG.info('Adding zomg field to %s', index_name)
        start = time.time()
        res = requests.put(ES_URL + index_name + '/_mapping/page', json={
            "properties": {
                "zomg": {
                    "type": "text"
                }
            }
        })
        took = time.time() - start
        parsed = res.json()
        if 'acknowledged' in parsed and parsed['acknowledged']:
            cluster_state['metadata']['indices'][index_name]['mappings']['page']['properties']['zomg'] = {"type": "text"}
        else:
            # TODO: Some failure
            raise Exception()
        return cluster_state, took
    raise Exception('Could not find index to add name field to')
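

# Mutation: create a fresh erikfakewiki_archive_* index cloned from a random
# existing index's settings and non-archive mappings.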
@wrap_timer
def create_index(cluster_state):
    # Choose a random index to duplicate (list() for python 3 dict views)
    config = random.choice(list(cluster_state['metadata']['indices'].values()))
    index_name = 'erikfakewiki_archive_' + str(random.random())[2:]
    index_url = ES_URL + index_name
    res = requests.put(index_url, json={
        "settings": clean_settings(config['settings']),
        "mappings": {k: v for k, v in config['mappings'].items() if k != "archive"},
    })
    wait_for_green()
    # Going to pretend we don't need to update cluster state, and let the regular
    # fetch per-iteration get the new info
    return cluster_state
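

# Mutation: delete a random mutable index and drop it from the cached state.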
def delete_index(cluster_state):
    mutable_indices = get_mutable_indices(cluster_state)
    if mutable_indices:
        index_name = random.choice(mutable_indices)
        LOG.info('Deleting index %s', index_name)
        start = time.time()
        requests.delete(ES_URL + index_name)
        took = time.time() - start
        # TODO: Remove from other parts of state we look at
        del cluster_state['metadata']['indices'][index_name]
        del cluster_state['routing_table']['indices'][index_name]
    else:
        took = float('nan')
    return cluster_state, took
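

# Mutation: lower the auto_expand_replicas upper bound by one on a random
# mutable index that still allows at least two replicas.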
def remove_replica(cluster_state):
    mutable_indices = get_mutable_indices(cluster_state)
    if not mutable_indices:
        return cluster_state, float('nan')
    max_attempts = max(10, len(cluster_state['metadata']['indices']) // 10)
    for _ in range(max_attempts):
        index_name = random.choice(mutable_indices)
        config = cluster_state['metadata']['indices'][index_name]
        index_config = dict(config['settings']['index'])
        if 'auto_expand_replicas' not in index_config:
            continue
        low, high = map(int, index_config['auto_expand_replicas'].split('-', 1))
        if high < 2:
            continue
        auto_expand_replicas = '%d-%d' % (low, high - 1)
        LOG.info('Removing replica from %s', index_name)
        start = time.time()
        requests.put(ES_URL + index_name + '/_settings', json={
            "index": {
                "auto_expand_replicas": auto_expand_replicas,
            }
        })
        took = time.time() - start
        config['settings']['index']['auto_expand_replicas'] = auto_expand_replicas
        return cluster_state, took
    raise Exception('Could not find index to remove replica from')
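

# Mutation: raise the auto_expand_replicas upper bound by one, preferring an
# index that currently allows fewer than two replicas.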
def add_replica(cluster_state):
    mutable_indices = get_mutable_indices(cluster_state)
    if not mutable_indices:
        return cluster_state, float('nan')
    # Look for indices with < 2 replicas
    for index_name in mutable_indices:
        config = cluster_state['metadata']['indices'][index_name]
        if 'auto_expand_replicas' not in config['settings']['index']:
            continue
        if int(config['settings']['index']['auto_expand_replicas'].split('-', 1)[1]) < 2:
            break
    else:
        # Choose one randomly
        for _ in range(10):
            index_name = random.choice(mutable_indices)
            config = cluster_state['metadata']['indices'][index_name]
            if 'auto_expand_replicas' in config['settings']['index']:
                break
        else:
            raise Exception('Could not find index to add replica to')
    low, high = map(int, config['settings']['index']['auto_expand_replicas'].split('-', 1))
    auto_expand_replicas = "%s-%s" % (low, high + 1)
    start = time.time()
    requests.put(ES_URL + index_name + '/_settings', json={
        "index": {
            "auto_expand_replicas": auto_expand_replicas,
        }
    })
    # TODO: Does this work? Might be a race condition? It's also checking
    # something very generic.
    wait_for_green()
    took = time.time() - start
    LOG.info('Added replica to %s', index_name)
    config['settings']['index']['auto_expand_replicas'] = auto_expand_replicas
    return cluster_state, took
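

# Main measurement loop: each iteration refreshes the cluster state, runs the
# mutation actions in random order, and reports how long each one took.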
def measure_latency(report, n_iterations=100):
    LOG.info('Start latency measurements')
    actions = [
        move_shard,
        # change_index_mappings, change_index_settings,
        create_index,
        delete_index,
        remove_replica,
        add_replica,
    ]
    cluster_state = None
    for i in range(n_iterations):
        LOG.info('Starting iteration %d/%d', i, n_iterations)
        start = time.time()
        cluster_state, _ = fetch_cluster_state()
        report('fetch_cluster_state', time.time() - start)
        random.shuffle(actions)
        for action in actions:
            LOG.info('Running %s', action.__name__)
            cluster_state, took = action(cluster_state)
            report(action.__name__, took)
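

# Generator-based reporter coroutine: prime it with send(None), then feed it
# (metric, seconds) pairs and it appends '<metric>: <seconds>' lines to path.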
def make_reporter(path):
    # 'w' rather than 'wb': formatted text is written, which also keeps this
    # working under python 3
    with open(path, 'w') as f:
        while True:
            metric, took = yield
            f.write('%s: %f\n' % (metric, took))
            f.flush()


if __name__ == "__main__":
    logging.basicConfig(level='INFO')
    logging.getLogger("requests").setLevel(logging.WARNING)
    random.seed(0)

    ES_URL = sys.argv[1]
    if ES_URL[-1] != '/':
        ES_URL += '/'
    report_path = sys.argv[2]
    max_indices = int(sys.argv[3])

    reporter = make_reporter(report_path)
    reporter.send(None)

    def report(metric, took):
        reporter.send((metric, took))

    if NUKE_CLUSTER:
        nuke_cluster()

    if CREATE_FROM_STATE:
        cluster_state_path = 'cluster_state.json'
        LOG.info('Read cluster state: %s' % (cluster_state_path))
        with open(cluster_state_path, 'rb') as f:
            source_cluster_state = json.load(f)
        dupe_cluster(source_cluster_state, report, max_indices)
        LOG.info('Waiting for cluster in green state')
        wait_for_green()
        LOG.info('Green!')

    cluster_state, _ = fetch_cluster_state()
    if CREATE_ARCHIVE:
        create_archive(cluster_state, report)
    else:
        LOG.info('Ensuring at least 10 mutable indices')
        # We need some wikis for mutation tests to work with.
        available = len(get_mutable_indices(cluster_state))
        for _ in range(10 - available):
            create_index(cluster_state)
        LOG.info('Waiting for cluster in green state')
        wait_for_green()
        LOG.info('Green!')

    measure_latency(report)