Skip to content
Snippets Groups Projects
openvswitch-2.15.0.patch 934 KiB
Newer Older
Louis Abel's avatar
Louis Abel committed
+            ukey_delete__(ukey);
+        }
         cmap_destroy(&udpif->ukeys[i].cmap);
         ovs_mutex_destroy(&udpif->ukeys[i].mutex);
     }
Louis Abel's avatar
Louis Abel committed
diff --git a/ofproto/ofproto-dpif-xlate.c b/ofproto/ofproto-dpif-xlate.c
Louis Abel's avatar
Louis Abel committed
index 7108c8a301..3942ddbdc7 100644
Louis Abel's avatar
Louis Abel committed
--- a/ofproto/ofproto-dpif-xlate.c
+++ b/ofproto/ofproto-dpif-xlate.c
Louis Abel's avatar
Louis Abel committed
@@ -6177,11 +6177,32 @@ static void
 compose_conntrack_action(struct xlate_ctx *ctx, struct ofpact_conntrack *ofc,
                          bool is_last_action)
 {
-    ovs_u128 old_ct_label_mask = ctx->wc->masks.ct_label;
-    uint32_t old_ct_mark_mask = ctx->wc->masks.ct_mark;
-    size_t ct_offset;
     uint16_t zone;
+    if (ofc->zone_src.field) {
+        union mf_subvalue value;
+        memset(&value, 0xff, sizeof(value));
 
+        zone = mf_get_subfield(&ofc->zone_src, &ctx->xin->flow);
+        if (ctx->xin->frozen_state) {
+            /* If the upcall is a resume of a recirculation, we only need to
+             * unwildcard the fields that are not in the frozen_metadata, as
+             * when the rules update, OVS will generate a new recirc_id,
+             * which will invalidate the megaflow with old the recirc_id.
+             */
+            if (!mf_is_frozen_metadata(ofc->zone_src.field)) {
+                mf_write_subfield_flow(&ofc->zone_src, &value,
+                                       &ctx->wc->masks);
+            }
+        } else {
+            mf_write_subfield_flow(&ofc->zone_src, &value, &ctx->wc->masks);
+        }
+    } else {
+        zone = ofc->zone_imm;
+    }
+
+    size_t ct_offset;
+    ovs_u128 old_ct_label_mask = ctx->wc->masks.ct_label;
+    uint32_t old_ct_mark_mask = ctx->wc->masks.ct_mark;
     /* Ensure that any prior actions are applied before composing the new
      * conntrack action. */
     xlate_commit_actions(ctx);
@@ -6193,11 +6214,6 @@ compose_conntrack_action(struct xlate_ctx *ctx, struct ofpact_conntrack *ofc,
     do_xlate_actions(ofc->actions, ofpact_ct_get_action_len(ofc), ctx,
                      is_last_action, false);
 
-    if (ofc->zone_src.field) {
-        zone = mf_get_subfield(&ofc->zone_src, &ctx->xin->flow);
-    } else {
-        zone = ofc->zone_imm;
-    }
 
     ct_offset = nl_msg_start_nested(ctx->odp_actions, OVS_ACTION_ATTR_CT);
     if (ofc->flags & NX_CT_F_COMMIT) {
@@ -7127,7 +7143,9 @@ do_xlate_actions(const struct ofpact *ofpacts, size_t ofpacts_len,
Louis Abel's avatar
Louis Abel committed
             break;
 
         case OFPACT_CT_CLEAR:
-            compose_ct_clear_action(ctx);
+            if (ctx->conntracked) {
+                compose_ct_clear_action(ctx);
+            }
             break;
 
         case OFPACT_NAT:
Louis Abel's avatar
Louis Abel committed
21069 21070 21071 21072 21073 21074 21075 21076 21077 21078 21079 21080 21081 21082 21083 21084 21085 21086 21087 21088 21089 21090 21091 21092 21093 21094 21095 21096 21097 21098 21099 21100 21101 21102 21103 21104 21105 21106 21107 21108 21109 21110 21111 21112 21113 21114 21115 21116 21117 21118 21119 21120 21121 21122 21123 21124 21125 21126 21127 21128 21129 21130 21131 21132 21133 21134 21135 21136 21137 21138 21139 21140 21141 21142 21143 21144 21145 21146 21147 21148 21149 21150 21151 21152 21153 21154 21155 21156 21157 21158 21159 21160 21161 21162 21163 21164 21165 21166 21167 21168 21169 21170 21171 21172 21173 21174 21175 21176 21177 21178 21179 21180 21181 21182 21183 21184 21185 21186 21187 21188 21189 21190 21191 21192 21193 21194 21195 21196 21197 21198 21199 21200 21201 21202 21203 21204 21205 21206 21207 21208 21209 21210 21211 21212 21213 21214 21215 21216 21217 21218 21219 21220 21221 21222 21223 21224 21225 21226 21227 21228 21229 21230 21231 21232 21233 21234 21235 21236 21237 21238 21239 21240 21241 21242 21243 21244 21245 21246 21247 21248 21249 21250 21251 21252 21253 21254 21255 21256 21257 21258 21259 21260 21261 21262 21263 21264 21265 21266 21267 21268 21269 21270 21271 21272 21273 21274 21275 21276 21277 21278 21279 21280 21281 21282 21283 21284 21285 21286 21287 21288 21289 21290 21291 21292 21293 21294 21295 21296 21297 21298 21299 21300 21301 21302 21303 21304 21305 21306 21307 21308 21309 21310 21311 21312 21313 21314 21315 21316 21317 21318 21319 21320 21321 21322 21323 21324 21325 21326 21327 21328 21329 21330 21331 21332 21333 21334 21335 21336 21337 21338 21339 21340 21341 21342 21343 21344 21345 21346 21347 21348 21349 21350 21351 21352 21353 21354 21355 21356 21357 21358 21359 21360 21361 21362 21363 21364 21365 21366 21367 21368 21369 21370 21371 21372 21373 21374 21375 21376 21377 21378 21379 21380 21381 21382 21383 21384 21385 21386 21387 21388 21389 21390 21391 21392 21393 21394 21395 21396 21397 21398 21399 21400 21401 21402 21403 21404 21405 21406 21407 21408 21409 21410 21411 21412 21413 21414 21415 21416 21417 21418 21419 21420 21421 21422 21423 21424 21425 21426 21427 21428 21429 21430 21431 21432 21433 21434 21435 21436 21437 21438 21439 21440 21441 21442 21443 21444
diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c
index fd0b2fdea0..5ce56adfae 100644
--- a/ofproto/ofproto-dpif.c
+++ b/ofproto/ofproto-dpif.c
@@ -1389,6 +1389,24 @@ check_ct_timeout_policy(struct dpif_backer *backer)
     return !error;
 }
 
+/* Tests whether 'backer''s datapath supports the all-zero SNAT case. */
+static bool
+dpif_supports_ct_zero_snat(struct dpif_backer *backer)
+{
+    enum ct_features features;
+    bool supported = false;
+
+    if (!ct_dpif_get_features(backer->dpif, &features)) {
+        if (features & CONNTRACK_F_ZERO_SNAT) {
+            supported = true;
+        }
+    }
+    VLOG_INFO("%s: Datapath %s ct_zero_snat",
+              dpif_name(backer->dpif), (supported) ? "supports"
+                                                   : "does not support");
+    return supported;
+}
+
 /* Tests whether 'backer''s datapath supports the
  * OVS_ACTION_ATTR_CHECK_PKT_LEN action. */
 static bool
@@ -1588,8 +1606,9 @@ check_support(struct dpif_backer *backer)
     backer->rt_support.ct_timeout = check_ct_timeout_policy(backer);
     backer->rt_support.explicit_drop_action =
         dpif_supports_explicit_drop_action(backer->dpif);
-    backer->rt_support.lb_output_action=
+    backer->rt_support.lb_output_action =
         dpif_supports_lb_output_action(backer->dpif);
+    backer->rt_support.ct_zero_snat = dpif_supports_ct_zero_snat(backer);
 
     /* Flow fields. */
     backer->rt_support.odp.ct_state = check_ct_state(backer);
@@ -5413,6 +5432,8 @@ ct_add_timeout_policy_to_dpif(struct dpif *dpif,
     struct ct_dpif_timeout_policy cdtp;
     struct simap_node *node;
 
+    memset(&cdtp, 0, sizeof cdtp);
+
     cdtp.id = ct_tp->tp_id;
     SIMAP_FOR_EACH (node, &ct_tp->tp) {
         ct_dpif_set_timeout_policy_attr_by_name(&cdtp, node->name, node->data);
@@ -5603,6 +5624,7 @@ get_datapath_cap(const char *datapath_type, struct smap *cap)
     smap_add(cap, "explicit_drop_action",
              s.explicit_drop_action ? "true" :"false");
     smap_add(cap, "lb_output_action", s.lb_output_action ? "true" : "false");
+    smap_add(cap, "ct_zero_snat", s.ct_zero_snat ? "true" : "false");
 }
 
 /* Gets timeout policy name in 'backer' based on 'zone', 'dl_type' and
diff --git a/ofproto/ofproto-dpif.h b/ofproto/ofproto-dpif.h
index b41c3d82ad..191cfcb0df 100644
--- a/ofproto/ofproto-dpif.h
+++ b/ofproto/ofproto-dpif.h
@@ -204,7 +204,10 @@ struct group_dpif *group_dpif_lookup(struct ofproto_dpif *,
     DPIF_SUPPORT_FIELD(bool, explicit_drop_action, "Explicit Drop action")  \
                                                                             \
     /* True if the datapath supports balance_tcp optimization */            \
-    DPIF_SUPPORT_FIELD(bool, lb_output_action, "Optimized Balance TCP mode")
+    DPIF_SUPPORT_FIELD(bool, lb_output_action, "Optimized Balance TCP mode")\
+                                                                            \
+    /* True if the datapath supports all-zero IP SNAT. */                   \
+    DPIF_SUPPORT_FIELD(bool, ct_zero_snat, "Conntrack all-zero IP SNAT")
 
 
 /* Stores the various features which the corresponding backer supports. */
diff --git a/ofproto/ofproto.c b/ofproto/ofproto.c
index b91517cd25..80ec2d9ac9 100644
--- a/ofproto/ofproto.c
+++ b/ofproto/ofproto.c
@@ -968,7 +968,7 @@ ofproto_get_datapath_cap(const char *datapath_type, struct smap *dp_cap)
     datapath_type = ofproto_normalize_type(datapath_type);
     const struct ofproto_class *class = ofproto_class_find__(datapath_type);
 
-    if (class->get_datapath_cap) {
+    if (class && class->get_datapath_cap) {
         class->get_datapath_cap(datapath_type, dp_cap);
     }
 }
@@ -981,7 +981,7 @@ ofproto_ct_set_zone_timeout_policy(const char *datapath_type, uint16_t zone_id,
     datapath_type = ofproto_normalize_type(datapath_type);
     const struct ofproto_class *class = ofproto_class_find__(datapath_type);
 
-    if (class->ct_set_zone_timeout_policy) {
+    if (class && class->ct_set_zone_timeout_policy) {
         class->ct_set_zone_timeout_policy(datapath_type, zone_id,
                                           timeout_policy);
     }
@@ -993,7 +993,7 @@ ofproto_ct_del_zone_timeout_policy(const char *datapath_type, uint16_t zone_id)
     datapath_type = ofproto_normalize_type(datapath_type);
     const struct ofproto_class *class = ofproto_class_find__(datapath_type);
 
-    if (class->ct_del_zone_timeout_policy) {
+    if (class && class->ct_del_zone_timeout_policy) {
         class->ct_del_zone_timeout_policy(datapath_type, zone_id);
     }
 
diff --git a/ovsdb/ovsdb-client.c b/ovsdb/ovsdb-client.c
index 72756eb1f2..ba28e36d78 100644
--- a/ovsdb/ovsdb-client.c
+++ b/ovsdb/ovsdb-client.c
@@ -1664,14 +1664,15 @@ static void
 do_needs_conversion(struct jsonrpc *rpc, const char *database_ OVS_UNUSED,
                     int argc OVS_UNUSED, char *argv[])
 {
+    const char *schema_file_name = argv[argc - 1];
     struct ovsdb_schema *schema1;
-    check_ovsdb_error(ovsdb_schema_from_file(argv[0], &schema1));
+    check_ovsdb_error(ovsdb_schema_from_file(schema_file_name, &schema1));
 
     char *database = schema1->name;
     open_rpc(1, NEED_DATABASE, argc, argv, &rpc, &database);
 
     if (is_database_clustered(rpc, database)) {
-        ovsdb_schema_persist_ephemeral_columns(schema1, argv[0]);
+        ovsdb_schema_persist_ephemeral_columns(schema1, schema_file_name);
     }
 
     struct ovsdb_schema *schema2 = fetch_schema(rpc, schema1->name);
diff --git a/ovsdb/ovsdb-idlc.in b/ovsdb/ovsdb-idlc.in
index 5914e08789..61cded16d3 100755
--- a/ovsdb/ovsdb-idlc.in
+++ b/ovsdb/ovsdb-idlc.in
@@ -1,6 +1,5 @@
 #! @PYTHON3@
 
-from __future__ import print_function
 import getopt
 import os
 import re
diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
index 29a2bace84..ce6aee3008 100644
--- a/ovsdb/ovsdb-server.c
+++ b/ovsdb/ovsdb-server.c
@@ -655,8 +655,6 @@ add_db(struct server_config *config, struct db *db)
 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
 open_db(struct server_config *config, const char *filename)
 {
-    struct db *db;
-
     /* If we know that the file is already open, return a good error message.
      * Otherwise, if the file is open, we'll fail later on with a harder to
      * interpret file locking error. */
@@ -671,9 +669,6 @@ open_db(struct server_config *config, const char *filename)
         return error;
     }
 
-    db = xzalloc(sizeof *db);
-    db->filename = xstrdup(filename);
-
     struct ovsdb_schema *schema;
     if (ovsdb_storage_is_clustered(storage)) {
         schema = NULL;
@@ -686,6 +681,9 @@ open_db(struct server_config *config, const char *filename)
         }
         ovs_assert(schema && !txn_json);
     }
+
+    struct db *db = xzalloc(sizeof *db);
+    db->filename = xstrdup(filename);
     db->db = ovsdb_create(schema, storage);
     ovsdb_jsonrpc_server_add_db(config->jsonrpc, db->db);
 
diff --git a/ovsdb/ovsdb.c b/ovsdb/ovsdb.c
index 9042658fa8..e019631e9a 100644
--- a/ovsdb/ovsdb.c
+++ b/ovsdb/ovsdb.c
@@ -31,6 +31,7 @@
 #include "simap.h"
 #include "storage.h"
 #include "table.h"
+#include "timeval.h"
 #include "transaction.h"
 #include "trigger.h"
 
@@ -525,6 +526,7 @@ ovsdb_snapshot(struct ovsdb *db, bool trim_memory OVS_UNUSED)
         return NULL;
     }
 
+    uint64_t elapsed, start_time = time_msec();
     struct json *schema = ovsdb_schema_to_json(db->schema);
     struct json *data = ovsdb_to_txn_json(db, "compacting database online");
     struct ovsdb_error *error = ovsdb_storage_store_snapshot(db->storage,
@@ -537,6 +539,12 @@ ovsdb_snapshot(struct ovsdb *db, bool trim_memory OVS_UNUSED)
         malloc_trim(0);
     }
 #endif
+
+    elapsed = time_msec() - start_time;
+    if (elapsed > 1000) {
+        VLOG_INFO("%s: Database compaction took %"PRIu64"ms",
+                  db->name, elapsed);
+    }
     return error;
 }
 
diff --git a/ovsdb/raft.c b/ovsdb/raft.c
index ea91d1fdba..8fa872494e 100644
--- a/ovsdb/raft.c
+++ b/ovsdb/raft.c
@@ -940,6 +940,34 @@ raft_reset_ping_timer(struct raft *raft)
     raft->ping_timeout = time_msec() + raft->election_timer / 3;
 }
 
+static void
+raft_conn_update_probe_interval(struct raft *raft, struct raft_conn *r_conn)
+{
+    /* Inactivity probe will be sent if connection will remain idle for the
+     * time of an election timeout.  Connection will be dropped if inactivity
+     * will last twice that time.
+     *
+     * It's not enough to just have heartbeats if connection is still
+     * established, but no packets received from the other side.  Without
+     * inactivity probe follower will just try to initiate election
+     * indefinitely staying in 'candidate' role.  And the leader will continue
+     * to send heartbeats to the dead connection thinking that remote server
+     * is still part of the cluster. */
+    int probe_interval = raft->election_timer + ELECTION_RANGE_MSEC;
+
+    jsonrpc_session_set_probe_interval(r_conn->js, probe_interval);
+}
+
+static void
+raft_update_probe_intervals(struct raft *raft)
+{
+    struct raft_conn *r_conn;
+
+    LIST_FOR_EACH (r_conn, list_node, &raft->conns) {
+        raft_conn_update_probe_interval(raft, r_conn);
+    }
+}
+
 static void
 raft_add_conn(struct raft *raft, struct jsonrpc_session *js,
               const struct uuid *sid, bool incoming)
@@ -954,7 +982,7 @@ raft_add_conn(struct raft *raft, struct jsonrpc_session *js,
                                               &conn->sid);
     conn->incoming = incoming;
     conn->js_seqno = jsonrpc_session_get_seqno(conn->js);
-    jsonrpc_session_set_probe_interval(js, 0);
+    raft_conn_update_probe_interval(raft, conn);
     jsonrpc_session_set_backlog_threshold(js, raft->conn_backlog_max_n_msgs,
                                               raft->conn_backlog_max_n_bytes);
 }
@@ -2804,6 +2832,7 @@ raft_update_commit_index(struct raft *raft, uint64_t new_commit_index)
                           raft->election_timer, e->election_timer);
                 raft->election_timer = e->election_timer;
                 raft->election_timer_new = 0;
+                raft_update_probe_intervals(raft);
             }
             if (e->servers) {
                 /* raft_run_reconfigure() can write a new Raft entry, which can
@@ -2820,6 +2849,7 @@ raft_update_commit_index(struct raft *raft, uint64_t new_commit_index)
                 VLOG_INFO("Election timer changed from %"PRIu64" to %"PRIu64,
                           raft->election_timer, e->election_timer);
                 raft->election_timer = e->election_timer;
+                raft_update_probe_intervals(raft);
             }
         }
         /* Check if any pending command can be completed, and complete it.
@@ -4122,9 +4152,24 @@ raft_may_snapshot(const struct raft *raft)
             && !raft->leaving
             && !raft->left
             && !raft->failed
+            && raft->role != RAFT_LEADER
             && raft->last_applied >= raft->log_start);
 }
 
+/* Prepares for soon snapshotting. */
+void
+raft_notify_snapshot_recommended(struct raft *raft)
+{
+    if (raft->role == RAFT_LEADER) {
+        /* Leader is about to write database snapshot to the disk and this
+         * might take significant amount of time.  Stepping back from the
+         * leadership to keep the cluster functional during this process.  */
+        VLOG_INFO("Transferring leadership to write a snapshot.");
+        raft_transfer_leadership(raft, "preparing to write snapshot");
+        raft_become_follower(raft);
+    }
+}
+
 /* Replaces the log for 'raft', up to the last log entry read, by
  * 'new_snapshot_data'.  Returns NULL if successful, otherwise an error that
  * the caller must eventually free.
@@ -4468,6 +4513,8 @@ raft_unixctl_status(struct unixctl_conn *conn,
                   : raft->leaving ? "leaving cluster"
                   : raft->left ? "left cluster"
                   : raft->failed ? "failed"
+                  : raft->candidate_retrying
+                      ? "disconnected from the cluster (election timeout)"
                   : "cluster member");
     if (raft->joining) {
         ds_put_format(&s, "Remotes for joining:");
diff --git a/ovsdb/raft.h b/ovsdb/raft.h
index 99d5307e54..59902fe825 100644
--- a/ovsdb/raft.h
+++ b/ovsdb/raft.h
@@ -174,6 +174,7 @@ void raft_command_wait(const struct raft_command *);
 bool raft_grew_lots(const struct raft *);
 uint64_t raft_get_log_length(const struct raft *);
 bool raft_may_snapshot(const struct raft *);
+void raft_notify_snapshot_recommended(struct raft *);
 struct ovsdb_error *raft_store_snapshot(struct raft *,
                                         const struct json *new_snapshot)
     OVS_WARN_UNUSED_RESULT;
diff --git a/ovsdb/storage.c b/ovsdb/storage.c
index f662e90566..40415fcf62 100644
--- a/ovsdb/storage.c
+++ b/ovsdb/storage.c
@@ -519,14 +519,11 @@ ovsdb_storage_should_snapshot(const struct ovsdb_storage *storage)
             return false;
         }
 
-        /* If we can't snapshot right now, don't. */
-        if (storage->raft && !raft_may_snapshot(storage->raft)) {
-            return false;
-        }
-
         uint64_t log_len = (storage->raft
                             ? raft_get_log_length(storage->raft)
                             : storage->n_read + storage->n_written);
+        bool snapshot_recommended = false;
+
         if (now < storage->next_snapshot_max) {
             /* Maximum snapshot time not yet reached.  Take a snapshot if there
              * have been at least 100 log entries and the log file size has
@@ -534,12 +531,25 @@ ovsdb_storage_should_snapshot(const struct ovsdb_storage *storage)
             bool grew_lots = (storage->raft
                               ? raft_grew_lots(storage->raft)
                               : ovsdb_log_grew_lots(storage->log));
-            return log_len >= 100 && grew_lots;
+            snapshot_recommended = (log_len >= 100 && grew_lots);
         } else {
             /* We have reached the maximum snapshot time.  Take a snapshot if
              * there have been any log entries at all. */
-            return log_len > 0;
+            snapshot_recommended = (log_len > 0);
         }
+
+        if (!snapshot_recommended) {
+            return false;
+        }
+
+        /* If we can't snapshot right now, don't. */
+        if (storage->raft && !raft_may_snapshot(storage->raft)) {
+            /* Notifying the storage that it needs to make a snapshot soon. */
+            raft_notify_snapshot_recommended(storage->raft);
+            return false;
+        }
+
+        return true;
     }
 
     return false;
diff --git a/python/ovs/compat/sortedcontainers/sortedlist.py b/python/ovs/compat/sortedcontainers/sortedlist.py
index 8aec6bbac1..ba55566926 100644
--- a/python/ovs/compat/sortedcontainers/sortedlist.py
+++ b/python/ovs/compat/sortedcontainers/sortedlist.py
@@ -3,8 +3,6 @@
 """
 # pylint: disable=redefined-builtin, ungrouped-imports
 
-from __future__ import print_function
-
 from bisect import bisect_left, bisect_right, insort
 from collections import Sequence, MutableSequence
 from functools import wraps
diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py
Louis Abel's avatar
Louis Abel committed
index 5850ac7abf..3ca47f96bb 100644
Louis Abel's avatar
Louis Abel committed
--- a/python/ovs/db/idl.py
+++ b/python/ovs/db/idl.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import collections
 import functools
 import uuid
 
@@ -39,6 +40,10 @@ OVSDB_UPDATE2 = 1
 CLUSTERED = "clustered"
 
 
+Notice = collections.namedtuple('Notice', ('event', 'row', 'updates'))
+Notice.__new__.__defaults__ = (None,)  # default updates=None
+
+
 class Idl(object):
     """Open vSwitch Database Interface Definition Language (OVSDB IDL).
 
@@ -96,6 +101,7 @@ class Idl(object):
     IDL_S_SERVER_MONITOR_REQUESTED = 2
     IDL_S_DATA_MONITOR_REQUESTED = 3
     IDL_S_DATA_MONITOR_COND_REQUESTED = 4
+    IDL_S_MONITORING = 5
 
     def __init__(self, remote, schema_helper, probe_interval=None,
                  leader_only=True):
@@ -241,6 +247,7 @@ class Idl(object):
         i = 0
         while i < 50:
             i += 1
+            previous_change_seqno = self.change_seqno
             if not self._session.is_connected():
                 break
 
@@ -269,7 +276,7 @@ class Idl(object):
                 if msg.params[0] == str(self.server_monitor_uuid):
                     self.__parse_update(msg.params[1], OVSDB_UPDATE,
                                         tables=self.server_tables)
-                    self.change_seqno = initial_change_seqno
+                    self.change_seqno = previous_change_seqno
                     if not self.__check_server_db():
                         self.force_reconnect()
                         break
@@ -288,6 +295,7 @@ class Idl(object):
                     else:
                         assert self.state == self.IDL_S_DATA_MONITOR_REQUESTED
                         self.__parse_update(msg.result, OVSDB_UPDATE)
+                    self.state = self.IDL_S_MONITORING
 
                 except error.Error as e:
                     vlog.err("%s: parse error in received schema: %s"
@@ -312,7 +320,7 @@ class Idl(object):
                         self.__error()
                         break
                     else:
-                        self.change_seqno = initial_change_seqno
+                        self.change_seqno = previous_change_seqno
                         self.__send_monitor_request()
             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                   and self._server_monitor_request_id is not None
@@ -322,7 +330,7 @@ class Idl(object):
                     self._server_monitor_request_id = None
                     self.__parse_update(msg.result, OVSDB_UPDATE,
                                         tables=self.server_tables)
-                    self.change_seqno = initial_change_seqno
+                    self.change_seqno = previous_change_seqno
                     if self.__check_server_db():
                         self.__send_monitor_request()
                         self.__send_db_change_aware()
@@ -336,7 +344,7 @@ class Idl(object):
                         self.__error()
                         break
                     else:
-                        self.change_seqno = initial_change_seqno
+                        self.change_seqno = previous_change_seqno
                         self.__send_monitor_request()
             elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                   and self._db_change_aware_request_id is not None
@@ -372,7 +380,7 @@ class Idl(object):
                     self.force_reconnect()
                     break
                 else:
-                    self.change_seqno = initial_change_seqno
+                    self.change_seqno = previous_change_seqno
                     self.__send_monitor_request()
             elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
                                ovs.jsonrpc.Message.T_REPLY)
@@ -435,6 +443,15 @@ class Idl(object):
     def force_reconnect(self):
         """Forces the IDL to drop its connection to the database and reconnect.
         In the meantime, the contents of the IDL will not change."""
+        if self.state == self.IDL_S_MONITORING:
+            # The IDL was in MONITORING state, so we either had data
+            # inconsistency on this server, or it stopped being the cluster
+            # leader, or the user requested to re-connect.  Avoiding backoff
+            # in these cases, as we need to re-connect as soon as possible.
+            # Connections that are not in MONITORING state should have their
+            # backoff to avoid constant flood of re-connection attempts in
+            # case there is no suitable database server.
+            self._session.reset_backoff()
         self._session.force_reconnect()
 
     def session_name(self):
@@ -614,6 +631,7 @@ class Idl(object):
             raise error.Error("<table-updates> is not an object",
                               table_updates)
 
+        notices = []
         for table_name, table_update in table_updates.items():
             table = tables.get(table_name)
             if not table:
@@ -639,7 +657,9 @@ class Idl(object):
                                       % (table_name, uuid_string))
 
                 if version == OVSDB_UPDATE2:
