# Source code for dodfminer.extract.pure.utils.title_extractor
"""
Extract Title and Subtitles.
"""
# TODO: Improve documentation
# TODO: Remove global variables and functions
from typing import List
from functools import reduce
from collections import namedtuple
import os
import re
import json
import operator
import fitz
from dodfminer.extract.pure.utils import title_filter
# Coordinates of a bounding box; instances are built from the ``bbox`` entry
# of a text span (see ``_extract_bold_upper_page``).
Box = namedtuple("Box", "x0 y0 x1 y1")
# Minimal wrapper exposing only a ``bbox`` attribute, so that a bare ``Box``
# can be handed to ``group_by_column``, which reads ``element.bbox.x0``.
BBox = namedtuple("BBox", "bbox")
# Pair holding the titles and the subtitles found.
TitlesSubtitles = namedtuple("TitlesSubtitles", "titles subtitles")
# One extracted title/subtitle: its text, its type (``_TYPE_TITLE`` or
# ``_TYPE_SUBTITLE``), its bounding box and the page number it was found on.
TextTypeBboxPageTuple = namedtuple(
    "TextTypeBboxPageTuple", "text type bbox page")
# Regular-expression fragments; a span whose text matches any of them is
# discarded before titles/subtitles are guessed.
_TRASH_WORDS = [
    "SUMÁRIO",
    "DIÁRIO OFICIAL",
    "SEÇÃO (I|II|III)",
]
# Single alternation pattern built from the fragments above.
_TRASH_COMPILED = re.compile('|'.join(_TRASH_WORDS))
# Values stored in the ``type`` field of ``TextTypeBboxPageTuple``.
_TYPE_TITLE, _TYPE_SUBTITLE = "title", "subtitle"
# Largest vertical gap (previous span's y1 to current span's y0) for two spans
# to be merged into one multi-line title.
# NOTE(review): unit is presumably PDF points -- confirm.
_TITLE_MULTILINE_THRESHOLD = 10
def load_blocks_list(path):
    """Load the text blocks of every page of a PDF file.

    Args:
        path: string with path to DODF pdf file.

    Returns:
        A list with one entry per page, each entry being the ``'blocks'``
        list of that page's text dictionary.
    """
    # The document is used as a context manager so the underlying file is
    # closed once the blocks have been extracted.
    with fitz.open(path) as doc:
        return [page.getTextPage().extractDICT()['blocks'] for page in doc]
def group_by_column(elements, width):
    """Split elements between the left and the right column of a page.

    The elements are assumed to lie on the same page, laid out in two
    columns. An element belongs to the left column when the ``x0`` of its
    ``bbox`` is not greater than half of ``width``; otherwise it belongs to
    the right column. The input order is kept inside each column.

    Args:
        elements: iterable of objects exposing ``bbox.x0``.
        width: the page width.

    Returns:
        A two-item list ``[left_elements, right_elements]``.
    """
    half_width = width / 2
    left_column, right_column = [], []
    for element in elements:
        target = left_column if element.bbox.x0 <= half_width else right_column
        target.append(element)
    return [left_column, right_column]
def group_by_page(elements):
    """Group elements by page number.

    Essentially a "groupby" where the key is the ``page`` attribute of each
    element. The input is traversed only once, so one-shot iterables such as
    generators are handled correctly.

    Args:
        elements: Iterable[TextTypeBboxPageTuple] (any object exposing a
            ``page`` attribute works).

    Returns:
        A dict mapping each page number to the list of its elements, in the
        order they appear in ``elements``.
    """
    page_elements = {}
    for element in elements:
        page_elements.setdefault(element.page, []).append(element)
    return page_elements
