2019-06-14 09:51:59 +00:00
# Documentation. {{{1
"""
This Python script implements a file system in user space using FUSE. It's
called DedupFS because the file system's primary feature is deduplication,
which enables it to store virtually unlimited copies of files because data
is only stored once.

In addition to deduplication the file system also supports transparent
compression using any of the compression methods lzo, zlib and bz2.

These two properties make the file system ideal for backups: I'm currently
storing 250 GB worth of backups using only 8 GB of disk space.

The latest version is available at http://peterodding.com/code/dedupfs/

DedupFS is licensed under the MIT license.
Copyright 2010 Peter Odding <peter@peterodding.com>.
"""
# Imports. {{{1
# Check the Python version, warn the user if untested.
import sys
# NOTE(review): `flagone` is not referenced anywhere in the visible part of
# this file -- confirm whether it is still needed.
flagone = True
if sys.version_info[:2] != (2, 6):
    # This is only a warning written to stderr: execution continues.
    msg = " Warning: DedupFS has only been tested on Python 2.6, while you ' re running Python %d . %d ! \n "
    sys.stderr.write(msg % (sys.version_info[0], sys.version_info[1]))
# Try to load the required modules from Python's standard library.
try:
    from io import BytesIO
    import cStringIO
    import errno
    import hashlib
    import tempfile
    import logging
    import math
    import os
    import sqlite3
    import stat
    import time
    import traceback
except ImportError, e:
    # Report the failed import on stderr and exit with status 1.
    msg = " Error: Failed to load one of the required Python modules! ( %s ) \n "
    sys.stderr.write(msg % str(e))
    sys.exit(1)
# Try to load the Python FUSE binding.
try:
    import fuse
except ImportError:
    # Without the binding nothing can be mounted: explain and exit with status 1.
    sys.stderr.write(" Error: The Python FUSE binding isn ' t installed! \n " + \
        " If you ' re on Ubuntu try running `sudo apt-get install python-fuse ' . \n ")
    sys.exit(1)
# Local modules that are mostly useful for debugging.
from my_formats import format_size, format_timespan
from get_memory_usage import get_memory_usage
2019-07-08 05:01:07 +00:00
from subprocess import Popen
2019-06-14 09:51:59 +00:00
def main():  # {{{1
    """
    Run dedupfs.py as a shell script that creates FUSE mount points.

    Execute "dedupfs -h" for a list of valid command line options.
    """
    filesystem = DedupFS()
    # A short usage message with the command line options defined by dedupfs
    # itself (see the __init__() method of the DedupFS class) is automatically
    # printed by the following call when sys.argv contains -h or --help.
    forced_args = [' -o ', ' use_ino,default_permissions,fsname=dedupfs ']
    fuse_opts = filesystem.parse(forced_args + sys.argv[1:])
    dfs_opts = filesystem.cmdline[0]

    if dfs_opts.print_stats:
        # Only report the disk usage: initialize read only, print, shut down.
        filesystem.read_only = True
        filesystem.fsinit(silent=True)
        filesystem.report_disk_usage()
        filesystem.fsdestroy(silent=True)
        return

    # If the user didn't pass -h or --help and also didn't supply a mount point
    # as a positional argument, print the short usage message and exit (I don't
    # agree with the Python FUSE binding's default behavior, which is something
    # nonsensical like using the working directory as a mount point).
    # Don't print all options unless the user passed -h or --help explicitly
    # because this listing includes the 20+ options defined by the Python FUSE
    # binding (which is kind of intimidating at first).
    if filesystem.fuse_args.mount_expected() and not fuse_opts.mountpoint:
        filesystem.parse([' -h '])
        return

    if fuse_opts.mountpoint or not filesystem.fuse_args.mount_expected():
        filesystem.main()
class DedupFS ( fuse . Fuse ) : # {{{1
def __init__(self, *args, **kw):  # {{{2
    """
    Set up the FUSE binding, the default instance state and the option parser.

    Positional and keyword arguments are forwarded untouched to
    fuse.Fuse.__init__(). When anything in here raises an Exception it is
    handed to self.__except_to_status() and the process exits with status 1.
    """
    try:
        # Set the Python FUSE API version.
        fuse.fuse_python_api = (0, 2)
        # Initialize the FUSE binding's internal state.
        fuse.Fuse.__init__(self, *args, **kw)
        # Set some options required by the Python FUSE binding.
        self.flags = 0
        self.multithreaded = 0

        # Initialize instance attributes.
        self.block_size = 1024 * 1024 * 5
        self.buffers = {}
        self.bytes_read = 0
        self.bytes_written = 0
        self.cache_gc_last_run = time.time()
        self.cache_requests = 0
        self.cache_timeout = 60  # TODO Make this a command line option!
        self.cached_nodes = {}
        self.calls_log_filter = []
        self.datastore_file = ' ~/.dedupfs-datastore.db '
        self.fs_mounted_at = time.time()
        self.gc_enabled = True
        self.gc_hook_last_run = time.time()
        self.gc_interval = 60
        self.link_mode = stat.S_IFLNK | 0o777
        self.memory_usage = 0
        self.metastore_file = ' ~/.dedupfs-metastore.sqlite3 '
        self.opcount = 0
        self.read_only = False
        self.root_mode = stat.S_IFDIR | 0o755
        self.time_spent_caching_nodes = 0
        self.time_spent_hashing = 0
        self.time_spent_interning = 0
        self.time_spent_querying_tree = 0
        self.time_spent_reading = 0
        self.time_spent_traversing_tree = 0
        self.time_spent_writing = 0
        self.time_spent_writing_blocks = 0
        self.__NODE_KEY_VALUE = 0
        self.__NODE_KEY_LAST_USED = 1

        # Initialize a Logger() object to handle logging.
        self.logger = logging.getLogger(' dedupfs ')
        self.logger.setLevel(logging.INFO)
        self.logger.addHandler(logging.StreamHandler(sys.stderr))

        # Register some custom command line options with the option parser.
        parser = self.parser
        option_stored_in_db = " (this option is only useful when creating a new database, because your choice is stored in the database and can ' t be changed after that) "
        # Enable overriding the --help message.
        parser.set_conflict_handler(' resolve ')
        parser.add_option(
            ' -h ', ' --help ', action=' help ',
            help=" show this help message followed by the command line options defined by the Python FUSE binding and exit ")
        parser.add_option(
            ' -v ', ' --verbose ', action=' count ', dest=' verbosity ', default=0,
            help=" increase verbosity ")
        parser.add_option(
            ' --print-stats ', dest=' print_stats ', action=' store_true ', default=False,
            help=" print the total apparent size and the actual disk usage of the file system and exit ")
        parser.add_option(
            ' --log-file ', dest=' log_file ',
            help=" specify log file location ")
        parser.add_option(
            ' --metastore ', dest=' metastore ', metavar=' FILE ', default=self.metastore_file,
            help=" specify the location of the file in which metadata is stored ")
        parser.add_option(
            ' --datastore ', dest=' datastore ', metavar=' FILE ', default=self.datastore_file,
            help=" specify the location of the file in which data blocks are stored ")
        parser.add_option(
            ' --block-size ', dest=' block_size ', metavar=' BYTES ', default=self.block_size, type=' int ',
            help=" specify the maximum block size in bytes " + option_stored_in_db)
        parser.add_option(
            ' --no-transactions ', dest=' use_transactions ', action=' store_false ', default=True,
            help=" don ' t use transactions when making multiple related changes, this might make the file system faster or slower (?) ")
        parser.add_option(
            ' --nosync ', dest=' synchronous ', action=' store_false ', default=True,
            help=" disable SQLite ' s normal synchronous behavior which guarantees that data is written to disk immediately, because it slows down the file system too much (this means you might lose data when the mount point isn ' t cleanly unmounted) ")
        parser.add_option(
            ' --nogc ', dest=' gc_enabled ', action=' store_false ', default=True,
            help=" disable the periodic garbage collection because it degrades performance (only do this when you ' ve got disk space to waste or you know that nothing will be be deleted from the file system, which means little to no garbage will be produced) ")
        parser.add_option(
            ' --verify-writes ', dest=' verify_writes ', action=' store_true ', default=False,
            help=" after writing a new data block to the database, check that the block was written correctly by reading it back again and checking for differences ")

        # Dynamically check for supported hashing algorithms: every public
        # name exported by hashlib except for the new() constructor.
        hash_functions = [name for name in dir(hashlib)
                          if name[0] != ' _ ' and name != ' new ']
        hash_help = " specify the hashing algorithm that will be used to recognize duplicate data blocks: one of %s " + option_stored_in_db
        hash_help = hash_help % ' , '.join([' %r ' % name for name in hash_functions])
        parser.add_option(
            ' --hash ', dest=' hash_function ', metavar=' FUNCTION ', type=' choice ',
            choices=hash_functions, default=' sha1 ', help=hash_help)

        # Dynamically check for supported compression methods.
        def noop(data):
            return data

        self.compressors = {' none ': (noop, noop)}
        compression_methods = [' none ']
        for modname in (' lzo ', ' zlib ', ' bz2 '):
            try:
                module = __import__(modname)
                if hasattr(module, ' compress ') and hasattr(module, ' decompress '):
                    self.compressors[modname] = (module.compress, module.decompress)
                    compression_methods.append(modname)
            except ImportError:
                # Modules that can't be imported are simply not offered.
                continue
        compress_help = " enable compression of data blocks using one of the supported compression methods: one of %s " + option_stored_in_db
        compress_help = compress_help % ' , '.join([' %r ' % name for name in compression_methods[1:]])
        parser.add_option(
            ' --compress ', dest=' compression_method ', metavar=' METHOD ', type=' choice ',
            choices=compression_methods, default=' none ', help=compress_help)

        # Dynamically check for profiling support.
        try:
            # Using __import__() here because of pyflakes.
            for profiling_module in (' cProfile ', ' pstats '):
                __import__(profiling_module)
            parser.add_option(
                ' --profile ', action=' store_true ', default=False,
                help=" use the Python modules cProfile and pstats to create a profile of time spent in various function calls and print out a table of the slowest functions at exit (of course this slows everything down but it can nevertheless give a good indication of the hot spots) ")
        except ImportError:
            self.logger.warning(" No profiling support available, --profile option disabled. ")
            self.logger.warning(" If you ' re on Ubuntu try `sudo apt-get install python-profiler ' . ")
    except Exception as e:
        self.__except_to_status(' __init__ ', e)
        sys.exit(1)
