
    
Bj                       d Z ddlmZ ddlZddlZddlZddlZddlZddlm	Z	 ddl
mZmZmZ ddlmZ ddlmZ ddlmZmZmZ  ej2                  d	      Z e       Z e       Z e       Ze	 G d
 d             Z G d d      Z y)u~  Gateway streaming consumer — bridges sync agent callbacks to async platform delivery.

The agent fires stream_delta_callback(text) synchronously from its worker thread.
GatewayStreamConsumer:
  1. Receives deltas via on_delta() (thread-safe, sync)
  2. Queues them to an asyncio task via queue.Queue
  3. The async run() task buffers, rate-limits, and progressively edits
     a single message on the target platform

Design: Uses the edit transport (send initial message, then editMessageText).
This is universally supported across Telegram, Discord, and Slack.

Credit: jobless0x (#774, #1312), OutThisLife (#798), clicksingh (#697).
    )annotationsN)	dataclass)AnyCallableOptional)BasePlatformAdapter)_custom_unit_to_cp)DEFAULT_STREAMING_EDIT_INTERVAL"DEFAULT_STREAMING_BUFFER_THRESHOLDDEFAULT_STREAMING_CURSORzgateway.stream_consumerc                  t    e Zd ZU dZeZded<   eZded<   e	Z
ded<   dZd	ed
<   dZded<   dZded<   dZded<   y)StreamConsumerConfigz5Runtime config for a single stream consumer instance.floatedit_intervalintbuffer_thresholdstrcursorFboolbuffer_only        fresh_final_after_secondsauto	transport 	chat_typeN)__name__
__module____qualname____doc__ _DEFAULT_STREAMING_EDIT_INTERVALr   __annotations__#_DEFAULT_STREAMING_BUFFER_THRESHOLDr   _DEFAULT_STREAMING_CURSORr   r   r   r   r        </home/ubuntu/.hermes/hermes-agent/gateway/stream_consumer.pyr   r   0   sS    ?;M5;?c?+FC+K (+u* Is Isr&   r   c                     e Zd ZU dZdZdZdZdZded<   	 	 	 	 d(	 	 	 	 	 	 	 	 	 	 	 d)d	Z	e
d*d
       Ze
d*d       Zd+dZd,dZd+dZddd-dZd,dZd+dZd,dZd+dZd+dZ ej.                  d      Zed.d       Zd/dZd0dZd1dZeef	 	 	 	 	 	 	 d2d       Zd,dZ d*dZ!d*dZ"d3d Z#d+d!Z$d+d"Z%d3d#Z&d*d$Z'd3d%Z(dd&d4d'Z)y)5GatewayStreamConsumera2  Async consumer that progressively edits a platform message with streamed tokens.

    Usage::

        consumer = GatewayStreamConsumer(adapter, chat_id, config, metadata=metadata)
        # Pass consumer.on_delta as stream_delta_callback to AIAgent
        agent = AIAgent(..., stream_delta_callback=consumer.on_delta)
        # Start the consumer as an asyncio task
        task = asyncio.create_task(consumer.run())
        # ... run agent in thread pool ...
        consumer.finish()  # signal completion
        await task         # wait for final edit
       )z<REASONING_SCRATCHPAD>z<think>z<reasoning>z
