GirderUploadStreamProcessor

class openmsistream.GirderUploadStreamProcessor(girder_api_url, girder_api_key, config_file, topic_name, *, girder_root_folder_id=None, collection_name=None, girder_root_folder_path=None, metadata=None, replace_existing=False, **other_kwargs)

Bases: DataFileStreamProcessor

A stream processor that reconstructs data files read as messages from a topic, holds them in memory or on disk, and uploads them to a Girder instance once all of their messages have been received.

Parameters:
  • girder_api_url (str) – URL of the REST API endpoint for the Girder instance to which files should be uploaded

  • girder_api_key (str) – API key for interacting with the Girder instance

  • config_file (pathlib.Path) – Path to the config file to use in defining the Broker connection and Consumers

  • topic_name (str) – Name of the topic to which the Consumers should be subscribed

  • girder_root_folder_id (str) – ID of an existing Girder Folder relative to which files should be uploaded. Additional Folders will be created within this root Folder to replicate the original Producer-side subdirectory structure.

  • collection_name (str) – Name of the Girder Collection to which files should be uploaded. Only used if girder_root_folder_id is not given.

  • girder_root_folder_path (str) – Path to the root Folder within which files should be uploaded. Additional Folders will be created within this root Folder to replicate the original Producer-side subdirectory structure. This argument is only used if girder_root_folder_id is not provided. If a collection_name is given but this argument is not, a Folder named after the topic will be created within the Collection.

  • metadata (str (JSON-serializable)) – If this argument is given, an extra metadata field with the given value will be added to uploaded Files and Folders.

  • filepath_regex – If given, only messages associated with files whose paths match this regex will be consumed
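
The three folder-related arguments above interact by precedence. The sketch below restates the documented rules as a small standalone function. resolve_upload_root and the tuples it returns are hypothetical names used only for illustration; they are not part of OpenMSIStream, and the function never contacts Girder. The error raised when none of the three arguments is given is an assumption, since the parameter descriptions do not cover that case.

```python
def resolve_upload_root(topic_name, girder_root_folder_id=None,
                        collection_name=None, girder_root_folder_path=None):
    """Describe where uploads would land, following the documented precedence.

    Hypothetical helper: it only mirrors the parameter descriptions and does
    not call Girder or OpenMSIStream.
    """
    # An explicit Folder ID wins; the other two arguments are then unused.
    if girder_root_folder_id is not None:
        return ("folder_id", girder_root_folder_id)
    # Without an ID, an explicit root Folder path is used.
    if girder_root_folder_path is not None:
        return ("folder_path", girder_root_folder_path)
    # With only a Collection name, the root Folder is named after the topic.
    if collection_name is not None:
        return ("collection_folder", (collection_name, topic_name))
    # Assumption: the documentation does not say what happens in this case.
    raise ValueError(
        "give girder_root_folder_id, girder_root_folder_path, or collection_name"
    )
```

For example, resolve_upload_root("my_topic", collection_name="lab_data") reports a Folder named "my_topic" inside the "lab_data" Collection, while adding girder_root_folder_id makes the other two arguments irrelevant.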

_process_downloaded_data_file(datafile, lock)

Upload a fully-reconstructed file to the Girder instance, creating Folders as necessary to preserve original subdirectory structure. Also adds metadata to Files and Folders listing the version of OpenMSIStream that’s running and the name of the topic from which files are being consumed.

Parameters:
Returns:

None if upload was successful, a caught Exception otherwise

__process_downloaded_data_file(datafile, metadata=None)

The actual implementation that the _process_downloaded_data_file() wrapper above calls
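
The "None on success, caught Exception otherwise" convention described above can be sketched with plain Python. Everything here is a stand-in: _upload, the dictionary used as a data file, and the way the lock is used are assumptions for illustration, not the library's implementation.

```python
import threading


def _upload(datafile):
    """Stand-in for the real upload step; raises if the file is not usable."""
    if not datafile.get("complete"):
        raise RuntimeError(f"{datafile['name']} is incomplete")


def process_downloaded_data_file(datafile, lock):
    """Return None on success, or the caught Exception on failure."""
    try:
        # Assumption: the lock is held so only one upload runs at a time.
        with lock:
            _upload(datafile)
    except Exception as exc:
        # The exception is returned rather than raised, so the calling
        # thread can record the failure and keep consuming messages.
        return exc
    return None


lock = threading.Lock()
ok = process_downloaded_data_file({"name": "a.dat", "complete": True}, lock)
failed = process_downloaded_data_file({"name": "b.dat", "complete": False}, lock)
```

A caller following this convention checks the return value with "is None" instead of wrapping the call in its own try/except.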

classmethod get_command_line_arguments()

Return the names of arguments needed to run the program from the command line

classmethod get_init_args_kwargs(parsed_args)

Get the list of init arguments and the dictionary of init keyword arguments for this class given a namespace of, for example, parsed arguments.

Parameters:

parsed_args (argparse.Namespace) – A namespace containing entries needed to determine the init args and kwargs for this class

Returns:

A list of init args and a dictionary of init kwargs
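
As a rough illustration of the args/kwargs split, the sketch below turns an argparse.Namespace into a positional list and a keyword dictionary that match the constructor signature. split_init_args_kwargs is a hypothetical function, and the Namespace attribute names are assumed to equal the constructor parameter names; the real parser may name its entries differently.

```python
import argparse
import pathlib

# Keyword-only constructor parameters, taken from the class signature.
KEYWORD_NAMES = (
    "girder_root_folder_id",
    "collection_name",
    "girder_root_folder_path",
    "metadata",
    "replace_existing",
)


def split_init_args_kwargs(parsed_args):
    """Hypothetical stand-in: build (args, kwargs) from a parsed namespace."""
    # Positional arguments, in the order the constructor expects them.
    args = [
        parsed_args.girder_api_url,
        parsed_args.girder_api_key,
        parsed_args.config_file,
        parsed_args.topic_name,
    ]
    # Only forward keyword arguments that are present in the namespace.
    kwargs = {
        name: getattr(parsed_args, name)
        for name in KEYWORD_NAMES
        if hasattr(parsed_args, name)
    }
    return args, kwargs


namespace = argparse.Namespace(
    girder_api_url="http://localhost:8080/api/v1",  # placeholder URL
    girder_api_key="not-a-real-key",                # placeholder key
    config_file=pathlib.Path("test.config"),        # placeholder path
    topic_name="example_topic",
    collection_name="example_collection",
)
init_args, init_kwargs = split_init_args_kwargs(namespace)
```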

classmethod run_from_command_line(args=None)

Run a GirderUploadStreamProcessor directly from the command line

Calls process_files_as_read() on a GirderUploadStreamProcessor defined by command line (or given) arguments

Parameters:

args (list, optional) – the list of arguments to send to the parser instead of getting them from sys.argv
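
The same run can be started from Python instead of the command line by constructing the processor and calling process_files_as_read() directly. The sketch below is a minimal wrapper under stated assumptions: run_processor and its processor_class parameter are hypothetical conveniences (the latter lets a substitute class be passed in), and openmsistream is only imported when no substitute is given.

```python
def run_processor(girder_api_url, girder_api_key, config_file, topic_name,
                  processor_class=None, **kwargs):
    """Construct a processor and run it until it is shut down.

    Hypothetical helper; extra keyword arguments (for example
    collection_name or replace_existing) are passed to the constructor.
    """
    if processor_class is None:
        # Imported lazily so this sketch can be read without the package.
        from openmsistream import GirderUploadStreamProcessor as processor_class
    processor = processor_class(
        girder_api_url, girder_api_key, config_file, topic_name, **kwargs
    )
    # Blocks until the user shuts the processor down, then returns its summary.
    return processor.process_files_as_read()
```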

ARGUMENT_PARSER_TYPE

alias of OpenMSIStreamArgumentParser

_print_still_alive()

Print the “still alive” character to the console like a regular ControlledProcess, but also produce messages to the heartbeat and/or log topics if applicable

property consumer_topic_name

Name of the topic to which the consumers are subscribed

get_heartbeat_message()

Return the HeartbeatProducible-type object that should be produced to the heartbeat topic

get_log_message()

Return the LogProducible-type object that should be produced to the log topic

get_new_producer(**kwargs)

Return a new OpenMSIStreamProducer object. Call this function from a child thread to get thread-independent Producers. Note: this function only creates the Producer; closing it and any other cleanup must be handled by the caller.

Returns:

a Producer created using the config set in the constructor

Return type:

OpenMSIStreamProducer
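
Because the caller owns each Producer, a per-thread pattern with guaranteed cleanup is a natural fit. The sketch below uses stand-in classes so it runs without a broker: FakeProducer, FakeProcessor, worker and run_workers are all hypothetical, and the close() method on the producer is an assumption about how a Producer is released.

```python
import threading


class FakeProducer:
    """Stand-in for a Producer; only tracks whether it was closed."""

    def __init__(self):
        self.closed = False

    def close(self):  # assumed cleanup method
        self.closed = True


class FakeProcessor:
    """Stand-in exposing get_new_producer() like the documented method."""

    def __init__(self):
        self.created = []
        self._lock = threading.Lock()

    def get_new_producer(self, **kwargs):
        producer = FakeProducer()
        with self._lock:
            self.created.append(producer)
        return producer


def worker(processor):
    # Each thread asks for its own Producer instead of sharing one.
    producer = processor.get_new_producer()
    try:
        pass  # produce messages with this thread's Producer here
    finally:
        # The caller is responsible for closing what it created.
        producer.close()


def run_workers(processor, n_threads=3):
    threads = [
        threading.Thread(target=worker, args=(processor,))
        for _ in range(n_threads)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
```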

property other_datafile_kwargs

Overload this in child classes to define additional keyword arguments that should go to the specific datafile constructor

process_files_as_read()

Consumes messages and stores their data in memory. Uses several parallel threads to consume messages and calls _process_downloaded_data_file() for each fully read file. Runs until the user inputs a command to shut it down.

Returns:
  • int – the total number of messages consumed

  • int – the total number of messages processed (registered in memory)

  • int – the number of files successfully processed during the run

  • List – the paths of up to 50 most recent files successfully processed during the run
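
Since process_files_as_read() returns four values, a caller would typically unpack them in order. The sketch below shows one way to turn them into a summary; summarize_run and DemoProcessor are hypothetical, and DemoProcessor only stands in for a real processor so the example runs without a broker or a Girder instance.

```python
import pathlib


def summarize_run(processor):
    """Run the processor and label its four documented return values."""
    n_read, n_processed, n_files, recent_paths = processor.process_files_as_read()
    return {
        "messages_read": n_read,
        "messages_processed": n_processed,
        "files_processed": n_files,
        # At most the 50 most recent paths are reported by the processor.
        "recent_files": [str(path) for path in recent_paths],
    }


class DemoProcessor:
    """Stand-in that returns fixed values in the documented order."""

    def process_files_as_read(self):
        return 120, 118, 2, [pathlib.Path("a/one.dat"), pathlib.Path("a/two.dat")]


summary = summarize_run(DemoProcessor())
```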

set_heartbeat_producer(producer, close_it=False)

Set producer instance for generating heartbeats

set_log_producer(producer, close_it=True)

Set producer instance for generating logs