def sort_by_column(elements, width):
    """Order elements of a 2-column page: left column first, then right.

    Args:
        elements: Iterable[TextTypeBboxPageTuple].
        width: the page width (the context in which all list elements
            were originally).

    Returns:
        List[TextTypeBboxPageTuple] where every element of the left column
        comes before any element of the right column and, inside each
        column, elements are ordered by the ``y0`` of their ``bbox``.
    """
    left_column, right_column = group_by_column(elements, width)

    def top_edge(element):
        return element.bbox.y0

    return sorted(left_column, key=top_edge) + sorted(right_column, key=top_edge)
def invert_text_type_bbox_page_tuple(text_type_bbox_page_tuple):
    """Swap the type of an element between _TYPE_TITLE and _TYPE_SUBTITLE.

    Args:
        text_type_bbox_page_tuple: instance of TextTypeBboxPageTuple.

    Returns:
        A new TextTypeBboxPageTuple with the same text, bbox and page, whose
        type is _TYPE_TITLE when the original was _TYPE_SUBTITLE and
        _TYPE_SUBTITLE otherwise.
    """
    text, _type, bbox, page = text_type_bbox_page_tuple
    # Compare by value: identity (`is`) on strings only works by accident of
    # interning and fails for equal strings that are distinct objects.
    inverted = _TYPE_TITLE if _type == _TYPE_SUBTITLE else _TYPE_SUBTITLE
    return TextTypeBboxPageTuple(text, inverted, bbox, page)
def _extract_bold_upper_page(page):
    """Collect the spans of a page that are both bold and uppercase.

    A span is kept when its ``flags`` value is one of
    ``title_filter.BoldUpperCase.BOLD_FLAGS`` and its text equals its own
    uppercase form. Kept spans are modified in place: ``bbox`` becomes a
    ``Box``, a ``page`` entry with the page number is added, and the
    ``color`` and ``flags`` entries are removed.

    Args:
        page: fitz.fitz.Page object to have its bold content extracted.

    Returns:
        A list with the span dicts that were kept, in reading order of the
        page's text dictionary.
    """
    blocks = page.getTextPage().extractDICT()['blocks']
    all_spans = (
        span
        for block in blocks
        for line in block['lines']
        for span in line['spans']
    )
    selected = []
    for span in all_spans:
        flags, text = span['flags'], span['text']
        is_bold = flags in title_filter.BoldUpperCase.BOLD_FLAGS
        if not (is_bold and text == text.upper()):
            continue
        span['bbox'] = Box(*span['bbox'])
        span['page'] = page.number
        del span['color'], span['flags']
        selected.append(span)
    return selected
def _extract_bold_upper_pdf(doc):
    """Extract the bold, uppercase spans of every page of a DODF pdf.

    Args:
        doc: DODF pdf file returned by `fitz.open`.

    Returns:
        A list with one item per page, each item being the list of spans
        returned by ``_extract_bold_upper_page`` for that page.
    """
    return list(map(_extract_bold_upper_page, doc))
def sort_2column(elements, width_lis):
    """Sort elements page by page, assuming a 2-column layout on each page.

    Args:
        elements: Iterable[TextTypeBboxPageTuple].
        width_lis: page widths, indexable by page number.

    Returns:
        A dict mapping each page number (inserted in ascending order) to the
        elements of that page as ordered by ``sort_by_column``.
    """
    pages = group_by_page(elements)
    ordered_by_page = {}
    for page_number in sorted(pages):
        ordered_by_page[page_number] = sort_by_column(
            pages[page_number], width=width_lis[page_number])
    return ordered_by_page
# TODO: deal with `subtitles` using homogeneous reasoning
# (pretty much what was done on `titles`, so that the
# multiline are correctly assembled)
def _is_title_continuation(previous_element, current_element, width_lis):
    """Tell whether ``current_element`` is one more line of the previous title.

    Both spans must be on the same page, vertically closer than
    ``_TITLE_MULTILINE_THRESHOLD`` and in the same column.
    """
    if previous_element['page'] != current_element['page']:
        return False
    gap = abs(previous_element['bbox'].y1 - current_element['bbox'].y0)
    if not gap < _TITLE_MULTILINE_THRESHOLD:
        return False
    # Titles must also be in the same column: after grouping the two boxes
    # by column, one of the two groups has to be empty.
    left, right = group_by_column(
        (BBox(previous_element['bbox']), BBox(current_element['bbox'])),
        width=width_lis[current_element['page']])
    return not (left and right)


