UTF-8? ✓
from StringIO import StringIO
from nose.tools import eq_ as eq, assert_raises
from rados import Rados, ObjectNotFound
from veintidos.cas import CAS, Compressor
from veintidos.chunk import Chunker
from util import random_id, random_bytes, eq_bufferUse single RADOS connection for the module. Create new pool for tests and delete it afterwards
rados = None
pool_name = None
ioctx_cas = None
ioctx_index = Nonedef setup_module():
global rados
rados = Rados(conffile='')
rados.connect()
global pool_name
pool_name = random_id()
rados.create_pool(pool_name)
global ioctx_cas
global ioctx_index
ioctx_cas = rados.open_ioctx(pool_name)
ioctx_index = rados.open_ioctx(pool_name)def teardown_module():
global pool_name
rados.delete_pool(pool_name)Test: Write and immediate remove should not leave any object behind
def test_chunker_no_litter(): cas = CAS(ioctx_cas)
chunker = Chunker(cas, ioctx_index)
data_in = StringIO(random_bytes(chunker.chunk_size*4))
obj_name = random_id()
chunker.write_full(obj_name, data_in)
chunker.remove_all_versions(obj_name)
cas_objs = [x.key for x in ioctx_cas.list_objects()]
index_objs = [x.key for x in ioctx_index.list_objects()]
print "CAS objects left:", cas_objs
print "Index objects left:", index_objs
eq(len(cas_objs), 0)
eq(len(index_objs), 0)Test: get(put(x)) == x with random buffer content
def test_cas_put_get(): cas = CAS(ioctx_cas)
data_in = random_bytes(99)
obj_name = cas.put(data_in)
eq_buffer(data_in, cas.get(obj_name))Test: get(put(x)) == x with deduplicatable content (zeros)
def test_cas_put_deduplicatable_content(): cas = CAS(ioctx_cas)
data_in = "\x00" * (4 * 1024**2)
obj_name_1 = cas.put(data_in)
obj_name_2 = cas.put(data_in)
eq(obj_name_1, obj_name_2)Test Utility: get(put(x)) == x for given cas and x in {random, 1s, 0s}
def compressed_cas_put_get(cas): data_in = random_bytes(8*1024**2)
obj_name = cas.put(data_in)
eq_buffer(data_in, cas.get(obj_name, size=8*1024**2))
cas.down(obj_name)
data_in = "\xFF" * 11*1024**2
obj_name = cas.put(data_in)
eq_buffer(data_in, cas.get(obj_name, size=11*1024**2))
cas.down(obj_name)
data_in = "\x00" * 42
obj_name = cas.put(data_in)
eq_buffer(data_in, cas.get(obj_name))
cas.down(obj_name)Test: get(put(x)) == x for all available compressors
def test_compressed_cas_put_get(): for compression in Compressor.supported():
print compression
cas = CAS(ioctx_cas, compression=compression)
compressed_cas_put_get(cas)Test: Uncompressed put and compressed put afterwards does not change object except reference counter
def test_mixed_compression():Put uncompressed data and then put new objects with compression turned on CAS must not create new objects, but rather increment their reference counter
cas = CAS(ioctx_cas, compression="no")
data_in = "\xFF" * 11*1024**2
obj_name = cas.put(data_in)
eq_buffer(data_in, cas.get(obj_name, size=11*1024**2))
for compression in Compressor.supported():
ccas = CAS(ioctx_cas, compression=compression)
tmp_obj_name = ccas.put(data_in)
eq_buffer(data_in, ccas.get(tmp_obj_name, size=11*1024**2))
eq_buffer(data_in, cas.get(obj_name, size=11*1024**2))
info = cas.info(obj_name)
eq("no", info["cas.meta.compression"])
eq(len(Compressor.supported()) + 1, info["cas.refcount"])Test: read(write(x)) = x for x filling only a single chunk
def test_chunker_put_get_single(): cas = CAS(ioctx_cas)
chunker = Chunker(cas, ioctx_index)
data_in = StringIO(random_bytes(42))
obj_name = random_id()
version = chunker.write_full(obj_name, data_in)
data_out = StringIO()
chunker.read_full(obj_name, data_out, version)
eq_buffer(data_in.getvalue(), data_out.getvalue())Test: read(write(x)) = x for x spread over multiple chunks. Every chunk filled
def test_chunker_put_get_multiple(): cas = CAS(ioctx_cas)
chunker = Chunker(cas, ioctx_index)
data_in = StringIO(random_bytes(chunker.chunk_size*4))
obj_name = random_id()
version = chunker.write_full(obj_name, data_in)
data_out = StringIO()
chunker.read_full(obj_name, data_out, version)
eq_buffer(data_in.getvalue(), data_out.getvalue())Test: read(write(x)) = x for x spread over multiple chunks. With partially filled chunks
def test_chunker_put_get_multiple_fraction(): cas = CAS(ioctx_cas)
chunker = Chunker(cas, ioctx_index)
data_in = StringIO(random_bytes(int(chunker.chunk_size*1.5)))
obj_name = random_id()
version = chunker.write_full(obj_name, data_in)
data_out = StringIO()
chunker.read_full(obj_name, data_out, version)
eq_buffer(data_in.getvalue(), data_out.getvalue())Test: versions / head_version returns version of last write_full. Single write_full
def test_chunker_versions(): cas = CAS(ioctx_cas)
chunker = Chunker(cas, ioctx_index)
data_in = StringIO(random_bytes(10*1024**1))
obj_name = random_id()
version = chunker.write_full(obj_name, data_in)
eq(len(chunker.versions(obj_name)), 1)
eq(version, chunker.head_version(obj_name))
eq(version, chunker.versions(obj_name)[0])Test: versions / head_version return version of last write_full. Multiple write_full
def test_chunker_multiple_versions(): cas = CAS(ioctx_cas)
chunker = Chunker(cas, ioctx_index)
data_in = random_bytes(42)
obj_name = random_id()
versions = (
chunker.write_full(obj_name, StringIO(data_in)),
chunker.write_full(obj_name, StringIO(data_in)),
chunker.write_full(obj_name, StringIO(data_in)),
chunker.write_full(obj_name, StringIO(data_in)),
chunker.write_full(obj_name, StringIO(data_in)),
)
eq(len(versions), len(chunker.versions(obj_name)))
eq(versions[-1], chunker.head_version(obj_name))
eq(versions[0], chunker.versions(obj_name)[0])Test: remove actually removes
remove_version(write_full): No versions, but index objectwrite_full, write_full, remove_all_versions: Index object gonedef test_chunker_remove(): cas = CAS(ioctx_cas)
chunker = Chunker(cas, ioctx_index)
data_in = random_bytes(42)
obj_name = random_id()
version = chunker.write_full(obj_name, StringIO(data_in))
chunker.remove_version(obj_name, version)
eq(chunker.head_version(obj_name), None)
chunker.write_full(obj_name, StringIO(data_in))
chunker.write_full(obj_name, StringIO(data_in))
chunker.remove_all_versions(obj_name)
assert_raises(ObjectNotFound, chunker.head_version, obj_name)Test: partial reads using chunker.read with different input and weird extents
def test_chunker_partial_read(): cas = CAS(ioctx_cas)
chunker = Chunker(cas, ioctx_index)
data_in = StringIO("\x00" * chunker.chunk_size +
"\xFF" * chunker.chunk_size)
obj_name = random_id()
version = chunker.write_full(obj_name, data_in)
middle = chunker.chunk_size / 2
buf = chunker.read(obj_name, chunker.chunk_size, middle, version)
eq(len(buf), chunker.chunk_size)
eq_buffer("\x00" * (chunker.chunk_size/2) +
"\xFF" * (chunker.chunk_size/2), buf)Test: partial reads past file size
def test_chunker_partial_read_past_size(): cas = CAS(ioctx_cas)
chunker = Chunker(cas, ioctx_index)
data_in = StringIO("\x00" * chunker.chunk_size)
obj_name = random_id()
version = chunker.write_full(obj_name, data_in)
buf = chunker.read(obj_name, chunker.chunk_size, chunker.chunk_size, version)
eq_buffer(buf, "")