avro-commits mailing list archives

From dcrea...@apache.org
Subject svn commit: r1175832 [1/5] - in /avro/trunk: ./ lang/c/docs/ lang/c/examples/ lang/c/src/ lang/c/src/avro/ lang/c/tests/
Date Mon, 26 Sep 2011 12:36:27 GMT
Author: dcreager
Date: Mon Sep 26 12:36:26 2011
New Revision: 1175832

URL: http://svn.apache.org/viewvc?rev=1175832&view=rev
Log:
AVRO-863. C: Schema resolution using new value interface

This patch adds the avro_resolved_reader_t and avro_resolved_writer_t
classes, which handle schema resolution using the new avro_value_t
interface.  The resolved reader type wraps an existing value as the
"writer schema" side of schema resolution, and provides a view into the
wrapped value as if it were an instance of the reader schema.  The
resolved writer type does the inverse; it wraps an instance of the
reader schema, and lets you write into that value as if it were an
instance of the writer schema.

Added:
    avro/trunk/lang/c/src/avro/resolver.h
    avro/trunk/lang/c/src/avro_generic_internal.h
    avro/trunk/lang/c/src/consume-binary.c
    avro/trunk/lang/c/src/resolved-reader.c
    avro/trunk/lang/c/src/resolved-writer.c
    avro/trunk/lang/c/src/value-hash.c
Removed:
    avro/trunk/lang/c/src/generic-array.c
    avro/trunk/lang/c/src/generic-enum.c
    avro/trunk/lang/c/src/generic-fixed.c
    avro/trunk/lang/c/src/generic-map.c
    avro/trunk/lang/c/src/generic-primitives.c
    avro/trunk/lang/c/src/generic-record.c
    avro/trunk/lang/c/src/generic-union.c
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/c/docs/index.txt
    avro/trunk/lang/c/examples/quickstop.c
    avro/trunk/lang/c/src/CMakeLists.txt
    avro/trunk/lang/c/src/Makefile.am
    avro/trunk/lang/c/src/array.c
    avro/trunk/lang/c/src/avro.h
    avro/trunk/lang/c/src/avro/data.h
    avro/trunk/lang/c/src/avro/generic.h
    avro/trunk/lang/c/src/avro/value.h
    avro/trunk/lang/c/src/avropipe.c
    avro/trunk/lang/c/src/datafile.c
    avro/trunk/lang/c/src/datum_read.c
    avro/trunk/lang/c/src/datum_value.c
    avro/trunk/lang/c/src/datum_write.c
    avro/trunk/lang/c/src/generic.c
    avro/trunk/lang/c/src/string.c
    avro/trunk/lang/c/src/value-read.c
    avro/trunk/lang/c/src/value.c
    avro/trunk/lang/c/tests/performance.c
    avro/trunk/lang/c/tests/test_avro_data.c
    avro/trunk/lang/c/tests/test_avro_values.c

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1175832&r1=1175831&r2=1175832&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Mon Sep 26 12:36:26 2011
@@ -20,6 +20,8 @@ Avro 1.6.0 (unreleased)
     AVRO-881. Java: Add a 'getmeta' tool that lists a file's metadata.
     (Tom White via cutting)
 
+    AVRO-863. C: Schema resolution using new value interface. (dcreager)
+
   OPTIMIZATIONS
 
     AVRO-853: Java: Cache Schema hash codes. (cutting)

Modified: avro/trunk/lang/c/docs/index.txt
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/docs/index.txt?rev=1175832&r1=1175831&r2=1175832&view=diff
==============================================================================
--- avro/trunk/lang/c/docs/index.txt (original)
+++ avro/trunk/lang/c/docs/index.txt Mon Sep 26 12:36:26 2011
@@ -128,6 +128,7 @@ regardless of which Avro schema it's an 
 
 [source,c]
 ----
+#include <stdint.h>
 #include <avro.h>
 
 avro_type_t avro_value_get_type(const avro_value_t *value);
@@ -139,6 +140,8 @@ int avro_value_equal_fast(const avro_val
 int avro_value_copy(avro_value_t *dest, const avro_value_t *src);
 int avro_value_copy_fast(avro_value_t *dest, const avro_value_t *src);
 
+uint32_t avro_value_hash(avro_value_t *value);
+
 int avro_value_reset(avro_value_t *value);
 ----
 
@@ -161,6 +164,16 @@ content of a +bytes+, +string+, or +fixe
 +equal+, the two values must have the same schema; +copy+ checks this,
 while +copy_fast+ assumes it.
 
+The +hash+ method returns a hash value for the given Avro value.  This
+can be used to construct hash tables that use Avro values as keys.  The
+function works correctly even with maps; it produces a hash that doesn't
+depend on the ordering of the elements of the map.  Hash values are only
+meaningful for comparing values of exactly the same schema.  Hash values
+are _not_ guaranteed to be consistent across different platforms, or
+different versions of the Avro library.  That means that it's really
+only safe to use these hash values internally within the context of a
+single execution of a single application.
+
 The +reset+ method “clears out” an +avro_value_t instance, making sure
 that it's ready to accept the contents of a new value.  For scalars,
 this is usually a no-op, since the new value will just overwrite the old
@@ -473,21 +486,23 @@ current.)
 === Creating value instances
 
 Okay, so we've described how to interact with a value that you already
-have a pointer to, but how do you create one in the first place?  Since
-there can be many implementations of the value interface, this requires
-a couple of steps:
+have a pointer to, but how do you create one in the first place?  Each
+implementation of the value interface must provide its own functions for
+creating +avro_value_t+ instances for that class.  The 10,000-foot view
+is to:
 
 1. Get an _implementation struct_ for the value implementation that you
    want to use.  (This is represented by an +avro_value_iface_t+
    pointer.)
 
-2. Use +avro_value_new+ to allocate instances of that value
-   implementation on the heap.
+2. Use the implementation's constructor function to allocate instances
+   of that value implementation.
 
-3. Do whatever you need to the value (using the methods described in the
-   previous section).
+3. Do whatever you need to the value (using the +avro_value_t+ methods
+   described in the previous section).
 
-4. Free the value instance using +avro_value_free+.
+4. Free the value instance, if necessary, using the implementation's
+   destructor function.
 
 5. Free the implementation struct when you're done creating value
    instances.
@@ -500,9 +515,6 @@ These steps use the following functions:
 
 avro_value_iface_t *avro_value_iface_incref(avro_value_iface_t *iface);
 void avro_value_iface_decref(avro_value_iface_t *iface);
-
-int avro_value_new(const avro_value_iface_t *iface, avro_value_t *value);
-void avro_value_free(avro_value_t *val);
 ----
 
 Note that for most value implementations, it's fine to reuse a single
@@ -511,17 +523,27 @@ Note that for most value implementations
 value.  (This helps reduce the number of +malloc+ and +free+ calls that
 your application will make.)
 
+We provide a “generic” value implementation that will work (efficiently)
+for any Avro schema.
+
+
 For most applications, you won't need to write your own value
 implementation; the Avro C library provides an efficient “generic”
-implementation, which supports the full range of Avro schema types.  You
-can use the following function to create an implementation struct for a
-particular schema:
+implementation, which supports the full range of Avro schema types.
+There's a good chance that you just want to use this implementation,
+rather than rolling your own.  (The primary reason for rolling your own
+would be if you want to access the elements of a compound value using C
+syntax — for instance, translating an Avro record into a C struct.) You
+can use the following functions to create and work with a generic value
+implementation for a particular schema:
 
 [source,c]
 ----
 #include <avro.h>
 
 avro_value_iface_t *avro_generic_class_from_schema(avro_schema_t schema);
+int avro_generic_value_new(avro_value_iface_t *iface, avro_value_t *dest);
+void avro_generic_value_free(avro_value_t *self);
 ----
 
 Combining all of this together, you might have the following snippet of
@@ -533,7 +555,7 @@ avro_schema_t  schema = avro_schema_long
 avro_value_iface_t  *iface = avro_generic_class_from_schema(schema);
 
 avro_value_t  val;
-avro_value_new(iface, &val);
+avro_generic_value_new(iface, &val);
 
 /* Generate Avro longs from 0-499 */
 int  i;
@@ -543,7 +565,7 @@ for (i = 0; i < 500; i++) {
     /* do something with the value */
 }
 
-avro_value_free(&val);
+avro_generic_value_free(&val);
 avro_value_iface_decref(iface);
 avro_schema_decref(schema);
 ----
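
The documentation change above says that +avro_value_hash+ gives maps a hash that doesn't depend on element order. The diff shown here doesn't include value-hash.c, so the following is only a minimal sketch of one common way to get that property, not the library's actual algorithm: hash each entry on its own, then combine the entry hashes with a commutative operation (addition here). All of the `sketch_` names are hypothetical, and FNV-1a is a stand-in hash function chosen for brevity.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical map entry for illustration; not an Avro library type. */
typedef struct sketch_entry {
	const char  *key;
	int64_t  value;
} sketch_entry_t;

/* FNV-1a over a byte buffer, continuing from the given seed. */
static uint32_t
sketch_hash_bytes(uint32_t seed, const void *buf, size_t len)
{
	const unsigned char  *p = buf;
	uint32_t  h = seed;
	size_t  i;
	for (i = 0; i < len; i++) {
		h ^= p[i];
		h *= 16777619u;
	}
	return h;
}

/* Hash one entry: the key bytes first, then the value bytes. */
static uint32_t
sketch_hash_entry(const sketch_entry_t *entry)
{
	uint32_t  h = sketch_hash_bytes(2166136261u, entry->key,
					strlen(entry->key));
	return sketch_hash_bytes(h, &entry->value, sizeof(entry->value));
}

/* Combine the entry hashes with addition.  Addition is commutative, so
 * the result doesn't depend on the order of the entries. */
uint32_t
sketch_hash_map(const sketch_entry_t *entries, size_t count)
{
	uint32_t  result = 0;
	size_t  i;
	assert(entries != NULL || count == 0);
	for (i = 0; i < count; i++) {
		result += sketch_hash_entry(&entries[i]);
	}
	return result;
}
```

Because the value bytes are hashed in their in-memory representation, this sketch shares the caveat from the documentation: the result is only meaningful within a single run on a single platform.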

Modified: avro/trunk/lang/c/examples/quickstop.c
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/examples/quickstop.c?rev=1175832&r1=1175831&r2=1175832&view=diff
==============================================================================
--- avro/trunk/lang/c/examples/quickstop.c (original)
+++ avro/trunk/lang/c/examples/quickstop.c Mon Sep 26 12:36:26 2011
@@ -164,7 +164,10 @@ int main(void)
 	fprintf(stdout, "\nNow let's read all the records back out\n");
 
 	/* Read all the records and print them */
-	avro_file_reader(dbname, &dbreader);
+	if (avro_file_reader(dbname, &dbreader)) {
+		fprintf(stderr, "Error opening file: %s\n", avro_strerror());
+		exit(EXIT_FAILURE);
+	}
 	for (i = 0; i < id; i++) {
 		if (print_person(dbreader, NULL)) {
 			fprintf(stderr, "Error printing person\n");
@@ -188,10 +191,14 @@ int main(void)
 	/* Read only the record you're interested in */
 	fprintf(stdout,
 		"\n\nUse projection to print only the First name and phone numbers\n");
-	avro_file_reader(dbname, &dbreader);
+	if (avro_file_reader(dbname, &dbreader)) {
+		fprintf(stderr, "Error opening file: %s\n", avro_strerror());
+		exit(EXIT_FAILURE);
+	}
 	for (i = 0; i < id; i++) {
 		if (print_person(dbreader, projection_schema)) {
-			fprintf(stderr, "Error printing person\n");
+			fprintf(stderr, "Error printing person: %s\n",
+				avro_strerror());
 			exit(EXIT_FAILURE);
 		}
 	}

Modified: avro/trunk/lang/c/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/src/CMakeLists.txt?rev=1175832&r1=1175831&r2=1175832&view=diff
==============================================================================
--- avro/trunk/lang/c/src/CMakeLists.txt (original)
+++ avro/trunk/lang/c/src/CMakeLists.txt Mon Sep 26 12:36:26 2011
@@ -30,10 +30,13 @@ set(AVRO_SRC
     avro/io.h
     avro/legacy.h
     avro/refcount.h
+    avro/resolver.h
     avro/schema.h
     avro/value.h
+    avro_generic_internal.h
     avro_private.h
     consumer.c
+    consume-binary.c
     datafile.c
     datum.c
     datum.h
@@ -51,16 +54,11 @@ set(AVRO_SRC
     encoding_binary.c
     errors.c
     generic.c
-    generic-array.c
-    generic-enum.c
-    generic-fixed.c
-    generic-map.c
-    generic-primitives.c
-    generic-record.c
-    generic-union.c
     io.c
     map.c
     memoize.c
+    resolved-reader.c
+    resolved-writer.c
     resolver.c
     schema.c
     schema.h
@@ -69,6 +67,7 @@ set(AVRO_SRC
     st.h
     string.c
     value.c
+    value-hash.c
     value-read.c
     value-sizeof.c
     value-write.c

Modified: avro/trunk/lang/c/src/Makefile.am
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/src/Makefile.am?rev=1175832&r1=1175831&r2=1175832&view=diff
==============================================================================
--- avro/trunk/lang/c/src/Makefile.am (original)
+++ avro/trunk/lang/c/src/Makefile.am Mon Sep 26 12:36:26 2011
@@ -18,19 +18,20 @@ avro/generic.h \
 avro/io.h \
 avro/legacy.h \
 avro/refcount.h \
+avro/resolver.h \
 avro/schema.h \
 avro/value.h
 
 lib_LTLIBRARIES = libavro.la
 libavro_la_SOURCES = st.c st.h schema.c schema.h schema_equal.c \
-value.c generic.c value-read.c value-sizeof.c value-write.c \
-generic-array.c generic-enum.c generic-fixed.c generic-map.c \
-generic-primitives.c generic-record.c generic-union.c \
+value.c value-hash.c value-read.c value-sizeof.c value-write.c \
+avro_generic_internal.h generic.c \
 datum.c datum_equal.c datum_validate.c datum_read.c datum_skip.c datum_write.c datum_size.c datum.h \
 datum_json.c \
 datum_value.c \
 array.c map.c memoize.c string.c \
-consumer.c resolver.c \
+consumer.c consume-binary.c resolver.c \
+resolved-reader.c resolved-writer.c \
 io.c dump.c dump.h encoding_binary.c \
 allocation.c \
 wrapped-buffer.c \

Modified: avro/trunk/lang/c/src/array.c
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/src/array.c?rev=1175832&r1=1175831&r2=1175832&view=diff
==============================================================================
--- avro/trunk/lang/c/src/array.c (original)
+++ avro/trunk/lang/c/src/array.c Mon Sep 26 12:36:26 2011
@@ -22,6 +22,7 @@
 #include "avro/allocation.h"
 #include "avro/data.h"
 #include "avro/errors.h"
+#include "avro_private.h"
 
 
 void avro_raw_array_init(avro_raw_array_t *array, size_t element_size)
@@ -69,6 +70,10 @@ avro_raw_array_ensure_size(avro_raw_arra
 		new_size = array->allocated_size * 2;
 	}
 
+	if (required_size > new_size) {
+		new_size = required_size;
+	}
+
 	array->data = avro_realloc(array->data, array->allocated_size, new_size);
 	if (array->data == NULL) {
 		avro_set_error("Cannot allocate space in array for %zu elements",
@@ -81,6 +86,23 @@ avro_raw_array_ensure_size(avro_raw_arra
 }
 
 
+int
+avro_raw_array_ensure_size0(avro_raw_array_t *array, size_t desired_count)
+{
+	int  rval;
+	size_t  old_allocated_size = array->allocated_size;
+	check(rval, avro_raw_array_ensure_size(array, desired_count));
+
+	if (array->allocated_size > old_allocated_size) {
+		size_t  extra_space = array->allocated_size - old_allocated_size;
+		char  *buf = array->data;
+		memset(buf + old_allocated_size, 0, extra_space);
+	}
+
+	return 0;
+}
+
+
 void *avro_raw_array_append(avro_raw_array_t *array)
 {
 	int  rval;
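
The two array.c changes above (falling back to the requested size when doubling isn't enough, and zero-filling newly allocated space) can be sketched outside the library as follows. This is a simplified illustration under stated assumptions: `sketch_array_t` and the `sketch_array_` functions are hypothetical, sizes are plain byte counts, and the standard `realloc` stands in for `avro_realloc`.

```c
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical growable buffer for illustration; sizes are in bytes. */
typedef struct sketch_array {
	void  *data;
	size_t  allocated_size;
} sketch_array_t;

/* Grow the buffer to at least required_size bytes.  Doubling alone may
 * not be enough, so use the requested size when it is larger. */
int
sketch_array_ensure_size(sketch_array_t *array, size_t required_size)
{
	size_t  new_size;
	void  *new_data;

	assert(array != NULL);
	if (array->allocated_size >= required_size) {
		return 0;
	}

	new_size = (array->allocated_size == 0)? 16: array->allocated_size * 2;
	if (required_size > new_size) {
		new_size = required_size;
	}

	new_data = realloc(array->data, new_size);
	if (new_data == NULL) {
		return ENOMEM;
	}

	array->data = new_data;
	array->allocated_size = new_size;
	return 0;
}

/* Same as above, but zero-fill whatever space was newly allocated. */
int
sketch_array_ensure_size0(sketch_array_t *array, size_t required_size)
{
	size_t  old_size;
	int  rval;

	assert(array != NULL);
	old_size = array->allocated_size;
	rval = sketch_array_ensure_size(array, required_size);
	if (rval != 0) {
		return rval;
	}

	if (array->allocated_size > old_size) {
		/* Use a char pointer so the arithmetic is standard C. */
		char  *buf = array->data;
		memset(buf + old_size, 0, array->allocated_size - old_size);
	}
	return 0;
}
```

Only the bytes past the old allocation are cleared, so existing contents survive a grow untouched.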

Modified: avro/trunk/lang/c/src/avro.h
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/src/avro.h?rev=1175832&r1=1175831&r2=1175832&view=diff
==============================================================================
--- avro/trunk/lang/c/src/avro.h (original)
+++ avro/trunk/lang/c/src/avro.h Mon Sep 26 12:36:26 2011
@@ -31,7 +31,7 @@ extern "C" {
 #include <avro/generic.h>
 #include <avro/io.h>
 #include <avro/legacy.h>
-#include <avro/refcount.h>
+#include <avro/resolver.h>
 #include <avro/schema.h>
 #include <avro/value.h>
 

Modified: avro/trunk/lang/c/src/avro/data.h
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/src/avro/data.h?rev=1175832&r1=1175831&r2=1175832&view=diff
==============================================================================
--- avro/trunk/lang/c/src/avro/data.h (original)
+++ avro/trunk/lang/c/src/avro/data.h Mon Sep 26 12:36:26 2011
@@ -78,6 +78,17 @@ int
 avro_raw_array_ensure_size(avro_raw_array_t *array, size_t desired_count);
 
 /**
+ * Ensures that there is enough allocated space to store the given
+ * number of elements in an avro_raw_array_t.  If the array grows as a
+ * result of this operation, we will fill in any newly allocated space
+ * with 0 bytes.  If we can't allocate that much space, we return
+ * ENOMEM.
+ */
+
+int
+avro_raw_array_ensure_size0(avro_raw_array_t *array, size_t desired_count);
+
+/**
  * Returns the number of elements in an avro_raw_array_t.
  */
 

Modified: avro/trunk/lang/c/src/avro/generic.h
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/src/avro/generic.h?rev=1175832&r1=1175831&r2=1175832&view=diff
==============================================================================
--- avro/trunk/lang/c/src/avro/generic.h (original)
+++ avro/trunk/lang/c/src/avro/generic.h Mon Sep 26 12:36:26 2011
@@ -36,17 +36,27 @@ extern "C" {
  */
 
 
-/*
+/**
  * Return a generic avro_value_iface_t implementation for the given
  * schema, regardless of what type it is.
  */
 
-avro_value_iface_t *avro_generic_class_from_schema(avro_schema_t schema);
+avro_value_iface_t *
+avro_generic_class_from_schema(avro_schema_t schema);
+
+/**
+ * Allocate a new instance of the given generic value class.  @a iface
+ * must have been created by @ref avro_generic_class_from_schema.
+ */
+
+int
+avro_generic_value_new(avro_value_iface_t *iface, avro_value_t *dest);
 
 
 /*
  * These functions return an avro_value_iface_t implementation for each
- * scalar schema type.
+ * primitive schema type.  (For enum, fixed, and the compound types, you
+ * must use the @ref avro_generic_class_from_schema function.)
  */
 
 avro_value_iface_t *avro_generic_boolean_class(void);
@@ -58,11 +68,9 @@ avro_value_iface_t *avro_generic_long_cl
 avro_value_iface_t *avro_generic_null_class(void);
 avro_value_iface_t *avro_generic_string_class(void);
 
-avro_value_iface_t *avro_generic_enum_class(avro_schema_t schema);
-avro_value_iface_t *avro_generic_fixed_class(avro_schema_t schema);
 
 /*
- * These functions instantiate a new generic scalar value.
+ * These functions instantiate a new generic primitive value.
  */
 
 int avro_generic_boolean_new(avro_value_t *value, int val);
@@ -74,42 +82,6 @@ int avro_generic_long_new(avro_value_t *
 int avro_generic_null_new(avro_value_t *value);
 int avro_generic_string_new(avro_value_t *value, char *val);
 int avro_generic_string_new_length(avro_value_t *value, char *val, size_t size);
-int avro_generic_enum_new(avro_value_t *value, int val);
-
-
-/*
- * These functions return an avro_value_iface_t implementation for each
- * compound schema type.
- */
-
-/*
- * The generic classes for compound schemas work with any avro_value_t
- * implementation for their subschemas.  They'll use the given function
- * pointer to instantiate a value implementation for each child schema.
- */
-
-typedef avro_value_iface_t *
-(*avro_value_iface_creator_t)(avro_schema_t schema, void *user_data);
-
-avro_value_iface_t *
-avro_generic_array_class(avro_schema_t schema,
-			 avro_value_iface_creator_t creator,
-			 void *user_data);
-
-avro_value_iface_t *
-avro_generic_map_class(avro_schema_t schema,
-		       avro_value_iface_creator_t creator,
-		       void *user_data);
-
-avro_value_iface_t *
-avro_generic_record_class(avro_schema_t schema,
-			  avro_value_iface_creator_t creator,
-			  void *user_data);
-
-avro_value_iface_t *
-avro_generic_union_class(avro_schema_t schema,
-			 avro_value_iface_creator_t creator,
-			 void *user_data);
 
 
 CLOSE_EXTERN

Added: avro/trunk/lang/c/src/avro/resolver.h
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/src/avro/resolver.h?rev=1175832&view=auto
==============================================================================
--- avro/trunk/lang/c/src/avro/resolver.h (added)
+++ avro/trunk/lang/c/src/avro/resolver.h Mon Sep 26 12:36:26 2011
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+#ifndef AVRO_RESOLVER_H
+#define AVRO_RESOLVER_H
+#ifdef __cplusplus
+extern "C" {
+#define CLOSE_EXTERN }
+#else
+#define CLOSE_EXTERN
+#endif
+
+#include <avro/schema.h>
+#include <avro/value.h>
+
+/*
+ * A <i>resolved value</i> is a special kind of value that knows how to
+ * implement Avro's schema resolution rules to translate between a
+ * writer schema and a reader schema.  A resolved value doesn't store or
+ * process data itself; instead, it wraps an existing value instance.
+ *
+ * There are two resolved value classes.  In the first (@ref
+ * avro_resolved_writer_t), the resolved value is an instance of the
+ * writer schema, and wraps an instance of the reader schema.  This is
+ * used, for instance, when reading from an Avro data file; you want the
+ * end result to be a reader schema value, and the resolved value allows
+ * the file reader to ignore the schema resolution and simply fill in
+ * the values of the writer schema.  You can only set the values of a
+ * resolved writer; you must use the original wrapped value to read.
+ *
+ * With the other class (@ref avro_resolved_reader_t), the resolved value is
+ * an instance of the reader schema, and wraps an instance of the writer
+ * schema.  This is used when resolving an existing Avro value to
+ * another schema; you've already got the value in the original (writer)
+ * schema, and want to transparently treat it as if it were an instance
+ * of the new (reader) schema.  You can only read the values of a
+ * resolved reader; you must use the original wrapped value to write.
+ *
+ * For both classes, the “self” pointer of the resolved value is an
+ * avro_value_t pointer, which points at the wrapped value.
+ */
+
+
+/**
+ * Create a new resolved writer implementation for the given writer and
+ * reader schemas.
+ */
+
+avro_value_iface_t *
+avro_resolved_writer_new(avro_schema_t writer_schema,
+			 avro_schema_t reader_schema);
+
+/**
+ * Creates a new resolved writer value.
+ */
+
+int
+avro_resolved_writer_new_value(avro_value_iface_t *iface,
+			       avro_value_t *value);
+
+/**
+ * Sets the wrapped value for a resolved writer.  This must be an
+ * instance of the reader schema.  We create our own reference to the
+ * destination value.
+ */
+
+void
+avro_resolved_writer_set_dest(avro_value_t *resolved,
+			      avro_value_t *dest);
+
+
+/**
+ * Clears the wrapped value for a resolved writer.
+ */
+
+void
+avro_resolved_writer_clear_dest(avro_value_t *resolved);
+
+
+/**
+ * Create a new resolved reader implementation for the given writer and
+ * reader schemas.
+ */
+
+avro_value_iface_t *
+avro_resolved_reader_new(avro_schema_t writer_schema,
+			 avro_schema_t reader_schema);
+
+/**
+ * Creates a new resolved reader value.
+ */
+
+int
+avro_resolved_reader_new_value(avro_value_iface_t *iface,
+			       avro_value_t *value);
+
+/**
+ * Sets the wrapped value for a resolved reader.  This must be an
+ * instance of the writer schema.  We create our own reference to the
+ * source value.
+ */
+
+void
+avro_resolved_reader_set_source(avro_value_t *resolved,
+				avro_value_t *dest);
+
+
+/**
+ * Clears the wrapped value for a resolved reader.
+ */
+
+void
+avro_resolved_reader_clear_source(avro_value_t *resolved);
+
+CLOSE_EXTERN
+#endif
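
The header comment above describes a resolved reader as a read-only view: it stores no data itself, wraps a writer-schema value, and presents it as the reader schema. The sketch below illustrates that idea with toy types only; it does not use or reproduce the Avro C API. Every `sketch_` name is hypothetical. The one resolution rule it models is Avro's promotion of a writer `int` to a reader `long`.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for illustration; not Avro library types. */
typedef enum { SKETCH_INT, SKETCH_LONG } sketch_type_t;

typedef struct sketch_value {
	sketch_type_t  type;
	int64_t  storage;
} sketch_value_t;

/* A resolved reader stores no data of its own.  It only points at the
 * wrapped writer-schema value and remembers the reader-schema type. */
typedef struct sketch_resolved_reader {
	sketch_type_t  reader_type;
	const sketch_value_t  *source;
} sketch_resolved_reader_t;

void
sketch_resolved_reader_set_source(sketch_resolved_reader_t *resolved,
				  const sketch_value_t *source)
{
	assert(resolved != NULL);
	resolved->source = source;
}

/* Read the wrapped value as a long.  An int writer value is promoted
 * to a long reader value; a long is passed through unchanged. */
int
sketch_resolved_reader_get_long(const sketch_resolved_reader_t *resolved,
				int64_t *out)
{
	assert(resolved != NULL && out != NULL);
	if (resolved->source == NULL || resolved->reader_type != SKETCH_LONG) {
		return EINVAL;
	}
	*out = resolved->source->storage;
	return 0;
}

/* Writes are rejected: a resolved reader is a read-only view, so the
 * caller must write through the original wrapped value instead. */
int
sketch_resolved_reader_set_long(sketch_resolved_reader_t *resolved,
				int64_t val)
{
	(void) resolved;
	(void) val;
	return EINVAL;
}
```

Since the view holds only a pointer, a later change to the wrapped value is visible through the view without any copying. A resolved writer would be the mirror image: setters that translate and forward into the wrapped reader-schema value, and getters that fail.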

Modified: avro/trunk/lang/c/src/avro/value.h
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/src/avro/value.h?rev=1175832&r1=1175831&r2=1175832&view=diff
==============================================================================
--- avro/trunk/lang/c/src/avro/value.h (original)
+++ avro/trunk/lang/c/src/avro/value.h Mon Sep 26 12:36:26 2011
@@ -41,7 +41,7 @@ extern "C" {
 typedef struct avro_value_iface  avro_value_iface_t;
 
 typedef struct avro_value {
-	const avro_value_iface_t  *iface;
+	avro_value_iface_t  *iface;
 	void  *self;
 } avro_value_t;
 
@@ -56,7 +56,7 @@ struct avro_value_iface {
 	 * reference counts.
 	 */
 	avro_value_iface_t *
-	(*incref)(avro_value_iface_t *iface);
+	(*incref_iface)(avro_value_iface_t *iface);
 
 	/**
 	 * Decrement the reference count of the interface struct.  If
@@ -65,32 +65,28 @@ struct avro_value_iface {
 	 * counts.
 	 */
 	void
-	(*decref)(avro_value_iface_t *iface);
-
-	/**
-	 * Return the size of an instance of this value type.  If this
-	 * returns 0, then this value type can't be used with any
-	 * function or type (like avro_value_new) that expects to
-	 * allocate space for the value itself.
-	 */
-	size_t
-	(*instance_size)(const avro_value_iface_t *iface);
+	(*decref_iface)(avro_value_iface_t *iface);
 
 	/*-------------------------------------------------------------
 	 * General "instance" methods
 	 */
 
 	/**
-	 * Initialize a new value instance.
+	 * Increments the reference count of a value.
 	 */
-	int
-	(*init)(const avro_value_iface_t *iface, void *self);
+
+	void
+	(*incref)(avro_value_t *value);
 
 	/**
-	 * Finalize a value instance.
+	 * Decrements the reference count of a value, and frees the
+	 * value if the reference count drops to 0.  After calling this
+	 * method, your value instance is undefined, and cannot be used
+	 * anymore.
 	 */
+
 	void
-	(*done)(const avro_value_iface_t *iface, void *self);
+	(*decref)(avro_value_t *value);
 
 	/**
 	 * Reset the instance to its "empty", default value.  You don't
@@ -263,20 +259,47 @@ struct avro_value_iface {
 			  avro_value_t *branch);
 };
 
+
 /**
- * Allocate a new instance of the given value class, storing the result
- * into val.
+ * Increments the reference count of a value instance.  Normally you
+ * don't need to call this directly; you'll have a reference whenever
+ * you create the value, and @ref avro_value_copy_ref and @ref
+ * avro_value_move_ref update the reference counts correctly for you.
  */
 
-int
-avro_value_new(const avro_value_iface_t *cls, avro_value_t *val);
+void
+avro_value_incref(avro_value_t *value);
 
 /**
- * Finalize and deallocate a value instance.
+ * Decrements the reference count of a value instance, freeing it if
+ * its reference count drops to 0.
  */
 
 void
-avro_value_free(avro_value_t *val);
+avro_value_decref(avro_value_t *value);
+
+/**
+ * Copies a reference to a value.  This does not copy any of the data
+ * in the value; you get two avro_value_t references that point at the
+ * same underlying value instance.
+ */
+
+void
+avro_value_copy_ref(avro_value_t *dest, const avro_value_t *src);
+
+/**
+ * Moves a reference to a value.  This does not copy any of the data in
+ * the value.  The @ref src value is invalidated by this function; it's
+ * equivalent to the following:
+ *
+ * <code>
+ * avro_value_copy_ref(dest, src);
+ * avro_value_decref(src);
+ * </code>
+ */
+
+void
+avro_value_move_ref(avro_value_t *dest, avro_value_t *src);
 
 /**
  * Compares two values for equality.  The two values don't need to have
@@ -324,6 +347,13 @@ avro_value_copy(avro_value_t *dest, cons
 int
 avro_value_copy_fast(avro_value_t *dest, const avro_value_t *src);
 
+/**
+ * Returns a hash value for a given Avro value.
+ */
+
+uint32_t
+avro_value_hash(avro_value_t *value);
+
 
 /**
  * A helper macro for calling a given method in a value instance, if
@@ -342,16 +372,9 @@ avro_value_copy_fast(avro_value_t *dest,
 
 
 #define avro_value_iface_incref(cls) \
-    ((cls)->incref == NULL? (cls): (cls)->incref((cls)))
+    ((cls)->incref_iface == NULL? (cls): (cls)->incref_iface((cls)))
 #define avro_value_iface_decref(cls) \
-    ((cls)->decref == NULL? (void) 0: (cls)->decref((cls)))
-
-#define avro_value_instance_size(cls) \
-    ((cls)->instance_size == NULL? 0: (cls)->instance_size((cls)))
-#define avro_value_init(cls, self) \
-    ((cls)->init == NULL? EINVAL: (cls)->init((cls), (self)))
-#define avro_value_done(value) \
-    avro_value_call0(value, done, (void) 0)
+    ((cls)->decref_iface == NULL? (void) 0: (cls)->decref_iface((cls)))
 
 #define avro_value_reset(value) \
     avro_value_call0(value, reset, EINVAL)
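
The value.h changes above replace explicit new/free with reference counting, and describe `avro_value_move_ref` as equivalent to `avro_value_copy_ref` followed by `avro_value_decref` on the source. The following is a minimal sketch of those semantics on a toy handle type. The `sketch_` names are hypothetical and the payload layout is an assumption; the real library dispatches through the `incref` and `decref` methods of the value's interface struct.

```c
#include <assert.h>
#include <errno.h>
#include <stdlib.h>

/* Hypothetical reference-counted payload; not an Avro library type. */
typedef struct sketch_shared {
	int  refcount;
	long  data;
} sketch_shared_t;

/* A value handle that points at the shared payload. */
typedef struct sketch_ref {
	sketch_shared_t  *self;
} sketch_ref_t;

/* Create a payload; the caller starts out holding one reference. */
int
sketch_ref_new(sketch_ref_t *dest, long data)
{
	sketch_shared_t  *shared = malloc(sizeof(sketch_shared_t));
	if (shared == NULL) {
		return ENOMEM;
	}
	shared->refcount = 1;
	shared->data = data;
	dest->self = shared;
	return 0;
}

void
sketch_ref_incref(sketch_ref_t *ref)
{
	assert(ref->self != NULL);
	ref->self->refcount++;
}

/* Drop one reference, freeing the payload when none remain.  The handle
 * is cleared so that it can't be used by accident afterwards. */
void
sketch_ref_decref(sketch_ref_t *ref)
{
	assert(ref->self != NULL);
	if (--ref->self->refcount == 0) {
		free(ref->self);
	}
	ref->self = NULL;
}

/* Two handles, one payload: no data is copied. */
void
sketch_ref_copy_ref(sketch_ref_t *dest, const sketch_ref_t *src)
{
	assert(src->self != NULL);
	dest->self = src->self;
	dest->self->refcount++;
}

/* Same net effect as copy_ref followed by decref on src: the count is
 * unchanged, so just transfer the pointer and invalidate src. */
void
sketch_ref_move_ref(sketch_ref_t *dest, sketch_ref_t *src)
{
	assert(src->self != NULL);
	dest->self = src->self;
	src->self = NULL;
}
```

Moving is cheaper than copying and then releasing, because the count never changes; that is presumably why the header offers it as a separate operation.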

Added: avro/trunk/lang/c/src/avro_generic_internal.h
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/src/avro_generic_internal.h?rev=1175832&view=auto
==============================================================================
--- avro/trunk/lang/c/src/avro_generic_internal.h (added)
+++ avro/trunk/lang/c/src/avro_generic_internal.h Mon Sep 26 12:36:26 2011
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+#ifndef AVRO_GENERIC_INTERNAL_H
+#define AVRO_GENERIC_INTERNAL_H
+#ifdef __cplusplus
+extern "C" {
+#define CLOSE_EXTERN }
+#else
+#define CLOSE_EXTERN
+#endif
+
+#include "avro/generic.h"
+#include "avro/schema.h"
+#include "avro/value.h"
+
+/*
+ * Each generic value implementation struct defines a couple of extra
+ * methods that we use to control the lifecycle of the value objects.
+ */
+
+typedef struct avro_generic_value_iface {
+	avro_value_iface_t  parent;
+
+	/**
+	 * Return the size of an instance of this value type.  If this
+	 * returns 0, then this value type can't be used with any
+	 * function or type (like avro_generic_value_new) that expects to
+	 * allocate space for the value itself.
+	 */
+	size_t
+	(*instance_size)(const avro_value_iface_t *iface);
+
+	/**
+	 * Initialize a new value instance.
+	 */
+	int
+	(*init)(const avro_value_iface_t *iface, void *self);
+
+	/**
+	 * Finalize a value instance.
+	 */
+	void
+	(*done)(const avro_value_iface_t *iface, void *self);
+} avro_generic_value_iface_t;
+
+
+#define avro_value_instance_size(gcls) \
+    ((gcls)->instance_size == NULL? 0: (gcls)->instance_size(&(gcls)->parent))
+#define avro_value_init(gcls, self) \
+    ((gcls)->init == NULL? EINVAL: (gcls)->init(&(gcls)->parent, (self)))
+#define avro_value_done(gcls, self) \
+    ((gcls)->done == NULL? (void) 0: (gcls)->done(&(gcls)->parent, (self)))
+
+
+CLOSE_EXTERN
+#endif
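
The internal header above extends the base interface by embedding `avro_value_iface_t` as the first member of a larger struct, and wraps each optional method in a macro that supplies a default when the pointer is NULL. Here is a self-contained sketch of that pattern. The `sketch_` types and the two example classes are hypothetical; only the shape of the macros follows the header.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

/* Hypothetical base interface, standing in for avro_value_iface_t. */
typedef struct sketch_iface {
	int  (*reset)(void *self);
} sketch_iface_t;

/* The "subclass" embeds the base as its first member, so a pointer to
 * the subclass also points at a valid base struct. */
typedef struct sketch_generic_iface {
	sketch_iface_t  parent;
	size_t  (*instance_size)(const sketch_iface_t *iface);
	int  (*init)(const sketch_iface_t *iface, void *self);
} sketch_generic_iface_t;

/* Optional methods: a NULL pointer falls back to a default result. */
#define sketch_instance_size(gcls) \
    ((gcls)->instance_size == NULL? 0: (gcls)->instance_size(&(gcls)->parent))
#define sketch_init(gcls, self) \
    ((gcls)->init == NULL? EINVAL: (gcls)->init(&(gcls)->parent, (self)))

/* An example class whose instances are a single long. */
static size_t
sketch_long_instance_size(const sketch_iface_t *iface)
{
	(void) iface;
	return sizeof(long);
}

static int
sketch_long_init(const sketch_iface_t *iface, void *self)
{
	(void) iface;
	assert(self != NULL);
	*(long *) self = 0;
	return 0;
}

sketch_generic_iface_t  SKETCH_LONG_CLASS = {
	{ NULL },
	sketch_long_instance_size,
	sketch_long_init
};

/* A class that leaves the optional methods unset. */
sketch_generic_iface_t  SKETCH_EMPTY_CLASS = { { NULL }, NULL, NULL };
```

A constructor in this style would call `sketch_instance_size` to decide how much memory to allocate, then `sketch_init` on the new block; a size of 0 signals that the class can't be allocated this way.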

Modified: avro/trunk/lang/c/src/avropipe.c
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/src/avropipe.c?rev=1175832&r1=1175831&r2=1175832&view=diff
==============================================================================
--- avro/trunk/lang/c/src/avropipe.c (original)
+++ avro/trunk/lang/c/src/avropipe.c Mon Sep 26 12:36:26 2011
@@ -355,7 +355,6 @@ process_file(const char *filename)
 		avro_datum_as_value(&value, datum);
 		create_array_prefix(&prefix, "", record_number);
 		process_value(avro_raw_string_get(&prefix), &value);
-		avro_value_done(&value);
 	}
 
 	avro_raw_string_done(&prefix);

Added: avro/trunk/lang/c/src/consume-binary.c
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/src/consume-binary.c?rev=1175832&view=auto
==============================================================================
--- avro/trunk/lang/c/src/consume-binary.c (added)
+++ avro/trunk/lang/c/src/consume-binary.c Mon Sep 26 12:36:26 2011
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+#include "avro_private.h"
+#include "avro/allocation.h"
+#include "avro/consumer.h"
+#include "avro/errors.h"
+#include "avro/resolver.h"
+#include "avro/value.h"
+#include <stdlib.h>
+#include <errno.h>
+#include <string.h>
+#include "encoding.h"
+#include "schema.h"
+#include "datum.h"
+
+
+static int
+read_enum(avro_reader_t reader, const avro_encoding_t * enc,
+	  avro_consumer_t *consumer, void *ud)
+{
+	int rval;
+	int64_t index;
+
+	check_prefix(rval, enc->read_long(reader, &index),
+		     "Cannot read enum value: ");
+	return avro_consumer_call(consumer, enum_value, index, ud);
+}
+
+static int
+read_array(avro_reader_t reader, const avro_encoding_t * enc,
+	   avro_consumer_t *consumer, void *ud)
+{
+	int rval;
+	int64_t i;          /* index within the current block */
+	int64_t index = 0;  /* index within the entire array */
+	int64_t block_count;
+	int64_t block_size;
+
+	check_prefix(rval, enc->read_long(reader, &block_count),
+		     "Cannot read array block count: ");
+	check(rval, avro_consumer_call(consumer, array_start_block,
+				       1, block_count, ud));
+
+	while (block_count != 0) {
+		if (block_count < 0) {
+			block_count = block_count * -1;
+			check_prefix(rval, enc->read_long(reader, &block_size),
+				     "Cannot read array block size: ");
+		}
+
+		for (i = 0; i < block_count; i++, index++) {
+			avro_consumer_t  *element_consumer = NULL;
+			void  *element_ud = NULL;
+
+			check(rval,
+			      avro_consumer_call(consumer, array_element,
+					         index, &element_consumer, &element_ud,
+						 ud));
+
+			check(rval, avro_consume_binary(reader, element_consumer, element_ud));
+		}
+
+		check_prefix(rval, enc->read_long(reader, &block_count),
+			     "Cannot read array block count: ");
+		check(rval, avro_consumer_call(consumer, array_start_block,
+					       0, block_count, ud));
+	}
+
+	return 0;
+}
+
+static int
+read_map(avro_reader_t reader, const avro_encoding_t * enc,
+	 avro_consumer_t *consumer, void *ud)
+{
+	int rval;
+	int64_t i;          /* index within the current block */
+	int64_t index = 0;  /* index within the entire array */
+	int64_t block_count;
+	int64_t block_size;
+
+	check_prefix(rval, enc->read_long(reader, &block_count),
+		     "Cannot read map block count: ");
+	check(rval, avro_consumer_call(consumer, map_start_block,
+				       1, block_count, ud));
+
+	while (block_count != 0) {
+		if (block_count < 0) {
+			block_count = block_count * -1;
+			check_prefix(rval, enc->read_long(reader, &block_size),
+				     "Cannot read map block size: ");
+		}
+
+		for (i = 0; i < block_count; i++, index++) {
+			char *key;
+			int64_t key_size;
+			avro_consumer_t  *element_consumer = NULL;
+			void  *element_ud = NULL;
+
+			check_prefix(rval, enc->read_string(reader, &key, &key_size),
+				     "Cannot read map key: ");
+
+			rval = avro_consumer_call(consumer, map_element,
+						  index, key,
+						  &element_consumer, &element_ud,
+						  ud);
+			if (rval) {
+				avro_free(key, key_size);
+				return rval;
+			}
+
+			rval = avro_consume_binary(reader, element_consumer, element_ud);
+			if (rval) {
+				avro_free(key, key_size);
+				return rval;
+			}
+
+			avro_free(key, key_size);
+		}
+
+		check_prefix(rval, enc->read_long(reader, &block_count),
+			     "Cannot read map block count: ");
+		check(rval, avro_consumer_call(consumer, map_start_block,
+					       0, block_count, ud));
+	}
+
+	return 0;
+}
+
+static int
+read_union(avro_reader_t reader, const avro_encoding_t * enc,
+	   avro_consumer_t *consumer, void *ud)
+{
+	int rval;
+	int64_t discriminant;
+	avro_consumer_t  *branch_consumer = NULL;
+	void  *branch_ud = NULL;
+
+	check_prefix(rval, enc->read_long(reader, &discriminant),
+		     "Cannot read union discriminant: ");
+	check(rval, avro_consumer_call(consumer, union_branch,
+				       discriminant,
+				       &branch_consumer, &branch_ud, ud));
+	return avro_consume_binary(reader, branch_consumer, branch_ud);
+}
+
+static int
+read_record(avro_reader_t reader, const avro_encoding_t * enc,
+	    avro_consumer_t *consumer, void *ud)
+{
+	int rval;
+	size_t  num_fields;
+	unsigned int  i;
+
+	AVRO_UNUSED(enc);
+
+	check(rval, avro_consumer_call(consumer, record_start, ud));
+
+	num_fields = avro_schema_record_size(consumer->schema);
+	for (i = 0; i < num_fields; i++) {
+		avro_consumer_t  *field_consumer = NULL;
+		void  *field_ud = NULL;
+
+		check(rval, avro_consumer_call(consumer, record_field,
+					       i, &field_consumer, &field_ud,
+					       ud));
+
+		if (field_consumer) {
+			check(rval, avro_consume_binary(reader, field_consumer, field_ud));
+		} else {
+			avro_schema_t  field_schema =
+			    avro_schema_record_field_get_by_index(consumer->schema, i);
+			check(rval, avro_skip_data(reader, field_schema));
+		}
+	}
+
+	return 0;
+}
+
+int
+avro_consume_binary(avro_reader_t reader, avro_consumer_t *consumer, void *ud)
+{
+	int rval;
+	const avro_encoding_t *enc = &avro_binary_encoding;
+
+	check_param(EINVAL, reader, "reader");
+	check_param(EINVAL, consumer, "consumer");
+
+	switch (avro_typeof(consumer->schema)) {
+	case AVRO_NULL:
+		check_prefix(rval, enc->read_null(reader),
+			     "Cannot read null value: ");
+		check(rval, avro_consumer_call(consumer, null_value, ud));
+		break;
+
+	case AVRO_BOOLEAN:
+		{
+			int8_t b;
+			check_prefix(rval, enc->read_boolean(reader, &b),
+				     "Cannot read boolean value: ");
+			check(rval, avro_consumer_call(consumer, boolean_value, b, ud));
+		}
+		break;
+
+	case AVRO_STRING:
+		{
+			int64_t len;
+			char *s;
+			check_prefix(rval, enc->read_string(reader, &s, &len),
+				     "Cannot read string value: ");
+			check(rval, avro_consumer_call(consumer, string_value, s, len, ud));
+		}
+		break;
+
+	case AVRO_INT32:
+		{
+			int32_t i;
+			check_prefix(rval, enc->read_int(reader, &i),
+				    "Cannot read int value: ");
+			check(rval, avro_consumer_call(consumer, int_value, i, ud));
+		}
+		break;
+
+	case AVRO_INT64:
+		{
+			int64_t l;
+			check_prefix(rval, enc->read_long(reader, &l),
+				     "Cannot read long value: ");
+			check(rval, avro_consumer_call(consumer, long_value, l, ud));
+		}
+		break;
+
+	case AVRO_FLOAT:
+		{
+			float f;
+			check_prefix(rval, enc->read_float(reader, &f),
+				     "Cannot read float value: ");
+			check(rval, avro_consumer_call(consumer, float_value, f, ud));
+		}
+		break;
+
+	case AVRO_DOUBLE:
+		{
+			double d;
+			check_prefix(rval, enc->read_double(reader, &d),
+				     "Cannot read double value: ");
+			check(rval, avro_consumer_call(consumer, double_value, d, ud));
+		}
+		break;
+
+	case AVRO_BYTES:
+		{
+			char *bytes;
+			int64_t len;
+			check_prefix(rval, enc->read_bytes(reader, &bytes, &len),
+				     "Cannot read bytes value: ");
+			check(rval, avro_consumer_call(consumer, bytes_value, bytes, len, ud));
+		}
+		break;
+
+	case AVRO_FIXED:
+		{
+			char *bytes;
+			int64_t size =
+			    avro_schema_to_fixed(consumer->schema)->size;
+
+			bytes = avro_malloc(size);
+			if (!bytes) {
+				avro_prefix_error("Cannot allocate new fixed value");
+				return ENOMEM;
+			}
+			rval = avro_read(reader, bytes, size);
+			if (rval) {
+				avro_prefix_error("Cannot read fixed value: ");
+				avro_free(bytes, size);
+				return rval;
+			}
+
+			rval = avro_consumer_call(consumer, fixed_value, bytes, size, ud);
+			if (rval) {
+				avro_free(bytes, size);
+				return rval;
+			}
+		}
+		break;
+
+	case AVRO_ENUM:
+		check(rval, read_enum(reader, enc, consumer, ud));
+		break;
+
+	case AVRO_ARRAY:
+		check(rval, read_array(reader, enc, consumer, ud));
+		break;
+
+	case AVRO_MAP:
+		check(rval, read_map(reader, enc, consumer, ud));
+		break;
+
+	case AVRO_UNION:
+		check(rval, read_union(reader, enc, consumer, ud));
+		break;
+
+	case AVRO_RECORD:
+		check(rval, read_record(reader, enc, consumer, ud));
+		break;
+
+	case AVRO_LINK:
+		avro_set_error("Consumer can't consume a link schema directly");
+		return EINVAL;
+	}
+
+	return 0;
+}
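
Note on the block framing handled by read_array and read_map above: in
Avro's binary encoding, an array or map is written as a series of blocks.
Each block starts with a long item count. A negative count means that its
absolute value is the item count and that a long giving the block's size
in bytes follows. A count of zero ends the sequence. The following is a
minimal sketch of that loop for an array of longs held in a memory buffer.
The sketch_* names are hypothetical and are not part of the Avro C
library; the sketch uses neither avro_reader_t nor the consumer callbacks.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical helper: decode one zig-zag varint "long" starting at
 * buf[*pos].  Returns 0 on success, -1 if the buffer ends early or the
 * varint is too long. */
static int
sketch_read_long(const uint8_t *buf, size_t len, size_t *pos, int64_t *out)
{
	uint64_t acc = 0;
	int shift = 0;

	while (*pos < len && shift < 64) {
		uint8_t byte = buf[(*pos)++];
		acc |= (uint64_t) (byte & 0x7f) << shift;
		if ((byte & 0x80) == 0) {
			/* undo the zig-zag mapping */
			*out = (int64_t) (acc >> 1) ^ -(int64_t) (acc & 1);
			return 0;
		}
		shift += 7;
	}
	return -1;
}

/* Hypothetical helper: walk an Avro-encoded array of longs, adding up
 * the items.  The item total goes to *sum, the item count to *n. */
static int
sketch_sum_long_array(const uint8_t *buf, size_t len,
		      int64_t *sum, int64_t *n)
{
	size_t pos = 0;
	int64_t block_count, block_size, item, i;

	*sum = 0;
	*n = 0;
	for (;;) {
		if (sketch_read_long(buf, len, &pos, &block_count))
			return -1;
		if (block_count == 0)
			return 0;	/* a zero count ends the array */
		if (block_count < 0) {
			if (block_count == INT64_MIN)
				return -1;	/* cannot be negated */
			block_count = -block_count;
			/* byte size of the block; unused in this sketch */
			if (sketch_read_long(buf, len, &pos, &block_size))
				return -1;
		}
		for (i = 0; i < block_count; i++) {
			if (sketch_read_long(buf, len, &pos, &item))
				return -1;
			*sum += item;
			(*n)++;
		}
	}
}
```

The byte size that follows a negative count lets a reader skip a whole
block without decoding its items. Like read_array above, the sketch reads
the size and then ignores it.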

Modified: avro/trunk/lang/c/src/datafile.c
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/src/datafile.c?rev=1175832&r1=1175831&r2=1175832&view=diff
==============================================================================
--- avro/trunk/lang/c/src/datafile.c (original)
+++ avro/trunk/lang/c/src/datafile.c Mon Sep 26 12:36:26 2011
@@ -17,7 +17,9 @@
 
 #include "avro_private.h"
 #include "avro/allocation.h"
+#include "avro/generic.h"
 #include "avro/errors.h"
+#include "avro/value.h"
 #include "encoding.h"
 #include <stdio.h>
 #include <stdlib.h>
@@ -165,11 +167,12 @@ static int file_read_header(avro_reader_
 	int rval;
 	avro_schema_t meta_schema;
 	avro_schema_t meta_values_schema;
-	avro_datum_t meta;
+	avro_value_iface_t *meta_iface;
+	avro_value_t meta;
 	char magic[4];
-	avro_datum_t schema_bytes;
-	char *p;
-	int64_t len;
+	avro_value_t schema_bytes;
+	const void *p;
+	size_t len;
 	avro_schema_error_t schema_error;
 
 	check(rval, avro_read(reader, magic, sizeof(magic)));
@@ -181,29 +184,35 @@ static int file_read_header(avro_reader_
 
 	meta_values_schema = avro_schema_bytes();
 	meta_schema = avro_schema_map(meta_values_schema);
-	rval = avro_read_data(reader, meta_schema, NULL, &meta);
+	meta_iface = avro_generic_class_from_schema(meta_schema);
+	if (meta_iface == NULL) {
+		return EILSEQ;
+	}
+	check(rval, avro_generic_value_new(meta_iface, &meta));
+	rval = avro_value_read(reader, &meta);
 	if (rval) {
 		avro_prefix_error("Cannot read file header: ");
 		return EILSEQ;
 	}
 	avro_schema_decref(meta_schema);
 
-	rval = avro_map_get(meta, "avro.schema", &schema_bytes);
+	rval = avro_value_get_by_name(&meta, "avro.schema", &schema_bytes, NULL);
 	if (rval) {
 		avro_set_error("File header doesn't contain a schema");
-		avro_datum_decref(meta);
+		avro_value_decref(&meta);
 		return rval;
 	}
 
-	avro_bytes_get(schema_bytes, &p, &len);
+	avro_value_get_bytes(&schema_bytes, &p, &len);
 	rval = avro_schema_from_json(p, len, writers_schema, &schema_error);
 	if (rval) {
 		avro_prefix_error("Cannot parse file header: ");
-		avro_datum_decref(meta);
+		avro_value_decref(&meta);
 		return rval;
 	}
 
-	avro_datum_decref(meta);
+	avro_value_decref(&meta);
+	avro_value_iface_decref(meta_iface);
 	return avro_read(reader, sync, synclen);
 }
 

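For context on file_read_header: per the Avro object container file
specification, a data file begins with the four bytes 'O', 'b', 'j', 1,
followed by a map of bytes-valued metadata and a 16-byte sync marker. That
is why the hunk above builds a map-of-bytes schema and looks up the writer
schema under "avro.schema". The hunk does not show the magic check itself.
The following is only a sketch of such a check; the helper name is
hypothetical and does not come from the library.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical helper: returns 1 if buf starts with the Avro container
 * file magic ('O', 'b', 'j', 1), 0 otherwise. */
static int
sketch_is_avro_magic(const unsigned char *buf, size_t len)
{
	static const unsigned char magic[4] = { 'O', 'b', 'j', 1 };

	return len >= sizeof(magic) && memcmp(buf, magic, sizeof(magic)) == 0;
}
```
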
Modified: avro/trunk/lang/c/src/datum_read.c
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/src/datum_read.c?rev=1175832&r1=1175831&r2=1175832&view=diff
==============================================================================
--- avro/trunk/lang/c/src/datum_read.c (original)
+++ avro/trunk/lang/c/src/datum_read.c Mon Sep 26 12:36:26 2011
@@ -15,16 +15,16 @@
  * permissions and limitations under the License. 
  */
 
-#include "avro_private.h"
-#include "avro/allocation.h"
-#include "avro/consumer.h"
-#include "avro/errors.h"
 #include <stdlib.h>
 #include <errno.h>
-#include <string.h>
-#include "encoding.h"
-#include "schema.h"
-#include "datum.h"
+
+#include "avro/errors.h"
+#include "avro/io.h"
+#include "avro/legacy.h"
+#include "avro/resolver.h"
+#include "avro/schema.h"
+#include "avro/value.h"
+#include "avro_private.h"
 
 int
 avro_schema_match(avro_schema_t wschema, avro_schema_t rschema)
@@ -32,314 +32,16 @@ avro_schema_match(avro_schema_t wschema,
 	check_param(0, is_avro_schema(wschema), "writer schema");
 	check_param(0, is_avro_schema(rschema), "reader schema");
 
-	avro_consumer_t  *resolver = avro_resolver_new(wschema, rschema);
-	if (resolver) {
-		avro_consumer_free(resolver);
+	avro_value_iface_t  *resolver =
+	    avro_resolved_writer_new(wschema, rschema);
+	if (resolver != NULL) {
+		avro_value_iface_decref(resolver);
 		return 1;
 	}
 
 	return 0;
 }
 
-static int
-read_enum(avro_reader_t reader, const avro_encoding_t * enc,
-	  avro_consumer_t *consumer, void *ud)
-{
-	int rval;
-	int64_t index;
-
-	check_prefix(rval, enc->read_long(reader, &index),
-		     "Cannot read enum value: ");
-	return avro_consumer_call(consumer, enum_value, index, ud);
-}
-
-static int
-read_array(avro_reader_t reader, const avro_encoding_t * enc,
-	   avro_consumer_t *consumer, void *ud)
-{
-	int rval;
-	int64_t i;          /* index within the current block */
-	int64_t index = 0;  /* index within the entire array */
-	int64_t block_count;
-	int64_t block_size;
-
-	check_prefix(rval, enc->read_long(reader, &block_count),
-		     "Cannot read array block count: ");
-	check(rval, avro_consumer_call(consumer, array_start_block,
-				       1, block_count, ud));
-
-	while (block_count != 0) {
-		if (block_count < 0) {
-			block_count = block_count * -1;
-			check_prefix(rval, enc->read_long(reader, &block_size),
-				     "Cannot read array block size: ");
-		}
-
-		for (i = 0; i < block_count; i++, index++) {
-			avro_consumer_t  *element_consumer = NULL;
-			void  *element_ud = NULL;
-
-			check(rval,
-			      avro_consumer_call(consumer, array_element,
-					         index, &element_consumer, &element_ud,
-						 ud));
-
-			check(rval, avro_consume_binary(reader, element_consumer, element_ud));
-		}
-
-		check_prefix(rval, enc->read_long(reader, &block_count),
-			     "Cannot read array block count: ");
-		check(rval, avro_consumer_call(consumer, array_start_block,
-					       0, block_count, ud));
-	}
-
-	return 0;
-}
-
-static int
-read_map(avro_reader_t reader, const avro_encoding_t * enc,
-	 avro_consumer_t *consumer, void *ud)
-{
-	int rval;
-	int64_t i;          /* index within the current block */
-	int64_t index = 0;  /* index within the entire array */
-	int64_t block_count;
-	int64_t block_size;
-
-	check_prefix(rval, enc->read_long(reader, &block_count),
-		     "Cannot read map block count: ");
-	check(rval, avro_consumer_call(consumer, map_start_block,
-				       1, block_count, ud));
-
-	while (block_count != 0) {
-		if (block_count < 0) {
-			block_count = block_count * -1;
-			check_prefix(rval, enc->read_long(reader, &block_size),
-				     "Cannot read map block size: ");
-		}
-
-		for (i = 0; i < block_count; i++, index++) {
-			char *key;
-			int64_t key_size;
-			avro_consumer_t  *element_consumer = NULL;
-			void  *element_ud = NULL;
-
-			check_prefix(rval, enc->read_string(reader, &key, &key_size),
-				     "Cannot read map key: ");
-
-			rval = avro_consumer_call(consumer, map_element,
-						  index, key,
-						  &element_consumer, &element_ud,
-						  ud);
-			if (rval) {
-				avro_free(key, key_size);
-				return rval;
-			}
-
-			rval = avro_consume_binary(reader, element_consumer, element_ud);
-			if (rval) {
-				avro_free(key, key_size);
-				return rval;
-			}
-
-			avro_free(key, key_size);
-		}
-
-		check_prefix(rval, enc->read_long(reader, &block_count),
-			     "Cannot read map block count: ");
-		check(rval, avro_consumer_call(consumer, map_start_block,
-					       0, block_count, ud));
-	}
-
-	return 0;
-}
-
-static int
-read_union(avro_reader_t reader, const avro_encoding_t * enc,
-	   avro_consumer_t *consumer, void *ud)
-{
-	int rval;
-	int64_t discriminant;
-	avro_consumer_t  *branch_consumer = NULL;
-	void  *branch_ud = NULL;
-
-	check_prefix(rval, enc->read_long(reader, &discriminant),
-		     "Cannot read union discriminant: ");
-	check(rval, avro_consumer_call(consumer, union_branch,
-				       discriminant,
-				       &branch_consumer, &branch_ud, ud));
-	return avro_consume_binary(reader, branch_consumer, branch_ud);
-}
-
-static int
-read_record(avro_reader_t reader, const avro_encoding_t * enc,
-	    avro_consumer_t *consumer, void *ud)
-{
-	int rval;
-	size_t  num_fields;
-	unsigned int  i;
-
-	AVRO_UNUSED(enc);
-
-	check(rval, avro_consumer_call(consumer, record_start, ud));
-
-	num_fields = avro_schema_record_size(consumer->schema);
-	for (i = 0; i < num_fields; i++) {
-		avro_consumer_t  *field_consumer = NULL;
-		void  *field_ud = NULL;
-
-		check(rval, avro_consumer_call(consumer, record_field,
-					       i, &field_consumer, &field_ud,
-					       ud));
-
-		if (field_consumer) {
-			check(rval, avro_consume_binary(reader, field_consumer, field_ud));
-		} else {
-			avro_schema_t  field_schema =
-			    avro_schema_record_field_get_by_index(consumer->schema, i);
-			check(rval, avro_skip_data(reader, field_schema));
-		}
-	}
-
-	return 0;
-}
-
-int
-avro_consume_binary(avro_reader_t reader, avro_consumer_t *consumer, void *ud)
-{
-	int rval;
-	const avro_encoding_t *enc = &avro_binary_encoding;
-
-	check_param(EINVAL, reader, "reader");
-	check_param(EINVAL, consumer, "consumer");
-
-	switch (avro_typeof(consumer->schema)) {
-	case AVRO_NULL:
-		check_prefix(rval, enc->read_null(reader),
-			     "Cannot read null value: ");
-		check(rval, avro_consumer_call(consumer, null_value, ud));
-		break;
-
-	case AVRO_BOOLEAN:
-		{
-			int8_t b;
-			check_prefix(rval, enc->read_boolean(reader, &b),
-				     "Cannot read boolean value: ");
-			check(rval, avro_consumer_call(consumer, boolean_value, b, ud));
-		}
-		break;
-
-	case AVRO_STRING:
-		{
-			int64_t len;
-			char *s;
-			check_prefix(rval, enc->read_string(reader, &s, &len),
-				     "Cannot read string value: ");
-			check(rval, avro_consumer_call(consumer, string_value, s, len, ud));
-		}
-		break;
-
-	case AVRO_INT32:
-		{
-			int32_t i;
-			check_prefix(rval, enc->read_int(reader, &i),
-				    "Cannot read int value: ");
-			check(rval, avro_consumer_call(consumer, int_value, i, ud));
-		}
-		break;
-
-	case AVRO_INT64:
-		{
-			int64_t l;
-			check_prefix(rval, enc->read_long(reader, &l),
-				     "Cannot read long value: ");
-			check(rval, avro_consumer_call(consumer, long_value, l, ud));
-		}
-		break;
-
-	case AVRO_FLOAT:
-		{
-			float f;
-			check_prefix(rval, enc->read_float(reader, &f),
-				     "Cannot read float value: ");
-			check(rval, avro_consumer_call(consumer, float_value, f, ud));
-		}
-		break;
-
-	case AVRO_DOUBLE:
-		{
-			double d;
-			check_prefix(rval, enc->read_double(reader, &d),
-				     "Cannot read double value: ");
-			check(rval, avro_consumer_call(consumer, double_value, d, ud));
-		}
-		break;
-
-	case AVRO_BYTES:
-		{
-			char *bytes;
-			int64_t len;
-			check_prefix(rval, enc->read_bytes(reader, &bytes, &len),
-				     "Cannot read bytes value: ");
-			check(rval, avro_consumer_call(consumer, bytes_value, bytes, len, ud));
-		}
-		break;
-
-	case AVRO_FIXED:
-		{
-			char *bytes;
-			int64_t size =
-			    avro_schema_to_fixed(consumer->schema)->size;
-
-			bytes = avro_malloc(size);
-			if (!bytes) {
-				avro_prefix_error("Cannot allocate new fixed value");
-				return ENOMEM;
-			}
-			rval = avro_read(reader, bytes, size);
-			if (rval) {
-				avro_prefix_error("Cannot read fixed value: ");
-				avro_free(bytes, size);
-				return rval;
-			}
-
-			rval = avro_consumer_call(consumer, fixed_value, bytes, size, ud);
-			if (rval) {
-				avro_free(bytes, size);
-				return rval;
-			}
-		}
-		break;
-
-	case AVRO_ENUM:
-		check(rval, read_enum(reader, enc, consumer, ud));
-		break;
-
-	case AVRO_ARRAY:
-		check(rval, read_array(reader, enc, consumer, ud));
-		break;
-
-	case AVRO_MAP:
-		check(rval, read_map(reader, enc, consumer, ud));
-		break;
-
-	case AVRO_UNION:
-		check(rval, read_union(reader, enc, consumer, ud));
-		break;
-
-	case AVRO_RECORD:
-		check(rval, read_record(reader, enc, consumer, ud));
-		break;
-
-	case AVRO_LINK:
-		avro_set_error("Consumer can't consume a link schema directly");
-		return EINVAL;
-	}
-
-	return 0;
-}
-
-
 int
 avro_read_data(avro_reader_t reader, avro_schema_t writers_schema,
 	       avro_schema_t readers_schema, avro_datum_t * datum)
@@ -359,20 +61,39 @@ avro_read_data(avro_reader_t reader, avr
 		return EINVAL;
 	}
 
-	avro_consumer_t  *resolver = avro_resolver_new(writers_schema, readers_schema);
+	avro_value_t  value;
+	check(rval, avro_datum_as_value(&value, result));
+
+	avro_value_iface_t  *resolver =
+	    avro_resolved_writer_new(writers_schema, readers_schema);
 	if (!resolver) {
+		avro_value_decref(&value);
 		avro_datum_decref(result);
 		return EINVAL;
 	}
 
-	rval = avro_consume_binary(reader, resolver, result);
+	avro_value_t  resolved_value;
+	rval = avro_resolved_writer_new_value(resolver, &resolved_value);
+	if (rval) {
+		avro_value_iface_decref(resolver);
+		avro_value_decref(&value);
+		avro_datum_decref(result);
+		return rval;
+	}
+
+	avro_resolved_writer_set_dest(&resolved_value, &value);
+	rval = avro_value_read(reader, &resolved_value);
 	if (rval) {
-		avro_consumer_free(resolver);
+		avro_value_decref(&resolved_value);
+		avro_value_iface_decref(resolver);
+		avro_value_decref(&value);
 		avro_datum_decref(result);
 		return rval;
 	}
 
-	avro_consumer_free(resolver);
+	avro_value_decref(&resolved_value);
+	avro_value_iface_decref(resolver);
+	avro_value_decref(&value);
 	*datum = result;
 	return 0;
 }
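
Note on what the resolver checks: avro_schema_match and avro_read_data now
delegate schema resolution to the resolved writer. One of the rules any
Avro resolver has to apply is numeric promotion: per the Avro
specification, an int written by the writer may be read as a long, float,
or double; a long as a float or double; and a float as a double. The
following sketch states only that rule. The sk_* names are hypothetical;
the library itself works on avro_schema_t, not on this toy enum.

```c
#include <stdbool.h>

/* Hypothetical type tags for this sketch only. */
typedef enum {
	SK_INT,
	SK_LONG,
	SK_FLOAT,
	SK_DOUBLE
} sk_type_t;

/* Returns true if a value written as "writer" can be read as "reader",
 * either because the types are identical or by numeric promotion. */
static bool
sk_can_promote(sk_type_t writer, sk_type_t reader)
{
	if (writer == reader)
		return true;

	switch (writer) {
	case SK_INT:
		return reader == SK_LONG || reader == SK_FLOAT ||
		    reader == SK_DOUBLE;
	case SK_LONG:
		return reader == SK_FLOAT || reader == SK_DOUBLE;
	case SK_FLOAT:
		return reader == SK_DOUBLE;
	case SK_DOUBLE:
		return false;
	}
	return false;
}
```

Promotion only ever widens: there is no rule that reads a double as a
float or a long as an int.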

Modified: avro/trunk/lang/c/src/datum_value.c
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/src/datum_value.c?rev=1175832&r1=1175831&r2=1175832&view=diff
==============================================================================
--- avro/trunk/lang/c/src/datum_value.c (original)
+++ avro/trunk/lang/c/src/datum_value.c Mon Sep 26 12:36:26 2011
@@ -28,11 +28,41 @@
 #include "avro/value.h"
 #include "avro_private.h"
 
+static avro_value_iface_t  AVRO_DATUM_VALUE_CLASS;
+
+avro_value_iface_t *
+avro_datum_class(void)
+{
+	return &AVRO_DATUM_VALUE_CLASS;
+}
+
+int
+avro_datum_as_value(avro_value_t *value, avro_datum_t src)
+{
+	value->iface = &AVRO_DATUM_VALUE_CLASS;
+	value->self = avro_datum_incref(src);
+	return 0;
+}
+
+static int
+avro_datum_as_child_value(avro_value_t *value, avro_datum_t src)
+{
+	value->iface = &AVRO_DATUM_VALUE_CLASS;
+	value->self = src;
+	return 0;
+}
+
 static void
-avro_datum_value_done(const avro_value_iface_t *iface, void *vself)
+avro_datum_value_incref(avro_value_t *value)
 {
-	AVRO_UNUSED(iface);
-	avro_datum_t  self = vself;
+	avro_datum_t  self = value->self;
+	avro_datum_incref(self);
+}
+
+static void
+avro_datum_value_decref(avro_value_t *value)
+{
+	avro_datum_t  self = value->self;
 	avro_datum_decref(self);
 }
 
@@ -459,7 +489,7 @@ avro_datum_value_get_size(const avro_val
 		return 0;
 	}
 
-	avro_set_error("Can only get size of array or map");
+	avro_set_error("Can only get size of array, map, or record, %d", avro_typeof(self));
 	return EINVAL;
 }
 
@@ -477,7 +507,7 @@ avro_datum_value_get_by_index(const avro
 
 	if (is_avro_array(self)) {
 		check(rval, avro_array_get(self, index, &child_datum));
-		return avro_datum_as_value(child, child_datum);
+		return avro_datum_as_child_value(child, child_datum);
 	}
 
 	if (is_avro_map(self)) {
@@ -487,7 +517,7 @@ avro_datum_value_get_by_index(const avro
 			*name = real_key;
 		}
 		check(rval, avro_map_get(self, real_key, &child_datum));
-		return avro_datum_as_value(child, child_datum);
+		return avro_datum_as_child_value(child, child_datum);
 	}
 
 	if (is_avro_record(self)) {
@@ -501,7 +531,7 @@ avro_datum_value_get_by_index(const avro
 			*name = field_name;
 		}
 		check(rval, avro_record_get(self, field_name, &child_datum));
-		return avro_datum_as_value(child, child_datum);
+		return avro_datum_as_child_value(child, child_datum);
 	}
 
 	avro_set_error("Can only get by index from array, map, or record");
@@ -528,7 +558,7 @@ avro_datum_value_get_by_name(const avro_
 		}
 
 		check(rval, avro_map_get(self, name, &child_datum));
-		return avro_datum_as_value(child, child_datum);
+		return avro_datum_as_child_value(child, child_datum);
 	}
 
 	if (is_avro_record(self)) {
@@ -538,7 +568,7 @@ avro_datum_value_get_by_name(const avro_
 		}
 
 		check(rval, avro_record_get(self, name, &child_datum));
-		return avro_datum_as_value(child, child_datum);
+		return avro_datum_as_child_value(child, child_datum);
 	}
 
 	avro_set_error("Can only get by name from map or record");
@@ -576,7 +606,7 @@ avro_datum_value_get_current_branch(cons
 	}
 
 	avro_datum_t  child_datum = avro_union_current_branch(self);
-	return avro_datum_as_value(branch, child_datum);
+	return avro_datum_as_child_value(branch, child_datum);
 }
 
 
@@ -602,12 +632,16 @@ avro_datum_value_append(const avro_value
 		return ENOMEM;
 	}
 
-	check(rval, avro_array_append_datum(self, child_datum));
+	rval = avro_array_append_datum(self, child_datum);
 	avro_datum_decref(child_datum);
+	if (rval != 0) {
+		return rval;
+	}
+
 	if (new_index != NULL) {
 		*new_index = avro_array_size(self) - 1;
 	}
-	return avro_datum_as_value(child_out, child_datum);
+	return avro_datum_as_child_value(child_out, child_datum);
 }
 
 static int
@@ -637,7 +671,7 @@ avro_datum_value_add(const avro_value_if
 			avro_map_get_index(self, key, &real_index);
 			*index = real_index;
 		}
-		return avro_datum_as_value(child, child_datum);
+		return avro_datum_as_child_value(child, child_datum);
 	}
 
 	/* key is new */
@@ -647,17 +681,21 @@ avro_datum_value_add(const avro_value_if
 	if (child_datum == NULL) {
 		return ENOMEM;
 	}
+
+	rval = avro_map_set(self, key, child_datum);
 	avro_datum_decref(child_datum);
+	if (rval != 0) {
+		return rval;
+	}
 
-	check(rval, avro_map_set(self, key, child_datum));
 	if (is_new != NULL) {
-		*is_new = 0;
+		*is_new = 1;
 	}
 	if (index != NULL) {
 		*index = avro_map_size(self) - 1;
 	}
 
-	return avro_datum_as_value(child, child_datum);
+	return avro_datum_as_child_value(child, child_datum);
 }
 
 static int
@@ -677,7 +715,7 @@ avro_datum_value_set_branch(const avro_v
 	int  rval;
 	avro_datum_t  child_datum;
 	check(rval, avro_union_set_discriminant(self, discriminant, &child_datum));
-	return avro_datum_as_value(branch, child_datum);
+	return avro_datum_as_child_value(branch, child_datum);
 }
 
 
@@ -686,10 +724,9 @@ static avro_value_iface_t  AVRO_DATUM_VA
 	/* "class" methods */
 	NULL, /* incref */
 	NULL, /* decref */
-	NULL, /* instance_size */
 	/* general "instance" methods */
-	NULL, /* init */
-	avro_datum_value_done,
+	avro_datum_value_incref,
+	avro_datum_value_decref,
 	avro_datum_value_reset,
 	avro_datum_value_get_type,
 	avro_datum_value_get_schema,
@@ -733,18 +770,3 @@ static avro_value_iface_t  AVRO_DATUM_VA
 	avro_datum_value_add,
 	avro_datum_value_set_branch
 };
-
-avro_value_iface_t *
-avro_datum_class(void)
-{
-	return &AVRO_DATUM_VALUE_CLASS;
-}
-
-
-int
-avro_datum_as_value(avro_value_t *value, avro_datum_t src)
-{
-	value->iface = &AVRO_DATUM_VALUE_CLASS;
-	value->self = avro_datum_incref(src);
-	return 0;
-}
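
Note on the two wrappers introduced in this file: avro_datum_as_value
takes a new reference on the datum (it calls avro_datum_incref), so the
caller owns the resulting value and must release it. The static
avro_datum_as_child_value stores the pointer without an incref, so a child
value is a borrowed view that stays valid only while its parent keeps the
datum alive. The following toy sketch shows the same distinction. All
sk_* types and functions are hypothetical stand-ins, not the library's
own.

```c
#include <stdlib.h>

/* Hypothetical reference-counted object, standing in for a datum. */
typedef struct sk_obj {
	int refcount;
} sk_obj_t;

/* Hypothetical wrapper, standing in for avro_value_t. */
typedef struct sk_view {
	sk_obj_t *self;
} sk_view_t;

static sk_obj_t *
sk_obj_new(void)
{
	sk_obj_t *obj = malloc(sizeof(*obj));
	if (obj != NULL)
		obj->refcount = 1;	/* the creator holds one reference */
	return obj;
}

static sk_obj_t *
sk_obj_incref(sk_obj_t *obj)
{
	obj->refcount++;
	return obj;
}

static void
sk_obj_decref(sk_obj_t *obj)
{
	if (--obj->refcount == 0)
		free(obj);
}

/* Owning view: takes its own reference, to be dropped later with
 * sk_view_decref. */
static void
sk_view_owned(sk_view_t *view, sk_obj_t *obj)
{
	view->self = sk_obj_incref(obj);
}

/* Borrowed view: no new reference; must not outlive the owner and
 * must not be passed to sk_view_decref. */
static void
sk_view_borrowed(sk_view_t *view, sk_obj_t *obj)
{
	view->self = obj;
}

static void
sk_view_decref(sk_view_t *view)
{
	sk_obj_decref(view->self);
}
```

The same reasoning explains the reordering in avro_datum_value_append and
avro_datum_value_add above: the new child is handed to the container
first, and only then is the creator's reference dropped, so the container
holds the remaining reference when the borrowed child value is returned.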

Modified: avro/trunk/lang/c/src/datum_write.c
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/src/datum_write.c?rev=1175832&r1=1175831&r2=1175832&view=diff
==============================================================================
--- avro/trunk/lang/c/src/datum_write.c (original)
+++ avro/trunk/lang/c/src/datum_write.c Mon Sep 26 12:36:26 2011
@@ -19,265 +19,20 @@
 #include <errno.h>
 #include <string.h>
 
+#include "avro/basics.h"
 #include "avro/errors.h"
+#include "avro/io.h"
 #include "avro/legacy.h"
+#include "avro/resolver.h"
 #include "avro/schema.h"
 #include "avro/value.h"
 #include "avro_private.h"
-#include "datum.h"
-#include "encoding.h"
-#include "schema.h"
-
-/*
- * Oof, we have to leave this legacy implementation in place for when
- * you write to write with a different schema.
- */
-
-static int write_datum(avro_writer_t writer, const avro_encoding_t * enc,
-		       avro_schema_t writers_schema, avro_datum_t datum);
-
-static int
-write_record(avro_writer_t writer, const avro_encoding_t * enc,
-	     struct avro_record_schema_t *schema, avro_datum_t datum)
-{
-	int rval;
-	long i;
-	avro_datum_t field_datum;
-
-	if (schema) {
-		for (i = 0; i < schema->fields->num_entries; i++) {
-			union {
-				st_data_t data;
-				struct avro_record_field_t *field;
-			} val;
-			st_lookup(schema->fields, i, &val.data);
-			check(rval,
-			      avro_record_get(datum, val.field->name,
-					      &field_datum));
-			check(rval,
-			      write_datum(writer, enc, val.field->type,
-					  field_datum));
-		}
-	} else {
-		/* No schema.  Just write the record datum */
-		struct avro_record_datum_t *record =
-		    avro_datum_to_record(datum);
-		for (i = 0; i < record->field_order->num_entries; i++) {
-			union {
-				st_data_t data;
-				char *name;
-			} val;
-			st_lookup(record->field_order, i, &val.data);
-			check(rval,
-			      avro_record_get(datum, val.name, &field_datum));
-			check(rval,
-			      write_datum(writer, enc, NULL, field_datum));
-		}
-	}
-	return 0;
-}
-
-static int
-write_enum(avro_writer_t writer, const avro_encoding_t * enc,
-	   struct avro_enum_schema_t *enump, struct avro_enum_datum_t *datum)
-{
-	AVRO_UNUSED(enump);
-
-	return enc->write_long(writer, datum->value);
-}
-
-struct write_map_args {
-	int rval;
-	avro_writer_t writer;
-	const avro_encoding_t *enc;
-	avro_schema_t values_schema;
-};
-
-static int
-write_map_foreach(char *key, avro_datum_t datum, struct write_map_args *args)
-{
-	int rval = args->enc->write_string(args->writer, key);
-	if (rval) {
-		args->rval = rval;
-		return ST_STOP;
-	}
-	rval = write_datum(args->writer, args->enc, args->values_schema, datum);
-	if (rval) {
-		args->rval = rval;
-		return ST_STOP;
-	}
-	return ST_CONTINUE;
-}
-
-static int
-write_map(avro_writer_t writer, const avro_encoding_t * enc,
-	  struct avro_map_schema_t *writers_schema,
-	  struct avro_map_datum_t *datum)
-{
-	int rval;
-	struct write_map_args args =
-	    { 0, writer, enc, writers_schema ? writers_schema->values : NULL };
-
-	if (datum->map->num_entries) {
-		check_prefix(rval, enc->write_long(writer, datum->map->num_entries),
-			     "Cannot write map block count: ");
-		st_foreach(datum->map, write_map_foreach, (st_data_t) & args);
-	}
-	if (args.rval) {
-		return args.rval;
-	}
-
-	check_prefix(rval, enc->write_long(writer, 0),
-		     "Cannot write map block count: ");
-	return 0;
-}
-
-static int
-write_array(avro_writer_t writer, const avro_encoding_t * enc,
-	    struct avro_array_schema_t *schema,
-	    struct avro_array_datum_t *array)
-{
-	int rval;
-	long i;
-
-	if (array->els->num_entries) {
-		check_prefix(rval, enc->write_long(writer, array->els->num_entries),
-			     "Cannot write array block count: ");
-		for (i = 0; i < array->els->num_entries; i++) {
-			union {
-				st_data_t data;
-				avro_datum_t datum;
-			} val;
-			st_lookup(array->els, i, &val.data);
-			check(rval,
-			      write_datum(writer, enc,
-					  schema ? schema->items : NULL,
-					  val.datum));
-		}
-	}
-	check_prefix(rval, enc->write_long(writer, 0),
-		     "Cannot write array block count: ");
-	return 0;
-}
-
-static int
-write_union(avro_writer_t writer, const avro_encoding_t * enc,
-	    struct avro_union_schema_t *schema,
-	    struct avro_union_datum_t *unionp)
-{
-	int rval;
-	avro_schema_t write_schema = NULL;
-
-	check_prefix(rval, enc->write_long(writer, unionp->discriminant),
-		     "Cannot write union discriminant: ");
-	if (schema) {
-		write_schema =
-		    avro_schema_union_branch(&schema->obj, unionp->discriminant);
-		if (!write_schema) {
-			return EINVAL;
-		}
-	}
-	return write_datum(writer, enc, write_schema, unionp->value);
-}
-
-static int write_datum(avro_writer_t writer, const avro_encoding_t * enc,
-		       avro_schema_t writers_schema, avro_datum_t datum)
-{
-	if (is_avro_schema(writers_schema) && is_avro_link(writers_schema)) {
-		return write_datum(writer, enc,
-				   (avro_schema_to_link(writers_schema))->to,
-				   datum);
-	}
-
-	switch (avro_typeof(datum)) {
-	case AVRO_NULL:
-		return enc->write_null(writer);
-
-	case AVRO_BOOLEAN:
-		return enc->write_boolean(writer,
-					  avro_datum_to_boolean(datum)->i);
-
-	case AVRO_STRING:
-		return enc->write_string(writer,
-					 avro_datum_to_string(datum)->s);
-
-	case AVRO_BYTES:
-		return enc->write_bytes(writer,
-					avro_datum_to_bytes(datum)->bytes,
-					avro_datum_to_bytes(datum)->size);
-
-	case AVRO_INT32:
-	case AVRO_INT64:{
-			int64_t val = avro_typeof(datum) == AVRO_INT32 ?
-			    avro_datum_to_int32(datum)->i32 :
-			    avro_datum_to_int64(datum)->i64;
-			if (is_avro_schema(writers_schema)) {
-				/* handle promotion */
-				if (is_avro_float(writers_schema)) {
-					return enc->write_float(writer,
-								(float)val);
-				} else if (is_avro_double(writers_schema)) {
-					return enc->write_double(writer,
-								 (double)val);
-				}
-			}
-			return enc->write_long(writer, val);
-		}
-
-	case AVRO_FLOAT:{
-			float val = avro_datum_to_float(datum)->f;
-			if (is_avro_schema(writers_schema)
-			    && is_avro_double(writers_schema)) {
-				/* handle promotion */
-				return enc->write_double(writer, (double)val);
-			}
-			return enc->write_float(writer, val);
-		}
-
-	case AVRO_DOUBLE:
-		return enc->write_double(writer,
-					 avro_datum_to_double(datum)->d);
-
-	case AVRO_RECORD:
-		return write_record(writer, enc,
-				    avro_schema_to_record(writers_schema),
-				    datum);
-
-	case AVRO_ENUM:
-		return write_enum(writer, enc,
-				  avro_schema_to_enum(writers_schema),
-				  avro_datum_to_enum(datum));
-
-	case AVRO_FIXED:
-		return avro_write(writer,
-				  avro_datum_to_fixed(datum)->bytes,
-				  avro_datum_to_fixed(datum)->size);
-
-	case AVRO_MAP:
-		return write_map(writer, enc,
-				 avro_schema_to_map(writers_schema),
-				 avro_datum_to_map(datum));
-
-	case AVRO_ARRAY:
-		return write_array(writer, enc,
-				   avro_schema_to_array(writers_schema),
-				   avro_datum_to_array(datum));
-
-	case AVRO_UNION:
-		return write_union(writer, enc,
-				   avro_schema_to_union(writers_schema),
-				   avro_datum_to_union(datum));
-
-	case AVRO_LINK:
-		break;
-	}
-
-	return 0;
-}
 
 int avro_write_data(avro_writer_t writer, avro_schema_t writers_schema,
 		    avro_datum_t datum)
 {
+	int  rval;
+
 	check_param(EINVAL, writer, "writer");
 	check_param(EINVAL, is_avro_datum(datum), "datum");
 
@@ -287,17 +42,50 @@ int avro_write_data(avro_writer_t writer
 		avro_set_error("Datum doesn't validate against schema");
 		return EINVAL;
 	    }
-	    return write_datum(writer, &avro_binary_encoding,
-			       writers_schema, datum);
+
+	    /*
+	     * Some confusing terminology here.  The "writers_schema"
+	     * parameter is the schema we want to use to write the data
+	     * into the "writer" buffer.  Before doing that, we need to
+	     * resolve the datum from its actual schema into this
+	     * "writer" schema.  For the purposes of that resolution,
+	     * the writer schema is the datum's actual schema, and the
+	     * reader schema is our eventual (when writing to the
+	     * buffer) "writer" schema.
+	     */
+
+	    avro_schema_t  datum_schema = avro_datum_get_schema(datum);
+	    avro_value_iface_t  *resolver =
+		avro_resolved_reader_new(datum_schema, writers_schema);
+	    if (resolver == NULL) {
+		    return EINVAL;
+	    }
+
+	    avro_value_t  value;
+	    check(rval, avro_datum_as_value(&value, datum));
+
+	    avro_value_t  resolved;
+	    rval = avro_resolved_reader_new_value(resolver, &resolved);
+	    if (rval != 0) {
+		    avro_value_decref(&value);
+		    avro_value_iface_decref(resolver);
+		    return rval;
+	    }
+
+	    avro_resolved_reader_set_source(&resolved, &value);
+	    rval = avro_value_write(writer, &resolved);
+	    avro_value_decref(&resolved);
+	    avro_value_decref(&value);
+	    avro_value_iface_decref(resolver);
+	    return rval;
 	}
 
-	/* If we're writing using the datum's actual schema, use the new
-	 * value implementation. */
+	/* If we're writing using the datum's actual schema, we don't
+	 * need a resolver. */
 
-	int  rval;
 	avro_value_t  value;
 	check(rval, avro_datum_as_value(&value, datum));
 	check(rval, avro_value_write(writer, &value));
-	avro_value_done(&value);
+	avro_value_decref(&value);
 	return 0;
 }
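
Note: the removed write_datum() handled numeric promotion inline (int or long written as float or double, float written as double). After this change that work is left to the resolver. The rule itself can be sketched in isolation as below. This is only an illustration, not code from this commit: num_type_t and can_promote are hypothetical names, not part of libavro, and the sketch covers only the numeric cases.

```c
/* Hypothetical stand-ins for Avro's numeric types; this is NOT the
 * libavro type enum, just a self-contained illustration. */
typedef enum { T_INT, T_LONG, T_FLOAT, T_DOUBLE } num_type_t;

/*
 * Returns 1 if a value whose actual type is `from` may be presented as
 * type `to`, and 0 otherwise.  Allowed cases are identical types plus
 * the numeric promotions:
 *   int   -> long, float, double
 *   long  -> float, double
 *   float -> double
 * Narrowing (for example double -> float) is never allowed.
 */
static int can_promote(num_type_t from, num_type_t to)
{
	if (from == to) {
		return 1;
	}

	switch (from) {
	case T_INT:
		return to == T_LONG || to == T_FLOAT || to == T_DOUBLE;
	case T_LONG:
		return to == T_FLOAT || to == T_DOUBLE;
	case T_FLOAT:
		return to == T_DOUBLE;
	case T_DOUBLE:
		return 0;
	}

	return 0;
}
```

In the terms used by the comment in avro_write_data(), `from` plays the role of the datum's actual schema and `to` the role of the "writers_schema" parameter.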