def _get_titles_subtitles(elements, width_lis):
    """Extract titles and subtitles from a list of spans.

    WARNING: based on font size and heuristics. The spans are ordered by
    decreasing font size (stable sort); the size of the third one (or of the
    last one, when there are fewer than three) is taken as the title font
    size. The leading run of spans with that size become titles, adjacent
    lines being merged into a single multi-line title. The following run of
    equally sized spans become subtitles.

    Args:
        elements: a list of dict, all of them having the keys:
            size -> float
            text -> str
            bbox -> Box
            page -> int
        width_lis: page widths, indexable by page number.

    Returns:
        TitlesSubtitles[List[TextTypeBboxPageTuple], List[TextTypeBboxPageTuple]].
        Both lists are empty when there is nothing to classify.
    """
    elements = sorted(elements, key=lambda d: d['size'], reverse=True)
    # Usually part of "Diário Oficial do DISTRITO FEDERAL"
    if elements and "DISTRITO FEDERAL" in elements[0]['text']:
        del elements[0]
    # Nothing left to classify: avoid indexing an empty list below.
    if not elements:
        return TitlesSubtitles([], [])

    # heuristic part: which font is the one used by titles?
    guessed_title_font_size = elements[min(2, len(elements) - 1)]['size']

    total = len(elements)
    index = 0  # walks `elements` instead of repeatedly slicing the list
    titles = []
    previous_element = None
    while index < total and elements[index]['size'] == guessed_title_font_size:
        current_element = elements[index]
        if titles and _is_title_continuation(previous_element,
                                             current_element, width_lis):
            titles[-1][0].append(current_element['text'])
        else:
            # A multi-line title keeps the bbox and page of its first line.
            titles.append([[current_element['text']], _TYPE_TITLE,
                           current_element['bbox'],
                           current_element['page']])
        previous_element = current_element
        index += 1
    # Titles with more than one line should be a single string
    titles = [TextTypeBboxPageTuple("\n".join(i[0]), *i[1:]) for i in titles]

    # TODO: deal with `subtitles` the same way as `titles`, so that
    # multi-line subtitles are correctly assembled, e.g. "DEPARTAMENTO DE
    # ESTRADAS DE RODAGEM DO DISTRITO FEDERAL" (5/1/2005)
    sub_titles = []
    # if the elements are over, there are no subtitles
    if index < total:
        subtitle_size = elements[index]['size']
        # PS: majority of subtitles uses only 1 line. Hard to distinguish
        while index < total and elements[index]['size'] == subtitle_size:
            current_element = elements[index]
            sub_titles.append(TextTypeBboxPageTuple(
                current_element['text'], _TYPE_SUBTITLE,
                current_element['bbox'], current_element['page']))
            index += 1

    # Sometimes the heuristic fails. However, the fix below seems to work on
    # most cases. Happens mostly when there is only one title and other
    # stuff: what was guessed as subtitles is promoted to titles.
    if not titles and sub_titles:
        return TitlesSubtitles(
            [invert_text_type_bbox_page_tuple(i) for i in sub_titles], titles)
    return TitlesSubtitles(titles, sub_titles)
