Comment convertir OutputStream en InputStream?

Je suis sur la scène du développement, où j’ai deux modules et dont l’un est sorti en tant que OutputStream et le second, qui accepte uniquement InputStream . Savez-vous comment convertir OutputStream en InputStream (et non l’inverse, je veux dire vraiment de cette façon) que je pourrai connecter ces deux parties?

Merci

Un OutputStream est un où vous écrivez des données. Si un module expose un OutputStream , on s’attend à ce que quelque chose soit lu à l’autre extrémité.

Quelque chose qui expose un InputStream , par contre, indique que vous devrez écouter ce stream, et il y aura des données que vous pourrez lire.

Il est donc possible de connecter un InputStream à un OutputStream

InputStream----read---> intermediateBytes[n] ----write----> OutputStream

Comme quelqu’un l’a mentionné, c’est ce que la méthode copy() d’ IOUtils vous permet de faire. Cela n’a pas de sens d’aller dans l’autre sens … j’espère que cela a du sens

METTRE À JOUR:

Bien sûr, plus je pense à cela, plus je peux voir comment cela serait réellement nécessaire. Je connais certains des commentaires mentionnés dans les stream entrés / sortis, mais il existe une autre possibilité.

Si le stream de sortie exposé est un ByteArrayOutputStream , vous pouvez toujours obtenir le contenu complet en appelant la méthode toByteArray() . Vous pouvez ensuite créer un wrapper de stream d’entrée à l’aide de la sous-classe ByteArrayInputStream . Ces deux sont des pseudo-stream, ils contiennent tous deux un tableau d’octets. L’utilisation des stream de cette façon est donc techniquement possible, mais pour moi, c’est toujours très étrange …

Il semble y avoir beaucoup de liens et d’autres choses de ce genre, mais pas de code utilisant des pipes. L’avantage d’utiliser java.io.PipedInputStream et java.io.PipedOutputStream est qu’il n’y a pas de consommation supplémentaire de mémoire. ByteArrayOutputStream.toByteArray() renvoie une copie du tampon d’origine, ce qui signifie que tout ce que vous avez en mémoire, vous en avez maintenant deux copies. L’écriture dans un InputStream signifie que vous avez maintenant trois copies des données.

Le code:

 // take the copy of the stream and re-write it to an InputStream PipedInputStream in = new PipedInputStream(); final PipedOutputStream out = new PipedOutputStream(in); new Thread(new Runnable() { public void run () { try { // write the original OutputStream to the PipedOutputStream originalByteArrayOutputStream.writeTo(out); } catch (IOException e) { // logging and exception handling should go here } } }).start(); 

Ce code suppose que originalByteArrayOutputStream est un ByteArrayOutputStream car il s’agit généralement du seul stream de sortie utilisable, sauf si vous écrivez dans un fichier. J’espère que ça aide! Ce qui est génial à ce sujet, c’est que comme il est dans un thread séparé, il fonctionne également en parallèle, de sorte que tout ce qui consum votre stream d’entrée sortira aussi de votre ancien stream de sortie. Cela est bénéfique car le tampon peut restr plus petit et vous aurez moins de latence et moins d’utilisation de la mémoire.

Vous aurez besoin d’une classe intermédiaire entre les deux. Chaque fois que InputStream.read(byte[]...) est appelé, la classe de mise en mémoire tampon remplit le tableau d’octets transmis avec le bloc suivant transmis par OutputStream.write(byte[]...) . Étant donné que les tailles des blocs peuvent ne pas être les mêmes, la classe d’adaptateur devra stocker une certaine quantité jusqu’à ce qu’elle soit suffisante pour remplir le tampon de lecture et / ou pouvoir stocker tout dépassement de tampon.

Cet article présente quelques approches différentes de ce problème:

http://blog.ostermiller.org/convert-java-outputstream-inputstream

