Flujos

Los flujos son funciones con algunas características adicionales: tienen tipado fuerte, se pueden transmitir, se pueden llamar de forma local y remota, y son totalmente observables. Firebase Genkit proporciona herramientas de la CLI y la IU para desarrolladores para trabajar con flujos (ejecutar, depurar, etcétera).

Definición de flujos

import { defineFlow } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
  },
  async (restaurantTheme) => {
    const suggestion = makeMenuItemSuggestion(restaurantTheme);

    return suggestion;
  }
);

Los esquemas de entrada y salida para los flujos se pueden definir con zod.

import { defineFlow } from '@genkit-ai/flow';
import * as z from 'zod';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    inputSchema: z.string(),
    outputSchema: z.string(),
  },
  async (restaurantTheme) => {
    const suggestion = makeMenuItemSuggestion(input.restaurantTheme);

    return suggestion;
  }
);

Cuando se especifica el esquema, Genkit valida el esquema para las entradas y salidas.

Flujos en ejecución

Usa la función runFlow para ejecutar el flujo:

const response = await runFlow(menuSuggestionFlow, 'French');

También puedes usar la CLI para ejecutar flujos:

genkit flow:run menuSuggestionFlow '"French"'

Contenido transmitido

Este es un ejemplo sencillo de un flujo que puede transmitir valores desde un flujo:

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    streamSchema: z.string(),
  },
  async (restaurantTheme, streamingCallback) => {
    if (streamingCallback) {
      makeMenuItemSuggestionsAsync(restaurantTheme).subscribe((suggestion) => {
        streamingCallback(suggestion);
      });
    }
  }
);

Ten en cuenta que streamingCallback puede no estar definido. Solo se define si el cliente que realiza la invocación solicita una respuesta transmitida.

Para invocar un flujo en modo de transmisión, usa la función streamFlow:

const response = streamFlow(menuSuggestionFlow, 'French');

for await (const suggestion of response.stream()) {
  console.log('suggestion', suggestion);
}

Si el flujo no implementa la transmisión, streamFlow se comportará de manera idéntica a runFlow.

También puedes usar la CLI para transmitir flujos:

genkit flow:run menuSuggestionFlow '"French"' -s

Implementa flujos

Si deseas tener acceso a tu flujo a través de HTTP, primero debes implementarlo. Genkit proporciona integraciones para Cloud Functions para Firebase y hosts de Express.js, como Cloud Run.

Los flujos implementados admiten las mismas características que los flujos locales (como la transmisión y la observabilidad).

Cloud Function para Firebase

Para usar flujos con Cloud Functions para Firebase, utiliza el complemento firebase, reemplaza defineFlow por onFlow y, luego, incluye un authPolicy.

import { onFlow } from '@genkit-ai/firebase/functions';
import { firebaseAuth } from '@genkit-ai/firebase/auth';

export const menuSuggestionFlow = onFlow(
  {
    name: 'menuSuggestionFlow',
    authPolicy: firebaseAuth((user) => {
      if (!user.email_verified) {
        throw new Error("Verified email required to run flow");
      }
    }
  },
  async (restaurantTheme) => {
    // ....
  }
);

Express.js

Para implementar flujos con Cloud Run y servicios similares, define los flujos con defineFlow y, luego, llama a startFlowsServer():

import { defineFlow, startFlowsServer } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
  },
  async (restaurantTheme) => {
    // ....
  }
);

startFlowsServer();

De forma predeterminada, startFlowsServer entregará todos los flujos que definiste en tu base de código como extremos HTTP (p.ej., http://localhost:3400/menuSuggestionFlow).

Puedes elegir qué flujos se exponen a través del servidor de flujos. Puedes especificar un puerto personalizado (usará la variable de entorno PORT si se configuró). También puedes configurar CORS.

import { defineFlow, startFlowsServer } from '@genkit-ai/flow';

export const flowA = defineFlow({ name: 'flowA' }, async (subject) => {
  // ....
});

export const flowB = defineFlow({ name: 'flowB' }, async (subject) => {
  // ....
});

startFlowsServer({
  flows: [flowB],
  port: 4567,
  cors: {
    origin: '*',
  },
});

Observabilidad del flujo

A veces, cuando usas SDKs de terceros que no están instrumentados para la observabilidad, es posible que desees verlos como un paso de seguimiento independiente en la IU para desarrolladores. Lo único que debes hacer es unir el código en la función run.

import { defineFlow, run } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    outputSchema: z.array(s.string()),
  },
  async (restaurantTheme) => {
    const themes = await run('find-similar-themes', async () => {
      return await findSimilarRestaurantThemes(restaurantTheme);
    });

    const suggestions = makeMenuItemSuggestions(themes);

    return suggestions;
  }
);