File copy progress python

How to Move/Copy a File or Directory (Folder) with a Progress Bar in Python

In the last article, titled How to Recursively Copy a Folder (Directory) in Python, I covered how to copy a folder recursively from one place to another.

As useful as that is, there is still something else that we could add that would make copying a directory much more user friendly!

Enter the progress bar from stage left. It’s a useful utility that all big name software uses to tell the user something is happening and where it is with the task it is doing.

So how do we incorporate this into copying a directory? Glad you asked, my question filled friend!

The Progress Bar

First, we’ll make a simple class to make printing progress 100 times easier as well as give us a nice reusable class to use in other projects that might need it.

Here we go. First we’ll add the __init__ method:

# -*- coding: utf-8 -*- import sys class ProgressBar(object): def __init__(self, message, width=20, progressSymbol=u'▣ ', emptySymbol=u'□ '): self.width = width if self.width < 0: self.width = 0 self.message = message self.progressSymbol = progressSymbol self.emptySymbol = emptySymbol

That wasn’t too hard! We pass in the message we want to print, the width of the progress bar, and the two symbols for progress done and empty progress. We check that the width is always greater that or equal to zero, because we can’t have a negative length! 🙂

Alright, on to the hard part. We have to figure out how to update and render the progress bar to the output window without printing a new line every single time.

That’s where the carriage return ( \r ) comes in. The carriage return is a special character that allows the printed message to start at the beginning of a line when printed. Every time we print something, we’ll just use it at the beginning of our line and it should take care of our needs just fine. The only limitation this has is that if the progress bar happens to wrap in the terminal, the carriage return will not work as expected and will just output a new line.

Here is our update function that will print the updated progress:

def update(self, progress): totalBlocks = self.width filledBlocks = int(round(progress / (100 / float(totalBlocks)) )) emptyBlocks = totalBlocks - filledBlocks progressBar = self.progressSymbol * filledBlocks + \ self.emptySymbol * emptyBlocks if not self.message: self.message = u'' progressMessage = u'\r  %'.format(self.message, progressBar, progress) sys.stdout.write(progressMessage) sys.stdout.flush() def calculateAndUpdate(self, done, total): progress = int(round( (done / float(total)) * 100) ) self.update(progress)

What we just did is calculate the number of filled blocks using the total blocks needed. It looks a little complicated, so let me break it down. We want the number of filled blocks to be the progress divided by 100% divided by the total number of blocks. The calculateAndUpdate function is just to make our life a little easier. All it does is calculate and print the percentage done given the current number of items and the total number of items.

filledBlocks = int(round(progress / (100 / float(totalBlocks)) ))

That call to float is there to make sure the calculation is done with floating point precision, and then we round up the floating point number with round and make it an integer with int .

emptyBlocks = totalBlocks - filledBlocks

Then the empty blocks are simply the totalBlocks minus the filledBlocks.

progressBar = self.progressSymbol * filledBlocks + \ self.emptySymbol * emptyBlocks

We take the number of filled blocks and multiply it by the progressSymbol and add it to the emptySymbol multiplied by the number of empty blocks.

progressMessage = u'\r  %'.format(self.message, progressBar, progress)

We use the carriage return at the beginning of the message to set the print position to the beginning of the line and make a message formatted like: “Message! ▣ ▣ ▣ □ □ □ 50%”.

sys.stdout.write(progressMessage) sys.stdout.flush()

And, finally, we print the message to the screen. Why don’t we just use the print function? Because the print function adds a new line to the end of our message and that’s not what we want. We want it printed just as we set it.

Читайте также:  Css flex width auto

On with the show! To the copying a directory function!

Copying a Directory (Folder) with Progress in Python

In order to know our progress, we’ll have to sacrifice some speed. We need to count all of the files that we’re copying to get the total amount of progress left, and this requires us to recursively search the directory and count all of the files.

Let’s write a function to do that.

import os def countFiles(directory): files = [] if os.path.isdir(directory): for path, dirs, filenames in os.walk(directory): files.extend(filenames) return len(files)

Pretty straight forward. We just checked if the directory passed in is a directory, then if it is, recursively walk the directories and add the files in the directories to the files list. Then we just return the length of the list. Simple.

Next is the copying directories function. If you looked at the article mentioned at the top of this article, you’ll notice that I used the shutil.copytree function. Here we can’t do that because shutil doesn’t support progress updates, so we’ll have to write our own copying function.

First we create an instance of our ProgressBar class.

p = ProgressBar('Copying files. ')

Next, we define a function to make directories that don’t exist yet. This will allow us to create the directory structure of the source directory.

def makedirs(dest): if not os.path.exists(dest): os.makedirs(dest)

