Postgres table
==============
gis=# \d tiles
Table "public.tiles"
Column | Type | Modifiers
--------+--------+-----------
zoom | bigint |
block | bigint |
idx | bigint |
tile | bytea |
Indexes:
"tiles_zoom_block_idx_idx" UNIQUE, btree (zoom, block, idx)
Cassandra table
===============
cqlsh:v2> describe tiles;
CREATE TABLE v2.tiles (
zoom int,
block int,
idx int,
tile blob,
PRIMARY KEY (zoom, block, idx)
) WITH CLUSTERING ORDER BY (block ASC, idx ASC)
AND bloom_filter_fp_chance = 0.01
AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
AND comment = ''
AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99.0PERCENTILE';
The replication factor for the keyspace is 4, so every one of the four nodes effectively owns 100% of the data:
nodetool status v2
Datacenter: codfw
=================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
-- Address Load Tokens Owns (effective) Host ID Rack
UN 10.192.0.128 122.99 GB 256 100.0% e33a1800-96d9-4a6d-86d3-e1d270026d4a rack1
UN 10.192.0.129 124.58 GB 256 100.0% f74287ed-8286-459e-956c-e0aed02184c4 rack1
UN 10.192.16.34 110.34 GB 256 100.0% 3e5ff43e-df80-48fe-9302-2a28b1127095 rack1
UN 10.192.16.35 114.19 GB 256 100.0% 1af61427-f70e-4653-bccd-71735e5f78da rack1
Code
====
import argparse
import math
import os
import random
import time
from multiprocessing import Process, Value, cpu_count

import psycopg2
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement
# Number of throwaway warm-up passes before each measured run (primes caches
# on both datastores so the measured pass is comparable).
WARMUPS = 1
# Every worker process issues this many queries per run.
NUMBER_OF_QUERIES_PER_PROC = 25000
# Benchmark target hosts; worker i connects to HOSTS[i % len(HOSTS)] so the
# load is spread round-robin over the cluster.
HOSTS = ['maps-test2001.codfw.wmnet',
         'maps-test2002.codfw.wmnet',
         'maps-test2003.codfw.wmnet',
         'maps-test2004.codfw.wmnet']
def benchmark_cassandra_worker(host, username, password, queries, ret):
    """Run `queries` serially against Cassandra keyspace `v2` on `host`.

    Each query is a dict with 'zoom', 'block' and 'idx' keys.  The achieved
    queries-per-second is written to `ret.value` (a multiprocessing Value)
    so the parent process can collect it, and is also returned.
    """
    # Set up a session against a single node so each worker's load is pinned
    # to one coordinator.
    auth_provider = PlainTextAuthProvider(
        username=username, password=password)
    cluster = Cluster([host], auth_provider=auth_provider)
    session = cluster.connect('v2')
    # Prepare once; only the bind values change per query.
    statement = session.prepare('SELECT zoom, block, idx, tile FROM tiles \
                                 WHERE zoom=? \
                                 AND block=? \
                                 AND idx=?')
    start_time = time.time()
    for query in queries:
        session.execute(statement,
                        (query['zoom'], query['block'], query['idx']))
    end_time = time.time()
    # Release driver threads/connections before the worker process exits.
    cluster.shutdown()
    duration = end_time - start_time
    qps = len(queries) / duration
    ret.value = qps
    return qps
def benchmark_postgresql_worker(host, username, password, queries, ret):
    """Run `queries` serially against the `gis` PostgreSQL database on `host`.

    Each query is a dict with 'zoom', 'block' and 'idx' keys, bound into the
    SQL via the pyformat placeholders.  The achieved queries-per-second is
    written to `ret.value` (a multiprocessing Value) and also returned.
    """
    conn = psycopg2.connect('host=%s dbname=gis user=%s password=%s' %
                            (host, username, password))
    cur = conn.cursor()
    # Build the statement before starting the clock so only query execution
    # is timed.
    sql = 'SELECT zoom, block, idx, tile FROM tiles \
           WHERE zoom=%(zoom)s \
           AND block=%(block)s \
           AND idx=%(idx)s'
    start_time = time.time()
    for query in queries:
        cur.execute(sql, query)
    end_time = time.time()
    cur.close()
    conn.close()
    duration = end_time - start_time
    qps = len(queries) / duration
    ret.value = qps
    return qps
