# tgcloud/telegram_client_x.py
# Mirror of https://github.com/SlavikMIPT/tgcloud.git (synced 2025-02-12 11:12:09 +00:00).
# (Repository viewer listing at the time of capture: 436 lines, 17 KiB, Python.)

# Standard library.
import hashlib
import io
import logging
import os
import random
import time
from datetime import timedelta
from io import BytesIO
from queue import Queue
from threading import Thread

# Telethon (third party).
from telethon import TelegramClient
from telethon import helpers, utils
from telethon.crypto import CdnDecrypter
from telethon.errors import FileMigrateError
from telethon.extensions import markdown
from telethon.network import ConnectionTcpFull
from telethon.tl.custom import InputSizedFile
from telethon.tl.functions.upload import (
    SaveBigFilePartRequest, SaveFilePartRequest, GetFileRequest
)
from telethon.tl.types import (InputFile, InputFileBig)
from telethon.tl.types.upload import FileCdnRedirect

# Optional dependencies: the names are bound to None when the package is
# not installed so that later code can test for their availability.
try:
    import socks
except ImportError:
    socks = None

try:
    import hachoir
    import hachoir.metadata
    import hachoir.parser
except ImportError:
    hachoir = None

__log__ = logging.getLogger(__name__)
class TelegramClientX(TelegramClient):
    """``TelegramClient`` that moves file parts over several worker threads.

    Every worker thread owns a separate ``TelegramClient`` built from the
    same session name / API credentials, and executes the part requests
    that ``upload_file`` / ``download_file`` hand to it through a queue.

    NOTE(review): the keyword arguments forwarded by ``__init__``
    (``update_workers``, ``spawn_read_thread``, ...) and ``client.invoke``
    belong to an old, synchronous Telethon API -- confirm the pinned
    Telethon version before upgrading the dependency.
    """

    def __init__(self, session, api_id, api_hash,
                 connection=ConnectionTcpFull,
                 use_ipv6=False,
                 proxy=None,
                 update_workers=None,
                 timeout=timedelta(seconds=10),
                 spawn_read_thread=True,
                 report_errors=True,
                 **kwargs):
        """Create the client; every argument is forwarded to ``TelegramClient``.

        ``session`` is additionally remembered so that worker threads can
        open their own clients on the same session.
        """
        super().__init__(
            session, api_id, api_hash,
            connection=connection,
            use_ipv6=use_ipv6,
            proxy=proxy,
            update_workers=update_workers,
            spawn_read_thread=spawn_read_thread,
            timeout=timeout,
            report_errors=report_errors,
            **kwargs
        )

        self._event_builders = []
        self._events_pending_resolve = []

        # Default parse mode
        self._parse_mode = markdown

        # Some fields to easy signing in. Let {phone: hash} be
        # a dictionary because the user may change their mind.
        self._phone_code_hash = {}
        self._phone = None
        self._tos = None

        # Needed by the worker threads to build their own clients.
        self._session_name = session
        # Upper bounds for the number of parallel transfer threads.
        self._upload_threads_count = 8
        self._download_threads_count = 8

        # Sometimes we need to know who we are, cache the self peer
        self._self_input_peer = None

    # ------------------------------------------------------------------
    # Worker threads
    # ------------------------------------------------------------------

    class _PartWorker(Thread):
        """Thread that executes queued requests on its own ``TelegramClient``.

        Protocol with the producer:

        * every item taken from ``q_request`` is acknowledged with
          ``task_done()`` -- even when executing it fails -- so that
          ``q_request.join()`` in the producer can never block forever;
        * ``None`` is the stop sentinel;
        * ``result`` holds the outcome of the most recent request, and is
          ``False`` after a failure;
        * after a failure ``failed`` is True, ``error`` holds the exception
          (if there was one) and the worker only drains its queue until the
          sentinel arrives, without executing anything else.
        """

        def __init__(self, name, client, q_request=None):
            Thread.__init__(self)
            self.name = name
            self.client = TelegramClient(client._session_name, client.api_id, client.api_hash,
                                         update_workers=None,
                                         spawn_read_thread=False)
            self.q_request = q_request
            self.result = None
            self.failed = False
            self.error = None

        def _execute(self, request):
            """Run one request and return its result."""
            return self.client.invoke(request)

        def _mark_failed(self, error):
            """Record a failure so the producer can detect and report it."""
            self.result = False
            self.failed = True
            self.error = error

        def run(self):
            # Small random delay so the workers do not all connect at once.
            time.sleep(random.randrange(20, 200, 10) * 0.001)
            try:
                try:
                    if not self.client.is_connected():
                        self.client.connect()
                except Exception as e:
                    __log__.exception('%s could not connect', self.name)
                    self._mark_failed(e)

                while True:
                    request = self.q_request.get()
                    try:
                        if request is None:
                            break
                        if self.failed:
                            # Keep draining so the producer is never blocked.
                            continue
                        self.result = None
                        self.result = self._execute(request)
                        if self.result is False:
                            self.failed = True
                    except Exception as e:
                        __log__.exception('%s failed to execute a request', self.name)
                        self._mark_failed(e)
                    finally:
                        self.q_request.task_done()
            finally:
                try:
                    self.client.disconnect()
                except Exception:
                    __log__.debug('%s failed to disconnect cleanly', self.name, exc_info=True)

    class ProcessUpload(_PartWorker):
        """Worker that sends ``Save(Big)FilePartRequest`` items."""

    class ProcessDownload(_PartWorker):
        """Worker that executes ``GetFileRequest`` items.

        A queued ``CdnDecrypter`` is still accepted (its ``get_file()`` is
        called) for backward compatibility, although ``download_file`` no
        longer queues one.
        """

        def _execute(self, request):
            if isinstance(request, CdnDecrypter):
                return request.get_file()
            return self.client.invoke(request)

    # ------------------------------------------------------------------
    # Configuration and small helpers
    # ------------------------------------------------------------------

    def set_upload_threads_count(self, count: int):
        """Set the maximum number of threads ``upload_file`` may use."""
        self._upload_threads_count = int(count)

    def set_download_threads_count(self, count: int):
        """Set the maximum number of threads ``download_file`` may use."""
        self._download_threads_count = int(count)

    @staticmethod
    def _scaled_thread_count(max_threads, file_size, part_count=None):
        """Return how many worker threads to use for a transfer.

        The count grows linearly from 2 (tiny file) up to ``max_threads``
        (10MB and above), is always at least 1, and never exceeds
        ``part_count`` when that is given (so an empty file needs none).
        An unknown ``file_size`` (``None``) is treated like a size of 0.
        """
        max_threads = max(1, int(max_threads))
        scaled = 2 + int((max_threads - 2) * float(file_size or 0) / (1024 * 1024 * 10))
        count = max(1, min(scaled, max_threads))
        if part_count is not None:
            count = min(count, part_count)
        return count

    @staticmethod
    def _stop_workers(workers, queues):
        """Send one stop sentinel through each queue, then join the workers."""
        for q in queues:
            q.put(None)
        for worker in workers:
            worker.join()

    # ------------------------------------------------------------------
    # Upload
    # ------------------------------------------------------------------

    def upload_file(self,
                    file,
                    part_size_kb=None,
                    file_name=None,
                    use_cache=None,
                    progress_callback=None):
        """
        Uploads the specified file and returns a handle (an instance of
        InputFile or InputFileBig, as required) which can be later used
        before it expires (they are usable during less than a day).

        Uploading a file will simply return a "handle" to the file stored
        remotely in the Telegram servers, which can be later used on. This
        will **not** upload the file to your own chat or any chat at all.

        The parts are sent in batches of one part per worker thread; the
        next batch is only queued once the previous one has finished.

        Args:
            file (`str` | `bytes` | `file`):
                The path of the file, byte array, or stream that will be sent.
                Note that if a byte array or a stream is given, a filename
                or its type won't be inferred, and it will be sent as an
                "unnamed application/octet-stream".

                Subsequent calls with the very same file will result in
                immediate uploads, unless ``.clear_file_cache()`` is called.

            part_size_kb (`int`, optional):
                Chunk size when uploading files. The larger, the less
                requests will be made (up to 512KB maximum).

            file_name (`str`, optional):
                The file name which will be used on the resulting InputFile.
                If not specified, the name will be taken from the ``file``
                and if this is not a ``str``, the random file id is used.

            use_cache (`type`, optional):
                The type of cache to use (currently either ``InputDocument``
                or ``InputPhoto``). If present and the file is small enough
                to need the MD5, it will be checked against the database,
                and if a match is found, the upload won't be made. Instead,
                an instance of type ``use_cache`` will be returned.

            progress_callback (`callable`, optional):
                A callback function accepting two parameters:
                ``(sent bytes, total)``. Called once per finished batch.

        Returns:
            :tl:`InputFileBig` if the file size is larger than 10MB,
            ``InputSizedFile`` (subclass of :tl:`InputFile`) otherwise.

        Raises:
            ValueError: if the part size is above 512KB or not a multiple
                of 1024 bytes.
            RuntimeError: if a worker failed to upload a part.
        """
        if isinstance(file, (InputFile, InputFileBig)):
            return file  # Already uploaded

        if isinstance(file, str):
            file_size = os.path.getsize(file)
        elif isinstance(file, bytes):
            file_size = len(file)
        else:
            file = file.read()
            file_size = len(file)

        # File will now either be a string or bytes
        if not part_size_kb:
            part_size_kb = utils.get_appropriated_part_size(file_size)

        if part_size_kb > 512:
            raise ValueError('The part size must be less or equal to 512KB')

        part_size = int(part_size_kb * 1024)
        if part_size % 1024 != 0:
            raise ValueError(
                'The part size must be evenly divisible by 1024')

        # Set a default file name if None was specified
        file_id = helpers.generate_random_long()
        if not file_name:
            if isinstance(file, str):
                file_name = os.path.basename(file)
            else:
                file_name = str(file_id)

        # Determine whether the file is too big (over 10MB) or not
        # Telegram does make a distinction between smaller or larger files
        is_large = file_size > 10 * 1024 * 1024
        hash_md5 = hashlib.md5()
        if not is_large:
            # Calculate the MD5 hash before anything else.
            # As this needs to be done always for small files,
            # might as well do it before anything else and
            # check the cache.
            if isinstance(file, str):
                with open(file, 'rb') as stream:
                    file = stream.read()
            hash_md5.update(file)
            if use_cache:
                cached = self.session.get_file(
                    hash_md5.digest(), file_size, cls=use_cache
                )
                if cached:
                    return cached

        part_count = (file_size + part_size - 1) // part_size
        __log__.info('Uploading file of %d bytes in %d chunks of %d',
                     file_size, part_count, part_size)

        threads_count = self._scaled_thread_count(
            self._upload_threads_count, file_size, part_count)
        workers = []
        q_request = Queue()

        with (open(file, 'rb') if isinstance(file, str) else BytesIO(file)) as stream:
            try:
                # spawn threads
                for i in range(threads_count):
                    worker = self.ProcessUpload('thread {0}'.format(i), self, q_request)
                    worker.start()
                    workers.append(worker)

                # max(..., 1) keeps the step valid for an empty file
                # (part_count == 0, hence no workers and no iterations).
                for batch_start in range(0, part_count, max(threads_count, 1)):
                    batch_end = min(batch_start + threads_count, part_count)
                    for part_index in range(batch_start, batch_end):
                        # Read the file by in chunks of size part_size
                        part = stream.read(part_size)

                        # The SavePartRequest is different depending on whether
                        # the file is too large or not (over or less than 10MB)
                        if is_large:
                            request = SaveBigFilePartRequest(file_id, part_index, part_count, part)
                        else:
                            request = SaveFilePartRequest(file_id, part_index, part)
                        q_request.put(request)

                    # Workers acknowledge every item, so this cannot hang.
                    q_request.join()
                    for worker in workers:
                        if worker.failed:
                            raise RuntimeError(
                                'Failed to upload file part {}.'.format(batch_start)
                            ) from worker.error

                    __log__.debug('Uploaded %d/%d', batch_end, part_count)
                    if progress_callback:
                        progress_callback(stream.tell(), file_size)
            finally:
                # Always release the workers, also when an error is raised.
                self._stop_workers(workers, [q_request] * len(workers))

        if is_large:
            return InputFileBig(file_id, part_count, file_name)
        else:
            return InputSizedFile(
                file_id, part_count, file_name, md5=hash_md5, size=file_size
            )

    # ------------------------------------------------------------------
    # Download
    # ------------------------------------------------------------------

    def download_file(self,
                      input_location,
                      file=None,
                      part_size_kb=None,
                      file_size=None,
                      progress_callback=None):
        """
        Downloads the given input location to a file.

        The first chunk is always requested by this client, which is how a
        CDN redirect or a DC migration is detected. Afterwards:

        * regular files are fetched by parallel worker threads, one chunk
          per worker and round, and written in offset order;
        * CDN files and files living in another DC are fetched one chunk
          at a time on the calling thread (the decrypter / the exported
          client is a single object that the workers do not share).

        Args:
            input_location (:tl:`InputFileLocation`):
                The file location from which the file will be downloaded.

            file (`str` | `file`, optional):
                The output file path, directory, or stream-like object.
                If the path exists and is a file, it will be overwritten.
                If ``None``, the file is downloaded to memory.

            part_size_kb (`int`, optional):
                Chunk size when downloading files. The larger, the less
                requests will be made (up to 512KB maximum).

            file_size (`int`, optional):
                The file size that is about to be downloaded, if known.
                Used for ``progress_callback`` and to choose the part size
                and the number of worker threads.

            progress_callback (`callable`, optional):
                A callback function accepting two parameters:
                ``(downloaded bytes, total)``. Note that the
                ``total`` is the provided ``file_size``.

        Returns:
            The downloaded ``bytes`` if ``file`` is ``None``; otherwise the
            ``type`` attribute of the last server response (``''`` when
            the response has none).

        Raises:
            ValueError: if the part size is not a multiple of 4096 bytes.
            RuntimeError: if a worker failed to download a chunk.
        """
        if not part_size_kb:
            if not file_size:
                part_size_kb = 64  # Reasonable default
            else:
                part_size_kb = utils.get_appropriated_part_size(file_size)

        part_size = int(part_size_kb * 1024)
        # https://core.telegram.org/api/files says:
        # > part_size % 1024 = 0 (divisible by 1KB)
        #
        # But https://core.telegram.org/cdn (more recent) says:
        # > limit must be divisible by 4096 bytes
        # So we just stick to the 4096 limit.
        if part_size % 4096 != 0:
            raise ValueError(
                'The part size must be evenly divisible by 4096.')

        # Resolve the location before the output file is opened, so that a
        # bad location does not leave an open (and truncated) file behind.
        input_location = utils.get_input_location(input_location)

        in_memory = file is None
        if in_memory:
            f = io.BytesIO()
        elif isinstance(file, str):
            # Ensure that we'll be able to download the media
            helpers.ensure_parent_dir_exists(file)
            f = open(file, 'wb')
        else:
            f = file

        def store(chunk):
            """Write ``chunk.bytes`` to ``f``; return False at end of file."""
            if not (chunk and chunk.bytes):
                return False
            f.write(chunk.bytes)
            __log__.debug('Saved %d more bytes', len(chunk.bytes))
            if progress_callback:
                progress_callback(f.tell(), file_size)
            return True

        def finish(last_chunk):
            """Build the return value once the end of the file is reached."""
            if in_memory:
                return f.getvalue()
            return getattr(last_chunk, 'type', '')

        # The used client will change if FileMigrateError occurs
        client = self
        cdn_decrypter = None
        workers = []
        queues = []

        __log__.info('Downloading file in chunks of %d bytes', part_size)
        try:
            offset = 0

            # First chunk: detects CDN redirects and DC migrations.
            while True:
                try:
                    result = client(GetFileRequest(input_location, offset, part_size))
                    if isinstance(result, FileCdnRedirect):
                        __log__.info('File lives in a CDN')
                        cdn_decrypter, result = CdnDecrypter.prepare_decrypter(
                            client, self._get_cdn_client(result), result)
                    break
                except FileMigrateError as e:
                    if client is not self:
                        raise  # Already migrated once; do not loop forever
                    __log__.info('File lives in another DC')
                    client = self._get_exported_client(e.new_dc)

            if not store(result):
                return finish(result)
            offset += part_size

            if cdn_decrypter is not None or client is not self:
                # Sequential download on the calling thread.
                while True:
                    if cdn_decrypter is not None:
                        result = cdn_decrypter.get_file()
                    else:
                        result = client(GetFileRequest(input_location, offset, part_size))
                    offset += part_size
                    if not store(result):
                        return finish(result)

            # Parallel download: one private queue per worker, so that the
            # chunks of a round can be written in offset order.
            threads_count = self._scaled_thread_count(
                self._download_threads_count, file_size)
            for i in range(threads_count):
                q = Queue()
                worker = self.ProcessDownload('thread {0}'.format(i), self, q)
                worker.start()
                queues.append(q)
                workers.append(worker)

            while True:
                round_start = offset
                for q in queues:
                    q.put(GetFileRequest(input_location, offset, part_size))
                    offset += part_size

                # Workers acknowledge every item, so this cannot hang.
                for q in queues:
                    q.join()

                for i, worker in enumerate(workers):
                    if worker.failed:
                        raise RuntimeError(
                            'Failed to download file part at offset {}.'.format(
                                round_start + i * part_size)
                        ) from worker.error
                    if not store(worker.result):
                        return finish(worker.result)
        finally:
            try:
                self._stop_workers(workers, queues)
                if client is not self:
                    client.disconnect()
                if cdn_decrypter:
                    try:
                        cdn_decrypter.client.disconnect()
                    except Exception:
                        # Best effort only: the download itself is over.
                        __log__.debug('Failed to disconnect the CDN client', exc_info=True)
            finally:
                if isinstance(file, str):
                    f.close()