def _get_titles_subtitles_smart(doc, width_lis):
    """Extract titles and subtitles from a document, using heuristics.

    Wraps ``_get_titles_subtitles``, first removing most of the impurity
    (spans which aren't titles/subtitles): spans rejected by
    ``title_filter.BoldUpperCase.dict_text``, spans matching
    ``_TRASH_COMPILED`` and spans whose font name contains "calibri".

    Args:
        doc: DODF pdf file returned by `fitz.open`.
        width_lis: page widths, indexable by page number.

    Returns:
        TitlesSubtitles(List[TextTypeBboxPageTuple],
                        List[TextTypeBboxPageTuple]).
    """
    bold_spans = reduce(operator.add, _extract_bold_upper_pdf(doc))
    # 'calibri' as font appears sometimes, however never in titles or
    # subtitles
    candidates = [
        span for span in bold_spans
        if title_filter.BoldUpperCase.dict_text(span)
        and not _TRASH_COMPILED.search(span['text'])
        and 'calibri' not in span['font'].lower()
    ]
    # TODO: check for necessity of this sorting
    candidates.sort(key=lambda span: (-span['page'], span['size']),
                    reverse=True)
    return _get_titles_subtitles(candidates, width_lis)
def extract_titles_subtitles(path):
    """Extract titles and subtitles from a DODF pdf.

    Args:
        path: str indicating the path for the pdf to have its
            content extracted.

    Returns:
        List[TextTypeBboxPageTuple] containing all titles and subtitles,
        page by page and, inside each page, sorted by column. The list is
        empty when no title or subtitle is found.
    """
    # Context manager so the document is closed even if extraction fails.
    with fitz.open(path) as doc:
        # Third component of each page's MediaBox, used as the page width.
        width_lis = [page.MediaBox[2] for page in doc]
        titles, subtitles = _get_titles_subtitles_smart(
            doc, width_lis=width_lis)
    by_page = sort_2column(titles + subtitles, width_lis=width_lis)
    # Flatten without `reduce`, which raises TypeError on an empty sequence.
    return [element
            for page_elements in by_page.values()
            for element in page_elements]