# FUSE API implementation: {{{2
def access(self, path, flags):  # {{{3
    """
    Check whether `path` exists and may be accessed as described by `flags`.

    Returns 0 when `flags` equals os.F_OK or self.__access() approves, and
    -errno.EACCES otherwise. Any exception is handed to
    self.__except_to_status() together with errno.ENOENT and the result of
    that call is returned.
    """
    try:
        self.__log_call(' access ', ' access( %r , %o ) ', path, flags)
        _node_id, inode = self.__path2keys(path)
        if flags == os.F_OK or self.__access(inode, flags):
            return 0
        return -errno.EACCES
    except Exception as e:
        return self.__except_to_status(' access ', e, errno.ENOENT)
def chmod(self, path, mode):  # {{{3
    """
    Store `mode` for the inode that `path` resolves to.

    Returns -errno.EROFS when the file system is read only and 0 once the
    update has been issued. Any exception is handed to
    self.__except_to_status() together with errno.EIO.
    """
    try:
        self.__log_call(' chmod ', ' chmod( %r , %o ) ', path, mode)
        if self.read_only:
            return -errno.EROFS
        _node_id, inode = self.__path2keys(path)
        statement = ' UPDATE inodes SET mode = ? WHERE inode = ? '
        self.conn.execute(statement, (mode, inode))
        self.__gc_hook()
        return 0
    except Exception as e:
        return self.__except_to_status(' chmod ', e, errno.EIO)
def chown(self, path, uid, gid):  # {{{3
    """
    Store the owner `uid` and group `gid` for the inode of `path`.

    Returns -errno.EROFS when the file system is read only and 0 once the
    update has been issued. Any exception is handed to
    self.__except_to_status() together with errno.EIO.
    """
    try:
        self.__log_call(' chown ', ' chown( %r , %i , %i ) ', path, uid, gid)
        if self.read_only:
            return -errno.EROFS
        _node_id, inode = self.__path2keys(path)
        statement = ' UPDATE inodes SET uid = ?, gid = ? WHERE inode = ? '
        self.conn.execute(statement, (uid, gid, inode))
        self.__gc_hook()
        return 0
    except Exception as e:
        return self.__except_to_status(' chown ', e, errno.EIO)
def create(self, path, flags, mode):  # {{{3
    """
    Open `path`, creating a new file with `mode` when it doesn't exist yet.

    Returns -errno.EROFS on a read only file system, otherwise the status
    returned by self.open(). On any exception the pending changes are rolled
    back and the exception is handed to self.__except_to_status() together
    with errno.EIO.
    """
    try:
        self.__log_call(' create ', ' create( %r , %o , %o ) ', path, flags, mode)
        if self.read_only:
            return -errno.EROFS
        try:
            # If the file already exists, just open it.
            status = self.open(path, flags, nested=True)
        except OSError as open_error:
            # Only a missing file is expected here, anything else propagates.
            if open_error.errno != errno.ENOENT:
                raise
            # Otherwise create a new file and open that.
            new_inode, _parent_ino = self.__insert(path, mode, 0)
            status = self.open(path, flags, nested=True, inode=new_inode)
        self.__commit_changes()
        self.__gc_hook()
        return status
    except Exception as e:
        self.__rollback_changes()
        return self.__except_to_status(' create ', e, errno.EIO)
def fsdestroy(self, silent=False):  # {{{3
    """
    Shut the file system down: collect garbage, commit and close the stores.

    Statistics are printed unless `silent` is true. Outstanding changes are
    only committed when the file system isn't read only. Returns 0, or the
    result of self.__except_to_status() (called with errno.EIO) when an
    exception occurs.
    """
    try:
        self.__log_call(' fsdestroy ', ' fsdestroy() ')
        self.__collect_garbage()
        if not silent:
            self.__print_stats()
        writable = not self.read_only
        if writable:
            self.logger.info(" Committing outstanding changes to ` %s ' . ", self.metastore_file)
            self.__dbmcall(' sync ')
            self.conn.commit()
        # Both stores are closed regardless of the read only state.
        self.conn.close()
        self.__dbmcall(' close ')
        return 0
    except Exception as e:
        return self.__except_to_status(' fsdestroy ', e, errno.EIO)
def fsinit(self, silent=False):  # {{{3
    """
    Apply the command line options and open the metadata and data stores.

    `silent` is passed on to the helpers that would otherwise log what they
    are doing. Returns 0 on success. When an Exception is raised it is handed
    to self.__except_to_status() (with errno.EIO) and the process is then
    terminated with os._exit(1).
    """
    try:
        # Process the custom command line options defined in __init__().
        opts = self.cmdline[0]
        self.block_size = opts.block_size
        self.compression_method = opts.compression_method
        self.datastore_file = self.__check_data_file(opts.datastore, silent)
        self.gc_enabled = opts.gc_enabled
        self.hash_function = opts.hash_function
        self.metastore_file = self.__check_data_file(opts.metastore, silent)
        self.synchronous = opts.synchronous
        self.use_transactions = opts.use_transactions
        self.verify_writes = opts.verify_writes

        # Initialize the logging and database subsystems.
        self.__init_logging(opts)
        self.__log_call(' fsinit ', ' fsinit() ')
        self.__setup_database_connections(silent)
        if not self.read_only:
            self.__init_metastore()
        self.__get_opts_from_db(opts)

        # Make sure the hash function is (still) valid (since the database was created).
        if not hasattr(hashlib, self.hash_function):
            self.logger.critical(" Error: The selected hash function %r doesn ' t exist! ", self.hash_function)
            sys.exit(1)
        # Get a reference to the hash function.
        self.hash_function_impl = getattr(hashlib, self.hash_function)

        # Disable synchronous operation. This is supposed to make SQLite perform
        # MUCH better but it has to be enabled with --nosync because you might
        # lose data when the file system isn't cleanly unmounted...
        if not (self.synchronous or self.read_only):
            self.logger.warning(" Warning: Disabling synchronous operation, you might lose data.. ")
            self.conn.execute(' PRAGMA synchronous = OFF ')

        # Select the compression method (if any) after potentially reading the
        # configured block size that was used to create the database (see the
        # set_block_size() call).
        self.__select_compress_method(opts, silent)
        return 0
    except Exception as e:
        self.__except_to_status(' fsinit ', e, errno.EIO)
        # Bug fix: Break the mount point when initialization failed with an
        # exception, because self.conn might not be valid, which results in
        # an internal error message for every FUSE API call...
        os._exit(1)
def getattr(self, path):  # {{{3
    """
    Look up the attributes of the inode that `path` resolves to.

    Returns a Stat object filled from the matching row of the inodes table,
    with st_blksize set to self.block_size and st_blocks to the size divided
    by 512. Any exception is handed to self.__except_to_status() together
    with errno.ENOENT and the result of that call is returned.
    """
    try:
        self.__log_call(' getattr ', ' getattr( %r ) ', path)
        _node_id, inode = self.__path2keys(path)
        query = ' SELECT inode, nlinks, mode, uid, gid, rdev, size, atime, mtime, ctime FROM inodes WHERE inode = ? '
        row = self.conn.execute(query, (inode,)).fetchone()
        # The column order of the query determines the unpacking order here.
        (ino, nlinks, mode, uid, gid, rdev, size, atime, mtime, ctime) = row
        result = Stat(st_ino=ino,
                      st_nlink=nlinks,
                      st_mode=mode,
                      st_uid=uid,
                      st_gid=gid,
                      st_rdev=rdev,
                      st_size=size,
                      st_atime=atime,
                      st_mtime=mtime,
                      st_ctime=ctime,
                      st_blksize=self.block_size,
                      st_blocks=size / 512,
                      st_dev=0)
        self.logger.debug(" getattr( %r ) returning %s ", path, result)
        return result
    except Exception as e:
        self.logger.debug(" getattr( %r ) returning ENOENT ", path)
        return self.__except_to_status(' getattr ', e, errno.ENOENT)
def link(self, target_path, link_path, nested=False):  # {{{3
    """
    Add the directory entry `link_path` for the inode of `target_path`.

    The link count of the target inode is incremented; when the target is a
    directory the link count of the new parent directory is incremented too.
    `nested` is passed on to the commit, rollback and garbage collection
    helpers and makes this method re-raise exceptions instead of converting
    them to a status.

    Returns -errno.EROFS on a read only file system and 0 on success. When
    not nested, exceptions are handed to self.__except_to_status() together
    with errno.EIO.
    """
    # From the link(2) manual page: "If link_path names a directory, link()
    # shall fail unless the process has appropriate privileges and the
    # implementation supports using link() on directories." ... :-)
    # However I've read that FUSE doesn't like multiple directory pathnames
    # with the same inode number (maybe because of internal caching based on
    # inode numbers?).
    try:
        self.__log_call(' link ', ' %s link( %r -> %r ) ', nested and ' ' or ' ', target_path, link_path)
        if self.read_only:
            return -errno.EROFS
        target_ino = self.__path2keys(target_path)[1]
        link_parent, link_name = os.path.split(link_path)
        link_parent_id, link_parent_ino = self.__path2keys(link_parent)
        string_id = self.__intern(link_name)
        self.conn.execute(' INSERT INTO tree (parent_id, name, inode) VALUES (?, ?, ?) ',
                          (link_parent_id, string_id, target_ino))
        node_id = self.__fetchval(' SELECT last_insert_rowid() ')
        self.conn.execute(' UPDATE inodes SET nlinks = nlinks + 1 WHERE inode = ? ', (target_ino,))
        target_mode = self.__fetchval(' SELECT mode FROM inodes WHERE inode = ? ', target_ino)
        # stat.S_ISDIR() compares the complete file type field. A plain
        # `mode & stat.S_IFDIR` is also true for block devices and sockets
        # because their type values share that bit.
        if stat.S_ISDIR(target_mode):
            self.conn.execute(' UPDATE inodes SET nlinks = nlinks + 1 WHERE inode = ? ', (link_parent_ino,))
        self.__cache_set(link_path, (node_id, target_ino))
        self.__commit_changes(nested)
        self.__gc_hook(nested)
        return 0
    except Exception as e:
        self.__rollback_changes(nested)
        if nested:
            raise
        return self.__except_to_status(' link ', e, errno.EIO)
