
    3fi5                       d Z ddlmZ ddlZddlZddlZddlmZ ddlm	Z	m
Z
mZmZ ddlmZ ddlmZmZmZ e	rddlmZ dd	lmZ  ej.                  e      Zd
ZdZdZdZ G d de      Z	 	 	 	 	 	 	 	 	 	 	 	 ddZ G d de      Z y)zKafka-based chat message history by using confluent-kafka-python.
confluent-kafka-python is under Apache 2.0 license.
https://github.com/confluentinc/confluent-kafka-python
    )annotationsN)Enum)TYPE_CHECKINGListOptionalSequence)BaseChatMessageHistory)BaseMessagemessage_to_dictmessages_from_dict)TopicPartitionAdminClientzbootstrap.serversi $      c                      e Zd ZdZdZdZdZy)ConsumeStartPositionzConsume start position for Kafka consumer to get chat history messages.
    LAST_CONSUMED: Continue from the last consumed offset.
    EARLIEST: Start consuming from the beginning.
    LATEST: Start consuming from the latest offset.
    r      r   N)__name__
__module____qualname____doc__LAST_CONSUMEDEARLIESTLATEST     n/var/www/auto_recruiter/arenv/lib/python3.12/site-packages/langchain_community/chat_message_histories/kafka.pyr   r      s     MHFr   r   c           	     F   ddl m} 	 | j                         j                  }||v r6t	        ||   j
                        }t        j                  d| d| d       |S 	  ||||dt        |      i	      g}		 | j                  |	      }
|
j                         D ]  \  }}|j                           t        j                  d| d
       |S # t        $ r}t        j                  d|        |d}~ww xY w# t        $ r"}t        j                  d| d|        |d}~ww xY w)zCreate topic if it doesn't exist, and return the number of partitions.
    If the topic already exists, we don't change the topic configuration.
    r   )NewTopicTopic z already exists with z partitionszFailed to list topics: Nzretention.ms)num_partitionsreplication_factorconfigz createdzFailed to create topic : )confluent_kafka.adminr    list_topicstopicslen
partitionsloggerinfo	Exceptionerrorstrcreate_topicsitemsresult)admin_client
topic_namer#   	partitionttl_msr    topic_metadatar"   er(   futures_fs                r   ensure_topic_existsr<   *   s?    /
%113::' 
!;!F!FGNKK$9.9IU "! ( 	$1"CK0		
F,,V4MMO 	DAqHHJ	fZL12
 +  .qc23"  .zl"QC@As1   AC
 6AC5 
	C2C--C25	D >DD c                      e Zd ZdZeeef	 	 	 	 	 	 	 	 	 ddZ	 d	 	 	 	 	 ddZ	 	 	 	 	 	 	 	 ddZ		 d	 	 	 	 	 ddZ
	 d	 	 	 	 	 ddZ	 d	 	 	 	 	 ddZedd       Zdd	Zdd
Zy)KafkaChatMessageHistorya  Chat message history stored in Kafka.

    Setup:
        Install ``confluent-kafka-python``.

        .. code-block:: bash

            pip install confluent_kafka

    Instantiate:
        .. code-block:: python

            from langchain_community.chat_message_histories import KafkaChatMessageHistory

            history = KafkaChatMessageHistory(
                session_id="your_session_id",
                bootstrap_servers="host:port",
            )

    Add and retrieve messages:
        .. code-block:: python

            # Add messages
            history.add_messages([message1, message2, message3, ...])

            # Retrieve messages
            message_batch_0 = history.messages

            # retrieve messages after message_batch_0
            message_batch_1 = history.messages

            # Reset to beginning and retrieve messages
            messages_from_beginning = history.messages_from_beginning()

    Retrieving messages is stateful. Internally, it uses Kafka consumer to read.
    The consumed offset is maintained persistently.

    To retrieve messages, you can use the following methods:
    - `messages`:
        continue consuming chat messages from last one.
    - `messages_from_beginning`:
        reset the consumer to the beginning of the chat history and return messages.
        Optional parameters:
        1. `max_message_count`: maximum number of messages to return.
        2. `max_time_sec`: maximum time in seconds to wait for messages.
    - `messages_from_latest`:
        reset to end of the chat history and try consuming messages.
        Optional parameters same as above.
    - `messages_from_last_consumed`:
        continuing from the last consumed message, similar to `messages`.
        Optional parameters same as above.

    `max_message_count` and `max_time_sec` are used to avoid blocking indefinitely
     when retrieving messages. As a result, the method to retrieve messages may not
     return all messages. Change `max_message_count` and `max_time_sec` to retrieve
     all history messages.
    c                    	 ddl m} ddlm} || _        || _         |t        |i      | _	        t        | j                  ||||      | _         |t        |i      | _        y# t        t
        f$ r t	        d      w xY w)a  
        Args:
            session_id: The ID for single chat session. It is used as Kafka topic name.
            bootstrap_servers:
                Comma-separated host/port pairs to establish connection to Kafka cluster
                https://kafka.apache.org/documentation.html#adminclientconfigs_bootstrap.servers
            ttl_ms:
                Time-to-live (milliseconds) for automatic expiration of entries.
                Default 7 days. -1 for no expiration.
                It translates to https://kafka.apache.org/documentation.html#topicconfigs_retention.ms
            replication_factor: The replication factor for the topic. Default 1.
            partition: The number of partitions for the topic. Default 3.
        r   )Producerr   z_Could not import confluent_kafka package. Please install it with `pip install confluent_kafka`.N)confluent_kafkar@   r&   r   ImportErrorModuleNotFoundError