# TODO: use tuples instead of lists to ensure
# immutability and avoid unexpected behavior
# (e.g., a user modifying the internal state of an ExtractorTitleSubtitle
# instance by appending elements to its internal lists)
class ExtractorTitleSubtitle:
    """Lazily extract titles and subtitles from a DODF pdf.

    Nothing is extracted until one of the properties is first accessed; the
    result is then cached on the instance (see ``reset``).

    Use this class like that::

        path = "path_to_pdf"
        extractor = ExtractorTitleSubtitle(path)
        # To extract titles
        titles = extractor.titles
        # To extract subtitles
        subtitles = extractor.subtitles
        # To dump titles and subtitles on a json file
        json_path = "valid_file_name"
        extractor.dump_json(json_path)
    """

    _TITLE_MULTILINE_THRESHOLD = 10

    def __init__(self, path):
        """Store the path; extraction is deferred to first property access.

        Args:
            path: str indicating the path for the pdf to have its
                content extracted
        """
        self._titles_subtitles = TitlesSubtitles([], [])
        self._titles = []
        self._subtitles = []
        self._path = path
        self._cached = False
        self._json = {}
        self._hierarchy = []

    def _mount_json(self):
        """Mount the dict of titles with their subtitles into self._json.

        Subtitles are attached to the closest preceding title; a title
        appearing more than once accumulates all of its subtitles.

        Returns:
            self._json, mapping each title text to a tuple of subtitle texts.

        Raises:
            ValueError: if an element which is not a title is found before
                any title (or has an unknown type).
        """
        grouped = {}
        current_subtitles = None
        for element in self._titles_subtitles:
            if element.type == _TYPE_TITLE:
                current_subtitles = grouped.setdefault(element.text, [])
            elif (element.type == _TYPE_SUBTITLE
                  and current_subtitles is not None):
                current_subtitles.append(element.text)
            else:
                raise ValueError("Does not begin with a title")
        self._json = {title: tuple(subs) for title, subs in grouped.items()}
        return self._json

    def _mount_hierarchy(self):
        """Mount the list of titles with their subtitles into self._hierarchy.

        Unlike ``_mount_json``, repeated titles produce separate entries.

        Returns:
            self._hierarchy, a list of TitlesSubtitles(title_text,
            list_of_subtitle_texts) in reading order.

        Raises:
            ValueError: if an element which is not a title is found before
                any title (or has an unknown type).
        """
        hierarchy = []
        for element in self._titles_subtitles:
            if element.type == _TYPE_TITLE:
                hierarchy.append(TitlesSubtitles(element.text, []))
            elif element.type == _TYPE_SUBTITLE and hierarchy:
                hierarchy[-1].subtitles.append(element.text)
            else:
                raise ValueError("Não começa com títulos")
        self._hierarchy = hierarchy
        return self._hierarchy

    def _do_cache(self):
        """Compute and cache the extraction results.

        Implicitly called on first access to some property, so that no more
        computations are needed afterwards. Sets the following internal
        attributes:
            - _titles_subtitles
            - _titles
            - _subtitles
        """
        self._titles_subtitles = tuple(extract_titles_subtitles(self._path))
        self._titles = tuple(element for element in self._titles_subtitles
                             if element.type == _TYPE_TITLE)
        self._subtitles = tuple(element for element in self._titles_subtitles
                                if element.type == _TYPE_SUBTITLE)
        self._cached = True

    @property
    def titles(self):
        """All titles extracted from the file specified by self._path.

        Returns:
            List[TextTypeBboxPageTuple] each of which having its type
            attribute equals _TYPE_TITLE
        """
        if not self._cached:
            self._do_cache()
        return list(self._titles)

    @property
    def subtitles(self):
        """All subtitles extracted from the file specified by self._path.

        Returns:
            List[TextTypeBboxPageTuple] each of which having its type
            attribute equals _TYPE_SUBTITLE
        """
        if not self._cached:
            self._do_cache()
        return list(self._subtitles)

    @property
    def titles_subtitles(self):
        """A list with titles and subtitles, in the extraction order."""
        if not self._cached:
            self._do_cache()
        return list(self._titles_subtitles)

    @property
    def json(self):
        """All titles with their associated subtitles.

        All subtitles under the same title are at the same level.
        Deprecated. Better use `titles_subtitles` or
        `titles_subtitles_hierarchy`.

        Returns:
            A fresh dict mapping each title text to a list of subtitle texts.
        """
        if not self._json:
            if not self._cached:
                self._do_cache()
            self._mount_json()
        return {title: list(subs) for title, subs in self._json.items()}

    @property
    def titles_subtitles_hierarchy(self) -> "List[TitlesSubtitles]":
        """All titles and subtitles extracted from the file specified by
        self._path, hierarchically organized.

        Returns:
            List[TitlesSubtitles(str, List[str])]: the titles and their
            respective subtitles (a shallow copy of the internal list).
        """
        if not self._hierarchy:
            if not self._cached:
                self._do_cache()
            self._mount_hierarchy()
        return self._hierarchy.copy()

    def dump_json(self, path):
        """Write the JSON representation of titles and subtitles to a file.

        Dumps the content of the ``json`` property.

        Args:
            path: string containing path to .json file where the dump will
                be done. It is suffixed with ".json" if it's not.
        """
        if not path.endswith('.json'):
            path += '.json'
        with open(path, 'w', encoding='utf-8') as json_file:
            json.dump(self.json,
                      json_file,
                      ensure_ascii=False, indent=' ')

    def reset(self):
        """Invalidate the cache and reset derived internal attributes.

        Use when for some reason the internal state was
        somehow modified by user. The next property access extracts
        everything again.
        """
        self._json = {}
        self._hierarchy = []
        # The flag checked by the properties is `_cached`.
        self._cached = False
