CoreDX DDS Modern C++ API
DataReader Use Cases

Setup required for the following code snippets:

#include <dds/domain/ddsdomain.hpp>
#include <dds/sub/ddssub.hpp>
#include "foo.hh" // declares the foo types used in the examples
// Create the DomainParticipant that owns the Topic and Subscriber below.
// (domain id 0 is used here purely as an example value)
dds::domain::DomainParticipant participant( 0 );
// Topic carrying foo::ExampleType samples, registered under the name "ExampleTopic".
dds::topic::Topic< foo::ExampleType > topic( participant, "ExampleTopic" );
// Subscriber that will contain the DataReaders created in the examples.
dds::sub::Subscriber subscriber( participant );

Create a DataReader

Create a DataReader with all default QoS policies and no listener.

Create a DataReader with specified QoS policies and no listener.

Create a DataReader with specified QoS policies and a listener.

// Minimal DataReader listener: derives from the no-op listener so that only
// the callbacks of interest need to be overridden (here: liveliness changed).
template<typename T>
class ExampleDRListener : public dds::sub::NoOpDataReaderListener<T> {
public:
    // Invoked when the liveliness of a matched DataWriter changes.
    void on_liveliness_changed( dds::sub::DataReader<T>& reader,
                                const dds::core::status::LivelinessChangedStatus& status ) override
    { std::cout << "Liveliness Changed" << std::endl; }
};
// The listener instance lives on the stack in this example.
ExampleDRListener<foo::ExampleType> dr_listener;
// NOTE(review): 'dr_qos' is not declared in this snippet; it is assumed to be a
// DataReader QoS object that was configured beforehand.
dds::sub::DataReader< foo::ExampleType > reader( subscriber, topic, dr_qos, &dr_listener, dds::core::status::StatusMask::all() );
// NOTE: remove the listener before the listener goes out of scope because it lives on the stack in this example
reader.listener( nullptr, dds::core::status::StatusMask::none() );

Using a DataReader

Reading data using a LoanedSamples container:

samples = test_dr.read( ); // 'read' (leaving samples in the Reader cache)
// Or,
samples = test_dr.take( ); // 'take' (ultimately removing samples from the Reader cache)
// LoanedSamples provides an iterator interface:
for (const auto& sample : samples) {
if (sample.info().valid()) {
std::cout << "Received: INFO: " << sample.info() << std::endl;
std::cout << " DATA: " << sample.data() << std::endl;
}
}
// LoanedSamples also provides array subscript support:
if ( samples.length() > 0 )
{
if ( samples[0].info().valid() ) { ... }
}
// when 'samples' goes out of scope, it will automatically invoke return_loan().
// alternatively, you can force a call to return_loan() when desired
samples.return_loan();

Reading or Taking with the stream operator:

test_dr >> dds::sub::take >> samples; // 'read' samples
// Or,
test_dr >> dds::sub::take >> samples; // 'take' samples

Selecting which samples to read or take.

The DataReader supports the concept of a Selector. This is used to configure which samples are of interest in a read or take operation. There are several different ways to specify which samples are of interest, for example, data state, instance handle, maximum count, and a content query.

// using a default selector (similar to calling read directly):
samples = reader.select()
                .read();
// select with a specified maximum number of samples:
samples = reader.select()
                .max_samples(10)
                .read();
// select samples with a specified 'data state':
// (new_data() is just one example; any dds::sub::status::DataState value may be given)
samples = reader.select()
                .state(dds::sub::status::DataState::new_data())
                .read();
// select samples of a specific instance
// get 'instanceHandle' from prior data
samples = reader.select()
                .instance(instanceHandle)
                .read();
// read 'instance-by-instance' using next_instance(), starting with 'nil' handle
dds::core::InstanceHandle prev_handle = dds::core::InstanceHandle::nil();
do {
samples = reader.select().next_instance(prev_handle).read();
if ( samples.length() > 0 )
{
// ... process sample[s] ...
prev_handle = samples[0].info().instance_handle(); // update 'prev_handle'
}
} while ( samples.length() > 0 );
// read with a content filter
// (the Query is built on the same reader that performs the select)
samples = reader.select()
                .content(dds::sub::Query(reader, "theKey >= 5"))
                .read();
// selector modifiers may also be combined:
samples = reader.select()
                .max_samples(10)
                .instance(instanceHandle)
                .read();

Similar selections can be applied to the 'stream' read operations:

// stream form of a selection: read at most 10 samples into 'samples'
reader >> dds::sub::read
       >> dds::sub::max_samples( 10 )
       >> samples;

Instead of using a LoanedSamples container which holds the 'loaned' data that is owned by the DataReader, it is possible to copy the data sample[s] directly into a container.

// construct a vector container with enough space to hold the samples
std::vector<dds::sub::Sample<foo::ExampleType> > samples(10);
uint32_t nread = reader.select()
                       .read( samples.begin(), samples.size() );
