Потоки

Потоки — это функции с некоторыми дополнительными характеристиками: они строго типизированы, потокоемки, локально и удаленно вызываются и полностью наблюдаемы. Firebase Genkit предоставляет инструменты CLI и пользовательского интерфейса разработчика для работы с потоками (запуск, отладка и т. д.).

Определение потоков

import { defineFlow } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
  },
  async (restaurantTheme) => {
    const suggestion = makeMenuItemSuggestion(restaurantTheme);

    return suggestion;
  }
);

Схемы ввода и вывода для потоков можно определить с помощью zod .

import { defineFlow } from '@genkit-ai/flow';
import * as z from 'zod';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    inputSchema: z.string(),
    outputSchema: z.string(),
  },
  async (restaurantTheme) => {
    const suggestion = makeMenuItemSuggestion(input.restaurantTheme);

    return suggestion;
  }
);

Если с��ема указана, Genkit проверит схему на входные и выходные данные.

Запуск потоков

Используйте функцию runFlow для запуска потока:

const response = await runFlow(menuSuggestionFlow, 'French');

Вы также можете использовать CLI для запуска потоков:

genkit flow:run menuSuggestionFlow '"French"'

Потоковое

Вот простой пример потока, который может передавать значения из потока:

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    streamSchema: z.string(),
  },
  async (restaurantTheme, streamingCallback) => {
    if (streamingCallback) {
      makeMenuItemSuggestionsAsync(restaurantTheme).subscribe((suggestion) => {
        streamingCallback(suggestion);
      });
    }
  }
);

Обратите внимание, что значение streamingCallback может быть неопределенным. Он определяется только в том случае, если вызывающий клиент запрашивает потоковый ответ.

Чтобы вызвать поток в потоковом режиме, используйте streamFlow :

const response = streamFlow(menuSuggestionFlow, 'French');

for await (const suggestion of response.stream()) {
  console.log('suggestion', suggestion);
}

Если поток не реализует потоковую передачу, streamFlow будет вести себя идентично runFlow .

Вы также можете использовать CLI для потоковой передачи потоков:

genkit flow:run menuSuggestionFlow '"French"' -s

Развертывание потоков

Если вы хотите иметь доступ к своему потоку через HTTP, вам необходимо сначала его развернуть. Genkit обеспечивает интеграцию облачных функций для хостов Firebase и Express.js, таких как Cloud Run.

Развернутые потоки поддерживают все те же функции, что и локальные потоки (например, потоковую передачу и наблюдаемость).

Облачная функция для Firebase

Чтобы использовать потоки с облачными функциями для Firebase, используйте плагин firebase , замените defineFlow на onFlow и включите authPolicy .

import { onFlow } from '@genkit-ai/firebase/functions';
import { firebaseAuth } from '@genkit-ai/firebase/auth';

export const menuSuggestionFlow = onFlow(
  {
    name: 'menuSuggestionFlow',
    authPolicy: firebaseAuth((user) => {
      if (!user.email_verified) {
        throw new Error("Verified email required to run flow");
      }
    }
  },
  async (restaurantTheme) => {
    // ....
  }
);

Экспресс.js

Чтобы развернуть потоки с помощью Cloud Run и подобных сервис��в, определите свои потоки с помощью defineFlow , а затем вызовите startFlowsServer() :

import { defineFlow, startFlowsServer } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
  },
  async (restaurantTheme) => {
    // ....
  }
);

startFlowsServer();

По умолчанию startFlowsServer будет обслуживать все потоки, которые вы определили в своей кодовой базе, как конечные точки HTTP (например http://localhost:3400/menuSuggestionFlow ).

Вы можете выбрать, какие потоки будут доступны через сервер потоков. Вы можете указать собственный порт (он будет использовать переменную среды PORT , если она установлена). Вы также можете установить настройки CORS.

import { defineFlow, startFlowsServer } from '@genkit-ai/flow';

export const flowA = defineFlow({ name: 'flowA' }, async (subject) => {
  // ....
});

export const flowB = defineFlow({ name: 'flowB' }, async (subject) => {
  // ....
});

startFlowsServer({
  flows: [flowB],
  port: 4567,
  cors: {
    origin: '*',
  },
});

Наблюдаемость потока

Иногда при использовании сторонних SDK, не предназначенных для наблюдения, вы можете захотеть увидеть их как отдельный шаг трассировки в пользовательском интерфейсе разработчика. Все, что вам нужно сделать, это обернуть код в функцию run .

import { defineFlow, run } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    outputSchema: z.array(s.string()),
  },
  async (restaurantTheme) => {
    const themes = await run('find-similar-themes', async () => {
      return await findSimilarRestaurantThemes(restaurantTheme);
    });

    const suggestions = makeMenuItemSuggestions(themes);

    return suggestions;
  }
);