<THINKING>z
<thinking>z	<thought>)z</REASONING_SCRATCHPAD>z</think>z</reasoning>z</THINKING>z</thinking>z
</thought>r   r   _draft_id_counterNc                   || _         || _        |xs
 t               | _        || _        || _        || _        t        j                         | _	        d| _
        d | _        d | _        d| _        d| _        d| _        d| _        d| _        d| _        d| _        | j                  j(                  | _        d| _        t/        |dd      du | _        d| _        d| _        d| _        d | _        d| _        y )Nr   FTr   r   REQUIRES_EDIT_FINALIZE)adapterchat_idr   cfgmetadata_on_new_message_initial_reply_to_idqueueQueue_queue_accumulated_message_id_message_created_ts_already_sent_edit_supported_last_edit_time_last_sent_text_fallback_final_send_fallback_prefix_flood_strikesr   _current_edit_interval_final_response_sentgetattr_adapter_requires_finalize_in_think_block_think_buffer_use_draft_streaming	_draft_id_draft_failures)selfr.   r/   configr1   on_new_messageinitial_reply_to_ids          r'   __init__zGatewayStreamConsumer.__init__q   s     313   .$7!#(;;=*. 59 "#"!$)! "&*hh&<&<#$)! G5u=E 	'
  % %*!(,  !r&   c                    | j                   S )z?True if at least one message was sent or edited during the run.)r:   rJ   s    r'   already_sentz"GatewayStreamConsumer.already_sent   s     !!!r&   c                    | j                   S )zBTrue when the stream consumer delivered the final assistant reply.)rB   rP   s    r'   final_response_sentz)GatewayStreamConsumer.final_response_sent   s     (((r&   c                B    | j                   j                  t               y)z>Finalize the current stream segment and start a fresh message.N)r6   put_NEW_SEGMENTrP   s    r'   on_segment_breakz&GatewayStreamConsumer.on_segment_break   s    %r&   c                L    |r"| j                   j                  t        |f       yy)z7Queue a completed interim assistant commentary message.N)r6   rU   _COMMENTARYrJ   texts     r'   on_commentaryz#GatewayStreamConsumer.on_commentary   s    KKOO[$/0 r&   c                ~    | j                   }|y	  |        y# t        $ r t        j                  dd       Y yw xY w)z8Fire the on_new_message callback, swallowing any errors.Nzon_new_message callback errorTexc_info)r2   	Exceptionloggerdebug)rJ   cbs     r'   _notify_new_messagez)GatewayStreamConsumer._notify_new_message   sB    !!:	ID 	ILL84LH	Is     <<Fpreserve_no_editc                  |r| j                   dk(  ry d | _         d | _        d| _        d| _        d| _        d| _        | j                  r9t        |       xj                  dz  c_        t        |       j                  | _	        y y )N__no_edit__r   F   )
r8   r9   r7   r=   r>   r?   rG   typer+   rH   )rJ   rf   s     r'   _reset_segment_statez*GatewayStreamConsumer._reset_segment_state   s{     0 0M A#' !$)! " $$J((A-(!$Z99DN %r&   c                f    |r| j                   j                  |       y|| j                          yy)u2  Thread-safe callback — called from the agent's worker thread.

        When *text* is ``None``, signals a tool boundary: the current message
        is finalized and subsequent text will be sent as a new message so it
        appears below any tool-progress messages the gateway sent in between.
        N)r6   rU   rW   rZ   s     r'   on_deltazGatewayStreamConsumer.on_delta   s-     KKOOD!\!!# r&   c                B    | j                   j                  t               y)z#Signal that the stream is complete.N)r6   rU   _DONErP   s    r'   finishzGatewayStreamConsumer.finish   s    r&   c                   | j                   |z   }d| _         |rJ| j                  rd}d}| j                  D ]1  }|j                  |      }|dk7  s|dk(  s||k  s%|}t	        |      }3 |rd| _        |||z   d }nt        d | j                  D              }t	        |      |kD  r|| d | _         y|| _         yd}d}| j                  D ]  }d}	 |j                  ||      }|dk(  r|dk(  r+| j                   xs | j                  j                  d      }	nt|d| }
|
j                  d      }|dk(  r@| j                   xs | j                  j                  d      xr |
j                         dk(  }	n|
|d	z   d j                         dk(  }	|	r|dk(  s||k  r|}t	        |      }|d	z   } |r(| xj                  |d| z  c_        d| _        |||z   d }nd}| j                  D ]9  }t        d	t	        |            D ]  }|j                  |d|       s||kD  s|}! ; |r%| xj                  |d|  z  c_        || d | _         y| xj                  |z  c_        y|rIyy)
ab  Add a text delta to the accumulated buffer, suppressing think blocks.

        Uses a state machine that tracks whether we are inside a
        reasoning/thinking block.  Text inside such blocks is silently
        discarded.  Partial tags at buffer boundaries are held back in
        ``_think_buffer`` until enough characters arrive to decide.
        r   r   FNc              3  2   K   | ]  }t        |        y w)N)len).0ts     r'   	<genexpr>z?GatewayStreamConsumer._filter_and_accumulate.<locals>.<genexpr>  s     !IQ#a&!Is   T
