Removed old scripts
parent
3bfaedfb52
commit
7aabffead4
@ -1,64 +0,0 @@
|
||||
#!/bin/bash
|
||||
# abc - A Binary Comparison tool
|
||||
# Written by mirrorsedgefan (mef)
|
||||
|
||||
function oct2asc() {
|
||||
if [[ $1 ]]; then
|
||||
echo -en "\0$(printf %o $((8#$1)))"
|
||||
fi
|
||||
}
|
||||
|
||||
function oct2dec() {
|
||||
if [[ $1 ]]; then
|
||||
echo $((8#$1))
|
||||
fi
|
||||
}
|
||||
|
||||
|
||||
IFS=$'\n'
|
||||
list=$( cmp -l $1 $2 )
|
||||
for i in $list
|
||||
do
|
||||
lval=$val
|
||||
val=$( echo $i | awk '{ print $1 }' )
|
||||
if [[ $lval != "" ]]; then
|
||||
if [[ $val != $(( lval + 1 )) ]]; then
|
||||
echo
|
||||
fi
|
||||
fi
|
||||
data=$( echo $i | awk '{ print $2, $3 }' )
|
||||
i="$(( val - 1 )) $data"
|
||||
echo $i | awk 'function oct2dec(oct, dec) {
|
||||
for (i = 1; i <= length(oct); i++) {
|
||||
dec *= 8;
|
||||
dec += substr(oct, i, 1)
|
||||
};
|
||||
return dec
|
||||
}
|
||||
|
||||
{
|
||||
printf "%08X %02X %02X ", $1, oct2dec($2), oct2dec($3)
|
||||
}'
|
||||
|
||||
val1=$( echo $i | awk '{ print $2 }' )
|
||||
val2=$( echo $i | awk '{ print $3 }' )
|
||||
|
||||
val1dec=$( oct2dec $val1 )
|
||||
val2dec=$( oct2dec $val2 )
|
||||
|
||||
if [[ $val1dec -ge 32 && $val1dec -le 126 ]]
|
||||
then
|
||||
oct2asc $val1
|
||||
else
|
||||
printf " "
|
||||
fi
|
||||
|
||||
if [[ $val2dec -ge 32 && $val2dec -le 126 ]]
|
||||
then
|
||||
printf " "
|
||||
oct2asc $val2
|
||||
fi
|
||||
|
||||
echo
|
||||
|
||||
done
|
||||
@ -1,5 +0,0 @@
|
||||
#!/bin/perl
|
||||
use Chatbot::Eliza
|
||||
|
||||
$bot = new Chatbot::Eliza;
|
||||
$bot->command_interface;
|
||||
@ -1,90 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
# matrix_decrypt - Download and decrypt an encrypted attachment
|
||||
# from a matrix server
|
||||
|
||||
# Copyright © 2019 Damir Jelić <poljar@termina.org.uk>
|
||||
#
|
||||
# Permission to use, copy, modify, and/or distribute this software for
|
||||
# any purpose with or without fee is hereby granted, provided that the
|
||||
# above copyright notice and this permission notice appear in all copies.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
|
||||
# SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
|
||||
# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
|
||||
# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
|
||||
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
|
||||
import argparse
|
||||
import requests
|
||||
import tempfile
|
||||
import subprocess
|
||||
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
from nio.crypto import decrypt_attachment
|
||||
|
||||
|
||||
def save_file(data):
|
||||
"""Save data to a temporary file and return its name."""
|
||||
tmp_dir = tempfile.gettempdir()
|
||||
|
||||
with tempfile.NamedTemporaryFile(
|
||||
prefix='plumber-',
|
||||
dir=tmp_dir,
|
||||
delete=False
|
||||
) as f:
|
||||
f.write(data)
|
||||
f.flush()
|
||||
return f.name
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description='Download and decrypt matrix attachments'
|
||||
)
|
||||
parser.add_argument('url', help='the url of the attachment')
|
||||
parser.add_argument('file', nargs='?', help='save attachment to <file>')
|
||||
parser.add_argument('--plumber',
|
||||
help='program that gets called with the '
|
||||
'dowloaded file')
|
||||
|
||||
args = parser.parse_args()
|
||||
url = urlparse(args.url)
|
||||
query = parse_qs(url.query)
|
||||
|
||||
if not query["key"] or not query["iv"] or not query["hash"]:
|
||||
print("Missing decryption argument")
|
||||
return -1
|
||||
|
||||
key = query["key"][0]
|
||||
iv = query["iv"][0]
|
||||
hash = query["hash"][0]
|
||||
|
||||
http_url = "https://{}{}".format(url.netloc, url.path)
|
||||
|
||||
request = requests.get(http_url)
|
||||
|
||||
if not request.ok:
|
||||
print("Error downloading file")
|
||||
return -2
|
||||
|
||||
plumber = args.plumber
|
||||
plaintext = decrypt_attachment(request.content, key, hash, iv)
|
||||
|
||||
if args.file is None:
|
||||
file_name = save_file(plaintext)
|
||||
if plumber is None:
|
||||
plumber = "xdg-open"
|
||||
else:
|
||||
file_name = args.file
|
||||
open(file_name, "wb").write(plaintext)
|
||||
|
||||
if plumber is not None:
|
||||
subprocess.run([plumber, file_name])
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@ -1,318 +0,0 @@
|
||||
#!/usr/bin/env -S python3 -u
|
||||
# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
|
||||
#
|
||||
# Permission to use, copy, modify, and/or distribute this software for
|
||||
# any purpose with or without fee is hereby granted, provided that the
|
||||
# above copyright notice and this permission notice appear in all copies.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
|
||||
# SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
|
||||
# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
|
||||
# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
|
||||
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
|
||||
|
||||
import os
|
||||
import json
|
||||
import magic
|
||||
import requests
|
||||
import argparse
|
||||
from urllib.parse import urlparse
|
||||
from itertools import zip_longest
|
||||
import urllib3
|
||||
|
||||
from nio import Api, UploadResponse, UploadError
|
||||
from nio.crypto import encrypt_attachment
|
||||
|
||||
from json.decoder import JSONDecodeError
|
||||
|
||||
urllib3.disable_warnings()
|
||||
|
||||
|
||||
def to_stdout(message):
|
||||
print(json.dumps(message), flush=True)
|
||||
|
||||
|
||||
def error(e):
|
||||
message = {
|
||||
"type": "status",
|
||||
"status": "error",
|
||||
"message": str(e)
|
||||
}
|
||||
to_stdout(message)
|
||||
os.sys.exit()
|
||||
|
||||
|
||||
def mime_from_file(file):
|
||||
try:
|
||||
t = magic.from_file(file, mime=True)
|
||||
except AttributeError:
|
||||
try:
|
||||
m = magic.open(magic.MIME)
|
||||
m.load()
|
||||
t, _ = m.file(file).split(';')
|
||||
except AttributeError:
|
||||
error('Your \'magic\' module is unsupported. '
|
||||
'Install either https://github.com/ahupp/python-magic '
|
||||
'or https://github.com/file/file/tree/master/python '
|
||||
'(official \'file\' python bindings, available as the '
|
||||
'python-magic package on many distros)')
|
||||
|
||||
raise SystemExit
|
||||
|
||||
return t
|
||||
|
||||
|
||||
class Upload(object):
|
||||
def __init__(self, file, chunksize=1 << 13):
|
||||
self.file = file
|
||||
self.filename = os.path.basename(file)
|
||||
self.chunksize = chunksize
|
||||
self.totalsize = os.path.getsize(file)
|
||||
self.mimetype = mime_from_file(file)
|
||||
self.readsofar = 0
|
||||
|
||||
def send_progress(self):
|
||||
message = {
|
||||
"type": "progress",
|
||||
"data": self.readsofar
|
||||
}
|
||||
to_stdout(message)
|
||||
|
||||
def __iter__(self):
|
||||
with open(self.file, 'rb') as file:
|
||||
while True:
|
||||
data = file.read(self.chunksize)
|
||||
|
||||
if not data:
|
||||
break
|
||||
|
||||
self.readsofar += len(data)
|
||||
self.send_progress()
|
||||
|
||||
yield data
|
||||
|
||||
def __len__(self):
|
||||
return self.totalsize
|
||||
|
||||
|
||||
def chunk_bytes(iterable, n):
|
||||
args = [iter(iterable)] * n
|
||||
return (
|
||||
bytes(
|
||||
(filter(lambda x: x is not None, chunk))
|
||||
) for chunk in zip_longest(*args)
|
||||
)
|
||||
|
||||
|
||||
class EncryptedUpload(Upload):
|
||||
def __init__(self, file, chunksize=1 << 13):
|
||||
super().__init__(file, chunksize)
|
||||
self.source_mimetype = self.mimetype
|
||||
self.mimetype = "application/octet-stream"
|
||||
|
||||
with open(self.filename, "rb") as file:
|
||||
self.ciphertext, self.file_keys = encrypt_attachment(file.read())
|
||||
|
||||
def send_progress(self):
|
||||
message = {
|
||||
"type": "progress",
|
||||
"data": self.readsofar
|
||||
}
|
||||
to_stdout(message)
|
||||
|
||||
def __iter__(self):
|
||||
for chunk in chunk_bytes(self.ciphertext, self.chunksize):
|
||||
self.readsofar += len(chunk)
|
||||
self.send_progress()
|
||||
yield chunk
|
||||
|
||||
def __len__(self):
|
||||
return len(self.ciphertext)
|
||||
|
||||
|
||||
class IterableToFileAdapter(object):
|
||||
def __init__(self, iterable):
|
||||
self.iterator = iter(iterable)
|
||||
self.length = len(iterable)
|
||||
|
||||
def read(self, size=-1):
|
||||
return next(self.iterator, b'')
|
||||
|
||||
def __len__(self):
|
||||
return self.length
|
||||
|
||||
|
||||
def upload_process(args):
|
||||
file_path = os.path.expanduser(args.file)
|
||||
thumbnail = None
|
||||
|
||||
try:
|
||||
if args.encrypt:
|
||||
upload = EncryptedUpload(file_path)
|
||||
|
||||
if upload.source_mimetype.startswith("image"):
|
||||
# TODO create a thumbnail
|
||||
thumbnail = None
|
||||
else:
|
||||
upload = Upload(file_path)
|
||||
|
||||
except (FileNotFoundError, OSError, IOError) as e:
|
||||
error(e)
|
||||
|
||||
try:
|
||||
url = urlparse(args.homeserver)
|
||||
except ValueError as e:
|
||||
error(e)
|
||||
|
||||
upload_url = ("https://{}".format(args.homeserver)
|
||||
if not url.scheme else args.homeserver)
|
||||
_, api_path, _ = Api.upload(args.access_token, upload.filename)
|
||||
upload_url += api_path
|
||||
|
||||
headers = {
|
||||
"Content-type": upload.mimetype,
|
||||
}
|
||||
|
||||
proxies = {}
|
||||
|
||||
if args.proxy_address:
|
||||
user = args.proxy_user or ""
|
||||
|
||||
if args.proxy_password:
|
||||
user += ":{}".format(args.proxy_password)
|
||||
|
||||
if user:
|
||||
user += "@"
|
||||
|
||||
proxies = {
|
||||
"https": "{}://{}{}:{}/".format(
|
||||
args.proxy_type,
|
||||
user,
|
||||
args.proxy_address,
|
||||
args.proxy_port
|
||||
)
|
||||
}
|
||||
|
||||
message = {
|
||||
"type": "status",
|
||||
"status": "started",
|
||||
"total": upload.totalsize,
|
||||
"file_name": upload.filename,
|
||||
}
|
||||
|
||||
if isinstance(upload, EncryptedUpload):
|
||||
message["mimetype"] = upload.source_mimetype
|
||||
else:
|
||||
message["mimetype"] = upload.mimetype
|
||||
|
||||
to_stdout(message)
|
||||
|
||||
session = requests.Session()
|
||||
session.trust_env = False
|
||||
|
||||
try:
|
||||
r = session.post(
|
||||
url=upload_url,
|
||||
auth=None,
|
||||
headers=headers,
|
||||
data=IterableToFileAdapter(upload),
|
||||
verify=(not args.insecure),
|
||||
proxies=proxies
|
||||
)
|
||||
except (requests.exceptions.RequestException, OSError) as e:
|
||||
error(e)
|
||||
|
||||
try:
|
||||
json_response = json.loads(r.content)
|
||||
except JSONDecodeError:
|
||||
error(r.content)
|
||||
|
||||
response = UploadResponse.from_dict(json_response)
|
||||
|
||||
if isinstance(response, UploadError):
|
||||
error(str(response))
|
||||
|
||||
message = {
|
||||
"type": "status",
|
||||
"status": "done",
|
||||
"url": response.content_uri
|
||||
}
|
||||
|
||||
if isinstance(upload, EncryptedUpload):
|
||||
message["file_keys"] = upload.file_keys
|
||||
|
||||
to_stdout(message)
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Encrypt and upload matrix attachments"
|
||||
)
|
||||
parser.add_argument("file", help="the file that will be uploaded")
|
||||
parser.add_argument(
|
||||
"homeserver",
|
||||
type=str,
|
||||
help="the address of the homeserver"
|
||||
)
|
||||
parser.add_argument(
|
||||
"access_token",
|
||||
type=str,
|
||||
help="the access token to use for the upload"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--encrypt",
|
||||
action="store_const",
|
||||
const=True,
|
||||
default=False,
|
||||
help="encrypt the file before uploading it"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--insecure",
|
||||
action="store_const",
|
||||
const=True,
|
||||
default=False,
|
||||
help="disable SSL certificate verification"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--proxy-type",
|
||||
choices=[
|
||||
"http",
|
||||
"socks4",
|
||||
"socks5"
|
||||
],
|
||||
default="http",
|
||||
help="type of the proxy that will be used to establish a connection"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--proxy-address",
|
||||
type=str,
|
||||
help="address of the proxy that will be used to establish a connection"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--proxy-port",
|
||||
type=int,
|
||||
default=8080,
|
||||
help="port of the proxy that will be used to establish a connection"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--proxy-user",
|
||||
type=str,
|
||||
help="user that will be used for authentication on the proxy"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--proxy-password",
|
||||
type=str,
|
||||
help="password that will be used for authentication on the proxy"
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
upload_process(args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@ -1,269 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
#magical's LZSS compressor/decompressor
|
||||
|
||||
import sys
|
||||
from sys import stdin, stdout, stderr, exit
|
||||
from os import SEEK_SET, SEEK_CUR, SEEK_END
|
||||
from errno import EPIPE
|
||||
from struct import pack, unpack
|
||||
|
||||
__all__ = ('decompress', 'decompress_file', 'decompress_bytes',
|
||||
'decompress_overlay', 'DecompressionError')
|
||||
|
||||
class DecompressionError(ValueError):
|
||||
pass
|
||||
|
||||
def bits(byte):
|
||||
return ((byte >> 7) & 1,
|
||||
(byte >> 6) & 1,
|
||||
(byte >> 5) & 1,
|
||||
(byte >> 4) & 1,
|
||||
(byte >> 3) & 1,
|
||||
(byte >> 2) & 1,
|
||||
(byte >> 1) & 1,
|
||||
(byte) & 1)
|
||||
|
||||
def decompress_raw_lzss10(indata, decompressed_size, _overlay=False):
|
||||
"""Decompress LZSS-compressed bytes. Returns a bytearray."""
|
||||
data = bytearray()
|
||||
|
||||
it = iter(indata)
|
||||
|
||||
if _overlay:
|
||||
disp_extra = 3
|
||||
else:
|
||||
disp_extra = 1
|
||||
|
||||
def writebyte(b):
|
||||
data.append(b)
|
||||
def readbyte():
|
||||
return next(it)
|
||||
def readshort():
|
||||
# big-endian
|
||||
a = next(it)
|
||||
b = next(it)
|
||||
return (a << 8) | b
|
||||
def copybyte():
|
||||
data.append(next(it))
|
||||
|
||||
while len(data) < decompressed_size:
|
||||
b = readbyte()
|
||||
flags = bits(b)
|
||||
for flag in flags:
|
||||
if flag == 0:
|
||||
copybyte()
|
||||
elif flag == 1:
|
||||
sh = readshort()
|
||||
count = (sh >> 0xc) + 3
|
||||
disp = (sh & 0xfff) + disp_extra
|
||||
|
||||
for _ in range(count):
|
||||
writebyte(data[-disp])
|
||||
else:
|
||||
raise ValueError(flag)
|
||||
|
||||
if decompressed_size <= len(data):
|
||||
break
|
||||
|
||||
if len(data) != decompressed_size:
|
||||
raise DecompressionError("decompressed size does not match the expected size")
|
||||
|
||||
return data
|
||||
|
||||
def decompress_raw_lzss11(indata, decompressed_size):
|
||||
"""Decompress LZSS-compressed bytes. Returns a bytearray."""
|
||||
data = bytearray()
|
||||
|
||||
it = iter(indata)
|
||||
|
||||
def writebyte(b):
|
||||
data.append(b)
|
||||
def readbyte():
|
||||
return next(it)
|
||||
def copybyte():
|
||||
data.append(next(it))
|
||||
|
||||
while len(data) < decompressed_size:
|
||||
b = readbyte()
|
||||
flags = bits(b)
|
||||
for flag in flags:
|
||||
if flag == 0:
|
||||
copybyte()
|
||||
elif flag == 1:
|
||||
b = readbyte()
|
||||
indicator = b >> 4
|
||||
|
||||
if indicator == 0:
|
||||
# 8 bit count, 12 bit disp
|
||||
# indicator is 0, don't need to mask b
|
||||
count = (b << 4)
|
||||
b = readbyte()
|
||||
count += b >> 4
|
||||
count += 0x11
|
||||
elif indicator == 1:
|
||||
# 16 bit count, 12 bit disp
|
||||
count = ((b & 0xf) << 12) + (readbyte() << 4)
|
||||
b = readbyte()
|
||||
count += b >> 4
|
||||
count += 0x111
|
||||
else:
|
||||
# indicator is count (4 bits), 12 bit disp
|
||||
count = indicator
|
||||
count += 1
|
||||
|
||||
disp = ((b & 0xf) << 8) + readbyte()
|
||||
disp += 1
|
||||
|
||||
try:
|
||||
for _ in range(count):
|
||||
writebyte(data[-disp])
|
||||
except IndexError:
|
||||
raise Exception(count, disp, len(data), sum(1 for x in it) )
|
||||
else:
|
||||
raise ValueError(flag)
|
||||
|
||||
if decompressed_size <= len(data):
|
||||
break
|
||||
|
||||
if len(data) != decompressed_size:
|
||||
raise DecompressionError("decompressed size does not match the expected size")
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def decompress_overlay(f, out):
|
||||
# the compression header is at the end of the file
|
||||
f.seek(-8, SEEK_END)
|
||||
header = f.read(8)
|
||||
|
||||
# decompression goes backwards.
|
||||
# end < here < start
|
||||
|
||||
# end_delta == here - decompression end address
|
||||
# start_delta == decompression start address - here
|
||||
end_delta, start_delta = unpack("<LL", header)
|
||||
|
||||
filelen = f.tell()
|
||||
|
||||
padding = end_delta >> 0x18
|
||||
end_delta &= 0xFFFFFF
|
||||
decompressed_size = start_delta + end_delta
|
||||
|
||||
f.seek(-end_delta, SEEK_END)
|
||||
|
||||
data = bytearray()
|
||||
data.extend(f.read(end_delta - padding))
|
||||
data.reverse()
|
||||
|
||||
#stdout.write(data.tostring())
|
||||
|
||||
uncompressed_data = decompress_raw_lzss10(data, decompressed_size,
|
||||
_overlay=True)
|
||||
uncompressed_data.reverse()
|
||||
|
||||
# first we write up to the portion of the file which was "overwritten" by
|
||||
# the decompressed data, then the decompressed data itself.
|
||||
# i wonder if it's possible for decompression to overtake the compressed
|
||||
# data, so that the decompression code is reading its own output...
|
||||
f.seek(0, SEEK_SET)
|
||||
out.write(f.read(filelen - end_delta))
|
||||
out.write(uncompressed_data)
|
||||
|
||||
def decompress(obj):
|
||||
"""Decompress LZSS-compressed bytes or a file-like object.
|
||||
|
||||
Shells out to decompress_file() or decompress_bytes() depending on
|
||||
whether or not the passed-in object has a 'read' attribute or not.
|
||||
|
||||
Returns a bytearray."""
|
||||
if hasattr(obj, 'read'):
|
||||
return decompress_file(obj)
|
||||
else:
|
||||
return decompress_bytes(obj)
|
||||
|
||||
def decompress_bytes(data):
|
||||
"""Decompress LZSS-compressed bytes. Returns a bytearray."""
|
||||
header = data[:4]
|
||||
if header[0] == 0x10:
|
||||
decompress_raw = decompress_raw_lzss10
|
||||
elif header[0] == 0x11:
|
||||
decompress_raw = decompress_raw_lzss11
|
||||
else:
|
||||
raise DecompressionError("not as lzss-compressed file")
|
||||
|
||||
decompressed_size, = unpack("<L", header[1:] + b'\x00')
|
||||
|
||||
data = data[4:]
|
||||
return decompress_raw(data, decompressed_size)
|
||||
|
||||
def decompress_file(f):
|
||||
"""Decompress an LZSS-compressed file. Returns a bytearray.
|
||||
|
||||
This isn't any more efficient than decompress_bytes, as it reads
|
||||
the entire file into memory. It is offered as a convenience.
|
||||
"""
|
||||
header = f.read(4)
|
||||
if header[0] == 0x10:
|
||||
decompress_raw = decompress_raw_lzss10
|
||||
elif header[0] == 0x11:
|
||||
decompress_raw = decompress_raw_lzss11
|
||||
else:
|
||||
raise DecompressionError("not as lzss-compressed file")
|
||||
|
||||
decompressed_size, = unpack("<L", header[1:] + b'\x00')
|
||||
|
||||
data = f.read()
|
||||
return decompress_raw(data, decompressed_size)
|
||||
|
||||
def main(args=None):
|
||||
if args is None:
|
||||
args = sys.argv[1:]
|
||||
|
||||
if '--overlay' in args:
|
||||
args.remove('--overlay')
|
||||
overlay = True
|
||||
else:
|
||||
overlay = False
|
||||
|
||||
if len(args) < 1 or args[0] == '-':
|
||||
if overlay:
|
||||
print("Can't decompress overlays from stdin", file=stderr)
|
||||
return 2
|
||||
|
||||
if hasattr(stdin, 'buffer'):
|
||||
f = stdin.buffer
|
||||
else:
|
||||
f = stdin
|
||||
else:
|
||||
try:
|
||||
f = open(args[0], "rb")
|
||||
except IOError as e:
|
||||
print(e, file=stderr)
|
||||
return 2
|
||||
|
||||
stdout = sys.stdout
|
||||
if hasattr(stdout, 'buffer'):
|
||||
# grab the underlying binary stream
|
||||
stdout = stdout.buffer
|
||||
|
||||
try:
|
||||
if overlay:
|
||||
decompress_overlay(f, stdout)
|
||||
else:
|
||||
stdout.write(decompress_file(f))
|
||||
except IOError as e:
|
||||
if e.errno == EPIPE:
|
||||
# don't complain about a broken pipe
|
||||
pass
|
||||
else:
|
||||
raise
|
||||
except (DecompressionError,) as e:
|
||||
print(e, file=stderr)
|
||||
return 1
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
exit(main())
|
||||
@ -1,257 +0,0 @@
|
||||
#!/bin/python3
|
||||
#magical's LZSS compressor
|
||||
# used http://code.google.com/p/u-lzss/source/browse/trunk/js/lib/ulzss.js as
|
||||
# a guide
|
||||
from sys import stderr
|
||||
|
||||
from collections import defaultdict
|
||||
from operator import itemgetter
|
||||
from struct import pack, unpack
|
||||
|
||||
class SlidingWindow:
|
||||
# The size of the sliding window
|
||||
size = 4096
|
||||
|
||||
# The minimum displacement.
|
||||
disp_min = 2
|
||||
|
||||
# The hard minimum — a disp less than this can't be represented in the
|
||||
# compressed stream.
|
||||
disp_start = 1
|
||||
|
||||
# The minimum length for a successful match in the window
|
||||
match_min = 1
|
||||
|
||||
# The maximum length of a successful match, inclusive.
|
||||
match_max = None
|
||||
|
||||
def __init__(self, buf):
|
||||
self.data = buf
|
||||
self.hash = defaultdict(list)
|
||||
self.full = False
|
||||
|
||||
self.start = 0
|
||||
self.stop = 0
|
||||
#self.index = self.disp_min - 1
|
||||
self.index = 0
|
||||
|
||||
assert self.match_max is not None
|
||||
|
||||
def __next__(self):
|
||||
if self.index < self.disp_start - 1:
|
||||
self.index += 1
|
||||
return
|
||||
|
||||
if self.full:
|
||||
olditem = self.data[self.start]
|
||||
assert self.hash[olditem][0] == self.start
|
||||
self.hash[olditem].pop(0)
|
||||
|
||||
item = self.data[self.stop]
|
||||
self.hash[item].append(self.stop)
|
||||
self.stop += 1
|
||||
self.index += 1
|
||||
|
||||
if self.full:
|
||||
self.start += 1
|
||||
else:
|
||||
if self.size <= self.stop:
|
||||
self.full = True
|
||||
|
||||
def advance(self, n=1):
|
||||
"""Advance the window by n bytes"""
|
||||
for _ in range(n):
|
||||
next(self)
|
||||
|
||||
def search(self):
|
||||
match_max = self.match_max
|
||||
match_min = self.match_min
|
||||
|
||||
counts = []
|
||||
indices = self.hash[self.data[self.index]]
|
||||
for i in indices:
|
||||
matchlen = self.match(i, self.index)
|
||||
if matchlen >= match_min:
|
||||
disp = self.index - i
|
||||
#assert self.index - disp >= 0
|
||||
#assert self.disp_min <= disp < self.size + self.disp_min
|
||||
if self.disp_min <= disp:
|
||||
counts.append((matchlen, -disp))
|
||||
if matchlen >= match_max:
|
||||
#assert matchlen == match_max
|
||||
return counts[-1]
|
||||
|
||||
if counts:
|
||||
match = max(counts, key=itemgetter(0))
|
||||
return match
|
||||
|
||||
return None
|
||||
|
||||
def match(self, start, bufstart):
|
||||
size = self.index - start
|
||||
|
||||
if size == 0:
|
||||
return 0
|
||||
|
||||
matchlen = 0
|
||||
it = list(range(min(len(self.data) - bufstart, self.match_max)))
|
||||
for i in it:
|
||||
if self.data[start + (i % size)] == self.data[bufstart + i]:
|
||||
matchlen += 1
|
||||
else:
|
||||
break
|
||||
return matchlen
|
||||
|
||||
class NLZ10Window(SlidingWindow):
|
||||
size = 4096
|
||||
|
||||
match_min = 3
|
||||
match_max = 3 + 0xf
|
||||
|
||||
class NLZ11Window(SlidingWindow):
|
||||
size = 4096
|
||||
|
||||
match_min = 3
|
||||
match_max = 0x111 + 0xFFFF
|
||||
|
||||
class NOverlayWindow(NLZ10Window):
|
||||
disp_min = 3
|
||||
|
||||
def _compress(input, windowclass=NLZ10Window):
|
||||
"""Generates a stream of tokens. Either a byte (int) or a tuple of (count,
|
||||
displacement)."""
|
||||
|
||||
window = windowclass(input)
|
||||
|
||||
i = 0
|
||||
while True:
|
||||
if len(input) <= i:
|
||||
break
|
||||
match = window.search()
|
||||
if match:
|
||||
yield match
|
||||
#if match[1] == -283:
|
||||
# raise Exception(match, i)
|
||||
window.advance(match[0])
|
||||
i += match[0]
|
||||
else:
|
||||
yield input[i]
|
||||
next(window)
|
||||
i += 1
|
||||
|
||||
def packflags(flags):
|
||||
n = 0
|
||||
for i in range(8):
|
||||
n <<= 1
|
||||
try:
|
||||
if flags[i]:
|
||||
n |= 1
|
||||
except IndexError:
|
||||
pass
|
||||
return n
|
||||
|
||||
def chunkit(it, n):
|
||||
buf = []
|
||||
for x in it:
|
||||
buf.append(x)
|
||||
if n <= len(buf):
|
||||
yield buf
|
||||
buf = []
|
||||
if buf:
|
||||
yield buf
|
||||
|
||||
def compress(input, out):
|
||||
# header
|
||||
out.write(pack("<L", (len(input) << 8) + 0x10))
|
||||
|
||||
# body
|
||||
length = 0
|
||||
for tokens in chunkit(_compress(input), 8):
|
||||
flags = [type(t) == tuple for t in tokens]
|
||||
out.write(pack(">B", packflags(flags)))
|
||||
|
||||
for t in tokens:
|
||||
if type(t) == tuple:
|
||||
count, disp = t
|
||||
count -= 3
|
||||
disp = (-disp) - 1
|
||||
assert 0 <= disp < 4096
|
||||
sh = (count << 12) | disp
|
||||
out.write(pack(">H", sh))
|
||||
else:
|
||||
out.write(pack(">B", t))
|
||||
|
||||
length += 1
|
||||
length += sum(2 if f else 1 for f in flags)
|
||||
|
||||
# padding
|
||||
padding = 4 - (length % 4 or 4)
|
||||
if padding:
|
||||
out.write(b'\xff' * padding)
|
||||
|
||||
def compress_nlz11(input, out):
|
||||
# header
|
||||
out.write(pack("<L", (len(input) << 8) + 0x11))
|
||||
|
||||
# body
|
||||
length = 0
|
||||
for tokens in chunkit(_compress(input, windowclass=NLZ11Window), 8):
|
||||
flags = [type(t) == tuple for t in tokens]
|
||||
out.write(pack(">B", packflags(flags)))
|
||||
length += 1
|
||||
|
||||
for t in tokens:
|
||||
if type(t) == tuple:
|
||||
count, disp = t
|
||||
disp = (-disp) - 1
|
||||
#if disp == 282:
|
||||
# raise Exception
|
||||
assert 0 <= disp <= 0xFFF
|
||||
if count <= 1 + 0xF:
|
||||
count -= 1
|
||||
assert 2 <= count <= 0xF
|
||||
sh = (count << 12) | disp
|
||||
out.write(pack(">H", sh))
|
||||
length += 2
|
||||
elif count <= 0x11 + 0xFF:
|
||||
count -= 0x11
|
||||
assert 0 <= count <= 0xFF
|
||||
b = count >> 4
|
||||
sh = ((count & 0xF) << 12) | disp
|
||||
out.write(pack(">BH", b, sh))
|
||||
length += 3
|
||||
elif count <= 0x111 + 0xFFFF:
|
||||
count -= 0x111
|
||||
assert 0 <= count <= 0xFFFF
|
||||
l = (1 << 28) | (count << 12) | disp
|
||||
out.write(pack(">L", l))
|
||||
length += 4
|
||||
else:
|
||||
raise ValueError(count)
|
||||
else:
|
||||
out.write(pack(">B", t))
|
||||
length += 1
|
||||
|
||||
# padding
|
||||
padding = 4 - (length % 4 or 4)
|
||||
if padding:
|
||||
out.write(b'\xff' * padding)
|
||||
|
||||
def dump_compress_nlz11(input, out):
|
||||
# body
|
||||
length = 0
|
||||
def dump():
|
||||
for t in _compress(input, windowclass=NLZ11Window):
|
||||
if type(t) == tuple:
|
||||
yield t
|
||||
from pprint import pprint
|
||||
pprint(list(dump()))
|
||||
|
||||
if __name__ == '__main__':
|
||||
from sys import stdout, argv
|
||||
data = open(argv[1], "rb").read()
|
||||
stdout = stdout.detach()
|
||||
#compress(data, stdout)
|
||||
compress_nlz11(data, stdout)
|
||||
|
||||
#dump_compress_nlz11(data, stdout)
|
||||
@ -1,2 +0,0 @@
|
||||
#!/bin/dash
|
||||
pip list --outdated --format=freeze | grep -v '^\-e' | cut -d = -f 1 | xargs -n1 pip install -U
|
||||
@ -1,3 +0,0 @@
|
||||
#!/bin/dash
|
||||
#repack - Completely repack an NDS ROM with ndstool
|
||||
ndstool -c pk_baserom.nds -9 arm9.bin -9i arm9i.bin -7 arm7.bin -7i arm7i.bin -y9 y9.bin -y7 y7.bin -d data -y overlay -t banner.bin -h header.bin # -o banner.bmp
|
||||
@ -1,16 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
#Copyright (c) 2022 - Fedora Contributors - Licensed under CC BY-SA 4.0
|
||||
|
||||
old_kernels=($(dnf repoquery --installonly --latest-limit=-1 -q))
|
||||
if [ "${#old_kernels[@]}" -eq 0 ]; then
|
||||
echo "No old kernels found"
|
||||
exit 0
|
||||
fi
|
||||
|
||||
if ! dnf remove "${old_kernels[@]}"; then
|
||||
echo "Failed to remove old kernels"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo "Removed old kernels"
|
||||
exit 0
|
||||
@ -1,3 +0,0 @@
|
||||
#!/bin/dash
|
||||
#unpack - Completely unpack an NDS ROM with ndstool
|
||||
ndstool -x baserom.nds -9 arm9.bin -9i arm9i.bin -7 arm7.bin -7i arm7i.bin -y9 y9.bin -y7 y7.bin -d data -y overlay -t banner.bin -h header.bin -o banner.bmp
|
||||
Loading…
Reference in New Issue