Przepływy

Przepływy to funkcje, które mają dodatkowe cechy: są ściśle wpisywane, strumieniowane, wywoływane lokalnie i zdalnie oraz w pełni wykrywalne. Firebase Genkit udostępnia interfejs wiersza poleceń i narzędzia interfejsu dla programistów do pracy z przepływami (uruchamiania, debugowania itp.).

Definiowanie przepływów

import { defineFlow } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
  },
  async (restaurantTheme) => {
    const suggestion = makeMenuItemSuggestion(restaurantTheme);

    return suggestion;
  }
);

Schematy wejściowe i wyjściowe dla przepływów można definiować za pomocą zod.

import { defineFlow } from '@genkit-ai/flow';
import * as z from 'zod';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    inputSchema: z.string(),
    outputSchema: z.string(),
  },
  async (restaurantTheme) => {
    const suggestion = makeMenuItemSuggestion(input.restaurantTheme);

    return suggestion;
  }
);

Jeśli określisz schemat, Genkit zweryfikuje go pod kątem danych wejściowych i wyjściowych.

Uruchomione przepływy

Użyj funkcji runFlow, aby uruchomić przepływ:

const response = await runFlow(menuSuggestionFlow, 'French');

Za pomocą interfejsu wiersza poleceń możesz też uruchamiać przepływy:

genkit flow:run menuSuggestionFlow '"French"'

Transmisja zakończona

Oto prosty przykład przepływu, który może przesyłać strumieniowo wartości z przepływu:

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    streamSchema: z.string(),
  },
  async (restaurantTheme, streamingCallback) => {
    if (streamingCallback) {
      makeMenuItemSuggestionsAsync(restaurantTheme).subscribe((suggestion) => {
        streamingCallback(suggestion);
      });
    }
  }
);

Pamiętaj, że pole streamingCallback może nie być zdefiniowane. Jest ona definiowana tylko wtedy, gdy klient wywołujący żąda odpowiedzi przesyłanej strumieniowo.

Aby wywołać przepływ w trybie strumieniowania, użyj funkcji streamFlow:

const response = streamFlow(menuSuggestionFlow, 'French');

for await (const suggestion of response.stream()) {
  console.log('suggestion', suggestion);
}

Jeśli ten proces nie zawiera implementacji strumieniowego przesyłania danych, streamFlow będzie działać tak samo jak runFlow.

Za pomocą interfejsu wiersza poleceń możesz też strumieniować przepływy:

genkit flow:run menuSuggestionFlow '"French"' -s

Wdrażanie przepływów

Jeśli chcesz mieć dostęp do przepływu przez HTTP, musisz go najpierw wdrożyć. Genkit zapewnia integrację z hostami w Cloud Functions dla Firebase i hostami Express.js, takimi jak Cloud Run.

Wdrożone przepływy obsługują te same funkcje co przepływy lokalne (np. strumieniowanie i dostrzegalność).

Funkcja w Cloud Functions dla Firebase

Aby używać przepływów w Cloud Functions dla Firebase, użyj wtyczki firebase, zastąp defineFlow elementem onFlow i dodaj authPolicy.

import { onFlow } from '@genkit-ai/firebase/functions';
import { firebaseAuth } from '@genkit-ai/firebase/auth';

export const menuSuggestionFlow = onFlow(
  {
    name: 'menuSuggestionFlow',
    authPolicy: firebaseAuth((user) => {
      if (!user.email_verified) {
        throw new Error("Verified email required to run flow");
      }
    }
  },
  async (restaurantTheme) => {
    // ....
  }
);

Plik Express.js

Aby wdrożyć przepływy za pomocą Cloud Run lub podobnych usług, zdefiniuj przepływy za pomocą polecenia defineFlow, a następnie wywołaj startFlowsServer():

import { defineFlow, startFlowsServer } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
  },
  async (restaurantTheme) => {
    // ....
  }
);

startFlowsServer();

Domyślnie startFlowsServer będzie obsługiwać wszystkie przepływy zdefiniowane w bazie kodu jako punkty końcowe HTTP (np. http://localhost:3400/menuSuggestionFlow).

Możesz wybrać, które przepływy będą udostępniane przez serwer przepływów. Możesz podać port niestandardowy (będzie on używać zmiennej środowiskowej PORT, jeśli jest ustawiona). Możesz też skonfigurować ustawienia CORS.

import { defineFlow, startFlowsServer } from '@genkit-ai/flow';

export const flowA = defineFlow({ name: 'flowA' }, async (subject) => {
  // ....
});

export const flowB = defineFlow({ name: 'flowB' }, async (subject) => {
  // ....
});

startFlowsServer({
  flows: [flowB],
  port: 4567,
  cors: {
    origin: '*',
  },
});

Dostrzegalność przepływu

Czasami, gdy używasz zewnętrznych pakietów SDK, które nie są przystosowane do dostrzegalności, możesz chcieć zobaczyć je jako osobny krok śledzenia w interfejsie dewelopera. Wystarczy spakować kod do funkcji run.

import { defineFlow, run } from '@genkit-ai/flow';

export const menuSuggestionFlow = defineFlow(
  {
    name: 'menuSuggestionFlow',
    outputSchema: z.array(s.string()),
  },
  async (restaurantTheme) => {
    const themes = await run('find-similar-themes', async () => {
      return await findSimilarRestaurantThemes(restaurantTheme);
    });

    const suggestions = makeMenuItemSuggestions(themes);

    return suggestions;
  }
);