ri   )rF   rE   _CLOSE_THINK_TAGSfindrt   max_OPEN_THINK_TAGSr7   endswithrfindstriprange)rJ   r[   bufbest_idxbest_lentagidxmax_tagsearch_startis_boundary	precedinglast_nl	held_backis                 r'   _filter_and_accumulatez,GatewayStreamConsumer._filter_and_accumulate   s      4'##11 ,C((3-Cbyh"nh#&#&s8	, +0D(h123C "!I$2H2H!IIG;>s8g;MgXYD& TWD& 00 /C#$L!hhsL9"9!!8$($5$5 5 !D#'#4#4#=#=d#C (
 ),DS	I&/ood&;G&"})-):):%: &I(,(9(9(B(B4(H%@(1(9R(? !, /8!.E.K.K.MQS.S&HNcHn'*H'*3xH!'*Qw5 /< %%Yh7%+/D(h123C !"I#44 .!&q#c(!3 .A"||CG4Y,-	.. !))S9*-==)-0)-=*  ))S0)W r&   c                    | j                   r4| j                  s'| xj                  | j                   z  c_        d| _         yyy)zFlush any held-back partial-tag buffer into accumulated text.

        Called when the stream ends (got_done) so that partial text that
        was held back waiting for a possible opening tag is not lost.
        r   N)rF   rE   r7   rP   s    r'   _flush_think_bufferz)GatewayStreamConsumer._flush_think_bufferU  s<     d&:&:!3!33!#D ';r&   c                  K   t        | j                  t              r| j                  j                  nt        }t        | j                  dd      }t        d| || j                  j                        z
  dz
        }| j                         | _
        | j                  rct        |       xj                  dz  c_        t        |       j                  | _        t        j                  d| j                   | j                         	 	 d}d}d	}	 	 | j"                  j%                         }|t&        u rd}nL|t(        u rd}nAt        |t*              rt	        |      d
k(  r|d   t,        u r|d   }n| j/                  |       r|r| j5                          t7        j8                         }|| j:                  z
  }	|xs |xs |d	u}