def mkdir(self, path, mode):  # {{{3
    """
    Create the directory `path` and count it as a link to its parent.

    Returns -errno.EROFS on a read only file system and 0 on success. On any
    exception the pending changes are rolled back and the exception is handed
    to self.__except_to_status() together with errno.EIO.
    """
    try:
        self.__log_call(' mkdir ', ' mkdir( %r , %o ) ', path, mode)
        if self.read_only:
            return -errno.EROFS
        directory_mode = mode | stat.S_IFDIR
        _inode, parent_ino = self.__insert(path, directory_mode, 1024 * 4)
        bump_links = ' UPDATE inodes SET nlinks = nlinks + 1 WHERE inode = ? '
        self.conn.execute(bump_links, (parent_ino,))
        self.__commit_changes()
        self.__gc_hook()
        return 0
    except Exception as e:
        self.__rollback_changes()
        return self.__except_to_status(' mkdir ', e, errno.EIO)
def mknod(self, path, mode, rdev):  # {{{3
    """
    Insert a new node at `path` with the given `mode` and device number.

    Returns -errno.EROFS on a read only file system and 0 on success. On any
    exception the pending changes are rolled back and the exception is handed
    to self.__except_to_status() together with errno.EIO.
    """
    try:
        self.__log_call(' mknod ', ' mknod( %r , %o ) ', path, mode)
        if self.read_only:
            return -errno.EROFS
        initial_size = 0
        self.__insert(path, mode, initial_size, rdev)
        self.__commit_changes()
        self.__gc_hook()
        return 0
    except Exception as e:
        self.__rollback_changes()
        return self.__except_to_status(' mknod ', e, errno.EIO)
def open(self, path, flags, nested=None, inode=None):  # {{{3
    """
    Check that `path` exists and may be opened with the given `flags`.

    `inode` may be supplied by a caller that already knows it; otherwise it
    is resolved from `path`. When `nested` is true exceptions are re-raised
    for the caller to handle.

    Returns 0 when access is granted and -errno.EACCES when self.__access()
    refuses. When not nested, exceptions are handed to
    self.__except_to_status() together with errno.ENOENT.
    """
    try:
        self.__log_call(' open ', ' open( %r , %o ) ', path, flags)
        # Make sure the file exists?
        inode = inode or self.__path2keys(path)[1]
        # Make sure the file is readable and/or writable. The access mode is
        # a small enumeration stored in the low bits of `flags`, not a set of
        # independent bits: os.O_RDONLY is 0, so testing it with `&` never
        # matches. Isolate the access mode and compare it instead.
        access_mode = flags & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR)
        access_flags = 0
        if access_mode in (os.O_RDONLY, os.O_RDWR):
            access_flags |= os.R_OK
        if access_mode in (os.O_WRONLY, os.O_RDWR):
            access_flags |= os.W_OK
        if not self.__access(inode, access_flags):
            return -errno.EACCES
        return 0
    except Exception as e:
        if nested:
            raise
        return self.__except_to_status(' open ', e, errno.ENOENT)
def read(self, path, length, offset):  # {{{3
    """
    Read up to `length` bytes starting at `offset` from the buffer of `path`.

    The elapsed time and the number of bytes read are added to the
    statistics. Returns the data that was read; any exception is handed to
    self.__except_to_status() with code=errno.EIO.
    """
    try:
        self.__log_call(' read ', ' read( %r , %i , %i ) ', path, length, offset)
        started_at = time.time()
        file_buffer = self.__get_file_buffer(path)
        file_buffer.seek(offset)
        chunk = file_buffer.read(length)
        self.time_spent_reading += time.time() - started_at
        self.bytes_read += len(chunk)
        return chunk
    except Exception as e:
        return self.__except_to_status(' read ', e, code=errno.EIO)
def readdir(self, path, offset):  # {{{3
    """
    Generate the directory entries of `path`.

    Yields fuse.Direntry objects: first '.' and '..', then one entry per row
    in the tree table whose parent is the directory. Exceptions are handed to
    self.__except_to_status() and end the listing.
    """
    # Bug fix: When you use the -o use_ino option, directory entries must have
    # an "ino" field, otherwise not a single directory entry will be listed!
    try:
        self.__log_call(' readdir ', ' readdir( %r , %i ) ', path, offset)
        node_id, directory_inode = self.__path2keys(path)
        yield fuse.Direntry(' . ', ino=directory_inode)
        yield fuse.Direntry(' .. ')
        query = " SELECT t.inode, s.value FROM tree t, strings s WHERE t.parent_id = ? AND t.name = s.id "
        children = self.conn.execute(query, (node_id,)).fetchall()
        for child_inode, child_name in children:
            yield fuse.Direntry(str(child_name), ino=child_inode)
    except Exception as e:
        self.__except_to_status(' readdir ', e)
def readlink(self, path):  # {{{3
    """
    Return the target stored in the links table for the inode of `path`.

    The value is converted with str() before it is returned. Any exception is
    handed to self.__except_to_status() together with errno.ENOENT.
    """
    try:
        self.__log_call(' readlink ', ' readlink( %r ) ', path)
        _node_id, inode = self.__path2keys(path)
        target = self.__fetchval(' SELECT target FROM links WHERE inode = ? ', inode)
        return str(target)
    except Exception as e:
        return self.__except_to_status(' readlink ', e, errno.ENOENT)
def release(self, path, flags):  # {{{3
    """
    Flush and discard the buffer kept for `path`, if there is one.

    A dirty buffer is written out with self.__write_blocks() and committed;
    afterwards the byte and time counters are updated and the garbage
    collection hook runs. The buffer is then closed and removed from
    self.buffers. Returns 0 on success.

    When writing fails the changes are rolled back and the exception is
    re-raised, so that it is reported through self.__except_to_status()
    (with errno.EIO) instead of being silently dropped. In that case the
    statistics are not updated and the buffer stays in self.buffers.
    """
    try:
        self.__log_call(' release ', ' release( %r , %o ) ', path, flags)
        # Flush the write buffer?!
        if path in self.buffers:
            buf = self.buffers[path]
            # Flush the write buffer?
            if buf.dirty:
                # Record start time so we can calculate average write speed.
                start_time = time.time()
                # Make sure the file exists and get its inode number.
                inode = self.__path2keys(path)[1]
                # Save apparent file size before possibly compressing data.
                apparent_size = len(buf)
                # Split up that string in the configured block size, hash the
                # resulting blocks and store any new blocks.
                try:
                    self.__write_blocks(inode, buf, apparent_size)
                    self.__commit_changes()
                except Exception:
                    self.__rollback_changes()
                    # Don't pretend the data was stored: let the outer handler
                    # report the failure.
                    raise
                # Record the number of bytes written and the elapsed time.
                self.bytes_written += apparent_size
                self.time_spent_writing += time.time() - start_time
                self.__gc_hook()
            # Delete the buffer.
            buf.close()
            del self.buffers[path]
        return 0
    except Exception as e:
        return self.__except_to_status(' release ', e, errno.EIO)
def rename(self, old_path, new_path):  # {{{3
    """
    Move the directory entry `old_path` to `new_path`.

    An existing entry at `new_path` is removed first, then the new path is
    linked to the inode of the old path and the old path is unlinked.

    Returns -errno.EROFS on a read only file system and 0 on success. On any
    exception the pending changes are rolled back and the exception is handed
    to self.__except_to_status() together with errno.ENOENT.
    """
    try:
        self.__log_call(' rename ', ' rename( %r -> %r ) ', old_path, new_path)
        if self.read_only:
            return -errno.EROFS
        # Try to remove the existing target path (if it exists).
        # NB: This also makes sure target directories are empty.
        try:
            self.__remove(new_path, check_empty=True)
        except OSError as remove_error:
            # Ignore errno.ENOENT, re raise other exceptions.
            if remove_error.errno != errno.ENOENT:
                raise
        # Link the new path to the same inode as the old path ...
        self.link(old_path, new_path, nested=True)
        # ... and finally unlink the old path.
        self.unlink(old_path, nested=True)
        self.__commit_changes()
        self.__gc_hook()
        return 0
    except Exception as e:
        self.__rollback_changes()
        return self.__except_to_status(' rename ', e, errno.ENOENT)
def rmdir(self, path):  # {{{3
    """
    Remove the directory `path` through self.__remove(check_empty=True).

    Returns -errno.EROFS on a read only file system and 0 on success. On any
    exception the pending changes are rolled back and the exception is handed
    to self.__except_to_status() together with errno.ENOENT.
    """
    try:
        self.__log_call(' rmdir ', ' rmdir( %r ) ', path)
        if self.read_only:
            return -errno.EROFS
        self.__remove(path, check_empty=True)
        self.__commit_changes()
        return 0
    except Exception as e:
        self.__rollback_changes()
        return self.__except_to_status(' rmdir ', e, errno.ENOENT)
