o
    !i"                     @  s   d dl mZ d dlmZ d dlmZmZmZ d dlm	Z	 d dl
Z
d dlmZmZ G dd dZddddZG dd de	ZG dd deZG dd deZG dd deZdS )    )annotations)SpawnContext)EmptyFullQueue)ThreadN)
DataLoaderDatasetc                   @  s4   e Zd ZdZdddd	Zd
d Zdd Zdd ZdS )ThreadBuffera  
    Iterates over values from self.src in a separate thread but yielding them in the current thread. This allows values
    to be queued up asynchronously. The internal thread will continue running so long as the source has values or until
    the stop() method is called.

    One issue raised by using a thread in this way is that during the lifetime of the thread the source object is being
    iterated over, so if the thread hasn't finished another attempt to iterate over it will raise an exception or yield
    unexpected results. To ensure the thread releases the iteration and proper cleanup is done the stop() method must
    be called which will join with the thread.

    Args:
        src: Source data iterable
        buffer_size: Number of items to buffer from the source
        timeout: Time to wait for an item from the buffer, or to wait while the buffer is full when adding items
       {Gz?buffer_sizeinttimeoutfloatc                 C  s.   || _ || _|| _t| j| _d | _d| _d S NF)srcr   r   r   buffer
gen_thread
is_running)selfr   r   r    r   Z/home/dell461/cl/sdc2/last_ska_mid/HISourceFinder-master-l/src/monai/data/thread_buffer.py__init__(   s   
zThreadBuffer.__init__c              	   C  sN   | j D ]!}| jr!z| jj|| jd W n	 ty   Y nw n| js d S qd S )Nr   )r   r   r   putr   r   )r   Zsrc_valr   r   r   enqueue_values0   s   
zThreadBuffer.enqueue_valuesc                 C  s$   d| _ | jd ur| j  d | _d S r   )r   r   joinr   r   r   r   stop<   s   


zThreadBuffer.stopc                 c  s    d| _ t| jdd| _| j  zK| j rH| j s| j sOz| jj| j	dV  W n	 t
y3   Y nw | j rV| j s| j rW |   d S W |   d S W |   d S W |   d S |   w )NT)targetdaemonr   )r   r   r   r   startis_aliver   emptygetr   r   r   r   r   r   r   __iter__D   s&   
zThreadBuffer.__iter__N)r   r   )r   r   r   r   )__name__
__module____qualname____doc__r   r   r   r&   r   r   r   r   r
      s    r
   r   r   r   r   r   r   repeatsc                 c  s2    t | ||d}|D ]}t|D ]}|V  qq
dS )a  
    Create a ThreadBuffer object using the `src`, `buffer_size`, and `timeout` parameters given for the constructor
    arguments of the same names, and yield each generated object `repeats` number of times successively.

    Args:
        src: Source data iterable
        buffer_size: Number of items to buffer from the source
        timeout: Time to wait for an item from the buffer, or to wait while the buffer is full when adding items
        repeats: Number of repeat generations to perform which is asynchronous from the generation of the next value

    Returns:
        Generator yield (repeated) values from `src` asynchronously
    )r   r   r   N)r
   range)r   r   r   r+   r   batch_r   r   r   buffer_iteratorS   s   r/   c                      s,   e Zd ZdZedd Z fddZ  ZS )_ProcessThreadzHShim class to make a thread look like a process to the DataLoader class.c                 C  s   t | S N)idr   r   r   r   pidk   s   z_ProcessThread.pidc                   s0   zt    W d tjjjj_d S d tjjjj_w r1   )superruntorchutilsdata_utilsworker_worker_infor   	__class__r   r   r5   o   s   "z_ProcessThread.run)r'   r(   r)   r*   propertyr3   r5   __classcell__r   r   r<   r   r0   h   s
    
