"use strict";
// Copyright 2021-2024 The Connect Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
Object.defineProperty(exports, "__esModule", { value: true });
exports.encodeEnvelopes = exports.encodeEnvelope = exports.envelopeDecompress = exports.envelopeCompress = exports.createEnvelopeReadableStream = void 0;
const connect_error_js_1 = require("../connect-error.js");
const code_js_1 = require("../code.js");
const compression_js_1 = require("./compression.js");
/**
 * Create a WHATWG ReadableStream of enveloped messages from a ReadableStream
 * of bytes.
 *
 * Ideally, this would simply be a TransformStream, but ReadableStream.pipeThrough
 * does not have the necessary availability at this time.
 *
 * @private Internal code, does not follow semantic versioning.
 */
function createEnvelopeReadableStream(stream) {
    let reader;
    let buffer = new Uint8Array(0);
    function append(chunk) {
        const n = new Uint8Array(buffer.length + chunk.length);
        n.set(buffer);
        n.set(chunk, buffer.length);
        buffer = n;
    }
    return new ReadableStream({
        start() {
            reader = stream.getReader();
        },
        async pull(controller) {
            let header = undefined;
            for (;;) {
                if (header === undefined && buffer.byteLength >= 5) {
                    let length = 0;
                    for (let i = 1; i < 5; i++) {
                        length = (length << 8) + buffer[i];
                    }
                    header = { flags: buffer[0], length };
                }
                if (header !== undefined && buffer.byteLength >= header.length + 5) {
                    break;
                }
                const result = await reader.read();
                if (result.done) {
                    break;
                }
                append(result.value);
            }
            if (header === undefined) {
                if (buffer.byteLength == 0) {
                    controller.close();
                    return;
                }
                controller.error(new connect_error_js_1.ConnectError("premature end of stream", code_js_1.Code.DataLoss));
                return;
            }
            const data = buffer.subarray(5, 5 + header.length);
            buffer = buffer.subarray(5 + header.length);
            controller.enqueue({
                flags: header.flags,
                data,
            });
        },
    });
}
exports.createEnvelopeReadableStream = createEnvelopeReadableStream;
/**
 * Compress an EnvelopedMessage.
 *
 * Raises Internal if an enveloped message is already compressed.
 *
 * @private Internal code, does not follow semantic versioning.
 */
async function envelopeCompress(envelope, compression, compressMinBytes) {
    let { flags, data } = envelope;
    if ((flags & compression_js_1.compressedFlag) === compression_js_1.compressedFlag) {
        throw new connect_error_js_1.ConnectError("invalid envelope, already compressed", code_js_1.Code.Internal);
    }
    if (compression && data.byteLength >= compressMinBytes) {
        data = await compression.compress(data);
        flags = flags | compression_js_1.compressedFlag;
    }
    return { data, flags };
}
exports.envelopeCompress = envelopeCompress;
/**
 * Decompress an EnvelopedMessage.
 *
 * Raises InvalidArgument if an envelope is compressed, but compression is null.
 *
 * Relies on the provided Compression to raise ResourceExhausted if the
 * *decompressed* message size is larger than readMaxBytes. If the envelope is
 * not compressed, readMaxBytes is not honored.
 *
 * @private Internal code, does not follow semantic versioning.
 */
async function envelopeDecompress(envelope, compression, readMaxBytes) {
    let { flags, data } = envelope;
    if ((flags & compression_js_1.compressedFlag) === compression_js_1.compressedFlag) {
        if (!compression) {
            throw new connect_error_js_1.ConnectError("received compressed envelope, but do not know how to decompress", code_js_1.Code.InvalidArgument);
        }
        data = await compression.decompress(data, readMaxBytes);
        flags = flags ^ compression_js_1.compressedFlag;
    }
    return { data, flags };
}
exports.envelopeDecompress = envelopeDecompress;
/**
 * Encode a single enveloped message.
 *
 * @private Internal code, does not follow semantic versioning.
 */
function encodeEnvelope(flags, data) {
    const bytes = new Uint8Array(data.length + 5);
    bytes.set(data, 5);
    const v = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength);
    v.setUint8(0, flags); // first byte is flags
    v.setUint32(1, data.length); // 4 bytes message length
    return bytes;
}
exports.encodeEnvelope = encodeEnvelope;
/**
 * Encode a set of enveloped messages.
 *
 * @private Internal code, does not follow semantic versioning.
 */
function encodeEnvelopes(...envelopes) {
    const len = envelopes.reduce((previousValue, currentValue) => previousValue + currentValue.data.length + 5, 0);
    const bytes = new Uint8Array(len);
    const v = new DataView(bytes.buffer);
    let offset = 0;
    for (const e of envelopes) {
        v.setUint8(offset, e.flags); // first byte is flags
        v.setUint32(offset + 1, e.data.length); // 4 bytes message length
        bytes.set(e.data, offset + 5);
        offset += e.data.length + 5;
    }
    return bytes;
}
exports.encodeEnvelopes = encodeEnvelopes;