def statfs(self):  # {{{3
    """
    Report the storage capacity of the host file system holding the metastore.

    The block counts returned by os.statvfs() are expressed in units of the
    host's fragment size (f_frsize). They are converted to bytes with that
    size and then to units of self.block_size, which is the block size this
    file system reports. The file serial number counts are reported as 0.

    Returns a StatVFS object, or the result of self.__except_to_status()
    (called with errno.EIO) when an exception occurs.
    """
    try:
        self.__log_call(' statfs ', ' statfs() ')
        # Use os.statvfs() to report the host file system's storage capacity.
        host_fs = os.statvfs(self.metastore_file)
        fragment_size = host_fs.f_frsize
        return StatVFS(
            # The number of free blocks available to a non privileged process.
            f_bavail=(fragment_size * host_fs.f_bavail) // self.block_size,
            # The total number of free blocks in the file system.
            f_bfree=(fragment_size * host_fs.f_bfree) // self.block_size,
            # The total number of blocks in the file system.
            f_blocks=(fragment_size * host_fs.f_blocks) // self.block_size,
            # The file system block size in bytes.
            f_bsize=self.block_size,
            # The number of free file serial numbers available to a non
            # privileged process.
            f_favail=0,
            # The total number of free file serial numbers.
            f_ffree=0,
            # The total number of file serial numbers.
            f_files=0,
            # File system flags. Symbols are defined in the <sys/statvfs.h>
            # header file to refer to bits in this field.
            f_flag=0,
            # The fundamental file system block size in bytes.
            f_frsize=self.block_size,
            # The maximum file name length in the file system. Some file
            # systems may return the maximum value that can be stored in an
            # unsigned long to indicate the file system has no maximum file
            # name length.
            f_namemax=4294967295)
    except Exception as e:
        return self.__except_to_status(' statfs ', e, errno.EIO)
def symlink(self, target_path, link_path):  # {{{3
    """
    Create the symbolic link `link_path` that points to `target_path`.

    A new inode with self.link_mode is inserted for the link and the target
    is stored in the links table.

    Returns -errno.EROFS on a read only file system and 0 on success. On any
    exception the pending changes are rolled back and the exception is handed
    to self.__except_to_status() together with errno.EIO.
    """
    try:
        self.__log_call(' symlink ', ' symlink( %r -> %r ) ', link_path, target_path)
        if self.read_only:
            return -errno.EROFS
        # Create an inode to hold the symbolic link.
        link_inode, _parent_ino = self.__insert(link_path, self.link_mode, len(target_path))
        # Save the symbolic link's target.
        stored_target = sqlite3.Binary(target_path)
        self.conn.execute(' INSERT INTO links (inode, target) VALUES (?, ?) ', (link_inode, stored_target))
        self.__commit_changes()
        self.__gc_hook()
        return 0
    except Exception as e:
        self.__rollback_changes()
        return self.__except_to_status(' symlink ', e, errno.EIO)
def truncate(self, path, size):  # {{{3
    """
    Record `size` as the new size of the file at `path`.

    Index rows of the file whose block number is greater than
    size / self.block_size are deleted and the size stored for the inode is
    updated.

    Returns -errno.EROFS on a read only file system and 0 on success. On any
    exception the pending changes are rolled back and the exception is handed
    to self.__except_to_status() together with errno.ENOENT.
    """
    try:
        self.__log_call(' truncate ', ' truncate( %r , %i ) ', path, size)
        if self.read_only:
            return -errno.EROFS
        _node_id, inode = self.__path2keys(path)
        # NOTE(review): the block numbered size / block_size itself is kept,
        # also when `size` is an exact multiple of the block size -- confirm
        # that this matches the block numbering used when writing.
        last_block = size / self.block_size
        drop_blocks = ' DELETE FROM " index " WHERE inode = ? AND block_nr > ? '
        self.conn.execute(drop_blocks, (inode, last_block))
        store_size = ' UPDATE inodes SET size = ? WHERE inode = ? '
        self.conn.execute(store_size, (size, inode))
        self.__gc_hook()
        self.__commit_changes()
        return 0
    except Exception as e:
        self.__rollback_changes()
        return self.__except_to_status(' truncate ', e, errno.ENOENT)
def unlink(self, path, nested=False):  # {{{3
    """
    Remove the directory entry `path` through self.__remove().

    `nested` is passed on to the commit and rollback helpers and makes this
    method re-raise exceptions instead of converting them to a status.

    Returns -errno.EROFS on a read only file system and 0 on success, like
    the other operations in this class. When not nested, exceptions are
    handed to self.__except_to_status() together with errno.ENOENT.
    """
    try:
        self.__log_call(' unlink ', ' %s unlink( %r ) ', nested and ' ' or ' ', path)
        if self.read_only:
            return -errno.EROFS
        self.__remove(path)
        self.__commit_changes(nested)
        # Report success explicitly instead of falling off the end.
        return 0
    except Exception as e:
        self.__rollback_changes(nested)
        if nested:
            raise
        return self.__except_to_status(' unlink ', e, errno.ENOENT)
def utime(self, path, times):  # {{{3
    """
    Store the access and modification time of `path`.

    `times` is unpacked into two values: the access time followed by the
    modification time.

    Returns -errno.EROFS on a read only file system and 0 once the update has
    been issued. Any exception is handed to self.__except_to_status()
    together with errno.ENOENT.
    """
    try:
        self.__log_call(' utime ', ' utime( %r , %i , %i ) ', path, *times)
        if self.read_only:
            return -errno.EROFS
        _node_id, inode = self.__path2keys(path)
        access_time, modification_time = times
        statement = ' UPDATE inodes SET atime = ?, mtime = ? WHERE inode = ? '
        self.conn.execute(statement, (access_time, modification_time, inode))
        self.__gc_hook()
        return 0
    except Exception as e:
        return self.__except_to_status(' utime ', e, errno.ENOENT)
def utimens(self, path, ts_acc, ts_mod):  # {{{3
    """
    Store the access and modification time of `path` from two time specs.

    `ts_acc` and `ts_mod` provide the attributes tv_sec (whole seconds) and
    tv_nsec (nanoseconds); both are combined into a floating point number of
    seconds before they are stored.

    Returns -errno.EROFS on a read only file system and 0 once the update has
    been issued. Any exception is handed to self.__except_to_status()
    together with errno.ENOENT.
    """
    try:
        self.__log_call(' utimens ', ' utimens( %r , %i . %i , %i . %i ) ', path,
                        ts_acc.tv_sec, ts_acc.tv_nsec, ts_mod.tv_sec, ts_mod.tv_nsec)
        if self.read_only:
            return -errno.EROFS
        inode = self.__path2keys(path)[1]
        # One second is 1000000000 nanoseconds. Dividing by one million would
        # add up to almost a thousand seconds to the stored timestamps.
        nanoseconds_per_second = 1000000000.0
        atime = ts_acc.tv_sec + (ts_acc.tv_nsec / nanoseconds_per_second)
        mtime = ts_mod.tv_sec + (ts_mod.tv_nsec / nanoseconds_per_second)
        self.conn.execute(' UPDATE inodes SET atime = ?, mtime = ? WHERE inode = ? ', (atime, mtime, inode))
        self.__gc_hook()
        return 0
    except Exception as e:
        return self.__except_to_status(' utimens ', e, errno.ENOENT)
def write(self, path, data, offset):  # {{{3
    """
    Write `data` at `offset` into the buffer kept for `path`.

    The elapsed time is added to the statistics. Returns the number of bytes
    in `data`; any exception is handed to self.__except_to_status() together
    with errno.EIO.
    """
    try:
        byte_count = len(data)
        self.__log_call(' write ', ' write( %r , %i , %i ) ', path, offset, byte_count)
        started_at = time.time()
        file_buffer = self.__get_file_buffer(path)
        file_buffer.seek(offset)
        file_buffer.write(data)
        self.time_spent_writing += time.time() - started_at
        # self.bytes_written is incremented from release().
        return byte_count
    except Exception as e:
        return self.__except_to_status(' write ', e, errno.EIO)
# Miscellaneous methods: {{{2
def __init_logging(self, options):  # {{{3
    """
    Configure self.logger from the parsed command line `options`.

    When options.log_file is set a stream handler writing to that file is
    added. A verbosity above zero selects the log level: INFO up to 1, DEBUG
    up to 2 and NOTSET for anything higher.
    """
    # Configure logging of messages to a file.
    if options.log_file:
        log_stream = open(options.log_file, ' w ')
        self.logger.addHandler(logging.StreamHandler(log_stream))
    # Convert verbosity argument to logging level?
    verbosity = options.verbosity
    if not verbosity > 0:
        return
    if verbosity <= 1:
        level = logging.INFO
    elif verbosity <= 2:
        level = logging.DEBUG
    else:
        level = logging.NOTSET
    self.logger.setLevel(level)
def __init_metastore(self):  # {{{3
    """
    Seed the metadata database with the root node and the chosen options.

    A single SQL script is run with self.conn.executescript(). It creates the
    "index" table when it doesn't exist yet and uses INSERT OR IGNORE to add
    the root string, tree node and inode as well as the values of the
    synchronous, block_size, compression_method and hash_function options.
    The values are interpolated into the script with Python string
    formatting.
    """
    # Bug fix: At this point fuse.FuseGetContext() returns uid = 0 and gid = 0
    # which differs from the info returned in later calls. The simple fix is to
    # use Python's os.getuid() and os.getgid() library functions instead of
    # fuse.FuseGetContext().
    uid, gid = os.getuid(), os.getgid()
    # The same timestamp is used for atime, mtime and ctime of the root inode.
    t = self.__newctime()
    # NOTE(review): as shown here the script only creates the "index" table
    # although it inserts into strings, tree, inodes and options, and its
    # comment markers and format specifiers contain stray spaces -- verify
    # this text against a pristine copy of the file.
    self.conn.executescript("""
- - Create the required tables ?
CREATE TABLE IF NOT EXISTS " index " ( inode INTEGER , hash_id INTEGER , block_nr INTEGER , PRIMARY KEY ( inode , hash_id , block_nr ) ) ;
- - Create the root node of the file system ?
INSERT OR IGNORE INTO strings ( id , value ) VALUES ( 1 , ' ' ) ;
INSERT OR IGNORE INTO tree ( id , parent_id , name , inode ) VALUES ( 1 , NULL , 1 , 1 ) ;
INSERT OR IGNORE INTO inodes ( nlinks , mode , uid , gid , rdev , size , atime , mtime , ctime ) VALUES ( 2 , % i , % i , % i , 0 , 1024 * 4 , % f , % f , % f ) ;
- - Save the command line options used to initialize the database ?
INSERT OR IGNORE INTO options ( name , value ) VALUES ( ' synchronous ' , % i ) ;
INSERT OR IGNORE INTO options ( name , value ) VALUES ( ' block_size ' , % i ) ;
INSERT OR IGNORE INTO options ( name , value ) VALUES ( ' compression_method ' , % r ) ;
INSERT OR IGNORE INTO options ( name , value ) VALUES ( ' hash_function ' , % r ) ;
""" % (self.root_mode, uid, gid, t, t, t, self.synchronous and 1 or 0,
       self.block_size, self.compression_method, self.hash_function))