| j                  j<                  sO|
xsK |	| j>                  k\  xr | j@                  xs, t	        | j@                        | j                  jB                  k\  }
d}|
rL| j@                  r? || j@                        |kD  r| jD                  | j                  jG                  | j@                  ||      }d}| jD                  xs | jH                  }|D ]'  }| jK                  ||       d	{   }| ||k7  s&d}) d| _         d| _&        t7        j8                         | _        |r|| _'        y	|rd	| _"        d| _(        d| _)         || j@                        |kD  r| jD                  | jT                  rtW        | j@                  ||      }| j@                  jY                  dd|      }||d
z  k  r|}| j@                  d	| }| j[                  |       d	{   }| jP                  s|sn_| j@                  |d	 j]                  d      | _         d	| _"        d| _&         || j@                        |kD  r| jD                  | jT                  r| j@                  }|s|s||| j                  j                  z  }| j[                  ||xs |       d	{   }t7        j8                         | _        |r| j@                  r| jP                  r$| j_                  | j@                         d	{    y	|r| j`                  sd| _'        y	| jD                  r+| j[                  | j@                  d       d	{   | _'        y	| jb                  s(| j[                  | j@                         d	{   | _'        y	|R| je                          | jg                  |       d	{    t7        j8                         | _        | je                          |rS| j@                  r5|s3| jD                  r'| jD                  dk7  r| ji                          d	{    | je                  d       tk        jl                  d       d	{    # t0        j2                  $ r Y w xY w7 b7 {7 7 s7 .7 7 7 b7 5# tj        jn                  $ rt d}| j@                  rJ| jD                  r>	 tq        | j[                  | j@                         d	{  7        }n# tr        $ r Y nw xY w|r| jN                  s	d| _'        Y y	Y y	Y y	tr        $ r }t        ju                  d|       Y d	}~y	d	}~ww xY ww)z@Async task that drains the queue and edits the platform message.MAX_MESSAGE_LENGTH     d   ri   zBStream consumer using native-draft transport (chat=%s draft_id=%s)TFN   r   len_fnr   rx   finalizerh   re   g?zStream consumer error: %s);
isinstancer.   _BasePlatformAdaptermessage_len_fnrt   rC   r{   r0   r   _resolve_draft_streamingrG   rj   r+   rH   ra   rb   r/   r6   
get_nowaitro   rV   tuplerY   r   r4   Emptyr   time	monotonicr<   r   rA   r7   r   r8   truncate_messager3   _send_new_chunkr=   rB   r>   r?   r;   r	   r~   _send_or_editlstrip_send_fallback_finalrD   r:   rk   _send_commentary#_flush_segment_tail_on_edit_failureasynciosleepCancelledErrorr   r`   error)rJ   _len_fn
_raw_limit_safe_limitgot_donegot_segment_breakcommentary_textitemnowelapsedshould_editcurrent_update_visiblechunkschunks_deliveredreply_tochunknew_id
_cp_budgetsplit_atokdisplay_text_best_effort_okes                          r'   runzGatewayStreamConsumer.run_  s     $,,(<= LL'' 	
 T\\+?F
#zGDHHOO,DDsJK %)$A$A$C!$$J((A-(!$Z99DNLLTdnn
U	9 $)!"&#{{5575='+H!</04-!%dE2s4yA~$q'U`J`.21gO!33D9 & ,,. nn& 4 44 3(3&d2 
 xx++"- # D$?$?? 2 $ 1 1O t001TXX5N5NN   */&4#4#4   1 12[@ ,,4 "&!>!> --{7 "? " ,1(#'#3#3#Pt7P7P%+ 8E+/+?+?x+P%PF%1f6H37 08 -/)/1,/3~~/?,#
 9ID5",/3D,8=D546D1 
   1 12[@ ,,8 00%7 --{G&
 $(#4#4#:#:4J#O#kQ&66'2H $ 1 1)8 <#'#5#5e#<<44B ",0,=,=hi,H,O,OPT,U)+/(/1,+   1 12[@ ,,8 00* $(#4#4L#,=/BY$7 483E3E$"*"?.? 4F 4 .* ,0>>+;D(
 ((44"&";";D<M<M"NNN& # 3$($C$C 9=D5  "-- ?C>P>P $ 1 1D ?Q ? 9D5
  "&!3!3>B>P>PQUQbQb>c8cD5".--///@@@+/>>+;D(--/  % )) 6 ,, ,,="FFHHH--t-DmmD)))A $ !;; ^ &QB =,. O9 9d
 AB I *%% 	1#O  T%5%5&*1C1CDDUDU1V+V+V&WO   t'@'@,0) (A 	9LL4a88	9s  C>[X+ 
$W: .X+ /
W: 9X+ :.W: (X+ )W: :D8X+ 2X3X+ :X+  4X+ 4[5B'X+ XA1X+ AX+ XAX+ *X+X+ /[0X+ [,X+ 2X 3	X+ <[=*X+ 'X#(	X+ 1[2&X+ X%A+X+ X'.X+ 3X)4X+ :XX+ XX+ X+ X+ X+  X+ #X+ %X+ 'X+ )X+ +-[#Z<Y?
=	Z[	Z[Z[+[1[9[[[[z[`"']?MEDIA:\s*\S+[`"']?c                    d| vrd| vr| S | j                  dd      }t        j                  j                  d|      }t	        j                  dd|      }|j                         S )u  Strip MEDIA: directives and internal markers from text before display.

        The streaming path delivers raw text chunks that may include
        ``MEDIA:<path>`` tags and ``[[audio_as_voice]]`` directives meant for
        the platform adapter's post-processing.  The actual media files are
        delivered separately via ``_deliver_media_from_response()`` after the
        stream finishes — we just need to hide the raw directives from the
        user.
        zMEDIA:z[[audio_as_voice]]r   z\n{3,}z

)replacer)   	_MEDIA_REsubrerstrip)r[   cleaneds     r'   _clean_for_displayz(GatewayStreamConsumer._clean_for_displayX  sa     4$8$DK,,3R8'1155b'B&&FG4~~r&   c                6  K   | j                  |      }|j                         s|S 	 | j                  rt        | j                        ni }| j                  j                  | j                  |||       d{   }|j                  rY|j                  rMt        |j                        | _
        d| _        || _        | j                          t        |j                        S d| _        |S 7 r# t        $ r"}t         j#                  d|       |cY d}~S d}~ww xY ww)zSend a new message chunk, optionally threaded to a previous message.

        Returns the message_id so callers can thread subsequent chunks.
        r/   contentr   r1   NTFzStream send chunk error: %s)r   r   r1   dictr.   sendr/   success
message_idr   r8   r:   r=   rd   r;   r`   ra   r   )rJ   r[   reply_to_idmetaresultr   s         r'   r   z%GatewayStreamConsumer._send_new_chunkl  s     
 &&t,zz|	*.--4&RD<<,,$	 -  F ~~&"3"3#&v'8'8#9 %)"'+$ ((*6,,--',$""!"  	LL6:	sT   $DAC+ 6C)7A(C+ D C+ (D)C+ +	D4DDDDDc                    | j                   xs d}| j                  j                  rH|j                  | j                  j                        r#|dt	        | j                  j                          }| j                  |      S )z>Return the visible text already shown in the streamed message.r   N)r=   r0   r   r}   rt   r   rJ   prefixs     r'   _visible_prefixz%GatewayStreamConsumer._visible_prefix  s^    %%+88??vtxx?2c$((//223F&&v..r&   c                    | j                   xs | j                         }|r-|j                  |      r|t        |      d j	                         S |S )zAReturn only the part of final_text the user has not already seen.N)r?   r   
startswithrt   r   )rJ   
final_textr   s      r'   _continuation_textz(GatewayStreamConsumer._continuation_text  sK    &&@$*>*>*@j++F3c&kl+2244r&   c                "    ||       |k  r| gS g }| } ||      |kD  r^t        |||      }|j                  dd|      }||dz  k  r|}|j                  |d|        ||d j                  d      } ||      |kD  r^|r|j                  |       |S )z;Split text into reasonably sized chunks for fallback sends.rx   r   r   N)r	   r~   appendr   )r[   limitr   r   	remainingr   r   s          r'   _split_text_chunksz(GatewayStreamConsumer._split_text_chunks  s     $<5 6M	Y%'+IufEJ tQ
;H%1*$ MM)IX./!(),33D9I Y%' MM)$r&   c                8  K   | j                  |      }| j                  |      }d| _        |j                         s|j                         r|| j	                         k7  r|}n| j
                  r| j                  r| j                  j                  r| j                  j                  | j                  j                        r{| j                  dt        | j                  j                          }	 | j                  j                  | j                  | j
                  |       d{   }|j                  r|| _        d| _        d| _        yt%        | j                  dd      }t'        | j                  t(              r| j                  j*                  nt        }t-        d|dz
        }| j/                  |||	      }	| j
                  }
d}d
}d}|	D ]  }d}t1        d      D ]  }| j                  j3                  | j                  || j4                         d{   }|j                  r nL|dk(  rD| j7                  |      r3t8        j;                  d       t=        j>                  d       d{     n |r|j                  sE|r%d| _        d| _        || _        || _        d
| _          yd| _        d| _        d
| _        d
| _          yd}|}|jB                  xs |}| jE                          ! |
r:|
|k7  r5t%        | j                  dd      }|	  || j                  |
       d{    || _        d| _        d| _        |	d   | _        d
| _         y7 0# t        $ r Y 'w xY w7 O7 7 C# t        $ r!}t8        j;                  d|
|       Y d}~fd}~ww xY ww)zSend the final continuation after streaming edits stop working.

        Retries each chunk once on flood-control failures with a short delay.
        FNr/   r   r   Tr   r   r   r   r   r   r   r/   r   r1   r   z.Flood control on fallback send, retrying in 3sg      @delete_messagez(Fallback partial cleanup failed (%s): %srr   )#r   r   r>   r   r   r8   r=   r0   r   r}   rt   r.   edit_messager/   r   r`   r:   rB   rC   r   r   r   r{   r   r   r   r1   _is_flood_errorra   rb   r   r   r?   r   rd   )rJ   r[   r   continuation
clean_textr   	raw_limitr   
safe_limitr   stale_message_idlast_message_idlast_successful_chunksent_any_chunkr   attempt	delete_fnr   s                     r'   r   z*GatewayStreamConsumer._send_fallback_final  s    
 ,,T2
..z:$)!!!# !jD4H4H4J&J) $$,,,,55dhhooF!%!5!56LDHHOO8L7L!MJ	'+||'@'@$(LL'+'7'7$. (A ( "
 ">>3=D0 &*",0)DLL*>E	 $,,(<= LL'' 	
 i#o.
((z'(R++)- " *	'EF 8 #||00 LL!!]]  1   
 >>a<D$8$8$@LLH "--,,,  ! *.D&04D-'6D$+@D(,.D) &+"#' ')$(*%!N$)!$//B?O $$&U*	'b  0O C.>EI$#DLL2BCCC +!$(!%bz "u" % . -L D  LLB(! s   C+N.5M #M$M ;CNM&AN-M).B"NM- 'M+(M- ,'NM 	M#N"M##N)N+M- -	N6NNNNc                d    t        |dd      xs d}|j                         }d|v xs
 d|v xs d|v S )zFCheck if a SendResult failure is due to flood control / rate limiting.r   r   floodzretry afterrate)rC   lower)rJ   r   err	err_lowers       r'   r   z%GatewayStreamConsumer._is_flood_error+  s?    fgr*0bIIK	)#X}	'AXVyEXXr&   c                   | j                   j                  xs dj                         }|dk(  ry|dk(  ryt        | j                  t
              sy	 | j                  j                  | j                   j                  xs d| j                        }|s;|d
k(  r5t        j                  d| j                  | j                   j                         yy# t        $ r t        j                  dd	       d}Y bw xY w)u  Decide whether this run should use native draft streaming.

        Honors ``cfg.transport``:
          * ``"edit"``  → never use drafts (legacy progressive-edit path).
          * ``"draft"`` → require draft support; gracefully fall back to edit
            when the adapter declines.  Logs the downgrade at debug.
          * ``"auto"``  → use drafts when the adapter supports them for this
            chat type; otherwise edit.

        Adapter eligibility is checked via
        :meth:`BasePlatformAdapter.supports_draft_streaming`, which considers
        the chat type (e.g. Telegram drafts are DM-only) and platform-version
        gates (e.g. python-telegram-bot 22.6+).
        r   editFoffN)r   r1   z%supports_draft_streaming probe raisedTr^   draftuU   Draft streaming requested but unsupported (chat=%s, type=%r) — falling back to edit)r0   r   r   r   r.   r   supports_draft_streamingr   r1   r`   ra   rb   r/   )rJ   r   	supporteds      r'   r   z.GatewayStreamConsumer._resolve_draft_streaming1  s     XX''1688:	 $,,(<=	==((,,4 > I G#+LL$(("4"4
   	LL@4LPI	s   ?C "C32C3c                  K   | j                   d| _        y	 | j                  j                  | j                  | j                   || j
                         d{   }t        |dd      s>t        j                  dt        |dd	             | xj                  dz  c_	        d| _        y|| _        y
7 W# t        $ r<}t        j                  d|       | xj                  dz  c_	        d| _        Y d}~yd}~ww xY ww)a  Emit a single animated draft frame for the current accumulated text.

        Returns True when the frame landed.  On any failure, permanently
        disables drafts for the remainder of this run so subsequent frames
        flow through the edit-based path (which can adapt with flood-control
        backoff, etc.).  Drafts have no message_id and clear naturally on
        the client when the response finalizes via a regular sendMessage.
        NF)r/   draft_idr   r1   z=send_draft raised, disabling draft transport for this run: %sri   r   z@send_draft returned success=False, disabling draft transport: %sr   unknownT)rH   rG   r.   
send_draftr/   r1   r`   ra   rb   rI   rC   r=   rJ   r[   r   r   s       r'   _send_draft_framez'GatewayStreamConsumer._send_draft_frame\  s      >>! ).D%	<<22	 3  F vy%0LLR3   A% (-D%#/  	LLOQR   A% (-D%	sB   C:A B2 B0B2 AC:0B2 2	C7;2C2-C:2C77C:c                H  K   | j                   s| j                          d{    | j                  xs | j                         }| j                  }|r-|j                  |      r|t        |      d j                         }| j                  |      }|j                         sy	 | j                  j                  | j                  || j                         d{   }|j                  rd| _        yy7 7 # t         $ r }t"        j%                  d|       Y d}~yd}~ww xY ww)aK  Deliver un-sent tail content before a segment-break reset.

        When an edit fails (flood control, transport error) and a tool
        boundary arrives before the next retry, ``_accumulated`` holds text
        that was generated but never shown to the user. Without this flush,
        the segment reset would discard that tail and leave a frozen cursor
        in the partial message.

        Sends the tail that sits after the last successfully-delivered
        prefix as a new message, and best-effort strips the stuck cursor
        from the previous partial message.
        Nr   Tz"Segment-break tail flush error: %s)r>   _try_strip_cursorr?   r   r7   r   rt   r   r   r   r.   r   r/   r1   r   r:   r`   ra   r   )rJ   visibletailr   r   s        r'   r   z9GatewayStreamConsumer._flush_segment_tail_on_edit_failure  s     ((((***''A4+?+?+A  tw/G&--/D&&t,zz|		B<<,, -  F
 ~~%)"  +  	BLL=qAA	BsL    D"C2A?D"#5C6 C4C6 0D"4C6 6	D?DD"DD"c                0  K   | j                   r| j                   dk(  ry| j                         }|r|j                         sy	 | j                  j	                  | j
                  | j                   |       d{    || _        y7 # t        $ r Y yw xY ww)u   Best-effort edit to remove the cursor from the last visible message.

        Called when entering fallback mode so the user doesn't see a stuck
        cursor (▉) in the partial message.
        rh   Nr   )r8   r   r   r.   r   r/   r=   r`   r   s     r'   r  z'GatewayStreamConsumer._try_strip_cursor  s      4#3#3}#D%%'V\\^	,,++++ ,   
 $*D   		s<   A B5B 8B9B BB 	BBBBc                p  K   | j                  |      }|j                         sy	 | j                  j                  | j                  || j
                         d{   }|j                  r| j                          |j                  S 7 ,# t        $ r }t        j                  d|       Y d}~yd}~ww xY ww)z6Send a completed interim assistant commentary message.Fr   NzCommentary send error: %s)r   r   r.   r   r/   r1   r   rd   r`   ra   r   r  s       r'   r   z&GatewayStreamConsumer._send_commentary  s     &&t,zz|	<<,, -  F ~~ ((*>>!   	LL4a8	s@   #B65B
 B+B
 B6B
 
	B3B.)B6.B33B6c                    t        | j                  dd      xs d}|dk  ry| j                  r| j                  dk(  ry| j                  yt	        j
                         | j                  z
  }||k\  S )a  Return True when a long-lived preview should be replaced with a
        fresh final message instead of an edit.

        Conditions:
        - Fresh-final is enabled (``fresh_final_after_seconds > 0``).
        - We have a real preview message id (not the ``__no_edit__`` sentinel
          and not ``None``).
        - The preview has been visible for at least the configured threshold.

        Ported from openclaw/openclaw#72038.
        r   r   r   Frh   )rC   r0   r8   r9   r   r   )rJ   	thresholdages      r'   _should_send_fresh_finalz.GatewayStreamConsumer._should_send_fresh_final  sp     DHH&A3GN3	>4#3#3}#D##+nn!9!99ir&   c                  K   | j                   }	 | j                  j                  | j                  || j                         d{   }t        |dd      sy|r:|dk7  r5t        | j                  dd      }|	  || j                  |       d{    t        |d	d      }|r!|| _         t        j                         | _        nd| _         d| _        d
| _        || _        d
| _        y
7 # t
        $ r }t        j                  d|       Y d}~yd}~ww xY w7 # t
        $ r!}t        j                  d||       Y d}~d}~ww xY ww)aC  Send ``text`` as a brand-new message (best-effort delete the old
        preview) so the platform's visible timestamp reflects completion
        time.  Returns True on successful delivery, False on any failure so
        the caller falls back to the normal edit path.

        Ported from openclaw/openclaw#72038.
        r   Nz1Fresh-final send failed, falling back to edit: %sFr   rh   r   z+Fresh-final preview cleanup failed (%s): %sr   T)r8   r.   r   r/   r1   r`   ra   rb   rC   r   r   r9   r:   r=   rB   )rJ   r[   old_message_idr   r   r   new_message_ids          r'   _try_fresh_finalz&GatewayStreamConsumer._try_fresh_final  sE     ))	<<,, -  F vy%0 n=.>EI$#DLL.AAA !t<-D'+~~'7D$
  -D'+D$!#$(!S
  	LLLaP	 B  LLE& s}   E5C* C(C* 
.E9D DD AE(C* *	D3D	EDED 	E!D=8E=EEr   c                 K   | j                  |      }|}| j                  j                  r&|j                  | j                  j                  d      }|j	                         }|sy|j	                         syd}| j
                  =| j                  j                  r'| j                  j                  |v rt        |      |k  ry| j                  r:|s8| j
                  ,|| j                  k(  ry| j                  |       d{   }|ry	 | j
                  H| j                  r:|| j                  k(  r|r| j                  sy|r*| j                         r| j                  |       d{   ry| j                  j                  | j                   | j
                  ||       d{   }|j"                  rd| _        t'        |dd      xs d}|rp|j(                  rd|j(                  | j
                  k7  rKt+        |j(                        | _        t-        j.                         | _        d| _        | j3                          n|| _        d| _        y| j7                  |      r| xj4                  d	z  c_        t9        | j:                  d
z  d      | _        t<        j?                  d| j4                  | j@                  | j:                         | j4                  | j@                  k  rt-        j.                         | _!        yt<        j?                  d| j4                         | jE                         | _#        d| _$        d| _
        d| _        | jK                          d{    yy| j                  jM                  | j                   || jN                  | jP                         d{   }|j"                  r|j(                  r+|j(                  | _        t-        j.                         | _        nd| _
        d| _        || _        |j(                  s#| jE                         | _#        d| _$        d| _        | j3                          yd| _
        y7 C7 7 7 7 # tR        $ r }	t<        jU                  d|	       Y d}	~	yd}	~	ww xY ww)a\  Send or edit the streaming message.

        Returns True if the text was successfully delivered (sent or edited),
        False otherwise.  Callers like the overflow split loop use this to
        decide whether to advance past the delivered chunk.

        ``finalize`` is True when this is the last edit in a streaming
        sequence.
        r   T   N)r/   r   r   r   continuation_message_idsr%   r   ri   r   g      $@u@   Flood control on edit (strike %d/%d), backoff interval → %.1fsFz0Edit failed (strikes=%d), entering fallback moder   rh   zStream send/edit error: %s)+r   r0   r   r   r   r8   rt   rG   r=   r  r;   rD   r  r  r.   r   r/   r   r:   rC   r   r   r   r   r9   rd   r@   r   minrA   ra   rb   _MAX_FLOOD_STRIKESr<   r   r?   r>   r  r   r3   r1   r`   r   )
