1
0
Fork 0
mirror of https://github.com/SlavikMIPT/tgcloud.git synced 2025-03-09 15:40:14 +00:00

download_file alpha

This commit is contained in:
slavikmipt 2018-07-05 12:58:57 +03:00
parent e3b5fa89ab
commit c79efbbe7f

View file

@ -3,10 +3,15 @@ import logging
import os
from io import BytesIO
from telethon.crypto import CdnDecrypter
from telethon.errors import (
FileMigrateError
)
from telethon.tl.custom import InputSizedFile
from telethon.tl.functions.upload import (
SaveBigFilePartRequest, SaveFilePartRequest
SaveBigFilePartRequest, SaveFilePartRequest, GetFileRequest
)
from telethon.tl.types.upload import FileCdnRedirect
try:
import socks
@ -63,6 +68,7 @@ class TelegramClientX(TelegramClient):
self._phone = None
self._session_name = session
self._upload_threads_count = 8
self._download_threads_count = 8
# Sometimes we need to know who we are, cache the self peer
self._self_input_peer = None
@ -97,6 +103,9 @@ class TelegramClientX(TelegramClient):
def set_upload_threads_count(self, count: int):
self._upload_threads_count = int(count)
def set_download_threads_count(self, count: int):
self._download_threads_count = int(count)
def upload_file(self,
file,
part_size_kb=None,
@ -248,3 +257,165 @@ class TelegramClientX(TelegramClient):
return InputSizedFile(
file_id, part_count, file_name, md5=hash_md5, size=file_size
)
class ProcessDownload(Thread):
def __init__(self, name, client, q_request=None):
Thread.__init__(self)
self.name = name
self.client = TelegramClient(client._session_name, client.api_id, client.api_hash, update_workers=None,
spawn_read_thread=False)
self.q_request = q_request
self.result = None
def run(self):
print('Thread %s started' % self.name)
time.sleep(random.randrange(200, 2000, 10) * 0.001)
if not self.client.is_connected():
self.client.connect()
while True:
request = self.q_request.get()
if request is None:
break
self.result = None
# time.sleep(random.randrange(20, 100, 1) * 0.001)
if isinstance(request, CdnDecrypter):
self.result = request.get_file()
else:
self.result = self.client.invoke(request)
if self.result is False:
break
self.q_request.task_done()
self.client.disconnect()
print('Thread {0} stopped result {1}'.format(self.name, self.result))
return
def download_file(self,
input_location,
file,
part_size_kb=None,
file_size=None,
progress_callback=None):
"""
Downloads the given input location to a file.
Args:
input_location (:tl:`InputFileLocation`):
The file location from which the file will be downloaded.
file (`str` | `file`):
The output file path, directory, or stream-like object.
If the path exists and is a file, it will be overwritten.
part_size_kb (`int`, optional):
Chunk size when downloading files. The larger, the less
requests will be made (up to 512KB maximum).
file_size (`int`, optional):
The file size that is about to be downloaded, if known.
Only used if ``progress_callback`` is specified.
progress_callback (`callable`, optional):
A callback function accepting two parameters:
``(downloaded bytes, total)``. Note that the
``total`` is the provided ``file_size``.
"""
if not part_size_kb:
if not file_size:
part_size_kb = 64 # Reasonable default
else:
part_size_kb = utils.get_appropriated_part_size(file_size)
part_size = int(part_size_kb * 1024)
# https://core.telegram.org/api/files says:
# > part_size % 1024 = 0 (divisible by 1KB)
#
# But https://core.telegram.org/cdn (more recent) says:
# > limit must be divisible by 4096 bytes
# So we just stick to the 4096 limit.
if part_size % 4096 != 0:
raise ValueError(
'The part size must be evenly divisible by 4096.')
if isinstance(file, str):
# Ensure that we'll be able to download the media
helpers.ensure_parent_dir_exists(file)
f = open(file, 'wb')
else:
f = file
# The used client will change if FileMigrateError occurs
client = self
cdn_decrypter = None
download_thread = []
q_request = []
__log__.info('Downloading file in chunks of %d bytes', part_size)
threads_count = 2 + int((self._download_threads_count - 2) * float(file_size) / (1024 * 1024 * 100))
threads_count = min(threads_count, self._download_threads_count)
# threads_count = 1
# threads_count = min(part_count, threads_count)
try:
offset = 0
result = None
try:
request = GetFileRequest(input_location, offset, part_size)
result = client(request)
if isinstance(result, FileCdnRedirect):
__log__.info('File lives in a CDN')
cdn_decrypter, result = CdnDecrypter.prepare_decrypter(client, self._get_cdn_client(result), result)
else:
f.write(result.bytes)
offset += part_size
except FileMigrateError as e:
__log__.info('File lives in another DC')
client = self._get_exported_client(e.new_dc)
# if cdn_decrypter:
# result = cdn_decrypter.get_file()
# if not result.bytes:
# return getattr(result, 'type', '')
# f.write(result.bytes)
__log__.debug('Saved %d more bytes', len(result.bytes))
if progress_callback:
progress_callback(f.tell(), file_size)
# spawn threads
for i in range(threads_count):
q_request.append(Queue())
thread_dl = self.ProcessDownload('thread {0}'.format(i), self, q_request[i])
thread_dl.start()
download_thread.append(thread_dl)
# offset += part_size
while True:
for i in range(threads_count):
if cdn_decrypter:
q_request[i].put(cdn_decrypter)
else:
request = GetFileRequest(input_location, offset, part_size)
q_request[i].put(request)
offset += part_size
for q in q_request:
q.join()
for th in download_thread:
if th.result and th.result.bytes:
f.write(th.result.bytes)
if progress_callback:
progress_callback(f.tell(), file_size)
else:
for i in range(threads_count):
q_request[i].put(None)
for th in download_thread:
th.join()
return getattr(th.result, 'type', '')
finally:
if client != self:
client.disconnect()
if cdn_decrypter:
try:
cdn_decrypter.client.disconnect()
except:
pass
if isinstance(file, str):
f.close()