mirror of
https://github.com/ZGCA-Forge/MsgCenterPy.git
synced 2025-12-17 04:50:55 +00:00
Update docs
This commit is contained in:
@@ -6,220 +6,161 @@ This page contains basic usage examples to help you get started with MsgCenterPy
|
||||
Creating Message Instances
|
||||
---------------------------
|
||||
|
||||
Dictionary Messages
|
||||
~~~~~~~~~~~~~~~~~~~
|
||||
ROS2 Message Instances
|
||||
----------------------
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
from msgcenterpy import MessageInstance, MessageType
|
||||
|
||||
# Simple dictionary
|
||||
simple_data = {"name": "sensor_01", "active": True}
|
||||
instance = MessageInstance.create(MessageType.DICT, simple_data)
|
||||
|
||||
# Access fields
|
||||
name = instance.get_field("name")
|
||||
print(f"Sensor name: {name}")
|
||||
|
||||
# Nested dictionary
|
||||
nested_data = {
|
||||
"device": {
|
||||
"id": "dev_001",
|
||||
"sensors": [
|
||||
{"type": "temperature", "value": 23.5},
|
||||
{"type": "humidity", "value": 65.2}
|
||||
]
|
||||
}
|
||||
}
|
||||
nested_instance = MessageInstance.create(MessageType.DICT, nested_data)
|
||||
|
||||
JSON Schema Generation
|
||||
~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
# Generate JSON Schema from dictionary
|
||||
schema = instance.get_json_schema()
|
||||
print("Generated Schema:")
|
||||
print(schema)
|
||||
|
||||
# Schema includes type information
|
||||
assert schema["type"] == "object"
|
||||
assert "name" in schema["properties"]
|
||||
assert schema["properties"]["name"]["type"] == "string"
|
||||
|
||||
Field Access and Manipulation
|
||||
------------------------------
|
||||
|
||||
Getting Field Values
|
||||
~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
# Simple field access
|
||||
name = instance.get_field("name")
|
||||
|
||||
# Nested field access using dot notation
|
||||
device_id = nested_instance.get_field("device.id")
|
||||
temp_value = nested_instance.get_field("device.sensors.0.value")
|
||||
|
||||
Setting Field Values
|
||||
~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
# Set simple field
|
||||
instance.set_field("name", "sensor_02")
|
||||
|
||||
# Set nested field
|
||||
nested_instance.set_field("device.id", "dev_002")
|
||||
nested_instance.set_field("device.sensors.0.value", 24.1)
|
||||
|
||||
Working with Field Information
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
# Get field type information
|
||||
field_info = instance.fields.get_field_info("name")
|
||||
print(f"Field type: {field_info.type}")
|
||||
print(f"Field constraints: {field_info.constraints}")
|
||||
|
||||
# Check if field exists
|
||||
if instance.fields.has_field("name"):
|
||||
print("Field 'name' exists")
|
||||
|
||||
Type Constraints and Validation
|
||||
-------------------------------
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
from msgcenterpy.core.types import TypeConstraintError
|
||||
|
||||
try:
|
||||
# This will raise an error if type doesn't match
|
||||
instance.set_field("active", "not_a_boolean")
|
||||
except TypeConstraintError as e:
|
||||
print(f"Type constraint violation: {e}")
|
||||
|
||||
# Type conversion when possible
|
||||
instance.set_field("name", 123) # Converts to string if allowed
|
||||
|
||||
Message Conversion
|
||||
------------------
|
||||
|
||||
Converting Between Formats
|
||||
Working with ROS2 Messages
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
# Create from dictionary
|
||||
dict_instance = MessageInstance.create(MessageType.DICT, {"key": "value"})
|
||||
from msgcenterpy.instances.ros2_instance import ROS2MessageInstance
|
||||
from std_msgs.msg import String, Float64MultiArray
|
||||
from geometry_msgs.msg import Point, Pose
|
||||
|
||||
# Convert to JSON Schema instance
|
||||
schema_instance = dict_instance.to_json_schema()
|
||||
# Simple String message
|
||||
string_msg = String()
|
||||
string_msg.data = "Hello ROS2"
|
||||
ros2_instance = ROS2MessageInstance(string_msg)
|
||||
|
||||
# Get the actual schema
|
||||
schema = schema_instance.json_schema
|
||||
print(schema)
|
||||
# Access and modify field
|
||||
ros2_instance.data = "Updated message"
|
||||
print(f"Message data: {ros2_instance.inner_data.data}")
|
||||
|
||||
Error Handling
|
||||
--------------
|
||||
# Float array message
|
||||
array_msg = Float64MultiArray()
|
||||
array_msg.data = [1.1, 2.2, 3.3, 4.4, 5.5]
|
||||
array_instance = ROS2MessageInstance(array_msg)
|
||||
|
||||
Common Error Scenarios
|
||||
~~~~~~~~~~~~~~~~~~~~~~
|
||||
# Complex nested message
|
||||
pose_msg = Pose()
|
||||
pose_instance = ROS2MessageInstance(pose_msg)
|
||||
|
||||
# Set nested fields
|
||||
pose_instance.position.x = 1.5
|
||||
pose_instance.position.y = 2.5
|
||||
pose_instance.position.z = 3.5
|
||||
|
||||
# Or set entire object
|
||||
new_position = Point(x=10.0, y=20.0, z=30.0)
|
||||
pose_instance.position = new_position
|
||||
|
||||
Converting ROS2 to Python Dictionary
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
from msgcenterpy.core.exceptions import FieldAccessError, TypeConstraintError
|
||||
# Get Python dictionary representation
|
||||
python_dict = ros2_instance.get_python_dict()
|
||||
print(f"Dictionary: {python_dict}")
|
||||
|
||||
# Field access errors
|
||||
try:
|
||||
value = instance.get_field("nonexistent_field")
|
||||
except FieldAccessError as e:
|
||||
print(f"Field not found: {e}")
|
||||
# Set data from dictionary
|
||||
new_data = {"data": "dictionary_value"}
|
||||
ros2_instance.set_python_dict(new_data)
|
||||
|
||||
# Type constraint errors
|
||||
try:
|
||||
instance.set_field("active", "invalid_boolean")
|
||||
except TypeConstraintError as e:
|
||||
print(f"Invalid type: {e}")
|
||||
|
||||
# Graceful handling
|
||||
def safe_get_field(instance, field_name, default=None):
|
||||
try:
|
||||
return instance.get_field(field_name)
|
||||
except FieldAccessError:
|
||||
return default
|
||||
JSON Schema Message Instances
|
||||
-----------------------------
|
||||
|
||||
# Usage
|
||||
value = safe_get_field(instance, "optional_field", "default_value")
|
||||
|
||||
Best Practices
|
||||
--------------
|
||||
|
||||
1. **Always handle exceptions** when accessing fields that might not exist
|
||||
2. **Use type hints** in your code for better development experience
|
||||
3. **Validate data** before creating instances when working with external data
|
||||
4. **Use dot notation** for nested field access instead of manual dictionary traversal
|
||||
5. **Check field existence** before accessing optional fields
|
||||
|
||||
Complete Example
|
||||
----------------
|
||||
|
||||
Here's a complete example that demonstrates multiple features:
|
||||
Creating JSON Schema Instances
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
from msgcenterpy import MessageInstance, MessageType
|
||||
from msgcenterpy.core.exceptions import FieldAccessError, TypeConstraintError
|
||||
import json
|
||||
from msgcenterpy.instances.json_schema_instance import JSONSchemaMessageInstance
|
||||
|
||||
def process_sensor_data(sensor_data):
|
||||
"""Process sensor data with proper error handling."""
|
||||
try:
|
||||
# Create instance
|
||||
instance = MessageInstance.create(MessageType.DICT, sensor_data)
|
||||
|
||||
# Validate required fields
|
||||
required_fields = ["id", "type", "readings"]
|
||||
for field in required_fields:
|
||||
if not instance.fields.has_field(field):
|
||||
raise ValueError(f"Missing required field: {field}")
|
||||
|
||||
# Process readings
|
||||
readings = instance.get_field("readings")
|
||||
if isinstance(readings, list) and len(readings) > 0:
|
||||
avg_reading = sum(readings) / len(readings)
|
||||
instance.set_field("average", avg_reading)
|
||||
|
||||
# Generate schema for validation
|
||||
schema = instance.get_json_schema()
|
||||
|
||||
# Return processed data and schema
|
||||
return {
|
||||
"processed_data": instance.to_dict(),
|
||||
"schema": schema,
|
||||
"success": True
|
||||
# Define JSON Schema
|
||||
schema = {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {"type": "string", "minLength": 2, "maxLength": 50},
|
||||
"age": {"type": "integer", "minimum": 0, "maximum": 150},
|
||||
"active": {"type": "boolean"},
|
||||
"tags": {
|
||||
"type": "array",
|
||||
"items": {"type": "string"},
|
||||
"minItems": 1,
|
||||
"maxItems": 10
|
||||
}
|
||||
|
||||
except (FieldAccessError, TypeConstraintError, ValueError) as e:
|
||||
return {
|
||||
"error": str(e),
|
||||
"success": False
|
||||
}
|
||||
|
||||
# Usage
|
||||
sensor_data = {
|
||||
"id": "temp_001",
|
||||
"type": "temperature",
|
||||
"readings": [23.1, 23.5, 24.0, 23.8],
|
||||
"unit": "celsius"
|
||||
},
|
||||
"required": ["name"]
|
||||
}
|
||||
|
||||
result = process_sensor_data(sensor_data)
|
||||
if result["success"]:
|
||||
print("Processing successful!")
|
||||
print(f"Average reading: {result['processed_data']['average']}")
|
||||
# Sample data
|
||||
data = {
|
||||
"name": "John Doe",
|
||||
"age": 30,
|
||||
"active": True,
|
||||
"tags": ["developer", "python"]
|
||||
}
|
||||
|
||||
# Create instance
|
||||
json_schema_instance = JSONSchemaMessageInstance(data, schema)
|
||||
|
||||
Working with JSON Schema Constraints
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
# Get field type information with constraints
|
||||
name_type_info = json_schema_instance.fields.get_sub_type_info("name")
|
||||
print(f"Field type: {name_type_info.standard_type}")
|
||||
print(f"Min length: {name_type_info.get_constraint_value('min_length')}")
|
||||
|
||||
# Check validation errors
|
||||
if len(json_schema_instance._validation_errors) == 0:
|
||||
print("Data is valid according to schema")
|
||||
else:
|
||||
print(f"Processing failed: {result['error']}")
|
||||
print(f"Validation errors: {json_schema_instance._validation_errors}")
|
||||
|
||||
# Export to envelope format
|
||||
envelope = json_schema_instance.export_to_envelope()
|
||||
print(f"Envelope metadata: {envelope['metadata']}")
|
||||
|
||||
Message Type Conversion
|
||||
-----------------------
|
||||
|
||||
Converting ROS2 to JSON Schema
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
from std_msgs.msg import String
|
||||
from msgcenterpy.instances.ros2_instance import ROS2MessageInstance
|
||||
|
||||
# Create ROS2 instance
|
||||
string_msg = String()
|
||||
string_msg.data = "conversion_test"
|
||||
ros2_instance = ROS2MessageInstance(string_msg)
|
||||
|
||||
# Convert to JSON Schema instance
|
||||
json_schema_instance = ros2_instance.to_json_schema()
|
||||
|
||||
# Verify conversion
|
||||
print(f"Original data: {ros2_instance.get_python_dict()}")
|
||||
print(f"Converted data: {json_schema_instance.get_python_dict()}")
|
||||
print(f"Generated schema: {json_schema_instance.json_schema}")
|
||||
|
||||
Generate JSON Schema from ROS2 Message
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
from geometry_msgs.msg import Pose
|
||||
|
||||
# Create complex ROS2 message
|
||||
pose_msg = Pose()
|
||||
pose_msg.position.x = 5.0
|
||||
pose_msg.position.y = 10.0
|
||||
pose_msg.position.z = 15.0
|
||||
pose_msg.orientation.w = 1.0
|
||||
|
||||
ros2_instance = ROS2MessageInstance(pose_msg)
|
||||
|
||||
# Generate JSON Schema
|
||||
schema = ros2_instance.get_json_schema()
|
||||
|
||||
print("Generated JSON Schema:")
|
||||
print(f"Type: {schema['type']}")
|
||||
print(f"Properties: {list(schema['properties'].keys())}")
|
||||
print(f"Position schema: {schema['properties']['position']}")
|
||||
|
||||
Reference in New Issue
Block a user