r0   c                   @  s    e Zd ZdZdd Zdd ZdS )_ProcessQueuezTShim class to make a thread queue look like a process queue to the DataLoader class.c                 C     d S r1   r   r   r   r   r   closey      z_ProcessQueue.closec                 C  rA   r1   r   r   r   r   r   cancel_join_thread|   rC   z _ProcessQueue.cancel_join_threadN)r'   r(   r)   r*   rB   rD   r   r   r   r   r@   v   s    r@   c                   @  s   e Zd ZdZeZeZdS )_ProcessThreadContextZprocessthreadN)r'   r(   r)   _namer0   Processr@   r   r   r   r   r   rE      s    rE   c                      s8   e Zd ZdZ				dd fddZ fddZ  ZS )ThreadDataLoadera  
    Subclass of `DataLoader` using a `ThreadBuffer` object to implement `__iter__` method asynchronously. This will
    iterate over data from the loader as expected however the data is generated on a separate thread. Use this class
    where a `DataLoader` instance is required and not just an iterable object.

    The default behaviour with `repeats` set to 1 is to yield each batch as it is generated, however with a higher
    value the generated batch is yielded that many times while underlying dataset asynchronously generates the next.
    Typically not all relevant information is learned from a batch in a single iteration so training multiple times
    on the same batch will still produce good training with minimal short-term overfitting while allowing a slow batch
    generation process more time to produce a result. This duplication is done by simply yielding the same object many
    times and not by regenerating the data.

    Another typical usage is to accelerate light-weight preprocessing (usually cached all the deterministic transforms
    and no IO operations), because it leverages the separate thread to execute preprocessing to avoid unnecessary IPC
    between multiple workers of DataLoader. And as CUDA may not work well with the multi-processing of DataLoader,
    `ThreadDataLoader` can be useful for GPU transforms. For more details:
    https://github.com/Project-MONAI/tutorials/blob/master/acceleration/fast_model_training_guide.md.

    The `use_thread_workers` will cause workers to be created as threads rather than processes although everything else
    in terms of how the class works is unchanged. This allows multiple workers to be used in Windows for example, or in
    any other situation where thread semantics is desired. Please note that some MONAI components like several datasets
    and random transforms are not thread-safe and can't work as expected with `thread workers`, need to check all the
    preprocessing components carefully before enabling `use_thread_workers`.

    See:
        * Fischetti et al. "Faster SGD training by minibatch persistency." ArXiv (2018) https://arxiv.org/abs/1806.07353
        * Dami et al., "Faster Neural Network Training with Data Echoing" ArXiv (2020) https://arxiv.org/abs/1907.05550
        * Ramezani et al. "GCN meets GPU: Decoupling "When to Sample" from "How to Sample"." NeurIPS (2020).
          https://proceedings.neurips.cc/paper/2020/file/d714d2c5a796d5814c565d78dd16188d-Paper.pdf

    Args:
        dataset: input dataset.
        buffer_size: number of items to buffer from the data source.
        buffer_timeout: time to wait for an item from the buffer, or to wait while the buffer is full when adding items.
        repeats: number of times to yield the same batch.
        use_thread_workers: if True and num_workers > 0 the workers are created as threads instead of processes
        kwargs: other arguments for `DataLoader` except for `dataset`.

    r   r   Fdatasetr	   r   r   buffer_timeoutr   r+   use_thread_workersboolc                   sP   |r| dddkrt |d< d|d< t j|fi | || _|| _|| _d S )Nnum_workersr   multiprocessing_contextFpersistent_workers)r%   rE   r4   r   r   rJ   r+   )r   rI   r   rJ   r+   rK   kwargsr<   r   r   r      s   

zThreadDataLoader.__init__c                 #  s&    t t  | j| j| jE d H  d S r1   )r/   r4   r&   r   rJ   r+   r   r<   r   r   r&      s   $zThreadDataLoader.__iter__)r   r   r   F)
rI   r	   r   r   rJ   r   r+   r   rK   rL   )r'   r(   r)   r*   r   r&   r?   r   r   r<   r   rH      s    +rH   )r   r   r   )r   r   r   r   r+   r   )
__future__r   Zmultiprocessing.contextr   queuer   r   r   	threadingr   r6   
monai.datar   r	   r
   r/   r0   r@   rE   rH   r   r   r   r   <module>   s   <
	