This commit is contained in:
Christian Zangl 2019-09-23 22:39:30 +02:00
parent 3d2de7feb3
commit 8489424926
9 changed files with 553 additions and 0 deletions

8
.gitignore vendored Normal file
View File

@ -0,0 +1,8 @@
README.rst
dist/
.cache/
.chkbit
.pytest_cache/
*.egg-info/
*.pyc
_*

22
LICENSE Normal file
View File

@ -0,0 +1,22 @@
The MIT License (MIT)
Copyright (c) 2014 Christian Zangl
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

139
README.md Normal file
View File

@ -0,0 +1,139 @@
# chkbit
chkbit is a lightweight **bitrot detection tool**.
bitrot (a bit flipping in your data) can occur
- at a low level on the storage media through decay (hdd/sdd)
- at a high level in the os or firmware through bugs
chkbit is independent of the file system and can help you detect bitrot on you primary system, on backups and in the cloud.
## Installation
```
pip install --user chkbit
```
Or in its own environment:
```
pipx install chkbit
```
## Usage
Run `chkbit -u PATH` to create/update the chkbit index.
chkbit will
- create a `.chkbit` index in every subdirectory of the path it was given.
- update the index with md5 hashes for every file.
- report bitrot for files that rotted since the last run (check the exit status).
```
usage: chkbit.py [-h] [-u] [-f] [-q] [-v] PATH [PATH ...]
Checks files for bitrot. See https://github.com/laktak/chkbit-py
positional arguments:
PATH
optional arguments:
-h, --help show this help message and exit
-u, --update update indices (without this chkbit will only verify files)
-f, --force force update of damaged items
-q, --quiet quiet, don't show progress/information
-v, --verbose verbose output
Status codes:
ROT: error, bitrot detected
EIX: error, index damaged
old: warning, file replaced by an older version
add: add to index
upd: file updated
ok : check ok
skp: skipped (see .chkbitignore)
EXC: internal exception
```
## Repair
chkbit cannot repair bitrot, its job is simply to detect it.
You should
- backup regularly.
- run chkbit *before* each backup.
- check for bitrot on the backup media.
- in case of bitrot *restore* from a checked backup.
## Ignore files
Add a `.chkbitignore` file containing the names of the files/directories you wish to ignore
- each line should contain exactly one name
- lines starting with `#` are skipped
## FAQ
### Should I run `chkbit` on my whole drive?
You would typically run it only on *content* that you keep for a long time (e.g. your pictures, music, videos).
### Why is chkbit placing the index in `.chkbit` files (vs a database)?
The advantage of the .chkbit files is that
- when you move a directory the index moves with it
- when you make a backup the index is also backed up
The disadvantage is that you get hidden `.chkbit` files in your content folders.
### How does chkbit work?
chkbit operates on files.
When run for the first time it records a md5 hash of the file contents as well as the file modification time.
When you run it again it first checks the modification time,
- if the time changed (because you made an edit) it records a new md5 hash.
- otherwise it will compare the current md5 to the recorded value and report an error if they do not match.
### Can I test if chkbit is working correctly?
On Linux/OS X you can try:
Create test and set the modified time:
```
$ echo foo1 > test; touch -t 201501010000 test
$ chkbit -u .
a ./test
$
```
`a` indicates the file was added.
Now update test with a new modified:
```
$ echo foo2 > test; touch -t 201501010001 test # update test & modified
$ chkbit -u .
u ./test
$
```
`u` indicates the file was updated.
Now update test with the same modified to simulate bitrot:
```
$ echo foo3 > test; touch -t 201501010001 test
$ chkbit -u .
E ./test
chkbit detected bitrot in these files:
./test
error: detected 1 file(s) with bitrot!
$
```
`E` indicates an error.

9
chkbit.py Executable file
View File

@ -0,0 +1,9 @@
#!/usr/bin/env python
import sys
if sys.version_info < (3, 6):
sys.exit("Python < 3.6 is not supported")
from chkbit.main import main
main()

21
chkbit/hashfile.py Normal file
View File

@ -0,0 +1,21 @@
import hashlib
BLOCKSIZE = 2 ** 10 * 128 # kb
def hashfile(path):
md5 = hashlib.md5()
with open(path, "rb") as f:
while True:
buf = f.read(BLOCKSIZE)
if len(buf) <= 0:
break
md5.update(buf)
return md5.hexdigest()
def hashtext(text):
md5 = hashlib.md5()
md5.update(text.encode("utf-8"))
return md5.hexdigest()