// 'nread' indicates how many samples were added
// All of the selector modifications shown above also apply here.
// NOTE: max_samples() will apply if it is smaller than the 'max_samples' parameter passed to the 'read()' call.
// Alternatively, construct an empty container, and use a 'back_inserter' to append samples
// (this is a separate alternative to the declarations above; use one or the other):
std::vector<dds::sub::Sample<foo::ExampleType> > samples;
uint32_t nread = reader.select()
                       .read( std::back_inserter(samples) );

Tracking reader status with a listener:

// Example DataReader listener that prints the contents of every status
// callback. It derives from the no-op listener, so any callback that is not
// overridden here simply does nothing.
template <typename T>
class ExampleDRListener : public dds::sub::NoOpDataReaderListener<T>
{
public:
    ExampleDRListener() {}

public:
    // Prints the fields of the RequestedDeadlineMissed status.
    void on_requested_deadline_missed( dds::sub::DataReader<T>& reader,
                                       const dds::core::status::RequestedDeadlineMissedStatus& status ) override
    {
        std::cout << "on_requested_deadline_missed callback" << std::endl;
        std::cout << " total_count : " << status.total_count() << std::endl;
        std::cout << " total_count_change : " << status.total_count_change() << std::endl;
        std::cout << " last_instance_handle : " << status.last_instance_handle() << std::endl;
    }

    // Prints the fields of the RequestedIncompatibleQos status, including the
    // per-policy counts.
    void on_requested_incompatible_qos( dds::sub::DataReader<T>& reader,
                                        const dds::core::status::RequestedIncompatibleQosStatus& status ) override
    {
        std::cout << "on_requested_incompatible_qos callback" << std::endl;
        std::cout << " total_count : " << status.total_count() << std::endl;
        std::cout << " total_count_change : " << status.total_count_change() << std::endl;
        std::cout << " last_policy_id : " << status.last_policy_id() << std::endl;
        for (const auto& policy : status.policies()) {
            std::cout << " policy_id : " << policy.policy_id() << ": " << policy.count() << std::endl;
        }
    }

    // Prints the fields of the SampleRejected status.
    void on_sample_rejected( dds::sub::DataReader<T>& reader,
                             const dds::core::status::SampleRejectedStatus& status ) override
    {
        std::cout << "on_sample_rejected callback" << std::endl;
        std::cout << " total_count : " << status.total_count() << std::endl;
        std::cout << " total_count_change : " << status.total_count_change() << std::endl;
        std::cout << " last_reason : " << status.last_reason() << std::endl;
        std::cout << " last_instance_handle : " << status.last_instance_handle() << std::endl;
    }

    // Prints the fields of the LivelinessChanged status.
    void on_liveliness_changed( dds::sub::DataReader<T>& reader,
                                const dds::core::status::LivelinessChangedStatus& status ) override
    {
        std::cout << "on_liveliness_changed callback" << std::endl;
        std::cout << " alive_count : " << status.alive_count() << std::endl;
        std::cout << " not_alive_count : " << status.not_alive_count() << std::endl;
        std::cout << " alive_count_change : " << status.alive_count_change() << std::endl;
        std::cout << " not_alive_count_change : " << status.not_alive_count_change() << std::endl;
        std::cout << " last_publication_handle : " << status.last_publication_handle() << std::endl;
    }

    // Takes the available samples from the reader and prints each valid one.
    void on_data_available( dds::sub::DataReader<T>& reader ) override
    {
        std::cout << "on_data_available callback" << std::endl;
        // read and print
        dds::sub::LoanedSamples<T> samples = reader.take();
        for (const auto& sample : samples) {
            if (sample.info().valid()) {
                std::cout << " Received: INFO: " << sample.info() << std::endl;
                std::cout << " DATA: " << sample.data() << std::endl;
            }
        }
    }

    // Prints the fields of the SubscriptionMatched status.
    void on_subscription_matched( dds::sub::DataReader<T>& reader,
                                  const dds::core::status::SubscriptionMatchedStatus& status ) override
    {
        std::cout << "on_subscription_matched callback" << std::endl;
        std::cout << " total_count : " << status.total_count() << std::endl;
        std::cout << " total_count_change : " << status.total_count_change() << std::endl;
        std::cout << " current_count : " << status.current_count() << std::endl;
        std::cout << " current_count_change : " << status.current_count_change() << std::endl;
        std::cout << " last_publication_handle : " << status.last_publication_handle() << std::endl;
    }

    // Prints the fields of the SampleLost status.
    void on_sample_lost( dds::sub::DataReader<T>& reader,
                         const dds::core::status::SampleLostStatus& status ) override
    {
        std::cout << "on_sample_lost callback" << std::endl;
        std::cout << " total_count : " << status.total_count() << std::endl;
        std::cout << " total_count_change : " << status.total_count_change() << std::endl;
    }
};
// examples of creating a reader with a listener were shown above.
// install a listener on an existing reader:
ExampleDRListener< foo::ExampleType > * listener = new ExampleDRListener<foo::ExampleType>();

A 'listener' is one way to get notification of events, including 'data available'. See the Conditions and WaitSets discussion for another approach.


© 2009-2020 Twin Oaks Computing, Inc
Castle Rock, CO 80104
All rights reserved.