# MsRewards/Flask/app.py

import gevent.monkey
gevent.monkey.patch_all()
from time import sleep
import subprocess
import os
from flask import Flask, Response, redirect, url_for, request, session, abort, render_template, send_from_directory
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from flask_login import LoginManager, UserMixin, login_required, login_user, logout_user
from werkzeug.utils import secure_filename
import json
import re
from requests import get
import redis
# Root directory every relative path is resolved against; the APP_ROOT
# environment variable overrides the built-in default.
APP_ROOT = os.getenv("APP_ROOT", "/app/MsRewards-Reborn/")
# Dry-run switch: when the NO_SUBPROCESS environment variable is set (to any
# value), subprocess.Popen is replaced by a stub that only prints its
# arguments, so no external command is actually started.
NO_SUBPROCESS = os.getenv("NO_SUBPROCESS")
if NO_SUBPROCESS is not None:

    def fake_popen(*args, **kwargs):
        """Stand-in for ``subprocess.Popen``: print the call and return None."""
        print("Calling subprocess.Popen with", args, kwargs)

    subprocess.Popen = fake_popen
    print("Faking subprocess calls")
# redis part for live update
# A single connection pool to the local Redis server (database 0) backs the
# client used by the /stream endpoint.
pool = redis.ConnectionPool(
    host="localhost",
    port=6379,
    db=0,
)
r = redis.Redis(connection_pool=pool)
def get_path(path):
    """Return ``path`` joined onto the application root ``APP_ROOT``."""
    resolved = os.path.join(APP_ROOT, path)
    return resolved
def generate_output():
    """Yield messages published on the Redis ``console`` channel as SSE frames.

    Each published message is printed and yielded as ``"data: <text>\\n\\n"``.
    Errors raised while listening are printed and end the stream.
    """
    pubsub = r.pubsub()
    pubsub.subscribe('console')
    try:
        for message in pubsub.listen():
            # listen() also yields subscribe confirmations; only forward
            # real published messages.
            if message['type'] == 'message':
                print(message)
                yield f"data: {message['data'].decode()}\n\n"
    except Exception as e:
        print(f"ya eu une erreur sad {e}")
    finally:
        # Also reached on GeneratorExit (the consumer closed the stream),
        # which `except Exception` does not catch: release the subscription
        # and its connection instead of leaking one per client.
        pubsub.close()
# the end
# Load the UI password and the Flask session secret from user_data/flask.json.
with open(get_path("user_data/flask.json"), "r") as settings_file:
    data = json.load(settings_file)
password = data["password"]
secret = data["secret"]
if secret == "":
    # Empty secret: generate a random one and write it back to the file so
    # the stored value is no longer empty.
    import secrets
    secret = secrets.token_hex()
    data = {
        "password": password,
        "secret": secret
    }
    with open(get_path("user_data/flask.json"), "w") as settings_file:
        json.dump(data, settings_file)
"""
#Automatic start of MsRewards
"""
def daily_command():
    """Run ``git pull`` and SIGKILL chrome, Xvfb, Xvnc and undetected_chromedriver.

    Every command is started with ``subprocess.Popen`` and not waited for.
    """
    subprocess.Popen(["git", "pull"])
    for process_name in ("chrome", "Xvfb", "Xvnc", "undetected_chromedriver"):
        subprocess.Popen(["pkill", "-9", process_name])
scheduler = BackgroundScheduler()
scheduler.start()
# Register daily_command to fire every day at 00:00:00.
daily_trigger = CronTrigger(
    year="*", month="*", day="*", hour="0", minute="0", second="0"
)
scheduler.add_job(
    daily_command,
    trigger=daily_trigger,
    name="Daily refresh",
    id="99",
)
def start_ms(i):
    """Launch ``V6.py -c <i>`` in the background, appending output to its log.

    Args:
        i: id of the config to run; also names the log file
            ``Flask/static/logs/<i>.txt``.
    """
    print("\033[32m" + f"Starting config {i}" + "\033[0m")
    # Append mode keeps the output of previous runs. The child process gets
    # its own copy of the descriptor, so the handle can be closed right away.
    with open(get_path(f"Flask/static/logs/{i}.txt"), 'a') as log:
        # Argument list without a shell: the config id is passed verbatim to
        # V6.py and can never be interpreted as shell syntax.
        subprocess.Popen(
            ["python3", "-u", get_path("V6.py"), "-c", str(i)],
            stdout=log,
            stderr=log,
        )
