def set_upload_threads_count(self, count: int):
    """Set the maximum number of worker threads used for uploads."""
    self._upload_threads_count = int(count)


def set_download_threads_count(self, count: int):
    """Set the maximum number of worker threads used for downloads."""
    self._download_threads_count = int(count)


class ProcessDownload(Thread):
    """Worker thread that executes download requests taken from a queue.

    ``download_file`` reaches this class as ``self.ProcessDownload``, so it
    has to be defined as an attribute of the client class.

    Every worker builds its own ``TelegramClient`` from the parent's session
    name and API credentials. Requests are read from ``q_request`` one at a
    time; ``None`` is the shutdown sentinel. The outcome of the most recent
    request is exposed through two attributes:

    * ``result`` -- the response object (``None`` while nothing is available);
    * ``error`` -- the exception that stopped the worker from serving
      requests, or ``None``.

    Every item taken from the queue is acknowledged with ``task_done()``,
    even when the request fails, so a caller blocked in ``Queue.join()`` is
    always released and can inspect ``error``.
    """

    def __init__(self, name, client, q_request=None):
        Thread.__init__(self)
        self.name = name
        self.client = TelegramClient(
            client._session_name, client.api_id, client.api_hash,
            update_workers=None, spawn_read_thread=False)
        self.q_request = q_request
        self.result = None
        self.error = None

    def run(self):
        """Connect, serve queued requests until the sentinel, disconnect."""
        __log__.debug('Thread %s started', self.name)
        # Random 0.2-2 s delay before connecting.
        # NOTE(review): presumably staggers the workers so they do not all
        # open their connections at the same instant -- confirm.
        time.sleep(random.randrange(200, 2000, 10) * 0.001)
        try:
            try:
                if not self.client.is_connected():
                    self.client.connect()
            except Exception as exc:
                # Keep the thread alive: the queue must still be drained,
                # otherwise the producer would wait in join() forever.
                self.error = exc

            while True:
                request = self.q_request.get()
                try:
                    if request is None:
                        break
                    self.result = None
                    if self.error is not None:
                        # Already failed; only acknowledge the request.
                        continue
                    if isinstance(request, CdnDecrypter):
                        self.result = request.get_file()
                    else:
                        self.result = self.client.invoke(request)
                    if self.result is False:
                        break
                except Exception as exc:
                    self.error = exc
                finally:
                    self.q_request.task_done()
        finally:
            self.client.disconnect()
            __log__.debug('Thread %s stopped', self.name)