session_idbootstrap_serversBOOTSTRAP_SERVERS_CONFIGr3   r<   r"   producer)selfrD   rE   r6   r#   r5   r@   r   s           r   __init__z KafkaChatMessageHistory.__init__   s    *	09 %!2')ACT(UV1z+=y&
 !":<M!NO 01 	H 	s   A! !A<c           	     b   	 |D ]E  }| j                   j                  | j                  t        j                  t        |                   G | j                   j                  |      }|dkD  rt        j                  | d       yy# t        $ r}t        j                  d|        |d}~ww xY w)zAAdd messages to the chat history by producing to the Kafka topic.)topicvaluer   z messages are still in-flight.z!Failed to add messages to Kafka: N)rG   producerD   jsondumpsr   flushr+   warningr-   r.   )rH   messagesflush_timeout_secondsmessagemessage_remainingr8   s         r   add_messagesz$KafkaChatMessageHistory.add_messages   s    	# %%//**_W%=> & 
 !% 3 34I J 1$"3!44RST % 	LL<QC@AG	s   BB 	B.B))B.c                P   ddl mmm} t        | j
                  d| j                  d|t        j                  k(  rdndi}	 	 	 	 	 	 dfd}	 	 	 	 	 	 dfd}g } ||      }		 |t        j                  k(  r|	j                  | j                  g|	       nN|t        j                  k(  r|	j                  | j                  g|	       n|	j                  | j                  g       t        j                         }
	 |t        j                         |
z
  |kD  rn|t        |      |k\  rn|	j                  d      }|D|j                         't        j                  d|j                                 {|j!                         t        j#                  d       |j%                  t'        j(                  |j!                                      |	j-                          t/        |      S # t*        $ r}t        j                  d|        |d
}~ww xY w# |	j-                          w xY w)a  Retrieve messages from Kafka topic for the session.
           Please note this method is stateful. Internally, it uses Kafka consumer
           to consume messages, and maintains the consumed offset.

         Args:
              consume_start_pos: Start position for Kafka consumer.
              max_message_count: Maximum number of messages to consume.
              max_time_sec:      Time limit in seconds to consume messages.
        Returns:
              List of messages.
        r   )OFFSET_BEGINNING
OFFSET_ENDConsumerzgroup.idzauto.offset.resetlatestearliestc                D    |D ]	  }|_          | j                  |       y Noffsetassign)assigned_consumerassigned_partitionsprX   s      r   assign_beginningzAKafkaChatMessageHistory.__read_messages.<locals>.assign_beginning   s*     ) ,+,$$%89r   c                D    |D ]	  }|_          | j                  |       y r^   r_   )rb   rc   rd   rY   s      r   assign_latestz>KafkaChatMessageHistory.__read_messages.<locals>.assign_latest   s*     ) &%&$$%89r   )	on_assignNg      ?)timeoutzConsumer error: zEmpty message valuez'Failed to consume messages from Kafka: )rb   rZ   rc   zlist[TopicPartition]returnNone)rA   rX   rY   rZ   rF   rE   rD   r   r   r   	subscribetimer)   pollr.   r+   rL   rQ   appendrN   loadsr-   closer   )rH   consume_start_posmax_message_countmax_time_secrZ   consumer_configre   rg   rR   consumerstart_time_secrT   r8   rX   rY   s                @@r   __read_messagesz'KafkaChatMessageHistory.__read_messages   s   " 	KJ %d&<&< $8$?$?? "*
	:'	:>R	:	:	:'	:>R	:	:  "O,	 $8$A$AA""DOO#4@P"Q"&:&A&AA""DOO#4"N""DOO#45!YY[N ,		n4|C$0S]FW5W"---4?==?.LL#3GMMO3D!EF==?*NN#89

