
    Th}                     8   % S SK r S SKrS SKrS SKrS SKrS SKrS SKrS SKrS SKrS SK	r	S SK
r
S SKrS SKrS SKrS SKrS SKJr  S SKJr  S SKJr  S SKJr  S SKJrJrJr  S SKJr  S SKJrJrJ r J!r!J"r"  S S	K#J$r$  S SK%r%S SK&r%S SK'r%S SK(J)r*  S SK+J,r,  S S
K-J.r.  S SK/J0r0  S SK1J2r2  S SK3J4r4J5r5J6r6J7r7J8r8J9r9J:r:J;r;J<r<J=r=J>r>J?r?  S SK@JArAJBrBJCrC  \R                  " \E5      rF\FR                  \R                  5        / SQrISS/rJ\:=(       d    \;=(       d    \>rK " S S\ 5      rL0 S\L" SS5      _S\L" SS5      _S\L" SS5      _S\L" SS5      _S \L" S!S"5      _S#\L" S$S%5      _S&\L" S'S(5      _S)\L" S*S+5      _S,\L" S-S.5      _S/\L" S0S15      _S2\L" S3S45      _S5\L" S6S75      _S8\L" S9S:5      _S;\L" S<S=5      _S>\L" S?S@5      _SA\L" SBSC5      _SD\L" SESF5      _SG\L" SHSI5      0ErM\ " SJ SK5      5       rNSL rOSM rPSN rQSO rRSP rSSQ rTSR rUSS rVST rWSU rXSV rYSW rZSX r[SY r\SZ r]S[ r^S\ r_SS] jr`S^ raS_ rbS` rcSa\%R                  Sb\eSc\eSd\f4Se jrg\7SfSgSh\" SiSj9ShSkSh4Sl j5       rh\=(       a  SmriO\e" \R                  " SnSo5      5      riSpSq0rk\<(       a  Sr\kSs'   SSt\f4Su jjrlSd\e4Sv jrm\Sw 5       rnSSx\eSy\eSz\e4S{ jjroSy\eS|\p4S} jrqSqr\!\	R                     \tS~'   SS\!\p   SdS4S jjruSS jrvSrw " S S\?5      rx " S S\x5      ryS\z\p\{\   4   S\S\4S jr|Sq}Sd\f4S jr~S rS\i\w4S jr " S S\?5      r " S S\,GR                  5      r " S S\,GR                  5      r\ SS j5       r " S S\%GR                  GR                  R~                  5      r " S S\y5      r " S S\?5      rg)    N)contextmanager)	dataclass)	timedelta)Enum)partialreducewraps)StringIO)AnyCallable
NamedTupleOptionalUnion)patch)
DeviceType)_SymmetricMemory)	trace_log)FILE_SCHEMAfind_free_portIS_SANDCASTLEretry_on_connect_failuresskip_but_pass_in_sandcastleskip_but_pass_in_sandcastle_if	TEST_CUDATEST_HPUTEST_WITH_ROCMTEST_WITH_TSANTEST_XPUTestCase)_install_threaded_pg_uninstall_threaded_pgProcessLocalGroupncclxcclhcclcudaxpuc                   *    \ rS rSr% \\S'   \\S'   Srg)TestSkip@   	exit_codemessage N)__name__
__module____qualname____firstlineno__int__annotations__str__static_attributes__r.       d/var/www/fran/franai/venv/lib/python3.13/site-packages/torch/testing/_internal/common_distributed.pyr*   r*   @   s    NLr7   r*   backend_unavailableH   z5Skipped because distributed backend is not available.small_worldsizeI   z Skipped due to small world size.odd_worldsizeW   zSkipped due to odd world size.no_cudaJ   zCUDA is not available.zmulti-gpu-1K   zNeed at least 1 CUDA devicezmulti-gpu-2M   zNeed at least 2 CUDA deviceszmulti-gpu-3P   zNeed at least 3 CUDA deviceszmulti-gpu-4Q   zNeed at least 4 CUDA deviceszmulti-gpu-5R   zNeed at least 5 CUDA deviceszmulti-gpu-6S   zNeed at least 6 CUDA deviceszmulti-gpu-7T   zNeed at least 7 CUDA deviceszmulti-gpu-8U   zNeed at least 8 CUDA devicesr$   L   z#c10d not compiled with NCCL support
skipIfRocmN   zTest skipped for ROCmno_peer_accessO   z'Test skipped because no GPU peer accessgenericV   zHTest skipped at subprocess level, look at subprocess log for skip reasonimporterrorX   z"Test skipped due to missing importno_acceleratorY   zaccelerator is not available.c                       \ rS rSr0 r1 Sk\S'   \" 5       \S'   SS1\S'   SS1\S'   0 r1 S	k\S
'   1 S	k\S'   1 S	k\S'   1 S	k\S'   \" 5       \S'   \(       a  S1\S'   \(       a	  S1\S'   Sr	gSr	g)DistTestCases_   >   mpiuccr$   allgather_coalescedr   r$   rX   zsendrecv anysourcezcpu barrier>   rX   gloor$   gpur'   ddpsubgrouppluginr&   hpur%   r(   r.   N)
r/   r0   r1   r2   skip_collectivesetbackend_featurer   r   r6   r.   r7   r8   rU   rU   _   s     O-CO)* #OH-3UOO()&,e_OM" O4OE5OF4OE"9OJ #OH"("( r7   rU   c                     U [         ;   $ N)DDP_RANK_DEVICES)devices    r8   requires_ddp_rankrg   u   s    %%%r7   c                 0   ^  [        T 5      U 4S j5       nU$ )zSkips if the world size exceeds the number of GPUs, ensuring that if the
test is run, each rank has its own GPU via ``torch.cuda.device(rank)``.c                    > [         (       d=  [        (       d2  [        (       d'  [        R                  " [
        S   R                  5        [        [        R                  S   5      n[         (       aL  [        R                  R                  5       U:  a*  [        R                  " [
        SU 3   R                  5        [        (       aL  [        R                  R                  5       U:  a*  [        R                  " [
        SU 3   R                  5        [        (       aL  [        R                  R                  5       U:  a*  [        R                  " [
        SU 3   R                  5        T" U 0 UD6$ )Nr?   
WORLD_SIZE
multi-gpu-)r   r   r   sysexit
TEST_SKIPSr,   r3   osenvirontorchr'   device_countr_   r(   )argskwargs
world_sizefuncs      r8   wrapperskip_if_no_gpu.<locals>.wrapper}   s    	XXHHZ	*445L12
9002Z?HHZ*ZL 9:DDE8		..0:=HHZ*ZL 9:DDE8		..0:=HHZ*ZL 9:DDET$V$$r7   r	   rv   rw   s   ` r8   skip_if_no_gpur{   y   s"     4[% % Nr7   c                 0   ^  [        T 5      U 4S j5       nU$ )Nc                     > [         R                  S   S:w  aG  [        [         R                  S   5      S:  a'  [        R                  " [
        S   R                  5        T" U 0 UD6$ )NBACKENDrW   rj      r;   ro   rp   r3   rl   rm   rn   r,   rs   rt   rv   s     r8   rw   (skip_if_small_worldsize.<locals>.wrapper   sR    JJy!U*BJJ|4L0MPQ0QHHZ 12<<=T$V$$r7   ry   rz   s   ` r8   skip_if_small_worldsizer           
4[% % Nr7   c                 0   ^  [        T 5      U 4S j5       nU$ )Nc                     > [         R                  S   S:w  aJ  [        [         R                  S   5      S-  S:X  a'  [        R                  " [
        S   R                  5        T" U 0 UD6$ )Nr~   rW   rj         r=   r   r   s     r8   rw   &skip_if_odd_worldsize.<locals>.wrapper   sW    JJy!U*BJJ|4L0MPQ0QUV0VHHZ0::;T$V$$r7   ry   rz   s   ` r8   skip_if_odd_worldsizer      r   r7   c                    ^ ^ UU 4S jnU$ )Nc                 6   >^  [        T 5      UU U4S j5       nU$ )Nc                     > TS:X  aM  [         R                  R                  5       T:  a+  [        R                  " [
        ST 3   R                  5        g T" U 0 UD6$ Nr$   rk   )rq   r'   rr   rl   rm   rn   r,   )rs   rt   backendrv   ns     r8   rw   Crequire_n_gpus_for_nccl_backend.<locals>.decorator.<locals>.wrapper   sM    & UZZ%<%<%>%Bj$45??@T,V,,r7   ry   )rv   rw   r   r   s   ` r8   	decorator2require_n_gpus_for_nccl_backend.<locals>.decorator   s     	t	- 
	- r7   r.   )r   r   r   s   `` r8   require_n_gpus_for_nccl_backendr      s     r7   c                      S n U $ )Nc                 0   ^  [        T 5      U 4S j5       nU$ )Nc                     >  SSK JnJn  T" U 0 UD6$ ! [         a*    [        R
                  " [        S   R                  5         g f = f)Nr   )AutoModelForMaskedLM
BertConfigrP   )transformersr   r   ImportErrorrl   rm   rn   r,   )rs   rt   r   r   rv   s       r8   rw   ?import_transformers_or_skip.<locals>.decorator.<locals>.wrapper   sA    >IT,V,, >M2<<=>s    1AAry   rz   s   ` r8   r   .import_transformers_or_skip.<locals>.decorator   s     	t	> 
	> r7   r.   )r   s    r8   import_transformers_or_skipr      s    
 r7   c                    [         (       a#  [        R                  R                  5       U :  a  g[        (       a#  [        R
                  R                  5       U :  a  g[        (       a#  [        R                  R                  5       U :  a  gg)NTF)r   rq   r'   rr   r   r_   r   r(   )xs    r8   at_least_x_gpur      sY    yUZZ,,.!3xEII**,1xEII**,1r7   c                    ^  U 4S jnU$ )Nc                 4   >^  [        T 5      U U4S j5       nU$ )Nc                    > [         R                  R                  5       (       a*  [         R                  R                  5       T:  a  T" U 0 UD6$ [        (       a*  [         R
                  R                  5       T:  a  T" U 0 UD6$ [        (       a*  [         R                  R                  5       T:  a  T" U 0 UD6$ [        R                  " [        ST 3   R                  5        g )Nrk   )rq   r'   is_availablerr   r   r_   r   r(   rl   rm   rn   r,   )rs   rt   rv   r   s     r8   rw   4skip_if_lt_x_gpu.<locals>.decorator.<locals>.wrapper   s    zz&&((UZZ-D-D-F!-KT,V,,xEII2249T,V,,xEII2249T,V,,HHZ*QC 01;;<r7   ry   )rv   rw   r   s   ` r8   r   #skip_if_lt_x_gpu.<locals>.decorator   s     	t	= 
	= r7   r.   )r   r   s   ` r8   skip_if_lt_x_gpur      s     r7   c                    ^ ^ U U4S jnU$ )Nc                 6   >^  [        T 5      UU U4S j5       nU$ )Nc                    > TS:w  a  T" U 0 UD6$ [         R                  R                  5       (       a*  [         R                  R                  5       T:  a  T" U 0 UD6$ [        R
                  " [        ST 3   R                  5        g r   )rq   r'   r   rr   rl   rm   rn   r,   )rs   rt   r   rv   r   s     r8   rw   9nccl_skip_if_lt_x_gpu.<locals>.decorator.<locals>.wrapper   sp    & T,V,,zz&&((UZZ-D-D-F!-KT,V,,HHZ*QC 01;;<r7   ry   )rv   rw   r   r   s   ` r8   r   (nccl_skip_if_lt_x_gpu.<locals>.decorator   s     	t	= 
	= r7   r.   )r   r   r   s   `` r8   nccl_skip_if_lt_x_gpur      s    	 r7   c                     U R                  5       nSU;   d   eSU;   d   eSU;   d   eUS   nUR                  S5      S:X  a  UOUR                  S5      S   nXC;   d   SU SU 35       eg )	N	iteration	has_errorerrorz
Exception raised from r   zDid not find expected z in ddp logging data error: )_get_ddp_logging_datafindsplit)	model_DDP
err_substrddp_logging_datalogging_erractuals        r8   verify_ddp_error_loggedr      s     668********&&&&"7+K ??56"< 	89!<  	R	x'CK=QRr7   c                 0   ^  [        T 5      U 4S j5       nU$ )a6  
Convenience decorator to set/unset TORCH_NCCL_BLOCKING_WAIT flag. Note that use of
this decorator will override the setting of TORCH_NCCL_ASYNC_ERROR_HANDLING for
the particular test. After the test, both TORCH_NCCL_BLOCKING_WAIT and
TORCH_NCCL_ASYNC_ERROR_HANDLING will be restored to their original values.
c                    >  [         R                  S   n[         R                  S	  [         R                  S   nS[         R                  S'    T" U 0 UD6nUUb  U[         R                  S'   Ub  U[         R                  S'   $ $ ! [         a    S n Nmf = f! [         a    S n Nkf = f! S[         R                  S'   f = f! Ub  U[         R                  S'   Ub  U[         R                  S'   f f = f)NTORCH_NCCL_ASYNC_ERROR_HANDLINGTORCH_NCCL_BLOCKING_WAIT1)ro   rp   KeyError)rs   rt    cached_nccl_async_error_handlingcached_nccl_blocking_waitretrv   s        r8   rw   (with_nccl_blocking_wait.<locals>.wrapper  s   	4AC1B, 

<=	9:<***;% 69BJJ12	S''C 0; 5 

5 )49R

56 51  	4/3,	4  	-(,%	- 69BJJ12 0; 5 

5 )49R

56 5s@   $B B 	C BBB'$B* &B''B* *B?/C1ry   rz   s   ` r8   with_nccl_blocking_waitr     s%     4[ S  SD Nr7   c                    ^  U 4S jnU$ )zC
Runs a test for each distributed debug level specified in levels.
c                 4   >^  [        T 5      U U4S j5       nU$ )Nc                    > [         R                  R                  SS 5      nT H`  nU[         R                  S'   [        R                  " 5         T" U 0 UD6n[        R
                  " 5         Uc  MM  U[         R                  S'   Mb     W$ )NTORCH_DISTRIBUTED_DEBUG)ro   rp   getc10dset_debug_level_from_envbarrier)rs   rt   	old_levellevelr   rv   levelss        r8   rw   :with_dist_debug_levels.<locals>.decorator.<locals>.wrapper9  sq    

'@$GI8=

45--/D+F+(<EBJJ89   Jr7   ry   )rv   rw   r   s   ` r8   r   )with_dist_debug_levels.<locals>.decorator8  s     	t	 
	 r7   r.   )r   r   s   ` r8   with_dist_debug_levelsr   3  s    
$ r7   c                  J    [        [        R                  " 5       (       + S5      $ )Nz+c10d was not compiled with the Gloo backend)r   r   is_gloo_availabler.   r7   r8   requires_gloor   M  !    )""$$5 r7   c           	         [         R                  " 5       (       d  [        S5      $ [        [        R
                  R                  R                  5       U :  SU  S[        R
                  R                  R                  5        SU 35      $ )N+c10d was not compiled with the NCCL backendz0Requires NCCL version greater than or equal to: z	, found: z
, reason: )r   is_nccl_availabler   r   rq   r'   r$   version)r   msgs     r8   requires_nccl_versionr   T  sy    !!##*9
 	
 .JJOO##%/>wiyQVQ[Q[Q`Q`QhQhQjPkkuvyuz{
 	
r7   c                  J    [        [        R                  " 5       (       + S5      $ )Nr   )r   r   r   r.   r7   r8   requires_ncclr   `  r   r7   c                  J    [        [        R                  " 5       (       + S5      $ )Nz*c10d was not compiled with the UCC backend)r   r   is_ucc_availabler.   r7   r8   requires_uccr   g  !    )!!##4 r7   c                  J    [        [        R                  " 5       (       + S5      $ )Nz*c10d was not compiled with the MPI backend)r   r   is_mpi_availabler.   r7   r8   requires_mpir   n  r   r7   c                 `    U c  [         n [        S U  5       5      n[        U(       + SU  35      $ )a  
Decorator to skip tests if no accelerator communication backend (NCCL, XCCL, HCCL) is available.

Args:
    backends (Optional[List[str]]): Specific accelerator backends to check (e.g., ["nccl", "xccl", "hccl"]).
                                   If None, checks all supported accelerator backends (NCCL, XCCL, HCCL).

Returns:
    callable: A decorator that skips the test if no specified accelerator backend is available.
c              3      #    U  H>  n[         R                  [         R                  S  S.R                  US 5      " 5       v   M@     g7f)c                      [         $ rd   )r   r.   r7   r8   <lambda>=requires_accelerator_dist_backend.<locals>.<genexpr>.<lambda>  s    Hr7   r#   c                      gNFr.   r.   r7   r8   r   r     s    ur7   N)r   r   is_xccl_availabler   ).0r   s     r8   	<genexpr>4requires_accelerator_dist_backend.<locals>.<genexpr>  sO        G	 ****$	
 #g}
%		& 	( 	(
  s   AAz5No accelerator communication backend available among )ACCELERATOR_DIST_BACKENDSanyr   )backendsbackend_availables     r8   !requires_accelerator_dist_backendr   u  sH     ,     *
?zJ r7   c                      [         R                  R                  5       =(       a%    [        R                  " [
        R                  S5      n [        U (       + S5      $ )Nr   z"multicast support is not available)rq   r'   r   r   has_multicast_supportr   CUDAr   )r   s    r8   requires_multicast_supportr     sI    

! 	G22:??AF  *!!, r7   c                 >   ^  ST l         [        T 5      U 4S j5       nU$ )zSkips a test for ROCmTc                  z   > [         (       d  T" U 0 UD6$ [        R                  " [        S   R                  5        g )NrJ   )r   rl   rm   rn   r,   r   s     r8   rw   *skip_if_rocm_multiprocess.<locals>.wrapper  s/    ~(((L)334r7   )skip_if_rocm_multiprocessr	   rz   s   ` r8   r   r     s(    %)D"
4[5 5
 Nr7   c                  <    [        [        R                  S:H  S5      $ )Nwin32z8This unit test case is not supported on Windows platform)r   rl   platformr.   r7   r8   skip_if_win32r    s    )B r7   rf   majorminorreturnc                     U R                   S:w  a  [        S5      e[        R                  R                  b  g[        R
                  R                  U 5      X4:  $ )z
Returns True if the device's compute capability is (major, minor) or higher.
Error out if the device is not a CUDA device.
Returns False if device is a RoCM device.
r'   z3sm_is_or_later() is only supported for CUDA devicesF)type
ValueErrorrq   r   hipr'   get_device_capability)rf   r  r  s      r8   sm_is_or_higher_thanr    sK     {{fNOO}}$::++F3~EEr7   	localhostr   T   )minutesFc           	          [        5       nU(       a@  [        U[        SS9-  5      n[        R                  R
                  R                  XXU5      $ [        R                  " U UUUUUS9$ )zD
Creates a TCP store. Retries if the chosen port is already in use.
r   )milliseconds)wait_for_workers	use_libuv)r   r3   r   rq   classes	dist_c10dTCPStorer   )	addrru   	is_mastertimeoutr  	jit_classr  porttimeout_milliseconds	            r8   create_tcp_storer    sn     D!'I1,E"EF}}&&//
/B
 	
 }}-
 	
r7   i  !DISTRIBUTED_TESTS_DEFAULT_TIMEOUT300test_ddp_uneven_inputsi     test_join_kwargs	lazy_initc                     [         R                  S:X  d  U c  [        R                  R	                  SUS9$ [        R                  R	                  XS9$ )Nr  z	127.0.0.1)hostnamer$  	interfacer$  )rl   r  r   ProcessGroupGloocreate_devicer'  s     r8   r*  r*    sY    
||w)"3$$22 I 3 
 	
 $$22 3 
 	
r7   c                 Z    [         R                  U R                  S5      S   [        5      $ N.r   )TIMEOUT_OVERRIDEr   r   TIMEOUT_DEFAULT)test_ids    r8   get_timeoutr1    s#    c 22 6HHr7   c               #   D  #    [        5       [        5       p[        R                  [        R                  p2 Xs[        l        [        l        [        R                  [        R                  4v   X#s[        l        [        l        g ! X#s[        l        [        l        f = f7frd   )r
   rl   stdoutstderr)new_outnew_errold_outold_errs       r8   captured_outputr9    sa     z8:Wzz3::W2!(
CJjj#**$$!(
CJ
CJs   3B 8B .B BB rankru   
num_inputsc                    SS[         S[         S[         S[         4S jjnS[         4S jn[        USS9[        US	S9[        US
S9[        USS9[        US	S9[        US
S94 VVs/ s HQ  n[        U5       Vs/ s H  nU" X -  U-   X!-  5      PM     sn[        U5       Vs/ s H  od" XRU-  5      PM     sn4PMS     snn$ s  snf s  snf s  snnf )z
Generate a number of basic test cases for sparse reduction.
These cover tensors with a varying number of sparse dimensions and a varying
number of dense dimensions. The only reduction operation we support is sum.
r   r:  ru   sparse_dims
dense_dimsc           	         [         R                  " [         R                  " U S-   5      SU S-   45      nU/[        U5       Vs/ s H  nSPM     sn-   n[        US-
  5       HD  n[         R                  " U[         R
                  " SU S-   5      45      nUR                  U5        MF     [         R                  " U S-   /[        U5       Vs/ s H  nSPM     sn-   5      n[         R                  " XGU5      $ s  snf s  snf )Nr   r   )	rq   reshapearangerangecatzerosappendonessparse_coo_tensor)r:  ru   r=  r>  indices_shapevaluess           r8   generate,simple_sparse_reduce_tests.<locals>.generate  s     --TAX 6D1HF5+<=+<a+<=={Q'Aii%++a*B CDGLL$ ( TAXJU:5F)G5F!5F)GGH&&w>>  > *Hs   C8C=
c           
      ~    [        [        R                  [        U5       Vs/ s H
  o " X!5      PM     sn5      $ s  snf rd   )r   operatoraddrB  )fnru   r:  s      r8   compute_sum/simple_sparse_reduce_tests.<locals>.compute_sum  s6    LLE*<MN<MD2d/<MN
 	
Ns   :
)r=  r      )r>  )r   r   )r3   r   rB  )r:  ru   r;  rL  rR  rQ  is          r8   simple_sparse_reduce_testsrV    s    
?s 
? 
?# 
?s 
?
C 
 H!,H!,H!,H+H+H+

B	 z**A :$q(**AB* @EZ?PQ?P![*45?PQ	

  Rs$   &C9CC"C6C
Cr   c           
      |   [         R                  R                  5       n[        (       a  [         R                  R                  5       n[
        (       a  [         R                  R                  5       n[        U5      nSnX:  a  X -  n[        U 5       Vs0 s H  nU[        X5U-  US-   U-   5      _M     nnU$ s  snf )zMultigpu tests are designed to simulate the multi nodes with multi
GPUs on each node. Nccl backend requires equal #GPUs in each process.
On a single node, all visible GPUs are evenly
divided to subsets, each process only uses a subset.
r   )	rq   r'   rr   r   r_   r   r(   rB  list)ru   r   nGPUsvisible_devicesnGPUs_per_processrU  rank_to_GPUs          r8   init_multigpu_helperr]  2  s     JJ##%Ex		&&(x		&&(ElO !/ z""A 	
4$5 5QBS8STUU"   	s   !B9tmp_dirinit_methodc                    [         R                  " 5       q[        R                  [        R
                  S'   [        R                  " [        R                  R                  [        R                  S5      5        [        R                  " [        R                  R                  [        R                  S5      5        [        R                  R                  [        R                  S5      n[        R                  " U5        U b  U [        R
                  S'   g [        [        R                  R                  US5      -   [        R
                  S'   g )NTEMP_DIRr   test_dirinit_dirINIT_METHODshared_init_file)
tempfileTemporaryDirectoryr^  namero   rp   mkdirpathjoinr   )r_  init_dir_paths     r8   initialize_temp_directoriesrm  N  s    ))+G$\\BJJzHHRWW\\',,	23HHRWW\\',,
34GGLLz:MHH]$/

=!$/"'',,-3
 %


=!r7   c                  <    [         b  [         R                  5         g g rd   )r^  cleanupr.   r7   r8   cleanup_temp_dirrp  _  s     r7      c            	       f  ^  \ rS rSrSrSrS\4S jr\S\4S j5       r	\S\
4S j5       rS r SS	\S
\SS4U 4S jjjrS U 4S jjrS U 4S jjrS\4S jrS S jrS S jr " S S\5      r\S\
4S j5       r\S\
S\S\SS4S j5       rS\SS4S jrS S jrS S jrS S jr\S\4S j5       rSrU =r $ )!MultiProcessTestCaseis  r   
   r  c                     gr   r.   selfs    r8   _should_stop_test_suite,MultiProcessTestCase._should_stop_test_suite|  s    r7   c                     g)NTr.   rv  s    r8   destroy_pg_upon_exit)MultiProcessTestCase.destroy_pg_upon_exit  s    r7   c                     [         $ rd   DEFAULT_WORLD_SIZErv  s    r8   ru   MultiProcessTestCase.world_size      !!r7   c                 X   ^ [        T5      U4S j5       n[        R                  " X 5      $ )Nc                 l   > U R                   U R                  :X  a  U R                  T5        g T" 5         g rd   )r:  MAIN_PROCESS_RANK_join_processesrw  rQ  s    r8   rw   1MultiProcessTestCase.join_or_run.<locals>.wrapper  s(    yyD222$$R(r7   r	   types
MethodTyperw  rQ  rw   s    ` r8   join_or_run MultiProcessTestCase.join_or_run  ,    	r	 
	 ..r7   method_name
methodNameNc                    > US:w  a  Un[         TU ]  U5         [        X5      n[        XU R	                  U5      5        g ! [
         a,  nUS:w  a  [        SU R                   SU 35      Ue S nAg S nAff = fNrunTestzno such test method in : super__init__getattrsetattrr  AttributeErrorr
  	__class__rw  r  r  rQ  er  s        r8   r  MultiProcessTestCase.__init__      
 "$K%		+BDt'7'7';< 	Y& !-dnn-=R
|L '	   &A 
A7"A22A7c                    > [         TU ]  5         0 U l        / U l        / U l        U R
                  U l        [        R                  " SS9R                  U l
        0 U l        g )NFdelete)r  setUpspecial_return_code_checksskip_return_code_checks	processesr  r:  rf  NamedTemporaryFilerh  	file_namepid_to_piperw  r  s    r8   r  MultiProcessTestCase.setUp  sT     13' .0$**	!44EBGGr7   c                 v   > [         TU ]  5         U R                   H  nUR                  5         M     / U l        g rd   )r  tearDownr  	terminate)rw  pr  s     r8   r  MultiProcessTestCase.tearDown  s.    AKKM   r7   c                 F    U R                  5       R                  S5      S   $ r,  idr   rv  s    r8   _current_test_name'MultiProcessTestCase._current_test_name  s    wwys#B''r7   c                    / U l         [        [        U R                  5      5       H  n[        R
                  R                  5       u  p4U" U R                  R                  S[        U5      -   X R                  5       U R                  U4S[        U SS5      0S9nUR                  5         [        R                  SX%R                   5        X0R"                  UR                   '   U R                   R%                  U5        M     g )Nprocess fake_pgF)targetrh  rs   rt   Started process %s with pid %s)r  rB  r3   ru   rq   multiprocessingPiper  _runr5   r  r  r  startloggerinfopidr  rE  )rw  procr:  parent_conn
child_connprocesss         r8   _start_processes%MultiProcessTestCase._start_processes  s    #doo./D&+&;&;&@&@&B#K~~**#d)+335t~~zRwtY>	G MMOKK8$L,7W[[)NN!!'* 0r7   c                      [         R                  R                  S5        [         R                  R	                  S5      R
                  nU R                  U5        g ! [         a     NGf = f)Nspawn)rq   r  set_start_methodRuntimeErrorget_contextProcessr  )rw  r  s     r8   _spawn_processes%MultiProcessTestCase._spawn_processes  s[    	!!227; $$009AAd#	  		s   A 
A)(A)c                       \ rS rSrSrSrg)MultiProcessTestCase.Eventi  r   r.   N)r/   r0   r1   r2   GET_TRACEBACKr6   r.   r7   r8   Eventr    s    r7   r  r:  c                    [         R                  SU5         [        R                  R	                  X/5      nX;   a  U R
                  (       a  [         R                  SU5        g U R                  5       n[         R                  SXB5        U[        R                  R                  :X  a  [        R                  " SS9 n[        R                  " U5        UR                  5         UR!                  S5        U R#                  UR%                  5       5        [         R                  SU5        S S S 5        X;   a  g GM#  ! , (       d  f       N= f)Nz*Starting event listener thread for rank %sz:Pipe closed for process %s, stopping event listener threadzReceived event %s on process %szr+)moder   zProcess %s sent traceback)r  debugr  
connectionwaitclosedrecvr  rs  r  r  rf  r  faulthandlerdump_tracebackflushseeksendread)parent_pipesignal_piper:  ready_pipeseventtmp_files         r8   _event_listener$MultiProcessTestCase._event_listener  s    A4H)4499;:TUK)%%LLT #((*=uK066DDD!44$?8$33H= ( a(#((9$?F @ )5   @?s   =A-D;;
E		test_namer  c                 N    U " U5      nXl         X6l        UR                  X$5        g rd   )r:  r  run_testclsr:  r  r  r  rt   rw  s          r8   r  MultiProcessTestCase._run  s#     9~	"i-r7   c           	         [         R                  R                  SS9u  p4[        R                  " [
        R                  X#U R                  4SS9nUR                  5         [        R                  S:w  a3  [        R                  S:w  a  [         R                  R                  S5        S[        R                  S'    [        X5      " 5         Ub  UR;                  S 5        Uc   eUR=                  5         UR?                  5         U R@                  (       a   [B        RD                  " 5         g g ! [         R"                   a\  n[$        R'                  S	U R                  U[)        U5      5        [        R*                  " [,        S
   R.                  5         S nANS nAf[0         a    [$        R3                  S[4        R6                  " 5       U R                  [
        R8                  5        UR;                  [4        R6                  " 5       5        [        R*                  " [
        R8                  5         GNhf = f! Ub  UR;                  S 5        Uc   eUR=                  5         UR?                  5         f = f! [F        [H        4 a     g f = f)NF)duplexT)r  rs   daemonr  darwinr   TORCH_SHOW_CPP_STACKTRACESz4Process %s skipping test %s for following reason: %srN   z;Caught exception: 
%s exiting process %s with exit code: %s)%rq   r  r  	threadingThreadrs  r  r:  r  rl   r  _C'_set_print_stack_traces_on_fatal_signalro   rp   r  unittestSkipTestr  r  r5   rm   rn   r,   	Exceptionr   	traceback
format_excTEST_ERROR_EXIT_CODEr  rk  closer{  r   destroy_process_groupAssertionErrorr
  )rw  r  r  signal_recv_pipesignal_send_pipeevent_listener_threadses          r8   r  MultiProcessTestCase.run_test  s   -2-B-B-G-Gu-G-U* ) 0 0'77;!

 	##%<<7"s||x'? HH<<TB36

/0	 D$&(  + %%d+(444!&&($$ **,	 %7    	6KKF		B	 HHZ	*4455 		@LLQ$$&		$99	 Y1134HH)>>?		@  + %%d+(444!&&( #J/ sJ   ;D- I5 -H4AFH7 BH40H7 3H44H7 7;I25JJc                    / n[        U R                  5       Hi  u  p#UR                  b  M  U R                  UR                     n UR                  [        R                  R                  5        UR                  X$45        Mk     U H  u  pd UR                  S5      (       aQ  UR                  (       a  [        R                  SU5        ME  UR!                  5       n[        R                  SXg5        Mm  [        R                  SU5        M     g ! [         a#  n[        R                  SUU5         S nAGM  S nAff = f! [         a"  n[        R                  SUU5         S nAM  S nAff = f)NzBEncountered error while trying to get traceback for process %s: %sr  z5Pipe closed for process %s, cannot retrieve tracebackz)Process %s timed out with traceback: 

%sz6Could not retrieve traceback for timed out process: %s)	enumerater  exitcoder  r  r  rs  r  r  rE  ConnectionErrorr  r   pollr  r  r  )rw  pipesrU  r  piper  r:  r  s           r8   _get_timedout_process_traceback4MultiProcessTestCase._get_timedout_process_tracebackH  s-   #DNN3JA'''4II288FFGLL!+ 4  JD99Q<<{{S  ! $		ILLEt LLPRV!   ' LL\ 6 # X s<   ;D=D?&D?4D?
D<D77D<?
E+	E&&E+c                    [        U R                  5       5      n[        R                  " 5       nSn  [        U R                  5       Hz  u  pVUR
                  [        R                  :X  d  M%  [        SU SUR
                   S35        [        R                  R                  5       nU H  nUR                  5         M     Sn  O   U(       a  O[        S U R                   5       5      (       a  Oy[        R                  " 5       U-
  n	X:  aC  U R                  5         [        SU S35        U R                   H  nUR                  5         M     O[        R                  " S	5        GM6  [        R                  " 5       U-
  n
U R!                  X5        U R"                  R%                  5        H  nUR'                  5         M     g ! U R"                  R%                  5        H  nUR'                  5         M     f = f)
NFTProcess z terminated with exit code z", terminating remaining processes.c              3   <   #    U  H  oR                   S Lv   M     g 7frd   )r  )r   r  s     r8   r   7MultiProcessTestCase._join_processes.<locals>.<genexpr>  s     F~!zz-~s   zTiming out after z" seconds and killing subprocesses.g?)r1  r  timer  r  r  rs  r  printrq   r  active_childrenr  allr  sleep_check_return_codesr  rK  r  )rw  rQ  r  
start_timesubprocess_errorrU  r  r  acelapsedelapsed_timer  s               r8   r  $MultiProcessTestCase._join_processesr  s   dggi(YY[
 &	%dnn5DA zz%9%N%NN&qc)DQZZLPrs +0*?*?*O*O*Q"1BLLN #2+/( 6 $Ft~~FFF))+
2$88:+G94VW "^^ ,

3= @  99;3L$$R6 ((//1

 2((//1

 2s   7G -D%G 3G7c           
      B   U R                   (       d  [        R                  S5        gU R                   S   n[        U R                   5       VVs/ s H(  u  pEUR                  [
        R                  :X  d  M%  XE4PM*     nnnU(       a\  SnU HI  u  pHU R                  UR                     R                  5       n	USU S[
        R                   SU	 S3-  nMK     [        U5      e[        U R                   5       H$  u  pEUR                  b  M  [        SU S	U S
35      e   XR                  ;   a  g[        R                  5        Hy  n
UR                  U
R                  :X  d  M  [        (       a1  [        R!                  SU R#                  5       U
R$                  5          g[&        R(                  " U
R$                  5      e   SnXR*                  ;   a  U R*                  U   nU R-                  UR                  USU SUR                   SUR                   3S9  gs  snnf )z
Checks that the return codes of all spawned processes match, and skips
tests if they returned a return code indicating a skipping condition.
z<Note: no subprocesses were spawned, test was likely skipped.Nr    r  z exited with error code z and exception:

 terminated or timed out after  seconds6Skipping %s on sandcastle for the following reason: %szExpected exit code z	 but got z
 for pid: )r   )r  r  warningr  r  rs  r  r  r  r  r  r  rn   rK  r,   r   r  r  r-   r  r  r  assertEqual)rw  rQ  r  first_processrU  r  errored_processesr   r  error_messageskipexpected_return_codes               r8   r  (MultiProcessTestCase._check_return_codes  s    ~~NNN q) "$..1
1zz1FFF QF1 	 

 E/
 $ 0 0 = B B Dqc!9:N:c:c9d e''4oR9 0 u%% dnn-DAzz!"qc!@hW  . ---%%'D%%7 =
 KKP	
 "++DLL99 ("  ! 000#'#B#B2#F "" %&:%;9]E[E[D\\fgtgxgxfyz 	 	
g
s   $H7Hc                      U R                   S:H  $ )Nr   r:  rv  s    r8   r  MultiProcessTestCase.is_master  s    yyA~r7   )r  r  r  r:  r  r  r  r  r  N)!r/   r0   r1   r2   r  r  boolrx  propertyr{  r3   ru   r  r5   r  r  r  r  r  r  r   r  staticmethodr  classmethodr  r  r  r  r  r  r6   __classcell__r  s   @r8   rs  rs  s  s=   
   d   "C " "/ ?H8;	 &"(C (+"$    < ..#&.36.	. .5# 5t 5n(T*XJ
X 4  r7   rs  c                   L   ^  \ rS rSrU 4S jrS rS\4S jrS	S jrS r	Sr
U =r$ )
DistributedTestBasei  c                    > [         TU ]  5         [        U R                  5      [        R
                  S'   U R                  5         g )Nrj   )r  r  r5   ru   ro   rp   r  r  s    r8   r  DistributedTestBase.setUp  s/    #&t#7

< r7   c                      [         R                  R                  5          [        R
                  " U R                  5        g ! [         a     N.f = f! [         a     g f = frd   )rq   distributedr  r   ro   remover  OSErrorrv  s    r8   r  DistributedTestBase.tearDown  sU    	335	IIdnn%  		  		s"   A  A 
AA
AAr  c                 .    SU;   a  gSU;   a  gSU;   a  gg)Nr'   r$   r_   r&   r(   r%   rZ   r.   )rw  rf   s     r8   r   DistributedTestBase.backend  s$    Vf_f_r7   c                 $   Uc  U R                   n[        R                  " U5      R                  5       n[        R                  R                  U R                  U5      n[        R                  R                  U R                  U5      UU R                  US9  SU R                  U5      ;   d  SU R                  U5      ;   a)  [        R                  R                  U R                  5        [        R                  R                  R                  5       $ )Nr   ru   r:  storer$   r%   )ru   rq   get_device_modulerr   r=  	FileStorer  init_process_groupr   r:  acceleratorset_device_indexdistributed_c10d_get_default_group)rw  rf   ru   num_visible_devicesrE  s        r8   	create_pgDistributedTestBase.create_pg  s    J#55f=JJL!!++DNN<OP,,LL(!	 	- 	
 T\\&))Vt||F7K-K..tyy9  11DDFFr7   c                     [         R                  " U5      R                  5       n[        U R                  5       Vs0 s H  o3X2-  /_M
     sn$ s  snf rd   )rq   rF  rr   rB  ru   )rw  rf   rM  rU  s       r8   rank_to_device"DistributedTestBase.rank_to_device  sH    #55f=JJL6;DOO6LM6LA+,,6LMMMs   Ar.   rd   )r/   r0   r1   r2   r  r  r5   r   rN  rQ  r6   r6  r7  s   @r8   r9  r9    s+     
 GN Nr7   r9  subtest_configtest_fntest_kwargsc                    [        UR                  5       5      nU Vs/ s H  ofS   PM	     nnU Vs/ s H  ofS   PM	     nn[        R                  " U6  H  n	[	        [        Xy5      5      n
U R                  " S0 U
D6   [        R                  R                  5         U" U0 UDU
D6  [        R                  R                  5         SSS5        [        R                  " 5         M     gs  snf s  snf ! , (       d  f       N2= f)a0  
Runs a test function given by ``test_fn`` as a subtest according to the
configurations specified by ``subtest_config``. This amortizes the
costly setup overhead (including process spawn and initializing the
process group) over the subtests.

Args:
    subtest_config (Dict[str, List[Any]]): A mapping from subtest
        keyword argument name to a list of its possible values.
    test_fn (Callable): A callable that runs the actual test.
    test_args: Positional arguments to pass to ``test_fn``.
    test_kwargs: Keyword arguments to pass to ``test_fn``.
r   r   Nr.   )rX  items	itertoolsproductdictzipsubTestrq   _dynamoresetr   r   )cls_instrS  rT  	test_argsrU  subtest_config_itemsitemsubtest_config_keyssubtest_config_valuesrK  subtest_kwargss              r8   run_subtestsrf  "  s    * 9=^=Q=Q=S8T:N%O:N$1g:N%OBV-WBV$1gBV-W##%:;c"5>?//MM!Y@+@@MM! 0 	 < &P-W 0/s   C'C,>AC11
C?	c                      [         b  [         $  [        R                  " / SQSS9R                  S:H  q [         $ ! [         a
    Sq  [         $ f = f)z
If shell command `fi_info -p efa -t FI_EP_RDM` returns exit code 0 then we assume that the machine has
Libfabric EFA interfaces and EFA software components installed,
see https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/efa-start.html.
)fi_infoz-pefaz-t	FI_EP_RDMF)checkr   )EFA_PROBE_RESULT
subprocessrun
returncodeFileNotFoundErrorr.   r7   r8   has_efarq  H  s]     #!NN;5j 	   ! !s   $9 AAc                  ,    [        5       (       a  SS/$ S$ )aw  
If the machine has Libfabric EFA interfaces and EFA software components installed it may cause
'RuntimeError: In operator() at tensorpipe/common/ibv.h:172 "": Operation not supported' if tensorpipe
uses InfiniBand transport, so we exclude it from tensorpipe transports,
see https://github.com/pytorch/pytorch/issues/73885 and https://github.com/pytorch/pytorch/issues/65022
shmuvN)rq  r.   r7   r8   tp_transportsru  ^  s     $IIE4=/4/r7   c                 b   ^ ^^ T c  [        [        UTS9$ S m[        T 5      UU U4S j5       nU$ )z#
Wrapper to use with a test method
)r  ru   c                    ^ ^^^ [        5       m[        R                  " 5       nU4S jmUUU 4S jn/ n[        T 5       H;  n[        R
                  " X5TU4S9nUR                  5         UR                  U5        M=     U$ )Nc                  >   > T [         R                  R                  :H  $ rd   r   rK  _worldworlds   r8   world_is_validaspawn_threads_and_init_comms.<locals>._run_test_method_with_multi_threads.<locals>.world_is_validw      D118888r7   c                   > [         R                  " SU TUS9   T" 5         T" 5       (       a  [         R                  " 5         g g ! [         aT  n[        R                  R                  U [        R                  " 5       45        [        R                  " U5         S nAN|S nAff = f! T" 5       (       a  [         R                  " 5         f f = f)Nthreadedr   r:  ru   rE  )r   rH  BaseExceptionMultiThreadedTestCaseexception_queueputrl   exc_infor"   exception_handler  )r:  world_pgrE  excallbackr}  ru   s       r8   workerYspawn_threads_and_init_comms.<locals>._run_test_method_with_multi_threads.<locals>.workerz  s    ##"*E
1
 "##..0 $ ! %55994:PQ!22  "##..0 $s*   A 
B"A
BB% B""B% %$C	r  rs   )r    r   	HashStorerB  r  r  r  rE  )	ru   r  global_storer  threadsr:  tr|  r}  s	   ``     @@r8   #_run_test_method_with_multi_threadsIspawn_threads_and_init_comms.<locals>._run_test_method_with_multi_threadss  sj    $&~~'	9	1  *%D  E<5PQAGGINN1 &
 r7   c                 Z  >^ ^^ [         R                  R                  R                  S5         T" TUUUU 4S j5      n[        R                  UT5        [         R                  R                  R                  S5        g ! [         R                  R                  R                  S5        f = f)NTc                     > T" T/T Q70 TD6$ rd   r.   )rs   rv   rt   rw  s   r8   r   ?spawn_threads_and_init_comms.<locals>.wrapper.<locals>.<lambda>  s    D$?$?$?r7   F)rq   r  _distributed_c10d_set_thread_isolation_moder  _join_threads)rw  rs   rt   r  r  rv   ru   s   ``` r8   rw   -spawn_threads_and_init_comms.<locals>.wrapper  sv     	""==dC	I9?G "//>HH&&AA%HEHH&&AA%Hs   &A? ?+B*)r   spawn_threads_and_init_commsr	   )rv   r  ru   rw   r  s   ` ` @r8   r  r  h  sD     |('j
 	
> 4[
I 
I Nr7   c                   &  ^  \ rS rSrSr\R                  " 5       rSrS r	 SS\
S\
SS4U 4S	 jjjrS
 rS rSU 4S jjrU 4S jrS r\S 5       rS r\S 5       r\S 5       r\S\4S j5       r\S\
4S j5       rSSS.S jjrSSS.S jjrSrU =r$ )r  i  a  
Test runner that runs all tests with the in-proc process group using
multiple threads with the threaded process group.

Each test spawns world_size threads and run the test method in each thread.

Difference from regular MultiProcess test runner:
Must explicitly defines SetUp and call self._spawn_threads() to run the tests.
Cannot use setUp / tearDown (must use perThreadSetup / perThreadShutdown)
    to set up / tear down each thread when running each test.
No global state possible
    How bad of a limitation is this?
r   c                 X   ^ [        T5      U4S j5       n[        R                  " X 5      $ )Nc                    > U R                   U R                  :X  a  U R                  U R                  T5        g T" 5         g rd   )r:  MAIN_THREAD_RANKr  r  r  s    r8   rw   2MultiThreadedTestCase.join_or_run.<locals>.wrapper  s.    yyD111""4<<4r7   r  r  s    ` r8   r  !MultiThreadedTestCase.join_or_run  r  r7   r  r  r  Nc                    > US:w  a  Un[         TU ]  U5         [        X5      n[        XU R	                  U5      5        g ! [
         a,  nUS:w  a  [        SU R                   SU 35      Ue S nAg S nAff = fr  r  r  s        r8   r  MultiThreadedTestCase.__init__  r  r  c                     g rd   r.   rv  s    r8   perThreadSetUp$MultiThreadedTestCase.perThreadSetUp  s    r7   c                     g rd   r.   rv  s    r8   perThreadTearDown'MultiThreadedTestCase.perThreadTearDown  s    r7   c                 x   > [         TU ]  5         U R                  U l        / U l        S[
        R                  S'   g)zy
setUp only set up things in the main thread, if you want to configure things
in the spawned threads, use perThreadSetUp
r   r  N)r  r  r  r:  r  ro   rp   r  s    r8   r  MultiThreadedTestCase.setUp  s1    
 	))	36

/0r7   c                 0   > [         TU ]  5         / U l        g)z
tearDown only set up things in the main thread, if you want to configure things
in the spawned threads, use perThreadTearDown
N)r  r  r  r  s    r8   r  MultiThreadedTestCase.tearDown  s    
 	r7   c                   ^ [         R                  R                  R                  S5        U R                  n[        5       m[        R                  " 5       U R                  l	        U4S jnU" 5       (       d  [        S5      e[        U R                  5       Hc  n[        R                  " U R                  R                  XU R                  4S9nUR!                  5         U R"                  R%                  U5        Me     g)z[
class method to spawn threads and run test, use this method in the SetUp of your TestCase
Tc                  >   > T [         R                  R                  :H  $ rd   ry  r{  s   r8   r}  <MultiThreadedTestCase._spawn_threads.<locals>.world_is_valid  r  r7   zInvalid worldr  N)rq   r  r  r  r  r    r   r  r  r  r  rB  ru   r  r  r  r  r  rE  )rw  r  r}  r:  r  r|  s        @r8   _spawn_threads$MultiThreadedTestCase._spawn_threads  s     	""==dC++	$&&*nn&6#	9 //$//*D  ~~**)4??1SA GGILL" +r7   c                    U " U5      nX%l         [        US5      (       aX  [        R                  " 5       Ul        [
        R                  UR                  l        [
        R                  UR                  l	        UR                  XU5        g )N_tls)r:  hasattrr  localr  r   
_precision	precision_rel_tolrel_tolrun_test_with_threaded_pg)r  r  r:  ru   rt   rw  s         r8   r  MultiThreadedTestCase._run  sa    9~	 4  !)DI"*"5"5DII ( 1 1DII&&y
Cr7   c                    [         R                  " SUUU R                  R                  S9  U R	                  5          [        X5      " 5         [         R                  " 5         U R                  5         g! [         aP  nU R                  R                  U[        R                  " 5       45        [        R                  " U5         SnAN{SnAff = f! [         R                  " 5         U R                  5         f = f)zT
Run the current test associated with `test_name` using the threaded process group.
r  r  N)r   rH  r  r  r  r  r  r  r  rl   r  r"   r  r  r  )rw  r  r:  ru   r  s        r8   r  /MultiThreadedTestCase.run_test_with_threaded_pg  s     	!..--		
 			%D$& &&(""$  	  $$dCLLN%;<.. 	 &&(""$s*   A3 3
C=ACC CC 'C7c           
         [         n [        U5       Hl  u  pEUR                  [        SU5      5        UR	                  5       (       d  M7  [
        R                  R                  U[        [        SU S35      S 445        Mn     [        R                  " 5         / nU R                  R                  5       (       dL  U R                  R                  5       nUR                  U5        U R                  R                  5       (       d  ML  [        5         [        R                   R"                  R%                  S5        U R'                  XcU5        g ! [        5         [        R                   R"                  R%                  S5        f = f)Nr   zRank failed to join in under r#  F)r/  r  rk  maxis_aliver  r  r  TimeoutErrorr"   r^  emptyr   rE  r!   rq   r  r  r  r  )r  r  rQ  r  idxthreadfailed_ranksfailures           r8   r  #MultiThreadedTestCase._join_threads*  s-   !	I(1C7O,??$$)99== , ,&CG9H$U!" !%	  2 ##%L))//11--113##G, ))//11 #$HH&&AA%Hr: #$HH&&AA%Hs   >E 
B9E 5F c           	         SnSnU GHO  u  pgUS   n[        U[        R                  5      (       a>  [        R	                  SUU[        U5      5        US:  a  [        S   R                  nMf  Mh  [        U[        5      (       a)  SU SU S	3n	[        R                  U	5        [        U	5      e[        U[        5      (       aG  SR                  [        R                  " U6 5      n	[        R                  S
X5        USU SU	 S3-  nGM  [        U[        5      (       d  GM  [!        UR"                  5      [$        :X  d  GM:  US:  d  GMC  UR"                  nGMR     ['        U5      S:  a  [        U5      eUS:  ay  [        R)                  5        H`  n
XZR                  :X  d  M  [*        (       a#  [        R	                  SUU
R,                  5          g [        R                  " U
R,                  5      e   g g )Nr   r   r   z3Thread %s skipping test %s for following reason: %sr   rN   zThread r"  z	 seconds
z'Caught exception: 
%s exiting thread %sz exited with exception:
r!  r$  )
isinstancer  r  r  r  r5   rn   r,   r  r   r  r  rk  r  format_exception
SystemExitr	  coder3   lenrK  r   r-   )r  r  r  rQ  	error_msg	skip_coder:  r  excr   r*  s              r8   r  )MultiThreadedTestCase._check_return_codesH  s    		*ND1+C#x0011IH	 q= *9 5 ? ?I !C..v%DWIZXS!"3''C++ggi88(CDGSwtf,EcU"MM	C,,>S(Y] #I+ +0 y>Ay))q="))+.$}T LL
 &//== , r7   c                     [         $ rd   r~  rv  s    r8   ru    MultiThreadedTestCase.world_sizez  r  r7   c                 F    U R                  5       R                  S5      S   $ r,  r  rv  s    r8   r  (MultiThreadedTestCase._current_test_name~  s     wwys#B''r7   r   r.  c                J    U R                   U:X  a  U R                  XU5        gg)z
The reason why we have this util function instead of
self.assertEqual is all threads are sharing one CPU RNG
so the assertion result is only reliable on rank 0
N)r:  r&  rw  r   yr   r:  s        r8   assertEqualOnRank'MultiThreadedTestCase.assertEqualOnRank  s%     99Q3' r7   c                H    U R                   U:X  a  U R                  X5        g g rd   )r:  assertNotEqualr  s        r8   assertNotEqualOnRank*MultiThreadedTestCase.assertNotEqualOnRank  s!    99% r7   )r  r:  r  r0  r1  rd   )r/   r0   r1   r2   __doc__queueQueuer  r  r  r5   r  r  r  r  r  r  r5  r  r  r  r  r3  r3   ru   r  r  r  r6   r6  r7  s   @r8   r  r    s     kkmO/ ?H8;	 &	7#. D D%. ; ;: /> />b "C " " (C ( (( (&1 & &r7   r  c                      ^  \ rS rSrS\\R                  \R                  4   S\	SS4U 4S jjr
S\R                  S\R                  4S jrS	rU =r$ )
SaveForwardInputsModulei  forward_inputscast_forward_inputsr  Nc                 r   > [         TU ]  5         [        R                  " SS5      U l        Xl        X l        g )Nd   )r  r  nnLinearlr  r  rw  r  r  r  s      r8   r   SaveForwardInputsModule.__init__  s.    
 	3$,#6 r7   r   c                     XR                   U '   U R                  U R                  (       a3  UR                  U R                  R                  R
                  5      5      $ U5      $ rd   )r  r  r  toweightdtyperw  r   s     r8   forwardSaveForwardInputsModule.forward  sG    $%D!vv43K3Kadd466==../SSQRSSr7   )r  r  r  r/   r0   r1   r2   rZ  r  Modulerq   Tensorr2  r  r  r6   r6  r7  s   @r8   r  r    sZ    7RYY457 "7 
	7T T%,, T Tr7   r  c                      ^  \ rS rSrS\\R                  \R                  4   S\	SS4U 4S jjr
S\R                  S\R                  4S jrS	rU =r$ )
SaveForwardInputsModeli  r  r  r  Nc                 n   > [         TU ]  5         [        X5      U l        [        X5      U l        Xl        g rd   )r  r  r  c1c2r  r  s      r8   r  SaveForwardInputsModel.__init__  s.    
 	).N).N,r7   r   c                 ^    XR                   U '   U R                  U R                  U5      5      $ rd   )r  r  r  r  s     r8   r  SaveForwardInputsModel.forward  s'    $%D!wwtwwqz""r7   )r  r  r  r  r7  s   @r8   r  r    sV    -RYY45- "- 
	-# #%,, # #r7   r  c              #     #    U(       d  [         R                  R                  U 5        S[        R                  S'   S[        R                  S'   U(       ap  U(       aT  [         R
                  R                  R                  R                  R                  5       n[        R                  " SUU US9  O[        R                  " X US9  [         R                  R                  5         [         R                  R                  R                  R!                  5          S v   [         R                  R                  5         [         R                  R                  R                  R!                  5         U(       a  [        R"                  " 5         g g ! [         R                  R                  5         [         R                  R                  R                  R!                  5         U(       a  [        R"                  " 5         f f = f7f)Nr  MASTER_ADDR6789MASTER_PORTfakerD  )r   r:  ru   )rq   rI  rJ  ro   rp   testing	_internalr=  r  	FakeStorer   rH  r]  r^  utilscountersclearr  )r:  ru   r   init_pgr  rE  s         r8   _dynamo_dist_per_rank_initr    sG     **40 +BJJ} &BJJ}MM++77??IIKE##%	 ##G:V	MM	MM  &&()$$**,&&(  	$$**,&&( s    DG;F	 A.G;	A/G88G;c                   L   ^  \ rS rSrSr\U 4S j5       r\U 4S j5       rSrU =r	$ )#DynamoDistributedSingleProcTestCasei  z
Test harness for single-process dynamo distributed tests,
initializes dist process group.

Prefer this for simple tests, as it's easier to debug.
c                 b  > [         TU ]  5         U R                  R                  [        R
                  " [        R                  SSS.5      5        SU l        SU R                   3U l	        SU R                  ;   a  S OU R                  /U l
        [        R                  " SU R                  SS	9  g )
Nr  12355)r  r  r   zcuda:r'   r$   r   r:  ru   )r  
setUpClass_exit_stackenter_contextr   rZ  ro   rp   r:  rf   
device_idsr   rH  r  r  s    r8   r  .DynamoDistributedSingleProcTestCase.setUpClass  s    %%JJ

#.#*	
 SXXJ'
!'3::!5CHH:SXX!Dr7   c                 L   > [         R                  " 5         [        TU ]  5         g rd   )r   r  r  tearDownClassr  s    r8   r  1DynamoDistributedSingleProcTestCase.tearDownClass  s    ""$r7   r.   )
r/   r0   r1   r2   r  r5  r  r  r6   r6  r7  s   @r8   r  r    s2     E E"    r7   r  c            	       T    \ rS rSrSr\S\4S j5       r\S\S\	S\	SS4S	 j5       r
S
rg)"DynamoDistributedMultiProcTestCasei  a  
Use this for tests that actually run on multiple GPUs.

Decorate tests with @skip_if_lt_x_gpu(ngpu)

Note: MultiProcTestCase spawns processes per test and is slow.
Prefer MultiThreadedTestCase for most tests. Perhaps use this one
sparingly for integration tests.
r  c                 >    [         R                  R                  5       $ rd   )rq   rI  rr   rv  s    r8   ru   -DynamoDistributedMultiProcTestCase.world_size  s      --//r7   r:  r  r  Nc                     [         R                  " [        R                  " 5       5        U " U5      nXl        X6l        UR                  X$5        g rd   )r   
addHandlerloggingNullHandlerr:  r  r  r  s          r8   r  'DynamoDistributedMultiProcTestCase._run   s<     	W0023 9~	"i-r7   )r  r:  )r/   r0   r1   r2   r  r3  r3   ru   r5  r5   r  r6   r.   r7   r8   r  r    sV     0C 0 0 	.	.#&	.36	.		. 	.r7   r  c                   t  ^  \ rS rSr% SrSr\\S'   Sr\\S'   Sr	\
\   \S'   \" SS	9r\\S
'   Sr\\S'   \S\
\   4S j5       r\S\4S j5       r\SS j5       r\S 5       r\S\SS4S j5       r\S 5       r\SS j5       r\U 4S j5       r\U 4S j5       rSU 4S jjrS r S S\S\SS4U 4S jjjrSrU =r$ )!MultiProcContinousTesti  r   ru   r:  N	rdvz_filex   )secondsr  Fpoison_pillr  c                     g)z
ProcessGroup backend str.
To be customized by sub test classes, e.g. "nccl".
Otherwise we return None -- lazily decided by tensor.
Nr.   )r  s    r8   backend_str"MultiProcContinousTest.backend_str       r7   c                 ^    [         R                  R                  5       nUc  gUR                  $ )Ncpu)rq   rI  current_acceleratorr	  )r  curr_devices     r8   device_type"MultiProcContinousTest.device_type%  s+    '';;=r7   c                     g)zs
ProcessGroup init options.
To be customized by sub test classes, e.g. ProcessGroupNCCLOpTest
Here we return None.
Nr.   )r  high_priority_streams     r8   optsMultiProcContinousTest.opts,  r/  r7   c           	          Uc   e[         R                  " X25      n[         R                  " U R                  5       UUUU R	                  5       U R
                  S9  [         R                  R                  5       U l        g )N)r   ru   r:  rE  
pg_optionsr  )	r   rG  rH  r-  r8  r  rK  rL  pg)r  r:  ru   r(  rE  s        r8   _init_pgMultiProcContinousTest._init_pg5  sg    $$$y5 	OO%!xxzKK	
 &&99;r7   r0  c                     UR                  S5      S   nU " U5      nU R                  Ul        U R                  Ul        [        XC5      nU" S0 UD6  g )Nr-  r   r.   )r   r:  ru   r  )r  r0  rt   r  rw  rT  s         r8   _run_test_given_id)MultiProcContinousTest._run_test_given_idE  sK     MM#&r*	9~HH	..$*&r7   c                    SUs=::  a  U:  d   e   eXl         X l        U R                  XU5        [        R	                  S5         UR                  5       n[        R                  SU 35        Uc  O% U R                  U5        UR                  U5        MQ  [        R	                  S5        [        R                  " 5         g ! [         a  nUR                  U5         S nANMS nAff = f)Nr   zSetup completez	Got test zTerminating ...)r:  ru   r=  r  r  r   r  r@  r  r  r   r  )r  r:  ru   r(  
task_queuecompletion_queuer0  r  s           r8   _worker_loop#MultiProcContinousTest._worker_loopQ  s     D%:%%%%%# 	Ty1 	$%  nn&GLL9WI./)&&w/ $$W-   	%&""$ ! ) $$R(()s   4"C 
C(C##C(c                    / U l         / U l        / U l        [        R                  " SS9R
                  U l         [        R                  R                  S5        [        [        U5      5       GH  n[        R                  R                  5       n[        R                  R                  5       n[        R                  R                  U R                  S[!        U5      -   SX!U R                  X44S9nUR#                  5         U R                   R%                  U5        U R                  R%                  U5        U R                  R%                  U5        [&        R)                  SX%R*                  5        GM     g ! [         a     GN+f = f)NFr  r  r  T)r  rh  r  rs   r  )r  task_queuescompletion_queuesrf  r  rh  r(  rq   r  r  r  rB  r3   r  r  rE  r5   r  rE  r  r  r  )r  ru   r:  rC  rD  r  s         r8   r  'MultiProcContinousTest._spawn_processess  s5    " 335AFF	!!227;
 #j/*D..446J$44::<++33''#d)+zT	 4 G MMOMM  )OO"":.!!(()9:KK0$ +	  		s   E7 7
FFc                   > [         TU ]  5         U R                  5       nU R                  S:X  aS  [        R
                  " U5      R                  5       U l        U R                  S:X  a  [        R                  " SU S35      e[        R                  SU R                   SU R                   SU 35        U R                  U R                  5        g)	zm
Class-scope test fixture. Run once for entire test class, before any test starts.
Set up the process group.
r'  r   zNo z devices availablezTesting class z on  N)r  r  r4  ru   rq   rF  rr   r  r  r  r  r/   r  )r  r4  r  s     r8   r  !MultiProcContinousTest.setUpClass  s     	 oo'>>R"44[ANNPCN~~"''#k]:L(MNNS\\N$s~~.>a}M	
 	S^^,r7   c                   > [         R                  SU R                   S35        U R                   H  nUR	                  S5        M     U R
                   H  nUR                  5         M      [        R                  " U R                  5        [         R                  SU R                   S35        [        TU ]9  5         g! [         a     N>f = f)zp
Class-scope test fixture. Run once for entire test class, after all tests finish.
Tear down the process group.
zJoining z workersNzClass z	 finished)r  r  ru   rH  r  r  rk  ro   r>  r(  r?  r  r/   r  r  )r  rC  r  r  s      r8   r  $MultiProcContinousTest.tearDownClass  s     	x/x89//JNN4  * }}GLLN %	IIcmm$ 	fS\\N)45	  		s   - B? ?
CCc                   > [         TU ]  5         U R                  U l        U R                  R
                  (       a'  [        R                  " SU R                  5        35      e[        U R                  5       HM  u  p[        R                  SU SU R                  5        35        UR                  U R                  5       5        MO     g)z%
Test fixture. Run before each test.
zPrevious test failed, skipping zSending Rank r  N)r  r  r  r:  r  r+  r  r  r  r  rH  r  r  r  )rw  rU  rC  r  s      r8   r  MultiProcContinousTest.setUp  s     	 **	 >>%%##&Edggi[$QRR 't'7'78MALL=2dggi[9:NN4779% 9r7   c                 X   ^ [        T5      U4S j5       n[        R                  " X 5      $ )Nc           	      >  > U R                   U R                  :X  a  [        R                  SU R	                  5        35        [        U R                  5       H  u  pUR                  5       n[        U[        5      (       aS  [        R                  SU SU R	                  5        SU R                  R                   35        SU R                  l        UeX0R	                  5       :X  d   e[        R                  SU SU R	                  5        35        M     g T" 5         g )NzWaiting for workers to finish zDetected failure from Rank z in: z(, skipping rest of tests in Test class: TzMain proc detected rank z
 finished )r:  r  r  r  r  r  rI  r   r  r  r%  r  r/   r+  )rw  rU  rD  rvrQ  s       r8   rw   =MultiProcContinousTest._worker_run_main_wait.<locals>.wrapper  s    yyD222=dggi[IJ+4T5K5K+L'A)--/B!"m449!E$'') MEEI^^E\E\D]_ 6:2  ?*?LL21#Z	{K ,M( r7   r  r  s    ` r8   _worker_run_main_wait,MultiProcContinousTest._worker_run_main_wait  s,    	r	 
	4 ..r7   r  r  c                    > US:w  a  Un[         TU ]  U5         [        X5      n[        XU R	                  U5      5        g ! [
         a,  nUS:w  a  [        SU R                   SU 35      Ue S nAg S nAff = fr  )r  r  r  r  rV  r  r
  r  r  s        r8   r  MultiProcContinousTest.__init__  s    
 "$K%		+BDt'A'A"'EF 	Y& !-dnn-=R
|L '	r  r  )Fr1  r0  )r/   r0   r1   r2   r  ru   r3   r4   r:  r(  r   r5   r   r  r+  r2  r5  r-  r4  r8  r=  r@  rE  r  r  r  r  rV  r  r6   r6  r7  s   @r8   r&  r&    sJ   JD#N#Ix}#"3/GY/KHSM    C       < < 	 	4 	 	 % %B  @ - -*    .&$/F ?H8;	 r7   r&  rd   r   )r   r1  )r$   TF)r  rX  r"  r  rO  ro   r  rm  rl   rf  r  r  r  r  r  
contextlibr   dataclassesr   datetimer   enumr   	functoolsr   r   r	   ior
   typingr   r   r   r   r   unittest.mockr   rq   torch._dynamo.test_casetorch.cuda.nccltorch.distributedr=  r   torch.nnr  torch._C._autogradr   torch._C._distributed_c10dr   torch._logging._internalr   $torch.testing._internal.common_utilsr   r   r   r   r   r   r   r   r   r   r   r   5torch.testing._internal.distributed.multi_threaded_pgr    r!   r"   	getLoggerr/   r  setLevelINFOr   re   HAS_ACCELERATORr*   rn   rU   rg   r{   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r  rf   r3   r2  r  r  r/  getenvr.  r*  r1  r9  rV  r5   r]  r^  rg  r4   rm  rp  r  rs  r9  rZ  rX  rf  rl  rq  ru  r  r  r  r  r  r  r]  	test_caser  r  r&  r.   r7   r8   <module>rq     s        	   
       % !   , ,  = =        ) 7 .     
		8	$  4 E? 3x38z 
8
C x$FG	
 Xb"BC x45 8B => 8B >? 8B >? 8B >? 8B >? 8B >? 8B >? 8B >? HR>?  (267!" hr#LM#$ x
V%* 8B DE+, hr#BC-
4 * * **&6 $R$+\4	
:
F Fc F# F$ F  	a 
 
: O"))$GOPO,c2  +.'(
T 
IC I 2 2(S (c (s (XS 3 2 26(--	. 5
Xc] 
d 
"  y8 yB+N. +N\d3i( 
 F   ,0 
3E7tl&H l&^Tbii T #RYY #  <A) )> %--*A*A*J*J  @.)< .8zX zr7   