def download_file(self,
                  input_location,
                  file,
                  part_size_kb=None,
                  file_size=None,
                  progress_callback=None):
    """
    Downloads the given input location to a file.

    The first chunk is always requested on the calling thread. If the file
    is served by the current data center, the remaining chunks are fetched
    by up to ``_download_threads_count`` worker threads and written in
    offset order. If the file lives in another DC or in a CDN, the rest is
    downloaded sequentially on the calling thread, because the workers
    connect with the plain session and a single CDN decrypter is not shared
    between threads.

    Args:
        input_location (:tl:`InputFileLocation`):
            The file location from which the file will be downloaded.

        file (`str` | `file`):
            The output file path, directory, or stream-like object.
            If the path exists and is a file, it will be overwritten.

        part_size_kb (`int`, optional):
            Chunk size when downloading files. The larger, the less
            requests will be made (up to 512KB maximum).

        file_size (`int`, optional):
            The file size that is about to be downloaded, if known.
            Used to pick the chunk size and the number of worker
            threads, and passed to ``progress_callback``.

        progress_callback (`callable`, optional):
            A callback function accepting two parameters:
            ``(downloaded bytes, total)``. Note that the
            ``total`` is the provided ``file_size``.

    Returns:
        The ``type`` attribute of the empty response that marked the end
        of the file, or ``''`` if that response has no such attribute.

    Raises:
        ValueError: if the part size is not a multiple of 4096 bytes.
        Exception: any error raised by a worker thread is re-raised here.
    """
    if not part_size_kb:
        if not file_size:
            part_size_kb = 64  # Reasonable default
        else:
            part_size_kb = utils.get_appropriated_part_size(file_size)

    part_size = int(part_size_kb * 1024)
    # https://core.telegram.org/api/files says:
    # > part_size % 1024 = 0 (divisible by 1KB)
    #
    # But https://core.telegram.org/cdn (more recent) says:
    # > limit must be divisible by 4096 bytes
    # So we just stick to the 4096 limit.
    if part_size % 4096 != 0:
        raise ValueError(
            'The part size must be evenly divisible by 4096.')

    if isinstance(file, str):
        # Ensure that we'll be able to download the media
        helpers.ensure_parent_dir_exists(file)
        f = open(file, 'wb')
    else:
        f = file

    # The used client will change if FileMigrateError occurs
    client = self
    cdn_decrypter = None
    workers = []
    queues = []

    __log__.info('Downloading file in chunks of %d bytes', part_size)
    # Scale from 2 threads (tiny file) up to the configured maximum at
    # 100 MB. An unknown size counts as 0, and the result is never below 1
    # so the download loop always makes progress.
    max_threads = self._download_threads_count
    scaled = 2 + int(
        (max_threads - 2) * float(file_size or 0) / (1024 * 1024 * 100))
    threads_count = max(1, min(scaled, max_threads))

    try:
        offset = 0
        try:
            result = client(
                GetFileRequest(input_location, offset, part_size))
        except FileMigrateError as e:
            __log__.info('File lives in another DC')
            client = self._get_exported_client(e.new_dc)
            # Ask again, this time the DC that actually holds the file.
            result = client(
                GetFileRequest(input_location, offset, part_size))

        if isinstance(result, FileCdnRedirect):
            __log__.info('File lives in a CDN')
            # prepare_decrypter also hands back the first chunk.
            cdn_decrypter, result = CdnDecrypter.prepare_decrypter(
                client, self._get_cdn_client(result), result)

        if not result.bytes:
            return getattr(result, 'type', '')

        f.write(result.bytes)
        offset += part_size
        __log__.debug('Saved %d more bytes', len(result.bytes))
        if progress_callback:
            progress_callback(f.tell(), file_size)

        if cdn_decrypter or client is not self:
            # Sequential path: CDN file or file stored in another DC.
            while True:
                if cdn_decrypter:
                    result = cdn_decrypter.get_file()
                else:
                    result = client(
                        GetFileRequest(input_location, offset, part_size))
                if not result.bytes:
                    return getattr(result, 'type', '')
                f.write(result.bytes)
                offset += part_size
                __log__.debug('Saved %d more bytes', len(result.bytes))
                if progress_callback:
                    progress_callback(f.tell(), file_size)

        # Parallel path: one private queue per worker so that results can
        # be collected in the same order the offsets were handed out.
        try:
            for i in range(threads_count):
                queue = Queue()
                worker = self.ProcessDownload(
                    'thread {0}'.format(i), self, queue)
                # Register before start() so cleanup always sees the pair.
                queues.append(queue)
                workers.append(worker)
                worker.start()

            while True:
                for queue in queues:
                    queue.put(
                        GetFileRequest(input_location, offset, part_size))
                    offset += part_size
                for queue in queues:
                    queue.join()
                for worker in workers:
                    if worker.error is not None:
                        raise worker.error
                    chunk = worker.result
                    if not (chunk and chunk.bytes):
                        # First empty chunk marks the end of the file.
                        return getattr(chunk, 'type', '')
                    f.write(chunk.bytes)
                    if progress_callback:
                        progress_callback(f.tell(), file_size)
        finally:
            # Always stop the workers, also when an error is propagating.
            for queue in queues:
                queue.put(None)
            for worker in workers:
                if worker.is_alive():
                    worker.join()
    finally:
        try:
            if client is not self:
                client.disconnect()
            if cdn_decrypter:
                try:
                    cdn_decrypter.client.disconnect()
                except Exception:
                    # Best effort: the download itself is already done.
                    __log__.debug(
                        'Ignoring error while disconnecting the CDN client',
                        exc_info=True)
        finally:
            if isinstance(file, str):
                f.close()