from locust import HttpLocust, TaskSet, task import string import random STRINGLENGTH = 32 BASE_URL = '/sessions/v1/' class UserBehavior(TaskSet): min_wait = 1000 max_wait = 5000 def on_start(self): letters = string.ascii_lowercase self.key = ''.join(random.choice(letters) for i in range(STRINGLENGTH)) self.key = BASE_URL + self.key self.data = ''.join(random.choice(letters) for i in range(STRINGLENGTH)) @task(5) def set(self): self.client.post( self.key, data=self.data, headers={'Content-Type': 'application/octet-stream'}, name='post', ) @task(94) def get(self): with self.client.get( self.key, name='get', catch_response=True) as response: if response.status_code == 404: response.success() @task(1) def delete(self): self.client.delete(self.key, name='delete') class HealthCheckBehavior(TaskSet): min_wait = 10000 max_wait = 10000 @task def healthz(self): self.client.get('/healthz') @task def metrics(self): self.client.get('/metrics') class SessionUser(HttpLocust): weight = 50 task_set = UserBehavior class HealthCheckUser(HttpLocust): weight = 1 task_set = HealthCheckBehavior