Fetch Files
The Fetch Files step is the first step of a Flow.
How to use
- In Flow Editor, add the Fetch Files step in a Flow as the first step.
- Configure the required settings:
  - Input File Document
  - Input Folder
  - Output Folder
- Configure optional Extra Settings:
  - Open Params
  - Scripts Directory
- Run the Flow. The results are in the Output Folder, for example, s1_fetch_files.
- Use the defined output folder as the input folder for the Process Files step in the Flow.
Required settings
Configure the required settings.
Input File Document
One of the following must be set.
- Custom - Calls a custom fetcher. The fetcher is configured with Extra Settings > Open Params and the scripts directory.
- Fetch-attachment - Calls the attachment fetcher that fetches embedded attachments from files of different types. Attachments can be extracted from these supported file types:

| File type | Supported attachments extracted |
| --- | --- |
| PDF | All (PDF, XLSX, XLS, PPT, DOCX, TXT, JPG, GIF, PNG, and so on) |
| EML | All (PDF, XLSX, XLS, PPT, DOCX, TXT, JPG, GIF, PNG, and so on) |
| XLSX and XLS | PDF |
Input Folder
Select the input folder that contains the files to extract the attachments from.
Output Folder
Select the output folder for the extracted attachments.
Files generated in this output folder are based on the JSON configuration provided in Open Params. These files are stored in the output folder:
- Attachment files that are extracted from the input files.
- Files that do not contain attachments.
- Files that cannot contain attachments, for example .png, .txt, and so on.
- An errors folder with error logs.

If the attachment fetcher fails to fetch attachments for certain files, an Attachment_fetcher_error_resp.json error log is generated for each file in an errors folder.
Extra settings
Optional settings.
Open Params
A JSON object that provides details, such as username and password, to your specific fetcher.
- If Input File Document is Custom, any key-value pairs are supported in this JSON object.
- If Input File Document is Fetch-attachment, the required configuration is:

  {"output_attachments_only": "true"}
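For a Custom fetcher, the Open Params object is free-form and is read back through self.context.open_params. A minimal sketch of what it might contain; every key here is hypothetical and defined by your own fetcher, not by the platform:

```json
{
  "username": "svc-fetcher",
  "api_base_url": "https://example.com/api",
  "max_documents": "50"
}
```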
Scripts Directory
When this folder is selected, all .py files are scanned for registered fetcher classes using the special registration function register_fetcher. If you define a fetcher class, you can attach custom Python code to download files from remote sources.
Sample custom fetcher
Here is a sample CustomFetcher:

```python
import json

from instabase.custom_scripts_lib.fetcher import FetcherBase


class CustomFetcher(FetcherBase):

  def __init__(self, context):
    # type: (FetcherContext) -> None
    # Invoke base class constructor - mandatory
    super(CustomFetcher, self).__init__(context)

  def get_version(self):
    # type: () -> Text
    return '1.0'

  def execute(self):
    # type: () -> None
    # Your implementation here
    out_files = []
    per_query_list = []
    for i in range(len(self.context.input_payloads)):
      input_payload = self.context.input_payloads[i]
      content = json.dumps(dict(
          input_record=self.context.input_records[i],
          csv_params=input_payload['csv_params'],
          json_params=input_payload['json_params'],
          query_name=input_payload['query_name']
      ), indent=2)
      out_file = {
          'file_name': 'input_record{}.txt'.format(i),
          'content': content,
          'content_type': 'application/text'
      }
      per_query_files_dict = {
          'query_name': input_payload['query_name'],
          'out_files': [out_file]
      }
      out_files.append(out_file)
      per_query_list.append(per_query_files_dict)
    self.context.out_files.extend(out_files)
    self.context.out_files_per_query_list.extend(per_query_list)


def register_fetcher():
  # type: () -> Dict
  return {
      'customfetcher': {
          'class': CustomFetcher
      }
  }
```
In this example, the term query is a synonym for input_record. For the fetcher to work with a Flow Binary, you must associate all the files fetched for a particular query. Specify the queries for the fetcher using a .csv or .json file.
- In a CSV file, each input record corresponds to one line.
- In a JSON file, each record must contain the query_name and json_params keys. You must use this same query_name when populating the out_files_per_query_list. You can put any custom information associated with the record in json_params.
An example JSON file looks like:

```json
{
  "records": [
    {
      "query_name": "company1,usa",
      "json_params": {
        "hq": "los angeles"
      }
    },
    {
      "query_name": "company2,china",
      "json_params": {
        "hq": "miami"
      }
    }
  ]
}
```
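For comparison, a CSV input covering the same two records would contain one line per record. The exact column layout is up to your fetcher, since each line is passed through verbatim as csv_params:

```
company1,usa
company2,china
```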
The FetcherContext (self.context) is an object with the following properties:
- open_params - Type: Dict. The JSON object you provided in the Fetch Files step.
- input_payloads - Type: List[InputPayloadDict]. The list of inputs provided in your CSV or JSON input files.
- out_files - Type: List[OutFileDict]. Add an entry to this list for every file to write to the out directory. For the fetcher to work inside a Flow Binary, you must populate out_files_per_query_list instead.
- out_files_per_query_list - Type: List[OutFilesPerQueryDict]. Add an entry for every input record and its associated files.
- error - Type: Text. To propagate an error message from this fetcher, populate this string.
- runtime_config - Type: Dict[Text, Text]. This map provides access to the runtime config that is passed to the Flow Binary.
- ibfile - Type: IBFile object. This object provides access to the IBFile module for performing file operations.
For example, access ibfile from the custom fetcher like this:

```python
import logging

from instabase.custom_scripts_lib.fetcher import FetcherBase


class CustomFetcher(FetcherBase):

  def __init__(self, context):
    ...

  def get_version(self):
    ...

  def execute(self):
    # type: () -> None
    path = '/dir1/subdir1/'
    start_page_token = ''
    list_dir_info, err = self.context.ibfile.list_dir(path, start_page_token)
    if err:
      logging.info('ERROR list_dir at path {}: {}'.format(path, err))
    else:
      for node in list_dir_info.nodes:
        logging.info('Node {}'.format(node.as_dict()))
```
For a list of all IBFile methods, see Libraries for UDFs.
InputPayloadDict
Consists of the following keys and values:
- csv_params - Type: Text. A string corresponding to one line in the CSV input (only when .csv files are provided).
- json_params - Type: Dict. The json_params dict specified in each of the records in the .json files.
- query_name - Type: Text. A string that uniquely identifies the record. When you fetch files for a particular query and add it to out_files_per_query_list, you must use this query name for the downstream Flow to work correctly.
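As an illustration, the payload your fetcher receives for the CSV line company1,usa would look roughly like the following; the key names follow the documented contract, but the values are hypothetical:

```python
# A sketch of an InputPayloadDict for the CSV line 'company1,usa'.
input_payload = {
    'csv_params': 'company1,usa',  # the raw CSV line (only for .csv inputs)
    'json_params': {},             # populated only for .json inputs
    'query_name': 'company1,usa',  # unique identifier for this record
}

print(input_payload['query_name'])
```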
OutFileDict
Must consist of the following keys and values:
- file_name - Type: Text. The filename for the file written to the out directory.
- content - Type: bytes. The contents of the file.
- content_type - Type: Text. The content type for the file, such as "application/pdf".
OutFilesPerQueryDict
Must consist of the following keys and values:
- query_name - Type: Text. The query or input record that was used to initiate the fetcher, such as "John Doe" if you were fetching based on that name.
- out_files - Type: List[OutFileDict]. List of files that were fetched and associated with that query.
- error - Type: Text. If an error occurred at any stage, fill in this error, and block further execution for this particular query.
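To tie the two dicts together, a fetcher typically builds one OutFileDict per fetched file and groups them under the query that produced them. A minimal sketch; the helper name make_query_entry and the file contents are ours, not part of the SDK:

```python
def make_query_entry(query_name, out_files, error=''):
    # Assemble an OutFilesPerQueryDict from already-built OutFileDict
    # entries; leave error empty unless fetching failed for this query.
    return {
        'query_name': query_name,
        'out_files': out_files,
        'error': error,
    }

out_file = {
    'file_name': 'john_doe_results.pdf',
    'content': b'%PDF-1.4 ...',        # placeholder bytes
    'content_type': 'application/pdf',
}
entry = make_query_entry('John Doe', [out_file])
```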
FetcherBase class
The FetcherBase class provides the following properties:
- chrome_driver - Type: Selenium WebDriver. You can automate Chrome clicks and actions using this WebDriver object. See the selenium-webdriver package README for details.

The CustomFetcher populates out_files when using a non-compiled binary. When the fetcher is executed from within the Flow Binary, out_files_per_query_list must be populated.
Fetch files utilities
The fetch files library provides a suite of utilities to use in custom Fetch Files steps. This library provides access to common search terms, helper functions for parsing HTML, and utilities for retrying failed API requests.
Common search terms
Many custom fetchers make requests on a subject for a certain topic, such as searching for Company A within the context of class action lawsuits. Use the following code to access these common search terms.
```python
from instabase.fetcher_utils.all_topics import Topics
from instabase.fetcher_utils.topics.names import TopicNames

# Create the Topics object
topics = Topics()

# For a list of topic names...
topic_names = topics.all_topic_names
# -> [Topic names, such as 'Fraud' or 'Money Laundering']

# Retrieve a list of search terms for a given topic
topic = topics.get_topic(TopicNames.ANTITRUST_VIOLATION)
print(topic.search_terms)
# -> [Search terms, such as 'antitrust' or 'monopoly']
```
The list of possible profiles (named through TopicNames):
- ADVERSE_NEWS
- ANTITRUST_VIOLATION
- CLASS_ACTION_LAWSUIT
- DRUG_TRAFFICKING
- HUMAN_TRAFFICKING
- MONEY_LAUNDERING
- SANCTIONED_COUNTRIES
- TERRORISM
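One common way to use these search terms is to pair each one with the subject being investigated to form the query strings your fetcher submits. A small illustrative helper; build_queries is our own sketch, not part of the library:

```python
def build_queries(subject, search_terms):
    # Pair the subject with every topic search term to form one
    # query string per term.
    return ['{} {}'.format(subject, term) for term in search_terms]

queries = build_queries('Company A', ['antitrust', 'monopoly'])
# -> ['Company A antitrust', 'Company A monopoly']
```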
API requests
This module provides an easy way to construct a retry-enabled requests.Session object.

```python
from instabase.fetcher_utils.requests import RetryRequestSession

# Use this session to make your requests.
session = RetryRequestSession()
```

The constructor can accept the following retry arguments:
- retries - Type: int. The maximum number of retries to make for a request. Defaults to 3.
- backoff_factor - Type: float. A backoff factor to apply between attempts after each try, resulting in a sleep between retries. Defaults to 0.3.
- status_forcelist - Type: tuple[int]. A set of integer HTTP status codes to force a retry on. Defaults to (500, 502, 504).
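Assuming RetryRequestSession wraps urllib3's Retry (a guess based on the parameter names), the delay before the n-th retry is roughly backoff_factor * 2 ** (n - 1); the exact schedule, including whether the first retry sleeps at all, varies by urllib3 version. A sketch of that schedule:

```python
def backoff_schedule(retries=3, backoff_factor=0.3):
    # Exponential backoff: backoff_factor * (2 ** (n - 1)) for the
    # n-th retry. With the defaults this gives 0.3s, 0.6s, 1.2s.
    return [backoff_factor * (2 ** (n - 1)) for n in range(1, retries + 1)]

print(backoff_schedule())
```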
HTML processing
Many custom fetch file scripts pull articles and webpages from various sources. The fetch files utilities library provides functions for extracting text content from these web pages to help content processing.

For example:

```python
import instabase.fetcher_utils.html_parser as html_parser

html_content = u'<html><body>Hello <i>there!</i></body></html>'
text_content = html_parser.extract_text_from_webpage(html_content)
# -> u'Hello there!'
```
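The same extraction can be approximated with the standard library alone, which helps clarify what extract_text_from_webpage does; this stdlib sketch is ours, not the library's implementation:

```python
from html.parser import HTMLParser

class TextExtractor(HTMLParser):
    # Collects the text nodes of a page, dropping all markup.
    def __init__(self):
        super().__init__()
        self.parts = []

    def handle_data(self, data):
        self.parts.append(data)

def extract_text(html_content):
    parser = TextExtractor()
    parser.feed(html_content)
    return ''.join(parser.parts)

print(extract_text(u'<html><body>Hello <i>there!</i></body></html>'))
# prints: Hello there!
```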