def gen_title_base(dir_path=".", base_name="titles", indent=4, forced=False):
    """Generate a titles base from all PDFs immediately under dir_path.

    The base is written under the dir_path directory as a JSON file.

    Args:
        dir_path: directory whose PDFs will have their titles collected.
        base_name: titles' base file name; suffixed with ".json" if it's not.
        indent: how many spaces will be used for indentation.
        forced: overwrite the base file if it already exists.

    Returns:
        dict containing "titles" as key and a sorted list of the distinct
        titles, the same content stored at base_name[.json]. None when the
        base file already exists (and ``forced`` is false) or when its path
        is a directory.
    """
    if not base_name.endswith('.json'):
        base_name += '.json'
    base_name = f"{dir_path}/{base_name}"
    if os.path.exists(base_name) and not forced:
        print(f"Error: {base_name} already exists")
        return None
    if os.path.isdir(base_name):
        print(f"Error: {base_name} ir a directory")
        return None

    titles = set()
    for file_name in os.listdir(dir_path):
        # `os.listdir` yields bare names: join with dir_path so the checks
        # and the extraction also work when dir_path is not the cwd.
        pdf_path = os.path.join(dir_path, file_name)
        if not file_name.endswith('.pdf') or os.path.isdir(pdf_path):
            continue
        extractor = ExtractorTitleSubtitle(pdf_path)
        titles.update(title.text for title in extractor.titles)

    # Sorted so that the generated file is reproducible between runs.
    json_content = {"titles": sorted(titles)}
    with open(base_name, 'w', encoding='utf-8') as json_file:
        json.dump(json_content, json_file,
                  ensure_ascii=False, indent=indent * ' ')
    return json_content
def gen_hierarchy_base(dir_path=".",
                       folder="hierarchy", indent=4, forced=False):
    """Generate hierarchy JSON files from all PDFs immediately under dir_path.

    One ``<pdf name without extension>.json`` file is written per PDF,
    inside ``folder``, which is created under the dir_path directory.

    Args:
        dir_path: path to the folder containing the PDFs; an empty value
            means the current directory.
        folder: name of the output folder, created under dir_path.
        indent: how many spaces will be used for indentation.
        forced: proceed even if the output folder already exists.

    Returns:
        List[Dict[str, List[Dict[str, List[Dict[str, str]]]]]]
        e.g:
        [
            { "22012019": [
                {
                    "PODER EXECUTIVO": []
                },
                {
                    "SECRETARIA DE ESTADO DE FAZENDA,\\nPLANEJAMENTO, ORÇAMENTO E GESTÃO": [
                        {
                            "SUBSECRETARIA DA RECEITA": ""
                        }
                    ]
                }
            ]}
        ]
        In case of error trying to create the output folder,
        returns None.
    """
    # Apply the default before building the output path; otherwise an empty
    # dir_path would yield "/<folder>" at the filesystem root.
    if not dir_path:
        dir_path = "."
    folder = "{}/{}".format(dir_path, folder)
    try:
        os.makedirs(folder, exist_ok=forced)
    except OSError as error:
        print(error)
        return None

    suffix = '.pdf'
    hierarchies = []
    for file_name in os.listdir(dir_path):
        if not file_name.endswith(suffix):
            continue
        # Slice the suffix off: str.rstrip('.pdf') would strip any trailing
        # run of the characters '.', 'p', 'd', 'f' ("dodf.pdf" -> "do").
        stem = file_name[:-len(suffix)]
        extractor = ExtractorTitleSubtitle(
            "{}/{}".format(dir_path, file_name))
        hierarchy = {
            stem: [
                {title: [{subtitle: ''} for subtitle in subtitles]}
                for title, subtitles in extractor.titles_subtitles_hierarchy
            ]
        }
        hierarchies.append(hierarchy)
        # Explicit encoding: ensure_ascii=False emits non-ASCII characters.
        with open("{}/{}.json".format(folder, stem), 'w',
                  encoding='utf-8') as json_file:
            json.dump(hierarchy, json_file,
                      ensure_ascii=False, indent=indent * ' ')
    return hierarchies