GordianPipedStream.java

/*
 * GordianKnot: Security Suite
 * Copyright 2012-2026. Tony Washer
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License.  You may obtain a copy
 * of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package io.github.tonywasher.joceanus.gordianknot.impl.core.stream;

import org.bouncycastle.util.Arrays;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.ArrayBlockingQueue;

/**
 * Class to provide a pipe enabling data to be passed between threads via writing to an output
 * stream and reading from an input stream.
 */
class GordianPipedStream {
    /**
     * The Queue Capacity.
     */
    private static final int QUEUE_LEN = 4096;

    /**
     * The output buffer size.
     */
    private static final int BUFFER_LEN = 16384;

    /**
     * The Array Blocking queue that implements the pipe.
     */
    private final ArrayBlockingQueue<byte[]> theQueue;

    /**
     * The Input Stream.
     */
    private final GordianPipedInputStream theSource;

    /**
     * The Output Stream.
     */
    private final BufferedOutputStream theSink;

    /**
     * Constructor.
     */
    GordianPipedStream() {
        /* Create the queue */
        theQueue = new ArrayBlockingQueue<>(QUEUE_LEN);

        /* Create the source stream */
        theSource = new GordianPipedInputStream();

        /* Create the sink stream */
        theSink = new BufferedOutputStream(new GordianPipedOutputStream(), BUFFER_LEN);
    }

    /**
     * Obtain the source stream.
     *
     * @return the source stream
     */
    public InputStream getSource() {
        return theSource;
    }

    /**
     * Obtain the sink stream.
     *
     * @return the sink stream
     */
    OutputStream getSink() {
        return theSink;
    }

    /**
     * The inputStream class.
     */
    private final class GordianPipedInputStream
            extends InputStream {
        /**
         * The currently active element.
         */
        private byte[] theElement;

        /**
         * The length of the current element.
         */
        private int theDataLen = -1;

        /**
         * The offset within the element.
         */
        private int theReadOffset = -1;

        /**
         * has this stream been closed.
         */
        private boolean isClosed;

        /**
         * have we seen EOF.
         */
        private boolean hasEOFbeenSeen;

        /**
         * A buffer for single byte reads.
         */
        private final byte[] theByte = new byte[1];

        @Override
        public int read(final byte[] pBytes) throws IOException {
            /* Read the bytes from the stream */
            return read(pBytes, 0, pBytes.length);
        }

        @Override
        public int read() throws IOException {
            int iNumRead;

            /* Loop until we get a byte or EOF */
            do {
                iNumRead = read(theByte, 0, 1);
            } while (iNumRead == 0);

            /* Convert the byte read into an integer */
            if (iNumRead > 0) {
                iNumRead = theByte[0] & GordianInputStream.BYTE_MASK;
            }

            /* Return to the caller */
            return iNumRead;
        }

        @Override
        public int read(final byte[] pBuffer,
                        final int pOffset,
                        final int pLength) throws IOException {
            /* If we are already closed throw IO Exception */
            if (isClosed) {
                throw new IOException(GordianInputStream.ERROR_CLOSED);
            }

            /* If we have already seen EOF return now */
            if (hasEOFbeenSeen) {
                return -1;
            }

            /* If we have no current element */
            if (theElement == null) {
                /* Obtain the next element from the queue */
                try {
                    theElement = theQueue.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException(e.getMessage(), e);
                }

                /* Set variables */
                theDataLen = theElement.length;
                theReadOffset = 0;

                /* If the dataLen is zero, it is EOF */
                if (theDataLen == 0) {
                    hasEOFbeenSeen = true;
                    return -1;
                }
            }

            /* Determine how much data we have available */
            int iNumRead = theDataLen
                    - theReadOffset;

            /* Determine how much data we can transfer */
            iNumRead = Math.min(iNumRead, pLength);

            /* If we have data to copy */
            if (iNumRead > 0) {
                /* Transfer the bytes */
                System.arraycopy(theElement, theReadOffset, pBuffer, pOffset, iNumRead);

                /* Adjust ReadOffset */
                theReadOffset += iNumRead;

                /* If we have finished with the data in the element */
                if (theReadOffset >= theDataLen) {
                    /* Reset the values */
                    Arrays.fill(theElement, (byte) 0);
                    theElement = null;
                    theDataLen = 0;
                    theReadOffset = 0;
                }
            }

            /* Return the number of bytes read */
            return iNumRead;
        }

        @Override
        public void close() throws IOException {
            /* Note that we have closed the stream */
            isClosed = true;

            /* Clear any data buffer */
            if (theElement != null) {
                Arrays.fill(theElement, (byte) 0);
                theElement = null;
            }
        }
    }

    /**
     * The outputStream class.
     */
    private final class GordianPipedOutputStream
            extends OutputStream {
        /**
         * has this stream been closed.
         */
        private boolean isClosed;

        /**
         * A buffer for single byte writes.
         */
        private final byte[] theByte = new byte[1];

        @Override
        public void write(final byte[] pBytes,
                          final int pOffset,
                          final int pLength) throws IOException {
            /* If we are already closed throw IO Exception */
            if (isClosed) {
                throw new IOException(GordianInputStream.ERROR_CLOSED);
            }

            /* no data is to be written, just ignore */
            if (pLength == 0) {
                return;
            }

            /* Create a copy of the data */
            final byte[] myBuffer = new byte[pLength];
            System.arraycopy(pBytes, pOffset, myBuffer, 0, pLength);

            /* Write the element to the queue */
            try {
                theQueue.put(myBuffer);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException(e.getMessage(), e);
            }
        }

        @Override
        public void write(final byte[] pBytes) throws IOException {
            /* Write the bytes to the stream */
            write(pBytes, 0, pBytes.length);
        }

        @Override
        public void write(final int pByte) throws IOException {
            /* Store the byte */
            theByte[0] = (byte) pByte;

            /* Write the byte to the stream */
            write(theByte, 0, 1);
        }

        @Override
        public void flush() throws IOException {
            /* No need to flush */
        }

        @Override
        public void close() {
            /* Ignore if already closed */
            if (isClosed) {
                return;
            }

            /* Write a zero length element */
            try {
                theQueue.put(new byte[0]);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }

            /* Note that we have closed */
            isClosed = true;
        }
    }
}