UDF clients
All UDFs in Flow, such as the Map UDF and Reduce UDF, are provided a clients Python object as input. The clients object contains API clients that let you access Instabase APIs as the user running the flow, without having to pass in an access token. The clients object contains four clients, each used to access different APIs:
- `flow_client`: Accesses flow APIs.
- `job_client`: Accesses job APIs.
- `ibfile`: Accesses the filesystem API.
- `conversion_client`: Accesses the conversion API.
You can access the clients object in two ways:
- As an input variable for the UDF. For example:

  ```python
  def udf_func(input_record: Dict, clients: object, *args, **kwargs):
      flow_client = clients.flow_client
      ...
  ```

- Through the `_FN_CONTEXT_KEY` keyword argument. For example:

  ```python
  fn_context = kwargs.get('_FN_CONTEXT_KEY')
  clients, _ = fn_context.get_by_col_name("CLIENTS")
  flow_client = clients.flow_client
  ```
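Both patterns return the same pre-authenticated object, so you can use whichever fits your UDF. The following is a minimal sketch tying them together; the function name, the `input_filepath` lookup, and the return value are illustrative only, and what a UDF must return depends on the step type.

```python
import logging
from typing import Dict

def log_input_exists(input_record: Dict, clients: object, *args, **kwargs):
    # The same clients object is also reachable through the keyword argument.
    fn_context = kwargs.get('_FN_CONTEXT_KEY')
    clients_from_ctx, _ = fn_context.get_by_col_name("CLIENTS")

    # Either handle works; both run with the permissions of the user executing the flow.
    path = input_record.get('input_filepath', '')  # illustrative record key
    logging.info('Input file {} exists: {}'.format(path, clients.ibfile.exists(path)))
    return input_record
```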
Flow client
The flow client is used to interact with the flow APIs. You can use it to run a new flow binary, query flow status, and list flow pipelines.
Run flow binary
The `run_flow_binary()` method runs the flow binary with the given settings.
Input parameters
Input parameters are the same as request parameters for the Run flow binary API.
Output parameters
Returns a tuple of the result and an error message. The result contains the following keys:
Name | Type | Description |
---|---|---|
job_id | string | ID for the job reported by the initial execution request. |
output_folder | string | Path to the output folder in Instabase file system. |
Example
```python
import logging

def run_flow(**kwargs):
    fn_context = kwargs.get("_FN_CONTEXT_KEY")
    clients, _ = fn_context.get_by_col_name("CLIENTS")
    flow_client = clients.flow_client
    run_flow_result, err = flow_client.run_flow_binary(
        input_dir='jaydoe/my-repo/fs/Instabase Drive/sampleflow/datasets/input',
        binary_path='jaydoe/my-repo/fs/Instabase Drive/sampleflow/childflow.ibflowbin',
        settings={
            'delete_out_dir': False,
            'output_has_run_id': True
        })
    if err:
        logging.error(f'Unable to run flow binary. Error={err}')
        return
    logging.info('Flow scheduled, job_id={}, output_folder={}'.format(run_flow_result['job_id'], run_flow_result['output_folder']))
```
Get flow status
The `get_status()` method gets the current status of the flow.
Input parameters
Name | Type | Description |
---|---|---|
job_id | string | ID for the job reported by the initial execution request. |
Output parameters
Returns a tuple of the result and an error message. The result contains the following keys:
Name | Type | Description | Values |
---|---|---|---|
state | string | State of the flow. | PENDING or DONE |
status | string | Status of the flow. | OK or ERROR |
job_id | string | ID for the job reported by the initial execution request. | |
output_folder | string | Path to the output folder. | |
reviewer | string | Instabase username of the user assigned to review the job. Empty if none. | |
review_state | string | Current state of job in the review process. | NONE, IN REVIEW, COMPLETED, NOT_COMPLETED |
num_files_processed | integer | Number of records processed. | |
num_files_failed | integer | Number of records that failed. | |
num_files_total | integer | Total number of records. | |
Example
```python
import logging

def get_flow_status(**kwargs):
    fn_context = kwargs.get("_FN_CONTEXT_KEY")
    clients, _ = fn_context.get_by_col_name("CLIENTS")
    flow_client = clients.flow_client
    flow_status, err = flow_client.get_status(job_id='123456789')
    if err:
        logging.error(f'Unable to get flow status. Error={err}')
        return
    logging.info('Flow status={}'.format(str(flow_status)))
```
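The two calls above are often combined: start a child flow with `run_flow_binary()` and poll `get_status()` until the state is `DONE`. The sketch below illustrates that pattern; the paths, poll interval, and iteration cap are illustrative values, not requirements.

```python
import logging
import time

def run_and_wait_for_flow(**kwargs):
    fn_context = kwargs.get("_FN_CONTEXT_KEY")
    clients, _ = fn_context.get_by_col_name("CLIENTS")
    flow_client = clients.flow_client

    run_flow_result, err = flow_client.run_flow_binary(
        input_dir='jaydoe/my-repo/fs/Instabase Drive/sampleflow/datasets/input',
        binary_path='jaydoe/my-repo/fs/Instabase Drive/sampleflow/childflow.ibflowbin',
        settings={'delete_out_dir': False, 'output_has_run_id': True})
    if err:
        logging.error(f'Unable to run flow binary. Error={err}')
        return None

    # Poll the job until it reports a DONE state (illustrative 60 x 10s window).
    job_id = run_flow_result['job_id']
    for _ in range(60):
        flow_status, err = flow_client.get_status(job_id=job_id)
        if err:
            logging.error(f'Unable to get flow status. Error={err}')
            return None
        if flow_status['state'] == 'DONE':
            logging.info('Flow finished with status={}'.format(flow_status['status']))
            return flow_status
        time.sleep(10)
    logging.error('Timed out waiting for flow job {}'.format(job_id))
    return None
```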
List pipeline
The `pipelines()` method lists flow pipelines a user has access to.
Input parameters
Name | Type | Description | Values |
---|---|---|---|
perms | list | Pipeline permissions assigned to the user. | read, execute, delete, write, manage_pipelines |
include_all | boolean | Optional. Specifies whether to include all pipelines. | true, false |
Output parameters
Returns a tuple of the result and an error message. The result has the same response schema as the List flow pipelines API.
Example
```python
import logging

def list_pipelines(**kwargs):
    fn_context = kwargs.get("_FN_CONTEXT_KEY")
    clients, _ = fn_context.get_by_col_name("CLIENTS")
    flow_client = clients.flow_client
    pipelines, err = flow_client.pipelines(perms=['read'])
    if err:
        logging.error(f'Unable to list pipelines. Error={err}')
        return
    logging.info('Pipelines={}'.format(str(pipelines)))
```
Job client
Use the job client to access the job APIs. The client has methods to list jobs, get job status, pause, resume, retry, or cancel a job, and update a job's pipelines.
List job
The `list()` method is used to list jobs based on the filter parameters.
Input parameters
Name | Type | Description | Values |
---|---|---|---|
req_user | string | Optional. Instabase username of the user who submitted the job. | |
req_reviewer | string | Optional. Instabase username of the user assigned to review the job. | |
limit | integer | Optional. Maximum number of flow jobs to return. | Jobs limit per response (default 20). |
offset | integer | Optional. Initial flow job index to start returning jobs from. Used for pagination with limit. | Starting index (default 0). |
from_timestamp | integer | Optional. 10-digit Unix timestamp. Returns all jobs started after this timestamp. | Starting timestamp (default is one week before current timestamp). |
to_timestamp | integer | Optional. 10-digit Unix timestamp. Returns all jobs started before this timestamp. | Ending timestamp (default is the current timestamp). |
job_id | string | Optional. ID associated with each job, or the partial ID. Returns the singular flow job associated with that job ID. | A valid job ID. |
job_ids | string | Optional. Returns all flow jobs associated with any of the job IDs passed in the list. | A comma-separated list of job IDs. |
requested_states | list | Optional. Returns flow jobs in any of the input states. When not passed, all flow jobs are returned. | A list of strings. Valid string values are: PENDING, COMPLETE, FAILED, CANCELLED, RUNNING, PAUSED, and CHECKPOINT_FAILED. |
tags | list | Optional. Returns all flow jobs that were started with any of the input tags. See how to attach tags to flow jobs in the Run a Flow binary API. | A list of string tags. |
pipeline_ids | list | Optional. The list of pipeline IDs of the jobs. | A list of strings representing the pipeline IDs of the jobs. |
orgs | string | Optional. The organization of the user who submitted the job. | A string value representing the organization of the user who submitted the job. |
review_states | list | Optional. Returns all flow jobs that are in any of the given review states. | A list of strings. Valid string values are: NONE, IN REVIEW, COMPLETED, and NOT_COMPLETED. |
Output parameters
Returns a tuple of a dictionary containing the result and an error message. The result contains the same response schema as the List jobs API.
Example
```python
import logging

def list_jobs(**kwargs):
    fn_context = kwargs.get("_FN_CONTEXT_KEY")
    clients, _ = fn_context.get_by_col_name("CLIENTS")
    job_client = clients.job_client
    list_result, err = job_client.list(limit=10, offset=0)
    if err:
        logging.error(f'Unable to list jobs. Error={err}')
        return
    logging.info('Jobs listed, jobs={}'.format(str(list_result['jobs'])))
```
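The filter parameters can be combined in a single call. As an illustrative sketch, the following lists failed jobs started in the last 24 hours that carry a particular tag; the tag value and time window are made up.

```python
import logging
import time

def list_recent_failed_jobs(**kwargs):
    fn_context = kwargs.get("_FN_CONTEXT_KEY")
    clients, _ = fn_context.get_by_col_name("CLIENTS")
    job_client = clients.job_client

    # 10-digit Unix timestamp for 24 hours ago (illustrative window).
    one_day_ago = int(time.time()) - 24 * 60 * 60
    list_result, err = job_client.list(
        requested_states=['FAILED', 'CHECKPOINT_FAILED'],
        tags=['nightly-batch'],  # illustrative tag
        from_timestamp=one_day_ago,
        limit=50)
    if err:
        logging.error(f'Unable to list jobs. Error={err}')
        return []
    return list_result['jobs']
```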
Job status
The `status()` method is used to get the status of a job.
Input parameters
Name | Type | Description | Values |
---|---|---|---|
job_id | string | ID for the job reported by the initial execution request. | |
result_type | string | Type of job. | flow, refiner, job, async, group |
Output parameters
Returns a tuple of a dictionary containing the result and an error message. The result is the same as the Job status API response schema.
Example
```python
import logging

def get_job_status(**kwargs):
    fn_context = kwargs.get("_FN_CONTEXT_KEY")
    clients, _ = fn_context.get_by_col_name("CLIENTS")
    job_client = clients.job_client
    job_id = 'job_id'
    status_result, err = job_client.status(job_id=job_id)
    if err:
        logging.error(f'Unable to get job status. Error={err}')
        return
    logging.info('Job status, status={}'.format(str(status_result)))
```
Pause job
The `pause()` method is used to pause a given job.
Input parameters
Name | Type | Description |
---|---|---|
job_id | string | ID for the job reported by the initial execution request. |
Output parameters
Returns a tuple of a dictionary containing the request status and an error message. The result is a dictionary containing the following:
Name | Type | Description | Values |
---|---|---|---|
status | string | Status of the request. | OK or ERROR |
Example
```python
import logging

def pause_job(**kwargs):
    fn_context = kwargs.get("_FN_CONTEXT_KEY")
    clients, _ = fn_context.get_by_col_name("CLIENTS")
    job_client = clients.job_client
    pause_result, err = job_client.pause(job_id='123456789')
    if err:
        logging.error(f'Unable to pause job. Error={err}')
        return
    logging.info('Job paused, status={}'.format(pause_result['status']))
```
Resume job
The `resume()` method is used to resume a given job.
Input parameters
Name | Type | Description |
---|---|---|
job_id | string | ID for the job reported by the initial execution request. |
Output parameters
Returns a tuple of a dictionary containing the request status and an error message. The result is a dictionary containing the following:
Name | Type | Description | Values |
---|---|---|---|
status | string | Status of the request. | OK or ERROR |
Example
```python
import logging

def resume_job(**kwargs):
    fn_context = kwargs.get("_FN_CONTEXT_KEY")
    clients, _ = fn_context.get_by_col_name("CLIENTS")
    job_client = clients.job_client
    resume_result, err = job_client.resume(job_id='123456789')
    if err:
        logging.error(f'Unable to resume job. Error={err}')
        return
    logging.info('Job resumed, status={}'.format(resume_result['status']))
```
Retry job
The `retry()` method is used to retry a given job.
Input parameters
Name | Type | Description | Values |
---|---|---|---|
job_id | string | ID for the job reported by the initial execution request. | |
retry_type | string | Optional. Specifying a type retries files within the flow with only a specific type of failure. If type is omitted, all failed files are retried. | all, checkpoint_failure, step_failure |
Output parameters
Returns a tuple of a dictionary containing the request status and an error message. The result is a dictionary containing the following:
Name | Type | Description | Values |
---|---|---|---|
status | string | Status of the request. | OK or ERROR |
Example
```python
import logging

def retry_job(**kwargs):
    fn_context = kwargs.get("_FN_CONTEXT_KEY")
    clients, _ = fn_context.get_by_col_name("CLIENTS")
    job_client = clients.job_client
    retry_result, err = job_client.retry(job_id='123456789')
    if err:
        logging.error(f'Unable to retry job. Error={err}')
        return
    logging.info('Job retried, status={}'.format(retry_result['status']))
```
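To retry only a specific failure type, pass the optional `retry_type` parameter. A brief sketch, using an illustrative job ID:

```python
import logging

def retry_checkpoint_failures(**kwargs):
    fn_context = kwargs.get("_FN_CONTEXT_KEY")
    clients, _ = fn_context.get_by_col_name("CLIENTS")
    job_client = clients.job_client
    # Retry only files that failed at a checkpoint.
    retry_result, err = job_client.retry(job_id='123456789', retry_type='checkpoint_failure')
    if err:
        logging.error(f'Unable to retry job. Error={err}')
        return
    logging.info('Retry requested, status={}'.format(retry_result['status']))
```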
Cancel job
The `cancel()` method is used to cancel a given job.
Input parameters
Name | Type | Description |
---|---|---|
job_id | string | ID for the job reported by the initial execution request. |
Output parameters
Returns a tuple of a dictionary containing the request status and an error message. The result is a dictionary containing the following:
Name | Type | Description | Values |
---|---|---|---|
status | string | Status of the request. | OK or ERROR |
Example
```python
import logging

def cancel_job(**kwargs):
    fn_context = kwargs.get("_FN_CONTEXT_KEY")
    clients, _ = fn_context.get_by_col_name("CLIENTS")
    job_client = clients.job_client
    cancel_result, err = job_client.cancel(job_id='123456789')
    if err:
        logging.error(f'Unable to cancel job. Error={err}')
        return
    logging.info('Job cancelled, status={}'.format(cancel_result['status']))
```
Update pipeline
The `update_pipelines()` method is used to update the pipelines of a job. Given a job ID and a list of pipeline IDs, it updates the job's `pipeline_ids`.
Input parameters
Name | Type | Description |
---|---|---|
job_id | string | ID for the job reported by the initial execution request. |
pipeline_ids | list | List of strings representing the pipeline IDs of the jobs. |
Output parameters
Returns a tuple of a dictionary containing the result and an error message. The result contains the following:
Name | Type | Description | Values |
---|---|---|---|
status | string | Status of the request. | ERROR, OK |
updated_pipelines | list | List containing flow pipeline info. | A list of dictionaries containing keys flow_pipeline_name, flow_pipeline_id. |
Example
```python
import logging

def update_pipelines(**kwargs):
    fn_context = kwargs.get("_FN_CONTEXT_KEY")
    clients, _ = fn_context.get_by_col_name("CLIENTS")
    job_client = clients.job_client
    update_result, err = job_client.update_pipelines(job_id='123456789', pipeline_ids=['123456789', '987654321'])
    if err:
        logging.error(f'Unable to update pipeline. Error={err}')
        return
    logging.info('Pipeline updated, status={}'.format(update_result['status']))
```
Ibfile
The ibfile client is an Instabase FileHandle reference that provides pre-authenticated access to file operations. All operations performed with the ibfile object have the same permissions as the user who invoked the operation.
is_file
The `is_file()` method checks if the provided path is a file.
Input parameters
Name | Type | Description |
---|---|---|
complete_path | string | The absolute path of the file. |
Output parameters
The function returns a boolean value indicating whether the provided path is a file or not.
Example
```python
import logging

def test_is_file(clients, complete_path):
    is_file_resp = clients.ibfile.is_file(complete_path)
    logging.info('Is file at path {}: {}'.format(complete_path, is_file_resp))
```
is_dir
The `is_dir()` method checks if the provided path is a directory.
Input parameters
Name | Type | Description |
---|---|---|
complete_path | string | The absolute path of the file. |
Output parameters
The function returns a boolean value indicating whether the provided path is a directory.
Example
```python
import logging

def test_is_dir(clients, complete_path):
    is_dir = clients.ibfile.is_dir(complete_path)
    logging.info('Is dir at path {}: {}'.format(complete_path, is_dir))
```
exists
The `exists()` method checks if a file or a directory exists at the provided path.
Input parameters
Name | Type | Description |
---|---|---|
complete_path | string | The absolute path of the file. |
Output parameters
The function returns a boolean value indicating whether the file in the provided path exists.
Example
```python
import logging

def test_exists(clients, complete_path):
    exists_resp = clients.ibfile.exists(complete_path)
    logging.info('Exists at path {}: {}'.format(complete_path, exists_resp))
```
mkdir
The `mkdir()` method creates a new directory at the specified path.
Input parameters
Name | Type | Description |
---|---|---|
complete_path | string | The absolute path of the directory. |
Output parameters
The function returns a tuple containing an instance of the `MkdirResp` class and a string. The `MkdirResp` instance has a status attribute of type StatusCode enum which indicates the status of the operation.
Example
```python
import logging

def test_mkdir(clients, complete_path):
    mkdir_resp, err = clients.ibfile.mkdir(complete_path)
    if err:
        logging.error(f'Unable to create directory. Error={err}')
        return
    logging.info('Mkdir at path {}: {}'.format(complete_path, mkdir_resp.status))
```
copy
The `copy()` method copies a file or a directory along with its contents from the `complete_path` to `new_complete_path`.
Input parameters
Name | Type | Description |
---|---|---|
complete_path | string | The absolute path of the original file or directory. |
new_complete_path | string | The absolute path of the target file or directory. |
Output parameters
The function returns a tuple containing an instance of the `CopyResp` class and a string. The `CopyResp` instance has a status attribute of type StatusCode enum which indicates the status of the operation.
Example
```python
import logging

def test_copy(clients, complete_path, new_complete_path):
    copy_resp, err = clients.ibfile.copy(complete_path, new_complete_path)
    if err:
        logging.error('Unable to copy at path {}: {}'.format(complete_path, err))
        return
    logging.info('Copy at path {}: {}'.format(complete_path, copy_resp.status))
```
rm
The `rm()` method removes a file or directory at the specified path.
Input parameters
Name | Type | Description | Values |
---|---|---|---|
complete_path | string | The absolute path of the file or directory to be removed. | |
recursive | boolean | Optional. Indicates whether to remove a directory and all its contents recursively. When set to True, the directory and all its contents are removed. | True or False (default is True). |
force | boolean | Optional. When set to True, a non-existent resource does not result in an error. When set to False, a non-existent resource results in an error. | True or False (default is True). |
Output parameters
The function returns a tuple containing an instance of the `RmResp` class and a string. The `RmResp` instance has a status attribute of type StatusCode enum which indicates the status of the operation.
Example
```python
import logging

def test_rm(clients, complete_path):
    rm_resp, err = clients.ibfile.rm(complete_path)
    if err:
        logging.error('Rm at path {} failed: {}'.format(complete_path, err))
        return
    logging.info('Rm at path {}: {}'.format(complete_path, rm_resp.status))

def test_rm_optional_params(clients, complete_path):
    rm_resp, err = clients.ibfile.rm(complete_path, recursive=True, force=False)
    if err:
        logging.error('Rm at path {} failed: {}'.format(complete_path, err))
        return
    logging.info('Rm at path {}: {}'.format(complete_path, rm_resp.status))
```
open
The `open()` method opens a file and returns a file object that can be used to access the contents of the file.
Input parameters
Name | Type | Description |
---|---|---|
path | string | The absolute path of the file to be opened. |
mode | string | Optional parameter indicating the mode in which to open the file. The mode specifies how to use the file, such as for reading, writing, or appending. The valid modes are defined in VALID_MODES. The default mode is r (read-only). |
Output parameters
The function returns an instance of the IBFileBase class, which represents the opened file. The IBFileBase instance has the following attributes:
- `path`: A string containing the relative path to the file.
- `_mode`: A string containing the mode in which the file was opened. See VALID_MODES.
Valid modes
`VALID_MODES` is a frozenset containing the modes that can be used when opening a file. The following modes are allowed:
Read-only modes
- `r`: Read-only mode (default).
- `rU`: Universal read-only mode.
- `rb`: Read-only mode in binary format.
- `rbU`: Universal read-only mode in binary format.
Writeable modes
- `r+`: Read and write mode.
- `rb+`: Read and write mode in binary format.
- `w`: Write-only mode.
- `w+`: Read and write mode. Truncates the file to zero length or creates a new file if it doesn't exist.
- `wb`: Write-only mode in binary format. Truncates the file to zero length or creates a new file if it doesn't exist.
- `wb+`: Read and write mode in binary format. Truncates the file to zero length or creates a new file if it doesn't exist.
Append modes
- `a`: Append-only mode. Creates a new file if it doesn't exist.
- `a+`: Read and append mode. Creates a new file if it doesn't exist.
- `ab`: Append-only mode in binary format. Creates a new file if it doesn't exist.
- `ab+`: Read and append mode in binary format. Creates a new file if it doesn't exist.
For more information on modes, see the Python documentation.
Example
```python
import logging

def test_open(clients, complete_path):
    # Open the file at complete_path in write mode and write a short string.
    file_obj = clients.ibfile.open(complete_path, mode='w')
    file_obj.write('This is some data to write to the file.')
    file_obj.close()
```
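As a companion sketch, the snippet below reads a file back in binary mode (`rb`). It assumes the returned file object supports `read()`, as a standard Python file object would.

```python
import logging

def test_open_binary(clients, complete_path):
    # Open an existing file in binary read-only mode.
    file_obj = clients.ibfile.open(complete_path, mode='rb')
    data = file_obj.read()  # assumes the file object exposes read(), like a standard file handle
    file_obj.close()
    logging.info('Read {} bytes from {}'.format(len(data), complete_path))
```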
read_file
The `read_file()` method returns the contents of a file as a string.
Input parameters
Name | Type | Description |
---|---|---|
file_path | string | The absolute path of the file to be read. |
Output parameters
The function returns a tuple containing the contents of the file, and error message explaining the result of the operation. If the operation was successful, the error message string is None
.
Example
```python
import logging

def test_read_file(clients, complete_path):
    read_file_resp, err = clients.ibfile.read_file(complete_path)
    if err:
        logging.error('Read file at path {} failed: {}'.format(complete_path, err))
        return
    logging.info('Read file at path {}: {}'.format(complete_path, len(read_file_resp)))
```
write_file
The `write_file()` method writes a string to a file.
Input parameters
Name | Type | Description |
---|---|---|
file_path | string | The absolute path of the file to be written to. |
content | string | The string to be written to the file. |
Output parameters
The function returns a tuple containing a boolean value and a string. The boolean value is True
if the operation was successful, and False
if it was not. The string is an error message explaining the result of the operation. If the operation was successful, the error message is None
.
Example
```python
import logging

def test_write_file(clients, complete_path, data):
    write_file_resp, err = clients.ibfile.write_file(complete_path, data)
    if err:
        logging.error('Write file at path {} failed: {}'.format(complete_path, err))
        return
    logging.info('Write file at path {}: {}'.format(complete_path, write_file_resp))
```
list_dir
The `list_dir()` method returns a list of files and folders in a given directory.
Input parameters
Name | Type | Description |
---|---|---|
path | string | The absolute path of the directory to list. |
start_page_token | string | An optional page token to use for pagination. If provided, the function returns the contents of the directory starting from this page. |
Output parameters
The function returns a tuple containing an instance of the `ListDirInfo` class and a string.

The `ListDirInfo` instance has the following attributes:

- `nodes`: A list of NodeInfo objects, representing the file and folder resources in the directory.
- `start_page_token`: A string representing the start page token for the current list of resources.
- `next_page_token`: A string representing the start page token for the next list of resources.
- `has_more`: A boolean value indicating whether there are more resources in the directory that have not been listed yet.
The string is an error message explaining the result of the operation. If the operation was successful, the error message is None.
The `NodeInfo` class has the following attributes:

- `name`: A string containing the name of the file or folder resource.
- `path`: A string containing the path of the resource relative to the mounted repo.
- `full_path`: A string containing the full path of the resource, including the location of the mounted repo.
- `_type`: A string indicating the type of the node, either `file` or `folder`.
Example
```python
import logging

def test_list_dir(clients, path, start_page_token):
    list_dir_info, err = clients.ibfile.list_dir(path, start_page_token)
    if err:
        logging.error('list_dir at path {} failed: {}'.format(path, err))
        return
    logging.info('List dir at path {}: {}'.format(path, list_dir_info))
    for node in list_dir_info.nodes:
        logging.info('Node {}'.format(node.as_dict()))
    return list_dir_info.nodes
```
StatusCode enum
The `StatusCode` enum is used to indicate the status of the operation and contains the following values:
- OK: The operation was successful.
- MISSING_PARAM: A required parameter was missing.
- READ_ERROR: There was an error reading a file.
- WRITE_ERROR: There was an error writing to a file.
- NONEXISTENT: A required file or directory does not exist.
- FAILURE: A general exception occurred.
- NO_MOUNT_DETAILS: Mount details are missing.
- ACCESS_DENIED: Access to a file or directory was denied.
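As a rough illustration of working with these values, the sketch below branches on the status of a `mkdir()` call. It assumes `StatusCode` behaves like a standard Python enum, so each value exposes a `.name`; the exact import path for the enum is not shown because it can vary by environment.

```python
import logging

def make_output_dir(clients, complete_path):
    mkdir_resp, err = clients.ibfile.mkdir(complete_path)
    if err:
        logging.error('mkdir at path {} failed: {}'.format(complete_path, err))
        return False
    # Assumes StatusCode is a standard Python enum, so .name yields e.g. 'OK'.
    if mkdir_resp.status.name == 'OK':
        logging.info('Directory created at {}'.format(complete_path))
        return True
    if mkdir_resp.status.name == 'ACCESS_DENIED':
        logging.error('Access denied when creating {}'.format(complete_path))
    return False
```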
Conversion client
The conversion client is used to convert Microsoft Office documents and HTML pages to PDFs.
The PDF conversion client is available in the following steps:
- apply UDF
- map UDF
- reduce UDF
- pre-flow
- post-flow
The conversion service can convert .docx, .xlsx, .html, .csv and .pptx document types to PDF. The service maintains the original document layout.
The input to the service is the raw byte content of the original document. The returned value is the converted PDF as bytes.
Converting documents to PDFs in a flow
The conversion client can be used within the Apply UDF, Map UDF, Reduce UDF, Pre-flow UDF, and Post-flow UDF steps for custom conversion of Microsoft Office documents and further processing of the generated PDFs.
Use the `clients.conversion_client.get_pdf()` method to send conversion requests to the conversion service:

```python
def get_pdf(document_bytes, conversion_type):
    ...
```
Input parameters
- `document_bytes`: The raw byte content of the original document.
- `conversion_type`: `html_to_pdf`, `docx_to_pdf`, `xlsx_to_pdf`, `csv_to_pdf`, or `pptx_to_pdf`.
The return value is the converted PDF as bytes.
Example UDF script for PDF conversion in the apply UDF step
- Set the Output File Extension to pdf in the Apply UDF step of a flow.
- Create the UDF script required by the conversion service.
- The UDF script must filter the input files instead of creating new files for the converted documents to be read by next steps in the flow.
- The UDF must return the converted bytes instead of creating a PDF in the output folder (usually `out/s1_apply_udf`).
```python
def convert_docx_to_pdf(file_path, clients, *args, **kwargs):
    file_content, err = clients.ibfile.read_file(file_path)
    if err:
        raise IOError('Could not read file {}'.format(file_path))
    converted_file, err = clients.conversion_client.get_pdf(file_content, 'docx_to_pdf')
    if err:
        raise IOError('Could not convert the file {}'.format(file_path))
    return converted_file

def register(name_to_fn):
    more_fns = {
        'filter_files': {
            'fn': convert_docx_to_pdf,
            'ex': '',
            'desc': ''
        }
    }
    name_to_fn.update(more_fns)
```
Example UDF script for PDF conversion in the map UDF step
```python
import os

def convert_docx_to_pdf_map_func(input_record, step_folder, clients, *args, **kwargs):
    input_filepath = input_record['input_filepath']
    file_content = input_record['content']
    output_filename = input_record['output_filename']
    output_file_without_ext = os.path.splitext(output_filename)[0]
    # Filter files here only for docx files to be converted.
    file_content, err = clients.ibfile.read_file(input_filepath)
    if err:
        raise IOError('Could not read file {}'.format(input_filepath))
    converted_file, err = clients.conversion_client.get_pdf(file_content, 'docx_to_pdf')
    if err:
        raise IOError('Could not convert the file {}'.format(input_filepath))
    return {
        "out_files": [
            {
                "filename": f"{output_file_without_ext}.pdf",
                "content": converted_file
            }
        ]
    }

def register(name_to_fn):
    more_fns = {
        'convert_to_pdf_map_func': {
            'fn': convert_docx_to_pdf_map_func,
            'ex': '',
            'desc': ''
        }
    }
    name_to_fn.update(more_fns)
```