7==? ;<% . NN!(++  	LLB1#FGG	 NNs%   %E(G( (	H1HHH H%c                F    | j                  t        j                  ||      S )av  Retrieve messages from Kafka topic from the beginning.
        This method resets the consumer to the beginning and consumes messages.

             Args:
                 max_message_count: Maximum number of messages to consume.
                 max_time_sec:      Time limit in seconds to consume messages.
             Returns:
                 List of messages.
        rr   rs   rt   )'_KafkaChatMessageHistory__read_messagesr   r   rH   rs   rt   s      r   messages_from_beginningz/KafkaChatMessageHistory.messages_from_beginning  s+     ##2;;/% $ 
 	
r   c                F    | j                  t        j                  ||      S )a  Reset to the end offset. Try to consume messages if available.

        Args:
            max_message_count: Maximum number of messages to consume.
            max_time_sec:      Time limit in seconds to consume messages.
        Returns:
            List of messages.
        rz   )r{   r   r   r|   s      r   messages_from_latestz,KafkaChatMessageHistory.messages_from_latest*  s+     ##299/% $ 
 	
r   c                F    | j                  t        j                  ||      S )a  Retrieve messages from Kafka topic from the last consumed message.
        Please note this method is stateful. Internally, it uses Kafka consumer
        to consume messages, and maintains the commit offset.

          Args:
               max_message_count: Maximum number of messages to consume.
               max_time_sec:      Time limit in seconds to consume messages.
          Returns:
               List of messages.
        rz   )r{   r   r   r|   s      r   messages_from_last_consumedz3KafkaChatMessageHistory.messages_from_last_consumed<  s+     ##2@@/% $ 
 	
r   c                "    | j                         S )a  
        Retrieve the messages for the session, from Kafka topic continuously
        from last consumed message. This method is stateful and maintains
        consumed(committed) offset based on consumer group.
        Alternatively, use messages_from_last_consumed() with specified parameters.
        Use messages_from_beginning() to read from the earliest message.
        Use messages_from_latest() to read from the latest message.
        )r   rH   s    r   rR   z KafkaChatMessageHistory.messagesP  s     //11r   c                X   	 | j                   j                  | j                  g      }|j                         D ]  \  }}|j	                           t
        j                  d| j                   d       y# t        $ r,}t
        j                  d| j                   d|        |d}~ww xY w)z3Clear the chat history by deleting the Kafka topic.r!   z deletedzFailed to delete topic r%   N)	r3   delete_topicsrD   r1   r2   r+   r,   r-   r.   )rH   r9   r:   r;   r8   s        r   clearzKafkaChatMessageHistory.clear\  s    	''55t6GHG 1
KK& 1:; 	LL24??2C2aSIJG	s   A1A4 4	B)='B$$B)c                     y)zNRelease the resources.
        Nothing to be released at this moment.
        Nr   r   s    r   rq   zKafkaChatMessageHistory.closeg  s     	r   N)
rD   r/   rE   r/   r6   intr#   r   r5   r   )      @)rR   zSequence[BaseMessage]rS   floatrj   rk   )rr   r   rs   Optional[int]rt   Optional[float]rj   List[BaseMessage])   r   )rs   r   rt   r   rj   r   )rj   r   )rj   rk   )r   r   r   r   DEFAULT_TTL_MSDEFAULT_REPLICATION_FACTORDEFAULT_PARTITIONrI   rV   r{   r}   r   r   propertyrR   r   rq   r   r   r   r>   r>   V   s0   8| %"<*$P$P $P 	$P
  $P $PR (+'  % 
	&L,/L, )L, &	L,
 
L,^ UX
!.
BQ
	
& UX
!.
BQ
	
& UX
!.
BQ
	
( 	2 	2	r   r>   )r3   r   r4   r/   r#   r   r5   r   r6   r   rj   r   )!r   
__future__r   rN   loggingrm   enumr   typingr   r   r   r   langchain_core.chat_historyr	   langchain_core.messagesr
   r   r   rA   r   r&   r   	getLoggerr   r+   rF   r   r   r   r   r<   r>   r   r   r   <module>r      s   
 #     : : > T T.1			8	$.   	4 	))) ) 	)
 ) 	)XU4 Ur   