mirror of
https://github.com/SlavikMIPT/tgcloud.git
synced 2025-02-13 03:32:15 +00:00
commit
4a9e815595
1 changed files with 131 additions and 168 deletions
|
@ -1,17 +1,16 @@
|
||||||
|
# TODO TEST TEST AND TEST THIS PROBABLY WON'T WORK/ HOWEVER WHY NOT
|
||||||
|
# NEWEST TELETHON 1.2
|
||||||
|
|
||||||
|
import asyncio
|
||||||
import hashlib
|
import hashlib
|
||||||
|
import io
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
|
|
||||||
|
import telethon.errors as errors
|
||||||
from telethon.crypto import CdnDecrypter
|
from telethon.crypto import CdnDecrypter
|
||||||
from telethon.errors import (
|
from telethon.tl import types, functions, custom
|
||||||
FileMigrateError
|
|
||||||
)
|
|
||||||
from telethon.tl.custom import InputSizedFile
|
|
||||||
from telethon.tl.functions.upload import (
|
|
||||||
SaveBigFilePartRequest, SaveFilePartRequest, GetFileRequest
|
|
||||||
)
|
|
||||||
from telethon.tl.types.upload import FileCdnRedirect
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import socks
|
import socks
|
||||||
|
@ -25,38 +24,26 @@ try:
|
||||||
except ImportError:
|
except ImportError:
|
||||||
hachoir = None
|
hachoir = None
|
||||||
from telethon import helpers, utils
|
from telethon import helpers, utils
|
||||||
from telethon.tl.types import (InputFile, InputFileBig)
|
|
||||||
|
|
||||||
__log__ = logging.getLogger(__name__)
|
__log__ = logging.getLogger(__name__)
|
||||||
from telethon import TelegramClient
|
from telethon import TelegramClient
|
||||||
from threading import Thread
|
from threading import Thread
|
||||||
import random
|
import random
|
||||||
import time
|
import time
|
||||||
from queue import Queue
|
|
||||||
from telethon.network import ConnectionMode
|
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
|
|
||||||
|
|
||||||
class TelegramClientX(TelegramClient):
|
class TelegramClientX(TelegramClient):
|
||||||
def __init__(self, session, api_id, api_hash,
|
def __init__(self, session, api_id, api_hash,
|
||||||
connection_mode=ConnectionMode.TCP_FULL,
|
|
||||||
use_ipv6=False,
|
use_ipv6=False,
|
||||||
proxy=None,
|
proxy=None,
|
||||||
update_workers=None,
|
|
||||||
timeout=timedelta(seconds=10),
|
timeout=timedelta(seconds=10),
|
||||||
spawn_read_thread=True,
|
):
|
||||||
report_errors=True,
|
|
||||||
**kwargs):
|
|
||||||
super().__init__(
|
super().__init__(
|
||||||
session, api_id, api_hash,
|
session, api_id, api_hash,
|
||||||
connection_mode=connection_mode,
|
|
||||||
use_ipv6=use_ipv6,
|
use_ipv6=use_ipv6,
|
||||||
proxy=proxy,
|
proxy=proxy,
|
||||||
update_workers=update_workers,
|
|
||||||
spawn_read_thread=spawn_read_thread,
|
|
||||||
timeout=timeout,
|
timeout=timeout,
|
||||||
report_errors=report_errors,
|
|
||||||
**kwargs
|
|
||||||
)
|
)
|
||||||
|
|
||||||
self._event_builders = []
|
self._event_builders = []
|
||||||
|
@ -72,18 +59,29 @@ class TelegramClientX(TelegramClient):
|
||||||
# Sometimes we need to know who we are, cache the self peer
|
# Sometimes we need to know who we are, cache the self peer
|
||||||
self._self_input_peer = None
|
self._self_input_peer = None
|
||||||
|
|
||||||
class ProcessUpload(Thread):
|
class ProcessUpload: # TODO fuck all threads
|
||||||
def __init__(self, name, client, q_request=None):
|
name = None
|
||||||
Thread.__init__(self)
|
client = None
|
||||||
|
q_request = None
|
||||||
|
result = None
|
||||||
|
|
||||||
|
async def init(self, name, client, q_request=None):
|
||||||
self.name = name
|
self.name = name
|
||||||
self.client = TelegramClient(client._session_name, client.api_id, client.api_hash, update_workers=None,
|
self.client = TelegramClient(client._session_name, client.api_id, client.api_hash)
|
||||||
spawn_read_thread=False)
|
|
||||||
self.q_request = q_request
|
self.q_request = q_request
|
||||||
self.result = None
|
self.result = None
|
||||||
|
"""
|
||||||
|
def __init__(self, name, client, q_request=None):
|
||||||
|
Thread.__init__()
|
||||||
|
self.name = name
|
||||||
|
self.client = TelegramClient(client._session_name, client.api_id, client.api_hash)
|
||||||
|
self.q_request = q_request
|
||||||
|
self.result = None
|
||||||
|
"""
|
||||||
|
|
||||||
def run(self):
|
async def run(self):
|
||||||
print('Thread %s started' % self.name)
|
print('Async task %s started' % self.name)
|
||||||
time.sleep(random.randrange(200, 2000, 10) * 0.001)
|
asyncio.sleep(random.choice([.001, 0.00001, .002]))
|
||||||
if not self.client.is_connected():
|
if not self.client.is_connected():
|
||||||
self.client.connect()
|
self.client.connect()
|
||||||
while True:
|
while True:
|
||||||
|
@ -92,12 +90,12 @@ class TelegramClientX(TelegramClient):
|
||||||
break
|
break
|
||||||
self.result = None
|
self.result = None
|
||||||
# time.sleep(random.randrange(20, 100, 1) * 0.001)
|
# time.sleep(random.randrange(20, 100, 1) * 0.001)
|
||||||
self.result = self.client.invoke(request)
|
self.result = await self.client(request)
|
||||||
if self.result is False:
|
if self.result is False:
|
||||||
break
|
break
|
||||||
self.q_request.task_done()
|
self.q_request.task_done()
|
||||||
self.client.disconnect()
|
self.client.disconnect()
|
||||||
print('Thread {0} stopped result {1}'.format(self.name, self.result))
|
print('Task {0} stopped result {1}'.format(self.name, self.result))
|
||||||
return
|
return
|
||||||
|
|
||||||
def set_upload_threads_count(self, count: int):
|
def set_upload_threads_count(self, count: int):
|
||||||
|
@ -106,8 +104,8 @@ class TelegramClientX(TelegramClient):
|
||||||
def set_download_threads_count(self, count: int):
|
def set_download_threads_count(self, count: int):
|
||||||
self._download_threads_count = int(count)
|
self._download_threads_count = int(count)
|
||||||
|
|
||||||
def upload_file(self,
|
async def upload_file(self,
|
||||||
file,
|
file, *,
|
||||||
part_size_kb=None,
|
part_size_kb=None,
|
||||||
file_name=None,
|
file_name=None,
|
||||||
use_cache=None,
|
use_cache=None,
|
||||||
|
@ -116,48 +114,43 @@ class TelegramClientX(TelegramClient):
|
||||||
Uploads the specified file and returns a handle (an instance of
|
Uploads the specified file and returns a handle (an instance of
|
||||||
InputFile or InputFileBig, as required) which can be later used
|
InputFile or InputFileBig, as required) which can be later used
|
||||||
before it expires (they are usable during less than a day).
|
before it expires (they are usable during less than a day).
|
||||||
|
|
||||||
Uploading a file will simply return a "handle" to the file stored
|
Uploading a file will simply return a "handle" to the file stored
|
||||||
remotely in the Telegram servers, which can be later used on. This
|
remotely in the Telegram servers, which can be later used on. This
|
||||||
will **not** upload the file to your own chat or any chat at all.
|
will **not** upload the file to your own chat or any chat at all.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
file (`str` | `bytes` | `file`):
|
file (`str` | `bytes` | `file`):
|
||||||
The path of the file, byte array, or stream that will be sent.
|
The path of the file, byte array, or stream that will be sent.
|
||||||
Note that if a byte array or a stream is given, a filename
|
Note that if a byte array or a stream is given, a filename
|
||||||
or its type won't be inferred, and it will be sent as an
|
or its type won't be inferred, and it will be sent as an
|
||||||
"unnamed application/octet-stream".
|
"unnamed application/octet-stream".
|
||||||
|
|
||||||
Subsequent calls with the very same file will result in
|
Subsequent calls with the very same file will result in
|
||||||
immediate uploads, unless ``.clear_file_cache()`` is called.
|
immediate uploads, unless ``.clear_file_cache()`` is called.
|
||||||
|
|
||||||
part_size_kb (`int`, optional):
|
part_size_kb (`int`, optional):
|
||||||
Chunk size when uploading files. The larger, the less
|
Chunk size when uploading files. The larger, the less
|
||||||
requests will be made (up to 512KB maximum).
|
requests will be made (up to 512KB maximum).
|
||||||
|
|
||||||
file_name (`str`, optional):
|
file_name (`str`, optional):
|
||||||
The file name which will be used on the resulting InputFile.
|
The file name which will be used on the resulting InputFile.
|
||||||
If not specified, the name will be taken from the ``file``
|
If not specified, the name will be taken from the ``file``
|
||||||
and if this is not a ``str``, it will be ``"unnamed"``.
|
and if this is not a ``str``, it will be ``"unnamed"``.
|
||||||
|
|
||||||
use_cache (`type`, optional):
|
use_cache (`type`, optional):
|
||||||
The type of cache to use (currently either ``InputDocument``
|
The type of cache to use (currently either ``InputDocument``
|
||||||
or ``InputPhoto``). If present and the file is small enough
|
or ``InputPhoto``). If present and the file is small enough
|
||||||
to need the MD5, it will be checked against the database,
|
to need the MD5, it will be checked against the database,
|
||||||
and if a match is found, the upload won't be made. Instead,
|
and if a match is found, the upload won't be made. Instead,
|
||||||
an instance of type ``use_cache`` will be returned.
|
an instance of type ``use_cache`` will be returned.
|
||||||
|
|
||||||
progress_callback (`callable`, optional):
|
progress_callback (`callable`, optional):
|
||||||
A callback function accepting two parameters:
|
A callback function accepting two parameters:
|
||||||
``(sent bytes, total)``.
|
``(sent bytes, total)``.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
:tl:`InputFileBig` if the file size is larger than 10MB,
|
:tl:`InputFileBig` if the file size is larger than 10MB,
|
||||||
``InputSizedFile`` (subclass of :tl:`InputFile`) otherwise.
|
``InputSizedFile`` (subclass of :tl:`InputFile`) otherwise.
|
||||||
"""
|
"""
|
||||||
if isinstance(file, (InputFile, InputFileBig)):
|
if isinstance(file, (types.InputFile, types.InputFileBig)):
|
||||||
return file # Already uploaded
|
return file # Already uploaded
|
||||||
|
|
||||||
|
if not file_name and getattr(file, 'name', None):
|
||||||
|
file_name = file.name
|
||||||
|
|
||||||
if isinstance(file, str):
|
if isinstance(file, str):
|
||||||
file_size = os.path.getsize(file)
|
file_size = os.path.getsize(file)
|
||||||
elif isinstance(file, bytes):
|
elif isinstance(file, bytes):
|
||||||
|
@ -210,51 +203,35 @@ class TelegramClientX(TelegramClient):
|
||||||
__log__.info('Uploading file of %d bytes in %d chunks of %d',
|
__log__.info('Uploading file of %d bytes in %d chunks of %d',
|
||||||
file_size, part_count, part_size)
|
file_size, part_count, part_size)
|
||||||
|
|
||||||
with open(file, 'rb') if isinstance(file, str) else BytesIO(file) as stream:
|
with open(file, 'rb') if isinstance(file, str) else BytesIO(file)\
|
||||||
threads_count = 2 + int((self._upload_threads_count - 2) * float(file_size) / (1024 * 1024 * 100))
|
as stream:
|
||||||
threads_count = min(threads_count, self._upload_threads_count)
|
for part_index in range(part_count):
|
||||||
threads_count = min(part_count, threads_count)
|
|
||||||
upload_thread = []
|
|
||||||
q_request = Queue()
|
|
||||||
# spawn threads
|
|
||||||
for i in range(threads_count):
|
|
||||||
thread_dl = self.ProcessUpload('thread {0}'.format(i), self, q_request)
|
|
||||||
thread_dl.start()
|
|
||||||
upload_thread.append(thread_dl)
|
|
||||||
for part_index in range(0, part_count, threads_count):
|
|
||||||
# Read the file by in chunks of size part_size
|
# Read the file by in chunks of size part_size
|
||||||
for part_thread_index in range(threads_count):
|
part = stream.read(part_size)
|
||||||
if part_index + part_thread_index >= part_count:
|
|
||||||
break
|
# The SavePartRequest is different depending on whether
|
||||||
part = stream.read(part_size)
|
# the file is too large or not (over or less than 10MB)
|
||||||
# The SavePartRequest is different depending on whether
|
if is_large:
|
||||||
# the file is too large or not (over or less than 10MB)
|
request = functions.upload.SaveBigFilePartRequest(
|
||||||
if is_large:
|
file_id, part_index, part_count, part)
|
||||||
request = SaveBigFilePartRequest(file_id, part_index + part_thread_index, part_count, part)
|
else:
|
||||||
else:
|
request = functions.upload.SaveFilePartRequest(
|
||||||
request = SaveFilePartRequest(file_id, part_index + part_thread_index, part)
|
file_id, part_index, part)
|
||||||
q_request.put(request)
|
|
||||||
# q_request.join()
|
result = await self(request)
|
||||||
job_completed = False
|
if result:
|
||||||
while not job_completed:
|
__log__.debug('Uploaded %d/%d', part_index + 1,
|
||||||
for th in upload_thread:
|
part_count)
|
||||||
if th:
|
if progress_callback:
|
||||||
if th.result is True:
|
progress_callback(stream.tell(), file_size)
|
||||||
job_completed = True
|
else:
|
||||||
__log__.debug('Uploaded %d/%d', part_index + 1, part_count)
|
raise RuntimeError(
|
||||||
if progress_callback:
|
'Failed to upload file part {}.'.format(part_index))
|
||||||
progress_callback(stream.tell(), file_size)
|
|
||||||
elif th.result is False:
|
|
||||||
raise RuntimeError('Failed to upload file part {}.'.format(part_index))
|
|
||||||
q_request.join()
|
|
||||||
for i in range(threads_count):
|
|
||||||
q_request.put(None)
|
|
||||||
for th in upload_thread:
|
|
||||||
th.join()
|
|
||||||
if is_large:
|
if is_large:
|
||||||
return InputFileBig(file_id, part_count, file_name)
|
return types.InputFileBig(file_id, part_count, file_name)
|
||||||
else:
|
else:
|
||||||
return InputSizedFile(
|
return custom.InputSizedFile(
|
||||||
file_id, part_count, file_name, md5=hash_md5, size=file_size
|
file_id, part_count, file_name, md5=hash_md5, size=file_size
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -262,13 +239,12 @@ class TelegramClientX(TelegramClient):
|
||||||
def __init__(self, name, client, q_request=None):
|
def __init__(self, name, client, q_request=None):
|
||||||
Thread.__init__(self)
|
Thread.__init__(self)
|
||||||
self.name = name
|
self.name = name
|
||||||
self.client = TelegramClient(client._session_name, client.api_id, client.api_hash, update_workers=None,
|
self.client = TelegramClient(client._session_name, client.api_id, client.api_hash)
|
||||||
spawn_read_thread=False)
|
|
||||||
self.q_request = q_request
|
self.q_request = q_request
|
||||||
self.result = None
|
self.result = None
|
||||||
|
|
||||||
def run(self):
|
async def run(self):
|
||||||
print('Thread %s started' % self.name)
|
print('Task %s started' % self.name)
|
||||||
time.sleep(random.randrange(200, 2000, 10) * 0.001)
|
time.sleep(random.randrange(200, 2000, 10) * 0.001)
|
||||||
if not self.client.is_connected():
|
if not self.client.is_connected():
|
||||||
self.client.connect()
|
self.client.connect()
|
||||||
|
@ -281,31 +257,33 @@ class TelegramClientX(TelegramClient):
|
||||||
if isinstance(request, CdnDecrypter):
|
if isinstance(request, CdnDecrypter):
|
||||||
self.result = request.get_file()
|
self.result = request.get_file()
|
||||||
else:
|
else:
|
||||||
self.result = self.client.invoke(request)
|
self.result = self.client(request)
|
||||||
if self.result is False:
|
if self.result is False:
|
||||||
break
|
break
|
||||||
self.q_request.task_done()
|
self.q_request.task_done()
|
||||||
self.client.disconnect()
|
self.client.disconnect()
|
||||||
print('Thread {0} stopped result {1}'.format(self.name, self.result))
|
print('Task {0} stopped result {1}'.format(self.name, self.result))
|
||||||
return
|
return
|
||||||
|
|
||||||
def download_file(self,
|
async def download_file(
|
||||||
input_location,
|
self, input_location, file=None, *, part_size_kb=None,
|
||||||
file,
|
file_size=None, progress_callback=None):
|
||||||
part_size_kb=None,
|
|
||||||
file_size=None,
|
|
||||||
progress_callback=None):
|
|
||||||
"""
|
"""
|
||||||
Downloads the given input location to a file.
|
Downloads the given input location to a file.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
input_location (:tl:`InputFileLocation`):
|
input_location (:tl:`FileLocation` | :tl:`InputFileLocation`):
|
||||||
The file location from which the file will be downloaded.
|
The file location from which the file will be downloaded.
|
||||||
|
See `telethon.utils.get_input_location` source for a complete
|
||||||
|
list of supported types.
|
||||||
|
|
||||||
file (`str` | `file`):
|
file (`str` | `file`, optional):
|
||||||
The output file path, directory, or stream-like object.
|
The output file path, directory, or stream-like object.
|
||||||
If the path exists and is a file, it will be overwritten.
|
If the path exists and is a file, it will be overwritten.
|
||||||
|
|
||||||
|
If the file path is ``None``, then the result will be
|
||||||
|
saved in memory and returned as `bytes`.
|
||||||
|
|
||||||
part_size_kb (`int`, optional):
|
part_size_kb (`int`, optional):
|
||||||
Chunk size when downloading files. The larger, the less
|
Chunk size when downloading files. The larger, the less
|
||||||
requests will be made (up to 512KB maximum).
|
requests will be made (up to 512KB maximum).
|
||||||
|
@ -336,86 +314,71 @@ class TelegramClientX(TelegramClient):
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
'The part size must be evenly divisible by 4096.')
|
'The part size must be evenly divisible by 4096.')
|
||||||
|
|
||||||
if isinstance(file, str):
|
in_memory = file is None
|
||||||
|
if in_memory:
|
||||||
|
f = io.BytesIO()
|
||||||
|
elif isinstance(file, str):
|
||||||
# Ensure that we'll be able to download the media
|
# Ensure that we'll be able to download the media
|
||||||
helpers.ensure_parent_dir_exists(file)
|
helpers.ensure_parent_dir_exists(file)
|
||||||
f = open(file, 'wb')
|
f = open(file, 'wb')
|
||||||
else:
|
else:
|
||||||
f = file
|
f = file
|
||||||
|
|
||||||
# The used client will change if FileMigrateError occurs
|
dc_id, input_location = utils.get_input_location(input_location)
|
||||||
client = self
|
exported = dc_id and self.session.dc_id != dc_id
|
||||||
cdn_decrypter = None
|
if exported:
|
||||||
download_thread = []
|
try:
|
||||||
q_request = []
|
sender = await self._borrow_exported_sender(dc_id)
|
||||||
|
except errors.DcIdInvalidError:
|
||||||
|
# Can't export a sender for the ID we are currently in
|
||||||
|
config = await self(functions.help.GetConfigRequest())
|
||||||
|
for option in config.dc_options:
|
||||||
|
if option.ip_address == self.session.server_address:
|
||||||
|
self.session.set_dc(
|
||||||
|
option.id, option.ip_address, option.port)
|
||||||
|
self.session.save()
|
||||||
|
break
|
||||||
|
|
||||||
|
# TODO Figure out why the session may have the wrong DC ID
|
||||||
|
sender = self._sender
|
||||||
|
exported = False
|
||||||
|
else:
|
||||||
|
# The used sender will also change if ``FileMigrateError`` occurs
|
||||||
|
sender = self._sender
|
||||||
|
|
||||||
__log__.info('Downloading file in chunks of %d bytes', part_size)
|
__log__.info('Downloading file in chunks of %d bytes', part_size)
|
||||||
threads_count = 2 + int((self._download_threads_count - 2) * float(file_size) / (1024 * 1024 * 100))
|
|
||||||
threads_count = min(threads_count, self._download_threads_count)
|
|
||||||
# threads_count = 1
|
|
||||||
# threads_count = min(part_count, threads_count)
|
|
||||||
try:
|
try:
|
||||||
offset = 0
|
offset = 0
|
||||||
result = None
|
|
||||||
try:
|
|
||||||
request = GetFileRequest(input_location, offset, part_size)
|
|
||||||
result = client(request)
|
|
||||||
if isinstance(result, FileCdnRedirect):
|
|
||||||
__log__.info('File lives in a CDN')
|
|
||||||
cdn_decrypter, result = CdnDecrypter.prepare_decrypter(client, self._get_cdn_client(result), result)
|
|
||||||
else:
|
|
||||||
f.write(result.bytes)
|
|
||||||
offset += part_size
|
|
||||||
except FileMigrateError as e:
|
|
||||||
__log__.info('File lives in another DC')
|
|
||||||
client = self._get_exported_client(e.new_dc)
|
|
||||||
|
|
||||||
# if cdn_decrypter:
|
|
||||||
# result = cdn_decrypter.get_file()
|
|
||||||
|
|
||||||
# if not result.bytes:
|
|
||||||
# return getattr(result, 'type', '')
|
|
||||||
# f.write(result.bytes)
|
|
||||||
__log__.debug('Saved %d more bytes', len(result.bytes))
|
|
||||||
if progress_callback:
|
|
||||||
progress_callback(f.tell(), file_size)
|
|
||||||
|
|
||||||
# spawn threads
|
|
||||||
for i in range(threads_count):
|
|
||||||
q_request.append(Queue())
|
|
||||||
thread_dl = self.ProcessDownload('thread {0}'.format(i), self, q_request[i])
|
|
||||||
thread_dl.start()
|
|
||||||
download_thread.append(thread_dl)
|
|
||||||
# offset += part_size
|
|
||||||
while True:
|
while True:
|
||||||
for i in range(threads_count):
|
|
||||||
if cdn_decrypter:
|
|
||||||
q_request[i].put(cdn_decrypter)
|
|
||||||
else:
|
|
||||||
request = GetFileRequest(input_location, offset, part_size)
|
|
||||||
q_request[i].put(request)
|
|
||||||
offset += part_size
|
|
||||||
for q in q_request:
|
|
||||||
q.join()
|
|
||||||
for th in download_thread:
|
|
||||||
if th.result and th.result.bytes:
|
|
||||||
f.write(th.result.bytes)
|
|
||||||
if progress_callback:
|
|
||||||
progress_callback(f.tell(), file_size)
|
|
||||||
else:
|
|
||||||
for i in range(threads_count):
|
|
||||||
q_request[i].put(None)
|
|
||||||
for th in download_thread:
|
|
||||||
th.join()
|
|
||||||
return getattr(th.result, 'type', '')
|
|
||||||
finally:
|
|
||||||
if client != self:
|
|
||||||
client.disconnect()
|
|
||||||
|
|
||||||
if cdn_decrypter:
|
|
||||||
try:
|
try:
|
||||||
cdn_decrypter.client.disconnect()
|
result = await sender.send(functions.upload.GetFileRequest(
|
||||||
except:
|
input_location, offset, part_size
|
||||||
pass
|
))
|
||||||
if isinstance(file, str):
|
if isinstance(result, types.upload.FileCdnRedirect):
|
||||||
|
# TODO Implement
|
||||||
|
raise NotImplementedError
|
||||||
|
except errors.FileMigrateError as e:
|
||||||
|
__log__.info('File lives in another DC')
|
||||||
|
sender = await self._borrow_exported_sender(e.new_dc)
|
||||||
|
exported = True
|
||||||
|
continue
|
||||||
|
|
||||||
|
offset += part_size
|
||||||
|
if not result.bytes:
|
||||||
|
if in_memory:
|
||||||
|
f.flush()
|
||||||
|
return f.getvalue()
|
||||||
|
else:
|
||||||
|
return getattr(result, 'type', '')
|
||||||
|
|
||||||
|
__log__.debug('Saving %d more bytes', len(result.bytes))
|
||||||
|
f.write(result.bytes)
|
||||||
|
if progress_callback:
|
||||||
|
progress_callback(f.tell(), file_size)
|
||||||
|
finally:
|
||||||
|
if exported:
|
||||||
|
await self._return_exported_sender(sender)
|
||||||
|
elif sender != self._sender:
|
||||||
|
await sender.disconnect()
|
||||||
|
if isinstance(file, str) or in_memory:
|
||||||
f.close()
|
f.close()
|
||||||
|
|
Loading…
Reference in a new issue