GirderUploadStreamProcessor

class openmsistream.GirderUploadStreamProcessor(girder_api_url, girder_api_key, config_file, topic_name, *, girder_root_folder_id=None, collection_name=None, girder_root_folder_path=None, metadata=None, **other_kwargs)

Bases: DataFileStreamProcessor

A stream processor to reconstruct data files read as messages from a topic, hold them in memory or on disk, and upload them to a Girder instance when all of their messages have been received

Parameters:
  • girder_api_url (str) – URL of the REST API endpoint for the Girder instance to which files should be uploaded

  • girder_api_key (str) – API key for interacting with the Girder instance

  • config_file (pathlib.Path) – Path to the config file to use in defining the Broker connection and Consumers

  • topic_name (str) – Name of the topic to which the Consumers should be subscribed

  • girder_root_folder_ID (str) – ID of an existing Girder Folder relative to which files should be uploaded. Additional Folders will be created within this root Folder to replicate the original Producer-side subdirectory structure.

  • collection_name (str) – Name of the Girder Collection to which files should be uploaded. Only used if girder_root_folder_id is not given.

  • girder_root_folder_path (str) – Path to the root Folder within which files should be uploaded. Additional Folders will be created within this root Folder to replicate the original Producer-side subdirectory structure. This argument is only used if girder_root_folder_id is not provided. If a collection_name is given but this argument is not, a Folder named after the topic will be created within the Collection.

  • metadata (str (JSON-serializable)) – If this argument is given, an extra metadata field with the given value will be added to uploaded Files and Folders.

  • filepath_regex – If given, only messages associated with files whose paths match this regex will be consumed

_process_downloaded_data_file(datafile, lock)

Upload a fully-reconstructed file to the Girder instance, creating Folders as necessary to preserve original subdirectory structure. Also adds metadata to Files and Folders listing the version of OpenMSIStream that’s running and the name of the topic from which files are being consumed.

Parameters:
Returns:

None if upload was successful, a caught Exception otherwise

classmethod get_command_line_arguments()

Return the names of arguments needed to run the program from the command line

classmethod get_init_args_kwargs(parsed_args)

Get the list of init arguments and the dictionary of init keyword arguments for this class given a namespace of, for example, parsed arguments.

Parameters:

parsed_args (argparse.Namespace) – A namespace containing entries needed to determine the init args and kwargs for this class

Returns:

A list of init args

Returns:

A dictionary of init kwargs

classmethod run_from_command_line(args=None)

Run a GirderUploadStreamProcessor directly from the command line

Calls process_files_as_read() on a GirderUploadStreamProcessor defined by command line (or given) arguments

Parameters:

args (list, optional) – the list of arguments to send to the parser instead of getting them from sys.argv

ARGUMENT_PARSER_TYPE

alias of OpenMSIStreamArgumentParser

property other_datafile_kwargs

Overload this in child classes to define additional keyword arguments that should go to the specific datafile constructor

process_files_as_read()

Consumes messages and stores their data in memory. Uses several parallel threads to consume message and calls _process_downloaded_data_file() for fully read files. Runs until the user inputs a command to shut it down.

Returns:

the total number of messages consumed

Return type:

int

Returns:

the total number of messages processed (registered in memory)

Return type:

int

Returns:

the number of files successfully processed during the run

Return type:

int

Returns:

the paths of up to 50 most recent files successfully processed during the run

Return type:

List