mirror of
				https://github.com/actions/cache.git
				synced 2025-11-04 08:20:56 +00:00 
			
		
		
		
	Initial pass at chunked upload apis
This commit is contained in:
		
					parent
					
						
							
								e223b0a12d
							
						
					
				
			
			
				commit
				
					
						805d58ac4b
					
				
			
		
					 5 changed files with 150 additions and 42 deletions
				
			
		| 
						 | 
				
			
			@ -3,24 +3,39 @@ import * as fs from "fs";
 | 
			
		|||
import { BearerCredentialHandler } from "typed-rest-client/Handlers";
 | 
			
		||||
import { HttpClient } from "typed-rest-client/HttpClient";
 | 
			
		||||
import { IHttpClientResponse } from "typed-rest-client/Interfaces";
 | 
			
		||||
import { IRequestOptions, RestClient } from "typed-rest-client/RestClient";
 | 
			
		||||
import { ArtifactCacheEntry } from "./contracts";
 | 
			
		||||
import {
 | 
			
		||||
    IRequestOptions,
 | 
			
		||||
    RestClient,
 | 
			
		||||
    IRestResponse
 | 
			
		||||
} from "typed-rest-client/RestClient";
 | 
			
		||||
import {
 | 
			
		||||
    ArtifactCacheEntry,
 | 
			
		||||
    CommitCacheRequest,
 | 
			
		||||
    ReserveCacheRequest,
 | 
			
		||||
    ReserverCacheResponse
 | 
			
		||||
} from "./contracts";
 | 
			
		||||
import * as utils from "./utils/actionUtils";
 | 
			
		||||
 | 
			
		||||