def __setup_database_connections(self, silent):  # {{{3
    """
    Open the key/value store with the data blocks and the SQLite metastore.

    The key/value store is self.datastore_file: when that file doesn't exist
    yet gdbm is requested, otherwise whichdb() inspects the existing file so
    that it is opened through the library that created it. The result is
    stored in self.blocks. The SQLite connection to self.metastore_file is
    stored in self.conn.

    Unless `silent` is true the names of both files are logged.
    """
    if not silent:
        self.logger.info(" Using data files %r and %r . ", self.metastore_file, self.datastore_file)
    # Open the key/value store containing the data blocks. The file that has
    # to be inspected is the datastore itself (the file that is opened by
    # self.__open_datastore()), not the SQLite metastore.
    if not os.path.exists(self.datastore_file):
        self.blocks = self.__open_datastore(True)
    else:
        from whichdb import whichdb
        created_by_gdbm = whichdb(self.datastore_file) == ' gdbm '
        self.blocks = self.__open_datastore(created_by_gdbm)
    # Open an SQLite database connection with manual transaction management.
    self.conn = sqlite3.connect(self.metastore_file, isolation_level=None)
    # Use the built in row factory to enable named attributes.
    self.conn.row_factory = sqlite3.Row
    # Return regular strings instead of Unicode objects.
    self.conn.text_factory = str
    # Don't bother releasing any locks since there's currently no point in
    # having concurrent reading/writing of the file system database.
    self.conn.execute(' PRAGMA locking_mode = EXCLUSIVE ')
def __open_datastore(self, use_gdbm):
    """
    Open self.datastore_file as a key/value store and return it.

    The store is opened for reading when the file system is read only and
    with the create flag otherwise. With `use_gdbm` the gdbm module is tried
    first, adding the synchronous or fast flag depending on self.synchronous;
    when gdbm isn't requested or can't be imported anydbm is used.
    """
    # gdbm is preferred over other dbm implementations because it supports fast
    # vs. synchronous modes, however any other dedicated key/value store should
    # work just fine (albeit not as fast). Note though that existing key/value
    # stores are always accessed through the library that created them.
    if self.read_only:
        mode = ' r '
    else:
        mode = ' c '
    if use_gdbm:
        try:
            import gdbm
            if self.synchronous:
                mode += ' s '
            else:
                mode += ' f '
            return gdbm.open(self.datastore_file, mode)
        except ImportError:
            # Fall back to whatever anydbm has to offer.
            pass
    import anydbm
    return anydbm.open(self.datastore_file, mode)
def __dbmcall(self, fun):  # {{{3
    """
    Placeholder for calling the method named `fun` on self.blocks.

    The actual call is commented out below, which means that this method
    currently does nothing and returns None.
    """
    # I simply cannot find any freakin' documentation on the type of objects
    # returned by anydbm and gdbm, so cannot verify that any single method will
    # always be there, although most seem to...
    # NOTE(review): fsdestroy() calls this method to sync and close the data
    # store; with the lines below disabled neither happens here -- confirm
    # that this is intentional.
    # if hasattr(self.blocks, fun):
    # getattr(self.blocks, fun)()
def __check_data_file(self, pathname, silent):  # {{{3
    """
    Expand `pathname` and check the permissions of the file when it exists.

    An existing file that isn't readable terminates the process with
    os._exit(1). An existing file that isn't writable switches the file
    system to read only mode, logging a warning unless `silent` is true.
    Returns the expanded pathname.
    """
    pathname = os.path.expanduser(pathname)
    if not os.access(pathname, os.F_OK):
        # Nothing to check for a file that doesn't exist (yet).
        return pathname
    # If the datafile already exists make sure it's readable,
    # otherwise the file system would be completely unusable.
    readable = os.access(pathname, os.R_OK)
    if not readable:
        self.logger.critical(" Error: Datafile %r exists but isn ' t readable! ", pathname)
        os._exit(1)
    # Check and respect whether the datafile is writable (e.g. when it was
    # created by root but is currently being accessed by another user).
    writable = os.access(pathname, os.W_OK)
    if not writable:
        if not silent:
            self.logger.warning(" File %r exists but isn ' t writable! Switching to read only mode. ", pathname)
        self.read_only = True
    return pathname
def __log_call(self, fun, msg, *args): # {{{3
  """
  Log a debug message about a call to the method named `fun`.

  The message is emitted when self.calls_log_filter is an empty list (no
  filtering) or when `fun` is listed in it. `msg` and `args` are handed to
  self.logger.debug() unchanged.
  """
  # To disable all __log_call() invocations:
  #  :%s/^\(\s\+\)\(self\.__log_call\)/\1#\2
  # To re enable them:
  #  :%s/^\(\s\+\)#\(self\.__log_call\)/\1\2
  wanted = self.calls_log_filter
  if wanted != [] and fun not in wanted:
    return
  self.logger.debug(msg, *args)
def __get_opts_from_db(self, options): # {{{3
  """
  Load the options stored in the `options` table and apply them to self.

  Stored values for block_size, compression_method and hash_function
  replace the current settings; a warning is logged when that overrides a
  different setting (for compression_method only when the current method
  isn't 'none'). The stored `synchronous` flag is used unless
  options.synchronous is false, in which case synchronous is disabled.
  """
  for name, value in self.conn.execute('SELECT name, value FROM options'):
    if name == 'synchronous':
      stored = int(value) != 0
      # If the user passed --nosync, override the value stored in the database.
      self.synchronous = stored if options.synchronous else False
    elif name == 'block_size':
      stored = int(value)
      if stored != self.block_size:
        self.logger.warning("Ignoring --block-size=%i argument, using previously chosen block size %i instead",
            self.block_size, stored)
        self.block_size = stored
    elif name == 'compression_method':
      if value != self.compression_method:
        if self.compression_method != 'none':
          self.logger.warning(
              "Ignoring --compress=%s argument, using previously chosen compression method %r instead",
              self.compression_method, value)
        self.compression_method = value
    elif name == 'hash_function':
      if value != self.hash_function:
        self.logger.warning("Ignoring --hash=%s argument, using previously chosen hash function %r instead",
            self.hash_function, value)
        self.hash_function = value
def __select_compress_method(self, options, silent): # {{{3
  """
  Install the compress/decompress pair for self.compression_method.

  The method name is lower cased and looked up in self.compressors; an
  unknown name is reported with a warning and replaced by 'none'. The
  selected pair is stored in self.compress and self.decompress. `options`
  is not used; `silent` suppresses the debug message.
  """
  selected_format = self.compression_method.lower()
  if selected_format not in self.compressors.keys():
    self.logger.warning("Invalid compression format `%s' selected!", selected_format)
    selected_format = 'none'
  if selected_format != 'none' and not silent:
    self.logger.debug("Using the %s compression method.", selected_format)
  if selected_format == 'lzo':
    # My custom LZO binding defines set_block_size() which enables
    # optimizations like preallocating a buffer that can be reused for
    # every call to compress() and decompress().
    lzo_module = __import__('lzo')
    if hasattr(lzo_module, 'set_block_size'):
      lzo_module.set_block_size(self.block_size)
  self.compress, self.decompress = self.compressors[selected_format]
