From 80881aba5901be0df159162911abeb31651cdc57 Mon Sep 17 00:00:00 2001 From: megapascal <39701838+megapascal@users.noreply.github.com> Date: Sun, 2 Sep 2018 23:57:39 +0500 Subject: [PATCH] =?UTF-8?q?=C2=AF\=5F(=E3=83=84)=5F/=C2=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit NOT sure if it works! :smile: --- file_telegram_rxtx/telegram_client_x.py | 299 +++++++++++------------- 1 file changed, 131 insertions(+), 168 deletions(-) diff --git a/file_telegram_rxtx/telegram_client_x.py b/file_telegram_rxtx/telegram_client_x.py index 89c33c2..1c3f450 100644 --- a/file_telegram_rxtx/telegram_client_x.py +++ b/file_telegram_rxtx/telegram_client_x.py @@ -1,17 +1,16 @@ +# TODO TEST TEST AND TEST THIS PROBABLY WON'T WORK/ HOWEVER WHY NOT +# NEWEST TELETHON 1.2 + +import asyncio import hashlib +import io import logging import os from io import BytesIO +import telethon.errors as errors from telethon.crypto import CdnDecrypter -from telethon.errors import ( - FileMigrateError -) -from telethon.tl.custom import InputSizedFile -from telethon.tl.functions.upload import ( - SaveBigFilePartRequest, SaveFilePartRequest, GetFileRequest -) -from telethon.tl.types.upload import FileCdnRedirect +from telethon.tl import types, functions, custom try: import socks @@ -25,38 +24,26 @@ try: except ImportError: hachoir = None from telethon import helpers, utils -from telethon.tl.types import (InputFile, InputFileBig) __log__ = logging.getLogger(__name__) from telethon import TelegramClient from threading import Thread import random import time -from queue import Queue -from telethon.network import ConnectionMode from datetime import timedelta class TelegramClientX(TelegramClient): def __init__(self, session, api_id, api_hash, - connection_mode=ConnectionMode.TCP_FULL, use_ipv6=False, proxy=None, - update_workers=None, timeout=timedelta(seconds=10), - spawn_read_thread=True, - report_errors=True, - **kwargs): + ): super().__init__( session, api_id, api_hash, - connection_mode=connection_mode, use_ipv6=use_ipv6, proxy=proxy, - update_workers=update_workers, - spawn_read_thread=spawn_read_thread, timeout=timeout, - report_errors=report_errors, - **kwargs ) self._event_builders = [] @@ -72,18 +59,29 @@ class TelegramClientX(TelegramClient): # Sometimes we need to know who we are, cache the self peer self._self_input_peer = None - class ProcessUpload(Thread): - def __init__(self, name, client, q_request=None): - Thread.__init__(self) + class ProcessUpload: # TODO fuck all threads + name = None + client = None + q_request = None + result = None + + async def init(self, name, client, q_request=None): self.name = name - self.client = TelegramClient(client._session_name, client.api_id, client.api_hash, update_workers=None, - spawn_read_thread=False) + self.client = TelegramClient(client._session_name, client.api_id, client.api_hash) self.q_request = q_request self.result = None + """ + def __init__(self, name, client, q_request=None): + Thread.__init__() + self.name = name + self.client = TelegramClient(client._session_name, client.api_id, client.api_hash) + self.q_request = q_request + self.result = None +""" - def run(self): - print('Thread %s started' % self.name) - time.sleep(random.randrange(200, 2000, 10) * 0.001) + async def run(self): + print('Async task %s started' % self.name) + asyncio.sleep(random.choice([.001, 0.00001, .002])) if not self.client.is_connected(): self.client.connect() while True: @@ -92,12 +90,12 @@ class TelegramClientX(TelegramClient): break self.result = None # time.sleep(random.randrange(20, 100, 1) * 0.001) - self.result = self.client.invoke(request) + self.result = await self.client(request) if self.result is False: break self.q_request.task_done() self.client.disconnect() - print('Thread {0} stopped result {1}'.format(self.name, self.result)) + print('Task {0} stopped result {1}'.format(self.name, self.result)) return def set_upload_threads_count(self, count: int): @@ -106,8 +104,8 @@ class TelegramClientX(TelegramClient): def set_download_threads_count(self, count: int): self._download_threads_count = int(count) - def upload_file(self, - file, + async def upload_file(self, + file, *, part_size_kb=None, file_name=None, use_cache=None, @@ -116,48 +114,43 @@ class TelegramClientX(TelegramClient): Uploads the specified file and returns a handle (an instance of InputFile or InputFileBig, as required) which can be later used before it expires (they are usable during less than a day). - Uploading a file will simply return a "handle" to the file stored remotely in the Telegram servers, which can be later used on. This will **not** upload the file to your own chat or any chat at all. - Args: file (`str` | `bytes` | `file`): The path of the file, byte array, or stream that will be sent. Note that if a byte array or a stream is given, a filename or its type won't be inferred, and it will be sent as an "unnamed application/octet-stream". - Subsequent calls with the very same file will result in immediate uploads, unless ``.clear_file_cache()`` is called. - part_size_kb (`int`, optional): Chunk size when uploading files. The larger, the less requests will be made (up to 512KB maximum). - file_name (`str`, optional): The file name which will be used on the resulting InputFile. If not specified, the name will be taken from the ``file`` and if this is not a ``str``, it will be ``"unnamed"``. - use_cache (`type`, optional): The type of cache to use (currently either ``InputDocument`` or ``InputPhoto``). If present and the file is small enough to need the MD5, it will be checked against the database, and if a match is found, the upload won't be made. Instead, an instance of type ``use_cache`` will be returned. - progress_callback (`callable`, optional): A callback function accepting two parameters: ``(sent bytes, total)``. - Returns: :tl:`InputFileBig` if the file size is larger than 10MB, ``InputSizedFile`` (subclass of :tl:`InputFile`) otherwise. """ - if isinstance(file, (InputFile, InputFileBig)): + if isinstance(file, (types.InputFile, types.InputFileBig)): return file # Already uploaded + if not file_name and getattr(file, 'name', None): + file_name = file.name + if isinstance(file, str): file_size = os.path.getsize(file) elif isinstance(file, bytes): @@ -210,51 +203,35 @@ class TelegramClientX(TelegramClient): __log__.info('Uploading file of %d bytes in %d chunks of %d', file_size, part_count, part_size) - with open(file, 'rb') if isinstance(file, str) else BytesIO(file) as stream: - threads_count = 2 + int((self._upload_threads_count - 2) * float(file_size) / (1024 * 1024 * 100)) - threads_count = min(threads_count, self._upload_threads_count) - threads_count = min(part_count, threads_count) - upload_thread = [] - q_request = Queue() - # spawn threads - for i in range(threads_count): - thread_dl = self.ProcessUpload('thread {0}'.format(i), self, q_request) - thread_dl.start() - upload_thread.append(thread_dl) - for part_index in range(0, part_count, threads_count): + with open(file, 'rb') if isinstance(file, str) else BytesIO(file)\ + as stream: + for part_index in range(part_count): # Read the file by in chunks of size part_size - for part_thread_index in range(threads_count): - if part_index + part_thread_index >= part_count: - break - part = stream.read(part_size) - # The SavePartRequest is different depending on whether - # the file is too large or not (over or less than 10MB) - if is_large: - request = SaveBigFilePartRequest(file_id, part_index + part_thread_index, part_count, part) - else: - request = SaveFilePartRequest(file_id, part_index + part_thread_index, part) - q_request.put(request) - # q_request.join() - job_completed = False - while not job_completed: - for th in upload_thread: - if th: - if th.result is True: - job_completed = True - __log__.debug('Uploaded %d/%d', part_index + 1, part_count) - if progress_callback: - progress_callback(stream.tell(), file_size) - elif th.result is False: - raise RuntimeError('Failed to upload file part {}.'.format(part_index)) - q_request.join() - for i in range(threads_count): - q_request.put(None) - for th in upload_thread: - th.join() + part = stream.read(part_size) + + # The SavePartRequest is different depending on whether + # the file is too large or not (over or less than 10MB) + if is_large: + request = functions.upload.SaveBigFilePartRequest( + file_id, part_index, part_count, part) + else: + request = functions.upload.SaveFilePartRequest( + file_id, part_index, part) + + result = await self(request) + if result: + __log__.debug('Uploaded %d/%d', part_index + 1, + part_count) + if progress_callback: + progress_callback(stream.tell(), file_size) + else: + raise RuntimeError( + 'Failed to upload file part {}.'.format(part_index)) + if is_large: - return InputFileBig(file_id, part_count, file_name) + return types.InputFileBig(file_id, part_count, file_name) else: - return InputSizedFile( + return custom.InputSizedFile( file_id, part_count, file_name, md5=hash_md5, size=file_size ) @@ -262,13 +239,12 @@ class TelegramClientX(TelegramClient): def __init__(self, name, client, q_request=None): Thread.__init__(self) self.name = name - self.client = TelegramClient(client._session_name, client.api_id, client.api_hash, update_workers=None, - spawn_read_thread=False) + self.client = TelegramClient(client._session_name, client.api_id, client.api_hash) self.q_request = q_request self.result = None - def run(self): - print('Thread %s started' % self.name) + async def run(self): + print('Task %s started' % self.name) time.sleep(random.randrange(200, 2000, 10) * 0.001) if not self.client.is_connected(): self.client.connect() @@ -281,31 +257,33 @@ class TelegramClientX(TelegramClient): if isinstance(request, CdnDecrypter): self.result = request.get_file() else: - self.result = self.client.invoke(request) + self.result = self.client(request) if self.result is False: break self.q_request.task_done() self.client.disconnect() - print('Thread {0} stopped result {1}'.format(self.name, self.result)) + print('Task {0} stopped result {1}'.format(self.name, self.result)) return - def download_file(self, - input_location, - file, - part_size_kb=None, - file_size=None, - progress_callback=None): + async def download_file( + self, input_location, file=None, *, part_size_kb=None, + file_size=None, progress_callback=None): """ Downloads the given input location to a file. Args: - input_location (:tl:`InputFileLocation`): + input_location (:tl:`FileLocation` | :tl:`InputFileLocation`): The file location from which the file will be downloaded. + See `telethon.utils.get_input_location` source for a complete + list of supported types. - file (`str` | `file`): + file (`str` | `file`, optional): The output file path, directory, or stream-like object. If the path exists and is a file, it will be overwritten. + If the file path is ``None``, then the result will be + saved in memory and returned as `bytes`. + part_size_kb (`int`, optional): Chunk size when downloading files. The larger, the less requests will be made (up to 512KB maximum). @@ -336,86 +314,71 @@ class TelegramClientX(TelegramClient): raise ValueError( 'The part size must be evenly divisible by 4096.') - if isinstance(file, str): + in_memory = file is None + if in_memory: + f = io.BytesIO() + elif isinstance(file, str): # Ensure that we'll be able to download the media helpers.ensure_parent_dir_exists(file) f = open(file, 'wb') else: f = file - # The used client will change if FileMigrateError occurs - client = self - cdn_decrypter = None - download_thread = [] - q_request = [] + dc_id, input_location = utils.get_input_location(input_location) + exported = dc_id and self.session.dc_id != dc_id + if exported: + try: + sender = await self._borrow_exported_sender(dc_id) + except errors.DcIdInvalidError: + # Can't export a sender for the ID we are currently in + config = await self(functions.help.GetConfigRequest()) + for option in config.dc_options: + if option.ip_address == self.session.server_address: + self.session.set_dc( + option.id, option.ip_address, option.port) + self.session.save() + break + + # TODO Figure out why the session may have the wrong DC ID + sender = self._sender + exported = False + else: + # The used sender will also change if ``FileMigrateError`` occurs + sender = self._sender __log__.info('Downloading file in chunks of %d bytes', part_size) - threads_count = 2 + int((self._download_threads_count - 2) * float(file_size) / (1024 * 1024 * 100)) - threads_count = min(threads_count, self._download_threads_count) - # threads_count = 1 - # threads_count = min(part_count, threads_count) try: offset = 0 - result = None - try: - request = GetFileRequest(input_location, offset, part_size) - result = client(request) - if isinstance(result, FileCdnRedirect): - __log__.info('File lives in a CDN') - cdn_decrypter, result = CdnDecrypter.prepare_decrypter(client, self._get_cdn_client(result), result) - else: - f.write(result.bytes) - offset += part_size - except FileMigrateError as e: - __log__.info('File lives in another DC') - client = self._get_exported_client(e.new_dc) - - # if cdn_decrypter: - # result = cdn_decrypter.get_file() - - # if not result.bytes: - # return getattr(result, 'type', '') - # f.write(result.bytes) - __log__.debug('Saved %d more bytes', len(result.bytes)) - if progress_callback: - progress_callback(f.tell(), file_size) - - # spawn threads - for i in range(threads_count): - q_request.append(Queue()) - thread_dl = self.ProcessDownload('thread {0}'.format(i), self, q_request[i]) - thread_dl.start() - download_thread.append(thread_dl) - # offset += part_size while True: - for i in range(threads_count): - if cdn_decrypter: - q_request[i].put(cdn_decrypter) - else: - request = GetFileRequest(input_location, offset, part_size) - q_request[i].put(request) - offset += part_size - for q in q_request: - q.join() - for th in download_thread: - if th.result and th.result.bytes: - f.write(th.result.bytes) - if progress_callback: - progress_callback(f.tell(), file_size) - else: - for i in range(threads_count): - q_request[i].put(None) - for th in download_thread: - th.join() - return getattr(th.result, 'type', '') - finally: - if client != self: - client.disconnect() - - if cdn_decrypter: try: - cdn_decrypter.client.disconnect() - except: - pass - if isinstance(file, str): + result = await sender.send(functions.upload.GetFileRequest( + input_location, offset, part_size + )) + if isinstance(result, types.upload.FileCdnRedirect): + # TODO Implement + raise NotImplementedError + except errors.FileMigrateError as e: + __log__.info('File lives in another DC') + sender = await self._borrow_exported_sender(e.new_dc) + exported = True + continue + + offset += part_size + if not result.bytes: + if in_memory: + f.flush() + return f.getvalue() + else: + return getattr(result, 'type', '') + + __log__.debug('Saving %d more bytes', len(result.bytes)) + f.write(result.bytes) + if progress_callback: + progress_callback(f.tell(), file_size) + finally: + if exported: + await self._return_exported_sender(sender) + elif sender != self._sender: + await sender.disconnect() + if isinstance(file, str) or in_memory: f.close()