141
chkbit/index.py Normal file
View File

@ -0,0 +1,141 @@
import os
import subprocess
import sys
import json
from enum import Enum
from chkbit import hashfile, hashtext
VERSION = 2 # index version
INDEX = ".chkbit"
IGNORE = ".chkbitignore"
class Stat(Enum):
ERR_BITROT = "ROT"
ERR_IDX = "EIX"
WARN_OLD = "old"
ADD = "add"
UPDATE = "upd"
OK = "ok "
SKIP = "skp"
INTERNALEXCEPTION = "EXC"
FLAG_MOD = "fmod"
class Index:
def __init__(self, path, files, *, log=None):
self.path = path
self.files = files
self.old = {}
self.new = {}
self.ignore = []
self.load_ignore()
self.updates = []
self.modified = True
self.log = log
@property
def ignore_file(self):
return os.path.join(self.path, IGNORE)
@property
def idx_file(self):
return os.path.join(self.path, INDEX)
def should_ignore(self, name):
return name in self.ignore
def _setmod(self):
self.modified = True
def _log(self, stat, name):
if self.log:
self.log(stat, os.path.join(self.path, name))
def update(self):
for name in self.files:
if self.should_ignore(name):
self._log(Stat.SKIP, name)
continue
self.new[name] = self._calc_file(name)
def check_fix(self, force):
for name in self.new.keys():
if not name in self.old:
self._log(Stat.ADD, name)
self._setmod()
continue
a = self.old[name]
b = self.new[name]
amod = a["mod"]
bmod = b["mod"]
if a["md5"] == b["md5"]:
# ok, if the content stays the same the mod time does not matter
self._log(Stat.OK, name)
if amod != bmod:
self._setmod()
continue
if amod == bmod:
# rot detected
self._log(Stat.ERR_BITROT, name)
# replace with old so we don't loose the information on the next run
# unless force is set
if not force:
self.new[name] = a
else:
self._setmod()
elif amod < bmod:
# ok, the file was updated
self._log(Stat.UPDATE, name)
self._setmod()
elif amod > bmod:
self._log(Stat.WARN_OLD, name)
self._setmod()
def _calc_file(self, name):
path = os.path.join(self.path, name)
info = os.stat(path)
mtime = int(info.st_mtime * 1000)
return {"mod": mtime, "md5": hashfile(path)}
def save(self):
if self.modified:
data = {"v": VERSION, "idx": self.new}
text = json.dumps(self.new, separators=(",", ":"))
data["idx_hash"] = hashtext(text)
with open(self.idx_file, "w", encoding="utf-8") as f:
json.dump(data, f)
self.modified = False
return True
else:
return False
def load(self):
if not os.path.exists(self.idx_file):
return
self.modified = False
with open(self.idx_file, "r", encoding="utf-8") as f:
data = json.load(f)
if "data" in data:
# extract old format from js version
for item in json.loads(data["data"]):
self.old[item["name"]] = {"mod": item["mod"], "md5": item["md5"]}
elif "idx" in data:
self.old = data["idx"]
text = json.dumps(self.old, separators=(",", ":"))
if data.get("idx_hash") != hashtext(text):
self.modified = True
self._log(Stat.ERR_IDX, self.idx_file)
def load_ignore(self):
if not os.path.exists(self.ignore_file):
return
with open(self.ignore_file, "r", encoding="utf-8") as f:
text = f.read()
self.ignore = list(
filter(lambda x: x and x[0] != "#" and len(x.strip()) > 0, text.splitlines())
)

65
chkbit/indexthread.py Normal file
View File

@ -0,0 +1,65 @@
import os
import sys
import time
import threading
from chkbit import Index, Stat
class IndexThread:
def __init__(self, idx, args, res_queue, todo_queue):
self.idx = idx
self.update = args.update
self.force = args.force
self.todo_queue = todo_queue
self.res_queue = res_queue
self.t = threading.Thread(target=self.run)
self.t.daemon = True
self.t.start()
def _log(self, stat, path):
self.res_queue.put((self.idx, stat, path))
def _process_root(self, parent):
files = []
dirs = []
# load files and subdirs
for name in os.listdir(path=parent):
path = os.path.join(parent, name)
if name[0] == ".":
continue
if os.path.isdir(path):
dirs.append(name)
elif os.path.isfile(path):
files.append(name)
# load index
e = Index(parent, files, log=self._log)
e.load()
# update the index from current state
e.update()
# compare
e.check_fix(self.force)
# save if update is set
if self.update:
if e.save():
self._log(Stat.FLAG_MOD, "")
# process subdirs
for name in dirs:
if not e.should_ignore(name):
self.todo_queue.put(os.path.join(parent, name))
def run(self):
while True:
parent = self.todo_queue.get()
if parent is None:
break
try:
self._process_root(parent)
except Exception as e:
self._log(Stat.INTERNALEXCEPTION, e)
self.todo_queue.task_done()