def __write_blocks(self, inode, buf, apparent_size): # {{{3
  """
  Split `buf` into blocks of self.block_size bytes, store every block whose
  hash isn't in the `hashes` table yet and rebuild the index of `inode`.

  Known blocks are downloaded again and compared with the new data; a
  mismatch is treated as a hash collision: both blocks are dumped to a file
  under /tmp, a critical message is logged and the process exits through
  os._exit(1). Unknown blocks are compressed and handed to the helper
  `download_service.py upload <hex digest>` through a FIFO named
  upipe_<hex digest> (a relative path). Finally the size and mtime of the
  inode are updated and the elapsed time is added to
  self.time_spent_writing_blocks.

  Raises OSError (EIO) when the upload helper exits with a non-zero status,
  so that no hash is recorded for a block that wasn't stored.
  """
  start_time = time.time()
  # Delete existing index entries for file.
  self.conn.execute('DELETE FROM "index" WHERE inode = ?', (inode,))
  # Store any changed blocks and rebuild the file index.
  storage_size = len(buf)
  for block_nr in xrange(int(math.ceil(storage_size / float(self.block_size)))):
    buf.seek(self.block_size * block_nr, os.SEEK_SET)
    new_block = buf.read(self.block_size)
    digest = self.__hash(new_block)
    # The hex form of the digest is what is stored in the hashes table and
    # what is passed to the transfer helper, so it is computed only once.
    encoded_digest = digest.encode('hex') # sqlite3.Binary(digest)
    row = self.conn.execute('SELECT id FROM hashes WHERE hash = ?', (encoded_digest,)).fetchone()
    if row:
      hash_id = row[0]
      existing_block = self.decompress(self.__get_block_from_telegram(encoded_digest))
      # Check for hash collisions.
      if new_block != existing_block:
        # Found a hash collision: dump debugging info and exit.
        dumpfile_collision = '/tmp/dedupfs-collision-%i' % time.time()
        with open(dumpfile_collision, 'w') as handle:
          handle.write('Content of existing block is %r.\n' % existing_block)
          handle.write('Content of new block is %r.\n' % new_block)
        # The hex digest is logged because the raw digest is binary data.
        self.logger.critical(
            "Found a hash collision on block number %i of inode %i!\n" + \
            "The existing block is %i bytes and hashes to %s.\n" + \
            "The new block is %i bytes and hashes to %s.\n" + \
            "Saved existing and conflicting data blocks to %r.",
            block_nr, inode, len(existing_block), encoded_digest,
            len(new_block), encoded_digest, dumpfile_collision)
        os._exit(1)
      self.conn.execute('INSERT INTO "index" (inode, hash_id, block_nr) VALUES (?, ?, ?)',
          (inode, hash_id, block_nr))
    else:
      fifo_path = 'upipe_' + encoded_digest
      try:
        os.mkfifo(fifo_path)
      except OSError as oe:
        # A left over pipe with the same name can be reused, anything else
        # is a real error.
        if oe.errno != errno.EEXIST:
          raise
      process = Popen(["python3.6", "download_service.py", "upload", encoded_digest], bufsize=-1)
      with open(fifo_path, 'wb') as pipe:
        # Once both ends are open the name is no longer needed.
        os.unlink(fifo_path)
        pipe.write(self.compress(new_block))
      if process.wait() != 0:
        raise OSError(errno.EIO, "Uploading block %s failed with exit status %i"
            % (encoded_digest, process.returncode))
      self.conn.execute('INSERT INTO hashes (id, hash) VALUES (NULL, ?)', (encoded_digest,))
      self.conn.execute('INSERT INTO "index" (inode, hash_id, block_nr) VALUES (?, last_insert_rowid(), ?)',
          (inode, block_nr))
      # Check that the data was properly stored in the database?
      self.__verify_write(new_block, digest, block_nr, inode)
  # Update file size and last modified time.
  self.conn.execute('UPDATE inodes SET size = ?, mtime = ? WHERE inode = ?',
      (apparent_size, self.__newctime(), inode))
  self.time_spent_writing_blocks += time.time() - start_time
def __insert(self, path, mode, size, rdev=0): # {{{3
  """
  Create a new inode plus the tree node that links it under `path`.

  The parent directory is resolved with __path2keys(); the owner and group
  come from __getctx() and all three timestamps are set to __newctime().
  Directories start with two links, everything else with one. The new
  (node id, inode) pair is stored in the path cache.

  Returns the tuple (inode, parent_ino).
  """
  parent, name = os.path.split(path)
  parent_id, parent_ino = self.__path2keys(parent)
  # stat.S_ISDIR() compares the complete file type field. Testing the
  # S_IFDIR bit alone also matches block devices and sockets, whose type
  # values include that bit.
  nlinks = 2 if stat.S_ISDIR(mode) else 1
  t = self.__newctime()
  uid, gid = self.__getctx()
  self.conn.execute(
      'INSERT INTO inodes (nlinks, mode, uid, gid, rdev, size, atime, mtime, ctime) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)',
      (nlinks, mode, uid, gid, rdev, size, t, t, t))
  inode = self.__fetchval('SELECT last_insert_rowid()')
  string_id = self.__intern(name)
  self.conn.execute('INSERT INTO tree (parent_id, name, inode) VALUES (?, ?, ?)', (parent_id, string_id, inode))
  node_id = self.__fetchval('SELECT last_insert_rowid()')
  self.__cache_set(path, (node_id, inode))
  return inode, parent_ino
def __intern(self, string): # {{{3
  """
  Return the integer id of `string` in the `strings` table, inserting the
  string first when it isn't stored yet. The time spent here is added to
  self.time_spent_interning.
  """
  began = time.time()
  params = (sqlite3.Binary(string),)
  row = self.conn.execute('SELECT id FROM strings WHERE value = ?', params).fetchone()
  if not row:
    self.conn.execute('INSERT INTO strings (id, value) VALUES (NULL, ?)', params)
    row = self.conn.execute('SELECT last_insert_rowid()').fetchone()
  string_id = int(row[0])
  self.time_spent_interning += time.time() - began
  return string_id
def __remove(self, path, check_empty=False): # {{{3
  """
  Unlink `path`: delete its tree node and decrement the link count of its
  inode. When the inode is a directory the link count of the parent
  directory is decremented as well.

  With `check_empty` set, OSError (ENOTEMPTY) is raised when the node still
  has children whose inodes have links left. Errors raised by __path2keys()
  propagate to the caller.
  """
  node_id, inode = self.__path2keys(path)
  # Make sure directories are empty before deleting them to avoid orphaned inodes.
  query = """ SELECT COUNT(t.id) FROM tree t, inodes i WHERE
              t.parent_id = ? AND i.inode = t.inode AND i.nlinks > 0 """
  if check_empty and self.__fetchval(query, node_id) > 0:
    raise OSError(errno.ENOTEMPTY, os.strerror(errno.ENOTEMPTY), path)
  self.__cache_set(path, None)
  self.conn.execute('DELETE FROM tree WHERE id = ?', (node_id,))
  self.conn.execute('UPDATE inodes SET nlinks = nlinks - 1 WHERE inode = ?', (inode,))
  # Inodes with nlinks = 0 are purged periodically from __collect_garbage() so
  # we don't have to do that here.
  # stat.S_ISDIR() compares the complete file type field. Testing the S_IFDIR
  # bit alone also matches block devices and sockets, which would wrongly
  # lower the link count of the parent.
  if stat.S_ISDIR(self.__fetchval('SELECT mode FROM inodes where inode = ?', inode)):
    parent_id, parent_ino = self.__path2keys(os.path.split(path)[0])
    self.conn.execute('UPDATE inodes SET nlinks = nlinks - 1 WHERE inode = ?', (parent_ino,))
def __verify_write(self, block, digest, block_nr, inode): # {{{3
  """
  Read a freshly stored block back and compare it with the data that
  should have been written. Does nothing unless self.verify_writes is set.

  On a mismatch both versions are dumped to a file under /tmp, a critical
  message is logged and the process exits through os._exit(1).
  """
  if not self.verify_writes:
    return
  saved_value = self.decompress(self.__get_block_from_telegram(digest.encode('hex')))
  if saved_value == block:
    return
  # The data block was corrupted when it was written or read.
  dumpfile_corruption = '/tmp/dedupfs-corruption-%i' % time.time()
  with open(dumpfile_corruption, 'w') as handle:
    handle.write('The content that should have been stored is %r.\n' % block)
    handle.write('The content that was retrieved from the database is %r.\n' % saved_value)
  # The dump file name is a string and is therefore formatted with %r; a
  # numeric conversion can't format it.
  self.logger.critical(
      "Failed to verify data with block number %i of inode %i!\n" + \
      "Saved original and corrupted data blocks to %r.",
      block_nr, inode, dumpfile_corruption)
  os._exit(1)
def __access(self, inode, flags): # {{{3
  """
  Decide whether the calling user may access `inode` in the way described
  by `flags` (a combination of os.R_OK, os.W_OK and os.X_OK).

  Write access is always refused while the file system is read only.
  Otherwise the owner, group or other permission bits of the inode's mode
  are consulted, depending on how the uid/gid from __getctx() relate to the
  owner and group of the inode. The result is truthy when access is granted
  and falsy otherwise.
  """
  # Check if the flags include writing while the database is read only.
  if self.read_only and flags & os.W_OK:
    return False
  # Get the path's mode, owner and group through the inode.
  attrs = self.conn.execute('SELECT mode, uid, gid FROM inodes WHERE inode = ?', (inode,)).fetchone()
  # Determine by whom the request is being made.
  uid, gid = self.__getctx()
  is_owner = uid == attrs['uid']
  # Note: "and not is_owner" added after experimenting with EXT4.
  is_group = gid == attrs['gid'] and not is_owner
  is_other = not (is_owner or is_group)
  mode = attrs['mode']
  def permitted(wanted, owner_bit, group_bit, other_bit):
    # A kind of access that wasn't requested is never an obstacle.
    return not (flags & wanted) or ((is_owner and (mode & owner_bit))
        or (is_group and (mode & group_bit)) or (is_other and (mode & other_bit)))
  # The essence of UNIX file permissions. Did I miss anything?! (Probably...)
  return permitted(os.R_OK, stat.S_IRUSR, stat.S_IRGRP, stat.S_IROTH) \
     and permitted(os.W_OK, stat.S_IWUSR, stat.S_IWGRP, stat.S_IWOTH) \
     and permitted(os.X_OK, stat.S_IXUSR, stat.S_IXGRP, stat.S_IXOTH)
def __path2keys(self, path): # {{{3
  """
  Resolve `path` to the tuple (tree node id, inode number).

  The root directory is always (1, 1). Other paths are resolved segment by
  segment: segments found in self.cached_nodes are taken from the cache
  (refreshing their last used time), the others are looked up in the
  database and added to the cache.

  Raises OSError (ENOENT) when a segment doesn't exist.
  """
  node_id, inode = 1, 1
  if path == '/':
    return node_id, inode
  start_time = time.time()
  query = 'SELECT t.id, t.inode FROM tree t, strings s WHERE t.parent_id = ? AND t.name = s.id AND s.value = ? LIMIT 1'
  node = self.cached_nodes
  parent_id = node_id
  for segment in self.__split_segments(path):
    if segment not in node:
      # Cache miss: ask the database for the child of the current parent.
      query_start_time = time.time()
      result = self.conn.execute(query, (parent_id, sqlite3.Binary(segment))).fetchone()
      self.time_spent_querying_tree += time.time() - query_start_time
      if result is None:
        self.__cache_check_gc()
        self.time_spent_traversing_tree += time.time() - start_time
        raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), path)
      node_id, inode = result
      child = {self.__NODE_KEY_VALUE: (node_id, inode), self.__NODE_KEY_LAST_USED: start_time}
      node[segment] = child
      node = child
    else:
      # Cache hit: descend and remember that this entry was just used.
      node = node[segment]
      node[self.__NODE_KEY_LAST_USED] = start_time
      node_id, inode = node[self.__NODE_KEY_VALUE]
    parent_id = node_id
  self.__cache_check_gc()
  self.time_spent_traversing_tree += time.time() - start_time
  return node_id, inode