-                    if self.__process_update2(table, uuid, row_update):
+                    changes = self.__process_update2(table, uuid, row_update)
+                    if changes:
+                        notices.append(changes)
                         self.change_seqno += 1
                     continue
 
@@ -652,17 +672,20 @@ class Idl(object):
                     raise error.Error('<row-update> missing "old" and '
                                       '"new" members', row_update)
 
-                if self.__process_update(table, uuid, old, new):
+                changes = self.__process_update(table, uuid, old, new)
+                if changes:
+                    notices.append(changes)
                     self.change_seqno += 1
+        for notice in notices:
+            self.notify(*notice)
 
     def __process_update2(self, table, uuid, row_update):
+        """Returns Notice if a column changed, False otherwise."""
         row = table.rows.get(uuid)
-        changed = False
         if "delete" in row_update:
             if row:
                 del table.rows[uuid]
-                self.notify(ROW_DELETE, row)
-                changed = True
+                return Notice(ROW_DELETE, row)
             else:
                 # XXX rate-limit
                 vlog.warn("cannot delete missing row %s from table"
@@ -681,29 +704,27 @@ class Idl(object):
             changed = self.__row_update(table, row, row_update)
             table.rows[uuid] = row
             if changed:
-                self.notify(ROW_CREATE, row)
+                return Notice(ROW_CREATE, row)
         elif "modify" in row_update:
             if not row:
                 raise error.Error('Modify non-existing row')
 
             old_row = self.__apply_diff(table, row, row_update['modify'])
-            self.notify(ROW_UPDATE, row, Row(self, table, uuid, old_row))
-            changed = True
+            return Notice(ROW_UPDATE, row, Row(self, table, uuid, old_row))
         else:
             raise error.Error('<row-update> unknown operation',
                               row_update)
-        return changed
+        return False
 
     def __process_update(self, table, uuid, old, new):
-        """Returns True if a column changed, False otherwise."""
+        """Returns Notice if a column changed, False otherwise."""
         row = table.rows.get(uuid)
         changed = False
         if not new:
             # Delete row.
             if row:
                 del table.rows[uuid]
-                changed = True
-                self.notify(ROW_DELETE, row)
+                return Notice(ROW_DELETE, row)
             else:
                 # XXX rate-limit
                 vlog.warn("cannot delete missing row %s from table %s"
@@ -723,7 +744,7 @@ class Idl(object):
             if op == ROW_CREATE:
                 table.rows[uuid] = row
             if changed:
-                self.notify(ROW_CREATE, row)
+                return Notice(ROW_CREATE, row)
         else:
             op = ROW_UPDATE
             if not row:
@@ -737,8 +758,8 @@ class Idl(object):
             if op == ROW_CREATE:
                 table.rows[uuid] = row
             if changed:
-                self.notify(op, row, Row.from_json(self, table, uuid, old))
-        return changed
+                return Notice(op, row, Row.from_json(self, table, uuid, old))
+        return False
 
     def __check_server_db(self):
         """Returns True if this is a valid server database, False otherwise."""
Louis Abel's avatar
Louis Abel committed
@@ -1458,6 +1479,11 @@ class Transaction(object):
         if self != self.idl.txn:
             return self._status
 
+        if self.idl.state != Idl.IDL_S_MONITORING:
+            self._status = Transaction.TRY_AGAIN
+            self.__disassemble()
+            return self._status
+
         # If we need a lock but don't have it, give up quickly.
         if self.idl.lock_name and not self.idl.has_lock:
             self._status = Transaction.NOT_LOCKED
Louis Abel's avatar
Louis Abel committed
21663 21664 21665 21666 21667 21668 21669 21670 21671 21672 21673 21674 21675 21676 21677 21678 21679 21680 21681 21682 21683 21684 21685 21686 21687 21688 21689 21690 21691 21692 21693 21694 21695 21696 21697 21698 21699 21700 21701 21702 21703 21704 21705 21706 21707 21708 21709 21710 21711 21712 21713 21714 21715 21716 21717 21718 21719 21720 21721 21722 21723 21724 21725 21726 21727 21728 21729 21730 21731 21732 21733 21734 21735 21736 21737 21738 21739 21740 21741 21742 21743 21744 21745 21746 21747 21748 21749 21750 21751 21752 21753 21754 21755 21756 21757 21758 21759 21760 21761 21762 21763 21764 21765 21766 21767 21768 21769 21770 21771 21772 21773 21774 21775 21776 21777 21778 21779 21780 21781 21782 21783 21784 21785 21786 21787 21788 21789 21790 21791 21792 21793 21794 21795 21796 21797 21798 21799 21800 21801 21802 21803 21804 21805 21806 21807 21808 21809 21810 21811 21812 21813 21814 21815 21816 21817 21818 21819 21820 21821 21822 21823 21824 21825 21826 21827 21828 21829 21830 21831 21832 21833 21834 21835 21836 21837 21838 21839 21840 21841 21842 21843 21844 21845 21846 21847 21848 21849 21850 21851 21852 21853 21854 21855 21856 21857 21858 21859 21860 21861 21862 21863 21864 21865 21866 21867 21868 21869 21870 21871 21872 21873 21874 21875 21876 21877 21878 21879 21880 21881 21882 21883 21884 21885 21886 21887 21888 21889 21890 21891 21892 21893 21894 21895 21896 21897 21898 21899 21900 21901 21902 21903 21904 21905 21906 21907 21908 21909 21910 21911 21912 21913 21914 21915 21916 21917 21918 21919 21920 21921 21922 21923 21924 21925 21926 21927 21928 21929 21930 21931 21932 21933 21934 21935 21936 21937 21938 21939 21940 21941 21942 21943 21944 21945 21946 21947 21948 21949 21950 21951 21952 21953 21954 21955 21956 21957 21958 21959 21960 21961 21962 21963 21964 21965 21966 21967 21968 21969 21970 21971 21972 21973 21974 21975 21976 21977 21978 21979 21980 21981 21982 21983 21984 21985 21986 21987 21988 21989 21990 21991 21992 21993 21994 21995 21996 21997 21998 21999 22000
diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py
index bf32f8c87c..d5127268aa 100644
--- a/python/ovs/jsonrpc.py
+++ b/python/ovs/jsonrpc.py
@@ -612,5 +612,18 @@ class Session(object):
     def force_reconnect(self):
         self.reconnect.force_reconnect(ovs.timeval.msec())
 
+    def reset_backoff(self):
+        """ Resets the reconnect backoff by allowing as many free tries as the
+        number of configured remotes.  This is to be used by upper layers
+        before calling force_reconnect() if backoff is undesirable."""
+        free_tries = len(self.remotes)
+
+        if self.is_connected():
+            # The extra free try will be consumed when the current remote
+            # is disconnected.
+            free_tries += 1
+
+        self.reconnect.set_backoff_free_tries(free_tries)
+
     def get_num_of_remotes(self):
         return len(self.remotes)
diff --git a/python/ovstest/rpcserver.py b/python/ovstest/rpcserver.py
index c4aab70207..05b6b1be20 100644
--- a/python/ovstest/rpcserver.py
+++ b/python/ovstest/rpcserver.py
@@ -18,22 +18,14 @@ rpcserver is an XML RPC server that allows RPC client to initiate tests
 
 import sys
 
-import exceptions
-
 import xmlrpc.client
 
-import tcp
-
 from twisted.internet import reactor
 from twisted.internet.error import CannotListenError
 from twisted.web import server
 from twisted.web import xmlrpc
 
-import udp
-
-import util
-
-import vswitch
+from . import tcp, udp, util, vswitch
 
 
 class TestArena(xmlrpc.XMLRPC):
@@ -210,7 +202,7 @@ class TestArena(xmlrpc.XMLRPC):
             (_, port) = self.__get_handle_resources(handle)
             port.loseConnection()
             self.__delete_handle(handle)
-        except exceptions.KeyError:
+        except KeyError:
             return -1
         return 0
 
@@ -222,7 +214,7 @@ class TestArena(xmlrpc.XMLRPC):
             (_, connector) = self.__get_handle_resources(handle)
             connector.disconnect()
             self.__delete_handle(handle)
-        except exceptions.KeyError:
+        except KeyError:
             return -1
         return 0
 
diff --git a/python/ovstest/tcp.py b/python/ovstest/tcp.py
index c495717f2f..098c6cba3e 100644
--- a/python/ovstest/tcp.py
+++ b/python/ovstest/tcp.py
@@ -21,7 +21,7 @@ import time
 from twisted.internet import interfaces
 from twisted.internet.protocol import ClientFactory, Factory, Protocol
 
-from zope.interface import implements
+from zope.interface.declarations import implementer
 
 
 class TcpListenerConnection(Protocol):
@@ -55,8 +55,8 @@ class TcpListenerFactory(Factory):
         return str(self.stats)
 
 
+@implementer(interfaces.IPushProducer)
 class Producer(object):
-    implements(interfaces.IPushProducer)
     """
     This producer class generates infinite byte stream for a specified time
     duration
diff --git a/python/ovstest/tests.py b/python/ovstest/tests.py
index 6de3cc3af4..f959f945ef 100644
--- a/python/ovstest/tests.py
+++ b/python/ovstest/tests.py
@@ -10,8 +10,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from __future__ import print_function
-
 import math
 import time
 
diff --git a/python/ovstest/util.py b/python/ovstest/util.py
index 72457158f2..270d6a0376 100644
--- a/python/ovstest/util.py
+++ b/python/ovstest/util.py
@@ -26,8 +26,6 @@ import socket
 import struct
 import subprocess
 
-import exceptions
-
 import xmlrpc.client
 
 
@@ -88,7 +86,7 @@ def start_process(args):
                              stderr=subprocess.PIPE)
         out, err = p.communicate()
         return (p.returncode, out, err)
-    except exceptions.OSError:
+    except OSError:
         return (-1, None, None)
 
 
diff --git a/python/ovstest/vswitch.py b/python/ovstest/vswitch.py
index 9d5b5cffd0..45c9587eeb 100644
--- a/python/ovstest/vswitch.py
+++ b/python/ovstest/vswitch.py
@@ -15,7 +15,7 @@
 """
 vswitch module allows its callers to interact with OVS DB.
 """
-import util
+from . import util
 
 
 def ovs_vsctl_add_bridge(bridge):
diff --git a/python/setup.py b/python/setup.py
index d385d83722..cfe01763f3 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -10,8 +10,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from __future__ import print_function
-
 import sys
 
 from distutils.command.build_ext import build_ext
@@ -82,8 +80,6 @@ setup_args = dict(
         'Topic :: Software Development :: Libraries :: Python Modules',
         'Topic :: System :: Networking',
         'License :: OSI Approved :: Apache Software License',
-        'Programming Language :: Python :: 2',
-        'Programming Language :: Python :: 2.7',
         'Programming Language :: Python :: 3',
         'Programming Language :: Python :: 3.4',
         'Programming Language :: Python :: 3.5',
diff --git a/tests/atlocal.in b/tests/atlocal.in
index 02e2dc57f2..cfca7e1926 100644
--- a/tests/atlocal.in
+++ b/tests/atlocal.in
@@ -175,6 +175,9 @@ find_command()
 # Set HAVE_NC
 find_command nc
 
+# Set HAVE_TC
+find_command tc
+
 # Determine correct netcat option to quit on stdin EOF
 if nc --version 2>&1 | grep -q nmap.org; then
     # Nmap netcat
diff --git a/tests/automake.mk b/tests/automake.mk
index 677b99a6b4..fc80e027df 100644
--- a/tests/automake.mk
+++ b/tests/automake.mk
@@ -134,7 +134,8 @@ FUZZ_REGRESSION_TESTS = \
 	tests/fuzz-regression/ofp_print_fuzzer-5722747668791296 \
 	tests/fuzz-regression/ofp_print_fuzzer-6285128790704128 \
 	tests/fuzz-regression/ofp_print_fuzzer-6470117922701312 \
-	tests/fuzz-regression/ofp_print_fuzzer-6502620041576448
+	tests/fuzz-regression/ofp_print_fuzzer-6502620041576448 \
+	tests/fuzz-regression/ofp_print_fuzzer-6540965472632832
 $(srcdir)/tests/fuzz-regression-list.at: tests/automake.mk
 	$(AM_V_GEN)for name in $(FUZZ_REGRESSION_TESTS); do \
             basename=`echo $$name | sed 's,^.*/,,'`; \
diff --git a/tests/daemon.at b/tests/daemon.at
index a7982de381..39d9aa391e 100644
--- a/tests/daemon.at
+++ b/tests/daemon.at
@@ -218,11 +218,11 @@ OVS_WAIT_UNTIL([test -s ovsdb-server.pid])
 OVS_WAIT_UNTIL([sc query ovsdb-server | grep STATE | grep RUNNING > /dev/null 2>&1])
 AT_CHECK([kill -0 `cat ovsdb-server.pid`], [0], [ignore])
 AT_CHECK([ovs-appctl -t ovsdb-server ovsdb-server/list-dbs], [0],
-[Open_vSwitch
+[_Server
 ])
 AT_CHECK([sc stop ovsdb-server], [0], [ignore])
 OVS_WAIT_UNTIL([test ! -s ovsdb-server.pid])
-AT_CHECK([sc query ovsdb-server | grep STATE | grep STOPPED], [0], [ignore])
+OVS_WAIT_UNTIL([sc query ovsdb-server | grep STATE | grep STOPPED > /dev/null 2>&1])
 AT_CHECK([sc delete ovsdb-server], [0], [[[SC]] DeleteService SUCCESS
 ])
 AT_CLEANUP
diff --git a/tests/dpif-netdev.at b/tests/dpif-netdev.at
index 2862a3c9b9..16402ebae2 100644
--- a/tests/dpif-netdev.at
+++ b/tests/dpif-netdev.at
@@ -299,60 +299,87 @@ type=drop rate=1 burst_size=2
 ])
 
 ovs-appctl time/warp 5000
-AT_CHECK([ovs-appctl netdev-dummy/receive p7 'in_port(7),packet_type(ns=0,id=0),eth(src=50:54:00:00:00:09,dst=50:54:00:00:00:0a),eth_type(0x0800),ipv4(src=10.0.0.2,dst=10.0.0.1,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0)' --len 60])
-AT_CHECK([ovs-appctl netdev-dummy/receive p7 'in_port(7),packet_type(ns=0,id=0),eth(src=50:54:00:00:00:09,dst=50:54:00:00:00:0a),eth_type(0x0800),ipv4(src=10.0.0.2,dst=10.0.0.1,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0)' --len 60])
-AT_CHECK([ovs-appctl netdev-dummy/receive p7 'in_port(7),packet_type(ns=0,id=0),eth(src=50:54:00:00:00:09,dst=50:54:00:00:00:0a),eth_type(0x0800),ipv4(src=10.0.0.2,dst=10.0.0.1,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0)' --len 60])
-AT_CHECK([ovs-appctl netdev-dummy/receive p7 'in_port(7),packet_type(ns=0,id=0),eth(src=50:54:00:00:00:09,dst=50:54:00:00:00:0a),eth_type(0x0800),ipv4(src=10.0.0.2,dst=10.0.0.1,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0)' --len 60])
-AT_CHECK([ovs-appctl netdev-dummy/receive p7 'in_port(7),packet_type(ns=0,id=0),eth(src=50:54:00:00:00:09,dst=50:54:00:00:00:0a),eth_type(0x0800),ipv4(src=10.0.0.2,dst=10.0.0.1,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0)' --len 60])
-AT_CHECK([ovs-appctl netdev-dummy/receive p8 'in_port(8),packet_type(ns=0,id=0),eth(src=50:54:00:00:00:0b,dst=50:54:00:00:00:0c),eth_type(0x0800),ipv4(src=10.0.0.3,dst=10.0.0.4,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0)' --len 60])
-AT_CHECK([ovs-appctl netdev-dummy/receive p8 'in_port(8),packet_type(ns=0,id=0),eth(src=50:54:00:00:00:0b,dst=50:54:00:00:00:0c),eth_type(0x0800),ipv4(src=10.0.0.3,dst=10.0.0.4,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0)' --len 60])
-AT_CHECK([ovs-appctl netdev-dummy/receive p8 'in_port(8),packet_type(ns=0,id=0),eth(src=50:54:00:00:00:0b,dst=50:54:00:00:00:0c),eth_type(0x0800),ipv4(src=10.0.0.3,dst=10.0.0.4,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0)' --len 60])
-AT_CHECK([ovs-appctl netdev-dummy/receive p8 'in_port(8),packet_type(ns=0,id=0),eth(src=50:54:00:00:00:0b,dst=50:54:00:00:00:0c),eth_type(0x0800),ipv4(src=10.0.0.3,dst=10.0.0.4,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0)' --len 60])
-AT_CHECK([ovs-appctl netdev-dummy/receive p8 'in_port(8),packet_type(ns=0,id=0),eth(src=50:54:00:00:00:0b,dst=50:54:00:00:00:0c),eth_type(0x0800),ipv4(src=10.0.0.3,dst=10.0.0.4,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0)' --len 60])
+for i in `seq 1 7`; do
+  AT_CHECK(
+    [ovs-appctl netdev-dummy/receive p7 \
+       'in_port(7),packet_type(ns=0,id=0),eth(src=50:54:00:00:00:09,dst=50:54:00:00:00:0a),eth_type(0x0800),ipv4(src=10.0.0.2,dst=10.0.0.1,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0)' --len 60])
+done
+
+for i in `seq 1 5`; do
+  AT_CHECK(
+    [ovs-appctl netdev-dummy/receive p8 \
+       'in_port(8),packet_type(ns=0,id=0),eth(src=50:54:00:00:00:0b,dst=50:54:00:00:00:0c),eth_type(0x0800),ipv4(src=10.0.0.3,dst=10.0.0.4,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0)' --len 60])
+done
+
 sleep 1  # wait for forwarders process packets
 
 # Meter 1 is measuring packets, allowing one packet per second with
-# bursts of one packet, so 4 out of 5 packets should hit the drop
-# band.
+# bursts of one packet, so 4 out of 5 packets should hit the drop band.
 # Meter 2 is measuring kbps, with burst size 2 (== 2000 bits). 4 packets
-# (240 bytes == 1920 bits) pass, but the last packet should hit the drop band.
+# (240 bytes == 1920 bits) pass, but the last three packets should hit the
+# drop band.  There should be 80 bits remaining for the next packets.
 AT_CHECK([ovs-ofctl -O OpenFlow13 meter-stats br0 | strip_timers], [0], [dnl
 OFPST_METER reply (OF1.3) (xid=0x2):
 meter:1 flow_count:1 packet_in_count:5 byte_in_count:300 duration:0.0s bands:
 0: packet_count:4 byte_count:240
 
-meter:2 flow_count:1 packet_in_count:5 byte_in_count:300 duration:0.0s bands:
-0: packet_count:1 byte_count:60
+meter:2 flow_count:1 packet_in_count:7 byte_in_count:420 duration:0.0s bands:
+0: packet_count:3 byte_count:180
 ])
 
-# Advance time by 1/2 second
-ovs-appctl time/warp 500
+# Advance time by 870 ms
+ovs-appctl time/warp 870
+
+for i in `seq 1 5`; do
+  AT_CHECK(
+    [ovs-appctl netdev-dummy/receive p7 \
+       'in_port(7),packet_type(ns=0,id=0),eth(src=50:54:00:00:00:09,dst=50:54:00:00:00:0a),eth_type(0x0800),ipv4(src=10.0.0.2,dst=10.0.0.1,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0)' --len 60])
+
+  AT_CHECK(
+    [ovs-appctl netdev-dummy/receive p8 \
+       'in_port(8),packet_type(ns=0,id=0),eth(src=50:54:00:00:00:0b,dst=50:54:00:00:00:0c),eth_type(0x0800),ipv4(src=10.0.0.3,dst=10.0.0.4,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0)' --len 60])
+done
 
-AT_CHECK([ovs-appctl netdev-dummy/receive p7 'in_port(7),packet_type(ns=0,id=0),eth(src=50:54:00:00:00:09,dst=50:54:00:00:00:0a),eth_type(0x0800),ipv4(src=10.0.0.2,dst=10.0.0.1,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0)' --len 60])
-AT_CHECK([ovs-appctl netdev-dummy/receive p7 'in_port(7),packet_type(ns=0,id=0),eth(src=50:54:00:00:00:09,dst=50:54:00:00:00:0a),eth_type(0x0800),ipv4(src=10.0.0.2,dst=10.0.0.1,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0)' --len 60])
-AT_CHECK([ovs-appctl netdev-dummy/receive p7 'in_port(7),packet_type(ns=0,id=0),eth(src=50:54:00:00:00:09,dst=50:54:00:00:00:0a),eth_type(0x0800),ipv4(src=10.0.0.2,dst=10.0.0.1,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0)' --len 60])
-AT_CHECK([ovs-appctl netdev-dummy/receive p7 'in_port(7),packet_type(ns=0,id=0),eth(src=50:54:00:00:00:09,dst=50:54:00:00:00:0a),eth_type(0x0800),ipv4(src=10.0.0.2,dst=10.0.0.1,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0)' --len 60])
-AT_CHECK([ovs-appctl netdev-dummy/receive p7 'in_port(7),packet_type(ns=0,id=0),eth(src=50:54:00:00:00:09,dst=50:54:00:00:00:0a),eth_type(0x0800),ipv4(src=10.0.0.2,dst=10.0.0.1,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0)' --len 60])
-AT_CHECK([ovs-appctl netdev-dummy/receive p8 'in_port(8),packet_type(ns=0,id=0),eth(src=50:54:00:00:00:0b,dst=50:54:00:00:00:0c),eth_type(0x0800),ipv4(src=10.0.0.3,dst=10.0.0.4,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0)' --len 60])
-AT_CHECK([ovs-appctl netdev-dummy/receive p8 'in_port(8),packet_type(ns=0,id=0),eth(src=50:54:00:00:00:0b,dst=50:54:00:00:00:0c),eth_type(0x0800),ipv4(src=10.0.0.3,dst=10.0.0.4,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0)' --len 60])
-AT_CHECK([ovs-appctl netdev-dummy/receive p8 'in_port(8),packet_type(ns=0,id=0),eth(src=50:54:00:00:00:0b,dst=50:54:00:00:00:0c),eth_type(0x0800),ipv4(src=10.0.0.3,dst=10.0.0.4,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0)' --len 60])
-AT_CHECK([ovs-appctl netdev-dummy/receive p8 'in_port(8),packet_type(ns=0,id=0),eth(src=50:54:00:00:00:0b,dst=50:54:00:00:00:0c),eth_type(0x0800),ipv4(src=10.0.0.3,dst=10.0.0.4,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0)' --len 60])
-AT_CHECK([ovs-appctl netdev-dummy/receive p8 'in_port(8),packet_type(ns=0,id=0),eth(src=50:54:00:00:00:0b,dst=50:54:00:00:00:0c),eth_type(0x0800),ipv4(src=10.0.0.3,dst=10.0.0.4,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0)' --len 60])
 sleep 1  # wait for forwarders process packets
 
 # Meter 1 is measuring packets, allowing one packet per second with
 # bursts of one packet, so all 5 of the new packets should hit the drop
 # band.
-# Meter 2 is measuring kbps, with burst size 2 (== 2000 bits). After 500ms
-# there should be space for 80 + 500 bits, so one new 60 byte (480 bit) packet
-# should pass, remaining 4 should hit the drop band.
+# Meter 2 is measuring kbps, with burst size 2 (== 2000 bits). After 870ms
+# there should be space for 80 + 870 = 950 bits, so one new 60 byte (480 bit)
+# packet should pass, remaining 4 should hit the drop band.  There should be
+# 470 bits left.
 AT_CHECK([ovs-ofctl -O OpenFlow13 meter-stats br0 | strip_timers], [0], [dnl
 OFPST_METER reply (OF1.3) (xid=0x2):
 meter:1 flow_count:1 packet_in_count:10 byte_in_count:600 duration:0.0s bands:
 0: packet_count:9 byte_count:540
 
-meter:2 flow_count:1 packet_in_count:10 byte_in_count:600 duration:0.0s bands:
-0: packet_count:5 byte_count:300
+meter:2 flow_count:1 packet_in_count:12 byte_in_count:720 duration:0.0s bands:
+0: packet_count:7 byte_count:420
+])
+
+# Advance time by 10 ms
+ovs-appctl time/warp 10
+
+for i in `seq 1 5`; do
+  AT_CHECK(
+    [ovs-appctl netdev-dummy/receive p7 \
+       'in_port(7),packet_type(ns=0,id=0),eth(src=50:54:00:00:00:09,dst=50:54:00:00:00:0a),eth_type(0x0800),ipv4(src=10.0.0.2,dst=10.0.0.1,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0)' --len 60])
+done
+
+sleep 1  # wait for forwarders process packets
+
+# Meter 1 should remain the same as we didn't send anything that should hit it.
+# Meter 2 is measuring kbps, with burst size 2 (== 2000 bits). After 10ms
+# there should be space for 470 + 10 = 480 bits, so one new 60 byte (480 bit)
+# packet should pass, remaining 4 should hit the drop band.
+AT_CHECK([ovs-ofctl -O OpenFlow13 meter-stats br0 | strip_timers], [0], [dnl
+OFPST_METER reply (OF1.3) (xid=0x2):
+meter:1 flow_count:1 packet_in_count:10 byte_in_count:600 duration:0.0s bands:
+0: packet_count:9 byte_count:540
+
+meter:2 flow_count:1 packet_in_count:17 byte_in_count:1020 duration:0.0s bands:
+0: packet_count:11 byte_count:660
 ])
 
 ovs-appctl time/warp 5000
@@ -360,7 +387,7 @@ ovs-appctl time/warp 5000
 AT_CHECK([
 ovs-appctl coverage/read-counter datapath_drop_meter
 ], [0], [dnl
-14
+20