# Last CronTrigger built for each config id.
TriggerDict = {}


def update_jobs():
    """Re-register the scheduler jobs described by ``user_data/configs.json``.

    For each config, a daily CronTrigger is built from its "H:M" ``time`` and
    stored in ``TriggerDict``. Any job already registered under the config id
    is removed, and a new job calling ``start_ms(id)`` is added only when the
    config is enabled. Every failure is printed and never raised.
    """
    with open(get_path("user_data/configs.json"), "r") as config_file:
        configs = json.load(config_file)

    for config_id in configs:
        try:
            config = configs[config_id]
            hour, minute = config["time"].split(":")
            print("\033[36m" + f"config {config_id} : {hour}:{minute}" + "\033[0m")
            TriggerDict[config_id] = CronTrigger(
                year="*", month="*", day="*", hour=hour, minute=minute, second="0"
            )
            enabled = config["enabled"]

            # Reset the job: enabled or not, the previous one is dropped first.
            try:
                scheduler.remove_job(config_id)
            except Exception as e:
                print(f"\033[33merror with deleting config {config_id} : {e}\033[0m")

            if not enabled:
                continue

            # Restart the job with the fresh trigger.
            try:
                scheduler.add_job(
                    start_ms,
                    trigger=TriggerDict[config_id],
                    args=[config_id],
                    name="Daily start",
                    id=config_id,
                )
                print("\033[36m" + f"successfully created config {config_id}" + "\033[0m")
            except Exception as e:
                print(f"\033[33merror with creating config {config_id} : {e}\033[0m")
        except Exception as e:
            print(e)
"""
#Flask app
"""
app = Flask(__name__)


@app.context_processor
def inject_default_variables():
    """Make ``version`` (first line of the ``version`` file) available to templates."""
    with open(get_path("version"), "r") as version_file:
        first_line = version_file.readline()
    return {"version": first_line.replace("\n", '')}
"""
#Login stuff
"""
# config
app.config["TEMPLATES_AUTO_RELOAD"] = True
# Session cookies are signed with the secret loaded from flask.json.
app.config["SECRET_KEY"] = secret
login_manager = LoginManager()
login_manager.init_app(app)
# Endpoint name of the login view.
login_manager.login_view = "login"
# silly user model
class User(UserMixin):
    """Minimal user object for flask_login.

    Attributes are set in ``__init__``: ``id`` (as given), ``name``
    (``"user" + str(id)``) and ``password`` (the module-level password at
    construction time).
    """

    def __init__(self, id):
        self.id = id
        self.name = "user" + str(id)
        self.password = password

    def __repr__(self):
        # %s instead of %d: self.id is not always an int (load_user forwards
        # whatever id it receives), and %d raises TypeError for non-numbers.
        # NOTE(review): the repr contains the plaintext password; avoid
        # logging User objects.
        return "%s/%s/%s" % (self.id, self.name, self.password)


users = [User(1)]
@app.route('/stream')
def stream():
    """Stream the output of ``generate_output()`` as ``text/event-stream``."""
    events = generate_output()
    return Response(events, content_type='text/event-stream')
@app.route("/login/", methods=["GET", "POST"])
def login():
    """Show the login form (GET) or check the submitted password (POST).

    A correct password logs the single user in and redirects to
    ``/schedule``, or to ``/change_password`` while the password is still
    "ChangeMe". A wrong password aborts with 401.
    """
    import hmac

    if request.method != 'POST':
        return render_template("login.html")

    submitted = request.form['password']
    # Constant-time comparison on bytes; compare_digest only accepts str
    # operands when they are pure ASCII.
    if not hmac.compare_digest(submitted.encode("utf-8"), password.encode("utf-8")):
        return abort(401)

    # The original passed the builtin function `id` here; the only user in
    # this app has id 1.
    login_user(User(1))
    if password == "ChangeMe":
        return redirect('/change_password')
    return redirect('/schedule')
