1
0
Fork 0
mirror of https://github.com/SlavikMIPT/tgcloud.git synced 2025-02-12 11:12:09 +00:00

#version 1.1

-improved loading process
-pipe is used for interprocess communication
-readme update
-session creation is in a separate file to avoid bans/floodwaits
-refactoring
This commit is contained in:
Вячеслав Баженов 2019-07-08 08:01:07 +03:00
parent 9d619b074d
commit d67499a8b6
11 changed files with 183 additions and 222 deletions

4
.gitignore vendored
View file

@ -1,2 +1,6 @@
.idea .idea
/tg_access.py /tg_access.py
__pycache__
*.session
/secret.py.bk.py
/test.py

View file

@ -1,38 +1,54 @@
# tgcloud # tgcloud
## UNDER DEVELOPMENT ## UNDER DEVELOPMENT v1.1
## Opensource Virtual Filesystem for Telegram - `secret.py` : вставить `api_hash` и `api_id`, полученные с https://my.telegram.org
Synchronizes and structures files downloaded to Telegram.
- Stores only metadata, accessing raw data only when loading files. - Установить Python2.7 и Python3.6
- Loading speed is up to 240Mbit/s per session
- Multiplatform: provides standard volumes which can be mounted on linux/win/mac... - Скачать исходный код
- Opensource ```
### Project structure: cd ~
**tgcloud:** linux based docker container git clone https://github.com/SlavikMIPT/tgcloud.git
* **redis** - updates, rpc, communication ```
* **tfs:** FUSE based VFS module - Установить зависимости
* [python-fuse](https://github.com/SlavikMIPT/tfs) - interface to linux kernel FS
* redis storage - FS struct, meta, telegram file_id,settings `sudo pip3 install -r requirements.txt`
* rq communication interface - Создать сессию запустив **из папки с проектом**
* docker
* **file_telegram_rxtx** - telegram read/write driver `python3.6 telegram_create_session.py`
* [telethon(sync)](https://github.com/SlavikMIPT/Telethon) by [@Lonami](https://github.com/Lonami) - telegram access, multithreaded downloading/uploading
* improved and tested by [@SlavikMIPT](https://github.com/SlavikMIPT) - load speed 240Mb/s - Установить fuse bindings
* rq communication interface
* docker `sudo yum install python-fuse`
* **polling daemon**
* [telethon(asyncio)](https://github.com/SlavikMIPT/Telethon) - updates from telegram, synchronization, hashtags - Создать папку для монтирования
* rq communication interface
* docker `mkdir storage`
* **client**
* telegram authorization interface - Запустить VFS **из папки с проектом**:
* [filebrowser](https://github.com/SlavikMIPT/filebrowser) - opensource golang filebrowser
* windows service с отладкой
* telegram desktop client with filebrowser
* settings, statistics, monitoring... `python2.7 dedupfs/dedupfs.py -df --block-size 20971520 -o auto_unmount -o hard_remove storage/`
* rq communication interface
* docker в фоне
![Diagram](/img/ProjectDiagram.png)
отредактировать `<username>` в `tgcloud.service`
```
sudo cp tgcloud.service /etc/systemd/system/
sudo systemctl enable tgcloud.service
sudo systemctl daemon-reload
sudo systemctl start tgcloud.service
sudo systemctl status tgcloud.service -l
```
Версия 1.1
Работает пободрее, но все еще сырой прототип - может падать.
Для тестов лучше использовать отдельный профиль.
Если забанят - пишите `recover@telegram.org` - разбанят
You are welcome to collaborate - contact You are welcome to collaborate - contact
Telegram: [@SlavikMIPT](https://t.me/SlavikMIPT) Telegram: [@SlavikMIPT](https://t.me/SlavikMIPT)
Channel: [@MediaTube_stream](https://t.me/MediaTube_stream) Channel: [@MediaTube_stream](https://t.me/MediaTube_stream)

View file

@ -24,8 +24,6 @@ Copyright 2010 Peter Odding <peter@peterodding.com>.
# Check the Python version, warn the user if untested. # Check the Python version, warn the user if untested.
import sys import sys
import subprocess
import os
flagone = True flagone = True
if sys.version_info[:2] != (2, 6): if sys.version_info[:2] != (2, 6):
@ -63,9 +61,8 @@ except ImportError:
from my_formats import format_size, format_timespan from my_formats import format_size, format_timespan
from get_memory_usage import get_memory_usage from get_memory_usage import get_memory_usage
from subprocess import Popen, PIPE from subprocess import Popen
# chat_id = 709766994
def main(): # {{{1 def main(): # {{{1
""" """
This function enables using dedupfs.py as a shell script that creates FUSE This function enables using dedupfs.py as a shell script that creates FUSE
@ -796,7 +793,7 @@ class DedupFS(fuse.Fuse): # {{{1
buf.seek(self.block_size * block_nr, os.SEEK_SET) buf.seek(self.block_size * block_nr, os.SEEK_SET)
new_block = buf.read(self.block_size) new_block = buf.read(self.block_size)
digest = self.__hash(new_block) digest = self.__hash(new_block)
encoded_digest = digest.encode('hex')#sqlite3.Binary(digest) encoded_digest = digest.encode('hex') # sqlite3.Binary(digest)
row = self.conn.execute('SELECT id FROM hashes WHERE hash = ?', (encoded_digest,)).fetchone() row = self.conn.execute('SELECT id FROM hashes WHERE hash = ?', (encoded_digest,)).fetchone()
if row: if row:
hash_id = row[0] hash_id = row[0]
@ -820,10 +817,17 @@ class DedupFS(fuse.Fuse): # {{{1
self.conn.execute('INSERT INTO "index" (inode, hash_id, block_nr) VALUES (?, ?, ?)', self.conn.execute('INSERT INTO "index" (inode, hash_id, block_nr) VALUES (?, ?, ?)',
(inode, hash_id, block_nr)) (inode, hash_id, block_nr))
else: else:
process = Popen(["python3.6", "download_service.py", "upload", digest.encode('hex')], stdin=PIPE,
bufsize=-1) FIFO_PIPE = str('pipe_' + digest.encode('hex'))
process.stdin.write(self.compress(new_block)) try:
process.stdin.close() os.mkfifo(FIFO_PIPE)
except OSError as oe:
if oe.errno != errno.EEXIST:
raise
process = Popen(["python3.6", "download_service.py", "upload", digest.encode('hex')], shell=False)
with open(FIFO_PIPE, 'wb') as pipe:
os.unlink(FIFO_PIPE)
pipe.write(self.compress(new_block))
process.wait() process.wait()
# self.blocks[digest] = self.compress(new_block) # self.blocks[digest] = self.compress(new_block)
@ -1169,13 +1173,17 @@ class DedupFS(fuse.Fuse): # {{{1
self.conn.rollback() self.conn.rollback()
def __get_block_from_telegram(self, digest): def __get_block_from_telegram(self, digest):
buf = tempfile.NamedTemporaryFile() FIFO_PIPE = str('pipe_' + str(digest))
process = Popen(["python3.6", "download_service.py", "download", str(digest)], stdout=buf, try:
bufsize=-1, shell=False) os.mkfifo(FIFO_PIPE)
except OSError as oe:
if oe.errno != errno.EEXIST:
raise
process = Popen(["python3.6", "download_service.py", "download", str(digest)], shell=False)
with open(FIFO_PIPE, 'rb') as pipe:
# os.unlink(FIFO_PIPE)
block = pipe.read()
process.wait() process.wait()
buf.seek(0)
block = buf.read()
buf.close()
return block return block
def __get_file_buffer(self, path): # {{{3 def __get_file_buffer(self, path): # {{{3

View file

@ -1,4 +1,4 @@
#!/usr/bin/python #!/usr/bin/python3
""" """
The function in this Python module determines the current memory usage of the The function in this Python module determines the current memory usage of the

View file

@ -3,72 +3,30 @@
from __future__ import print_function from __future__ import print_function
from __future__ import unicode_literals from __future__ import unicode_literals
import os import os
import shutil
import time import time
import tempfile
import mimetypes
from telethon.tl.types import DocumentAttributeFilename from telethon.tl.types import DocumentAttributeFilename
from telethon.tl.types import Document
from telethon.utils import get_input_media
from telethon.errors.rpc_error_list import LocationInvalidError
# from telegram_client_x import TelegramClientX
from telethon.telegram_client import TelegramClient from telethon.telegram_client import TelegramClient
from telethon.tl.types import Message from telegram_client_x import TelegramClientX
from tg_access import * from secret import *
from io import BytesIO
import sys
class Buffer: # {{{1
"""
This class wraps cStringIO.StringIO with two additions: The __len__
method and a dirty flag to determine whether a buffer has changed.
"""
def __init__(self):
self.buf = BytesIO()
self.dirty = False
def __getattr__(self, attr, default=None):
""" Delegate to the StringIO object. """
return getattr(self.buf, attr, default)
def __len__(self):
""" Get the total size of the buffer in bytes. """
position = self.buf.tell()
self.buf.seek(0, os.SEEK_END)
length = self.buf.tell()
self.buf.seek(position, os.SEEK_SET)
return length
def truncate(self, *args):
""" Truncate the file at the current position and set the dirty flag. """
if len(self) > self.buf.tell():
self.dirty = True
return self.buf.truncate(*args)
def write(self, *args):
""" Write a string to the file and set the dirty flag. """
self.dirty = True
return self.buf.write(*args)
path_home = './' # os.path.abspath('.') path_home = './' # os.path.abspath('.')
path_local = './local' client = TelegramClientX(entity, api_id, api_hash, update_workers=None, spawn_read_thread=True)
# client = TelegramClientX(entity, api_id, api_hash, update_workers=None, spawn_read_thread=True) # client = TelegramClient(entity, api_id, api_hash, update_workers=None, spawn_read_thread=True)
client = TelegramClient(entity, api_id, api_hash, update_workers=None, spawn_read_thread=True) client.set_upload_threads_count(24) # 24
client.set_download_threads_count(24) # 8
# client.set_upload_threads_count(24)#24
# client.set_download_threads_count(8)#8
last_call_time_sent = time.time() last_call_time_sent = time.time()
last_call_time_receive = time.time() last_call_time_receive = time.time()
client.connect() client.connect()
if not client.is_user_authorized(): if not client.is_user_authorized():
phone = input('Enter phone: ') raise Exception("Telegram session not found - run from the project folder: python3.6 telegram_create_session.py")
client.send_code_request(phone)
client.sign_in(phone, input('Enter code: '))
def on_download_progress(recv_bytes, total_bytes): def on_download_progress(recv_bytes, total_bytes):
global last_call_time_receive global last_call_time_receive
if time.time() - last_call_time_receive < 1: if time.time() - last_call_time_receive < 1:
@ -77,6 +35,7 @@ def on_download_progress(recv_bytes, total_bytes):
# print(f"receive {recv_bytes}/{total_bytes}", end="\r") # print(f"receive {recv_bytes}/{total_bytes}", end="\r")
return 0 return 0
def on_upload_progress(send_bytes, total_bytes): def on_upload_progress(send_bytes, total_bytes):
global last_call_time_sent global last_call_time_sent
if time.time() - last_call_time_sent < 1: if time.time() - last_call_time_sent < 1:
@ -85,42 +44,56 @@ def on_upload_progress(send_bytes, total_bytes):
# print(f"sent {send_bytes}/{total_bytes}", end="\r") # print(f"sent {send_bytes}/{total_bytes}", end="\r")
return 0 return 0
# #
def download_block(hash_uid,chat_id=None): def download_block(hash_uid):
try: try:
hash_uid = str(hash_uid) hash_uid = str(hash_uid)
os.chdir(path_home) os.chdir(path_home)
entity = client.get_entity(client.get_me()) entity = client.get_entity(client.get_me())
messages = client.get_messages(entity, limit=40,search=hash_uid) messages = client.get_messages(entity, limit=1, search=hash_uid)
for i in range(len(messages)): for i in range(len(messages)):
msg = messages[i] msg = messages[i]
if msg.message == hash_uid: if msg.message == hash_uid:
outbuf = tempfile.NamedTemporaryFile() FIFO = f"pipe_{hash_uid}"
client.download_media(msg, file=outbuf, progress_callback=on_download_progress) import errno
outbuf.seek(0) try:
sys.stdout.buffer.write(outbuf.read()) os.mkfifo(FIFO)
outbuf.close() except OSError as oe:
if oe.errno != errno.EEXIST:
raise
with open(FIFO, 'wb') as outbuf:
os.unlink(FIFO)
client.download_media(msg, file=outbuf, progress_callback=on_download_progress)
return 0 return 0
except Exception: except Exception as e:
return -1 return -1
finally: finally:
client.disconnect() client.disconnect()
def upload_block(bytesin, hash_uid,chat_id=None): def upload_block(hash_uid):
try: try:
hash_uid = str(hash_uid) hash_uid = str(hash_uid)
os.chdir(path_home) os.chdir(path_home)
entity = client.get_entity(client.get_me()) entity = client.get_entity(client.get_me())
message = client.send_file(entity, FIFO = f"pipe_{hash_uid}"
file=bytesin, import errno
caption=f'{hash_uid}', try:
attributes=[DocumentAttributeFilename(f'{hash_uid}')], os.mkfifo(FIFO)
allow_cache=False, except OSError as oe:
part_size_kb=512, if oe.errno != errno.EEXIST:
force_document=True, raise
progress_callback=on_upload_progress) with open(FIFO, 'rb') as bytesin:
message = client.send_file(entity,
file=bytesin,
caption=f'{hash_uid}',
attributes=[DocumentAttributeFilename(f'{hash_uid}')],
allow_cache=False,
part_size_kb=512,
force_document=True,
progress_callback=on_upload_progress)
# message.id # message.id
return 0 return 0
except Exception: except Exception:
@ -137,9 +110,8 @@ def main(argv):
download_block(hash_uid=uid) download_block(hash_uid=uid)
return 0 return 0
elif service == 'upload': elif service == 'upload':
data = sys.stdin.buffer.read()
uid = str(argv[2]) uid = str(argv[2])
upload_block(bytesin=data, hash_uid=uid) upload_block(hash_uid=uid)
return 0 return 0
except Exception as e: except Exception as e:
@ -153,4 +125,4 @@ def main(argv):
if __name__ == '__main__': if __name__ == '__main__':
import sys import sys
main(sys.argv[0:]) main(sys.argv[0:])

4
secret.py Normal file
View file

@ -0,0 +1,4 @@
#paste your api_hash and api_id from my.telegram.org
api_hash = "xxxxxxxxxxxxxxxxxxxxxxx"
api_id = 123456
entity = "tgcloud"

View file

@ -1,12 +1,12 @@
import hashlib import hashlib
import io
import logging import logging
import os import os
import random
import time
from io import BytesIO from io import BytesIO
from telethon.crypto import CdnDecrypter from telethon.crypto import CdnDecrypter
from telethon.errors import (
FileMigrateError
)
from telethon.tl.custom import InputSizedFile from telethon.tl.custom import InputSizedFile
from telethon.tl.functions.upload import ( from telethon.tl.functions.upload import (
SaveBigFilePartRequest, SaveFilePartRequest, GetFileRequest SaveBigFilePartRequest, SaveFilePartRequest, GetFileRequest
@ -24,14 +24,16 @@ try:
import hachoir.parser import hachoir.parser
except ImportError: except ImportError:
hachoir = None hachoir = None
from telethon import helpers, utils from telethon import helpers, utils
from telethon.errors import FileMigrateError
from telethon.tl.types import (InputFile, InputFileBig) from telethon.tl.types import (InputFile, InputFileBig)
from telethon.extensions import markdown
__log__ = logging.getLogger(__name__) __log__ = logging.getLogger(__name__)
from telethon import TelegramClient from telethon import TelegramClient
from threading import Thread from threading import Thread
import random
import time
from queue import Queue from queue import Queue
from telethon.network import ConnectionTcpFull from telethon.network import ConnectionTcpFull
from datetime import timedelta from datetime import timedelta
@ -62,10 +64,14 @@ class TelegramClientX(TelegramClient):
self._event_builders = [] self._event_builders = []
self._events_pending_resolve = [] self._events_pending_resolve = []
# Default parse mode
self._parse_mode = markdown
# Some fields to easy signing in. Let {phone: hash} be # Some fields to easy signing in. Let {phone: hash} be
# a dictionary because the user may change their mind. # a dictionary because the user may change their mind.
self._phone_code_hash = {} self._phone_code_hash = {}
self._phone = None self._phone = None
self._tos = None
self._session_name = session self._session_name = session
self._upload_threads_count = 8 self._upload_threads_count = 8
self._download_threads_count = 8 self._download_threads_count = 8
@ -83,7 +89,7 @@ class TelegramClientX(TelegramClient):
def run(self): def run(self):
# print('Thread %s started' % self.name) # print('Thread %s started' % self.name)
# time.sleep(random.randrange(200, 2000, 10) * 0.001) time.sleep(random.randrange(20, 200, 10) * 0.001)
if not self.client.is_connected(): if not self.client.is_connected():
self.client.connect() self.client.connect()
while True: while True:
@ -211,7 +217,7 @@ class TelegramClientX(TelegramClient):
file_size, part_count, part_size) file_size, part_count, part_size)
with open(file, 'rb') if isinstance(file, str) else BytesIO(file) as stream: with open(file, 'rb') if isinstance(file, str) else BytesIO(file) as stream:
threads_count = 2 + int((self._upload_threads_count - 2) * float(file_size) / (1024 * 1024 * 100)) threads_count = 2 + int((self._upload_threads_count - 2) * float(file_size) / (1024 * 1024 * 10))
threads_count = min(threads_count, self._upload_threads_count) threads_count = min(threads_count, self._upload_threads_count)
threads_count = min(part_count, threads_count) threads_count = min(part_count, threads_count)
upload_thread = [] upload_thread = []
@ -269,7 +275,7 @@ class TelegramClientX(TelegramClient):
def run(self): def run(self):
# print('Thread %s started' % self.name) # print('Thread %s started' % self.name)
# time.sleep(random.randrange(200, 2000, 10) * 0.001) time.sleep(random.randrange(20, 200, 10) * 0.001)
if not self.client.is_connected(): if not self.client.is_connected():
self.client.connect() self.client.connect()
while True: while True:
@ -277,7 +283,6 @@ class TelegramClientX(TelegramClient):
if request is None: if request is None:
break break
self.result = None self.result = None
# time.sleep(random.randrange(20, 100, 1) * 0.001)
if isinstance(request, CdnDecrypter): if isinstance(request, CdnDecrypter):
self.result = request.get_file() self.result = request.get_file()
else: else:
@ -337,7 +342,10 @@ class TelegramClientX(TelegramClient):
raise ValueError( raise ValueError(
'The part size must be evenly divisible by 4096.') 'The part size must be evenly divisible by 4096.')
if isinstance(file, str): in_memory = file is None
if in_memory:
f = io.BytesIO()
elif isinstance(file, str):
# Ensure that we'll be able to download the media # Ensure that we'll be able to download the media
helpers.ensure_parent_dir_exists(file) helpers.ensure_parent_dir_exists(file)
f = open(file, 'wb') f = open(file, 'wb')
@ -347,12 +355,12 @@ class TelegramClientX(TelegramClient):
# The used client will change if FileMigrateError occurs # The used client will change if FileMigrateError occurs
client = self client = self
cdn_decrypter = None cdn_decrypter = None
input_location = utils.get_input_loction(input_location) input_location = utils.get_input_location(input_location)
download_thread = [] download_thread = []
q_request = [] q_request = []
__log__.info('Downloading file in chunks of %d bytes', part_size) __log__.info('Downloading file in chunks of %d bytes', part_size)
threads_count = 2 + int((self._download_threads_count - 2) * float(file_size) / (1024 * 1024 * 100)) threads_count = 2 + int((self._download_threads_count - 2) * float(file_size) / (1024 * 1024 * 10))
threads_count = min(threads_count, self._download_threads_count) threads_count = min(threads_count, self._download_threads_count)
# threads_count = 1 # threads_count = 1
# threads_count = min(part_count, threads_count) # threads_count = min(part_count, threads_count)

View file

@ -0,0 +1,22 @@
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
from __future__ import print_function
from __future__ import unicode_literals
from telethon.telegram_client import TelegramClient
from telegram_client_x import TelegramClientX
from secret import *
path_home = './' # os.path.abspath('.')
client = TelegramClientX(entity, api_id, api_hash, update_workers=None, spawn_read_thread=True)
# client = TelegramClient(entity, api_id, api_hash, update_workers=None, spawn_read_thread=True)
client.set_upload_threads_count(24) # 24
client.set_download_threads_count(24) # 8
client.connect()
if not client.is_user_authorized():
client.start()
client.disconnect()

81
test.py
View file

@ -1,81 +0,0 @@
#!/usr/bin/python3
import shelve
import os
from subprocess import Popen, PIPE
try:
from StringIO import StringIO
except ImportError:
from io import StringIO
from io import BytesIO
from download_service import download_block, upload_block
import subprocess
import tempfile
class Buffer: # {{{1
"""
This class wraps cStringIO.StringIO with two additions: The __len__
method and a dirty flag to determine whether a buffer has changed.
"""
def __init__(self):
self.buf = BytesIO()
self.dirty = False
def __getattr__(self, attr, default=None):
""" Delegate to the StringIO object. """
return getattr(self.buf, attr, default)
def __len__(self):
""" Get the total size of the buffer in bytes. """
position = self.buf.tell()
self.buf.seek(0, os.SEEK_END)
length = self.buf.tell()
self.buf.seek(position, os.SEEK_SET)
return length
def truncate(self, *args):
""" Truncate the file at the current position and set the dirty flag. """
if len(self) > self.buf.tell():
self.dirty = True
return self.buf.truncate(*args)
def write(self, *args):
""" Write a string to the file and set the dirty flag. """
self.dirty = True
return self.buf.write(*args)
def get_block_from_telegram(chat_id, digest):
buf = tempfile.NamedTemporaryFile()
process = Popen(["python3.6", "download_service.py", "download", str(chat_id), str(digest)], stdout=buf,shell=False)
process.wait()
buf.seek(0)
block = buf.read()
buf.close()
return block
chat_id = 123
user_id = 123
file_id = 'a4ddb160d8a42a9379d6fbbd0cb72ff11efe9cb5'
# #upload
# buf = Buffer()
# with open('test.mp4','rb') as fp:
# buf = fp.read()
# import hashlib
#
# hexdigest = hashlib.sha1(buf).hexdigest()
# process = Popen(["python3.6", "download_service.py", "upload", str(chat_id), str(hexdigest)], stdin=PIPE, bufsize=-1)
# process.stdin.write(buf)
# process.stdin.close()
# process.wait()
# #download
block = get_block_from_telegram(chat_id, file_id)
outfile = open('tempfile_read2.mp3','wb')
outfile.write(block)
outfile.close()

View file

@ -1,4 +0,0 @@
api_hash = ""
phone = ""
entity = ""
api_id = 0

12
tgcloud.service Normal file
View file

@ -0,0 +1,12 @@
[Unit]
Description=Open-source Telegram VFS
[Service]
WorkingDirectory=/home/<username>/tgcloud
#ExecStart=/bin/python2.7 dedupfs/dedupfs.py -f --block-size 20971520 -o auto_unmount -o direct_io -o no_splice_write -o no_splice_read -o no_splice_move storage/
ExecStart=/bin/python2.7 dedupfs/dedupfs.py -f --block-size 20971520 -o hard_remove -o auto_unmount storage/
Restart=always
RestartSec=10
SyslogIdentifier=tgcloud-dedupfs-fuse
User=<username>
[Install]
WantedBy=multi-user.target