123
chkbit/main.py Normal file
View File

@ -0,0 +1,123 @@
import os
import sys
import time
import argparse
import queue
import threading
from chkbit import IndexThread, Stat
STATUS_CODES = """
Status codes:
ROT: error, bitrot detected
EIX: error, index damaged
old: warning, file replaced by an older version
add: add to index
upd: file updated
ok : check ok
skp: skipped (see .chkbitignore)
EXC: internal exception
"""
class Main:
def __init__(self):
self.stdscr = None
self.err_list = []
self.modified = False
self.verbose = False
self.total = 0
self._parse_args()
def _log(self, idx, stat, path):
if stat == Stat.FLAG_MOD:
self.modified = True
else:
if stat in [Stat.ERR_BITROT, Stat.INTERNALEXCEPTION]:
self.err_list.append(path)
if stat in [Stat.OK, Stat.UPDATE, Stat.ADD]:
self.total += 1
if self.verbose or not stat in [Stat.OK, Stat.SKIP]:
print(stat.value, path)
if not self.quiet:
print(self.total, end="\r")
def _parse_args(self):
parser = argparse.ArgumentParser(
description="Checks files for bitrot. See https://github.com/laktak/chkbit-py",
epilog=STATUS_CODES,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("PATH", nargs="+")
parser.add_argument(
"-u",
"--update",
action="store_true",
help="update indices (without this chkbit will only verify files)",
)
parser.add_argument(
"-f", "--force", action="store_true", help="force update of damaged items"
)
# parser.add_argument(
# "-d", "--delete", action="store_true", help="remove all .chkbit files from target"
# )
parser.add_argument(
"-q", "--quiet", action="store_true", help="quiet, don't show progress/information"
)
parser.add_argument("-v", "--verbose", action="store_true", help="verbose output")
self.args = parser.parse_args()
self.verbose = self.args.verbose
self.quiet = self.args.quiet
def _res_worker(self):
while True:
item = self.res_queue.get()
if not item:
break
self._log(*item)
self.res_queue.task_done()
def process(self):
self.res_queue = queue.Queue()
todo_queue = queue.Queue()
for path in self.args.PATH:
todo_queue.put(path)
workers = [IndexThread(idx, self.args, self.res_queue, todo_queue) for idx in range(5)]
res_worker = threading.Thread(target=self._res_worker)
res_worker.daemon = True
res_worker.start()
todo_queue.join()
self.res_queue.join()
def print_result(self):
if not self.quiet:
print(f"Processed {self.total} file(s).")
if self.modified:
print("Indices were updated.")
if self.err_list:
print("chkbit detected bitrot in these files:", file=sys.stderr)
for err in self.err_list:
print(err, file=sys.stderr)
print(f"error: detected {len(self.err_list)} file(s) with bitrot!", file=sys.stderr)
sys.exit(1)
def main():
try:
m = Main()
m.process()
m.print_result()
except KeyboardInterrupt:
print("abort")
sys.exit(1)

25
setup.py Normal file
View File

@ -0,0 +1,25 @@
import sys
from setuptools import setup
import os
if sys.version_info < (3, 6):
sys.exit("Please install with Python >= 3.6")
with open(os.path.join(os.path.dirname(__file__), "README.md"), encoding="utf-8") as f:
readme = f.read()
setup(
name="chkbit",
version="2.0.0",
url="https://github.com/laktak/chkbit-py",
author="Christian Zangl",
author_email="laktak@cdak.net",
description="chkbit is a lightweight bitrot detection tool.",
long_description=readme,
long_description_content_type="text/markdown",
entry_points={"console_scripts": ["chkbit = chkbit.main:main"]},
packages=["chkbit"],
install_requires=[],
python_requires=">=3.6.0",
)