@app.route("/change_password/", methods=["GET", "POST"])
@login_required
def change_password():
    """Render the change-password page; on POST, apply the new password.

    The new password replaces the module-level ``password``, is handed to
    ``grafana-cli admin reset-admin-password`` and is written, together with
    the current secret, to ``user_data/flask.json``. A missing or empty
    password aborts with 400 and nothing is changed.
    """
    global password
    if request.method == 'POST':
        new_password = request.form.get("password", "")
        if not new_password:
            # An empty password would make the login form accept an empty
            # submission; refuse it.
            abort(400)
        password = new_password
        subprocess.Popen(["grafana-cli", "admin", "reset-admin-password", password])
        with open(get_path("user_data/flask.json"), "w") as outFile:
            json.dump({"password": password, "secret": secret}, outFile)
    return render_template("change_password.html")
# handle login failed
@app.errorhandler(401)
def unauthorized(e):
    """Answer every 401 with a redirect to the login page."""
    login_url = "/login"
    return redirect(login_url)
# callback to reload the user object
@login_manager.user_loader
def load_user(userid):
    """Build a ``User`` for ``userid``; the id is not checked against ``users``."""
    reloaded = User(userid)
    return reloaded
"""
#end of login stuff
"""
@app.route("/")
def main():
    """Render the schedule page with the contents of ``configs.json``."""
    with open(get_path("user_data/configs.json"), "r") as config_file:
        configs = json.load(config_file)
    return render_template("schedule.html", data=configs)
@app.route("/discord/")
def discord_get():
    """Render the discord page with ``discord.json`` and the next free id."""
    with open(get_path("user_data/discord.json"), "r") as discord_file:
        entries = json.load(discord_file)
    next_id = maxi(entries)
    return render_template("discord.html", data=entries, len=next_id)
@app.route("/discord/", methods=["post"])
def discord_post():
    """Delete or create/replace one entry of ``discord.json`` from the form.

    When the ``DISCORD`` field equals "delete", the entry named by ``select``
    is removed if present. Otherwise the entry is (re)written from the
    ``successL``/``errorsL`` fields, the optional ``successT``/``errorsT``
    fields and ``name``. The updated file is saved and the page re-rendered.
    """
    with open(get_path("user_data/discord.json"), "r") as inFile:
        data = json.load(inFile)
    action = request.form
    if action['DISCORD'] == "delete":
        data.pop(action["select"], None)
    else:
        config = action["select"]
        # successT / errorsT are stored as the strings "True"/"False"
        # depending only on whether the field was submitted at all.
        data[config] = {
            "errorsL": action["errorsL"],
            "errorsT": "True" if "errorsT" in action else "False",
            "successT": "True" if "successT" in action else "False",
            "successL": action["successL"],
            "name": action["name"] if action["name"] else f"unnamed{config}",
        }
    with open(get_path("user_data/discord.json"), "w") as outFile:
        json.dump(data, outFile)
    return render_template("discord.html", data=data, len=maxi(data))
@app.route("/dev/")
def dev2():
    """Render the ``dev.html`` page."""
    page = render_template("dev.html")
    return page
@app.route("/settings/")
def settings_get():
    """Render the settings page with the contents of ``settings.json``."""
    with open(get_path("user_data/settings.json"), "r") as settings_file:
        current = json.load(settings_file)
    return render_template("settings.html", data=current)
@app.route("/settings/", methods=["post"])
def settings_post():
    """Overwrite ``settings.json`` with the submitted ``avatarlink`` and re-render."""
    settings = {'avatarlink': request.form["avatarlink"]}
    with open(get_path("user_data/settings.json"), "w") as settings_file:
        json.dump(settings, settings_file)
    return render_template("settings.html", data=settings)
@app.route("/proxy/")
def proxy_get():
    """Render the proxy page with ``proxy.json`` and the next free id."""
    with open(get_path("user_data/proxy.json"), "r") as proxy_file:
        proxies = json.load(proxy_file)
    next_id = maxi(proxies)
    return render_template("proxy.html", data=proxies, len=next_id)
@app.route("/proxy/", methods=["post"])
def proxy_post():
    """Delete or create/replace one entry of ``proxy.json`` from the form.

    When the ``PROXY`` field equals "delete", the entry named by ``select``
    is removed if present. Otherwise the entry is (re)written from
    ``address``, ``port`` and ``name``; if one of those fields is missing the
    entry is left untouched and an error is printed. The file is saved and
    the page re-rendered in every case.
    """
    with open(get_path("user_data/proxy.json"), "r") as inFile:
        data = json.load(inFile)
    action = request.form
    print(action)
    if action['PROXY'] == "delete":
        print(action)
        data.pop(action["select"], None)
    else:
        try:
            config = action["select"]
            address = action["address"]
            port = action["port"]
            name = action["name"] if action["name"] else f"@unnamed{config}"
            data[config] = {"address": address, "port": port, "name": name}
        except KeyError:
            # Only a missing form field is expected here (werkzeug's
            # BadRequestKeyError is a KeyError); anything else should surface.
            print("error : probably bad config")
    with open(get_path("user_data/proxy.json"), "w") as outFile:
        json.dump(data, outFile)
    return render_template("proxy.html", data=data, len=maxi(data))
