您好, 欢迎来到 !    登录 | 注册 | | 设为首页 | 收藏本站

python支持断点续传的多线程下载示例

5b51 2022/1/14 8:17:53 python 字数 10374 阅读 343 来源 www.jb51.cc/python

复制代码代码如下:#!/usr/bin/envpython#coding=utf-8 from__future__importunicode_literals frommultiprocessing.dummyimportPoolasThreadPoolimportthreading

概述

from __future__ import unicode_literals

from multiprocessing.dummy import Pool as ThreadPool
import threading

import os
import sys
import cPickle
from collections import namedtuple
import urllib2
from urlparse import urlsplit

import time


# global lock
lock = threading.Lock()


# default parameters
defaults = dict(thread_count=10,
    buffer_size=10*1024,
    block_size=1000*1024)


def progress(percent,width=50):
    print "%s %d%%\r" % (('%%-%ds' % width) % (width * percent / 100 * '='),percent),
    if percent >= 100:
        print
        sys.stdout.flush()


def write_data(filepath,data):
    with open(filepath,'wb') as output:
        cPickle.dump(data,output)


def read_data(filepath):
    with open(filepath,'rb') as output:
        return cPickle.load(output)


FileInfo = namedtuple('FileInfo','url name size lastmodified')


def get_file_info(url):
    class HeadRequest(urllib2.Request):
        def get_method(self):
            return "HEAD"
    res = urllib2.urlopen(HeadRequest(url))
    res.read()
    headers = dict(res.headers)
    size = int(headers.get('content-length',0))
    lastmodified = headers.get('last-modified','')
    name = None
    if headers.has_key('content-disposition'):
        name = headers['content-disposition'].split('filename=')[1]
        if name[0] == '"' or name[0] == "'":
            name = name[1:-1]
    else:
        name = os.path.basename(urlsplit(url)[2])

    return FileInfo(url,name,size,lastmodified)


def download(url,output,
        thread_count = defaults['thread_count'],
        buffer_size = defaults['buffer_size'],
        block_size = defaults['block_size']):
    # get latest file info
    file_info = get_file_info(url)

    # init path
    if output is None:
        output = file_info.name
    workpath = '%s.ing' % output
    infopath = '%s.inf' % output

    # split file to blocks. every block is a array [start,offset,end],
    # then each greenlet download filepart according to a block,and
    # update the block' offset.
    blocks = []

    if os.path.exists(infopath):
        # load blocks
        _x,blocks = read_data(infopath)
        if (_x.url != url or
                _x.name != file_info.name or
                _x.lastmodified != file_info.lastmodified):
            blocks = []

    if len(blocks) == 0:
        # set blocks
        if block_size > file_info.size:
            blocks = [[0,file_info.size]]
        else:
            block_count,remain = divmod(file_info.size,block_size)
            blocks = [[i*block_size,i*block_size,(i+1)*block_size-1] for i in range(block_count)]
            blocks[-1][-1] += remain
        # create new blank workpath
        with open(workpath,'wb') as fobj:
            fobj.write('')

    print 'Downloading %s' % url
    # start monitor
    threading.Thread(target=_monitor,args=(infopath,file_info,blocks)).start()

    # start downloading
    with open(workpath,'rb+') as fobj:
        args = [(url,blocks[i],fobj,buffer_size) for i in range(len(blocks)) if blocks[i][1] < blocks[i][2]]

        if thread_count > len(args):
            thread_count = len(args)

        pool = ThreadPool(thread_count)
        pool.map(_worker,args)
        pool.close()
        pool.join()


    # rename workpath to output
    if os.path.exists(output):
        os.remove(output)
    os.rename(workpath,output)

    # delete infopath
    if os.path.exists(infopath):
        os.remove(infopath)

    assert all([block[1]>=block[2] for block in blocks]) is True


def _worker((url,block,buffer_size)):
    req = urllib2.Request(url)
    req.headers['Range'] = 'bytes=%s-%s' % (block[1],block[2])
    res = urllib2.urlopen(req)

    while 1:
        chunk = res.read(buffer_size)
        if not chunk:
            break
        with lock:
            fobj.seek(block[1])
            fobj.write(chunk)
            block[1] += len(chunk)


def _monitor(infopath,blocks):
    while 1:
        with lock:
            percent = sum([block[1] - block[0] for block in blocks]) * 100 / file_info.size
            progress(percent)
            if percent >= 100:
                break
            write_data(infopath,(file_info,blocks))
        time.sleep(2)


if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(description='Download file by multi-threads.')
    parser.add_argument('url',type=str,help='url of the download file')
    parser.add_argument('-o',default=None,dest="output",help='output file')
    parser.add_argument('-t',type=int,default=defaults['thread_count'],dest="thread_count",help='thread counts to downloading')
    parser.add_argument('-b',default=defaults['buffer_size'],dest="buffer_size",help='buffer size')
    parser.add_argument('-s',default=defaults['block_size'],dest="block_size",help='block size')

    argv = sys.argv[1:]

    if len(argv) == 0:
        argv = ['https://eyes.nasa.gov/eyesproduct/EYES/os/win']

    args = parser.parse_args(argv)

    start_time = time.time()
    download(args.url,args.output,args.thread_count,args.buffer_size,args.block_size)
    print 'times: %ds' % int(time.time()-start_time)

总结

以上是编程之家为你收集整理的python支持断点续传的多线程下载示例全部内容,希望文章能够帮你解决python支持断点续传的多线程下载示例所遇到的程序开发问题。


如果您也喜欢它,动动您的小指点个赞吧

除非注明,文章均由 laddyq.com 整理发布,欢迎转载。

转载请注明:
链接:http://laddyq.com
来源:laddyq.com
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。


联系我
置顶