Потоки — это функции с некоторыми дополнительными характеристиками: они строго типизированы, потокоемки, локально и удаленно вызываются и полностью наблюдаемы. Firebase Genkit предоставляет инструменты CLI и пользовательского интерфейса разработчика для работы с потоками (запуск, отладка и т. д.).
Определение потоков
import { defineFlow } from '@genkit-ai/flow';
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
},
async (restaurantTheme) => {
const suggestion = makeMenuItemSuggestion(restaurantTheme);
return suggestion;
}
);
Схемы ввода и вывода для потоков можно определить с помощью zod
.
import { defineFlow } from '@genkit-ai/flow';
import * as z from 'zod';
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
inputSchema: z.string(),
outputSchema: z.string(),
},
async (restaurantTheme) => {
const suggestion = makeMenuItemSuggestion(input.restaurantTheme);
return suggestion;
}
);
Если с��ема указана, Genkit проверит схему на входные и выходные данные.
Запуск потоков
Используйте функцию runFlow
для запуска потока:
const response = await runFlow(menuSuggestionFlow, 'French');
Вы также можете использовать CLI для запуска потоков:
genkit flow:run menuSuggestionFlow '"French"'
Потоковое
Вот простой пример потока, который может передавать значения из потока:
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
streamSchema: z.string(),
},
async (restaurantTheme, streamingCallback) => {
if (streamingCallback) {
makeMenuItemSuggestionsAsync(restaurantTheme).subscribe((suggestion) => {
streamingCallback(suggestion);
});
}
}
);
Обратите внимание, что значение streamingCallback
может быть неопределенным. Он определяется только в том случае, если вызывающий клиент запрашивает потоковый ответ.
Чтобы вызвать поток в потоковом режиме, используйте streamFlow
:
const response = streamFlow(menuSuggestionFlow, 'French');
for await (const suggestion of response.stream()) {
console.log('suggestion', suggestion);
}
Если поток не реализует потоковую передачу, streamFlow
будет вести себя идентично runFlow
.
Вы также можете использовать CLI для потоковой передачи потоков:
genkit flow:run menuSuggestionFlow '"French"' -s
Развертывание потоков
Если вы хотите иметь доступ к своему потоку через HTTP, вам необходимо сначала его развернуть. Genkit обеспечивает интеграцию облачных функций для хостов Firebase и Express.js, таких как Cloud Run.
Развернутые потоки поддерживают все те же функции, что и локальные потоки (например, потоковую передачу и наблюдаемость).
Облачная функция для Firebase
Чтобы использовать потоки с облачными функциями для Firebase, используйте плагин firebase
, замените defineFlow
на onFlow
и включите authPolicy
.
import { onFlow } from '@genkit-ai/firebase/functions';
import { firebaseAuth } from '@genkit-ai/firebase/auth';
export const menuSuggestionFlow = onFlow(
{
name: 'menuSuggestionFlow',
authPolicy: firebaseAuth((user) => {
if (!user.email_verified) {
throw new Error("Verified email required to run flow");
}
}
},
async (restaurantTheme) => {
// ....
}
);
Экспресс.js
Чтобы развернуть потоки с помощью Cloud Run и подобных сервис��в, определите свои потоки с помощью defineFlow
, а затем вызовите startFlowsServer()
:
import { defineFlow, startFlowsServer } from '@genkit-ai/flow';
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
},
async (restaurantTheme) => {
// ....
}
);
startFlowsServer();
По умолчанию startFlowsServer
будет обслуживать все потоки, которые вы определили в своей кодовой базе, как конечные точки HTTP (например http://localhost:3400/menuSuggestionFlow
).
Вы можете выбрать, какие потоки будут доступны через сервер потоков. Вы можете указать собственный порт (он будет использовать переменную среды PORT
, если она установлена). Вы также можете установить настройки CORS.
import { defineFlow, startFlowsServer } from '@genkit-ai/flow';
export const flowA = defineFlow({ name: 'flowA' }, async (subject) => {
// ....
});
export const flowB = defineFlow({ name: 'flowB' }, async (subject) => {
// ....
});
startFlowsServer({
flows: [flowB],
port: 4567,
cors: {
origin: '*',
},
});
Наблюдаемость потока
Иногда при использовании сторонних SDK, не предназначенных для наблюдения, вы можете захотеть увидеть их как отдельный шаг трассировки в пользовательском интерфейсе разработчика. Все, что вам нужно сделать, это обернуть код в функцию run
.
import { defineFlow, run } from '@genkit-ai/flow';
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
outputSchema: z.array(s.string()),
},
async (restaurantTheme) => {
const themes = await run('find-similar-themes', async () => {
return await findSimilarRestaurantThemes(restaurantTheme);
});
const suggestions = makeMenuItemSuggestions(themes);
return suggestions;
}
);