def benchmark(func):unc, username, password, queries, nr_procs):
print "'Benchmarking %s"' % func.func_name
# Benchmarking cassandra
processes = []
for i in range(NUMBER_OF_PROCS):nr_procs):
host = HOSTS[i % len(HOSTS)]
start_query = i* * len(queries)/NUMBER_OF_PROCS / nr_procs
end_query = (i+1)* i + 1 ) * len(queries)/NUMBER_OF_PROCS / nr_procs - 1
v = Value('d', 0)
p = Process(target=func,
args=(host, username, password, queries[start_query:end_query], v))
p.start()
processes.append({'p': p, 'v': v})
# Let's wait for everyone to finish
for proc in processes:
proc['p'].join()
qps = 0
for proc in processes:
qps = qps + proc['v'].value
print "'Benchmarking %s: done"' % func.func_name
return qps
if __name__ == '__main__':
queries = []
# First generate all our queriesdef main(args):
print "Creating queries"for nr_procs in range(args.min_procs, args.max_procs + 1):
for i in range(1, nr_queries = NUMBER_OF_QUERIES):_PER_PROC * nr_procs
# We give every max some extra space so as to have queries that return= []
# nothing. idx is an exception as it set to the max# First generate allowed (current our queries
# cassandra keyspace/table limitation in this specific installation)print 'Creating queries'
if args.mode == 'precomputed':
with open('tiles.csv', 'r') as f:
for i in range(1, nr_queries):
line = f.readline()
elems = line.split(',')
elems = [int(i) for i in elems]
# And let's splice
zoom, block, idx = elems
query = {
'zoom': zoom,
'block': block,
'idx': idx,
}
queries.append(query)
else:
# Below are zoom levels and mins, maxes for block and idx.
# zoom | min_block | max_block | min_idx | max_idx
limits = (
'zoom': random.randrange(0, 20(0, 0, 0, 0, 1),
'block': random.randrange(0, 70000(1, 0, 0, 0, 3),
'idx': random.randrange(0, 2147483647(2, 0, 0, 0, 15),
} (3, 0, 0, 0, 61),
queries.append(query) (4, 0, 0, 1, 247),
print "Queries created"
cassandra_qps_list = [] (5, 0, 0, 5, 1012),
postgresql_qps_list = [] (6, 0, 0, 21, 4081),
# Repeat the exact same test random multiple times per datastore to (7, 0, 0, 85, 16326),
# statistically remove the effects of pagecache (8, 0, 1, 341, 65312),
for i in range(REPEATS): (9, 0, 7, 1365, 262143),
worker = random.choice(( (10, 0, 31, 5461, 1048575),
('cassandra', benchmark_cassandra_worker11, 0, 127, 21845, 4194303),
('postgresql'12, 2, 511, 87381, 16777215),
(13, 10, 2047, 349525, 67108863),
(14, 42, 8191, 1398101, 268435455),
(15, 170, 32767, 5592405, 1073741823),
(16, 682, 65535, 22369621, 2147483647),
)
for i in range(1, nr_queries):
z = random.choice(limits)
query = {
'zoom': z[0],
'block': int(round(random.uniform(z[1], z[2]))) if z[1] != z[2] else 0,
'idx': int(round(random.uniform(z[3], benchmark_postgresql_worker))z[4]))),
}
queries.append(query)
qps = benchmark(worker[1])print 'Queries created'
cassandra_qps_list = []
if worker[0] == 'cassandra':postgresql_qps_list = []
cassandra_qps_list.append(qps)# Repeat the exact same test random multiple times per datastore to
# statistically remove the effects of pagecache
iffor worker[0] == 'postgresql': in (
postgresql_qps_list.append(qps)
cassandra_qps = sum(cassandra_qps_list)/len(cassandra_qps_list)(
postgresql_qps = sum(postgresql_qps_list)/len(postgresql_qps_list) benchmark_cassandra_worker,
print 'Cassandra got: %s QPS' % args.cassandra_qpsuser,
print 'PostgreSQL got: %s QPS' % postgresql_qps
Preliminary results args.cassandra_pass,
=================== cassandra_qps_list,
python stresstest.py
Creating queries ),
Queries created (
Benchmarking benchmark_cassandrapostgresql_worker,
Benchmarking benchmark_cassandra_worker: done args.postgres_user,
Benchmarking benchmark_ args.postgresql_worker_pass,
Benchmarking benchmark_ postgresql_worker: doneqps_list,
Benchmarking benchmark_cassandra_worker )):
Benchmarking benchmark_cassandra_worker: done for i in range(WARMUPS + 1):
Benchmarking qps = benchmark_postgresql_(worker[0], worker[1], workerer[2], queries, nr_procs)
Benchmarking benchmark_postgresql_worker: done worker[3].append(qps)
cassandra_qps = sum(cassandra_qps_list)/len(cassandra_qps_list)
postgresql_qps = sum(postgresql_qps_list)/len(postgresql_qps_list)
Benchmarking benchmark_postgresql_worker print 'Thread Number: %s' % nr_procs
Benchmarking benchmark_postgresql_worker: done print 'Cassandra got: %s QPS' % cassandra_qps
Benchmarking benchmark_ print 'PostgreSQL got: %s QPS' % postgresql_workerqps
Benchmarking benchmark_postgresql_worker: done with open('results.csv', 'a') as f:
Benchmark # Yes writing CSV without using benchmark_cassandra_workerthe CSV module. Could obviously be
Benchmarking benchmark_cassandra_worker: done # done better but this is too simple and cheap to really care
Benchmarking benchmark_ f.write('%s,%s,%s\n' % (nr_procs, cassandra_qps, postgresql_worker_qps))
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Benchmark maps backends')
    parser.add_argument('mode', choices=['precomputed', 'random'])
    parser.add_argument('--min_procs', type=int, default=1)
    parser.add_argument('--max_procs', type=int, default=1)
    parser.add_argument('--cassandra_user', default='cassandra')
    parser.add_argument('--cassandra_pass', default='cassandra')
    parser.add_argument('--postgres_user')
    parser.add_argument('--postgres_pass')
    args = parser.parse_args()
    main(args)

# Preliminary results (from an earlier single-configuration run of this
# script -- TODO confirm exact query count used for these numbers):
# ====================================================================
# python stresstest.py
# Creating queries
# Queries created
# Benchmarking benchmark_cassandra_worker
# Benchmarking benchmark_cassandra_worker: done
# Benchmarking benchmark_postgresql_worker
# Benchmarking benchmark_postgresql_worker: done
# (workers repeated in random order several times)
# Cassandra got: 4860.36987879 QPS
# PostgreSQL got: 53602.9711195 QPS
# And a second run:
# Cassandra got: 4858.80625565 QPS
# PostgreSQL got: 53986.4100861 QPS