def __cache_set(self, key, value): # {{{3
  """
  Create, update or delete the cache entry of the path `key`.

  A false `value` deletes the entry, any other value is stored as the
  entry's keys. Returns False (without changing the entry) when one of the
  parent segments of the path hasn't been cached, True otherwise. The time
  spent here is added to self.time_spent_caching_nodes.
  """
  segments = self.__split_segments(key)
  parents, last_segment = segments[:-1], segments[-1]
  node = self.cached_nodes
  time_now = time.time()
  for segment in parents:
    # Check that the keys of the sub path have been cached.
    if segment not in node:
      self.time_spent_caching_nodes += time.time() - time_now
      return False
    # Resolve the next path segment and update its last used time.
    node = node[segment]
    node[self.__NODE_KEY_LAST_USED] = time_now
  if not value:
    # Delete the path's keys.
    node.pop(last_segment, None)
  elif last_segment in node:
    # Update the path's keys.
    entry = node[last_segment]
    entry[self.__NODE_KEY_VALUE] = value
    entry[self.__NODE_KEY_LAST_USED] = time_now
  else:
    # Create the path's keys.
    node[last_segment] = {self.__NODE_KEY_VALUE: value, self.__NODE_KEY_LAST_USED: time_now}
  self.__cache_check_gc()
  self.time_spent_caching_nodes += time.time() - time_now
  return True
def __cache_check_gc(self): # {{{3
  """
  Count a cache request and, on every 2500th request, run the cache
  garbage collector when at least self.cache_timeout seconds have passed
  since its last run. The request counter is reset on every 2500th request
  whether or not the collector ran.
  """
  self.cache_requests += 1
  if self.cache_requests < 2500:
    return
  time_now = time.time()
  if time_now - self.cache_gc_last_run >= self.cache_timeout:
    self.__cache_do_gc(self.cached_nodes, time_now)
    self.cache_gc_last_run = time_now
  self.cache_requests = 0
def __cache_do_gc(self, node, time_now): # {{{3
  """
  Recursively remove the cache entries below `node` that were last used
  more than self.cache_timeout seconds before `time_now`. Values that
  aren't dictionaries are left alone.
  """
  # Walk over a snapshot of the keys because entries are deleted on the fly.
  for key in list(node):
    child = node[key]
    if not isinstance(child, dict):
      continue
    if time_now - child[self.__NODE_KEY_LAST_USED] > self.cache_timeout:
      del node[key]
    else:
      self.__cache_do_gc(child, time_now)
def __split_segments(self, key): # {{{3
  """ Split the path `key` on '/' and return the list of non-empty segments. """
  return [segment for segment in key.split('/') if segment]
def __newctime(self): # {{{3
  """ Return the current time as given by time.time(). """
  now = time.time()
  return now
def __getctx(self): # {{{3
  """ Return the tuple (uid, gid) taken from the context reported by fuse.FuseGetContext(). """
  context = fuse.FuseGetContext()
  uid = context['uid']
  gid = context['gid']
  return (uid, gid)
def __hash(self, data): # {{{3
  """
  Return the binary digest of `data`, computed with a fresh context from
  self.hash_function_impl. The time spent is added to
  self.time_spent_hashing.
  """
  began = time.time()
  hasher = self.hash_function_impl()
  hasher.update(data)
  raw_digest = hasher.digest()
  self.time_spent_hashing += time.time() - began
  return raw_digest
def __print_stats(self): # {{{3
  """ Log a separator line followed by the memory usage, throughput and timing reports. """
  self.logger.info('-' * 79)
  reports = (self.__report_memory_usage, self.__report_throughput, self.__report_timings)
  for report in reports:
    report()
def __report_timings(self): # {{{3
  """
  Log the cumulative time spent in the instrumented operations, slowest
  first. Nothing is done unless debug logging is enabled, and only
  operations that account for at least one percent of the time since
  self.fs_mounted_at are listed.
  """
  if not self.logger.isEnabledFor(logging.DEBUG):
    return
  timings = [(self.time_spent_traversing_tree, 'Traversing the tree'),
             (self.time_spent_caching_nodes, 'Caching tree nodes'),
             (self.time_spent_interning, 'Interning path components'),
             (self.time_spent_writing_blocks, 'Writing data blocks'),
             (self.time_spent_hashing, 'Hashing data blocks'),
             (self.time_spent_querying_tree, 'Querying the tree')]
  maxdescwidth = max(len(label) for _, label in timings) + 3
  uptime = time.time() - self.fs_mounted_at
  printed_heading = False
  for timespan, description in sorted(timings, reverse=True):
    percentage = timespan / (uptime / 100)
    if percentage < 1:
      continue
    if not printed_heading:
      self.logger.debug("Cumulative timings of slowest operations:")
      printed_heading = True
    self.logger.debug(
        " - %-*s%s (%i%%)" % (maxdescwidth, description + ':', format_timespan(timespan), percentage))
def report_disk_usage(self): # {{{3
  """
  Log the total apparent size of all files next to the disk space taken by
  the SQLite database (page size times page count) plus the file
  self.datastore_file, and the latter as a percentage of the former.
  """
  disk_usage = self.__fetchval('PRAGMA page_size') * self.__fetchval('PRAGMA page_count')
  disk_usage += os.stat(self.datastore_file).st_size
  # SUM() yields NULL (None) when there are no rows to add up.
  apparent_size = self.__fetchval('SELECT SUM(inodes.size) FROM tree, inodes WHERE tree.inode = inodes.inode') or 0
  # Floating point arithmetic keeps the percentage exact and an apparent
  # size below 100 bytes can no longer cause a division by zero.
  if apparent_size:
    percentage = 100.0 * disk_usage / apparent_size
  else:
    percentage = 0.0
  self.logger.info("The total apparent size is %s while the databases take up %s (that's %.2f%%).",
      format_size(apparent_size), format_size(disk_usage), percentage)
def __report_memory_usage(self): # {{{3
  """
  Log the current memory usage and, when a previous non-zero measurement
  exists and differs, how much the usage went up or down since then. The
  new measurement is remembered in self.memory_usage.
  """
  memory_usage = get_memory_usage()
  parts = ["Current memory usage is " + format_size(memory_usage)]
  difference = abs(memory_usage - self.memory_usage)
  if self.memory_usage != 0 and difference:
    direction = 'up' if self.memory_usage < memory_usage else 'down'
    parts.append(" (%s by %s)" % (direction, format_size(difference)))
  parts.append('.')
  self.logger.info(''.join(parts))
  self.memory_usage = memory_usage
def __report_throughput(self, nbytes=None, nseconds=None, label=None): # {{{3
  """
  Report the average read and write speed.

  Called without arguments the read and write counters of self are
  reported and replaced by the values returned for them; nothing is
  returned. Called with `nbytes`, `nseconds` and `label` the average speed
  is logged when any bytes were transferred, and the tuple
  (nbytes, nseconds) is returned; both values are halved first when more
  than 60 seconds and more than 1024**2 bytes have been accumulated.
  """
  if nbytes is None:
    self.bytes_read, self.time_spent_reading = \
        self.__report_throughput(self.bytes_read, self.time_spent_reading, "read")
    self.bytes_written, self.time_spent_writing = \
        self.__report_throughput(self.bytes_written, self.time_spent_writing, "write")
    return None
  if nbytes > 0:
    average = format_size(nbytes / max(1, nseconds))
    self.logger.info("Average %s speed is %s/s.", label, average)
  # Decrease the influence of previous measurements over time?
  if nseconds > 60 and nbytes > 1024 ** 2:
    return nbytes / 2, nseconds / 2
  return nbytes, nseconds
def __report_top_blocks(self): # {{{3
  """
  Log up to ten of the data blocks that are referenced more than once by
  the index, together with their use count. Nothing is done unless debug
  logging is enabled.
  """
  if not self.logger.isEnabledFor(logging.DEBUG):
    return
  # The use counts are computed in a sub query that is then joined with the
  # hashes table; the sub query needs its opening "SELECT * FROM (" for the
  # statement to be valid SQL.
  query = """
    SELECT * FROM (
      SELECT *, COUNT(*) AS "count" FROM "index"
      GROUP BY hash_id ORDER BY "count" DESC
    ), hashes WHERE
      "count" > 1 AND
      hash_id = hashes.id
      LIMIT 10 """
  printed_header = False
  for row in self.conn.execute(query):
    if not printed_header:
      self.logger.debug("A listing of the most used blocks follows:")
      printed_header = True
    msg = "Block #%s of %s has been used %i times: %r"
    # NOTE(review): the statements in this file only show the columns
    # (id, hash) for the hashes table; confirm that a `value` column exists.
    preview = row['value']
    max_length = 60
    if len(preview) < max_length:
      preview = str(preview)
    else:
      preview = preview[0:max_length] + '...'
    nbytes = format_size(len(row['value']))
    self.logger.debug(msg, row['hash_id'], nbytes, row['count'], preview)
def __gc_hook(self, nested=False): # {{{3
  """
  Count a file system operation and periodically collect garbage and print
  statistics. Nested calls are ignored completely.
  """
  # Don't collect any garbage for nested calls.
  if nested:
    return
  # Don't call time.time() more than once every 500th FUSE call.
  self.opcount += 1
  if self.opcount % 500 != 0:
    return
  # Once self.gc_interval seconds have passed since the last run the
  # statistics are reported and garbage collection is performed when it
  # isn't disabled.
  if time.time() - self.gc_hook_last_run < self.gc_interval:
    return
  self.__collect_garbage()
  self.__print_stats()
  self.gc_hook_last_run = time.time()