@app.route("/schedule/")
def schedule_get():
    """Render the schedule page with the contents of ``configs.json``."""
    with open(get_path("user_data/configs.json"), "r") as config_file:
        configs = json.load(config_file)
    return render_template("schedule.html", data=configs)
@app.route("/schedule/", methods=["post"])
def schedule_post():
    """Store the submitted time/enabled state of every config and reschedule.

    For each config id ``i`` the form fields ``time<i>`` and ``switch<i>`` are
    read; a missing switch counts as "off". The result is written to
    ``configs.json``, the scheduler is refreshed through ``update_jobs()`` and
    the schedule page is re-rendered.
    """
    with open(get_path("user_data/configs.json"), "r") as inFile:
        configs = json.load(inFile)
    form = request.form
    for i in configs:
        # A config the form has no time field for keeps its stored time
        # instead of failing the whole request with a KeyError.
        configs[i]["time"] = form.get(f"time{i}", configs[i].get("time", ""))
        configs[i]["enabled"] = form.get(f"switch{i}", "off") == "on"
    with open(get_path("user_data/configs.json"), "w") as outFile:
        json.dump(configs, outFile)
    update_jobs()
    return render_template("schedule.html", data=configs)
@app.route("/config/")
def config_get():
    """Render the config page from ``proxy.json``, ``discord.json`` and ``configs.json``."""
    loaded = {}
    # Same read order as before: proxy, discord, configs.
    for key in ("proxy", "discord", "configs"):
        with open(get_path(f"user_data/{key}.json"), "r") as source:
            loaded[key] = json.load(source)
    configs = loaded["configs"]
    return render_template(
        "config.html",
        data=configs,
        discords=loaded["discord"],
        proxys=loaded["proxy"],
        configs=configs,
        len=maxi(configs),
    )
@app.route("/config/", methods=["POST"])
def config_post():
    """Delete or create/replace one entry of ``configs.json`` from the form.

    When the ``data`` field equals "delete", the config named by ``config``
    is removed if present. Otherwise the config is (re)written with its name,
    proxy, discord and five accounts (``mail<n>``, ``pwd<n>``, ``2fa<n>`` for
    n = 1..5), an empty time and ``enabled`` set to False. The file is saved
    and the config page re-rendered.
    """
    action = request.form
    with open(get_path("user_data/proxy.json"), "r") as inFile:
        proxys = json.load(inFile)
    with open(get_path("user_data/discord.json"), "r") as inFile:
        discords = json.load(inFile)
    with open(get_path("user_data/configs.json"), "r") as inFile:
        configs = json.load(inFile)
    if action["data"] == "delete":
        print(action["config"])
        # Default of None: deleting an id that is already gone is a no-op,
        # like the discord and proxy delete handlers.
        configs.pop(action["config"], None)
    else:
        comptes = {
            str(n): {
                "mail": action[f"mail{n}"],
                "pwd": action[f"pwd{n}"],
                "2fa": action[f"2fa{n}"],
            }
            for n in range(1, 6)
        }
        configs[action["config"]] = {
            "name": action["name"] if action["name"] != "" else f"unnamed{action['config']}",
            "proxy": action["proxy"],
            "discord": action["discord"],
            "time": "",
            # Real boolean, as written by schedule_post; the string "False"
            # used previously is truthy in `if config["enabled"]` checks.
            "enabled": False,
            "accounts": comptes,
        }
    with open(get_path("user_data/configs.json"), "w") as outFile:
        json.dump(configs, outFile)
    return render_template(
        "config.html",
        data=configs,
        discords=discords,
        proxys=proxys,
        configs=configs,
        len=maxi(configs),
    )
