
    i                         d dl Z d dlZd dlZd dlmZ d dlmZmZmZ 	 d dl	m
Z
 n# e$ r	 d dlm
Z
 Y nw xY wdZdZ G d de          ZdS )	    N)Thread)APIErrorDatetimeSerializer
batch_post)Emptyi  i  P c                   h    e Zd ZdZ ej        d          Z	 	 	 	 	 	 	 	 dd	Zd
 Zd Z	d Z
d Zd ZdS )Consumerz.Consumes the messages from the client's queue.posthogd   N      ?F
      c                     t          j        |            d| _        || _        || _        || _        || _        || _        || _        || _	        d| _
        || _        |	| _        |
| _        dS )zCreate a consumer thread.TN)r   __init__daemonflush_atflush_intervalapi_keyhoston_errorqueuegziprunningretriestimeouthistorical_migration)selfr   r   r   r   r   r   r   r   r   r   s              C/var/www/icac/venv/lib/python3.11/site-packages/posthog/consumer.pyr   zConsumer.__init__   sr     	 ,	 
	
 $8!!!    c                     | j                             d           | j        r|                                  | j        | j                             d           dS )zRuns the consumer.zconsumer is running...zconsumer exited.N)logdebugr   uploadr   s    r   runzConsumer.run;   sW    /000l 	KKMMM l 	 	)*****r   c                     d| _         dS )zPause the consumer.FN)r   r$   s    r   pausezConsumer.pauseC   s    r   c                    d}|                                  }t          |          dk    rdS 	 |                     |           d}n# t          $ rx}| j                            d|           d}| j        rJ	 |                     ||           n2# t          $ r%}| j                            d|           Y d}~nd}~ww xY wY d}~nd}~ww xY w|D ]}| j                                         n## |D ]}| j                                         w xY w|S )z:Upload the next batch of items, return whether successful.Fr   Tzerror uploading: %szon_error handler failed: %sN)	nextlenrequest	Exceptionr!   errorr   r   	task_done)r   successbatcheitems        r   r#   zConsumer.uploadG   s]   		u::??5	'LLGG 	E 	E 	EHNN0!444G} EEMM!U++++  E E EHNN#@!DDDDDDDDE	E  ' '
$$&&&&' ' '
$$&&&&' sY   A C) 
C$C4B
C
B:B50C5B::C=C) CC) ) D	c                    | j         }g }t          j                    }d}t          |          | j        k     rt          j                    |z
  }|| j        k    rn	 |                    d| j        |z
            }t          t          j        |t                    
                                          }|t          k    r)| j                            dt          |                     |                    |           ||z  }|t           k    r| j                            d|           n*n# t$          $ r Y nw xY wt          |          | j        k     |S )z)Return the next batch of items to upload.r   T)blockr   )clsz)Item exceeds 900kib limit, dropping. (%s)zhit batch size limit (size: %d))r   time	monotonicr*   r   r   getjsondumpsr   encodeMAX_MSG_SIZEr!   r-   strappendBATCH_SIZE_LIMITr"   r   )r   r   items
start_time
total_sizeelapsedr2   	item_sizes           r   r)   zConsumer.next`   sY   
^%%

%jj4=((n&&3G$---yytT5H75RySS
45G H H H O O Q QRR	|++HNNCSYY   T"""i'
!111HNN#DjQQQ 2    # %jj4=((( s   BD* (A D* *
D76D7c           	         d }d}t          | j        dz             D ]}	 t          | j        | j        | j        | j        || j                    dS # t          $ rv}|} ||          s || j        k     rSt          |dd          }|r|dk    rt          j        |           n%t          j        t          d|z  d                     Y d}~d}~ww xY w|r|dS )	z=Attempt to upload the batch and retry before raising an errorc                     t          | t                    r,| j        dk    rdS d| j        cxk    odk     nc o| j        dv S dS )NzN/AFi  i  )i  i  T)
isinstancer   status)excs    r   is_retryablez&Consumer.request.<locals>.is_retryable   sb    #x(( 	 :&& 5 CJ44444444V#*J:VWW tr   N   )r   r   r0   r   retry_afterr         )ranger   r   r   r   r   r   r   r,   getattrr6   sleepmin)r   r0   rJ   last_excattemptr1   rL   s          r   r+   zConsumer.request~   s,   
	 
	 
	 T\A-.. 	8 	8G8LI L)-)B     
8 
8 
8#|A T\))")!]D"A"AK" 8{Q
;////
3q'z2#6#6777
8  	N	 	s   .A
CA,CC)r   NNr   Fr   r   F)__name__
__module____qualname____doc__logging	getLoggerr!   r   r%   r'   r#   r)   r+    r   r   r	   r	      s        88
'
I
&
&C "9 9 9 9B+ + +    2  <( ( ( ( (r   r	   )r9   rY   r6   	threadingr   posthog.requestr   r   r   r   r   ImportErrorQueuer<   r?   r	   r[   r   r   <module>r`      s             D D D D D D D D D D     # Q Q Q Q Qv Q Q Q Q Qs   % 33