function getCacheUrl(): string {
 | 
			
		||||
const MAX_CHUNK_SIZE = 4000000; // 4 MB Chunks
 | 
			
		||||
 | 
			
		||||
function isSuccessStatusCode(statusCode: number): boolean {
 | 
			
		||||
    return statusCode >= 200 && statusCode < 300;
 | 
			
		||||
}
 | 
			
		||||
function getCacheApiUrl(): string {
 | 
			
		||||
    // Ideally we just use ACTIONS_CACHE_URL
 | 
			
		||||
    const cacheUrl: string = (
 | 
			
		||||
    const baseUrl: string = (
 | 
			
		||||
        process.env["ACTIONS_CACHE_URL"] ||
 | 
			
		||||
        process.env["ACTIONS_RUNTIME_URL"] ||
 | 
			
		||||
        ""
 | 
			
		||||
    ).replace("pipelines", "artifactcache");
 | 
			
		||||
    if (!cacheUrl) {
 | 
			
		||||
    if (!baseUrl) {
 | 
			
		||||
        throw new Error(
 | 
			
		||||
            "Cache Service Url not found, unable to restore cache."
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    core.debug(`Cache Url: ${cacheUrl}`);
 | 
			
		||||
    return cacheUrl;
 | 
			
		||||
    core.debug(`Cache Url: ${baseUrl}`);
 | 
			
		||||
    return `${baseUrl}_apis/artifactcache/`;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
function createAcceptHeader(type: string, apiVersion: string): string {
 | 
			
		||||
| 
						 | 
				
			
			@ -29,7 +44,7 @@ function createAcceptHeader(type: string, apiVersion: string): string {
 | 
			
		|||
 | 
			
		||||
function getRequestOptions(): IRequestOptions {
 | 
			
		||||
    const requestOptions: IRequestOptions = {
 | 
			
		||||
        acceptHeader: createAcceptHeader("application/json", "5.2-preview.1")
 | 
			
		||||
        acceptHeader: createAcceptHeader("application/json", "6.0-preview.1")
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    return requestOptions;
 | 
			
		||||
| 
						 | 
				
			
			@ -38,13 +53,11 @@ function getRequestOptions(): IRequestOptions {
 | 
			
		|||
export async function getCacheEntry(
 | 
			
		||||
    keys: string[]
 | 
			
		||||
): Promise<ArtifactCacheEntry | null> {
 | 
			
		||||
    const cacheUrl = getCacheUrl();
 | 
			
		||||
    const cacheUrl = getCacheApiUrl();
 | 
			
		||||
    const token = process.env["ACTIONS_RUNTIME_TOKEN"] || "";
 | 
			
		||||
    const bearerCredentialHandler = new BearerCredentialHandler(token);
 | 
			
		||||
 | 
			
		||||
    const resource = `_apis/artifactcache/cache?keys=${encodeURIComponent(
 | 
			
		||||
        keys.join(",")
 | 
			
		||||
    )}`;
 | 
			
		||||
    const resource = `cache?keys=${encodeURIComponent(keys.join(","))}`;
 | 
			
		||||
 | 
			
		||||
    const restClient = new RestClient("actions/cache", cacheUrl, [
 | 
			
		||||
        bearerCredentialHandler
 | 
			
		||||
| 
						 | 
				
			
			@ -57,14 +70,15 @@ export async function getCacheEntry(
 | 
			
		|||
    if (response.statusCode === 204) {
 | 
			
		||||
        return null;
 | 
			
		||||
    }
 | 
			
		||||
    if (response.statusCode !== 200) {
 | 
			
		||||
    if (!isSuccessStatusCode(response.statusCode)) {
 | 
			
		||||
        throw new Error(`Cache service responded with ${response.statusCode}`);
 | 
			
		||||
    }
 | 
			
		||||
    const cacheResult = response.result;
 | 
			
		||||
    if (!cacheResult || !cacheResult.archiveLocation) {
 | 
			
		||||
    const cacheDownloadUrl = cacheResult?.archiveLocation;
 | 
			
		||||
    if (!cacheDownloadUrl) {
 | 
			
		||||
        throw new Error("Cache not found.");
 | 
			
		||||
    }
 | 
			
		||||
    core.setSecret(cacheResult.archiveLocation);
 | 
			
		||||
    core.setSecret(cacheDownloadUrl);
 | 
			
		||||
    core.debug(`Cache Result:`);
 | 
			
		||||
    core.debug(JSON.stringify(cacheResult));
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -83,46 +97,127 @@ async function pipeResponseToStream(
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
export async function downloadCache(
 | 
			
		||||
    cacheEntry: ArtifactCacheEntry,
 | 
			
		||||
    archiveLocation: string,
 | 
			
		||||
    archivePath: string
 | 
			
		||||
): Promise<void> {
 | 
			
		||||
    const stream = fs.createWriteStream(archivePath);
 | 
			
		||||
    const httpClient = new HttpClient("actions/cache");
 | 
			
		||||
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
 | 
			
		||||
    const downloadResponse = await httpClient.get(cacheEntry.archiveLocation!);
 | 
			
		||||
    const downloadResponse = await httpClient.get(archiveLocation);
 | 
			
		||||
    await pipeResponseToStream(downloadResponse, stream);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Returns Cache ID
 | 
			
		||||
async function reserveCache(
 | 
			
		||||
    restClient: RestClient,
 | 
			
		||||
    key: string
 | 
			
		||||
): Promise<number> {
 | 
			
		||||
    const reserveCacheRequest: ReserveCacheRequest = {
 | 
			
		||||
        key
 | 
			
		||||
    };
 | 
			
		||||
    const response = await restClient.create<ReserverCacheResponse>(
 | 
			
		||||
        "caches",
 | 
			
		||||
        reserveCacheRequest
 | 
			
		||||
    );
 | 
			
		||||
 | 
			
		||||
    return response?.result?.cacheId || -1;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
function getContentRange(start: number, length: number): string {
 | 
			
		||||
    // Format: `bytes start-end/filesize
 | 
			
		||||
    // start and end are inclusive
 | 
			
		||||
    // filesize can be *
 | 
			
		||||
    // For a 200 byte chunk starting at byte 0:
 | 
			
		||||
    // Content-Range: bytes 0-199/*
 | 
			
		||||
    return `bytes ${start}-${start + length - 1}/*`;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async function uploadChunk(
 | 
			
		||||
    restClient: RestClient,
 | 
			
		||||
    cacheId: number,
 | 
			
		||||
    data: Buffer,
 | 
			
		||||
    offset: number
 | 
			
		||||
): Promise<IRestResponse<void>> {
 | 
			
		||||
    const requestOptions = getRequestOptions();
 | 
			
		||||
    requestOptions.additionalHeaders = {
 | 
			
		||||
        "Content-Type": "application/octet-stream",
 | 
			
		||||
        "Content-Range": getContentRange(offset, data.byteLength)
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    return await restClient.update(
 | 
			
		||||
        cacheId.toString(),
 | 
			
		||||
        data.toString("utf8"),
 | 
			
		||||
        requestOptions
 | 
			
		||||
    );
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async function commitCache(
 | 
			
		||||
    restClient: RestClient,
 | 
			
		||||
    cacheId: number,
 | 
			
		||||
    filesize: number
 | 
			
		||||
): Promise<IRestResponse<void>> {
 | 
			
		||||
    const requestOptions = getRequestOptions();
 | 
			
		||||
    const commitCacheRequest: CommitCacheRequest = { size: filesize };
 | 
			
		||||
    return await restClient.create(
 | 
			
		||||
        cacheId.toString(),
 | 
			
		||||
        commitCacheRequest,
 | 
			
		||||
        requestOptions
 | 
			
		||||
    );
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export async function saveCache(
 | 
			
		||||
    key: string,
 | 
			
		||||
    archivePath: string
 | 
			
		||||
): Promise<void> {
 | 
			
		||||
    const stream = fs.createReadStream(archivePath);
 | 
			
		||||
 | 
			
		||||
    const cacheUrl = getCacheUrl();
 | 
			
		||||
    const token = process.env["ACTIONS_RUNTIME_TOKEN"] || "";
 | 
			
		||||
    const bearerCredentialHandler = new BearerCredentialHandler(token);
 | 
			
		||||
 | 
			
		||||
    const resource = `_apis/artifactcache/cache/${encodeURIComponent(key)}`;
 | 
			
		||||
    const postUrl = cacheUrl + resource;
 | 
			
		||||
 | 
			
		||||
    const restClient = new RestClient("actions/cache", undefined, [
 | 
			
		||||
    const restClient = new RestClient("actions/cache", getCacheApiUrl(), [
 | 
			
		||||
        bearerCredentialHandler
 | 
			
		||||
    ]);
 | 
			
		||||
 | 
			
		||||
    const requestOptions = getRequestOptions();
 | 
			
		||||
    requestOptions.additionalHeaders = {
 | 
			
		||||
        "Content-Type": "application/octet-stream"
 | 
			
		||||
    };
 | 
			
		||||
    // Reserve Cache
 | 
			
		||||
    const cacheId = await reserveCache(restClient, key);
 | 
			
		||||
    if (cacheId < 0) {
 | 
			
		||||
        throw new Error(`Unable to reserve cache.`);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    const response = await restClient.uploadStream<void>(
 | 
			
		||||
        "POST",
 | 
			
		||||
        postUrl,
 | 
			
		||||
        stream,
 | 
			
		||||
        requestOptions
 | 
			
		||||
    // Upload Chunks
 | 
			
		||||
    const stream = fs.createReadStream(archivePath);
 | 
			
		||||
    let streamIsClosed = false;
 | 
			
		||||
    stream.on("close", () => {
 | 
			
		||||
        streamIsClosed = true;
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    const uploads: Promise<IRestResponse<void>>[] = [];
 | 
			
		||||
    let offset = 0;
 | 
			
		||||
    while (!streamIsClosed) {
 | 
			
		||||
        const chunk: Buffer = stream.read(MAX_CHUNK_SIZE);
 | 
			
		||||
        uploads.push(uploadChunk(restClient, cacheId, chunk, offset));
 | 
			
		||||
        offset += MAX_CHUNK_SIZE;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    const responses = await Promise.all(uploads);
 | 
			
		||||
 | 
			
		||||
    const failedResponse = responses.find(
 | 
			
		||||
        x => !isSuccessStatusCode(x.statusCode)
 | 
			
		||||
    );
 | 
			
		||||
    if (response.statusCode !== 200) {
 | 
			
		||||
        throw new Error(`Cache service responded with ${response.statusCode}`);
 | 
			
		||||
    if (failedResponse) {
 | 
			
		||||
        throw new Error(
 | 
			
		||||
            `Cache service responded with ${failedResponse.statusCode} during chunk upload.`
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // Commit Cache
 | 
			
		||||
    const cacheSize = utils.getArchiveFileSize(archivePath);
 | 
			
		||||
    const commitCacheResponse = await commitCache(
 | 
			
		||||
        restClient,
 | 
			
		||||
        cacheId,
 | 
			
		||||
        cacheSize
 | 
			
		||||
    );
 | 
			
		||||
    if (!isSuccessStatusCode(commitCacheResponse.statusCode)) {
 | 
			
		||||
        throw new Error(
 | 
			
		||||
            `Cache service responded with ${commitCacheResponse.statusCode} during commit cache.`
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    core.info("Cache saved successfully");
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue