blob: 7db164befebf299157e766d82a1348a605a1aa53 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { HttpClient } from '@angular/common/http';
import { Injectable } from '@angular/core';
import { Observable, Subject } from 'rxjs';
import { Message } from '../pojo/Message';
import { LocalStorageService } from './local-storage.service';
export interface ChatMessage {
content: string;
role: 'user' | 'assistant';
timestamp: Date;
}
export interface ConversationDto {
conversationId: string;
createdAt: Date;
updatedAt: Date;
messages: ChatMessage[];
}
export interface ChatRequestContext {
message: string;
conversationId?: string;
}
const chat_uri = '/chat';
@Injectable({
providedIn: 'root'
})
export class AiChatService {
constructor(private http: HttpClient, private localStorageService: LocalStorageService) {}
/**
* Create a new conversation
*/
createConversation(): Observable<Message<ConversationDto>> {
return this.http.post<Message<ConversationDto>>(`${chat_uri}/conversations`, {});
}
/**
* Get all conversations
*/
getConversations(): Observable<Message<ConversationDto[]>> {
return this.http.get<Message<ConversationDto[]>>(`${chat_uri}/conversations`);
}
/**
* Get a specific conversation with its history
*/
getConversation(conversationId: string): Observable<Message<ConversationDto>> {
return this.http.get<Message<ConversationDto>>(`${chat_uri}/conversations/${conversationId}`);
}
/**
* Delete a conversation
*/
deleteConversation(conversationId: string): Observable<Message<void>> {
return this.http.delete<Message<void>>(`${chat_uri}/conversations/${conversationId}`);
}
/**
* Send a message and get streaming response
*/
streamChat(message: string, conversationId?: string): Observable<ChatMessage> {
const responseSubject = new Subject<ChatMessage>();
const requestBody: ChatRequestContext = { message };
if (conversationId) {
requestBody.conversationId = conversationId;
}
const token = this.localStorageService.getAuthorizationToken();
const headers: Record<string, string> = {
'Content-Type': 'application/json',
Accept: 'text/event-stream',
'Cache-Control': 'no-cache'
};
// Add Authorization header like the interceptor does
if (token) {
headers['Authorization'] = `Bearer ${token}`;
}
// Use fetch for SSE streaming (HttpClient doesn't support true streaming)
fetch(`/api${chat_uri}/stream`, {
method: 'POST',
headers,
body: JSON.stringify(requestBody)
})
.then(response => {
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const reader = response.body?.getReader();
if (!reader) {
throw new Error('No reader available');
}
const decoder = new TextDecoder();
let buffer = '';
function readStream(): Promise<void> {
if (!reader) {
return Promise.resolve();
}
return reader.read().then(({ value, done }) => {
if (done) {
responseSubject.complete();
return;
}
const chunk = decoder.decode(value, { stream: true });
buffer += chunk;
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
const trimmedLine = line.trim();
if (trimmedLine.startsWith('data:')) {
const jsonStr = trimmedLine.substring(5).trim();
if (jsonStr && jsonStr !== '[DONE]') {
try {
const data = JSON.parse(jsonStr);
if (data.response !== undefined) {
responseSubject.next({
content: data.response || '',
role: 'assistant',
timestamp: data.timestamp ? new Date(data.timestamp) : new Date()
});
}
} catch (parseError) {
console.error('Error parsing SSE data:', parseError, 'Raw data:', jsonStr);
if (jsonStr) {
responseSubject.next({
content: jsonStr,
role: 'assistant',
timestamp: new Date()
});
}
}
}
}
}
return readStream();
});
}
return readStream();
})
.catch(error => {
console.error('Chat stream error:', error);
responseSubject.error(error);
});
return responseSubject.asObservable();
}
}