rJ   r[   r   visible_without_cursor_visible_stripped_MIN_NEW_MSG_CHARSr   r   _continuation_idsr   s
             r'   r   z#GatewayStreamConsumer._send_or_edit  s!     &&t, "&88??%;%C%CDHHOOUW%X"288: zz| $HHOOHHOOt+)*-?? %%  ( t+++--d33B
 R	+'' t333 T%D%D# ! 99;"&"7"7"===##'<<#<#< $#'#3#3 $!)	 $= $ F ~~-1* -4F<VXZ,[,a_a)- & 1 1 & 1 1T5E5E E/263D3D/ED,7;~~7GD435D0 44637D0./+#  //7 //14/:= $ ; ;a ?;D7 #LL!= $ 3 3 $ 7 7 $ ; ;  $22T5L5LL 8<~~7G 4',
 N // 150D0D0F-481/4,-1* #44666$ !  $||00 LL !66!]]	  1    >>((+1+<+<( 48>>3C0/4,)-D&+/D(!,,040D0D0F-481 ,9(
 ,,. ,1D( q 4F >N 7D  	LL5q9	s   C=Q;?Q Q;7Q ?Q; &Q &Q'Q +Q;,6Q "Q#B(Q Q;B,Q 8Q;9AQ QQ Q;A Q QBQ 9Q;:Q Q;Q Q Q Q 	Q8Q3.Q;3Q88Q;)NNNN)r.   r   r/   r   rK   zOptional[StreamConsumerConfig]r1   zOptional[dict]rL   zOptional[callable]rM   Optional[str])returnr   )r  None)r[   r   r  r  )rf   r   r  r  )r[   r   r  r   )r[   r   r   r  r  r  )r  r   )r   r   r  r   )r[   r   r   r   r   z'Callable[[str], int]'r  z	list[str])r[   r   r  r   )r[   r   r   r   r  r   )*r   r   r   r    r  r|   ry   r+   r"   rN   propertyrQ   rS   rW   r\   rd   rk   rm   rp   r   r   r   r   compiler   staticmethodr   r   r   r   rt   r   r   r   r   r  r   r  r   r  r  r   r%   r&   r'   r)   r)   M   s     
 s 26#'-1-1A!A! A! /	A!
 !A! +A! +A!F " " ) )&1
I @E :(
$Vp$r9n 

:;I   &>/  *-& 
 ({#zY)V&PBB*4 ,3j BG Vr&   r)   )!r    
__future__r   r   loggingr4   r   r   dataclassesr   typingr   r   r   gateway.platforms.baser   r   r	   gateway.configr
   r!   r   r#   r   r$   	getLoggerra   objectro   rV   rY   r   r)   r%   r&   r'   <module>r+     s    #    	  ! * * N 5  
		4	5 	 x h   8h hr&   