Los flujos son funciones con algunas características adicionales: tienen tipado fuerte, se pueden transmitir, se pueden llamar de forma local y remota, y son totalmente observables. Firebase Genkit proporciona herramientas de la CLI y la IU para desarrolladores para trabajar con flujos (ejecutar, depurar, etcétera).
Definición de flujos
import { defineFlow } from '@genkit-ai/flow';
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
},
async (restaurantTheme) => {
const suggestion = makeMenuItemSuggestion(restaurantTheme);
return suggestion;
}
);
Los esquemas de entrada y salida para los flujos se pueden definir con zod
.
import { defineFlow } from '@genkit-ai/flow';
import * as z from 'zod';
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
inputSchema: z.string(),
outputSchema: z.string(),
},
async (restaurantTheme) => {
const suggestion = makeMenuItemSuggestion(input.restaurantTheme);
return suggestion;
}
);
Cuando se especifica el esquema, Genkit valida el esquema para las entradas y salidas.
Flujos en ejecución
Usa la función runFlow
para ejecutar el flujo:
const response = await runFlow(menuSuggestionFlow, 'French');
También puedes usar la CLI para ejecutar flujos:
genkit flow:run menuSuggestionFlow '"French"'
Contenido transmitido
Este es un ejemplo sencillo de un flujo que puede transmitir valores desde un flujo:
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
streamSchema: z.string(),
},
async (restaurantTheme, streamingCallback) => {
if (streamingCallback) {
makeMenuItemSuggestionsAsync(restaurantTheme).subscribe((suggestion) => {
streamingCallback(suggestion);
});
}
}
);
Ten en cuenta que streamingCallback
puede no estar definido. Solo se define si el cliente que realiza la invocación solicita una respuesta transmitida.
Para invocar un flujo en modo de transmisión, usa la función streamFlow
:
const response = streamFlow(menuSuggestionFlow, 'French');
for await (const suggestion of response.stream()) {
console.log('suggestion', suggestion);
}
Si el flujo no implementa la transmisión, streamFlow
se comportará de manera idéntica a runFlow
.
También puedes usar la CLI para transmitir flujos:
genkit flow:run menuSuggestionFlow '"French"' -s
Implementa flujos
Si deseas tener acceso a tu flujo a través de HTTP, primero debes implementarlo. Genkit proporciona integraciones para Cloud Functions para Firebase y hosts de Express.js, como Cloud Run.
Los flujos implementados admiten las mismas características que los flujos locales (como la transmisión y la observabilidad).
Cloud Function para Firebase
Para usar flujos con Cloud Functions para Firebase, utiliza el complemento firebase
, reemplaza defineFlow
por onFlow
y, luego, incluye un authPolicy
.
import { onFlow } from '@genkit-ai/firebase/functions';
import { firebaseAuth } from '@genkit-ai/firebase/auth';
export const menuSuggestionFlow = onFlow(
{
name: 'menuSuggestionFlow',
authPolicy: firebaseAuth((user) => {
if (!user.email_verified) {
throw new Error("Verified email required to run flow");
}
}
},
async (restaurantTheme) => {
// ....
}
);
Express.js
Para implementar flujos con Cloud Run y servicios similares, define los flujos con defineFlow
y, luego, llama a startFlowsServer()
:
import { defineFlow, startFlowsServer } from '@genkit-ai/flow';
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
},
async (restaurantTheme) => {
// ....
}
);
startFlowsServer();
De forma predeterminada, startFlowsServer
entregará todos los flujos que definiste en tu base de código como extremos HTTP (p.ej., http://localhost:3400/menuSuggestionFlow
).
Puedes elegir qué flujos se exponen a través del servidor de flujos. Puedes especificar un puerto personalizado (usará la variable de entorno PORT
si se configuró). También puedes configurar CORS.
import { defineFlow, startFlowsServer } from '@genkit-ai/flow';
export const flowA = defineFlow({ name: 'flowA' }, async (subject) => {
// ....
});
export const flowB = defineFlow({ name: 'flowB' }, async (subject) => {
// ....
});
startFlowsServer({
flows: [flowB],
port: 4567,
cors: {
origin: '*',
},
});
Observabilidad del flujo
A veces, cuando usas SDKs de terceros que no están instrumentados para la observabilidad, es posible que desees verlos como un paso de seguimiento independiente en la IU para desarrolladores. Lo único que debes hacer es unir el código en la función run
.
import { defineFlow, run } from '@genkit-ai/flow';
export const menuSuggestionFlow = defineFlow(
{
name: 'menuSuggestionFlow',
outputSchema: z.array(s.string()),
},
async (restaurantTheme) => {
const themes = await run('find-similar-themes', async () => {
return await findSimilarRestaurantThemes(restaurantTheme);
});
const suggestions = makeMenuItemSuggestions(themes);
return suggestions;
}
);