(datatypes) Add Serialization Support for Streams #3

Closed
cowtowncoder opened this issue Nov 3, 2016 · 12 comments

@cowtowncoder
Member

(moved from earlier issue filed by @jmax01)

Here is a first pass at serializing Streams.
It works for 2.6.x and above. A variant of modifyType for 2.8.1 and above is also included (commented out).

import java.io.IOException;
import java.lang.reflect.Type;
import java.util.Iterator;
import java.util.stream.Stream;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.AnnotationIntrospector;
import com.fasterxml.jackson.databind.BeanDescription;
import com.fasterxml.jackson.databind.BeanProperty;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.SerializationConfig;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.jsontype.TypeSerializer;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.BasicSerializerFactory;
import com.fasterxml.jackson.databind.ser.BeanSerializerFactory;
import com.fasterxml.jackson.databind.ser.std.AsArraySerializerBase;
import com.fasterxml.jackson.databind.type.CollectionLikeType;
import com.fasterxml.jackson.databind.type.TypeBindings;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.fasterxml.jackson.databind.type.TypeModifier;

/**
 * The Class StreamModule.
 *
 * @author jmaxwell
 */
public class StreamModule extends SimpleModule {

    /** The Constant serialVersionUID. */
    private static final long serialVersionUID = -1324033833221219001L;

    @Override
    public void setupModule(final SetupContext context) {
        context.addTypeModifier(new StreamTypeModifier());
        context.addSerializers(new StreamSerializers());
    }

    /**
     * The Class StreamTypeModifier.
     */
    public static final class StreamTypeModifier extends TypeModifier {

        /**
         * Tested for both 2.6.x and 2.8.1
         */
        @Override
        public JavaType modifyType(final JavaType type, final Type jdkType, final TypeBindings context,
                final TypeFactory typeFactory) {

            if (type.isReferenceType() || type.isContainerType()) {
                return type;
            }

            final Class<?> raw = type.getRawClass();

            if (Stream.class.isAssignableFrom(raw)) {

                final JavaType[] params = typeFactory.findTypeParameters(type, Stream.class);

                if (params == null || params.length == 0) {

                    return typeFactory.constructReferenceType(raw, TypeFactory.unknownType());
                }

                return typeFactory.constructCollectionLikeType(raw, params[0]);
            }
            return type;
        }

        //
        // the 2.8.1 and above way
        // @Override
        // public JavaType modifyType(JavaType type, Type jdkType, TypeBindings context, TypeFactory typeFactory) {
        //
        // if (type.isReferenceType() || type.isContainerType()) {
        // return type;
        // }
        //
        // Class<?> raw = type.getRawClass();
        //
        // if (Stream.class.isAssignableFrom(raw)) {
        //
        // JavaType[] params = typeFactory.findTypeParameters(type, Stream.class);
        //
        // if (params == null || params.length == 0) {
        //
        // return ReferenceType.upgradeFrom(type, type.containedTypeOrUnknown(0));
        // }
        //
        // return typeFactory.constructCollectionLikeType(raw, params[0]);
        //
        // }
        // return type;
        // }
        //

    }

    /**
     * The Class StreamSerializers.
     */
    public static final class StreamSerializers extends com.fasterxml.jackson.databind.ser.Serializers.Base {

        @Override
        public JsonSerializer<?> findCollectionLikeSerializer(final SerializationConfig config,
                final CollectionLikeType type, final BeanDescription beanDesc,
                final TypeSerializer elementTypeSerializer, final JsonSerializer<Object> elementValueSerializer) {

            final Class<?> raw = type.getRawClass();

            if (Stream.class.isAssignableFrom(raw)) {

                final TypeFactory typeFactory = config.getTypeFactory();

                final JavaType[] params = typeFactory.findTypeParameters(type, Stream.class);

                final JavaType vt = (params == null || params.length != 1) ? TypeFactory.unknownType() : params[0];

                return new StreamSerializer(type.getContentType(), usesStaticTyping(config, beanDesc, null),
                        BeanSerializerFactory.instance.createTypeSerializer(config, vt));
            }

            return null;
        }

        /**
         * Uses static typing. Copied from {@link BasicSerializerFactory}
         *
         * @param config the config
         * @param beanDesc the bean desc
         * @param typeSer the type ser
         * @return true, if successful
         */
        private static final boolean usesStaticTyping(final SerializationConfig config, final BeanDescription beanDesc,
                final TypeSerializer typeSer) {
            /*
             * 16-Aug-2010, tatu: If there is a (value) type serializer, we can not force
             * static typing; that would make it impossible to handle expected subtypes
             */
            if (typeSer != null) {
                return false;
            }
            final AnnotationIntrospector intr = config.getAnnotationIntrospector();
            final JsonSerialize.Typing t = intr.findSerializationTyping(beanDesc.getClassInfo());
            if (t != null && t != JsonSerialize.Typing.DEFAULT_TYPING) {
                return (t == JsonSerialize.Typing.STATIC);
            }
            return config.isEnabled(MapperFeature.USE_STATIC_TYPING);
        }

        /**
         * The Class StreamSerializer.
         */
        public static final class StreamSerializer extends AsArraySerializerBase<Stream<?>> {

            /** The Constant serialVersionUID. */
            private static final long serialVersionUID = -455534622397905995L;

            /**
             * Instantiates a new stream serializer.
             *
             * @param elemType the elem type
             * @param staticTyping the static typing
             * @param vts the vts
             */
            public StreamSerializer(final JavaType elemType, final boolean staticTyping, final TypeSerializer vts) {
                super(Stream.class, elemType, staticTyping, vts, null);
            }

            /**
             * Instantiates a new stream serializer.
             *
             * @param src the src
             * @param property the property
             * @param vts the vts
             * @param valueSerializer the value serializer
             */
            public StreamSerializer(final StreamSerializer src, final BeanProperty property, final TypeSerializer vts,
                    final JsonSerializer<?> valueSerializer) {
                super(src, property, vts, valueSerializer, false);
            }

            @Override
            public void serialize(final Stream<?> value, final JsonGenerator gen, final SerializerProvider provider)
                    throws IOException {
                this.serializeContents(value, gen, provider);
            }

            /**
             * withResolved.
             *
             * @param property the property
             * @param vts the vts
             * @param elementSerializer the element serializer
             * @param unwrapSingle ignored, always false: a stream can be consumed only once, so we cannot
             *            tell whether it holds a single element without consuming it
             * @return the as array serializer base
             */
            @Override
            public StreamSerializer withResolved(final BeanProperty property, final TypeSerializer vts,
                    final JsonSerializer<?> elementSerializer, final Boolean unwrapSingle) {
                return new StreamSerializer(this, property, vts, elementSerializer);
            }

            @Override
            protected void serializeContents(final Stream<?> value, final JsonGenerator gen,
                    final SerializerProvider provider) throws IOException {

                provider.findValueSerializer(Iterator.class, null)
                    .serialize(value.iterator(), gen, provider);

            }

            @Override
            public boolean hasSingleElement(final Stream<?> value) {
                // no really good way to determine (without consuming stream), so:
                return false;
            }

            @Override
            protected StreamSerializer _withValueTypeSerializer(final TypeSerializer vts) {

                return new StreamSerializer(this, this._property, vts, this._elementSerializer);
            }
        }
    }
}
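The comments on hasSingleElement() and unwrapSingle above rest on the fact that a java.util.stream.Stream can be traversed only once. A minimal JDK-only illustration (no Jackson involved; SingleUseDemo is a made-up name for this sketch): asking for the size consumes the stream, so a serializer cannot check for a single element and then still write the contents.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class: shows why a serializer cannot "peek" at a Stream's size.
final class SingleUseDemo {
    private SingleUseDemo() {}

    /** Counts the elements, then tries to traverse the same stream a second time. */
    static String countThenCollect(Stream<String> stream) {
        long count = stream.count(); // terminal operation: the stream is now consumed
        try {
            List<String> items = stream.collect(Collectors.toList()); // second traversal
            return "count=" + count + ", items=" + items;
        } catch (IllegalStateException e) {
            // thrown because the stream "has already been operated upon or closed"
            return "count=" + count + ", second traversal failed";
        }
    }
}
```

This is why hasSingleElement() returns false unconditionally: any check would consume the elements the serializer still has to write.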

Tests:

import static org.junit.Assert.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Stream;

import org.junit.Test;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.guava.GuavaModule;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
import com.fasterxml.jackson.module.mrbean.MrBeanModule;
import com.fasterxml.jackson.module.paramnames.ParameterNamesModule;
import com.theice.cds.common.serialization.json.jackson2.StreamModule;

@SuppressWarnings("javadoc")
public class StreamModuleTest {

    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().registerModule(new GuavaModule())
        .registerModule(new Jdk8Module())
        .registerModule(new JavaTimeModule())
        .registerModule(new ParameterNamesModule())
        .registerModule(new AfterburnerModule())
        .registerModule(new StreamModule())
        .registerModule(new MrBeanModule());

    static <T> void assertRoundTrip(final Collection<T> original, final ObjectMapper objectMapper) throws IOException {

        final Stream<T> asStream = original.stream();

        final String asJsonString = objectMapper.writeValueAsString(asStream);

        System.out.println("original: " + original + " -> " + asJsonString);

        final Collection<T> fromJsonString = objectMapper.readValue(asJsonString,
                new TypeReference<Collection<T>>() {});

        assertEquals(original, fromJsonString);
    }

    @SuppressWarnings("deprecation")
    @Test
    public void testEmptyStream() throws IOException {

        assertRoundTrip(new ArrayList<>(), OBJECT_MAPPER.copy()
            .enable(DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT));

        // shouldn't this fail?
        assertRoundTrip(new ArrayList<>(), OBJECT_MAPPER.copy()
            .disable(SerializationFeature.WRITE_EMPTY_JSON_ARRAYS)
            .disable(DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT));
    }

    @Test
    public void testSingleElementStream() throws IOException {

        final List<String> collection = new ArrayList<>();
        collection.add("element1");

        assertRoundTrip(collection, OBJECT_MAPPER.copy()
            .enable(SerializationFeature.WRITE_SINGLE_ELEM_ARRAYS_UNWRAPPED)
            .enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY));

        assertRoundTrip(collection, OBJECT_MAPPER.copy()
            .disable(SerializationFeature.WRITE_SINGLE_ELEM_ARRAYS_UNWRAPPED)
            .disable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY));

        // should fail but can't for stream
        assertRoundTrip(collection, OBJECT_MAPPER.copy()
            .enable(SerializationFeature.WRITE_SINGLE_ELEM_ARRAYS_UNWRAPPED)
            .disable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY));
    }

    @Test
    public void testMultipleElementStream() throws IOException {

        final List<String> collection = new ArrayList<>();
        collection.add("element1");
        collection.add("element2");

        assertRoundTrip(collection, OBJECT_MAPPER);

    }
}
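The last case in testSingleElementStream cannot fail because the serializer never knows whether the stream holds exactly one element. One conceivable workaround, which the module above does not implement, is to pull at most two elements from the stream's iterator up front and then replay them before continuing with the rest. A sketch under that assumption; PeekedStream and its methods are hypothetical:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

/** Hypothetical helper: looks ahead at most two elements without losing any. */
final class PeekedStream<T> {
    private final List<T> head = new ArrayList<>(2); // elements already pulled
    private final Iterator<T> rest;                   // remaining, unconsumed elements

    PeekedStream(Stream<T> source) {
        this.rest = source.iterator(); // uses up the Stream object itself
        while (head.size() < 2 && rest.hasNext()) {
            head.add(rest.next());
        }
    }

    /** True iff the source had exactly one element (the loop stopped on exhaustion). */
    boolean hasSingleElement() {
        return head.size() == 1;
    }

    /** Replays the peeked elements, then continues with the untouched remainder. */
    Iterator<T> iterator() {
        Iterator<T> headIt = head.iterator();
        return new Iterator<T>() {
            @Override
            public boolean hasNext() {
                return headIt.hasNext() || rest.hasNext();
            }

            @Override
            public T next() {
                return headIt.hasNext() ? headIt.next() : rest.next();
            }
        };
    }
}
```

The cost is that the stream's pipeline starts running during the look-ahead, and the caller must still close the original stream.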
@cowtowncoder
Member Author

Unfortunately I haven't had time to think about this; does anyone have an opinion on this one?

@jmax01

jmax01 commented May 3, 2017

@cowtowncoder Please update serializeContents to close the stream; otherwise it can leak resources.
I would hate for people to copy this and end up with mysterious leaks.

            @Override
            protected void serializeContents(final Stream<?> value, final JsonGenerator gen,
                    final SerializerProvider provider) throws IOException {

                // Stream needs to be closed to prevent resource leaks.
                try (Stream<?> stream = value) {
                    provider.findValueSerializer(Iterator.class, null)
                        .serialize(stream.iterator(), gen, provider);
                }
            }
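Why the try-with-resources matters: it closes the stream, running any onClose handlers registered by whoever created it, both after a complete write and when writing fails partway. A JDK-only sketch of the same pattern; Sink is a hypothetical stand-in for JsonGenerator, and writeAll stands in for serializeContents:

```java
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

final class CloseOnSerialize {
    /** Hypothetical stand-in for JsonGenerator: writes one element, may throw. */
    interface Sink {
        void write(Object element) throws IOException;
    }

    private CloseOnSerialize() {}

    /** Mirrors the fixed serializeContents: iterate inside try-with-resources. */
    static void writeAll(Stream<?> value, Sink sink) throws IOException {
        try (Stream<?> stream = value) {
            Iterator<?> it = stream.iterator();
            while (it.hasNext()) {
                sink.write(it.next());
            }
        } // stream.close() runs here, including when sink.write throws
    }

    /** Returns how many times the stream's onClose handler ran after a failed write. */
    static int closesAfterFailedWrite() {
        AtomicInteger closes = new AtomicInteger();
        Stream<String> s = Stream.of("a", "b", "c").onClose(closes::incrementAndGet);
        try {
            writeAll(s, e -> { throw new IOException("disk full"); });
        } catch (IOException expected) {
            // the write failed, but the stream was still closed
        }
        return closes.get();
    }
}
```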

@cowtowncoder cowtowncoder added this to the 2.9.0.pr4 milestone Jun 21, 2017
@cowtowncoder
Member Author

Not 100% sure whether this made it into 2.9.0.pr4 (I forgot to merge 2.8 forward), but if not, it will be in 2.9.0 final.

@cesartl

cesartl commented Sep 7, 2017

Is there a plan for creating a StreamDeserializer that could read JSON produced by StreamSerializer?

@cowtowncoder
Member Author

@cesartl I don't think there are any active plans for future work here.

@jmax01

jmax01 commented Sep 8, 2017

This is rather complicated to implement well.

Ideally, ObjectMapper would provide a readValuesAsStream method, similar to how the readValues methods return a MappingIterator.

One issue makes this rather risky, however: MappingIterator implements Closeable, which means you get a warning if it is not closed.

Stream does not implement Closeable, and terminal operations do not call its close() method, so it is very easy to leave the underlying resource open.
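This is easy to confirm with plain JDK streams: a terminal operation such as forEach traverses the stream but does not run its onClose handlers; only an explicit close() (for example via try-with-resources) does. The class and method names below are made up for the illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

final class TerminalOpsDoNotClose {
    private TerminalOpsDoNotClose() {}

    /** Runs a terminal operation and reports whether the onClose handler fired. */
    static boolean closedAfterForEach() {
        AtomicBoolean closed = new AtomicBoolean(false);
        Stream<String> s = Stream.of("a", "b").onClose(() -> closed.set(true));
        s.forEach(x -> { }); // traverses every element...
        return closed.get(); // ...but never calls close(), so this is false
    }

    /** Same stream, but closed explicitly via try-with-resources. */
    static boolean closedWithTryWithResources() {
        AtomicBoolean closed = new AtomicBoolean(false);
        try (Stream<String> s = Stream.of("a", "b").onClose(() -> closed.set(true))) {
            s.forEach(x -> { });
        }
        return closed.get(); // true: close() ran at the end of the try block
    }
}
```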

Barring a change to the standard library, such as the addition of a closeOnTerminalOperation() method on Stream, I can't see a way forward.

@cowtowncoder
Member Author

To me it seems that neither ObjectMapper nor ObjectReader should support this; rather, a method in MappingIterator could do it.

Still, the question of closing is an important one: perhaps there could be a "buffered" variant (which reads everything into memory and then closes the source), and an "unbuffered" one that does eventually close it, if and only if iteration reaches the end.
In the latter case, we could perhaps also implement a subclass, if feasible, that is Closeable?
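A rough sketch of the two variants described above, using only the JDK. CloseableIterator is a hypothetical stand-in for MappingIterator (an Iterator that owns a closeable resource), and MappingStreams, buffered and unbuffered are made-up names, not Jackson API. The unbuffered variant also registers the close as an onClose handler, so a caller using try-with-resources is covered even when iteration stops early.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class MappingStreams {
    /** Hypothetical stand-in for MappingIterator: an Iterator that owns a resource. */
    interface CloseableIterator<T> extends Iterator<T>, Closeable {}

    private MappingStreams() {}

    /** "Buffered": read everything, close the source, then stream from memory. */
    static <T> Stream<T> buffered(CloseableIterator<T> source) {
        List<T> values = new ArrayList<>();
        try (CloseableIterator<T> it = source) {
            it.forEachRemaining(values::add);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return values.stream();
    }

    /** "Unbuffered": lazy; closes the source at end of iteration or on Stream.close(). */
    static <T> Stream<T> unbuffered(CloseableIterator<T> source) {
        Runnable closeOnce = new Runnable() {
            private boolean done;

            @Override
            public void run() {
                if (!done) {
                    done = true;
                    try {
                        source.close();
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                }
            }
        };
        Iterator<T> closingAtEnd = new Iterator<T>() {
            @Override
            public boolean hasNext() {
                boolean more = source.hasNext();
                if (!more) {
                    closeOnce.run(); // end reached: release the resource eagerly
                }
                return more;
            }

            @Override
            public T next() {
                return source.next();
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(closingAtEnd, Spliterator.ORDERED), false)
            .onClose(closeOnce);
    }
}
```

The trade-off is the one noted above: buffered() is safe but holds the whole result in memory, while unbuffered() only closes on its own if a terminal operation drains the source.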

@jmax01

jmax01 commented Sep 8, 2017

> To me it seems that neither ObjectMapper nor ObjectReader should support this; rather, a method in MappingIterator could do it.

'Ideally' was a poor choice of words, but in my defense I wrote this on my phone while walking to work 😉.

What I meant was this: since ObjectMapper and ObjectReader already have methods that return Iterators, and since Stream is the "modern" way to perform iteration, it seemed least surprising to keep symmetry with the Iterator methods by putting the Stream methods on ObjectMapper and/or ObjectReader as well.

Sure, MappingIterator could have a toStream(), and you are correct: to prevent users from shooting themselves in the foot, some sort of wrapper would likely be required.

It's important to remember that many operations do not iterate to the end of the source; some examples:

  • anyMatch
  • findAny
  • findFirst
  • limit

Also, if an exception is thrown in a lambda passed to a Stream method, close() is not called.
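Both points can be checked with a plain iterator-backed stream. The sketch below (EarlyExitDemo and its methods are hypothetical names) records whether the source's hasNext() ever returned false, which is the only moment a close-at-end strategy could release the resource. With anyMatch, or with an exception thrown from a lambda, that moment never comes, so only an explicit close() would help.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class EarlyExitDemo {
    private EarlyExitDemo() {}

    /** Wraps a list iterator and records whether hasNext() ever returned false. */
    static Stream<String> tracking(List<String> items, AtomicBoolean reachedEnd) {
        Iterator<String> base = items.iterator();
        Iterator<String> tracked = new Iterator<String>() {
            @Override
            public boolean hasNext() {
                boolean more = base.hasNext();
                if (!more) {
                    reachedEnd.set(true);
                }
                return more;
            }

            @Override
            public String next() {
                return base.next();
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(tracked, Spliterator.ORDERED), false);
    }

    /** anyMatch stops at the first hit, so the end of the source is never observed. */
    static boolean endReachedByAnyMatch() {
        AtomicBoolean end = new AtomicBoolean(false);
        tracking(Arrays.asList("a", "b", "c"), end).anyMatch("a"::equals);
        return end.get();
    }

    /** An exception thrown from a lambda also abandons the traversal midway. */
    static boolean endReachedWhenLambdaThrows() {
        AtomicBoolean end = new AtomicBoolean(false);
        try {
            tracking(Arrays.asList("a", "b", "c"), end).forEach(s -> {
                if (s.equals("b")) {
                    throw new IllegalStateException("boom");
                }
            });
        } catch (IllegalStateException expected) {
            // traversal aborted before the source was exhausted
        }
        return end.get();
    }
}
```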

@cowtowncoder
Member Author

OK, and just to be clear, I wouldn't be opposed to something like, say, a MappingStream as a counterpart to MappingIterator. My point was just that I am trying to minimize direct use of ObjectMapper, due to the combinatorial explosion of methods (and its inability to be reconfigured).

I mean, MappingIterator is pretty simple, so anyone with an itch and interest would be welcome to play with this idea.

And since we now have the ng-3.0 branch, this work could be started already.

Then again, I have used so little Java 8 that there's plenty of learning to do.
Perhaps streams are not really meant to work with things bound to real resources, and are more "ephemeral" generators.

@whiskeysierra

> Perhaps streams are not really meant to work with things bound to real resources, and are more "ephemeral" generators.

Not necessarily. They implement AutoCloseable and are meant to be used for actual resources.
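To make this concrete: Stream extends BaseStream, which extends AutoCloseable (though not java.io.Closeable), and JDK methods such as java.nio.file.Files.lines return a stream backed by an open file that the caller is expected to close. A small sketch; the class name is hypothetical:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

final class ResourceBackedStream {
    private ResourceBackedStream() {}

    /** Counts non-blank lines; try-with-resources releases the open file handle. */
    static long countNonBlankLines(Path file) throws IOException {
        // Compiles only because Stream is AutoCloseable.
        try (Stream<String> lines = Files.lines(file, StandardCharsets.UTF_8)) {
            return lines.filter(line -> !line.trim().isEmpty()).count();
        }
    }
}
```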

@cowtowncoder
Member Author

Quick update: master is now for 3.0, so anyone with an itch would be free to experiment with a PR.

@natami

natami commented Nov 10, 2021

Whatever happened to this feature?
