aka Pythonic FS APIs part 2, the electric boogaloo
Contents
Links in the abstract and in outline view.
Spread all over the place, built-ins and miscellaneous stdlib
Thanks to everyone there!
p = self.fs.join("path/to/myfile")
with p.open() as f:
while True:
data = f.read(8192)
if not data:
break
yield data
# Resolve the user-supplied name as a child of a fixed base path.
# NOTE(review): presumably child() rejects names that would escape the
# parent (e.g. containing "/" or ".."); confirm against the library.
p = filesystem.path("/my/safe/area").child(user_input)
for p in path:
print p.name(), p.size()
for (cur, dirs, files) in top.walk():
dirs[:] = [
d for d in dirs
if d.name() not in IGNORE
]
for p in files:
print p, p.size()
import json
import os
def serialize(path, data):
    """Write ``data`` as JSON to ``path`` via a temporary sibling file.

    The JSON is first written to ``<name>.<pid>.tmp`` in the same parent
    as ``path`` and then renamed over ``path``, so readers never see a
    partially written destination.

    :param path: path object providing ``name()``, ``parent()`` and being
        a valid argument to ``rename()`` of a sibling path.
    :param data: JSON-serializable object to store.

    NOTE: if ``json.dump`` raises, the temporary file is left behind;
    no cleanup is attempted here.
    """
    # Include the process id so concurrent writers in different
    # processes do not share a temporary file name.
    tmp_name = '%s.%d.tmp' % (
        path.name(),
        os.getpid(),
    )
    tmp = path.parent().child(tmp_name)
    with tmp.open('w') as f:
        json.dump(obj=data, fp=f)
    # Only move the file into place after it has been closed.
    tmp.rename(path)
import fudge
import nose
import re
from fudge.inspector import arg
from serialize_json import serialize
@nose.with_setup(fudge.clear_expectations)
@fudge.with_fakes
def test_serialize():
    """Check serialize() call by call using ordered fudge fakes.

    Every filesystem object serialize() touches is replaced by a fake
    that records the expected call order: name lookup, parent lookup,
    temp-file creation, the individual writes, and the final rename.
    """
    # The destination path: asked for its name and its parent.
    path = fudge.Fake('path')
    path.remember_order()
    path.expects('name').with_args().returns('quux.thud')

    path_parent = fudge.Fake('path_parent')
    path_parent.remember_order()
    path.expects('parent').with_args().returns(path_parent)

    # The temporary sibling; its name embeds the pid, so match by regex.
    path_tmp = fudge.Fake('path_tmp')
    path_tmp.remember_order()
    TMP_RE = re.compile(r'^quux\.thud\.\d+\.tmp$')
    path_parent.expects('child').with_args(
        arg.passes_test(TMP_RE.match),
    ).returns(path_tmp)

    # Opening the temp file returns a context manager ...
    file_tmp = fudge.Fake('file')
    file_tmp.remember_order()
    path_tmp.expects('open').with_args('w').returns(file_tmp)

    # ... whose __enter__ hands back the object that receives the writes.
    file_ctx = fudge.Fake('file')
    file_ctx.remember_order()
    file_tmp.expects('__enter__').with_args().returns(file_ctx)

    # The expected writes pin the exact chunks json.dump emits for
    # {'foo': 'bar'}, so this test is coupled to the encoder's chunking.
    file_ctx.expects('write').with_args('{')
    file_ctx.next_call().with_args('"foo"')
    file_ctx.next_call().with_args(': ')
    file_ctx.next_call().with_args('"bar"')
    file_ctx.next_call().with_args('}')

    file_tmp.expects('__exit__').with_args(None, None, None)

    # Finally the temp file is renamed onto the destination.
    path_tmp.expects('rename').with_args(path)

    serialize(path, {'foo':'bar'})
#!/usr/bin/python
import filesystem
from serialize_json import serialize
# Store the result in output.json; serialize() writes a temporary file
# first and renames it into place.
serialize(
    path=filesystem.path('output.json'),
    data={'answer': 42},
)
and no temp dirs either
So use convenience libs!
def test_wishful_thinking():
    """Sketch of the serialize() test written against a fake-filesystem helper.

    NOTE(review): ``fudgefs`` is not imported anywhere in view and, going
    by the function name, looks like a wished-for API rather than an
    existing one -- confirm before expecting this to run.
    """
    p = fudgefs.FakeFS()
    # NOTE(review): the temp-file pattern uses ``foo.bar`` while the
    # rename target is ``quux.thud``; serialize() derives the temp name
    # from the destination's name, so one of the two may be a typo.
    tmp = p.file(regex=r'^foo\.bar\.\d+\.tmp$')
    tmp.writes('{"foo": "bar"}')
    p.rename(tmp, 'quux.thud')
    serialize(p, {'foo':'bar'})
# Write a file into a git repository through the filesystem API.
# NOTE(review): presumably the changes are committed when the
# transaction block exits -- confirm against gitfs.
repo = gitfs.repo.Repository(path)
with repo.transaction() as root:
    with root.child('greet').open('w') as f:
        f.write('hello, world\n')
| gitfs.repo | ||
| gitfs.readonly | gitfs.indexfs | |
| gitfs.commands | ||
| git plumbing | ||
# Write a large file (one million lines of 1000 'x' characters each)
# through the same path API, this time backed by HadoopFS.
fs = hadoopfs.HadoopFS('myserver')
with fs as root:
    big = root.child('big')
    with big.open('w') as f:
        # A for loop over an integer count: the loop variable needs no
        # prior definition, and xrange() requires an int, not 1e6.
        for _ in xrange(10**6):
            f.write('x'*1000+'\n')
Questions? Opinions? Rants?
Find me today or tomorrow to talk more.
Slides etc. are up on eagain.net