If the directory doesn’t exist, we create it.

Now for the copy function. This one is a bit of a doozy, so I’ll put it all down in one place, then break it down.

import shutil def copyFilesWithProgress(src, dest): numFiles = countFiles(src) if numFiles > 0: makedirs(dest) numCopied = 0 for path, dirs, filenames in os.walk(src): for directory in dirs: destDir = path.replace(src,dest) makedirs(os.path.join(destDir, directory)) for sfile in filenames: srcFile = os.path.join(path, sfile) destFile = os.path.join(path.replace(src, dest), sfile) shutil.copy(srcFile, destFile) numCopied += 1 p.calculateAndUpdate(numCopied, numFiles) print

Firstly, we count the number of files.

def copyFilesWithProgress(src, dest): numFiles = countFiles(src)

Nothing too difficult about that.

Читайте также:  Linking css sheet to html

Next, if there are actually files to copy, we create the destination folder if it doesn’t exist and initialize a variable to count the current number of files copied.

if numFiles > 0: makedirs(dest) numCopied = 0

Then, we walk the directory tree and create all the directories needed.

for path, dirs, filenames in os.walk(src): for directory in dirs: destDir = path.replace(src, dest) makedirs(os.path.join(destDir, directory))

The only tricky part here is that I replaced the source directory string in path, which is the root path, with the destination folder path.

Next, we copy files and update progress!

for path, dirs, filenames in os.walk(src): (. ) for sfile in filenames: srcFile = os.path.join(path, sfile) destFile = os.path.join(path.replace(src, dest), sfile) shutil.copy(srcFile, destFile) numCopied += 1 p.calculateAndUpdate(numCopied, numFiles)

Here, we go through all the files, copy them, update the number of files copied so far, and then draw our progress bar.

That’s it! We’re done! Now you have a nice copying function that will alert you of the progress while copying files.

Moving a Directory (Folder) with Progress in Python

As a note, you may also change the shutil.copy function call in copyFilesWithProgress to shutil.move and have a move progress bar instead. All the other code would be the same, so I’m not going to rewrite it here. I’ll leave that up to you! 🙂

Until the next time, I bid you adieu.

Источник

monstermunchkin / cp_with_prog.py

This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters

Читайте также:  Button событие onclick html
#!/usr/bin/python
# Copy function with progress display using a callback function.
# The callback function used here mimics the output of
# 'rsync --progress'.
# Copyright (C) 2012 Thomas Hipp
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
import datetime
import os
def copy ( src , dst , callback = None ):
blksize = 1048576 # 1MiB
try :
s = open ( src , 'rb' )
d = open ( dst , 'wb' )
except ( KeyboardInterrupt , Exception ) as e :
if 's' in locals ():
s . close ()
if 'd' in locals ():
d . close ()
raise
try :
total = os . stat ( src ). st_size
pos = 0
start_elapsed = datetime . datetime . now ()
start_update = datetime . datetime . now ()
while True :
buf = s . read ( blksize )
bytes_written = d . write ( buf )
end = datetime . datetime . now ()
pos += bytes_written
diff = end - start_update
if callback and diff . total_seconds () >= 0.2 :
callback ( pos , total , end - start_elapsed )
start_update = datetime . datetime . now ()
if bytes_written < len ( buf ) or bytes_written == 0 :
break
except ( KeyboardInterrupt , Exception ) as e :
s . close ()
d . close ()
raise
else :
callback ( total , total , end - start_elapsed )
s . close ()
d . close ()
def tmstr ( t ):
days , rest = divmod ( t , 86400 )
hours , rest = divmod ( rest , 3600 )
minutes , seconds = divmod ( rest , 60 )
return '::' . format ( int ( days * 24 + hours ),
int ( minutes ), round ( seconds ))
mod = 101
speed = [ 0 ] * mod
index = 0
def progress ( pos , total , elapsed ):
global speed
global index
global mod
elapsed_ = elapsed . total_seconds ()
speed [ index % mod ] = pos / elapsed_
index = ( index + 1 ) % mod
out = ' B/s '
unit = ( 'Mi' , 1048576 ) if total > 999999 else ( 'ki' , 1024 )
# complete
if pos == total :
print ( out . format ( pos , pos / total , sum ( speed ) / mod / unit [ 1 ],
tmstr ( elapsed_ ), unit = unit [ 0 ]))
# in progress
else :
print ( out . format ( pos , pos / total , sum ( speed ) / mod / unit [ 1 ],
tmstr (( total - pos ) / sum ( speed ) * mod ), unit = unit [ 0 ]), end = '' )
print ( ' \r ' )

Источник

Оцените статью