add UniversalDetector
This commit is contained in:
parent
e1f3000cfa
commit
efd0e3f444
3 changed files with 126 additions and 1 deletions
|
@ -16,3 +16,34 @@ def detect(msg):
|
||||||
if isinstance(encoding, bytes):
|
if isinstance(encoding, bytes):
|
||||||
encoding = encoding.decode()
|
encoding = encoding.decode()
|
||||||
return {"encoding": encoding, "confidence": confidence}
|
return {"encoding": encoding, "confidence": confidence}
|
||||||
|
|
||||||
|
class UniversalDetector(object):
|
||||||
|
def __init__(self):
|
||||||
|
self._detector = _cchardet.UniversalDetector()
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, exception_type, exception_value, traceback):
|
||||||
|
self.close()
|
||||||
|
return False
|
||||||
|
|
||||||
|
def reset(self):
|
||||||
|
self._detector.reset()
|
||||||
|
|
||||||
|
def feed(self, data):
|
||||||
|
self._detector.feed(data)
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
self._detector.close()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def done(self):
|
||||||
|
return self._detector.done
|
||||||
|
|
||||||
|
@property
|
||||||
|
def result(self):
|
||||||
|
encoding, confidence = self._detector.result
|
||||||
|
if isinstance(encoding, bytes):
|
||||||
|
encoding = encoding.decode()
|
||||||
|
return {"encoding": encoding, "confidence": confidence}
|
||||||
|
|
|
@ -31,3 +31,66 @@ def detect_with_confidence(const_char_ptr msg):
|
||||||
return detected_charset, detected_confidence
|
return detected_charset, detected_confidence
|
||||||
|
|
||||||
return None, None
|
return None, None
|
||||||
|
|
||||||
|
cdef class UniversalDetector:
|
||||||
|
cdef uchardet_t _ud
|
||||||
|
cdef int _done
|
||||||
|
cdef int _closed
|
||||||
|
cdef bytes _detected_charset
|
||||||
|
cdef float _detected_confidence
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self._ud = uchardet_new()
|
||||||
|
self._done = 0
|
||||||
|
self._closed = 0
|
||||||
|
self._detected_charset = b""
|
||||||
|
self._detected_confidence = 0.0
|
||||||
|
|
||||||
|
def reset(self):
|
||||||
|
if not self._closed:
|
||||||
|
self._done = 0
|
||||||
|
self._closed = 0
|
||||||
|
self._detected_charset = b""
|
||||||
|
self._detected_confidence = 0.0
|
||||||
|
uchardet_reset(self._ud)
|
||||||
|
|
||||||
|
def feed(self, const_char_ptr msg):
|
||||||
|
cdef int length
|
||||||
|
cdef int result
|
||||||
|
|
||||||
|
if self._closed:
|
||||||
|
return
|
||||||
|
|
||||||
|
length = len(msg)
|
||||||
|
if length > 0:
|
||||||
|
result = uchardet_handle_data(self._ud, msg, length)
|
||||||
|
|
||||||
|
if result != 0:
|
||||||
|
self._closed = 1
|
||||||
|
uchardet_delete(self._ud)
|
||||||
|
raise Exception("Handle data error")
|
||||||
|
else:
|
||||||
|
self._done = 1
|
||||||
|
uchardet_data_end(self._ud)
|
||||||
|
self._detected_charset = uchardet_get_charset(self._ud)
|
||||||
|
self._detected_confidence = uchardet_get_confidence(self._ud)
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
if not self._closed:
|
||||||
|
uchardet_data_end(self._ud)
|
||||||
|
self._detected_charset = uchardet_get_charset(self._ud)
|
||||||
|
self._detected_confidence = uchardet_get_confidence(self._ud)
|
||||||
|
|
||||||
|
uchardet_delete(self._ud)
|
||||||
|
self._closed = 1
|
||||||
|
|
||||||
|
@property
|
||||||
|
def done(self):
|
||||||
|
return bool(self._done)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def result(self):
|
||||||
|
if len(self._detected_charset):
|
||||||
|
return self._detected_charset, self._detected_confidence
|
||||||
|
else:
|
||||||
|
return None, None
|
||||||
|
|
|
@ -46,3 +46,34 @@ class TestCChardet():
|
||||||
testfile
|
testfile
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# def test_detector(self):
|
||||||
|
# testfiles = glob.glob('tests/testdata/*/*.txt')
|
||||||
|
# for testfile in testfiles:
|
||||||
|
# if testfile.replace("\\", "/") in SKIP_LIST:
|
||||||
|
# continue
|
||||||
|
|
||||||
|
# base = os.path.basename(testfile)
|
||||||
|
# expected_charset = os.path.splitext(base)[0]
|
||||||
|
|
||||||
|
# detector = cchardet.UniversalDetector()
|
||||||
|
# with open(testfile, 'rb') as f:
|
||||||
|
# msg = f.read()
|
||||||
|
# detector.feed(msg)
|
||||||
|
# # line = f.readline()
|
||||||
|
# # while line:
|
||||||
|
# # detector.feed(line)
|
||||||
|
# # if detector.done:
|
||||||
|
# # break
|
||||||
|
# # line = f.readline()
|
||||||
|
# detector.close()
|
||||||
|
# detected_encoding = detector.result
|
||||||
|
# eq_(
|
||||||
|
# expected_charset.lower(),
|
||||||
|
# detected_encoding['encoding'].lower(),
|
||||||
|
# 'Expected %s, but got %s for "%s"' % (
|
||||||
|
# expected_charset.lower(),
|
||||||
|
# detected_encoding['encoding'].lower(),
|
||||||
|
# testfile
|
||||||
|
# )
|
||||||
|
# )
|
||||||
|
|
Loading…
Reference in a new issue