Comme les stream d’entrée et de sortie ne sont que des points de début et de fin, la solution consiste à stocker temporairement les données dans un tableau d’octets. Vous devez donc créer un ByteArrayOutputStream intermédiaire, à partir duquel vous créez un byte[] utilisé comme entrée pour le nouveau ByteArrayInputStream .

 public void doTwoThingsWithStream(InputStream inStream, OutputStream outStream){ //create temporary bayte array output stream ByteArrayOutputStream baos = new ByteArrayOutputStream(); doFirstThing(inStream, baos); //create input stream from baos InputStream isFromFirstData = new ByteArrayInputStream(baos.toByteArray()); doSecondThing(isFromFirstData, outStream); } 

J’espère que cela aide.

La bibliothèque open source easystream prend directement en charge la conversion d’un OutputStream en InputStream: http://io-tools.sourceforge.net/easystream/tutorial/tutorial.html

Ils listent également d’autres options: http://io-tools.sourceforge.net/easystream/OutputStream_to_InputStream.html

 ByteArrayOutputStream buffer = (ByteArrayOutputStream) aOutputStream; byte[] bytes = buffer.toByteArray(); InputStream inputStream = new ByteArrayInputStream(bytes); 

J’ai rencontré le même problème en convertissant un ByteArrayOutputStream en ByteArrayInputStream et en le résolvant en utilisant une classe dérivée de ByteArrayOutputStream capable de retourner un ByteArrayInputStream qui est initialisé avec le tampon interne de ByteArrayOutputStream . De cette façon, aucune mémoire supplémentaire n’est utilisée et la «conversion» est très rapide:

 package info.whitebyte.utils; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; /** * This class extends the ByteArrayOutputStream by * providing a method that returns a new ByteArrayInputStream * which uses the internal byte array buffer. This buffer * is not copied, so no additional memory is used. After * creating the ByteArrayInputStream the instance of the * ByteArrayInOutStream can not be used anymore. * 

* The ByteArrayInputStream can be resortingeved using getInputStream(). * @author Nick Russler */ public class ByteArrayInOutStream extends ByteArrayOutputStream { /** * Creates a new ByteArrayInOutStream. The buffer capacity is * initially 32 bytes, though its size increases if necessary. */ public ByteArrayInOutStream() { super(); } /** * Creates a new ByteArrayInOutStream, with a buffer capacity of * the specified size, in bytes. * * @param size the initial size. * @exception IllegalArgumentException if size is negative. */ public ByteArrayInOutStream(int size) { super(size); } /** * Creates a new ByteArrayInputStream that uses the internal byte array buffer * of this ByteArrayInOutStream instance as its buffer array. The initial value * of pos is set to zero and the initial value of count is the number of bytes * that can be read from the byte array. The buffer array is not copied. This * instance of ByteArrayInOutStream can not be used anymore after calling this * method. * @return the ByteArrayInputStream instance */ public ByteArrayInputStream getInputStream() { // create new ByteArrayInputStream that respects the current count ByteArrayInputStream in = new ByteArrayInputStream(this.buf, 0, this.count); // set the buffer of the ByteArrayOutputStream // to null so it can't be altered anymore this.buf = null; return in; } }

Je mets les choses sur github: https://github.com/nickrussler/ByteArrayInOutStream

Les io-extras de la bibliothèque peuvent être utiles. Par exemple, si vous souhaitez compresser un InputStream aide de GZIPOutputStream et que vous souhaitez qu’il se produise de manière synchrone (en utilisant la taille de tampon par défaut de 8192):

 InputStream is = ... InputStream gz = IOUtil.pipe(is, o -> new GZIPOutputStream(o)); 

Notez que la bibliothèque a une couverture de test unitaire de 100% (pour ce que cela vaut bien sûr!) Et se trouve sur Maven Central. La dépendance de Maven est:

  com.github.davidmoten io-extras 0.1  

Assurez-vous de vérifier une version ultérieure.

De mon sharepoint vue, java.io.PipedInputStream / java.io.PipedOutputStream est la meilleure option à prendre en compte. Dans certaines situations, vous souhaiterez peut-être utiliser ByteArrayInputStream / ByteArrayOutputStream. Le problème est que vous devez dupliquer le tampon pour convertir un ByteArrayOutputStream à un ByteArrayInputStream. ByteArrayOutpuStream / ByteArrayInputStream sont également limités à 2 Go. Voici une implémentation OutpuStream / InputStream que j’ai écrite pour contourner les limitations de ByteArrayOutputStream / ByteArrayInputStream (code Scala, mais facilement compréhensible pour les développeurs Java):

 import java.io.{IOException, InputStream, OutputStream} import scala.annotation.tailrec /** Acts as a replacement for ByteArrayOutputStream * */ class HugeMemoryOutputStream(capacity: Long) extends OutputStream { private val PAGE_SIZE: Int = 1024000 private val ALLOC_STEP: Int = 1024 /** Pages array * */ private var streamBuffers: Array[Array[Byte]] = Array.empty[Array[Byte]] /** Allocated pages count * */ private var pageCount: Int = 0 /** Allocated bytes count * */ private var allocatedBytes: Long = 0 /** Current position in stream * */ private var position: Long = 0 /** Stream length * */ private var length: Long = 0 allocSpaceIfNeeded(capacity) /** Gets page count based on given length * * @param length Buffer length * @return Page count to hold the specified amount of data */ private def getPageCount(length: Long) = { var pageCount = (length / PAGE_SIZE).toInt + 1 if ((length % PAGE_SIZE) == 0) { pageCount -= 1 } pageCount } /** Extends pages array * */ private def extendPages(): Unit = { if (streamBuffers.isEmpty) { streamBuffers = new Array[Array[Byte]](ALLOC_STEP) } else { val newStreamBuffers = new Array[Array[Byte]](streamBuffers.length + ALLOC_STEP) Array.copy(streamBuffers, 0, newStreamBuffers, 0, streamBuffers.length) streamBuffers = newStreamBuffers } pageCount = streamBuffers.length } /** Ensures buffers are bug enough to hold specified amount of data * * @param value Amount of data */ private def allocSpaceIfNeeded(value: Long): Unit = { @tailrec def allocSpaceIfNeededIter(value: Long): Unit = { val currentPageCount = getPageCount(allocatedBytes) val neededPageCount = getPageCount(value) if (currentPageCount < neededPageCount) { if (currentPageCount == pageCount) extendPages() streamBuffers(currentPageCount) = new Array[Byte](PAGE_SIZE) allocatedBytes = (currentPageCount + 1).toLong * PAGE_SIZE allocSpaceIfNeededIter(value) } } if (value < 0) throw new Error("AllocSpaceIfNeeded < 0") if (value > 0) { allocSpaceIfNeededIter(value) length = Math.max(value, length) if (position > length) position = length } } /** * Writes the specified byte to this output stream. The general * contract for write is that one byte is written * to the output stream. The byte to be written is the eight * low-order bits of the argument b. The 24 * high-order bits of b are ignored. * 

* Subclasses of OutputStream must provide an * implementation for this method. * * @param b the byte. */ @throws[IOException] override def write(b: Int): Unit = { val buffer: Array[Byte] = new Array[Byte](1) buffer(0) = b.toByte write(buffer) } /** * Writes len bytes from the specified byte array * starting at offset off to this output stream. * The general contract for write(b, off, len) is that * some of the bytes in the array b are written to the * output stream in order; element b[off] is the first * byte written and b[off+len-1] is the last byte written * by this operation. *

* The write method of OutputStream calls * the write method of one argument on each of the bytes to be * written out. Subclasses are encouraged to override this method and * provide a more efficient implementation. *

* If b is null, a * NullPointerException is thrown. *

* If off is negative, or len is negative, or * off+len is greater than the length of the array * b, then an IndexOutOfBoundsException is thrown. * * @param b the data. * @param off the start offset in the data. * @param len the number of bytes to write. */ @throws[IOException] override def write(b: Array[Byte], off: Int, len: Int): Unit = { @tailrec def writeIter(b: Array[Byte], off: Int, len: Int): Unit = { val currentPage: Int = (position / PAGE_SIZE).toInt val currentOffset: Int = (position % PAGE_SIZE).toInt if (len != 0) { val currentLength: Int = Math.min(PAGE_SIZE - currentOffset, len) Array.copy(b, off, streamBuffers(currentPage), currentOffset, currentLength) position += currentLength writeIter(b, off + currentLength, len - currentLength) } } allocSpaceIfNeeded(position + len) writeIter(b, off, len) } /** Gets an InputStream that points to HugeMemoryOutputStream buffer * * @return InputStream */ def asInputStream(): InputStream = { new HugeMemoryInputStream(streamBuffers, length) } private class HugeMemoryInputStream(streamBuffers: Array[Array[Byte]], val length: Long) extends InputStream { /** Current position in stream * */ private var position: Long = 0 /** * Reads the next byte of data from the input stream. The value byte is * returned as an int in the range 0 to * 255. If no byte is available because the end of the stream * has been reached, the value -1 is returned. This method * blocks until input data is available, the end of the stream is detected, * or an exception is thrown. * *

A subclass must provide an implementation of this method. * * @return the next byte of data, or -1 if the end of the * stream is reached. */ @throws[IOException] def read: Int = { val buffer: Array[Byte] = new Array[Byte](1) if (read(buffer) == 0) throw new Error("End of stream") else buffer(0) } /** * Reads up to len bytes of data from the input stream into * an array of bytes. An attempt is made to read as many as * len bytes, but a smaller number may be read. * The number of bytes actually read is returned as an integer. * *

This method blocks until input data is available, end of file is * detected, or an exception is thrown. * *

If len is zero, then no bytes are read and * 0 is returned; otherwise, there is an attempt to read at * least one byte. If no byte is available because the stream is at end of * file, the value -1 is returned; otherwise, at least one * byte is read and stored into b. * *

The first byte read is stored into element b[off], the * next one into b[off+1], and so on. The number of bytes read * is, at most, equal to len. Let k be the number of * bytes actually read; these bytes will be stored in elements * b[off] through b[off+k-1], * leaving elements b[off+k] through * b[off+len-1] unaffected. * *

In every case, elements b[0] through * b[off] and elements b[off+len] through * b[b.length-1] are unaffected. * *

The read(b, off, len) method * for class InputStream simply calls the method * read() repeatedly. If the first such call results in an * IOException, that exception is returned from the call to * the read(b, off, len) method. If * any subsequent call to read() results in a * IOException, the exception is caught and treated as if it * were end of file; the bytes read up to that point are stored into * b and the number of bytes read before the exception * occurred is returned. The default implementation of this method blocks * until the requested amount of input data len has been read, * end of file is detected, or an exception is thrown. Subclasses are encouraged * to provide a more efficient implementation of this method. * * @param b the buffer into which the data is read. * @param off the start offset in array b * at which the data is written. * @param len the maximum number of bytes to read. * @return the total number of bytes read into the buffer, or * -1 if there is no more data because the end of * the stream has been reached. * @see java.io.InputStream#read() */ @throws[IOException] override def read(b: Array[Byte], off: Int, len: Int): Int = { @tailrec def readIter(acc: Int, b: Array[Byte], off: Int, len: Int): Int = { val currentPage: Int = (position / PAGE_SIZE).toInt val currentOffset: Int = (position % PAGE_SIZE).toInt val count: Int = Math.min(len, length - position).toInt if (count == 0 || position >= length) acc else { val currentLength = Math.min(PAGE_SIZE - currentOffset, count) Array.copy(streamBuffers(currentPage), currentOffset, b, off, currentLength) position += currentLength readIter(acc + currentLength, b, off + currentLength, len - currentLength) } } readIter(0, b, off, len) } /** * Skips over and discards n bytes of data from this input * stream. The skip method may, for a variety of reasons, end * up skipping over some smaller number of bytes, possibly 0. * This may result from any of a number of conditions; reaching end of file * before n bytes have been skipped is only one possibility. * The actual number of bytes skipped is returned. If n is * negative, the skip method for class InputStream always * returns 0, and no bytes are skipped. Subclasses may handle the negative * value differently. * * The skip method of this class creates a * byte array and then repeatedly reads into it until n bytes * have been read or the end of the stream has been reached. Subclasses are * encouraged to provide a more efficient implementation of this method. * For instance, the implementation may depend on the ability to seek. * * @param n the number of bytes to be skipped. * @return the actual number of bytes skipped. */ @throws[IOException] override def skip(n: Long): Long = { if (n < 0) 0 else { position = Math.min(position + n, length) length - position } } } }

Facile à utiliser, pas de duplication de tampon, pas de limite de mémoire de 2 Go

 val out: HugeMemoryOutputStream = new HugeMemoryOutputStream(initialCapacity /*may be 0*/) out.write(...) ... val in1: InputStream = out.asInputStream() in1.read(...) ... val in2: InputStream = out.asInputStream() in2.read(...) ... 

Si vous voulez créer un OutputStream depuis un InputStream, il existe un problème de base. Une méthode d’écriture dans un OutputStream se bloque jusqu’à ce qu’elle soit terminée. Le résultat est donc disponible lorsque la méthode d’écriture est terminée. Cela a deux conséquences:

  1. Si vous utilisez un seul thread, vous devez attendre que tout soit écrit (vous devez donc stocker les données du stream en mémoire ou sur disque).
  2. Si vous voulez accéder aux données avant qu’elles ne soient terminées, vous avez besoin d’un deuxième thread.

La variante 1 peut être implémentée à l’aide de tableaux d’octets ou archivée. La variante 1 peut être implémentée en utilisant des pipies (directement ou avec une abstraction supplémentaire – par exemple RingBuffer ou la librairie google de l’autre commentaire).

En effet, avec Java standard, il n’y a pas d’autre moyen de résoudre le problème. Chaque solution est une implémentation de l’une d’entre elles.

Il y a un concept appelé “continuation” (voir wikipedia pour plus de détails). Dans ce cas, cela signifie essentiellement:

  • il y a un stream de sortie spécial qui attend une certaine quantité de données
  • si le montant est atteint, le stream donne le contrôle à son homologue qui est un stream d’entrée spécial
  • le stream d’entrée rend la quantité de données disponible jusqu’à ce qu’il soit lu, après cela, il renvoie le contrôle au stream de sortie

Alors que certaines langues ont ce concept intégré, pour Java, vous avez besoin de “magie”. Par exemple “commons-javaflow” à partir des implémentations apache telles que java. L’inconvénient est que cela nécessite des modifications spéciales du bytecode au moment de la construction. Il serait donc judicieux de mettre toutes les choses dans une bibliothèque supplémentaire avec des scripts de génération personnalisés.

Ancien message mais pourrait aider les autres, utilisez cette méthode:

 OutputStream out = new ByteArrayOutputStream(); ... out.write(); ... ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(out.toSsortingng().getBytes())); 

Bien que vous ne puissiez pas convertir un OutputStream en InputStream, java fournit un moyen d’utiliser PipedOutputStream et PipedInputStream pour que des données écrites sur un PipedOutputStream soient disponibles via un PipedInputStream associé.
Il y a quelque temps, j’ai été confronté à une situation similaire lorsque des bibliothèques tierces nécessitaient qu’une instance InputStream leur soit transmise au lieu d’une instance OutputStream.
La façon dont j’ai résolu ce problème consiste à utiliser PipedInputStream et PipedOutputStream.
Par ailleurs, ils sont difficiles à utiliser et vous devez utiliser le multithreading pour obtenir ce que vous voulez. J’ai récemment publié une implémentation sur github que vous pouvez utiliser.
Voici le lien . Vous pouvez parcourir le wiki pour comprendre comment l’utiliser.