Setup required for the following code snippets:
#include <dds/domain/ddsdomain.hpp>
#include <dds/sub/ddssub.hpp>
#include "foo.hh"
Create a DataReader
Create a DataReader with all default QoS policies and no listener.
Create a DataReader with specified QoS policies and no listener.
Create a DataReader with specified QoS policies and a listener.
template<typename T>
public:
{ std::cout << "Liveliness Changed" << std::endl; }
};
ExampleDRListener<foo::ExampleType> dr_listener;
Using a DataReader
Reading data using a LoanedSamples container:
samples = test_dr.read( );
samples = test_dr.take( );
for (const auto& sample : samples) {
if (sample.info().valid()) {
std::cout << "Received: INFO: " << sample.info() << std::endl;
std::cout << " DATA: " << sample.data() << std::endl;
}
}
if ( samples.length() > 0 )
{
if ( samples[0].info().valid() ) { ... }
}
Reading or Taking with the stream operator:
Selecting which samples to read or take.
The DataReader supports the concept of a Selector. This is used to configure which samples are of interest in a read or take operation. There are several different ways to specify which samples are of interest, for example, data state, instance handle, maximum count, and a content query.
samples = reader.select()
.read();
samples = reader.select()
.max_samples(10)
.read();
samples = reader.select()
.read();
samples = reader.select()
.instance(instanceHandle)
.read();
do {
samples = reader.select().next_instance(prev_handle).read();
if ( samples.length() > 0 )
{
prev_handle = samples[0].info().instance_handle();
}
} while ( samples.length() > 0 );
samples = reader.select()
.read();
samples = reader.select()
.max_samples(10)
.instance(instanceHandle)
.read();
Similar selections can be applied to the 'stream' read operations
>> max_dds::sub::max_samples( 10 )
>> samples;
Instead of using a LoanedSamples container which holds the 'loaned' data that is owned by the DataReader, it is possible to copy the data sample[s] directly into a container.
std::vector<dds::sub::Sample<foo::ExampleType> > samples(10);
uint32_t nread = reader.select()
.read( samples.begin(), samples.size() );
std::vector<dds::sub::Sample<foo::ExampleType> > samples;
uint32_t nread = test_dr.select()
.read( std::back_inserter(samples) );
Tracking reader status with a listener:
template <typename T>
{
public:
ExampleDRListener() {}
public:
{
std::cout << "on_requested_deadline_missed callback" << std::endl;
std::cout << " total_count : " << status.total_count() << std::endl;
std::cout << " total_count_change : " << status.total_count_change() << std::endl;
std::cout << " last_instance_handle : " << status.last_instance_handle() << std::endl;
}
{
std::cout << "on_requested_incompatible_qos callback" << std::endl;
std::cout << " total_count : " << status.total_count() << std::endl;
std::cout << " total_count_change : " << status.total_count_change() << std::endl;
std::cout << " last_policy_id : " << status.last_policy_id() << std::endl;
for (const auto& policy : status.policies()) {
std::cout << " policy_id : " << policy.policy_id() << ": " << policy.count() << std::endl;
}
}
{
std::cout << "on_sample_rejected callback" << std::endl;
std::cout << " total_count : " << status.total_count() << std::endl;
std::cout << " total_count_change : " << status.total_count_change() << std::endl;
std::cout << " last_reason : " << status.last_reason() << std::endl;
std::cout << " last_instance_handle : " << status.last_instance_handle() << std::endl;
}
{
std::cout << "on_liveliness_changed callback" << std::endl;
std::cout << " alive_count : " << status.alive_count() << std::endl;
std::cout << " not_alive_count : " << status.not_alive_count() << std::endl;
std::cout << " alive_count_change : " << status.alive_count_change() << std::endl;
std::cout << " not_alive_count_change : " << status.not_alive_count_change() << std::endl;
std::cout << " last_publication_handle : " << status.last_publication_handle() << std::endl;
}
{
std::cout << "on_data_available callback" << std::endl;
for (const auto& sample : samples) {
if (sample.info().valid()) {
std::cout << " Received: INFO: " << sample.info() << std::endl;
std::cout << " DATA: " << sample.data() << std::endl;
}
}
}
{
std::cout << "on_subscription_matched callback" << std::endl;
std::cout << " total_count : " << status.total_count() << std::endl;
std::cout << " total_count_change : " << status.total_count_change() << std::endl;
std::cout << " current_count : " << status.current_count() << std::endl;
std::cout << " current_count_change : " << status.current_count_change() << std::endl;
std::cout << " last_publication_handle : " << status.last_publication_handle() << std::endl;
}
{
std::cout << "on_sample_lost callback" << std::endl;
std::cout << " total_count : " << status.total_count() << std::endl;
std::cout << " total_count_change : " << status.total_count_change() << std::endl;
}
};
ExampleDRListener< foo::ExampleType > * listener = new ExampleDRListener<foo::ExampleType>();
A 'listener' is one way to get notification of events, including 'data available'. See the Conditions and WaitSets discussion for another approach.