diff --git a/apps/indexer-coordinator/package.json b/apps/indexer-coordinator/package.json
index f3e0579b5..0fe9818e5 100644
--- a/apps/indexer-coordinator/package.json
+++ b/apps/indexer-coordinator/package.json
@@ -1,6 +1,6 @@
 {
   "name": "@subql/indexer-coordinator",
-  "version": "2.2.4-0",
+  "version": "2.2.4-3",
   "description": "",
   "author": "SubQuery",
   "license": "Apache-2.0",
diff --git a/apps/indexer-coordinator/src/app.module.ts b/apps/indexer-coordinator/src/app.module.ts
index dcaf10da7..d4bc7cbfb 100644
--- a/apps/indexer-coordinator/src/app.module.ts
+++ b/apps/indexer-coordinator/src/app.module.ts
@@ -28,6 +28,7 @@ import { ProjectModule } from './project/project.module';
 import { RewardModule } from './reward/reward.module';
 import { StatsModule } from './stats/stats.module';
 import { SubscriptionModule } from './subscription/subscription.module';
+import { IntegrationModule } from './integration/integration.module';
 
 @Module({
   imports: [
@@ -81,6 +82,7 @@ import { SubscriptionModule } from './subscription/subscription.module';
     NetworkModule,
     RewardModule,
     ConfigModule,
+    IntegrationModule,
   ],
   controllers: [AdminController, AgreementController, MonitorController],
 })
diff --git a/apps/indexer-coordinator/src/core/types.ts b/apps/indexer-coordinator/src/core/types.ts
index 081b75e4e..f7a89a9c0 100644
--- a/apps/indexer-coordinator/src/core/types.ts
+++ b/apps/indexer-coordinator/src/core/types.ts
@@ -6,6 +6,9 @@ import { BigNumber, ContractTransaction, Overrides } from 'ethers';
 export enum DesiredStatus {
   STOPPED,
   RUNNING,
+
+  // LLM
+  PULLING,
 }
 
 export enum IndexerDeploymentStatus {
diff --git a/apps/indexer-coordinator/src/integration/integration.model.ts b/apps/indexer-coordinator/src/integration/integration.model.ts
new file mode 100644
index 000000000..770528dff
--- /dev/null
+++ b/apps/indexer-coordinator/src/integration/integration.model.ts
@@ -0,0 +1,56 @@
+// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
+// SPDX-License-Identifier: Apache-2.0
+
+import { ID, Field, ObjectType } from '@nestjs/graphql';
+import {
+  Column,
+  Entity,
+  Index,
+  PrimaryGeneratedColumn,
+  CreateDateColumn,
+  UpdateDateColumn,
+} from 'typeorm';
+import { SeviceEndpoint } from '../project/project.model';
+import { IntegrationType, LLMModel } from '../project/types';
+
+@Entity('integration')
+@Index(['title'], { unique: true })
+@ObjectType()
+export class IntegrationEntity {
+  @PrimaryGeneratedColumn('increment')
+  @Field(() => ID, { nullable: true })
+  id: number;
+
+  @Column({ type: 'varchar', length: 50 })
+  @Field()
+  title: string;
+
+  @Column()
+  @Field()
+  type: IntegrationType;
+
+  @Column('jsonb', { default: {} })
+  @Field(() => [SeviceEndpoint], { nullable: true })
+  serviceEndpoints: SeviceEndpoint[];
+
+  @Field(() => [LLMModel], { nullable: true })
+  models: LLMModel[];
+
+  @Column({ type: 'boolean', default: false })
+  @Field()
+  enabled: boolean;
+
+  @Column('jsonb', { default: {} })
+  config: any;
+
+  @Column('jsonb', { default: {} })
+  extra: any;
+
+  @CreateDateColumn({ type: 'timestamp', default: () => 'CURRENT_TIMESTAMP' })
+  @Field({ nullable: true })
+  created_at: Date;
+
+  @UpdateDateColumn({ type: 'timestamp', default: () => 'CURRENT_TIMESTAMP' })
+  @Field({ nullable: true })
+  updated_at: Date;
+}
diff --git a/apps/indexer-coordinator/src/integration/integration.module.ts b/apps/indexer-coordinator/src/integration/integration.module.ts
new file mode 100644
index 000000000..56359d3ef
--- /dev/null
+++ b/apps/indexer-coordinator/src/integration/integration.module.ts
@@ -0,0 +1,20 @@
+// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
+// SPDX-License-Identifier: Apache-2.0
+
+import { Module } from '@nestjs/common';
+import { TypeOrmModule } from '@nestjs/typeorm';
+import { ProjectModule } from '../project/project.module';
+import { IntegrationEntity } from './integration.model';
+import { IntegrationResolver } from './integration.resolver';
+import { IntegrationService } from './integration.service';
+
+@Module({
+  imports: [
+    TypeOrmModule.forFeature([IntegrationEntity]),
+    ProjectModule,
+  ],
+  providers: [IntegrationService, IntegrationResolver],
+  exports: [IntegrationService],
+})
+export class IntegrationModule {}
diff --git a/apps/indexer-coordinator/src/integration/integration.resolver.ts b/apps/indexer-coordinator/src/integration/integration.resolver.ts
new file mode 100644
index 000000000..59146019a
--- /dev/null
+++ b/apps/indexer-coordinator/src/integration/integration.resolver.ts
@@ -0,0 +1,69 @@
+// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
+// SPDX-License-Identifier: Apache-2.0
+
+import { Args, Mutation, Query, Resolver } from '@nestjs/graphql';
+import { SeviceEndpoint } from '../project/project.model';
+import {
+  IntegrationType,
+  LLMConfig,
+  LLMExtra,
+  LLMOngoingStreamRequestMeta,
+} from '../project/types';
+import { IntegrationEntity } from './integration.model';
+import { IntegrationService } from './integration.service';
+
+@Resolver(() => IntegrationEntity)
+export class IntegrationResolver {
+  constructor(private integrationService: IntegrationService) {}
+
+  @Query(() => [IntegrationEntity])
+  async allIntegration(): Promise<IntegrationEntity[]> {
+    return await this.integrationService.getAll();
+  }
+
+  @Mutation(() => IntegrationEntity)
+  addIntegration(
+    @Args('title') title: string,
+    @Args('type') type: IntegrationType,
+    @Args('serviceEndpoints', { type: () => [SeviceEndpoint] })
+    serviceEndpoints: SeviceEndpoint[],
+    @Args('config', { nullable: true }) config?: LLMConfig,
+    @Args('extra', { nullable: true }) extra?: LLMExtra
+  ): Promise<IntegrationEntity> {
+    return this.integrationService.create(title, type, serviceEndpoints, config, extra);
+  }
+
+  @Mutation(() => IntegrationEntity)
+  updateIntegration(
+    @Args('id') id: number,
+    @Args('title') title: string,
+    @Args('serviceEndpoints', { type: () => [SeviceEndpoint] })
+    serviceEndpoints: SeviceEndpoint[],
+    @Args('enabled') enabled: boolean,
+    @Args('config', { nullable: true }) config?: LLMConfig,
+    @Args('extra', { nullable: true }) extra?: LLMExtra
+  ): Promise<IntegrationEntity> {
+    return this.integrationService.update(id, title, serviceEndpoints, enabled, config, extra);
+  }
+
+  @Mutation(() => IntegrationEntity)
+  deleteIntegration(@Args('id') id: number): Promise<IntegrationEntity> {
+    return this.integrationService.delete(id);
+  }
+
+  @Mutation(() => IntegrationEntity)
+  deleteModel(@Args('id') id: number, @Args('name') name: string): Promise<IntegrationEntity> {
+    return this.integrationService.deleteModel(id, name);
+  }
+
+  @Mutation(() => IntegrationEntity)
+  pullModel(@Args('id') id: number, @Args('name') name: string): Promise<IntegrationEntity> {
+    return this.integrationService.pullModel(id, name);
+  }
+
+  @Query(() => [LLMOngoingStreamRequestMeta])
+  inspectOngoingStreamedRequests(): LLMOngoingStreamRequestMeta[] {
+    return this.integrationService.inspectOngoingStreamedRequests();
+  }
+}
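Note (reviewer sketch, not part of the patch): the resolver above is driven over GraphQL. A minimal client-side sketch of the two main operations follows; the title, host URL, and model name are placeholders, `type: 1` assumes the numeric ordinal of IntegrationType.LLM, and the SeviceEndpoint input fields (`key`/`value`) are inferred from its constructor usage elsewhere in this diff, not confirmed by it.

  // Sketch only — operation names match IntegrationResolver above.
  const ADD_INTEGRATION = /* GraphQL */ `
    mutation {
      addIntegration(
        title: "local-ollama"
        type: 1 # assumed: IntegrationType.LLM
        serviceEndpoints: [{ key: "host", value: "http://127.0.0.1:11434" }]
      ) { id title enabled }
    }
  `;

  const PULL_MODEL = /* GraphQL */ `
    mutation {
      pullModel(id: 1, name: "llama3") {
        id
        models { name status }
      }
    }
  `;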
diff --git a/apps/indexer-coordinator/src/integration/integration.service.ts b/apps/indexer-coordinator/src/integration/integration.service.ts
new file mode 100644
index 000000000..f59748469
--- /dev/null
+++ b/apps/indexer-coordinator/src/integration/integration.service.ts
@@ -0,0 +1,147 @@
+// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
+// SPDX-License-Identifier: Apache-2.0
+
+import { Injectable } from '@nestjs/common';
+import { InjectRepository } from '@nestjs/typeorm';
+import { Repository } from 'typeorm';
+import { ProjectLLMService } from '../project/project.llm.service';
+import { SeviceEndpoint, ValidationResponse } from '../project/project.model';
+import { IntegrationType, LLMConfig, LLMExtra, LLMOngoingStreamRequestMeta } from '../project/types';
+import { IntegrationEntity } from './integration.model';
+
+@Injectable()
+export class IntegrationService {
+  constructor(
+    @InjectRepository(IntegrationEntity)
+    private integrationRepo: Repository<IntegrationEntity>,
+    private projectLLMService: ProjectLLMService
+  ) {}
+
+  async get(id: number): Promise<IntegrationEntity> {
+    return this.integrationRepo.findOne({ where: { id } });
+  }
+
+  async getAll(): Promise<IntegrationEntity[]> {
+    const integrations = await this.integrationRepo.find();
+
+    for (const it of integrations) {
+      if (it.type === IntegrationType.LLM && it.serviceEndpoints.length > 0) {
+        it.models = await this.projectLLMService.getModels(it.serviceEndpoints[0].value);
+      }
+    }
+
+    return integrations;
+  }
+
+  async create(
+    title: string,
+    type: IntegrationType,
+    serviceEndpoints: SeviceEndpoint[],
+    config?: LLMConfig,
+    extra?: LLMExtra
+  ): Promise<IntegrationEntity> {
+    let integration = await this.integrationRepo.findOne({ where: { title } });
+    if (integration) {
+      throw new Error(`${title} already exists`);
+    }
+
+    let validateResult: ValidationResponse = { valid: true, reason: '' };
+    switch (type) {
+      case IntegrationType.LLM:
+        validateResult = await this.projectLLMService.validate(serviceEndpoints[0].value);
+        break;
+      default:
+        throw new Error('Unsupported integration type');
+    }
+
+    if (!validateResult.valid) {
+      throw new Error(validateResult.reason);
+    }
+
+    integration = new IntegrationEntity();
+    integration.title = title;
+    integration.type = type;
+    integration.enabled = true;
+    integration.serviceEndpoints = serviceEndpoints;
+    integration.config = config || {};
+    integration.extra = extra || {};
+    return this.integrationRepo.save(integration);
+  }
+
+  async update(
+    id: number,
+    title: string,
+    serviceEndpoints: SeviceEndpoint[],
+    enabled: boolean,
+    config?: LLMConfig,
+    extra?: LLMExtra
+  ): Promise<IntegrationEntity> {
+    let integration = await this.integrationRepo.findOne({ where: { title } });
+    if (integration && integration.id !== id) {
+      throw new Error(`${title} already exists`);
+    }
+
+    integration = await this.integrationRepo.findOne({ where: { id } });
+    if (!integration) {
+      throw new Error(`integration ${id} does not exist`);
+    }
+
+    let validateResult: ValidationResponse = { valid: true, reason: '' };
+    switch (integration.type) {
+      case IntegrationType.LLM:
+        validateResult = await this.projectLLMService.validate(serviceEndpoints[0].value);
+        break;
+      default:
+        throw new Error('Unsupported integration type');
+    }
+
+    if (!validateResult.valid) {
+      throw new Error(validateResult.reason);
+    }
+
+    integration.title = title;
+    integration.serviceEndpoints = serviceEndpoints;
+    integration.enabled = enabled;
+    integration.config = config || {};
+    integration.extra = extra || {};
+    return this.integrationRepo.save(integration);
+  }
+
+  async delete(id: number): Promise<IntegrationEntity> {
+    const integration = await this.integrationRepo.findOne({ where: { id } });
+    if (!integration) return;
+    return this.integrationRepo.remove(integration);
+  }
+
+  async deleteModel(id: number, name: string): Promise<IntegrationEntity> {
+    const integration = await this.integrationRepo.findOne({ where: { id } });
+    if (!integration) {
+      throw new Error(`integration ${id} does not exist`);
+    }
+    if (integration.type !== IntegrationType.LLM) {
+      throw new Error(`integration ${id} does not support models`);
+    }
+    const host = integration.serviceEndpoints[0].value;
+    await this.projectLLMService.deleteModel(host, name);
+    return integration;
+  }
+
+  async pullModel(id: number, name: string): Promise<IntegrationEntity> {
+    const integration = await this.integrationRepo.findOne({ where: { id } });
+    if (!integration) {
+      throw new Error(`integration ${id} does not exist`);
+    }
+    if (integration.type !== IntegrationType.LLM) {
+      throw new Error(`integration ${id} does not support models`);
+    }
+    const host = integration.serviceEndpoints[0].value;
+    await this.projectLLMService.pullModel(host, name);
+    return integration;
+  }
+
+  inspectOngoingStreamedRequests(): LLMOngoingStreamRequestMeta[] {
+    return this.projectLLMService.inspectOngoingStreamedRequests();
+  }
+}
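Note (reviewer sketch, not part of the patch): getAll() enriches LLM integrations with live model state on every call; nothing under `models` is persisted. An illustrative shape of one returned entity, with made-up values:

  // Sketch of one IntegrationService.getAll() result entry (values are illustrative).
  const example: Partial<IntegrationEntity> = {
    id: 1,
    title: 'local-ollama',
    type: IntegrationType.LLM, // persisted as integer 1
    serviceEndpoints: [/* the first entry's `value` is used as the Ollama host */],
    enabled: true,
    models: [
      // computed from api/tags + api/ps + in-flight pulls (see ProjectLLMService.getModels)
      { name: 'llama3:latest', size: 4661224676, digest: '…', status: LLMModelStatus.LOADED },
    ],
  };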
diff --git a/apps/indexer-coordinator/src/migration/1721982447267-add-integration-table.ts b/apps/indexer-coordinator/src/migration/1721982447267-add-integration-table.ts
new file mode 100644
index 000000000..4df5d594b
--- /dev/null
+++ b/apps/indexer-coordinator/src/migration/1721982447267-add-integration-table.ts
@@ -0,0 +1,22 @@
+// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
+// SPDX-License-Identifier: Apache-2.0
+
+import { MigrationInterface, QueryRunner } from 'typeorm';
+
+export class AddIntegrationTable1721982447267 implements MigrationInterface {
+  name = 'AddIntegrationTable1721982447267';
+
+  async up(queryRunner: QueryRunner): Promise<void> {
+    await queryRunner.query(
+      `CREATE TABLE "integration" ("id" SERIAL NOT NULL, "title" character varying(50) NOT NULL, "type" integer NOT NULL, "serviceEndpoints" jsonb NOT NULL DEFAULT '{}', "enabled" boolean NOT NULL DEFAULT false, "config" jsonb NOT NULL DEFAULT '{}', "extra" jsonb NOT NULL DEFAULT '{}', "created_at" TIMESTAMP NOT NULL DEFAULT now(), "updated_at" TIMESTAMP NOT NULL DEFAULT now(), CONSTRAINT "PK_f348d4694945d9dc4c7049a178a" PRIMARY KEY ("id"))`
+    );
+    await queryRunner.query(
+      `CREATE UNIQUE INDEX "IDX_814dc61a29c5383dc90993603f" ON "integration" ("title") `
+    );
+  }
+
+  async down(queryRunner: QueryRunner): Promise<void> {
+    await queryRunner.query(`DROP INDEX "public"."IDX_814dc61a29c5383dc90993603f"`);
+    await queryRunner.query(`DROP TABLE "integration"`);
+  }
+}
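Note (reviewer observation, not part of the patch): the `type` column persists the numeric enum ordinal, so member order in IntegrationType is effectively part of the on-disk schema:

  enum IntegrationType { SUBGRAPH, LLM } // SUBGRAPH -> 0, LLM -> 1 in the "type" column
  // Appending new members is backward compatible; reordering existing ones is not.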
diff --git a/apps/indexer-coordinator/src/project/project.llm.service.ts b/apps/indexer-coordinator/src/project/project.llm.service.ts
new file mode 100644
index 000000000..b0252df9c
--- /dev/null
+++ b/apps/indexer-coordinator/src/project/project.llm.service.ts
@@ -0,0 +1,355 @@
+// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
+// SPDX-License-Identifier: Apache-2.0
+
+import { Injectable } from '@nestjs/common';
+import { InjectRepository } from '@nestjs/typeorm';
+import fetch from 'node-fetch';
+import { DesiredStatus } from 'src/core/types';
+import { getLogger } from 'src/utils/logger';
+import { AbortableAsyncIterator, Ollama, normalizeModelName } from 'src/utils/ollama';
+import { Repository } from 'typeorm';
+import { LLMManifest } from './project.manifest';
+import {
+  IProjectConfig,
+  MetadataType,
+  Project,
+  ProjectEntity,
+  SeviceEndpoint,
+  ValidationResponse,
+} from './project.model';
+import { ProjectService } from './project.service';
+import {
+  LLMEndpointAccessType,
+  LLMEndpointType,
+  LLMModel,
+  LLMModelPullResult,
+  LLMModelStatus,
+} from './types';
+
+const logger = getLogger('project.llm.service');
+
+async function fetchAdapter(input: string | URL | Request, init?: RequestInit): Promise<Response> {
+  let url: string;
+  if (input instanceof URL) {
+    url = input.toString();
+  } else if (typeof input === 'string') {
+    url = input;
+  } else {
+    url = input.url;
+  }
+
+  const r = await fetch(url, {
+    method: init?.method,
+    headers: init?.headers as any,
+    body: init?.body as any,
+  });
+
+  // Bridge the node-fetch body (a Node.js stream) into a WHATWG ReadableStream
+  // so it can back a standard Response object.
+  const sstream: NodeJS.ReadableStream = r.body;
+  const dstream: ReadableStream = new ReadableStream({
+    start(controller) {
+      sstream.on('readable', function () {
+        let data;
+        while ((data = this.read()) !== null) {
+          controller.enqueue(data);
+        }
+      });
+      sstream.on('end', () => {
+        controller.close();
+      });
+      sstream.on('error', (err) => {
+        controller.error(err);
+      });
+    },
+    cancel(reason) {
+      if ((sstream as any).destroy) {
+        (sstream as any).destroy(new Error(reason));
+      }
+    },
+  });
+  return new Response(dstream, {
+    headers: r.headers as any,
+    status: r.status,
+    statusText: r.statusText,
+  });
+}
+
+@Injectable()
+export class ProjectLLMService {
+  private ongoingStreamedRequests: AbortableAsyncIterator[] = [];
+
+  constructor(
+    @InjectRepository(ProjectEntity) private projectRepo: Repository<ProjectEntity>,
+    private projectService: ProjectService
+  ) {}
+
+  async startLLMProject(
+    id: string,
+    projectConfig: IProjectConfig,
+    rateLimit: number
+  ): Promise<Project> {
+    let project = await this.projectService.getProject(id);
+    if (!project) {
+      project = await this.projectService.addProject(id);
+    }
+    if (project.rateLimit !== rateLimit) {
+      project.rateLimit = rateLimit;
+    }
+    const host = projectConfig.serviceEndpoints[0].value;
+    const targetModel = (project.manifest as LLMManifest).model.name;
+
+    try {
+      // await so failures to reach the host are actually caught here;
+      // the download itself still continues in the background (see pullModel)
+      await this.pullModel(host, targetModel);
+    } catch (err) {
+      logger.error(`startLLMProject id: ${id} failed: ${err.message}`);
+    }
+
+    project.status = DesiredStatus.RUNNING;
+    project.projectConfig = projectConfig;
+    project.serviceEndpoints = [
+      new SeviceEndpoint(
+        LLMEndpointType.ApiGenerateEndpoint,
+        this.nodeEndpoint(host, '/v1/chat/completions'),
+        LLMEndpointAccessType[LLMEndpointType.ApiGenerateEndpoint]
+      ),
+      new SeviceEndpoint(
+        LLMEndpointType.AdminShowEndpoint,
+        this.nodeEndpoint(host, '/api/show'),
+        LLMEndpointAccessType[LLMEndpointType.AdminShowEndpoint]
+      ),
+    ];
+    return await this.projectRepo.save(project);
+  }
+
+  async stopLLMProject(id: string): Promise<Project> {
+    const project = await this.projectService.getProject(id);
+    if (!project) {
+      return;
+    }
+    project.status = DesiredStatus.STOPPED;
+    return this.projectRepo.save(project);
+  }
+
+  async removeLLMProject(id: string): Promise<Project[]> {
+    const project = await this.projectService.getProject(id);
+    if (!project) return [];
+    // const manifest = project.manifest as LLMManifest;
+    // const targetModel = manifest?.model?.name;
+    // const endpoints = project.projectConfig.serviceEndpoints;
+    // const host = endpoints[0]?.value;
+    // if (host && targetModel) {
+    //   await this.deleteModel(host, targetModel);
+    // }
+    return this.projectRepo.remove([project]);
+  }
+
+  async validate(host: string): Promise<ValidationResponse> {
+    try {
+      const ollama = new Ollama({ host });
+      await ollama.list();
+      return { valid: true, reason: '' };
+    } catch (err) {
+      logger.error(`validate llm host: ${host} failed: ${err.message}`);
+      return { valid: false, reason: err.message };
+    }
+  }
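+
+  // NOTE (review annotation): getModels() below merges three sources of truth:
+  // models already downloaded on the host (api/tags), models currently loaded
+  // into memory (api/ps), and pulls still in flight in this coordinator process.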
+
+  async getModels(host: string): Promise<LLMModel[]> {
+    const res: LLMModel[] = [];
+    try {
+      host = new URL(host).toString();
+      const ollama = new Ollama({ host });
+      const downloadedModels = await ollama.list();
+      const loadedModels = await ollama.ps();
+      const pullingModels = this.getOnPullingModels(host);
+
+      const loadedModelNames = loadedModels?.models?.map((m) => m.name) ?? [];
+
+      downloadedModels?.models?.forEach((m) => {
+        res.push({
+          name: m.name,
+          size: m.size,
+          digest: m.digest,
+          status: loadedModelNames.includes(m.name)
+            ? LLMModelStatus.LOADED
+            : LLMModelStatus.NORMAL,
+        });
+      });
+
+      pullingModels.forEach((m) => {
+        res.push({ name: m.name, status: LLMModelStatus.PULLING, pullStatus: m });
+      });
+    } catch (err) {
+      logger.error(`getModels host: ${host} failed: ${err.message}`);
+    }
+    return res;
+  }
+
+  async deleteModel(host: string, model: string): Promise<void> {
+    host = new URL(host).toString();
+    model = normalizeModelName(model);
+
+    try {
+      const ollama = new Ollama({ host });
+      await ollama.delete({ model });
+    } catch (err) {
+      logger.warn(`delete model error: ${err.message}`);
+    }
+    // also abort an in-flight pull of the same model, if any
+    const onPulling = this.ongoingStreamedRequests.find((iterator) => {
+      return iterator.meta.host === host && iterator.meta.model === model;
+    });
+    onPulling?.abort();
+  }
+
+  async pullModel(host: string, model: string): Promise<void> {
+    host = new URL(host).toString();
+    model = normalizeModelName(model);
+
+    const onPulling = this.ongoingStreamedRequests.find((iterator) => {
+      return iterator.meta.host === host && iterator.meta.model === model;
+    });
+    if (onPulling) {
+      return;
+    }
+
+    const ollama = new Ollama({ host });
+    const allModels = await ollama.list();
+    const existModel = allModels?.models?.find((m) => m.name === model);
+    if (existModel) {
+      return;
+    }
+
+    let it: AbortableAsyncIterator;
+    ollama
+      .pull({ model, stream: true })
+      .then(async (iterator) => {
+        it = iterator;
+        this.addOngoingStreamedRequests(iterator);
+        for await (const message of iterator) {
+          iterator.updateProgress({ name: model, ...message });
+        }
+        this.removeOngoingStreamedRequests(it);
+      })
+      .catch((err) => {
+        this.removeOngoingStreamedRequests(it);
+        logger.error(`pull error model:${model} host: ${host} failed: ${err.message}`);
+      });
+  }
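+
+  // NOTE (review annotation): pullModel() is fire-and-forget — the download
+  // continues in the background, progress lands on the iterator's meta via
+  // updateProgress(), and deleteModel() aborts an in-flight pull.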
+
+  async getLLMMetadata(id: string): Promise<MetadataType> {
+    const project = await this.projectService.getProject(id);
+    if (!project) {
+      return;
+    }
+    const manifest = project.manifest as LLMManifest;
+    const model = manifest?.model?.name;
+    const endpoints = project.projectConfig.serviceEndpoints;
+    const host = endpoints[0]?.value;
+
+    let m = null;
+    if (model && host) {
+      m = await this.getModel(host, model);
+    }
+
+    return {
+      startHeight: 0,
+      lastHeight: 0,
+      lastTime: 0,
+      targetHeight: 0,
+      healthy: true,
+      chain: '',
+      specName: '',
+      genesisHash: '',
+      indexerNodeVersion: '',
+      queryNodeVersion: '',
+      indexerStatus: '',
+      queryStatus: '',
+      model: m,
+    };
+  }
+
+  async getModel(host: string, model: string): Promise<LLMModel> {
+    const res: LLMModel = {
+      name: model,
+      status: LLMModelStatus.NOT_READY,
+    };
+    try {
+      const normalizedModel = normalizeModelName(model);
+      host = new URL(host).toString();
+      const pullingModels = this.getOnPullingModels(host);
+      const pullingModel = pullingModels.find((m) => m.name === normalizedModel);
+      if (pullingModel) {
+        res.status = LLMModelStatus.PULLING;
+        res.pullStatus = pullingModel;
+        return res;
+      }
+      const ollama = new Ollama({ host });
+      const downloadedModels = await ollama.list();
+      const loadedModels = await ollama.ps();
+
+      if (downloadedModels?.models?.find((m) => m.name === normalizedModel)) {
+        res.status = LLMModelStatus.NORMAL;
+      }
+      if (loadedModels?.models?.find((lm) => lm.name === normalizedModel)) {
+        res.status = LLMModelStatus.LOADED;
+      }
+    } catch (err) {
+      logger.error(`getModel host: ${host} model: ${model} failed: ${err.message}`);
+    }
+    return res;
+  }
+
+  // todo: remove
+  getPullingProgress(host: string, model: string): LLMModelPullResult {
+    host = new URL(host).toString();
+    model = normalizeModelName(model);
+
+    const onPulling = this.ongoingStreamedRequests.find((iterator) => {
+      return iterator.meta.host === host && iterator.meta.model === model;
+    });
+    return onPulling?.meta.progress;
+  }
+
+  private getOnPullingModels(host: string): LLMModelPullResult[] {
+    const res: LLMModelPullResult[] = [];
+    for (const iter of this.ongoingStreamedRequests) {
+      if (iter.meta.host === host && iter.meta.progress) {
+        res.push(iter.meta.progress);
+      }
+    }
+    return res;
+  }
+
+  private addOngoingStreamedRequests(iterator: AbortableAsyncIterator) {
+    this.ongoingStreamedRequests.push(iterator);
+  }
+
+  private removeOngoingStreamedRequests(iterator: AbortableAsyncIterator) {
+    const i = this.ongoingStreamedRequests.indexOf(iterator);
+    if (i > -1) {
+      this.ongoingStreamedRequests.splice(i, 1);
+    }
+  }
+
+  inspectOngoingStreamedRequests(): LLMOngoingStreamRequestMeta[] {
+    const res: LLMOngoingStreamRequestMeta[] = [];
+    for (const it of this.ongoingStreamedRequests) {
+      res.push(it.meta);
+    }
+    return res;
+  }
+
+  nodeEndpoint(host: string, input: string): string {
+    const url = new URL(input, host);
+    return url.toString();
+  }
+}
diff --git a/apps/indexer-coordinator/src/project/project.manifest.ts b/apps/indexer-coordinator/src/project/project.manifest.ts
index b12255ee2..cc04f6145 100644
--- a/apps/indexer-coordinator/src/project/project.manifest.ts
+++ b/apps/indexer-coordinator/src/project/project.manifest.ts
@@ -95,6 +95,46 @@ export class SubgraphManifest {
   computeUnit?: ComputeUnitClass[];
 }
 
+@ObjectType('ModelClass')
+class ModelClass {
+  @Field(() => String, { nullable: true })
+  name?: string;
+  @Field(() => String, { nullable: true })
+  file?: string;
+}
+
+@ObjectType('ParameterClass')
+class ParameterClass {
+  @Field(() => Number, { nullable: true })
+  temperature?: number;
+  @Field(() => Number, { nullable: true })
+  num_ctx?: number;
+}
+
+@ObjectType('RunnerClass')
+class RunnerClass {
+  @Field(() => String, { nullable: true })
+  name?: string;
+  @Field(() => ParameterClass, { nullable: true })
+  parameter?: ParameterClass;
+  @Field(() => String, { nullable: true })
+  system?: string;
+}
+
+@ObjectType('LLMManifest')
+export class LLMManifest {
+  @Field(() => String, { nullable: true })
+  kind?: string;
+  @Field(() => String, { nullable: true })
+  specVersion?: string;
+
+  @Field(() => ModelClass, { nullable: true })
+  model: ModelClass;
+
+  @Field(() => RunnerClass, { nullable: true })
+  runner?: RunnerClass;
+}
+
 @ObjectType('AggregatedManifest')
 export class AggregatedManifest {
   @Field(() => SubqueryManifest, { nullable: true })
@@ -103,4 +143,6 @@ export class AggregatedManifest {
   rpcManifest?: RpcManifest;
   @Field(() => SubgraphManifest, { nullable: true })
   subgraphManifest?: SubgraphManifest;
+  @Field(() => LLMManifest, { nullable: true })
+  llmManifest?: LLMManifest;
 }
diff --git a/apps/indexer-coordinator/src/project/project.model.ts b/apps/indexer-coordinator/src/project/project.model.ts
index 9c88fd442..603a17381 100644
--- a/apps/indexer-coordinator/src/project/project.model.ts
+++ b/apps/indexer-coordinator/src/project/project.model.ts
@@ -3,7 +3,7 @@
 
 import { Field, ID, InputType, Int, ObjectType } from '@nestjs/graphql';
 import { Column, Entity, PrimaryColumn, BeforeInsert } from 'typeorm';
-import { AccessType, HostType, ProjectType } from './types';
+import { AccessType, HostType, LLMModel, ProjectType } from './types';
 
 // TODO: temp place to put these types
 @ObjectType('ProjectInfo')
@@ -76,6 +76,8 @@ export class MetadataType {
   indexerStatus?: string;
   @Field({ nullable: true })
   queryStatus?: string;
+  @Field({ nullable: true })
+  model?: LLMModel;
 }
 
 export interface IProjectBaseConfig {
diff --git a/apps/indexer-coordinator/src/project/project.module.ts b/apps/indexer-coordinator/src/project/project.module.ts
index 42b95843f..72a1a9d45 100644
--- a/apps/indexer-coordinator/src/project/project.module.ts
+++ b/apps/indexer-coordinator/src/project/project.module.ts
@@ -17,6 +17,7 @@ import { ProjectResolver } from './project.resolver';
 import { ProjectRpcService } from './project.rpc.service';
 import { ProjectService } from './project.service';
 import { ProjectSubgraphService } from './project.subgraph.service';
+import { ProjectLLMService } from './project.llm.service';
 
 @Module({
   imports: [
@@ -33,8 +34,9 @@ import { ProjectSubgraphService } from './project.subgraph.service';
     ProjectResolver,
     ProjectRpcService,
     ProjectSubgraphService,
+    ProjectLLMService,
     DbStatsService,
   ],
-  exports: [ProjectService, ProjectRpcService],
+  exports: [ProjectService, ProjectRpcService, ProjectLLMService],
 })
 export class ProjectModule {}
diff --git a/apps/indexer-coordinator/src/project/project.resolver.ts b/apps/indexer-coordinator/src/project/project.resolver.ts
index 3079e3cd8..bd1193e41 100644
--- a/apps/indexer-coordinator/src/project/project.resolver.ts
+++ b/apps/indexer-coordinator/src/project/project.resolver.ts
@@ -6,8 +6,9 @@ import { Args, Mutation, Query, Resolver, Subscription } from '@nestjs/graphql';
 import { DockerRegistry, DockerRegistryService } from '../core/docker.registry.service';
 import { QueryService } from '../core/query.service';
 import { SubscriptionService } from '../subscription/subscription.service';
-import { ProjectEvent } from '../utils/subscription';
+import { OllamaEvent, ProjectEvent } from '../utils/subscription';
 import { DbStatsService } from './db.stats.service';
+import { ProjectLLMService } from './project.llm.service';
 import { AggregatedManifest, RpcManifest, SubgraphManifest } from './project.manifest';
 import {
   LogType,
@@ -27,6 +28,7 @@ import { ProjectSubgraphService } from './project.subgraph.service';
 import {
   AccessType,
   HostType,
+  LLMModelPullResult,
   ProjectType,
   SubgraphEndpoint,
   SubgraphEndpointAccessType,
@@ -42,6 +44,7 @@ export class ProjectResolver {
     private projectService: ProjectService,
     private projectRpcService: ProjectRpcService,
     private projectSubgraphService: ProjectSubgraphService,
+    private projectLLMService: ProjectLLMService,
     private queryService: QueryService,
     private dockerRegistry: DockerRegistryService,
     private pubSub: SubscriptionService,
@@ -75,6 +78,8 @@
         return this.projectRpcService.getRpcMetadata(id);
       case ProjectType.SUBGRAPH:
         return this.projectSubgraphService.getSubgraphMetadata(id);
+      case ProjectType.LLM:
+        return this.projectLLMService.getLLMMetadata(id);
       default:
         throw new Error(`Unknown project type ${projectType}`);
     }
@@ -124,6 +129,11 @@
           ...project,
           metadata: await this.projectSubgraphService.getSubgraphMetadata(project.id),
         };
+      case ProjectType.LLM:
+        return {
+          ...project,
+          metadata: await this.projectLLMService.getLLMMetadata(project.id),
+        };
       default:
         throw new Error(`Unknown project type ${project.projectType}`);
     }
@@ -203,6 +213,9 @@
       case ProjectType.SUBGRAPH:
         manifest.subgraphManifest = await this.projectService.getManifest(projectId);
         break;
+      case ProjectType.LLM:
+        manifest.llmManifest = await this.projectService.getManifest(projectId);
+        break;
       default:
         throw new Error(`Unknown project type ${projectType}`);
     }
@@ -286,6 +299,8 @@
         return this.projectRpcService.startRpcProject(id, projectConfig, rateLimit ?? 0);
       case ProjectType.SUBGRAPH:
         return this.projectSubgraphService.startSubgraphProject(id, projectConfig, rateLimit ?? 0);
+      case ProjectType.LLM:
+        return this.projectLLMService.startLLMProject(id, projectConfig, rateLimit ?? 0);
       default:
         throw new Error(`Unknown project type ${projectType}`);
     }
@@ -306,6 +321,8 @@
         return this.projectRpcService.stopRpcProject(id);
       case ProjectType.SUBGRAPH:
         return this.projectSubgraphService.stopSubgraphProject(id);
+      case ProjectType.LLM:
+        return this.projectLLMService.stopLLMProject(id);
       default:
         throw new Error(`Unknown project type ${projectType}`);
     }
@@ -326,6 +343,8 @@
         return this.projectRpcService.removeRpcProject(id);
       case ProjectType.SUBGRAPH:
         return this.projectSubgraphService.removeSubgraphProject(id);
+      case ProjectType.LLM:
+        return this.projectLLMService.removeLLMProject(id);
       default:
         throw new Error(`Unknown project type ${projectType}`);
     }
@@ -365,4 +384,17 @@
   async getProjectDbSize(@Args('id') id: string): Promise<string> {
     return (await this.dbStatsService.getProjectDbStats(id)).size || '0';
   }
+
+  @Query(() => LLMModelPullResult, { nullable: true })
+  getPullingProgress(
+    @Args('host') host: string,
+    @Args('model') model: string
+  ): LLMModelPullResult {
+    return this.projectLLMService.getPullingProgress(host, model);
+  }
+
+  @Subscription(() => String)
+  progressChanged() {
+    return this.pubSub.asyncIterator(OllamaEvent.PullProgress);
+  }
 }
diff --git a/apps/indexer-coordinator/src/project/types.ts b/apps/indexer-coordinator/src/project/types.ts
index 47143b5cc..bf26947c6 100644
--- a/apps/indexer-coordinator/src/project/types.ts
+++ b/apps/indexer-coordinator/src/project/types.ts
@@ -14,6 +14,73 @@ export enum ProjectType {
   RPC,
   DICTIONARY,
   SUBGRAPH,
+  LLM,
+}
+
+export enum IntegrationType {
+  SUBGRAPH,
+  LLM,
+}
+
+export enum LLMModelStatus {
+  NOT_READY = 'notReady',
+  NORMAL = 'normal',
+  PULLING = 'pulling',
+  LOADED = 'loaded',
+}
+
+@ObjectType()
+export class LLMModelPullResult {
+  @Field()
+  name: string;
+  @Field()
+  status: string;
+  @Field({ nullable: true })
+  host?: string;
+  @Field({ nullable: true })
+  digest?: string;
+  @Field({ nullable: true })
+  total?: number;
+  @Field({ nullable: true })
+  completed?: number;
+}
+
+@ObjectType()
+export class LLMModel {
+  @Field()
+  name: string;
+  @Field({ nullable: true })
+  size?: number;
+  @Field({ nullable: true })
+  digest?: string;
+  @Field()
+  status: LLMModelStatus;
+  @Field({ nullable: true })
+  pullStatus?: LLMModelPullResult;
+}
+
+@InputType()
+@ObjectType()
+export class LLMConfig {
+  @Field()
+  foo: string;
+}
+
+@InputType()
+@ObjectType()
+export class LLMExtra {
+  @Field()
+  bar: string;
+}
+
+@ObjectType()
+export class LLMOngoingStreamRequestMeta {
+  @Field()
+  model: string;
+  @Field()
+  host: string;
+  @Field({ nullable: true })
+  progress?: LLMModelPullResult;
 }
 
 export enum HostType {
@@ -60,6 +127,11 @@ export enum SubgraphEndpointType {
   MetricsEndpoint = 'metrics-endpoint',
 }
 
+export enum LLMEndpointType {
+  ApiGenerateEndpoint = 'api-generate-endpoint',
+  AdminShowEndpoint = 'api-show-endpoint',
+}
+
 export const SubgraphEndpointAccessType = {
   [SubgraphEndpointType.HttpEndpoint]: AccessType.DEFAULT,
   [SubgraphEndpointType.WsEndpoint]: AccessType.DEFAULT,
@@ -68,6 +140,11 @@
   [SubgraphEndpointType.MetricsEndpoint]: AccessType.INTERNAL,
 };
 
+export const LLMEndpointAccessType = {
+  [LLMEndpointType.ApiGenerateEndpoint]: AccessType.DEFAULT,
+  [LLMEndpointType.AdminShowEndpoint]: AccessType.INTERNAL,
+};
+
 @InputType('SubgraphPort')
 @ObjectType('SubgraphPort')
 export class SubgraphPort {
diff --git a/apps/indexer-coordinator/src/utils/ollama.ts b/apps/indexer-coordinator/src/utils/ollama.ts
new file mode 100644
index 000000000..cd5215fb9
--- /dev/null
+++ b/apps/indexer-coordinator/src/utils/ollama.ts
@@ -0,0 +1,222 @@
+// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
+// SPDX-License-Identifier: Apache-2.0
+
+import fetch, { Response } from 'node-fetch';
+import { LLMModelPullResult, LLMOngoingStreamRequestMeta } from 'src/project/types';
+
+export interface Config {
+  host: string;
+}
+
+export interface DeleteRequest {
+  model: string;
+}
+
+export interface PullRequest {
+  model: string;
+  insecure?: boolean;
+  stream?: boolean;
+}
+
+export interface ModelDetails {
+  parent_model: string;
+  format: string;
+  family: string;
+  families: string[];
+  parameter_size: string;
+  quantization_level: string;
+}
+
+export interface ModelResponse {
+  name: string;
+  modified_at: Date;
+  size: number;
+  digest: string;
+  details: ModelDetails;
+  expires_at: Date;
+  size_vram: number;
+}
+
+export interface ListResponse {
+  models: ModelResponse[];
+}
+
+export interface ErrorResponse {
+  error: string;
+}
+
+export class AbortableAsyncIterator {
+  private readonly abortController: AbortController;
+  // NDJSON response body streamed back by Ollama (e.g. from api/pull)
+  private readonly stream: NodeJS.ReadableStream;
+  readonly meta: LLMOngoingStreamRequestMeta;
+
+  constructor(
+    abortController: AbortController,
+    stream: NodeJS.ReadableStream,
+    meta: LLMOngoingStreamRequestMeta
+  ) {
+    this.abortController = abortController;
+    this.stream = stream;
+    this.meta = meta;
+  }
+
+  abort() {
+    this.abortController.abort();
+  }
+
+  updateProgress(progress: LLMModelPullResult) {
+    this.meta.progress = progress;
+  }
+
+  getProgress() {
+    return this.meta.progress;
+  }
+
+  async *[Symbol.asyncIterator]() {
+    let buffer = '';
+    this.stream.setEncoding('utf8');
+    for await (const chunk of this.stream) {
+      buffer += chunk;
+      const parts = buffer.split('\n');
+      buffer = parts.pop() ?? '';
+      for (const part of parts) {
+        // parse separately from the error check so a server-reported error
+        // is not swallowed by the invalid-JSON handler
+        let message: any;
+        try {
+          message = JSON.parse(part);
+        } catch {
+          console.warn('invalid json: ', part);
+          continue;
+        }
+        if ('error' in message) {
+          throw new Error(message.error);
+        }
+        yield message;
+
+        if (message.done || message.status === 'success') {
+          return;
+        }
+      }
+    }
+
+    // drain whatever is left in the buffer after the stream ends
+    for (const part of buffer.split('\n').filter((p) => p !== '')) {
+      let message: any;
+      try {
+        message = JSON.parse(part);
+      } catch {
+        console.warn('invalid json: ', part);
+        continue;
+      }
+      if ('error' in message) {
+        throw new Error(message.error);
+      }
+      yield message;
+
+      if (message.done || message.status === 'success') {
+        return;
+      }
+    }
+    throw new Error('Did not receive done or success response in stream.');
+  }
+}
+
+const defaultHeaders = {
+  'Content-Type': 'application/json',
+  Accept: 'application/json',
+};
+
+export class Ollama {
+  config: Config;
+
+  constructor(config: Config) {
+    this.config = config;
+  }
+
+  async list(): Promise<ListResponse> {
+    const url = new URL('api/tags', this.config.host).toString();
+    const response = await fetch(url, {
+      method: 'GET',
+      headers: defaultHeaders,
+    });
+    await checkOk(response);
+    return (await response.json()) as ListResponse;
+  }
+
+  async ps(): Promise<ListResponse> {
+    const url = new URL('api/ps', this.config.host).toString();
+    const response = await fetch(url, {
+      method: 'GET',
+      headers: defaultHeaders,
+    });
+    await checkOk(response);
+    return (await response.json()) as ListResponse;
+  }
+
+  async delete(request: DeleteRequest) {
+    const url = new URL('api/delete', this.config.host).toString();
+    const response = await fetch(url, {
+      method: 'DELETE',
+      body: JSON.stringify({
+        name: request.model,
+      }),
+      headers: defaultHeaders,
+    });
+    await checkOk(response);
+    return { status: 'success' };
+  }
+
+  async pull(request: PullRequest) {
+    const host = new URL(this.config.host).toString();
+    const url = new URL('api/pull', host).toString();
+    const model = normalizeModelName(request.model);
+    const abortController = new AbortController();
+    const response = await fetch(url, {
+      method: 'POST',
+      body: JSON.stringify({
+        name: model,
+        stream: request.stream,
+        insecure: request.insecure,
+      }),
+      headers: defaultHeaders,
+      signal: abortController.signal,
+    });
+    await checkOk(response);
+    return new AbortableAsyncIterator(abortController, response.body, {
+      model,
+      host,
+    });
+  }
+}
+
+// Ollama resolves a bare model name to the ':latest' tag
+export function normalizeModelName(model: string): string {
+  if (model.lastIndexOf(':') === -1) {
+    return model + ':latest';
+  }
+  return model;
+}
+
+const checkOk = async (response: Response): Promise<void> => {
+  if (response.ok) {
+    return;
+  }
+  let message = `Error ${response.status}: ${response.statusText}`;
+  let errorData: ErrorResponse | null = null;
+
+  if (response.headers.get('content-type')?.includes('application/json')) {
+    try {
+      errorData = (await response.json()) as ErrorResponse;
+      message = errorData.error || message;
+    } catch (error) {
+      console.log('Failed to parse error response as JSON');
+    }
+  } else {
+    try {
+      const textResponse = await response.text();
+      message = textResponse || message;
+    } catch (error) {
+      console.log('Failed to get text from error response');
+    }
+  }
+
+  throw new Error(message);
+};
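Note (reviewer sketch, not part of the patch): pull() returns an AbortableAsyncIterator over the NDJSON progress lines that Ollama streams from api/pull. A minimal consumption sketch, where the host, model, and the example progress line are placeholders:

  const ollama = new Ollama({ host: 'http://127.0.0.1:11434' });
  const iterator = await ollama.pull({ model: 'llama3', stream: true });
  for await (const message of iterator) {
    // each message is one parsed NDJSON line, e.g.
    // { status: 'pulling 6a0746a1ec1a', digest: 'sha256:…', total: 4661224676, completed: 102400 }
    iterator.updateProgress({ name: 'llama3:latest', ...message });
  }
  // iterator.abort() cancels the underlying HTTP request via the AbortController.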
diff --git a/apps/indexer-coordinator/src/utils/subscription.ts b/apps/indexer-coordinator/src/utils/subscription.ts
index 7b83001d5..899a6c8b2 100644
--- a/apps/indexer-coordinator/src/utils/subscription.ts
+++ b/apps/indexer-coordinator/src/utils/subscription.ts
@@ -16,3 +16,7 @@ export enum AccountEvent {
   Indexer = 'account_indexer',
   Controller = 'account_controller',
 }
+
+export enum OllamaEvent {
+  PullProgress = 'pull_progress',
+}
\ No newline at end of file
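Note (reviewer observation, not part of the patch): progressChanged() subscribes to OllamaEvent.PullProgress, but nothing in this diff publishes that event yet, so the subscription stays silent until a pubSub.publish call lands; until then clients can poll getPullingProgress. A polling sketch, where the coordinator URL is a placeholder and Node 18+ global fetch is assumed:

  async function waitForPull(host: string, model: string): Promise<void> {
    for (;;) {
      const res = await fetch('http://127.0.0.1:8000/graphql', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          query: `query { getPullingProgress(host: "${host}", model: "${model}") { completed total status } }`,
        }),
      });
      const { data } = (await res.json()) as any;
      // null means no in-flight pull: finished, aborted, or never started
      if (!data?.getPullingProgress) return;
      await new Promise((resolve) => setTimeout(resolve, 2000));
    }
  }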