@app.route("/logs/", methods=["GET", "POST"])
def logs():
    """Render the logs page with one ``(label, id)`` pair per log.

    Configs come first, labelled with their name. Files in
    ``Flask/static/logs`` whose name without its last extension is not a
    config id are appended, labelled with the file name; ``.gitignore`` is
    skipped.
    """
    with open(get_path("user_data/configs.json"), "r") as config_file:
        configs = json.load(config_file)

    files = []
    known_ids = []
    for config_id in configs:
        files.append((configs[config_id]["name"], config_id))
        known_ids.append(config_id)

    for entry in os.listdir(get_path("Flask/static/logs")):
        if entry == ".gitignore":
            continue
        # Everything before the last dot (empty when there is no dot).
        stem = entry.rpartition(".")[0]
        if stem not in known_ids:
            files.append((entry, stem))

    return render_template("logs.html", files=files)
@app.route("/stats/", methods=["GET", "POST"])
def stats():
    """Render the ``stats.html`` page."""
    page = render_template("stats.html")
    return page
@app.route("/override/", methods=["POST"])
def override_post():
    """Start ``V6.py`` for the submitted config with the whole form as ``--json``.

    The form is converted to a dict of lists; its first ``config`` value is
    passed as ``-c`` and the dict's string form as ``--json``. Output goes to
    ``Flask/static/logs/custom.txt``, which is truncated on every call.
    """
    # Named form_data so the local no longer shadows the json module.
    form_data = request.form.to_dict(flat=False)
    # Argument list without a shell: form values reach V6.py verbatim and
    # cannot be interpreted as shell syntax (quotes, $(), backticks, ...).
    command = [
        "python3", "-u", get_path("V6.py"),
        "-c", form_data["config"][0],
        "--json", str(form_data),
    ]
    # 'w' starts a fresh log for each override run.
    with open(get_path("Flask/static/logs/custom.txt"), 'w') as log:
        subprocess.Popen(command, stdout=log, stderr=log)
    return render_template("vnc_post.html")
@app.route("/override/", methods=["GET"])
def override_get():
    """Render the override form with the contents of ``configs.json``."""
    with open(get_path("user_data/configs.json"), "r") as config_file:
        available = json.load(config_file)
    return render_template("vnc_get.html", configs=available)
@app.route('/download/<path:filename>', methods=['GET', 'POST'])
@login_required
def download(filename):
    """Send ``filename`` from the ``user_data`` directory as an attachment."""
    user_data_dir = get_path("user_data/")
    return send_from_directory(
        directory=user_data_dir,
        path=filename,
        as_attachment=True,
    )
def allowed_file(filename):
    """Return True when the text after the last dot of ``filename`` is "json" (any case)."""
    ALLOWED_EXTENSIONS = ["json"]
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1]
    return extension.lower() in ALLOWED_EXTENSIONS
@app.route('/upload_file/', methods=['POST'])
@login_required
def upload_file():
    """Save the uploaded ``file1``, ``file2``, ... parts into ``user_data``.

    Parts are read in order until a field name is missing. A part with an
    empty filename ends processing immediately; parts rejected by
    ``allowed_file`` are skipped. Always redirects to the settings page.
    """
    print(request.files)
    index = 1
    field = f'file{index}'
    while field in request.files:
        uploaded = request.files[field]
        if uploaded.filename == '':
            print('end of files')
            return redirect(url_for('settings_get'))
        if uploaded and allowed_file(uploaded.filename):
            safe_name = secure_filename(uploaded.filename)
            destination = os.path.join(get_path("user_data/"), safe_name)
            print(destination)
            uploaded.save(destination)
        # Move on to the next numbered field whether or not this one was saved.
        index += 1
        field = f'file{index}'
        print(index)
        print(field in request.files)
    print("requete bizarre")
    return redirect(url_for('settings_get'))
def maxi(dict):
    """Return one more than the largest integer key, never less than 1.

    Args:
        dict: iterable of keys convertible with ``int()`` (in this file, the
            string ids of a JSON object). The name shadows the builtin and is
            kept only for backward compatibility.

    Returns:
        ``max(keys) + 1``, or 1 when there are no keys or none is positive.

    Raises:
        ValueError: if a key cannot be converted to ``int``.
    """
    highest = max((int(key) for key in dict), default=0)
    # The floor of 0 mirrors the original loop, which started counting at 0.
    return max(highest, 0) + 1
# Import-time startup: register the per-config jobs, then launch
# config/request.sh in the background.
update_jobs()
request_script = get_path("config/request.sh")
subprocess.Popen(["bash", request_script])

if __name__ == "__main__":
    app.run()