Implementare uno stream di dati in Java

Java Axiante insights - implementare stream dati in java

Di Marco Ferretti – Senior Java Architect @ Axiante


Introduzione

Java 8 ha introdotto la Stream API come un modo per poter facilmente processare una collezione di oggetti su una “pipeline”. Uno stream è una sequenza di oggetti che supporta vari metodi di manipolazioni che possono essere elaborati in sequenza (o in parallelo) su uno stesso oggetto.

Tutto molto bello ma, in una applicazione reale, come possiamo trarre beneficio da questa nuova API? È tutto zucchero sintattico o c’è veramente qualcosa di nuovo? 

In questa serie di post ci proponiamo di entrare nel mondo Java, in particolare degli stream, attraverso una applicazione che costruiremo per step e che ci permetterà di trasformare, alla fine, una serie di dati presi da un database SQL in un JSON che potrà poi essere consumato da un servizio web.

Prerequisiti

Supponiamo che abbiate già familiarità con il linguaggio java, in particolare con il collegamento a database (jdbc driver), e con JUnit. 

Il progetto verrà gestito con Apache Maven (https://maven.apache.org/) e, visto che non ci piace scrivere molto codice, utilizziamo Project Lombok (https://projectlombok.org/) per generare i pojo e per tutta una serie di utilities che mette a disposizione facilmente.

Parte 1

In questa prima parte andremo a implementare un oggetto InputStream che useremo per leggere i dati da un database e trasformarli al volo in un csv.

Se andiamo ad esaminare come funziona la classe InputStream, vediamo che ci basta fare l’override di un metodo astratto per poter implementare un input stream: 

public abstract int read() throws IOException;

Per implementare questo metodo possiamo operare in due modi: 

  • Una lettura di un byte alla volta dalla nostra sorgente di dati mano a mano che il metodo viene richiamato
  • Una lettura di un pezzo di sorgente di dati (buffer di dati) ed il consumo di quello che abbiamo letto finché non dobbiamo leggere un altro pezzo di sorgente.

Nel nostro caso andremo ad implementare il secondo modo di consumo della sorgente:

    public int read() throws IOException {
	if (buffer != null && bufferIndex.get() < buffer.length - 1) {
	    return this.buffer[bufferIndex.incrementAndGet()];
	} else {
	    // fill the buffer with a new chunk
	    bufferIndex.set(-1);
	    try {
		nextBuffer();
	    } catch (Exception e) {
		// wrap into ioexception
		throw new IOException("Error retrieving next data", e);
	    }
	    if (this.buffer != null) {
		return this.buffer[bufferIndex.incrementAndGet()];
	    } else {
		return -1;
	    }
	}
    }

L’unica logica a cui dobbiamo stare attenti è quella di controllo del buffer “consumato”. 

 if (buffer != null && bufferIndex.get() < buffer.length - 1) 

Nel momento in cui andiamo a “consumare” la sorgente dati creiamo la nostra riga di csv:

	StringBuilder data = new StringBuilder();
	
	if (headers) {
	    // read the headers ...
	    headers = false;
	    data.append("\"").append((IntStream.rangeClosed(1, meta.getColumnCount()).mapToObj(i -> {
		try {
		    return meta.getColumnLabel(i);
		} catch (SQLException e) {
		    log.error("error getting column name for index " + i, e);
		}
		return null;
	    }).filter(Objects::nonNull).collect(Collectors.joining("\",\""))));
	} else {
	    if (table.next()) {
		data.append("\"").append(IntStream.rangeClosed(1, meta.getColumnCount()).mapToObj(i -> {
		    try {
			return table.getObject(i);
		    } catch (SQLException e) {
			log.error("error reading data at index " + i, e);
		    }
		    return null;
		}).filter(Objects::nonNull).map(Object::toString).collect(Collectors.joining("\",\"")));
	    } 
	}
	if ( data.length()>0) {
	    data.append("\"\n");
	    this.buffer = new char[data.length()];
	    data.getChars(0, data.length(), this.buffer, 0);
	} else {
	    this.buffer = null;
	}
    }

In questa funzione accodiamo delle stringhe che sono o "testate" o valori. Questo è un caso semplice in cui non consideriamo dati binari o CLOB. Nel caso in cui ci fosse questa necessità basterà espandere

 
if (table.next()) {
	data.append("\"").append(IntStream.rangeClosed(1, meta.getColumnCount()).mapToObj(i -> {
		try {
			return table.getObject(i);
		} catch (SQLException e) {
			log.error("error reading data at index " + i, e);
		}
		return null;
	})
	.filter(Objects::nonNull).map(Object::toString).collect(Collectors.joining("\",\"")));
}

In questa funzione facciamo il nostro primo incontro con la Java Stream API:

 IntStream.rangeClosed(1, meta.getColumnCount()) 

Questa riga simula un ciclo for classico e ci permette di operare sugli oggetti ritornati dal Resultset e dal ResultsetMetadata come fossero in uno stream:

IntStream.rangeClosed(1,meta.getColumnCount()) //cicla sulle colonne da 1 a columnCount
    .mapToObj( // mappa
        i -> { // ogni indice di colonna
	    try {
	        return table.getObject(i); // nell’oggetto ritornato da table.getObject(columnIndex)
             } catch (SQLException e) {
	         log.error("error reading data at index " + i, e);
	     }
	     return null;
	})
    .filter(Objects::nonNull) // togli tutti gli oggetti nulli
    .map(Object::toString) //trasformali in stringa 
    .collect( // collezionali
        Collectors.joining("\",\"") // accodandoli in un stringa separata da “,”
    ));

La Stream API, in particolari condizioni, riesce ad accorpare alcune delle operazioni che vengono eseguite in sequenza e, quindi, a minimizzare il numero di operazioni o, in alcuni casi, a parallelizzarle.

Nel progetto che trovate allegato, c’è anche la parte di Unit Test. Usiamo JUnit per simulare un database (creiamo al volo un in memory database con Derby) e verifichiamo che il CSV finale contenga quello che ci aspettiamo. 

Nel prossimo episodio introdurremo delle API per convertire il database in una stringa JSON e, in quello successivo, consumeremo la sorgente dati trasformandola in uno stream JSON pronto per essere servito in una applicazione web (Servlet o WebService).

Clicca qui per fare il download del codice

Restate sintonizzati per altri approfondimenti del nostro Ax Lab,tramite la nostra sezione Insights oppure sulla nostra pagina Linkedin.