def __collect_garbage(self): # {{{3
  """
  Run all garbage collection steps and log what each of them cleaned up
  and how long it took. Nothing is done when garbage collection is disabled
  or the file system is read only.
  """
  if not self.gc_enabled or self.read_only:
    return
  start_time = time.time()
  self.logger.info("Performing garbage collection (this might take a while) ..")
  self.should_vacuum = False
  steps = (self.__collect_strings, self.__collect_inodes, self.__collect_indices,
      self.__collect_blocks, self.__vacuum_metastore)
  for step in steps:
    step_started = time.time()
    # A step returns a message with a %s placeholder for the elapsed time
    # when it did something, None otherwise.
    msg = step()
    if msg:
      self.logger.info(msg, format_timespan(time.time() - step_started))
  self.logger.info("Finished garbage collection in %s.", format_timespan(time.time() - start_time))
def __collect_strings(self): # {{{4
  """
  Delete the strings that no tree node uses as its name. Returns a message
  with a %s placeholder for the elapsed time when rows were deleted (and
  requests a vacuum), None otherwise.
  """
  cursor = self.conn.execute('DELETE FROM strings WHERE id NOT IN (SELECT name FROM tree)')
  count = cursor.rowcount
  if count <= 0:
    return None
  self.should_vacuum = True
  plural = 's' if count != 1 else ''
  return "Cleaned up %i unused path segment%s in %%s." % (count, plural)
def __collect_inodes(self): # {{{4
  """
  Delete the inodes that have no links left. Returns a message with a %s
  placeholder for the elapsed time when rows were deleted (and requests a
  vacuum), None otherwise.
  """
  cursor = self.conn.execute('DELETE FROM inodes WHERE nlinks = 0')
  count = cursor.rowcount
  if count <= 0:
    return None
  self.should_vacuum = True
  plural = 's' if count != 1 else ''
  return "Cleaned up %i unused inode%s in %%s." % (count, plural)
def __collect_indices(self): # {{{4
  """
  Delete the index entries whose inode no longer exists. Returns a message
  with a %s placeholder for the elapsed time when rows were deleted (and
  requests a vacuum), None otherwise.
  """
  cursor = self.conn.execute('DELETE FROM "index" WHERE inode NOT IN (SELECT inode FROM inodes)')
  count = cursor.rowcount
  if count <= 0:
    return None
  self.should_vacuum = True
  ending = 'ies' if count != 1 else 'y'
  return "Cleaned up %i unused index entr%s in %%s." % (count, ending)
def __collect_blocks(self): # {{{4
  """
  Delete the hashes that no index entry refers to. When such hashes exist
  __dbmcall('reorganize') is invoked first. Returns a message with a %s
  placeholder for the elapsed time when rows were deleted (and requests a
  vacuum), None otherwise.
  """
  unused = 'id NOT IN (SELECT hash_id FROM "index")'
  orphans = self.conn.execute('SELECT hash FROM hashes WHERE ' + unused).fetchall()
  # Removing the block data itself is currently disabled:
  #   del self.blocks[str(row[0])]
  if orphans:
    self.__dbmcall('reorganize')
  count = self.conn.execute('DELETE FROM hashes WHERE ' + unused).rowcount
  if count <= 0:
    return None
  self.should_vacuum = True
  plural = 's' if count != 1 else ''
  return "Cleaned up %i unused data block%s in %%s." % (count, plural)
def __vacuum_metastore(self): # {{{4
  """
  Run VACUUM on the SQLite database when self.should_vacuum is set.
  Returns a message with a %s placeholder for the elapsed time in that
  case, None otherwise.
  """
  if not self.should_vacuum:
    return None
  self.conn.execute('VACUUM')
  return "Vacuumed SQLite metadata store in %s."
def __commit_changes(self, nested=False): # {{{3
  """ Commit the pending database changes, unless transactions are disabled or the call is nested. """
  if nested or not self.use_transactions:
    return
  self.conn.commit()
def __rollback_changes(self, nested=False): # {{{3
  """ Log and roll back the pending database changes, unless transactions are disabled or the call is nested. """
  if nested or not self.use_transactions:
    return
  self.logger.info('Rolling back changes')
  self.conn.rollback()
2019-06-17 09:31:47 +00:00
def __get_block_from_telegram(self, digest):
  """
  Fetch the stored (still compressed) block identified by `digest`.

  The helper `download_service.py download <digest> <file>` is run with a
  named temporary file as its target; when it has finished the content of
  that file is returned. The temporary file is always closed again, also
  when starting the helper fails.

  Raises OSError (EIO) when the helper exits with a non-zero status, so
  that a failed download isn't mistaken for an empty block.
  """
  buf = tempfile.NamedTemporaryFile()
  try:
    process = Popen(["python3.6", "download_service.py", "download", str(digest), str(buf.name)], bufsize=-1)
    if process.wait() != 0:
      raise OSError(errno.EIO, "Downloading block %s failed with exit status %i"
          % (digest, process.returncode))
    buf.seek(0)
    return buf.read()
  finally:
    buf.close()
def __get_file_buffer(self, path): # {{{3
  """
  Return the buffer that holds the content of the file at `path`.

  Buffers are kept in self.buffers. On first use a new Buffer is filled
  with the decompressed blocks of the file, in block number order, and
  remembered for later calls.
  """
  if path not in self.buffers:
    buf = Buffer()
    inode = self.__path2keys(path)[1]
    query = """ SELECT h.hash FROM hashes h, "index" i
                WHERE i.inode = ? AND h.id = i.hash_id
                ORDER BY i.block_nr ASC """
    for row in self.conn.execute(query, (inode,)).fetchall():
      # TODO Make the file system more robust against failure by doing
      # something sensible when self.blocks.has_key(digest) is false.
      compressed = self.__get_block_from_telegram(str(row[0]))
      buf.write(self.decompress(compressed))
    self.buffers[path] = buf
  return self.buffers[path]
def __fetchval(self, query, *values): # {{{3
  """ Execute `query` with `values` as its parameters and return the first column of the first result row. """
  first_row = self.conn.execute(query, values).fetchone()
  return first_row[0]
def __except_to_status(self, method, exception, code=errno.ENOENT): # {{{3
  """
  Report `exception`, raised inside the FUSE method named `method`, on
  standard error and convert it to a negative error code.

  The result is -exception.errno for an OSError that carries an error
  number and -code otherwise. Nothing is reported for getattr() calls with
  code ENOENT.
  """
  # Convert the exception to a FUSE error code. An OSError created without
  # an error number has errno None, which can't be negated.
  status = -code
  if isinstance(exception, OSError) and exception.errno is not None:
    status = -exception.errno
  # Don't report ENOENT raised from getattr().
  if method != 'getattr' or code != errno.ENOENT:
    sys.stderr.write('%s\n' % ('-' * 50))
    sys.stderr.write("Caught exception in %s(): %s\n" % (method, exception))
    traceback.print_exc(file=sys.stderr)
    sys.stderr.write('%s\n' % ('-' * 50))
    # Report the status that is actually returned.
    sys.stderr.write("Returning %i\n" % status)
    sys.stderr.flush()
  return status
class Buffer: # {{{1

  """
  This class wraps cStringIO.StringIO with two additions: The __len__
  method and a dirty flag to determine whether a buffer has changed.
  """

  def __init__(self):
    self.buf = cStringIO.StringIO()
    self.dirty = False

  def __getattr__(self, attr, default=None):
    """ Delegate to the StringIO object. """
    return getattr(self.buf, attr, default)

  def __len__(self):
    """ Get the total size of the buffer in bytes. """
    # Measure by seeking to the end, then restore the original position.
    position = self.buf.tell()
    self.buf.seek(0, os.SEEK_END)
    length = self.buf.tell()
    self.buf.seek(position, os.SEEK_SET)
    return length

  def truncate(self, *args):
    """
    Truncate the file at the given size, or at the current position when no
    size is given, and set the dirty flag when that removes data.
    """
    # The requested size decides whether data is removed; the current
    # position only matters when no size was given.
    if args and args[0] is not None:
      cutoff = args[0]
    else:
      cutoff = self.buf.tell()
    if len(self) > cutoff:
      self.dirty = True
    return self.buf.truncate(*args)

  def write(self, *args):
    """ Write a string to the file and set the dirty flag. """
    self.dirty = True
    return self.buf.write(*args)
# Named tuples used to return complex objects to FUSE. {{{1
try:
  import collections
  Stat = collections.namedtuple('Stat',
      'st_atime st_blksize st_blocks st_ctime st_dev st_gid st_ino '
      'st_mode st_mtime st_nlink st_rdev st_size st_uid')
  StatVFS = collections.namedtuple('StatVFS',
      'f_bavail f_bfree f_blocks f_bsize f_favail f_ffree f_files '
      'f_flag f_frsize f_namemax')
except (ImportError, AttributeError):
  # Fall back to regular Python classes instead of named tuples. A
  # collections module without namedtuple raises AttributeError rather than
  # ImportError, so both are handled.
  class __Struct:
    """ Minimal record type: every keyword argument becomes an attribute. """
    def __init__(self, **kw):
      for k, v in kw.items():
        setattr(self, k, v)
  class Stat(__Struct):
    pass
  class StatVFS(__Struct):
    pass
# }}}1
if __name__ == '__main__':
  # Run the file system directly, or under the profiler when --profile was
  # passed on the command line.
  if '--profile' not in sys.argv:
    main()
  else:
    sys.stderr.write("Enabling profiling..\n")
    import cProfile
    import pstats
    # The raw profile goes to a temporary file in the current directory,
    # which is removed after the statistics have been printed.
    profile = '.dedupfs.cprofile-%i' % time.time()
    cProfile.run('main()', profile)
    sys.stderr.write("\nProfiling statistics:\n\n")
    stats = pstats.Stats(profile)
    stats.sort_stats('time')
    stats.print_stats(0.1)
    os.unlink(profile)
# vim: ts=2 sw=2 et